Kafka and Google Protobuf

Since version 5.0, Lenses supports PROTOBUF as a first-class data format. If you are running an older Lenses version and do not plan to upgrade, please refer to our old tutorial on how to render PROTOBUF data via a custom serde.

Requirements

  • Lenses 5.0 or higher.
  • Confluent Platform / Schema Registry 5.5.0 or higher.

Application-side setup

For Lenses to work with PROTOBUF data, you have to use a schema-registry-aware Kafka producer that publishes data encoded according to the Confluent wire format. This format allows Lenses, as well as any other data consumer, to resolve the correct schema from the registry before decoding the data.
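To make the role of the wire format more concrete, here is a minimal sketch of how a consumer could read the schema ID from a record. It assumes the framing described in the Confluent documentation: one magic byte (0), followed by a 4-byte big-endian schema ID; for Protobuf, a list of message indexes and then the serialized message follow. The class WireFormatHeader and its method are hypothetical names used for illustration only; they are not part of Lenses or of the Confluent libraries.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, for illustration only: not a Lenses or Confluent API.
final class WireFormatHeader {
    // Assumption: the Confluent wire format starts with a single zero byte.
    static final byte MAGIC_BYTE = 0x0;
    // Magic byte (1) + schema ID (4).
    static final int HEADER_LENGTH = 5;

    private WireFormatHeader() {}

    // Returns the schema registry ID stored in the record header.
    static int schemaId(byte[] record) {
        if (record == null || record.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Record too short for a wire-format header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(record); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magic);
        }
        // The bytes after the ID (message indexes and Protobuf payload) are not parsed here.
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Header with schema ID 42, followed by one placeholder byte.
        byte[] record = {0, 0, 0, 0, 42, 0};
        System.out.println("Schema ID: " + schemaId(record));
    }
}
```

In practice you would not write this yourself: a schema-registry-aware deserializer performs the lookup for you. The sketch only shows why data produced without this header cannot be matched to a schema.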

If you are working with the JVM, we recommend using kafka-protobuf-serializer in conjunction with something like protoc-jar-maven-plugin, or any other build-time solution that generates classes from a Protobuf schema.

Assuming your build tool is configured to compile one or several .proto files into Java classes, you should be able to produce data with code like the following:

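import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import com.example.CardData;

// Keys are plain strings; values are Protobuf messages registered in the Schema Registry.
var properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("schema.registry.url", "http://localhost:8081");

KafkaProducer<String, CardData.CreditCard> producer = new KafkaProducer<>(properties);
var topic = "protobuf-topic";

// `customers` is assumed to be a collection of your own domain objects.
customers.forEach(customer -> {
    // Build the Protobuf message through the generated builder.
    var cardData = CardData.CreditCard.newBuilder()
            .setName(customer.name)
            .setCountry(customer.countryCode)
            .setCurrency(customer.card.currency)
            .setBlocked(customer.card.isBlocked)
            .setType(customer.card.cardType)
            .setCardNumber(customer.card.number)
            .build();

    var record = new ProducerRecord<String, CardData.CreditCard>(topic, customer.id, cardData);
    producer.send(record);
});

// close() waits for previously sent records to complete before releasing resources.
producer.close();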

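Note that, in the snippet above, CreditCard is the Java class generated from the following schema. Because of the java_package and java_outer_classname options, it is generated as the nested class com.example.CardData.CreditCard: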

syntax = "proto3";

package io.lenses.examples.serde.protobuf;

option java_package = "com.example";
option java_outer_classname = "CardData";

message CreditCard {
  string name = 1;
  string country = 2;
  string currency = 3;
  string cardNumber = 4;
  bool blocked = 5;
  enum CardType {
    VISA = 0;
    MASTERCARD = 1;
    AMEX = 2;
  }
  CardType type = 6;
}

Build and deploy

If you have the source code, run the following command in the folder containing the pom.xml file:

mvn clean package

Interacting with your data

Shortly after the data is persisted into the selected Kafka topic, Lenses will automatically detect its Key/Value formats as STRING/PROTOBUF. From then on, the records just published should be viewable from the topic screen and queryable from the SQL Studio section. Please refer to our reference for directions on how to harness your data programmatically using Lenses SQL Studio and Lenses Apps. Also, head to our data publishing tutorial if you are looking for a quick and easy way to publish a few JSON-encoded records directly from the Lenses topic screen.

Current limitations

Lenses should correctly handle non-trivial schemas expressed in either version 2 or version 3 of the Protobuf syntax. However, it does not yet support a few schema features expressible in Protobuf. Most notably, these include:

  • Imports of external schemas (i.e. schema references).
  • Recursive message structures such as google.protobuf.Value.
  • Well-known types other than google.type.Date, google.type.TimeOfDay, google.protobuf.Timestamp.
  • Non-string map keys (i.e. currently, keys are always parsed as strings).