Change data format

In this example, we will show how to convert Kafka topics between the different formats supported by Lenses (JSON, AVRO and PROTOBUF).

In particular, we will:

  • convert a JSON topic to AVRO
  • convert an AVRO topic to PROTOBUF

Requirements

For this to work, Lenses has to know what the source topic schema is. Lenses can do this in one of three ways:

  • through direct user action where the schema is manually set
  • through inference; Lenses will try to infer the schema of a topic by looking at the topic data
  • and lastly, if the topic is created through Lenses, the schema will be automatically set
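To make the inference option more concrete, here is a minimal Python sketch of deriving field types from a single sample record. It is only an illustration: the function `infer_type` and the type names it returns are hypothetical, and the inference Lenses actually performs is not described here and may well differ.

```python
import json


def infer_type(value):
    """Map a decoded JSON value to a simple, illustrative type name."""
    # bool must be tested before int, because bool is a subclass of int in Python
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        # JSON alone cannot tell a 32-bit int from a 64-bit long
        return "integer"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, dict):
        # Nested objects become nested structures
        return {name: infer_type(inner) for name, inner in value.items()}
    raise TypeError(f"unsupported value in this sketch: {value!r}")


sample = json.loads('{"speedMph": 50, "sensor": {"id": "sensor_1", "lat": 45.1241}}')
print(infer_type(sample))
```

Note that a sample record cannot say whether a number should be stored as an int or a long, which is one reason an explicitly set schema can be more precise than an inferred one.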

Creating the JSON data

With the following SQL we can create our initial JSON topic:

CREATE TABLE car_speed_events_json(
    _key.plate string
    , speedMph int
    , sensor.id string
    , sensor.lat double
    , sensor.long double
    , event_time long
)
FORMAT (json, json);

to which we can add data using:

INSERT INTO car_speed_events_json (
    _key.plate
    , speedMph
    , sensor.id 
    , sensor.lat
    , sensor.long 
    , event_time 
) VALUES
("20-GH-38", 50, "sensor_1", 45.1241, 1.177, 1591970345),
("20-VL-12", 30, "sensor_1", 45.1241, 1.177, 1591970345),
("20-JH-98", 90, "sensor_1", 45.1241, 1.177, 1591970345);
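The dotted column names describe where each field lives: `_key.plate` belongs to the key facet, and `sensor.id` is the `id` field of a nested `sensor` structure in the value. The Python sketch below shows the record shape we would expect for the first inserted row, judging by the schemas generated later in this example. The helper `to_record` is hypothetical and only mimics the nesting; it is not how Lenses itself builds records.

```python
import json


def to_record(columns, row):
    """Split a flat row into (key, value) dicts, nesting dotted column names."""
    key, value = {}, {}
    for column, cell in zip(columns, row):
        path = column.split(".")
        if path[0] == "_key":
            # Columns under _key go to the key facet (assumes at least one field after _key)
            target, path = key, path[1:]
        else:
            target = value
        for part in path[:-1]:
            # Create intermediate structures such as "sensor" on demand
            target = target.setdefault(part, {})
        target[path[-1]] = cell
    return key, value


COLUMNS = ["_key.plate", "speedMph", "sensor.id", "sensor.lat", "sensor.long", "event_time"]
key, value = to_record(COLUMNS, ["20-GH-38", 50, "sensor_1", 45.1241, 1.177, 1591970345])
print(json.dumps(key))
print(json.dumps(value))
```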

We can quickly verify that the newly created topic uses JSON for both key and value by searching for car_speed_events_json in the Explore view.

Converting from JSON to AVRO

To create a new topic with AVRO format, we can create a processor that will copy the data from our original topic to a new topic, changing the format in the process.

To do this, we start by going to Apps > New App > SQL Processor (or SQL Processors > New SQL Processor in Lenses versions earlier than 5.0) and defining our processor with the following code:

SET defaults.topic.autocreate=true;

INSERT INTO car_speed_events_avro
STORE 
    KEY AS AVRO 
    VALUE AS AVRO
SELECT STREAM *
FROM car_speed_events_json;

Notice the addition of STORE KEY AS AVRO VALUE AS AVRO. This statement tells our processor which format we want each facet (key or value) to be stored as.

Hitting “Create New Processor” will start a new processor.

We can see that the events were added. From now on, Lenses will keep pushing any new events added to car_speed_events_json into car_speed_events_avro.

From the Explore section, we can verify that the format of car_speed_events_avro is (AVRO, AVRO), and that the generated AVRO schema is

{
  "type": "record",
  "name": "record",
  "fields": [
    {
      "name": "plate",
      "type": "string"
    }
  ]
}

for the key, and

{
  "type": "record",
  "name": "record0",
  "fields": [
    {
      "name": "speedMph",
      "type": "int"
    },
    {
      "name": "sensor",
      "type": {
        "type": "record",
        "name": "record",
        "fields": [
          {
            "name": "id",
            "type": "string"
          },
          {
            "name": "lat",
            "type": "double"
          },
          {
            "name": "long",
            "type": "double"
          }
        ]
      }
    },
    {
      "name": "event_time",
      "type": "long"
    }
  ]
}

for the value.
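As a sanity check on how the value schema relates to the data, the following Python sketch tests whether a decoded JSON value fits the schema above. It is a deliberately small, hypothetical checker (`conforms`) that covers only records and the primitive types used in this example. It is not an Avro library and does not reflect how Lenses validates data.

```python
VALUE_SCHEMA = {
    "type": "record", "name": "record0", "fields": [
        {"name": "speedMph", "type": "int"},
        {"name": "sensor", "type": {"type": "record", "name": "record", "fields": [
            {"name": "id", "type": "string"},
            {"name": "lat", "type": "double"},
            {"name": "long", "type": "double"},
        ]}},
        {"name": "event_time", "type": "long"},
    ],
}


def is_integer(datum, bits):
    """True for a non-bool int that fits in a signed integer of the given width."""
    limit = 2 ** (bits - 1)
    return isinstance(datum, int) and not isinstance(datum, bool) and -limit <= datum < limit


def conforms(datum, schema):
    """Check a decoded value against a schema (records and a few primitives only)."""
    if isinstance(schema, dict):
        if schema["type"] == "record":
            names = [field["name"] for field in schema["fields"]]
            return (isinstance(datum, dict)
                    and set(datum) == set(names)
                    and all(conforms(datum[f["name"]], f["type"]) for f in schema["fields"]))
        return conforms(datum, schema["type"])  # e.g. {"type": "string"}
    if schema == "string":
        return isinstance(datum, str)
    if schema == "int":
        return is_integer(datum, 32)
    if schema == "long":
        return is_integer(datum, 64)
    if schema == "double":
        # Lenient choice for this sketch: whole numbers are accepted as doubles
        return isinstance(datum, (int, float)) and not isinstance(datum, bool)
    raise ValueError(f"type not covered by this sketch: {schema!r}")


event = {"speedMph": 50,
         "sensor": {"id": "sensor_1", "lat": 45.1241, "long": 1.177},
         "event_time": 1591970345}
print(conforms(event, VALUE_SCHEMA))
```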

Converting from AVRO to PROTOBUF

We will now convert the AVRO topic we just created to PROTOBUF, a format that Lenses supports natively from version 5.0.

As before, we are going to create a processor that simply copies data, changing the format of both facets to PROTOBUF:

SET defaults.topic.autocreate=true;

INSERT INTO car_speed_events_protobuf
STORE 
    KEY AS PROTOBUF
    VALUE AS PROTOBUF
SELECT STREAM *
FROM car_speed_events_avro;

After creating and starting the processor, we will see that data is produced into the car_speed_events_protobuf topic.

We can again verify that the format of our new topic is PROTOBUF for both the key and value facets from the Explore section.

The protobuf schemas generated by Lenses will be

syntax = "proto3";
package io.lenses.schema;

message record {
  string plate = 1;
}

for the key, and

syntax = "proto3";
package io.lenses.schema;

message record0 {
  int32 speedMph = 1;
  record sensor = 2;
  int64 event_time = 3;
}
message record {
  string id = 1;
  double lat = 2;
  double long = 3;
}

for the value.
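Comparing the AVRO and PROTOBUF schemas side by side suggests a simple correspondence: each record becomes a message, fields are numbered in order starting from 1, and int, long, string and double map to int32, int64, string and double. The Python sketch below reproduces that correspondence for the value schema. The function `to_proto` and the mapping table are assumptions drawn from the schemas shown in this example only; they are not the conversion Lenses implements, and types not listed are not handled.

```python
# Only the primitive types that appear in this example are mapped
AVRO_TO_PROTO = {"string": "string", "int": "int32", "long": "int64", "double": "double"}

VALUE_SCHEMA = {
    "type": "record", "name": "record0", "fields": [
        {"name": "speedMph", "type": "int"},
        {"name": "sensor", "type": {"type": "record", "name": "record", "fields": [
            {"name": "id", "type": "string"},
            {"name": "lat", "type": "double"},
            {"name": "long", "type": "double"},
        ]}},
        {"name": "event_time", "type": "long"},
    ],
}


def to_proto(schema):
    """Render an Avro record schema as proto3 message definitions, outer message first."""
    messages = []

    def render(record):
        lines = [f"message {record['name']} {{"]
        nested = []
        for number, field in enumerate(record["fields"], start=1):
            field_type = field["type"]
            if isinstance(field_type, dict) and field_type.get("type") == "record":
                # A nested record is referenced by name and rendered as its own message
                nested.append(field_type)
                type_name = field_type["name"]
            else:
                type_name = AVRO_TO_PROTO[field_type]
            lines.append(f"  {type_name} {field['name']} = {number};")
        lines.append("}")
        messages.append("\n".join(lines))
        for child in nested:
            render(child)

    render(schema)
    return "\n".join(messages)


print(to_proto(VALUE_SCHEMA))
```

The printed messages match the body of the value schema above; the `syntax` and `package` lines are left out of the sketch.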