Latest version: 4.3.x
Update a SQL Processor
In Lenses it is not possible to directly edit a SQL Processor, for example changing the query or changing some of its deployment configuration.
There is a reason behind this: a change in a processor may result in a new processor that is incompatible with the previous one, producing incorrect or duplicated results.
But if you know what you are doing, and you are confident that the change will be a compatible one, there is a way to achieve this.
In this article we will first see how to update a processor, and then we will briefly analyse when it is safe to do so.
Overview of our strategy
To emulate the editing of a processor, we will create a new one sharing the same Processor ID as the previous one.
The Processor ID determines the consumer group of the underlying Kafka Streams application.
Sharing the same consumer group allows the new processor to start exactly where the previous one left off.
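To make the idea concrete, here is a minimal Python sketch of how a consumer group remembers its position. It is a toy model, not the Kafka or Lenses API: `committed_offsets`, `run_processor` and the processor names are hypothetical, and we assume that a group with no committed offset starts from the beginning of the topic (in Kafka this depends on the consumer's offset reset configuration).

```python
# Toy model of consumer-group offsets; not the Kafka or Lenses API.
committed_offsets = {}  # consumer group name -> next offset to read


def run_processor(processor_id, topic):
    """Read `topic` from the group's committed offset, then commit the new position."""
    start = committed_offsets.get(processor_id, 0)  # assumption: unknown groups start at 0
    consumed = topic[start:]
    committed_offsets[processor_id] = len(topic)
    return consumed


topic = ["r0", "r1", "r2", "r3"]
first_run = run_processor("my-processor", topic)      # the old processor reads everything so far
topic.append("r4")                                    # a record arrives after the old processor is gone
same_id = run_processor("my-processor", topic)        # same ID: resumes after r3
other_id = run_processor("another-processor", topic)  # different ID: reads from the start

print(same_id)
print(other_id)
```

A processor recreated with the same ID only sees the new record, while one with a different ID reprocesses the whole topic.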
We will now set up a simple example and see, step by step, how to apply this strategy.
Our example
We will assume we have a simple topic called player_results, storing the results of chess games. Each record contains information about the outcome of a game for a single player.
The _key is an identifier of the game the result refers to.
The result can be one of Win, Loss, and Draw.
Create the input topic
From SQL Studio, create the topic and insert some data into it:
create table player_results(playerId string, result string)
FORMAT(long, protobuf);
INSERT INTO player_results(_key, playerId, result) values
(0, "Player 1", "Win"),
(0, "Player 2", "Loss"),
(1, "Player 1", "Loss"),
(1, "Player 2", "Win"),
(2, "Player 1", "Draw"),
(2, "Player 2", "Draw"),
(3, "Player 3", "Win"),
(3, "Player 1", "Loss"),
(4, "Player 2", "Draw"),
(4, "Player 1", "Draw");
Here we used PROTOBUF as a serialisation format, but that is not relevant for the sake of this example.
Create the processor
We want to compute the total number of points for each player, initially following these rules:
2 points for a Win
1 point for a Draw
0 points for a Loss
To do that, create the following processor:
SET defaults.topic.autocreate=true;
insert into player_stats
select stream
SUM(case
when result = "Win" then 2
when result = "Draw" then 1
else 0
end) as points
from player_results
group by playerId
Save the processor and start it.
Check the data
After a while (it may take some time because the processor is stateful), you will see the following results in the output topic (you can check them in SQL Studio):
KEY: "Player 3"
points: 2
KEY: "Player 2"
points: 4
KEY: "Player 1"
points: 4
That is consistent with our data: Player 3 scored a single Win, for a total of 2 points, while Player 1 and Player 2 each scored one win and two draws, for a total of 4 points.
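As a quick cross-check of these totals outside Lenses, the same arithmetic can be sketched in a few lines of Python. This is only a model of what the processor computes; `RECORDS`, `POINTS` and `score_totals` are illustrative names, not part of Lenses.

```python
from collections import defaultdict

# Same records as the INSERT statement above: (game key, player, result).
RECORDS = [
    (0, "Player 1", "Win"), (0, "Player 2", "Loss"),
    (1, "Player 1", "Loss"), (1, "Player 2", "Win"),
    (2, "Player 1", "Draw"), (2, "Player 2", "Draw"),
    (3, "Player 3", "Win"), (3, "Player 1", "Loss"),
    (4, "Player 2", "Draw"), (4, "Player 1", "Draw"),
]

POINTS = {"Win": 2, "Draw": 1}  # any other result scores 0


def score_totals(records, points):
    """Sum the points per player, like SUM(case ...) ... group by playerId."""
    totals = defaultdict(int)
    for _game, player, result in records:
        totals[player] += points.get(result, 0)
    return dict(totals)


totals = score_totals(RECORDS, POINTS)
print(totals)  # {'Player 1': 4, 'Player 2': 4, 'Player 3': 2}
```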
Update the processor
At some point we decide to change our scoring algorithm, assigning 3 points for a victory instead of 2.
We also decide that the change should not affect historical data, only new games.
Since we are going to delete the processor, we first need to copy all the information we need from it, so that it can be used in the new processor.
1. Stop the existing processor
We need to stop the processor to avoid computing new data with the old scoring algorithm. You can stop it by clicking Actions > Stop on the processor page.
2. Copy the Processor ID
This is a mandatory step, since without the Processor ID we will lose the ability to start from where the old processor stopped.
You can find the Processor ID under the Configuration tab of the processor page.
3. Copy other details of the Processor
Copy any other detail you may need for the creation of the new processor. In our case, we are just going to copy the SQL query.
4. Delete the processor
Make sure you have copied all you need, and delete the processor.
This step is necessary, since Lenses does not allow the creation of multiple processors with the same Processor ID.
5. Create the new processor
Create a new processor with the following data:
Processor ID: the one you copied in step 2
SQL Query: the query of the previous processor, updated to assign 3 points to a victory:
SET defaults.topic.autocreate=true;
insert into player_stats
select stream
SUM(case
when result = "Win" then 3
when result = "Draw" then 1
else 0
end) as points
from player_results
group by playerId
Save the processor and start it.
6. Insert some new data
In SQL Studio, insert the results for a new game between Player 1 and Player 2:
INSERT INTO player_results(_key, playerId, result) values
(5, "Player 1", "Win"),
(5, "Player 2", "Loss");
7. Verify the data
After a while, we should see the following data in player_stats:
KEY: "Player 3"
points: 2
KEY: "Player 2"
points: 4
KEY: "Player 1"
points: 4
KEY: "Player 1"
points: 7
KEY: "Player 2"
points: 4
As expected, the points of Player 2 and Player 3 did not change.
This means that the data was not recomputed from scratch: in that case, under the new rules, Player 2 would have been assigned 5 points and Player 3 would have been assigned 3.
On the other hand, Player 1 scored a victory while the new scoring system was in place, earning three more points and raising their total to 7.
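The difference between resuming from the old state and recomputing everything can be checked with a small Python model. It is a sketch of the arithmetic only, not of how Lenses or Kafka Streams store state; `aggregate`, `OLD_RULES` and `NEW_RULES` are illustrative names.

```python
def aggregate(state, records, points):
    """Return a copy of `state` (player -> total) with the points for `records` added."""
    new_state = dict(state)
    for player, result in records:
        new_state[player] = new_state.get(player, 0) + points.get(result, 0)
    return new_state


OLD_RULES = {"Win": 2, "Draw": 1}  # any other result scores 0
NEW_RULES = {"Win": 3, "Draw": 1}

old_games = [
    ("Player 1", "Win"), ("Player 2", "Loss"),
    ("Player 1", "Loss"), ("Player 2", "Win"),
    ("Player 1", "Draw"), ("Player 2", "Draw"),
    ("Player 3", "Win"), ("Player 1", "Loss"),
    ("Player 2", "Draw"), ("Player 1", "Draw"),
]
new_games = [("Player 1", "Win"), ("Player 2", "Loss")]

# What happened here: the old state is kept, only new games use the new rules.
state_at_stop = aggregate({}, old_games, OLD_RULES)
resumed = aggregate(state_at_stop, new_games, NEW_RULES)

# What a recomputation from scratch would give: every game scored with the new rules.
recomputed = aggregate({}, old_games + new_games, NEW_RULES)

print(resumed)     # {'Player 1': 7, 'Player 2': 4, 'Player 3': 2}
print(recomputed)  # {'Player 1': 8, 'Player 2': 5, 'Player 3': 3}
```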
8. Enjoy the new version of the processor
The new version of the processor has been created, and we verified that it produced the desired results.
Warning
We saw that updating a processor, although not directly supported by Lenses, is quite an easy task.
But updating a processor is also a very delicate process. For this reason it should be done with the utmost care, and only when you are sure that the change is compatible with the previous version.
Particular attention has to go to stateful processors, i.e. processors that make use of one of the following:
JOIN
GROUP BY
SELECT TABLE
In such cases, the processor will internally build names for its state-related internal topics. If something significant is changed in the processor query, those names may change in the new version, causing the loss of the old state.
Safe changes
Stateless processors: It is safe to update a non-stateful processor, for example by adding a WHERE clause. Since the processor is not stateful, no internal topic names are involved.
Stateful processors: In a stateful processor, the only safe change is to replace an expression with another one, as we did in our example, since such a change will not affect the names of internal topics.
Unsafe changes
Almost everything else is an unsafe change, unless you know what the consequences of the change are and you are OK with them.
Even adding a simple WHERE clause to a stateful processor may cause some internal topics to change name, causing the loss of state described above.
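To see why an innocent-looking change can rename an internal topic, here is a simplified Python model. It assumes names are derived from each operator's position in the topology, in the style of the default Kafka Streams store names; the exact names Lenses generates are not shown in this article, so `changelog_topics`, the operator lists and the processor ID below are purely illustrative.

```python
def changelog_topics(processor_id, operators):
    """Name the changelog topic of each stateful operator from its position in the topology."""
    names = []
    for index, operator in enumerate(operators):
        if operator == "AGGREGATE":  # the only stateful operator in this toy model
            names.append(
                f"{processor_id}-KSTREAM-AGGREGATE-STATE-STORE-{index:010d}-changelog"
            )
    return names


# Original query: read, aggregate, write.
before = changelog_topics("my-processor", ["SOURCE", "AGGREGATE", "SINK"])
# Same query with a WHERE clause: the extra filter shifts the aggregation's position.
after = changelog_topics("my-processor", ["SOURCE", "FILTER", "AGGREGATE", "SINK"])

print(before)  # ['my-processor-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog']
print(after)   # ['my-processor-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog']
```

In this model the new processor would look for its state under a different topic name and find nothing there, which is the loss of state the warning refers to.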