Filtering data

Filtering messages and copying them to a topic can be achieved using the WHERE clause.

Setting up our example

In our example, we have a topic where our application registers bank transactions.

We have a topic called payments where records have this shape:

KEY: "6A461C60-02F3-4C01-94FB-092ECBDE0837"
VALUE: {
  "amount": "12.10",
  "currency": "EUR",
  "from_account": "xxx",
  "to_account": "yyy", 
  "time": 1591970345000
}

We can replicate such a structure running the following query in SQL Studio:

CREATE TABLE payments(
    amount double
    , currency string
    , from_account string
    , to_account string
    , time datetime
)
FORMAT (string, avro);

Each event has a unique string key generated by the upstream system.

We can again use SQL Studio to insert some data to play with:

INSERT INTO payments(
    _key
    , amount
    , currency 
    , from_account
    , to_account 
    , time
)
VALUES
("6A461C60-02F3-4C01-94FB-092ECBDE0837", 10, "EUR", "account-1", "account-2", 1590970345000),
("E5DA60E8-F622-43B2-8A93-B958E01E8AB3", 100000, "EUR", "account-1", "account-3", 1591070346000),
("0516A309-FB2B-4F6D-A11F-3C06A5D64B68", 5300, "USD", "account-2", "account-3", 1591170347000),
("0871491A-C915-4163-9C4B-35DEA0373B41", 6500, "EUR", "account-3", "account-1", 1591270348000),
("2B557134-9314-4F96-A640-1BF90887D846", 300, "EUR", "account-1", "account-4", 1591370348000),
("F4EDAE35-45B4-4841-BAB7-6644E2BBC844", 3400, "EUR", "account-2", "account-1", 1591470349000),
("F51A912A-96E9-42B1-9AC4-42E923A0A6F8", 7500, "USD", "account-2", "account-3", 1591570352000),
("EC8A08F1-75F0-49C8-AA08-A5E57997D27A", 2500000, "USD", "account-1", "account-3", 1591670356000),
("9DDBACFF-D42B-4042-95AC-DCDD84F0AC32", 1000, "GBP", "account-2", "account-3", 1591870401000)
;

Example 1 - Find big transactions

Let’s assume we need to detect significant transactions that will be then fed into our anti-fraud system.

We want to copy those transactions into a new topic, maintaining the content of the records as it is.

For our first example, we will use a simple predicate to filter transactions with an amount larger than 5000, regardless of the currency.

Lenses SQL supports all the common comparison operators to compare values, so for our goal it is enough to use a WHERE statement with a >= condition:

SET defaults.topic.autocreate=true;

INSERT INTO big_payments
SELECT STREAM *
FROM payments
WHERE amount >= 5000

Checking the records emitted by the processor, we see that we got the transactions we were looking for.

Because of the * projection, records content has not changed.

KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, ... }
------------------------------------
KEY:"0516A309-FB2B-4F6D-A11F-3C06A5D64B68"
VALUE: { amount:5300, ... }
------------------------------------
KEY:"0871491A-C915-4163-9C4B-35DEA0373B41"
VALUE: { amount:6500, ... }
------------------------------------
KEY:"F51A912A-96E9-42B1-9AC4-42E923A0A6F8"
VALUE: { amount:7500, ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, ... }

Not all currencies are the same, so we would like to add a specific threshold for each currency. As a first cut, we combine multiple conditions with ANDs and ORs:

SET defaults.topic.autocreate=true;

INSERT INTO big_eur_usd_payments
SELECT STREAM *
FROM payments
WHERE
    (amount >= 5000 AND currency = 'EUR') OR
    (amount >= 5500 AND currency = 'USD')

As an improvement, we want to capture the threshold for each currency in a single expression. We will use a CASE statement for that:

SET defaults.topic.autocreate=true;

INSERT INTO big_payments_case
SELECT STREAM *
FROM payments
WHERE
    amount >= (CASE 
        WHEN currency = 'EUR' THEN 5000
        WHEN currency = 'USD' THEN 5500
        WHEN currency = 'GBP' THEN 4500
        ELSE 5000
    END)

getting the results:

KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, currency:"EUR", ... }
------------------------------------
KEY:"0871491A-C915-4163-9C4B-35DEA0373B41"
VALUE: { amount:6500, currency:"EUR", ... }
------------------------------------
KEY:"F51A912A-96E9-42B1-9AC4-42E923A0A6F8"
VALUE: { amount:7500, currency:"USD", ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, currency:"USD", ... }

Example 2 - Filtering by date

In this section, we will find all the transactions that happened during the (UTC) night. To do that we can use one of our many date and time functions .

You will also see how to use a CAST expression to convert from one type to another.

SET defaults.topic.autocreate=true;

INSERT INTO night_payments
SELECT STREAM *
FROM payments
WHERE
    CAST(HOUR(time) AS INT) >= 0 AND
    CAST(HOUR(time) AS INT) <= 5 

Checking the output, we can see that only one transaction satisfied our predicate:

KEY:"6A461C60-02F3-4C01-94FB-092ECBDE0837"
VALUE: { amount:10, currency:"EUR", time:1590970345000, ... }
------------------------------------
KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, currency:"EUR", time:1591070346000, ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, currency:"USD", time:1591670356000, ... }

Example 3 - Sampling with random functions

Let’s imagine that we have to build some intelligence around all the payments we process, but we do not have the capacity and the need to process all of them.

We decided then to build a reduced copy of the payments topic, with only 10% of the original records.

To do that we are going to use our RANDINT function:

SET defaults.topic.autocreate=true;

INSERT INTO payments_sample
SELECT STREAM *
FROM payments
WHERE CAST(ABS(RANDINT()) AS DOUBLE) / 2147483647 < 0.01

RANDINT generates a random integer, we take its absolute value to make sure it is positive, and we then normalise the result dividing it by the max possible integer, getting an (almost) uniform sample of numbers between 0 and 1.

We have to CAST to double on the way; otherwise, the division would be between integers, and the result would always be 0.