Latest version: 4.3.x
Filtering data
Filtering messages and copying them to a topic can be achieved using the WHERE clause.
Setting up our example
In our example, we have a topic where our application registers bank transactions.
We have a topic called payments
where records have this shape:
KEY: "6A461C60-02F3-4C01-94FB-092ECBDE0837"
VALUE: {
"amount": "12.10",
"currency": "EUR",
"from_account": "xxx",
"to_account": "yyy",
"time": 1591970345000
}
We can replicate such a structure running the following query in SQL Studio:
CREATE TABLE payments(
amount double
, currency string
, from_account string
, to_account string
, time datetime
)
FORMAT (string, avro);
Each event has a unique string key generated by the upstream system.
We can again use SQL Studio to insert some data to play with:
INSERT INTO payments(
_key
, amount
, currency
, from_account
, to_account
, time
)
VALUES
("6A461C60-02F3-4C01-94FB-092ECBDE0837", 10, "EUR", "account-1", "account-2", 1590970345000),
("E5DA60E8-F622-43B2-8A93-B958E01E8AB3", 100000, "EUR", "account-1", "account-3", 1591070346000),
("0516A309-FB2B-4F6D-A11F-3C06A5D64B68", 5300, "USD", "account-2", "account-3", 1591170347000),
("0871491A-C915-4163-9C4B-35DEA0373B41", 6500, "EUR", "account-3", "account-1", 1591270348000),
("2B557134-9314-4F96-A640-1BF90887D846", 300, "EUR", "account-1", "account-4", 1591370348000),
("F4EDAE35-45B4-4841-BAB7-6644E2BBC844", 3400, "EUR", "account-2", "account-1", 1591470349000),
("F51A912A-96E9-42B1-9AC4-42E923A0A6F8", 7500, "USD", "account-2", "account-3", 1591570352000),
("EC8A08F1-75F0-49C8-AA08-A5E57997D27A", 2500000, "USD", "account-1", "account-3", 1591670356000),
("9DDBACFF-D42B-4042-95AC-DCDD84F0AC32", 1000, "GBP", "account-2", "account-3", 1591870401000)
;
Example 1 - Find big transactions
Let’s assume we need to detect significant transactions that will be then fed into our anti-fraud system.
We want to copy those transactions into a new topic, maintaining the content of the records as it is.
For our first example, we will use a simple predicate to filter transactions with an amount larger than 5000, regardless of the currency.
Lenses SQL supports all the common
comparison operators
to compare values, so for our goal it is enough to use a WHERE
statement with a >=
condition:
SET defaults.topic.autocreate=true;
INSERT INTO big_payments
SELECT STREAM *
FROM payments
WHERE amount >= 5000
Checking the records emitted by the processor, we see that we got the transactions we were looking for.
Because of the *
projection, records content has not changed.
KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, ... }
------------------------------------
KEY:"0516A309-FB2B-4F6D-A11F-3C06A5D64B68"
VALUE: { amount:5300, ... }
------------------------------------
KEY:"0871491A-C915-4163-9C4B-35DEA0373B41"
VALUE: { amount:6500, ... }
------------------------------------
KEY:"F51A912A-96E9-42B1-9AC4-42E923A0A6F8"
VALUE: { amount:7500, ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, ... }
Not all currencies are the same, so we would like to add a specific threshold for each currency.
As a first cut, we combine multiple conditions with AND
s and OR
s:
SET defaults.topic.autocreate=true;
INSERT INTO big_eur_usd_payments
SELECT STREAM *
FROM payments
WHERE
(amount >= 5000 AND currency = 'EUR') OR
(amount >= 5500 AND currency = 'USD')
As an improvement, we want to capture the threshold for each currency in a single expression. We will use a CASE
statement for that:
SET defaults.topic.autocreate=true;
INSERT INTO big_payments_case
SELECT STREAM *
FROM payments
WHERE
amount >= (CASE
WHEN currency = 'EUR' THEN 5000
WHEN currency = 'USD' THEN 5500
WHEN currency = 'GBP' THEN 4500
ELSE 5000
END)
getting the results:
KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, currency:"EUR", ... }
------------------------------------
KEY:"0871491A-C915-4163-9C4B-35DEA0373B41"
VALUE: { amount:6500, currency:"EUR", ... }
------------------------------------
KEY:"F51A912A-96E9-42B1-9AC4-42E923A0A6F8"
VALUE: { amount:7500, currency:"USD", ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, currency:"USD", ... }
Example 2 - Filtering by date
In this section, we will find all the transactions that happened during the (UTC) night. To do that we can use one of our many date and time functions .
You will also see how to use a CAST
expression to convert from one type to another.
SET defaults.topic.autocreate=true;
INSERT INTO night_payments
SELECT STREAM *
FROM payments
WHERE
CAST(HOUR(time) AS INT) >= 0 AND
CAST(HOUR(time) AS INT) <= 5
Checking the output, we can see that only one transaction satisfied our predicate:
KEY:"6A461C60-02F3-4C01-94FB-092ECBDE0837"
VALUE: { amount:10, currency:"EUR", time:1590970345000, ... }
------------------------------------
KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, currency:"EUR", time:1591070346000, ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, currency:"USD", time:1591670356000, ... }
Example 3 - Sampling with random functions
Let’s imagine that we have to build some intelligence around all the payments we process, but we do not have the capacity and the need to process all of them.
We decided then to build a reduced copy of the payments topic, with only 10% of the original records.
To do that we are going to use our RANDINT
function:
SET defaults.topic.autocreate=true;
INSERT INTO payments_sample
SELECT STREAM *
FROM payments
WHERE CAST(ABS(RANDINT()) AS DOUBLE) / 2147483647 < 0.01
RANDINT
generates a random integer, we take its absolute value to make sure it is positive, and we then normalise the result
dividing it by the max possible integer, getting an (almost) uniform sample of numbers between 0 and 1.
We have to CAST
to double
on the way; otherwise, the division would be between integers, and the result would always be 0
.