Latest version: 4.3.x
Legacy protobuf support
We will see how data stored in Apache Kafka with Google Protobuf can be visualised and processed. Once the data is lifted into Lenses, data masking and stream processing with SQL can be unleashed. The code can be found on GitHub.
Note: Since version 5.0, Lenses supports protobuf natively. This tutorial covers the custom serde-based approach and is only relevant for version 4.3 or earlier.
Requirements
I want to view my Google Protobuf data in Kafka, protect the sensitive information, and be able to process it.
Quick video
Setting up our example
Let’s assume that we have a topic (cards) that contains data regarding credit cards.
The Google Protobuf schema is this:
syntax = "proto2";
package io.lenses.examples.serde.protobuf;
option java_package = "io.lenses.examples.serde.protobuf.generated";
option java_outer_classname = "CardData";
message CreditCard {
required string name = 1;
required string country = 2;
required string currency = 3;
required string cardNumber = 4;
required bool blocked = 5;
required string type = 6;
}
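For completeness, records of this shape might be produced with the builder generated from the schema and a plain byte-array producer. This is a sketch, not part of the example project; the topic name comes from the tutorial, while the bootstrap address and field values are assumptions:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import io.lenses.examples.serde.protobuf.generated.CardData;

public class CardProducerSketch {
    public static void main(String[] args) {
        // Build a CreditCard message using the class generated from the .proto file
        CardData.CreditCard card = CardData.CreditCard.newBuilder()
                .setName("Jane Doe")                    // sample values (assumptions)
                .setCountry("UK")
                .setCurrency("GBP")
                .setCardNumber("5600-0000-0000-0000")
                .setBlocked(false)
                .setType("VISA")
                .build();

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            // Protobuf messages serialize themselves to the raw bytes stored in Kafka
            producer.send(new ProducerRecord<>("cards", card.getName(), card.toByteArray()));
        }
    }
}
```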
Solution
Lenses exposes a lightweight library that allows plugging in Google Protobuf payloads stored in Kafka. What is needed is to implement the Serde interface:
<dependency>
<groupId>com.landoop</groupId>
<artifactId>lenses-serde</artifactId>
<version>${lenses.serdes.version}</version>
</dependency>
The plugin implementation has to provide two methods:
- getSchema() - describes the payload structure; returns an Avro Schema
- deserializer(Properties properties) - contains the logic to translate the raw bytes stored in Kafka into an Avro GenericRecord
At the moment, serializer(Properties properties) is not required (it is not used by Lenses).
@Override
public Serializer serializer(Properties properties) {
//not required
throw new NotImplementedException();
}
@Override
public Deserializer deserializer(Properties properties) {
// REQUIRED
}
@Override
public Schema getSchema() {
// REQUIRED
}
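Putting these pieces together, the overall shape of a plugin class is roughly the following. This is only a sketch: the Serde, Serializer and Deserializer interfaces come from the lenses-serde artifact, and the class name here is hypothetical.

```java
// Sketch of the plugin class shape. Serde, Serializer and Deserializer
// are the interfaces from the com.landoop:lenses-serde artifact
// (their imports are omitted here, as they belong to that library).
import java.util.Properties;

import org.apache.avro.Schema;

public class CreditCardSerdeSketch implements Serde {

    @Override
    public Schema getSchema() {
        // REQUIRED: return the Avro schema describing the payload
        throw new UnsupportedOperationException("TODO");
    }

    @Override
    public Serializer serializer(Properties properties) {
        // Not used by Lenses at the moment
        throw new UnsupportedOperationException("not required");
    }

    @Override
    public Deserializer deserializer(Properties properties) {
        // REQUIRED: translate the raw Kafka bytes into an Avro GenericRecord
        throw new UnsupportedOperationException("TODO");
    }
}
```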
Implementation
There are two implementations provided in the example:
- CreditCardAutoProtobufSerde - more generic but not as performant
- CreditCardProtobufSerde - yields better performance at the expense of more coding
CreditCardAutoProtobufSerde class
The first thing is to provide the schema for your Kafka payload. The avro-protobuf library can handle that automatically. Using the JVM class generated from the Protobuf schema, and ProtobufData, the following code obtains the Avro schema:
private final static Schema schema = ProtobufData.get()
.getSchema(CardData.CreditCard.class);
@Override
public Schema getSchema() {
return schema;
}
The next step is to translate the raw bytes, storing the card details using Google Protobuf, into a GenericRecord.
Step one is to lift the raw bytes into an instance of the CreditCard class:
CardData.CreditCard card = CardData.CreditCard.parseFrom(bytes);
For step two, the card details are written to an in-memory buffer as Avro:
ProtobufDatumWriter<CardData.CreditCard> pbWriter = new ProtobufDatumWriter<>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
pbWriter.write(card, encoder);
encoder.flush();
The last step reads the Avro bytes back as a GenericRecord:
GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
GenericRecord record = datumReader.read(null, decoder);
return record;
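Combined, the three steps above could form a deserializer along these lines. This is a sketch assuming the `schema` field defined earlier and the standard Avro and avro-protobuf classes:

```java
// Sketch of the CreditCardAutoProtobufSerde deserializer, combining the
// three steps: parse Protobuf, write as Avro, read back as GenericRecord.
// Deserializer comes from the lenses-serde artifact (import omitted).
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.protobuf.ProtobufDatumWriter;

import io.lenses.examples.serde.protobuf.generated.CardData;

// ...inside the Serde implementation:
@Override
public Deserializer deserializer(Properties properties) {
    return new Deserializer() {
        @Override
        public GenericRecord deserialize(byte[] bytes) throws IOException {
            // Step 1: parse the Protobuf bytes into the generated class
            CardData.CreditCard card = CardData.CreditCard.parseFrom(bytes);

            // Step 2: write the message to an in-memory buffer as Avro
            ProtobufDatumWriter<CardData.CreditCard> pbWriter = new ProtobufDatumWriter<>(schema);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            pbWriter.write(card, encoder);
            encoder.flush();

            // Step 3: read the Avro bytes back as a GenericRecord
            GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
            return datumReader.read(null, decoder);
        }
    };
}
```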
CreditCardProtobufSerde class
Going from bytes to CreditCard to bytes to GenericRecord can be short-circuited to avoid the intermediary Avro bytes. This is where this implementation comes into play, at the expense of more code.
Lifting the raw bytes into a CreditCard remains the same. The next, and last, step requires creating an instance of GenericRecord and populating it with the values from the CreditCard instance.
GenericRecord record = new GenericData.Record(schema);
record.put("name", card.getName());
record.put("cardNumber", card.getCardNumber());
record.put("type", card.getType());
record.put("country", card.getCountry());
record.put("currency", card.getCurrency());
record.put("blocked", card.getBlocked());
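Wrapped into the deserializer, the direct translation might look like the following sketch, again assuming the `schema` field derived via ProtobufData:

```java
// Sketch of the CreditCardProtobufSerde deserializer: the Protobuf fields
// are copied straight into the Avro record, with no intermediary bytes.
// Deserializer comes from the lenses-serde artifact (import omitted).
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import io.lenses.examples.serde.protobuf.generated.CardData;

// ...inside the Serde implementation:
@Override
public Deserializer deserializer(Properties properties) {
    return new Deserializer() {
        @Override
        public GenericRecord deserialize(byte[] bytes) throws IOException {
            CardData.CreditCard card = CardData.CreditCard.parseFrom(bytes);
            GenericRecord record = new GenericData.Record(schema);
            record.put("name", card.getName());
            record.put("country", card.getCountry());
            record.put("currency", card.getCurrency());
            record.put("cardNumber", card.getCardNumber());
            record.put("blocked", card.getBlocked());
            record.put("type", card.getType());
            return record;
        }
    };
}
```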
Handling nested structures
Quite often, the payloads sent over Kafka using Google Protobuf contain nested data.
Let’s consider this logical representation for a class NestedObj:
{
"a": 123,
"b": {
"x":"value1"
}
}
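A proto2 definition producing this shape could look like the following; the message and field names are assumptions matching the JSON above, not taken from the example project:

```protobuf
syntax = "proto2";
package io.lenses.examples.serde.protobuf;

// hypothetical nested message for field "b"
message Inner {
  required string x = 1;
}

message NestedObj {
  required int32 a = 1;
  required Inner b = 2;
}
```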
The implementation for getSchema stays the same; the only addition is extracting the Avro schema for the b field.
private final static Schema schema = ProtobufData.get().getSchema(CardData.CreditCard.class);
private final static Schema fieldBSchema = schema.getField("b").schema();
@Override
public Schema getSchema() {
return schema;
}
Next, the deserializer code needs to create and populate the GenericRecord, including the nested one for field b:
@Override
public Deserializer deserializer(Properties properties) {
return new Deserializer() {
@Override
public GenericRecord deserialize(byte[] bytes) throws IOException {
NestedObj obj = NestedObj.parseFrom(bytes);
GenericRecord record = new GenericData.Record(schema);
record.put("a", obj.getA());
GenericRecord recordFieldB = new GenericData.Record(fieldBSchema);
recordFieldB.put("x", obj.getB().getX());
record.put("b", recordFieldB);
return record;
}
};
}
Build and deploy
If you get the source code, run the following command in the folder containing the pom.xml file:
mvn clean package
Follow the docs to make these two artifacts available to Lenses:
target/lenses-serde-protobuf-example-1.0.0.jar
deps/avro-protobuf-1.8.2.jar
Conclusion
In this tutorial you learned how to enable Lenses to work with data stored in Kafka using Google Protobuf. Once the plugin is provided to Lenses, the topic containing the data will be associated with it. As a result, the data can be queried using Lenses SQL. Alongside it, data policies will also apply to the data, so you can make sure the sensitive information is not available to the users accessing it.
Last but not least, you can process the topic data as a stream using the SQL processor. The simplest example is to convert the data to Avro:
SET defaults.topic.autocreate=true;
INSERT INTO credit_card_as_avro
STORE VALUE AS AVRO
SELECT STREAM *
FROM credit_card_protobuf;