Latest version: 4.3.x
Multiple topics
In this tutorial, we will see how to read data from multiple topics, process it as needed, and write the results to as many output topics as we need, all by using a single SQL Processor.
Setting up our example
Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.
Each gaming session will contain:
- The points the user achieved throughout the session
- Metadata information regarding the session:
  - The country where the game took place
  - The language the user played the game in
The above structure represents the value of each record in our game-sessions topic.
Additionally, each record is keyed by user details:
- A pid, or player id, representing this user uniquely
- Some additional denormalised user details:
  - A name
  - A surname
  - An age
In light of the above, a record might look like the following (in JSON for simplicity):
{
  "key": {
    "pid": 1,
    "name": "Billy",
    "surname": "Lagrange",
    "age": 30
  },
  "value": {
    "points": 5,
    "sessionMetadata": {
      "country": "Italy",
      "language": "IT"
    }
  }
}
Finally, let’s assume we also have another, normalised, compacted topic user-details, keyed by an int matching the pid from topic game-sessions and containing user information like address and period of membership to the platform.
In light of the above, a record might look like the following (in JSON for simplicity):
{
  "key": 1,
  "value": {
    "fullName": "Billy Lagrange",
    "memberYears": 3,
    "address": {
      "country": "Italy",
      "street": "Viale Monza 5",
      "city": "Milan"
    }
  }
}
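Because user-details is compacted and keyed by pid, it can be treated as a lookup table that holds the latest value for each key. The Python sketch below only illustrates that idea. The function name `materialise`, the in-memory list standing in for the topic, and the second record for key 1 are assumptions made for illustration; none of them come from Lenses or from the tutorial data.

```python
def materialise(records):
    """Fold a keyed changelog into a table that keeps the latest value per key."""
    table = {}
    for key, value in records:
        table[key] = value  # a later record for the same key replaces the earlier one
    return table


# Hypothetical changelog: the last record is an invented update for key 1.
changelog = [
    (1, {"fullName": "Billy Lagrange", "memberYears": 3}),
    (2, {"fullName": "Maria Rossi", "memberYears": 1}),
    (1, {"fullName": "Billy Lagrange", "memberYears": 4}),
]

user_details = materialise(changelog)
print(user_details[1]["memberYears"])  # 4
```

Reading the topic this way means a lookup by pid always sees one row per user, however many updates were written for that user.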
We can replicate such structures using SQL Studio and the following query:
CREATE TABLE game-sessions(
    _key.pid int
    , _key.name string
    , _key.surname string
    , _key.age int
    , points double
    , sessionMetadata.country string
    , sessionMetadata.language string
)
FORMAT (avro, avro);

CREATE TABLE user-details(
    fullName string
    , memberYears int
    , address.country string
    , address.street string
    , address.city string
)
FORMAT (int, avro);
We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:
INSERT into game-sessions(
_key.pid
, _key.name
, _key.surname
, _key.age
, points
, sessionMetadata.country
, sessionMetadata.language
) VALUES
(1, 'Billy', 'Lagrange', 35, 5, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 30, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 0, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 50, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 10, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 10, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 80, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 30, 'UK', 'EN'),
(7, 'Nigel', 'Calling', 50, 5, 'UK', 'EN'),
(2, 'Maria', 'Rossi', 27, 10, 'UK', 'EN'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 16, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 70, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 50, 'Italy', 'IT'),
(6, 'Dave', 'Holden', 31, 70, 'Spain', 'ES'),
(2, 'Maria', 'Rossi', 27, 70, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT')
;
INSERT into user-details(
_key
, fullName
, memberYears
, address.country
, address.street
, address.city
) VALUES
(1, 'Billy Lagrange', 3, 'Italy', 'Viale Monza 5', 'Milan'),
(2, 'Maria Rossi', 1, 'Italy', 'Stazione Termini', 'Rome'),
(3, 'Jorge Escudero', 5, 'Spain', '50 Passeig de Gracia', 'Barcelona'),
(4, 'Juan Suarez', 0, 'Mexico', 'Plaza Real', 'Tijuana'),
(5, 'John Bolden', 2, 'USA', '100 Wall Street', 'New York'),
(6, 'Dave Holden', 1, 'UK', '25 Bishopsgate', 'London'),
(7, 'Nigel Calling', 6, 'UK', '10 Queen Anne Street', 'Brighton')
;
Multiple transformations all in one go
Let’s imagine that, given the above data, we are given the following requirements:
- For each country in game-sessions, create a record with the count of games played from that country. Write the results to the games-per-country topic.
- For each record in game-sessions, reshape the record to remove everything from the key besides pid. Additionally, add the user’s memberYears to the value. Write the results to the games-sessions-normalised topic.
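As a sanity check for the first requirement, the final counts can be worked out by hand from the data inserted earlier. The Python sketch below is not Lenses code. It only tallies the country column of the 19 game-sessions records, in insertion order, to show the totals the aggregation should converge to once every record has been processed; a streaming aggregation would emit intermediate counts along the way.

```python
from collections import Counter

# Country of each game-sessions record, in the order the tutorial inserts them.
countries = [
    "Italy", "Italy", "Italy", "Italy", "Italy", "Spain", "Mexico", "USA",
    "UK", "UK", "UK", "Italy", "Spain", "Mexico", "USA", "Italy", "Spain",
    "Italy", "Italy",
]

games_per_country = Counter(countries)
print(dict(games_per_country))
# {'Italy': 9, 'Spain': 3, 'Mexico': 2, 'USA': 2, 'UK': 3}
```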
We can obtain the above with the following query:
SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';

WITH userDetailsTable AS(
    SELECT TABLE *
    FROM user-details
);

WITH joinedAndNormalised AS(
    SELECT STREAM
        gs.*
        , ud.memberYears
    FROM game-sessions AS gs JOIN userDetailsTable AS ud
        ON (gs._key.pid = ud._key)
);

INSERT INTO games-per-country
SELECT STREAM
    COUNT(*) AS gamesPlayed
FROM game-sessions
GROUP BY sessionMetadata.country;

INSERT INTO games-sessions-normalised
SELECT STREAM *
FROM joinedAndNormalised;
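To make the intent of the join concrete, here is a minimal Python sketch of the record shape the second requirement asks for: the key reduced to pid, and memberYears added to the value. It is an illustration under assumptions, not a description of how Lenses executes the query. The helper `normalise` is hypothetical, the table is a plain dictionary, and inner-join behaviour (sessions without a matching user are dropped) is assumed.

```python
# user-details read as a table: pid -> memberYears (values from the tutorial data).
member_years = {1: 3, 2: 1, 3: 5, 4: 0, 5: 2, 6: 1, 7: 6}

# Two game-sessions records from the tutorial data, as (key, value) pairs.
sessions = [
    ({"pid": 1, "name": "Billy", "surname": "Lagrange", "age": 35},
     {"points": 5, "sessionMetadata": {"country": "Italy", "language": "IT"}}),
    ({"pid": 4, "name": "Juan", "surname": "Suarez", "age": 22},
     {"points": 80, "sessionMetadata": {"country": "Mexico", "language": "ES"}}),
]


def normalise(key, value, table):
    """Hypothetical helper: join one session with the table on pid.

    Returns (new_key, new_value), or None when the pid has no match
    (assumed inner-join behaviour).
    """
    pid = key["pid"]
    if pid not in table:
        return None
    # Keep only pid as the key; copy the value and add memberYears to it.
    return pid, {**value, "memberYears": table[pid]}


joined = (normalise(k, v, member_years) for k, v in sessions)
normalised = [record for record in joined if record is not None]

for key, value in normalised:
    print(key, value)
```

Each output record is keyed by the bare pid, and its value carries the original session fields plus memberYears.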
The result of this processor in the UI will be a processor graph similar to the following:
Finally, the content of the output topics games-per-country and games-sessions-normalised can now be inspected in the Lenses Explore screen:
Conclusion
In this tutorial, we learned how to read data from multiple topics, combine it, process it in different ways, and save the results to as many output topics as needed.
For more details about the specific operations used within this tutorial, please refer to the Aggregations and Joins sections of the user documentation.
Good luck and happy streaming!