Latest version: 4.1.x
Join 3 Kafka topics
In this example, we will show how to aggregate three Kafka topics by using Streaming SQL processors.
We will be aggregating:
employee_dictionary: messages contain the name, surname and employee id
contact_info: messages contain the email and other contact information
address: message contain address details
The events are streamed into Kafka from an external database, and the goal is to aggregate all employee information into a single topic.
Streaming SQL enrichment on Apache Kafka
With the following SQL function, we can wrangle events that are sent to the three Kafka topics, (as they are sourced upstream from a Database using a CDC Change Data Capture Process Connector).
SET defaults.topic.autocreate=true; WITH AdressInfoTable AS ( SELECT TABLE metadata.masterKey as masterKey , employeeAddress FROM employee_address_events ); WITH ContactInfoTable AS ( SELECT TABLE metadata.masterKey as masterKey , employeeContactInfo FROM employee_contact_info_events ); WITH productTable AS ( SELECT TABLE p.masterKey as masterKey , c.employeeContactInfo , p.employeeAddress FROM AdressInfoTable AS p INNER JOIN ContactInfoTable AS c ON p.masterKey = c.masterKey ); INSERT INTO employee_enriched SELECT TABLE productTable.masterKey , employee_dictionary.employee.firstName as firstName , employee_dictionary.employee.lastName as lastName , employee_dictionary.employee.employeeId as employeeId , productTable.employeeContactInfo , productTable.employeeAddress FROM employee_dictionary INNER JOIN productTable ON employee_dictionary._key = productTable.masterKey;
The streaming topology that we want to achieve is effectively the bellow:
To simplify our testing process and run the above example in less than 60 seconds, we will be using SQL to create and populate the three Apache Kafka topics:
CREATE TABLE employee_dictionary ( _key string , metadata.eventType string , metadata.entityType string , employee.firstName string , employee.lastName string , employee.employeeId string ) FORMAT(string, json) PROPERTIES(partitions=1, replication=1, compacted=true);
Note: The employee_dictionary topics is compacted since it hosts the name and employee id.
INSERT INTO employee_dictionary( _key , metadata.eventType , metadata.entityType , employee.firstName , employee.lastName , employee.employeeId ) VALUES ("196ea70d-b7f2-43ac-a15a-fa1774f7488a","employee_created","employee","Ollie","Johnson","30-6107836"), ("9d7f69e9-9ea0-4785-903a-d09d0f8342b7","employee_created","employee","Harry","Williamson","25-7784327");
CREATE TABLE employee_contact_info_events ( _key string , metadata.masterKey string , metadata.eventType string , metadata.entityType string , employeeContactInfo.type string , employeeContactInfo.preferencePriority int , employeeContactInfo.value string ) FORMAT(string, json) PROPERTIES(partitions=1, replication=1, compacted=false);
INSERT INTO employee_contact_info_events( _key , metadata.masterKey , metadata.eventType , metadata.entityType , employeeContactInfo.type , employeeContactInfo.preferencePriority , employeeContactInfo.value ) VALUES ("55f3730f-731b-45ae-8f06-1a333ab83210","196ea70d-b7f2-43ac-a15a-fa1774f7488a","employee_contact_info_added","employee_contact_info","email",1,"email@example.com"), ("57432ae9-fa30-478e-aa87-12d026cd2bad","9d7f69e9-9ea0-4785-903a-d09d0f8342b7","employee_contact_info_added","employee_contact_info","email","1","firstname.lastname@example.org");
CREATE TABLE employee_address_events( _key string , metadata.masterKey string , metadata.eventType string , metadata.entityType string , employeeAddress.type string , employeeAddress.address string , employeeAddress.city string , employeeAddress.state string , employeeAddress.zipCode int ) FORMAT(string, json) PROPERTIES(partitions=1, replication=1, compacted=false);
INSERT INTO employee_address_events( _key , metadata.masterKey , metadata.eventType , metadata.entityType , employeeAddress.type , employeeAddress.address , employeeAddress.city , employeeAddress.state , employeeAddress.zipCode ) VALUES ("479b6e61-1e3a-457c-a26f-2c7ef7d35f5b","196ea70d-b7f2-43ac-a15a-fa1774f7488a","employee_address_added","employee_address","physical","3415 Brookdale Drive","Santa Clara","CA","95051"), ("23cfb3a6-cebb-4746-a7ee-6cd64b527e1c","9d7f69e9-9ea0-4785-903a-d09d0f8342b7","employee_address_added","employee_address","physical","835 Shiloh Court","Redding","CA","96003");
Now that we have created the topics and populated them, we are ready to execute our streaming SQL function to create enriched information and write it into a new topic.
A Kafka Streams application executing the above SQL and deployed automatically (in ~ 2 seconds) onto Kubernetes will result into the following performance/monitoring information:
In just a few seconds, we have already enriched hundreds of thousand of events: