Latest version: 4.3.x
Automate Kafka topic creation
Requirement
We want to automate the creation of Kafka topics and their schemas. How can we do that?
Solution
Lenses can manage Kafka topics together with their schemas. For topics stored in Avro, Lenses also registers the schema with the Schema Registry connection configured in Lenses. For the schemaless storage formats (JSON, CSV, or XML), Lenses attempts to infer the schema itself, and the Schema Registry is not involved. Having a schema for each topic improves the Explore screen experience and makes it possible to build stream analytics pipelines with SQL processors.
To create a topic and attach a schema to it, call this endpoint:
POST /api/v1/kafka/topic
Request body
{
  "name": "$YOUR_TOPIC",
  "replication": 1,
  "partitions": 1,
  "configs": {
    "$KAFKA_TOPIC_CONFIG_KEY": "$VALUE"
  },
  "format": {
    "key": {
      "format": "INT/STRING/LONG/AVRO/JSON/CSV/XML",
      "schema": "$AVRO_SCHEMA"
    },
    "value": {
      "format": "INT/STRING/LONG/AVRO/JSON/CSV/XML",
      "schema": "$AVRO_SCHEMA"
    }
  }
}
Field | Description | Required | Remarks |
---|---|---|---|
name | The name of the Kafka topic to create | yes | |
replication | The topic replication factor. If provided, it must be greater than zero and no greater than the number of brokers in the cluster | no | |
partitions | The number of topic partitions. If provided, it must be greater than zero | no | |
configs | A map of Kafka topic configuration keys (for example, retention.ms) to their values | yes (may be empty) | Must be present even when no properties are set; pass {} |
format.key.format | The storage format of the Kafka message key | yes | Supported values: STRING, INT, LONG, AVRO, JSON, CSV, XML |
format.key.schema | The key schema, written as an Avro schema and passed as a JSON-encoded string | yes, except for primitives (INT, LONG, STRING) | |
format.value.format | The storage format of the Kafka message value | yes | Supported values: STRING, INT, LONG, AVRO, JSON, CSV, XML |
format.value.schema | The value schema, written as an Avro schema and passed as a JSON-encoded string | yes, except for primitives (INT, LONG, STRING) | |
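The rules in the table can be checked on the client before calling the API. The sketch below uses a hypothetical helper, build_topic_request (not part of Lenses), that assembles a request body and rejects values the table rules out. It assumes you supply the broker count yourself if you want the replication check; the rules encoded here come only from the table above.

```python
import json

# Formats that need no schema, per the table above.
PRIMITIVE_FORMATS = {"INT", "LONG", "STRING"}
SUPPORTED_FORMATS = PRIMITIVE_FORMATS | {"AVRO", "JSON", "CSV", "XML"}


def build_topic_request(name, key, value, partitions=None, replication=None,
                        configs=None, broker_count=None):
    """Build a body for POST /api/v1/kafka/topic (hypothetical helper).

    key and value are (format, schema) tuples. schema may be None for
    INT, LONG and STRING; otherwise it is an Avro schema as a dict or str.
    """
    if not name:
        raise ValueError("name is required")
    if partitions is not None and partitions <= 0:
        raise ValueError("partitions must be greater than 0")
    if replication is not None:
        if replication <= 0:
            raise ValueError("replication must be greater than 0")
        if broker_count is not None and replication > broker_count:
            raise ValueError("replication cannot exceed the broker count")

    body = {"name": name}
    if replication is not None:
        body["replication"] = replication
    if partitions is not None:
        body["partitions"] = partitions
    # "configs" is always sent, even when empty.
    body["configs"] = dict(configs or {})
    body["format"] = {}

    for part, (fmt, schema) in (("key", key), ("value", value)):
        fmt = fmt.upper()
        if fmt not in SUPPORTED_FORMATS:
            raise ValueError(f"unsupported {part} format: {fmt}")
        if fmt not in PRIMITIVE_FORMATS and schema is None:
            raise ValueError(f"{part} schema is required for {fmt}")
        entry = {"format": fmt}
        if schema is not None:
            # The API expects the Avro schema as a JSON-encoded string.
            entry["schema"] = schema if isinstance(schema, str) else json.dumps(schema)
        body["format"][part] = entry
    return body
```

For the example below, build_topic_request("input", ("STRING", None), ("AVRO", schema_dict), partitions=3, replication=1) would produce a body equivalent to the one in Step 3, where schema_dict is the Avro schema loaded as a Python dict.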
Setting up our example
Our example topic is named input. Its message key is text (i.e., STRING), and its message value is Avro, using this schema:
{
  "type": "record",
  "name": "ConnectDefault",
  "namespace": "io.lenses.connect.avro",
  "fields": [
    { "name": "EVENTID", "type": "string" },
    { "name": "EVENTDATETIME", "type": "string" },
    { "name": "CLIENTID", "type": "string" },
    { "name": "CUSTOMERCATEGORY", "type": "string" },
    { "name": "CUSTOMERID", "type": "string" },
    { "name": "ENTITYID", "type": "string" },
    { "name": "EVENTCORRELATIONID", "type": ["null", "string"], "default": null },
    { "name": "EVENTSOURCE", "type": "string" },
    { "name": "SUPERPRODUCTID", "type": "string" },
    { "name": "TENANTID", "type": "string" },
    { "name": "TREATMENTCODE", "type": ["null", "string"], "default": null },
    { "name": "SAPREASONCODE", "type": "string" },
    { "name": "TRXAMOUNT", "type": "string" },
    { "name": "TRXCODE", "type": "string" },
    { "name": "TRXDESCRIPTION", "type": "string" },
    { "name": "SourceType", "type": ["null", "string"], "default": null }
  ]
}
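Three fields in this schema (EVENTCORRELATIONID, TREATMENTCODE and SourceType) are optional: their type is a union of "null" and "string" with a null default. The Avro specification requires a union field's default to match the first type in the union, which is why "null" is listed first. The sketch below, using two hypothetical helper functions, lists the optional fields and flags null defaults that would violate that rule; the schema it checks is a trimmed excerpt of the one above.

```python
import json


def nullable_fields(schema):
    """Names of record fields whose type is a union that includes "null"."""
    return [
        field["name"]
        for field in schema["fields"]
        if isinstance(field["type"], list) and "null" in field["type"]
    ]


def bad_null_defaults(schema):
    """Fields with a null default whose union does not start with "null".

    Avro requires a union default to match the first branch, so
    {"type": ["string", "null"], "default": null} is invalid.
    """
    return [
        field["name"]
        for field in schema["fields"]
        if isinstance(field["type"], list)
        and "default" in field
        and field["default"] is None
        and field["type"][0] != "null"
    ]


# Trimmed excerpt of the example schema.
schema = json.loads("""
{"type": "record", "name": "ConnectDefault", "namespace": "io.lenses.connect.avro",
 "fields": [
   {"name": "EVENTID", "type": "string"},
   {"name": "EVENTCORRELATIONID", "type": ["null", "string"], "default": null},
   {"name": "SourceType", "type": ["null", "string"], "default": null}
 ]}
""")
print(nullable_fields(schema))    # ['EVENTCORRELATIONID', 'SourceType']
print(bad_null_defaults(schema))  # []
```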
Step 1 - Create the security group
Open the Admin section from the top navigation bar. Then, in the left navigation, go to Groups and add a new group.
Step 2 - Create the service account
From the same left navigation, go to the Service accounts page and add a new entry.
Copy the token generated for the service account; Step 3 uses it.
Step 3 - Create the topic and the schema
{
  "name": "input",
  "replication": 1,
  "partitions": 3,
  "configs": {},
  "format": {
    "key": {
      "format": "STRING"
    },
    "value": {
      "format": "AVRO",
      "schema": "{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.lenses.connect.avro\",\"fields\":[{\"name\":\"EVENTID\",\"type\":\"string\"},{\"name\":\"EVENTDATETIME\",\"type\":\"string\"},{\"name\":\"CLIENTID\",\"type\":\"string\"},{\"name\":\"CUSTOMERCATEGORY\",\"type\":\"string\"},{\"name\":\"CUSTOMERID\",\"type\":\"string\"},{\"name\":\"ENTITYID\",\"type\":\"string\"},{\"name\":\"EVENTCORRELATIONID\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"EVENTSOURCE\",\"type\":\"string\"},{\"name\":\"SUPERPRODUCTID\",\"type\":\"string\"},{\"name\":\"TENANTID\",\"type\":\"string\"},{\"name\":\"TREATMENTCODE\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"SAPREASONCODE\",\"type\":\"string\"},{\"name\":\"TRXAMOUNT\",\"type\":\"string\"},{\"name\":\"TRXCODE\",\"type\":\"string\"},{\"name\":\"TRXDESCRIPTION\",\"type\":\"string\"},{\"name\":\"SourceType\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
    }
  }
}
To save the request body as body.json, run this command, paste the JSON above, and then press Enter followed by Ctrl+D to end the input:
cat > body.json
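Rather than escaping the schema by hand and pasting it, you can generate body.json from a schema file. This is a minimal sketch with a hypothetical helper, write_body, assuming the Avro schema above is saved in a file such as input-value.avsc (the file name is an assumption). Serializing the body with json.dumps escapes the embedded schema string automatically, producing the same kind of value shown in the "schema" field above.

```python
import json
from pathlib import Path


def write_body(schema_path, body_path="body.json", topic="input"):
    """Write the Step 3 request body, embedding the schema read from schema_path."""
    schema = json.loads(Path(schema_path).read_text())
    body = {
        "name": topic,
        "replication": 1,
        "partitions": 3,
        "configs": {},
        "format": {
            "key": {"format": "STRING"},
            "value": {
                "format": "AVRO",
                # Compact JSON string; json.dumps(body) below escapes its quotes.
                "schema": json.dumps(schema, separators=(",", ":")),
            },
        },
    }
    Path(body_path).write_text(json.dumps(body, indent=2))
    return body
```

Calling write_body("input-value.avsc") writes body.json in the current directory, ready for the curl command.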
Lenses authenticates and authorizes each API call using the token passed in the X-Kafka-Lenses-Token header.
Because we created the service account in Step 2, we already have this token.
curl --header "Content-Type: application/json" \
--header "X-Kafka-Lenses-Token: demo-sa:48cd653d-1889-42b6-9f38-6e5116db691a" \
-i -X POST \
--data @body.json \
http://localhost:24015/api/v1/kafka/topic
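The same call can be made from Python with only the standard library. This sketch mirrors the curl command: same URL, headers, method and JSON body. The function names are hypothetical, and the Lenses address is taken from the example above.

```python
import json
import urllib.request

LENSES_URL = "http://localhost:24015"  # same address as the curl example


def build_create_topic_request(body, token, base_url=LENSES_URL):
    """Build the same POST request as the curl command above."""
    return urllib.request.Request(
        url=f"{base_url}/api/v1/kafka/topic",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "X-Kafka-Lenses-Token": token,
        },
        method="POST",
    )


def create_topic(body, token, base_url=LENSES_URL):
    """Send the request and return (status, response text).

    urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    """
    request = build_create_topic_request(body, token, base_url)
    with urllib.request.urlopen(request) as response:
        return response.status, response.read().decode("utf-8")
```

For example, load body.json with json.loads and pass the service account token to create_topic. Reading the token from an environment variable (for instance a hypothetical LENSES_TOKEN) keeps it out of scripts and shell history.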
On a successful call, Lenses creates the input Kafka topic and registers the input-value subject in the Schema Registry.
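To confirm that the schema was registered, you can query the Schema Registry directly. This sketch assumes a Confluent-compatible Schema Registry REST API (GET /subjects/{subject}/versions/latest) reachable at http://localhost:8081; adjust the address to your setup. It also assumes that an Avro key would be registered as <topic>-key, by analogy with the input-value subject above; the helper names are hypothetical.

```python
import json
import urllib.parse
import urllib.request

REGISTRY_URL = "http://localhost:8081"  # assumption: your Schema Registry address


def expected_subjects(body):
    """Subjects expected for the Avro parts of a topic request body.

    Assumes <topic>-key / <topic>-value naming, as with input-value.
    """
    return [
        f"{body['name']}-{part}"
        for part in ("key", "value")
        if body["format"][part]["format"].upper() == "AVRO"
    ]


def latest_schema_url(subject, registry_url=REGISTRY_URL):
    quoted = urllib.parse.quote(subject, safe="")
    return f"{registry_url}/subjects/{quoted}/versions/latest"


def fetch_latest_schema(subject, registry_url=REGISTRY_URL):
    """Return the latest registered schema for subject as a dict."""
    with urllib.request.urlopen(latest_schema_url(subject, registry_url)) as response:
        payload = json.loads(response.read())
    # The registry returns the schema itself as a JSON-encoded string.
    return json.loads(payload["schema"])
```

For the example body, expected_subjects returns only input-value, because the key uses the STRING format.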