Supported versions: 4.1+
Work with arrays
In this tutorial, we will see how to use Lenses SQL to extract, manipulate and inspect the single elements of an array.
In Lenses SQL you can use a LATERAL JOIN to treat the elements of an array as a normal field.
You will learn to:
- Extract the single elements of an array with a single LATERAL join.
- Extract elements of a multi-level array with nested LATERAL joins.
- Use arbitrarily complex array expressions as the right-hand side of a LATERAL join.
Example 1 - Lateral Join on a simple array
In this example, we are getting data from a sensor.
Unfortunately, the upstream system registers the readings in batches, while what we need is a single record for every reading.
An example of such a record is the following:
KEY: 1
VALUE: {
"meter_id": 1,
"readings": [100, 101, 102]
}
Notice how each record contains multiple readings, inside the readings array field.
We can replicate such a structure by running the following query in SQL Studio:
CREATE TABLE batched_readings(
  meter_id int,
  readings int[]
) FORMAT(int, AVRO);
We can again use SQL Studio to insert some data to play with:
INSERT INTO batched_readings(_key, meter_id, readings) VALUES
  (1, 1, [100, 92, 93, 101]),
  (2, 2, [81, 82, 81]),
  (3, 1, [95, 94, 93, 96]),
  (4, 2, [80, 82])
What we want to obtain is a new topic readings, where each record contains a single reading, together with its meter_id.
Considering the first record, we expect it to be exploded into four different records, one for each reading:
KEY: 1
VALUE: { "meter_id": 1, "reading": 100 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 92 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 93 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 101 }
In Lenses SQL you can easily achieve that with the special LATERAL syntax (see the documentation for more details).
You can create a processor defined as follows:
SET defaults.topic.autocreate=true;
INSERT INTO
  readings
SELECT STREAM
  meter_id,
  reading
FROM
  batched_readings
  LATERAL readings as reading
The magic happens in batched_readings LATERAL readings as reading. With that we are basically saying:
- for each record in batched_readings
  - for each element inside the readings array of that record
    - build a new record with all the fields of the original batched_readings record, plus an extra reading field that will contain the value of the current element of readings.
We can then use both the original fields and the new reading field in the SELECT.
If you save the processor and run it, you will see that it will emit the records we expected.
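To make these semantics concrete, here is a minimal Python sketch of what the processor above computes. It is only a model of the behaviour described in this section, not how Lenses implements LATERAL. The lateral helper is a hypothetical name introduced for illustration, and the sample data is copied from the INSERT statement above.

```python
from typing import Dict, Iterator, List, Tuple

Record = Tuple[int, Dict]  # (key, value) pair, mirroring a topic record

# Sample data copied from the INSERT INTO batched_readings statement.
batched_readings: List[Record] = [
    (1, {"meter_id": 1, "readings": [100, 92, 93, 101]}),
    (2, {"meter_id": 2, "readings": [81, 82, 81]}),
    (3, {"meter_id": 1, "readings": [95, 94, 93, 96]}),
    (4, {"meter_id": 2, "readings": [80, 82]}),
]


def lateral(records: List[Record], array_field: str, alias: str) -> Iterator[Record]:
    """Hypothetical helper: emit one record per element of `array_field`."""
    for key, value in records:
        for element in value[array_field]:
            # Keep every original field and expose the current element as `alias`.
            yield key, {**value, alias: element}


# Equivalent of: SELECT meter_id, reading FROM batched_readings LATERAL readings as reading
exploded = [
    (key, {"meter_id": row["meter_id"], "reading": row["reading"]})
    for key, row in lateral(batched_readings, "readings", "reading")
]

for key, value in exploded[:4]:
    print(key, value)
```

The first four records printed come from the record with key 1, and every output record keeps the key of the batch it came from.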
Filtering
One of the powerful features of a LATERAL join is that the expression you put in the LATERAL can be used as a normal field. This means that you can also use it, for example, in a WHERE or in a GROUP BY.
In this section we will see how to filter records generated by a LATERAL using a normal WHERE.
We want to modify our previous processor so that it emits only the readings greater than 95.
To do that, it is enough to use reading as if it were a normal field in the WHERE section:
SET defaults.topic.autocreate=true;
INSERT INTO
  readings
SELECT STREAM
  meter_id,
  reading
FROM
  batched_readings
  LATERAL readings as reading
WHERE
  reading > 95
Running the processor, we get the following records:
KEY: 1
VALUE: { "meter_id": 1, "reading": 100 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 101 }
-----------------------------
KEY: 3
VALUE: { "meter_id": 1, "reading": 96 }
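The filter can be modelled in the same spirit. The following Python sketch is an illustration of the semantics only, not Lenses code: the inner loop plays the role of the LATERAL join and the condition plays the role of the WHERE clause. The sample data is copied from the INSERT statement of this example.

```python
# Sample data copied from the INSERT INTO batched_readings statement.
batched_readings = [
    (1, {"meter_id": 1, "readings": [100, 92, 93, 101]}),
    (2, {"meter_id": 2, "readings": [81, 82, 81]}),
    (3, {"meter_id": 1, "readings": [95, 94, 93, 96]}),
    (4, {"meter_id": 2, "readings": [80, 82]}),
]

filtered = [
    (key, {"meter_id": value["meter_id"], "reading": reading})
    for key, value in batched_readings
    for reading in value["readings"]  # LATERAL readings as reading
    if reading > 95                   # WHERE reading > 95
]

for key, value in filtered:
    print(key, value)
```

Because the condition is evaluated once per array element, a batch can contribute some of its readings, all of them, or none at all.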
Example 2 - Lateral Join on a multi-level array
This example is similar to the previous one. The only difference is that the readings are now stored in batches of batches:
CREATE TABLE batched_readings_nested(
  meter_id int,
  nested_readings int[][]
)
FORMAT(int, AVRO);
As you can see, nested_readings is an array whose elements are arrays of integers.
We can again use SQL Studio to insert some data:
INSERT INTO batched_readings_nested(_key, meter_id, nested_readings) VALUES
  (1, 1, [[100, 92], [93, 101]]),
  (2, 2, [[81], [82, 81]]),
  (3, 1, [[95, 94, 93], [96]]),
  (4, 2, [[80, 82]])
We would like to define a processor that emits the same records as the first processor in Example 1.
In this case, though, we are dealing with nested_readings, which is a multi-level array, so a single LATERAL join is not enough. Nesting a LATERAL inside another will do the job:
SET defaults.topic.autocreate=true;
INSERT INTO
  readings
SELECT STREAM
  meter_id,
  reading
FROM
  batched_readings_nested
  LATERAL nested_readings as readings
  LATERAL readings as reading
This is roughly what happens in the FROM clause:
- We first unwrap the first level of the array, doing a batched_readings_nested LATERAL nested_readings as readings.
- At that point, readings will be an array of integers.
- We can then use it in the outer ... LATERAL readings as reading join, and the single integers will finally be extracted and made available as reading.
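The two steps above can be pictured as two nested loops. The Python sketch below is only a model of the described behaviour, not Lenses code; each loop stands in for one LATERAL, and the sample data is copied from the INSERT statement of this example.

```python
# Sample data copied from the INSERT INTO batched_readings_nested statement.
batched_readings_nested = [
    (1, {"meter_id": 1, "nested_readings": [[100, 92], [93, 101]]}),
    (2, {"meter_id": 2, "nested_readings": [[81], [82, 81]]}),
    (3, {"meter_id": 1, "nested_readings": [[95, 94, 93], [96]]}),
    (4, {"meter_id": 2, "nested_readings": [[80, 82]]}),
]

exploded = [
    (key, {"meter_id": value["meter_id"], "reading": reading})
    for key, value in batched_readings_nested
    for readings in value["nested_readings"]  # LATERAL nested_readings as readings
    for reading in readings                   # LATERAL readings as reading
]

for key, value in exploded[:4]:
    print(key, value)
```

After the first loop, readings is a plain array of integers, which is why the second loop can treat it exactly like the readings field of Example 1.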
Example 3 - Complex array expressions
In this section we will see how it is possible to use any expression as the right-hand side of a LATERAL join, as long as it evaluates to an array.
We have a table where the meter readings are split into two columns, readings_day and readings_night.
CREATE TABLE day_night_readings(
  meter_id int,
  readings_day int[],
  readings_night int[]
)
FORMAT(int, AVRO);
Let’s insert data like that of the first example, with the readings split across the two columns.
INSERT INTO day_night_readings(_key, meter_id, readings_day, readings_night) VALUES
  (1, 1, [100, 92], [93, 101]),
  (2, 2, [81], [82, 81]),
  (3, 1, [95, 94, 93], [96]),
  (4, 2, [80], [81])
To extract the readings one by one, we first need to concatenate the two arrays readings_day and readings_night.
We can achieve that using flatten, one of our array functions.
We can then use the concatenated array in a LATERAL join:
SET defaults.topic.autocreate=true;
INSERT INTO
  readings
SELECT STREAM
  meter_id,
  reading
FROM
  day_night_readings
  LATERAL flatten([readings_day, readings_night]) as reading
The processor defined above will emit the following records:
KEY: 1
VALUE: { "meter_id": 1, "reading": 100 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 92 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 93 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 101 }
-----------------------------
KEY: 2
VALUE: { "meter_id": 2, "reading": 81 }
...
as we expected.
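As a rough model of this last processor, the Python sketch below builds the array expression first and then iterates over it. It is an illustration, not Lenses code: the flatten function here is a stand-in written for this sketch, and it assumes that the Lenses flatten function removes exactly one level of nesting, which is consistent with the output shown above. The sample data is copied from the INSERT statement of this example.

```python
# Sample data copied from the INSERT INTO day_night_readings statement.
day_night_readings = [
    (1, {"meter_id": 1, "readings_day": [100, 92], "readings_night": [93, 101]}),
    (2, {"meter_id": 2, "readings_day": [81], "readings_night": [82, 81]}),
    (3, {"meter_id": 1, "readings_day": [95, 94, 93], "readings_night": [96]}),
    (4, {"meter_id": 2, "readings_day": [80], "readings_night": [81]}),
]


def flatten(arrays):
    """Stand-in for the flatten array function (assumed to remove one nesting level)."""
    return [element for array in arrays for element in array]


exploded = [
    (key, {"meter_id": value["meter_id"], "reading": reading})
    for key, value in day_night_readings
    # LATERAL flatten([readings_day, readings_night]) as reading
    for reading in flatten([value["readings_day"], value["readings_night"]])
]

for key, value in exploded[:5]:
    print(key, value)
```

The expression on the right-hand side of the LATERAL is evaluated once per input record, so the day readings of a record always come before its night readings in the output.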