Control event time

Every message in Kafka comes with a timestamp, and Lenses Engine Streaming mode uses that by default when doing time-dependent operations, like aggregations and joins.

Sometimes, though, that timestamp is not exactly what you need, and you would like to use a field in the record value or key as the new timestamp.

In Lenses SQL you can use the special EVENTTIME BY ... syntax to control the timestamp of each record.

Setting up our example

In our toy example, we have a simple topic that collects electricity meter reading events:

CREATE TABLE electricity_events(
    KW double
    , customer_id int
    , event_time long
)
FORMAT (string, avro);

We can also insert some example data to do our experiments:

INSERT INTO electricity_events(
    KW
    , customer_id
    , event_time
) VALUES
(1.0, 1, 1592848400000),
(2.0, 2, 1592848400000),
(1.5, 3, 1592848400000),
(2.0, 1, 1592848405000),
(1.0, 2, 1592848405000),
(2.5, 3, 1592848405000),
(3.0, 1, 1592848410000),
(0.5, 2, 1592848410000),
(4.0, 3, 1592848405000)
;

If you query the events, you can see that Kafka sets a timestamp for each record. In our case, that timestamp is the time at which the record was inserted. As you can see, it is unrelated to the event_time field in the payload.

[
{"value":{"KW":1,"customer_id":1,"event_time":1592848400000},"metadata":{...,"timestamp":1594041840812,...}},
{"value":{"KW":2,"customer_id":2,"event_time":1592848400000},"metadata":{...,"timestamp":1594041840821,...}},
{"value":{"KW":1.5,"customer_id":3,"event_time":1592848400000},"metadata":{...,"timestamp":1594041840828,...}},
{"value":{"KW":2,"customer_id":1,"event_time":1592848405000},"metadata":{...,"timestamp":1594041840834,...}},
{"value":{"KW":1,"customer_id":2,"event_time":1592848405000},"metadata":{...,"timestamp":1594041840842,...}},
{"value":{"KW":2.5,"customer_id":3,"event_time":1592848405000},"metadata":{...,"timestamp":1594041840848,...}},
{"value":{"KW":3,"customer_id":1,"event_time":1592848410000},"metadata":{...,"timestamp":1594041840855,...}},
{"value":{"KW":0.5,"customer_id":2,"event_time":1592848410000},"metadata":{...,"timestamp":1594041840862,...}},
{"value":{"KW":4,"customer_id":3,"event_time":1592848405000},"metadata":{...,"timestamp":1594041840868,...}}
]

(If you want to know more about how to configure the timestamp that Kafka sets on records, check our documentation.)
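
To make the gap between the two timestamps concrete, here is a small Python sketch (outside Lenses, for illustration only) that converts the first record's event_time and Kafka timestamp, both copied from the output above, into UTC dates. It assumes both values are epoch milliseconds; the helper name from_epoch_millis is made up for this example.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def from_epoch_millis(ms):
    """Convert epoch milliseconds to a timezone-aware UTC datetime (hypothetical helper)."""
    return EPOCH + timedelta(milliseconds=ms)

event_time = from_epoch_millis(1592848400000)        # event_time field in the payload
record_timestamp = from_epoch_millis(1594041840812)  # timestamp set by Kafka on insert

print(event_time.isoformat())
print(record_timestamp.isoformat())
print(record_timestamp - event_time)  # gap between the two clocks
```

The two values are nearly two weeks apart, so any window computed on the record timestamp says nothing about when the reading was actually taken.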

Computing moving averages

We would like to transform our original stream of events, aggregating events with a hopping window of 10s width and an increment of 5s, computing the average for each window.

You can create a new processor that streams those averages, using the special WINDOW BY ... syntax:

SET defaults.topic.autocreate=true;

INSERT INTO electricity_events_avg_wrong
SELECT STREAM 
    customer_id
    , AVG(KW) as KW
FROM electricity_events
WINDOW BY HOP 10s,5s
GROUP BY customer_id

Customer 1 has three input events, 5s apart. With 10s windows advancing every 5s, each event falls into two overlapping windows, and together the three events span four distinct windows, so we expect four output records for that customer.

But checking the emitted records, we see that only two are produced.

This is because, by default, windowing operations work on the record timestamp. In our case all the timestamps are nearly identical, since they coincide with the time the records were inserted, so every event lands in the same windows.
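
The difference between the expected four windows and the observed two can be reproduced with a short Python sketch. It is not Lenses code: the helper hopping_window_starts is hypothetical, and it assumes windows start on multiples of the advance (counted from the epoch) and cover [start, start + size), the way Kafka Streams hopping windows do. The timestamps are customer 1's values from the sample data and query output above.

```python
def hopping_window_starts(timestamp_ms, size_ms, advance_ms):
    """Return the starts of all hopping windows [start, start + size) containing the timestamp.

    Assumption: window starts are aligned to multiples of advance_ms from the epoch.
    """
    # Latest aligned window start that is <= the timestamp.
    start = (timestamp_ms // advance_ms) * advance_ms
    starts = []
    # Walk backwards while the window still covers the timestamp.
    while start > timestamp_ms - size_ms:
        starts.append(start)
        start -= advance_ms
    return sorted(starts)

def distinct_windows(timestamps, size_ms=10_000, advance_ms=5_000):
    """Collect the distinct window starts touched by a list of timestamps."""
    return sorted({s for t in timestamps for s in hopping_window_starts(t, size_ms, advance_ms)})

# Customer 1: event_time values from the payload.
event_times = [1592848400000, 1592848405000, 1592848410000]
# Customer 1: record timestamps assigned by Kafka at insert time.
record_timestamps = [1594041840812, 1594041840834, 1594041840855]

windows_by_event_time = distinct_windows(event_times)
windows_by_record_time = distinct_windows(record_timestamps)

print(len(windows_by_event_time))   # windows when using event_time
print(len(windows_by_record_time))  # windows when using the record timestamp
```

Under these assumptions the event_time values spread over four windows, while the three record timestamps all fall into the same two.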

Fortunately we can change this behaviour using the special EVENTTIME BY ... syntax, specifying an expression to be used as a timestamp:

SET defaults.topic.autocreate=true;

INSERT INTO electricity_events_avg
SELECT STREAM 
    customer_id
    , AVG(KW) as KW
FROM electricity_events
EVENTTIME BY event_time
WINDOW BY HOP 10s,5s
GROUP BY customer_id

As you can see, the results have been windowed using event_time as the timestamp:

[
{"key":{"value":1,"window":{"start":1592848395000,"end":null}},"value":{"customer_id":1,"KW":1}, ...},
{"key":{"value":1,"window":{"start":1592848400000,"end":null}},"value":{"customer_id":1,"KW":1.5}, ...},
{"key":{"value":1,"window":{"start":1592848405000,"end":null}},"value":{"customer_id":1,"KW":2.5}, ...},
{"key":{"value":1,"window":{"start":1592848410000,"end":null}},"value":{"customer_id":1,"KW":3}, ...}
]
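
As a cross-check, the averages above can be recomputed by hand. The following Python sketch is only an illustration, not how Lenses evaluates the query: it assumes windows are aligned to multiples of the 5s advance and cover [start, start + 10s), and it uses customer 1's readings from the INSERT statement earlier on this page.

```python
from collections import defaultdict

SIZE_MS, ADVANCE_MS = 10_000, 5_000

# (KW, event_time) pairs for customer 1, copied from the sample data.
readings = [(1.0, 1592848400000), (2.0, 1592848405000), (3.0, 1592848410000)]

values_by_window = defaultdict(list)
for kw, event_time in readings:
    # Latest aligned window start that is <= event_time.
    start = (event_time // ADVANCE_MS) * ADVANCE_MS
    # Add the reading to every window that still covers event_time.
    while start > event_time - SIZE_MS:
        values_by_window[start].append(kw)
        start -= ADVANCE_MS

# Average KW per window start, in chronological order.
averages = {start: sum(v) / len(v) for start, v in sorted(values_by_window.items())}

for start, avg in averages.items():
    print(start, avg)
```

Each window start and average produced by the sketch can be compared line by line with the "start" and "KW" fields in the records above.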

To learn more about time and time windows in Lenses SQL, check our documentation.