Change data format
In this example, we will show how to convert Kafka topics between the different formats supported by Lenses (JSON, AVRO and PROTOBUF).
In particular, we will:
- convert a JSON topic to AVRO
- convert an AVRO topic to PROTOBUF
Requirements
For this to work, Lenses has to know the schema of the source topic. Lenses can learn it in one of three ways:
- through direct user action, where the schema is set manually
- through inference, where Lenses tries to infer the schema of a topic by looking at its data
- automatically, if the topic is created through Lenses
Creating the JSON data
With the following SQL we can create our initial JSON topic:
CREATE TABLE car_speed_events_json(
_key.plate string
, speedMph int
, sensor.id string
, sensor.lat double
, sensor.long double
, event_time long
)
FORMAT (json, json);
to which we can add data using:
INSERT INTO car_speed_events_json (
_key.plate
, speedMph
, sensor.id
, sensor.lat
, sensor.long
, event_time
) VALUES
("20-GH-38", 50, "sensor_1", 45.1241, 1.177, 1591970345),
("20-VL-12", 30, "sensor_1", 45.1241, 1.177, 1591970345),
("20-JH-98", 90, "sensor_1", 45.1241, 1.177, 1591970345);
It can be quickly verified that the format of our newly created topic is JSON for both key and value by searching for car_speed_events_json in the Explore view.
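The same check can also be done with a snapshot query in SQL Studio; a minimal sketch, where LIMIT simply caps the number of returned records:

SELECT *
FROM car_speed_events_json
LIMIT 3;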
Converting from JSON to AVRO
To create a new topic with AVRO format, we can create a processor that will copy the data from our original topic to a new topic, changing the format in the process.
To do this, we start by going to Apps > New App > SQL Processor (or SQL Processors > New SQL Processor if you are in Lenses < 5.0) and defining our processor with the following code:
SET defaults.topic.autocreate=true;
INSERT INTO car_speed_events_avro
STORE
KEY AS AVRO
VALUE AS AVRO
SELECT STREAM *
FROM car_speed_events_json;
Notice the addition of STORE KEY AS AVRO VALUE AS AVRO. This statement tells our processor the format in which each facet (key or value) should be stored. Hitting “Create New Processor” will start the new processor.
We can see the events were copied over and, from now on, Lenses will keep pushing any new events added to car_speed_events_json into car_speed_events_avro.
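The two facets are independent, so a processor can also convert just one of them. For example, a sketch of a processor that keeps the key as JSON and converts only the value to AVRO (the target topic name here is illustrative):

SET defaults.topic.autocreate=true;

INSERT INTO car_speed_events_avro_value
STORE
    KEY AS JSON
    VALUE AS AVRO
SELECT STREAM *
FROM car_speed_events_json;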
From the Explore section, we can verify that the format of car_speed_events_avro is (AVRO, AVRO), and that the generated AVRO schema is
{
"type": "record",
"name": "record",
"fields": [
{
"name": "plate",
"type": "string"
}
]
}
for the key, and
{
"type": "record",
"name": "record0",
"fields": [
{
"name": "speedMph",
"type": "int"
},
{
"name": "sensor",
"type": {
"type": "record",
"name": "record",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "lat",
"type": "double"
},
{
"name": "long",
"type": "double"
}
]
}
},
{
"name": "event_time",
"type": "long"
}
]
}
for the value.
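To double-check that the copied records are intact, we can also run a snapshot query against the new topic; a sketch that assumes the same dot notation for nested fields used in the CREATE TABLE statement above:

SELECT sensor.id, speedMph
FROM car_speed_events_avro
LIMIT 3;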
Converting from AVRO to PROTOBUF
We will now convert the AVRO topic we just created to PROTOBUF, a format natively supported by Lenses from version 5.0 onwards.
As before, we are going to create a processor that simply copies data, changing the format of both facets to PROTOBUF:
SET defaults.topic.autocreate=true;
INSERT INTO car_speed_events_protobuf
STORE
KEY AS PROTOBUF
VALUE AS PROTOBUF
SELECT STREAM *
FROM car_speed_events_avro;
After creating and starting the processor, we will see data being produced into the car_speed_events_protobuf topic.
We can again verify that the format of our new topic is PROTOBUF for both the key and value facets from the Explore
section.
The PROTOBUF schemas generated by Lenses will be
syntax = "proto3";
package io.lenses.schema;
message record {
string plate = 1;
}
for the key, and
syntax = "proto3";
package io.lenses.schema;
message record0 {
int32 speedMph = 1;
record sensor = 2;
int64 event_time = 3;
}
message record {
string id = 1;
double lat = 2;
double long = 3;
}
for the value.
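As a final check, a snapshot query can confirm that all three events made it into the new topic (a sketch, assuming aggregate support in SQL Studio snapshot queries):

SELECT COUNT(*)
FROM car_speed_events_protobuf;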