This article discusses using Protobuf messages on Kafka topics and streams.

Protocol Buffers

Protobuf has many easily measurable advantages - compact data size and fast serialization are the most commonly cited among them. However, I think the most important benefits are the quality attributes that a data representation format backed by an IDL specification guarantees. Data quality in the data plane improves automatically: there are fewer errors due to bad data, and consumers know exactly how to parse the data.
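
For example, the IDL contract that producers and consumers compile against might look like the following. This is a hypothetical schema sketched for illustration - the field names and numbers are not from the article's codebase, only the `app.types.account.UserRegistered` type name is referenced later in the article:

```proto
// user_events.proto -- hypothetical schema, for illustration only
syntax = "proto3";

package app.types.account;

option java_multiple_files = true;
option java_package = "app.types.account";

// Published when a new user completes registration.
message UserRegistered {
  string user_id = 1;
  string email = 2;
  int64 registered_at_epoch_ms = 3;
}
```

Because every consumer compiles against this same schema, "bad data" is largely rejected at the serialization boundary rather than discovered downstream.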

All our microservices talk Protobuf over gRPC. In fact, all the Protobuf message types and gRPC service definitions live in a single repository that is a dependency of every service. So it was a natural progression to extend the same binary protocol to the async messages over Kafka.

Kafka Message Headers

The producer side of things is easy. All messages on Kafka are binary, which is a direct match for Protobuf. However, there is a challenge on the consumer side. Unlike JSON, which is a self-describing format, a Protobuf message cannot be deserialized without prior knowledge of the message type. So it is imperative that the producer include additional metadata describing the message.
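
The shape of that metadata can be sketched independently of Kafka: pair the opaque payload with a type name, and let the consumer look up a parser by that name. A minimal, Kafka-free sketch of the idea (all names here are illustrative, not from the article's codebase; a plain string parser stands in for a generated Protobuf parser):

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class TypedEnvelopeDemo {

    // An opaque payload plus the metadata a consumer needs to decode it.
    record Envelope(Map<String, String> headers, byte[] payload) {}

    // Consumer-side registry: type name -> parser for that type's bytes.
    static final Map<String, Function<byte[], Object>> PARSERS = new HashMap<>();

    static Object decode(Envelope envelope) {
        String type = envelope.headers().get("proto.eventType");
        Function<byte[], Object> parser = PARSERS.get(type);
        if (parser == null) {
            throw new IllegalStateException("No parser registered for " + type);
        }
        return parser.apply(envelope.payload());
    }

    public static void main(String[] args) {
        // Register a trivial "parser" standing in for a generated parseFrom().
        PARSERS.put("demo.Greeting", bytes -> new String(bytes, StandardCharsets.UTF_8));

        Envelope envelope = new Envelope(
                Map.of("proto.eventType", "demo.Greeting"),
                "hello".getBytes(StandardCharsets.UTF_8));

        System.out.println(decode(envelope)); // prints: hello
    }
}
```

Kafka message headers give us exactly this envelope for free, which is what the rest of the article builds on.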

Similar to HTTP headers, Kafka supports message headers that can carry arbitrary metadata. The snippets below outline a strategy for serializing and deserializing Protobuf messages.

The key abstraction is a KafkaEventPublisher that takes a DomainEvent object, which can be converted to a proto message using its toProto method. The Protobuf Java type name is included in the proto.eventType header.

public abstract class DomainEvent<T extends DomainEvent<T>> {
    final Id<T> eventId = Id.of();
    final Instant when = Instant.now();

    // Convert this domain event to its Protobuf representation.
    public abstract Message toProto();
}

// Annotate all DomainEvent implementations with this annotation, e.g.:
// @EventMessageType("app.types.account.UserRegistered")
// public class UserRegistered { ... }
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface EventMessageType {
    // Fully qualified name of the generated Protobuf message class.
    String value();
}

public class KafkaEventSerializer implements Serializer<DomainEvent<?>> {

    public static final String EVENT_SERIALIZER_SELECTOR_KEY
            = "proto.eventSerializer";
    public static final String EVENT_TYPE_HEADER_KEY
            = "proto.eventType";

    @Override
    public byte[] serialize(String topic, DomainEvent<?> data) {
        Message message = data.toProto();
        return message.toByteArray();
    }
}

public class KafkaEventPublisher implements DomainEventPublisher {

    private final String topic;
    private final KafkaTemplate<String, DomainEvent<?>> kafkaTemplate;
    private final EventCounter eventCounter;

    public KafkaEventPublisher(String topic,
            KafkaTemplate<String, DomainEvent<?>> kafkaTemplate,
            EventCounter eventCounter) {
        this.topic = topic;
        this.kafkaTemplate = kafkaTemplate;
        this.eventCounter = eventCounter;
    }

    @Override
    public void publish(@NonNull DomainEvent<?> domainEvent) {
        // Tells the DelegatingSerializer which delegate to use for this record.
        Header serializerSelectorHdr = new RecordHeader(
                KafkaEventSerializer.EVENT_SERIALIZER_SELECTOR_KEY,
                KafkaEventSerializer.class.getName().getBytes(StandardCharsets.UTF_8));

        EventMessageType annotation = AnnotationUtils.findAnnotation(
                domainEvent.getClass(), EventMessageType.class);
        if (null == annotation) {
            throw new AppException("Event lacks EventMessageType annotation. " +
                    "Deserialization will fail during consumption. Event=" +
                    domainEvent.getClass().getName());
        }
        String eventType = annotation.value();

        // Carries the Protobuf type name so consumers know what to parse.
        Header eventTypeHdr = new RecordHeader(
                KafkaEventSerializer.EVENT_TYPE_HEADER_KEY,
                eventType.getBytes(StandardCharsets.UTF_8));

        ProducerRecord<String, DomainEvent<?>> record = new ProducerRecord<>(
                topic, null, null, domainEvent,
                Arrays.asList(serializerSelectorHdr, eventTypeHdr));

        kafkaTemplate.send(record);
        eventCounter.increment("produced_" + eventType);
    }
}

On the producer side, create the KafkaTemplate employing the above Serializer.

@Configuration
public class KafkaConfiguration {

    private String kafkaBootstrapServers;
    private String applicationName;
    private int partitionCount;
    private int replicaCount;

    @Bean
    public DomainEventPublisher eventPublisher(MeterRegistry meterRegistry) {
        return new KafkaEventPublisher(topicName(),
                kafkaTemplate(), new EventCounter(meterRegistry));
    }

    public String topicName() {
        return applicationName;
    }

    @Bean
    public NewTopic applicationTopic() {
        return TopicBuilder.name(topicName())
                .partitions(partitionCount)
                .replicas(replicaCount)
                .build();
    }

    @Bean
    public ProducerFactory<String, DomainEvent<?>> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, applicationName);
        // Point the DelegatingSerializer at our custom selector header.
        props.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
                KafkaEventSerializer.EVENT_SERIALIZER_SELECTOR_KEY);

        DelegatingSerializer delegatingSerializer = new DelegatingSerializer();
        delegatingSerializer.addDelegate(
                KafkaEventSerializer.class.getName(), new KafkaEventSerializer());

        return new DefaultKafkaProducerFactory<>(
                props, new StringSerializer(), delegatingSerializer);
    }

    @Bean
    public KafkaTemplate<String, DomainEvent<?>> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

On the receiver side, the same header (proto.eventType) is read, and the class name it carries is used to parse the Protobuf object via reflection.
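
The reflective dispatch at the heart of this approach - resolve a class by name, find its static factory method, invoke it - can be exercised with nothing but the JDK. A standalone sketch, using Integer.parseInt(String) as a stand-in for a generated parseFrom(byte[]):

```java
import java.lang.reflect.Method;

public class ReflectiveFactoryDemo {

    // Resolve a class by its fully qualified name and invoke a static
    // factory method on it -- the same pattern the deserializer below
    // uses with Message.parseFrom(byte[]).
    static Object invokeStaticFactory(String className, String methodName,
                                      Class<?> argType, Object arg) throws Exception {
        Class<?> clazz = Class.forName(className);
        Method factory = clazz.getMethod(methodName, argType);
        return factory.invoke(null, arg); // null receiver: static method
    }

    public static void main(String[] args) throws Exception {
        // Integer.parseInt(String) stands in for UserRegistered.parseFrom(byte[]).
        Object result = invokeStaticFactory(
                "java.lang.Integer", "parseInt", String.class, "42");
        System.out.println(result); // prints: 42
    }
}
```

The only extra work in the real deserializer is validating that the resolved class is actually a Protobuf Message subtype before invoking it.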

public class KafkaEventDeSerializer implements Deserializer<Message> {

    @Override
    public Message deserialize(String topic, byte[] data) {
        throw new IllegalStateException("Headers not available");
    }

    @Override
    public Message deserialize(String topic, Headers headers, byte[] data) {
        Header eventTypeHeader = headers.lastHeader(
                KafkaEventSerializer.EVENT_TYPE_HEADER_KEY);
        String eventType = new String(eventTypeHeader.value(), StandardCharsets.UTF_8);
        try {
            Class<?> clazz = ClassUtils.forName(eventType,
                    getClass().getClassLoader());
            if (!Message.class.isAssignableFrom(clazz)) {
                throw new AppException(String.format(
                        "The eventTypeClass [%s] is not a subtype of protobuf.Message.",
                        clazz.getName()));
            }
            Method m = ReflectionUtils.findMethod(clazz, "parseFrom", byte[].class);
            if (null == m) {
                throw new AppException(String.format(
                        "The message class [%s] must have a parseFrom(byte[] bytes) method",
                        clazz.getName()));
            }
            return (Message) m.invoke(null, (Object) data);
        } catch (Exception e) {
            throw new AppException("Error deserializing. EventType=" + eventType, e);
        }
    }
}

On the consumer side, create the KafkaListenerContainerFactory employing the KafkaEventDeSerializer. The property to set is ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG.

@Configuration
public class KafkaConsumerConfiguration {

    private String kafkaBootstrapServers;
    private String appName;

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, appName);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaEventDeSerializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, appName);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Now receiving a typed message is as simple as:

public class Receiver {

    @KafkaListener(topics = {"registration"})
    public void receive(UserRegistered message) {
        log.info("Received UserRegistered message");
    }
}