Kafka and Google Protobuf

In this tutorial we will see how data stored in Apache Kafka as Google Protobuf can be visualised and processed. Once Lenses can read the data, you can apply data masking to it and process it as a stream with SQL. The code can be found on GitHub.

Requirements

I want to view my Google Protobuf data in Kafka, protect the sensitive information and be able to process it.

Setting up our example

Let’s assume that we have a topic (cards) that contains credit card data. Its Google Protobuf schema is:

syntax = "proto2";

package io.lenses.examples.serde.protobuf;

option java_package = "io.lenses.examples.serde.protobuf.generated";
option java_outer_classname = "CardData";

message CreditCard {
  required string name = 1;
  required string country = 2;
  required string currency = 3;
  required string cardNumber = 4;
  required bool blocked = 5;
  required string type = 6;
}

Solution

Lenses provides a lightweight library for plugging in Google Protobuf payloads stored in Kafka. Add it as a dependency and implement its Serde interface:

<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-serde</artifactId>
    <version>${lenses.serdes.version}</version>
</dependency>

The plugin must implement two methods:

  • getSchema() - describes the payload structure and returns it as an Avro Schema
  • deserializer(Properties properties) - contains the logic that translates the raw bytes stored in Kafka into an Avro GenericRecord

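At the moment the serializer(Properties properties) method is not required, because Lenses does not use it.

    @Override
    public Serializer serializer(Properties properties) {
        // Not required: Lenses does not use the serializer
        throw new UnsupportedOperationException("Serialization is not supported");
    }

    @Override
    public Deserializer deserializer(Properties properties) {
        // REQUIRED: return a Deserializer that turns raw Kafka bytes into a GenericRecord
    }

    @Override
    public Schema getSchema() {
        // REQUIRED: return the Avro schema describing the payload
    }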

Implementation

There are two implementations provided in the example:

  • CreditCardAutoProtobufSerde - more generic but not as performant
  • CreditCardProtobufSerde - yields better performance at the expense of more coding

CreditCardAutoProtobufSerde class

The first step is to provide the schema for the Kafka payload. The avro-protobuf library can derive it automatically: passing the Java class generated from the Protobuf schema to ProtobufData yields the equivalent Avro schema:

private final static Schema schema = ProtobufData.get()
        .getSchema(CardData.CreditCard.class);

@Override
public Schema getSchema() {
    return schema;
}

The next step is to translate the raw bytes, which hold the card details encoded with Google Protobuf, into a GenericRecord. Step one parses the raw bytes into an instance of the CreditCard class:

CardData.CreditCard card = CardData.CreditCard.parseFrom(bytes);

In step two, the card details are written as Avro to an in-memory byte array:

ProtobufDatumWriter<CardData.CreditCard> pbWriter = new ProtobufDatumWriter<>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
pbWriter.write(card, encoder);
encoder.flush();

The last step reads the Avro bytes back as a GenericRecord:

GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
GenericRecord record = datumReader.read(null, decoder);
return record;
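The Avro half of the steps above can be tried without generated Protobuf classes. The sketch below is a stand-in under stated assumptions: it swaps ProtobufDatumWriter for Avro's GenericDatumWriter, uses a hypothetical two-field schema instead of the one ProtobufData derives, and reuses the encoder and decoder between calls through the second argument of EncoderFactory.binaryEncoder and DecoderFactory.binaryDecoder. It needs only the Avro jar on the classpath.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Hypothetical helper mirroring steps two and three: write a record as Avro
// binary into memory, then read it back as a GenericRecord.
// Not thread-safe: the buffer, encoder and decoder are reused across calls.
final class AvroRoundTrip {
    // Hypothetical schema standing in for the one derived by ProtobufData
    static final Schema SCHEMA = SchemaBuilder.record("CreditCard").fields()
            .requiredString("name")
            .requiredBoolean("blocked")
            .endRecord();

    private final GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(SCHEMA);
    private final GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(SCHEMA);
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
    private BinaryEncoder encoder; // null on the first call, reused afterwards
    private BinaryDecoder decoder; // null on the first call, reused afterwards

    GenericRecord roundTrip(GenericRecord input) throws IOException {
        out.reset(); // discard bytes left over from the previous call
        encoder = EncoderFactory.get().binaryEncoder(out, encoder);
        writer.write(input, encoder);
        encoder.flush(); // the binary encoder buffers, so flush before reading the array
        decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), decoder);
        return reader.read(null, decoder);
    }

    public static void main(String[] args) throws IOException {
        GenericRecord card = new GenericData.Record(SCHEMA);
        card.put("name", "Jane Doe");
        card.put("blocked", false);
        System.out.println(new AvroRoundTrip().roundTrip(card));
    }
}
```

Strings read back this way are typically org.apache.avro.util.Utf8 rather than java.lang.String, so comparing them via toString() is the safer choice.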

CreditCardProtobufSerde class

Going from bytes to CreditCard, then to Avro bytes, then to GenericRecord can be short-circuited so that the intermediate Avro bytes are never produced. This implementation does exactly that, at the cost of more code.

Parsing the raw bytes into a CreditCard stays the same. The next and last step creates a GenericRecord instance and populates it with the values from the CreditCard instance:

GenericRecord record = new GenericData.Record(schema);
record.put("name", card.getName());
record.put("cardNumber", card.getCardNumber());
record.put("cardType", card.getType());
record.put("country", card.getCountry());
record.put("currency", card.getCurrency());
record.put("blocked", card.getBlocked());
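The put calls above only work if the schema returned by getSchema() declares exactly those field names: GenericData.Record.put(String, Object) throws an AvroRuntimeException for a name the schema does not contain. Because the code writes cardType instead of the Protobuf field name type, a schema derived by ProtobufData (which keeps the name type) would not fit. One option, assuming a hand-written schema, is Avro's SchemaBuilder; the record name, namespace and the toRecord helper below are placeholders, not taken from the example project.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Sketch of a hand-written Avro schema for CreditCardProtobufSerde.
// Field names must match the record.put(...) calls exactly.
final class CreditCardAvroSchema {
    static final Schema SCHEMA = SchemaBuilder
            .record("CreditCard")                           // placeholder record name
            .namespace("io.lenses.examples.serde.protobuf") // placeholder namespace
            .fields()
            .requiredString("name")
            .requiredString("cardNumber")
            .requiredString("cardType")                      // renamed from the Protobuf field "type"
            .requiredString("country")
            .requiredString("currency")
            .requiredBoolean("blocked")
            .endRecord();

    // Hypothetical helper: in the serde these values come from the CreditCard getters
    static GenericRecord toRecord(String name, String cardNumber, String cardType,
                                  String country, String currency, boolean blocked) {
        GenericRecord record = new GenericData.Record(SCHEMA);
        record.put("name", name);
        record.put("cardNumber", cardNumber);
        record.put("cardType", cardType);
        record.put("country", country);
        record.put("currency", currency);
        record.put("blocked", blocked);
        return record;
    }

    public static void main(String[] args) {
        GenericRecord r = toRecord("Jane Doe", "4111111111111111", "VISA", "UK", "GBP", false);
        // validate checks every field value against its declared type
        System.out.println(GenericData.get().validate(SCHEMA, r));
    }
}
```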

Handling nested structures

Quite often, Protobuf payloads sent over Kafka contain nested data. Consider this logical representation of a class NestedObj:

{
  "a": 123,
  "b": {
    "x":"value1"
  }
}

The getSchema implementation stays the same; the only addition is extracting the Avro schema of the b field:

private final static Schema schema = ProtobufData.get().getSchema(NestedObj.class);
private final static Schema fieldBSchema = schema.getField("b").schema();

@Override
public Schema getSchema() {
    return schema;
}
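If b is declared optional in the .proto file, avro-protobuf may map it to a union of null and the record rather than to a plain record, depending on the version. Passing such a union to new GenericData.Record(...) fails because it is not a record schema. A defensive sketch, assuming either shape can occur, picks the record branch out of the field's schema; the NestedSchemas class and recordSchemaOf helper are hypothetical names.

```java
import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Hypothetical helper: returns the record schema of a field, unwrapping a
// ["null", record] union such as one produced for an optional message field.
final class NestedSchemas {
    static Schema recordSchemaOf(Schema parent, String fieldName) {
        Schema.Field field = parent.getField(fieldName);
        if (field == null) {
            throw new IllegalArgumentException("No field '" + fieldName + "' in " + parent.getFullName());
        }
        Schema fieldSchema = field.schema();
        if (fieldSchema.getType() == Schema.Type.RECORD) {
            return fieldSchema; // plain nested record
        }
        if (fieldSchema.getType() == Schema.Type.UNION) {
            for (Schema branch : fieldSchema.getTypes()) {
                if (branch.getType() == Schema.Type.RECORD) {
                    return branch; // first record branch of the union
                }
            }
        }
        throw new IllegalArgumentException("Field '" + fieldName + "' is not a record: " + fieldSchema);
    }

    public static void main(String[] args) {
        Schema b = SchemaBuilder.record("B").fields().requiredString("x").endRecord();
        Schema optionalB = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), b));
        Schema parent = SchemaBuilder.record("NestedObj").fields()
                .requiredInt("a")
                .name("b").type(optionalB).noDefault()
                .endRecord();
        System.out.println(recordSchemaOf(parent, "b").getName());
    }
}
```

With this helper, fieldBSchema would be initialised as recordSchemaOf(schema, "b"), and the nested GenericRecord can still be stored in a union-typed field.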

Next, the deserializer needs to create and populate the GenericRecord, including the nested record for field b:

@Override
public Deserializer deserializer(Properties properties) {
    return new Deserializer() {
        @Override
        public GenericRecord deserialize(byte[] bytes) throws IOException {
            // Parse the Protobuf bytes into the generated class
            NestedObj obj = NestedObj.parseFrom(bytes);

            // Populate the top-level record
            GenericRecord record = new GenericData.Record(schema);
            record.put("a", obj.getA());

            // Build the nested record for field b and attach it
            GenericRecord recordFieldB = new GenericData.Record(fieldBSchema);
            recordFieldB.put("x", obj.getB().getX());
            record.put("b", recordFieldB);

            return record;
        }
    };
}
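To exercise the nested pattern without generated classes, the sketch below assumes a hand-written Avro schema equivalent to the NestedObj example (a as an int and b as a record with a string x; both types are assumptions, since the .proto for NestedObj is not shown). It uses Avro's GenericRecordBuilder, which, unlike GenericData.Record, fails in build() when a field without a default value was never set. The class name is hypothetical.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

// Hand-written stand-in for the NestedObj schema (assumed types: a int, b.x string)
final class NestedRecordExample {
    static final Schema FIELD_B_SCHEMA = SchemaBuilder.record("B").fields()
            .requiredString("x")
            .endRecord();

    static final Schema SCHEMA = SchemaBuilder.record("NestedObj").fields()
            .requiredInt("a")
            .name("b").type(FIELD_B_SCHEMA).noDefault()
            .endRecord();

    // Builds the nested record first, then the outer record that references it
    static GenericRecord build(int a, String x) {
        GenericRecord b = new GenericRecordBuilder(FIELD_B_SCHEMA)
                .set("x", x)
                .build();
        return new GenericRecordBuilder(SCHEMA)
                .set("a", a)
                .set("b", b)
                .build();
    }

    public static void main(String[] args) {
        System.out.println(build(123, "value1"));
    }
}
```

The builder's check catches a forgotten field at construction time; with GenericData.Record an unset field silently stays null until it is serialized.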

Build and deploy

If you have the source code, run the following command in the folder containing the pom.xml file:

mvn clean package

Follow the docs to make these two artifacts available to Lenses:

    target/lenses-serde-protobuf-example-1.0.0.jar
    deps/avro-protobuf-1.8.2.jar

Conclusion

In this tutorial you learned how to make data stored in Kafka as Google Protobuf readable by Lenses. Once the plugin is provided to Lenses, the topic containing the data is associated with it, so the data can be queried using Lenses SQL. Data policies also apply to the data, so you can make sure sensitive information is not available to the users accessing it.

Last but not least, you can process the topic data as a stream using the SQL processor. The simplest example is to convert the data to Avro:

SET defaults.topic.autocreate=true;

INSERT INTO credit_card_as_avro
STORE VALUE AS AVRO
SELECT STREAM * 
FROM credit_card_protobuf