What would a blog post on rmoff.net be if it didn’t include the dirty hack option? 😁
The secret to dirty hacks is that they are often rather effective and when needs must, they can suffice. If you’re prototyping and need to JFDI, a dirty hack is just fine. If you’re looking for code to run in Production, then a dirty hack probably is not fine.
Let’s assume we’ve got XML data sat somewhere that we can access with a shell tool that will output it to stdout. In this example it’s a REST endpoint that we can poll (courtesy of TfL OpenData). We’re going to use the power of Unix pipelines to string together a few tools: curl to fetch the XML, xq to wrangle it into JSON, and kafkacat to produce it to a Kafka topic.
Wrangling the XML data and streaming it into Kafka #
You can try out all this code by spinning up this Docker Compose.
Let’s start by checking what we actually want to send to Kafka. The raw payload from the REST endpoint looks like this:
<?xml version="1.0" encoding="utf-8"?>
<stations lastUpdate="1601312340962" version="2.0">
<station>
<id>1</id>
<name>River Street , Clerkenwell</name>
<terminalName>001023</terminalName>
<lat>51.52916347</lat>
…
</station>
<station>
<id>2</id>
<name>Phillimore Gardens, Kensington</name>
<terminalName>001018</terminalName>
<lat>51.49960695</lat>
…
…
Using xq we can use the same kind of expression as we would with jq to construct a target JSON object:
curl --show-error --silent https://tfl.gov.uk/tfl/syndication/feeds/cycle-hire/livecyclehireupdates.xml | \
xq '.'
This gives us a JSON structure that looks like this:
{
"stations": {
"@lastUpdate": "1601462461108",
"@version": "2.0",
"station": [
{
"id": "1",
"name": "River Street , Clerkenwell",
"terminalName": "001023",
"lat": "51.52916347",
…
},
{
"id": "2",
"name": "Phillimore Gardens, Kensington",
"terminalName": "001018",
"lat": "51.49960695",
…
},
We need to decide how to carve up the data, since we’ve effectively got a batch of data here and Kafka works on the concept of messages/records. Therefore we’re going to do this:
- Take each station element as its own message
- Add in the lastUpdate value from the stations element into each station message (i.e. denormalise the payload somewhat)
We can use some xq magic to do this, extracting each element from the station array into its own root-level object (.stations.station[]) and adding in the lastUpdate field (+ {lastUpdate: .stations."@lastUpdate"}). If you want to learn more about the power of jq (on which xq is modelled) you can try out this code here.
So with the source REST API data piped through xq we get this:
curl --show-error --silent https://tfl.gov.uk/tfl/syndication/feeds/cycle-hire/livecyclehireupdates.xml | \
xq '.stations.station[] + {lastUpdate: .stations."@lastUpdate"}'
{
"id": "1",
"name": "River Street , Clerkenwell",
"terminalName": "001023",
"lat": "51.52916347",
…
"lastUpdate": "1601462700830"
}
{
"id": "2",
"name": "Phillimore Gardens, Kensington",
"terminalName": "001018",
"lat": "51.49960695",
…
"lastUpdate": "1601462700830"
}
If we send the data to Kafka in this form using kafkacat we’ll end up with garbled data, because each line of the pretty-printed JSON will be taken as its own message (the line break acts as kafkacat’s default message delimiter). To fix this we’ll use the -c (compact output) flag with xq:
curl --show-error --silent https://tfl.gov.uk/tfl/syndication/feeds/cycle-hire/livecyclehireupdates.xml | \
xq -c '.stations.station[] + {lastUpdate: .stations."@lastUpdate"}'
Here are our nicely wrangled and presented messages from the source XML, one message per line.
{"id":"1","name":"River Street , Clerkenwell","terminalName":"001023","lat":"51.52916347",…,"lastUpdate":"1601462880994"}
{"id":"2","name":"Phillimore Gardens, Kensington","terminalName":"001018","lat":"51.49960695",…,"lastUpdate":"1601462880994"}
We’re now in a position to stream this into a Kafka topic by adding kafkacat to the pipeline:
curl --show-error --silent https://tfl.gov.uk/tfl/syndication/feeds/cycle-hire/livecyclehireupdates.xml | \
xq -c '.stations.station[] + {lastUpdate: .stations."@lastUpdate"}' | \
kafkacat -b localhost:9092 -t livecyclehireupdates_01 -P
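Because the feed is something we poll rather than a continuous stream, one (equally hacky) option is to wrap the whole pipeline in a shell loop. This is just a sketch - the 60-second interval here is an arbitrary choice of mine, not something mandated by the feed:
# Hypothetical polling wrapper: fetch, wrangle, and produce, then wait and repeat
while true; do
  curl --show-error --silent https://tfl.gov.uk/tfl/syndication/feeds/cycle-hire/livecyclehireupdates.xml | \
    xq -c '.stations.station[] + {lastUpdate: .stations."@lastUpdate"}' | \
    kafkacat -b localhost:9092 -t livecyclehireupdates_01 -P
  # Polling interval chosen arbitrarily for this example
  sleep 60
done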
We can use kafkacat as a consumer too (-C), here specifying -c1 to consume just one message so that we can smoke-test the pipeline:
kafkacat -b localhost:9092 -t livecyclehireupdates_01 -C -c1
{"id":"1","name":"River Street , Clerkenwell","terminalName":"001023","lat":"51.52916347",…,"lastUpdate":"1601464200733"}
👍 looks good.
What about keys? #
Kafka messages are key/value pairs, and so far we’ve specified a value but no key. This is where the hack gets just that little bit more hacky. We’re going to use xq to write the id field from the XML payload as a prefix to each message, with a separator character so that kafkacat can identify where the key ends and the value begins.
I wrote a separate blog post about how this technique works; check it out if you want to know more.
Our xq invocation now looks like this:
xq -rc --arg sep $'\x1c' '.stations.station[] + { lastUpdate: .stations."@lastUpdate"} | [ .id + $sep, tostring] | join("")'
Combined with kafkacat, it looks like this:
curl --show-error --silent https://tfl.gov.uk/tfl/syndication/feeds/cycle-hire/livecyclehireupdates.xml | \
xq -rc --arg sep $'\x1c' '.stations.station[] + { lastUpdate: .stations."@lastUpdate"} | [ .id + $sep, tostring] | join("")' | \
kafkacat -b localhost:9092 -t livecyclehireupdates_02 -P -K$'\x1c'
Checking the data in the topic with kafkacat we can see that we’ve now set the key as we wanted, taking the value of the id field:
kafkacat -b localhost:9092 \
-t livecyclehireupdates_02 \
-C -c2 \
-f 'Key: %k, Payload: %s\n'
Key: 1, Payload: {"id":"1","name":"River Street , Clerkenwell","terminalName":"001023","lat":"51.52916347",…"lastUpdate":"1601485080861"}
Key: 2, Payload: {"id":"2","name":"Phillimore Gardens, Kensington","terminalName":"001018","lat":"51.49960695",…"lastUpdate":"1601485080861"}
We’ve got data, but no schema #
So we now have a Kafka topic with the XML-sourced data in it, but held in plain JSON. For it to be really useful, we want it in a form that is usable by consumers with little-or-no input from the producer of the data, and for that we want to declare and store the schema. I’m going to use ksqlDB for this - you can use other stream processing options such as Kafka Streams if you’d rather.
To start with, I’ll declare the schema itself on top of the topic.
You hopefully see straightaway why serialisation methods that include a schema declaration (Avro/Protobuf/JSON Schema) are easier for the consumer, if only because they don’t have to type the schema in!
CREATE STREAM CYCLE_HIRE_SRC (
id VARCHAR KEY
,name VARCHAR
,terminalName VARCHAR
,lat DOUBLE
,long DOUBLE
,installed VARCHAR
,locked VARCHAR
,installDate BIGINT
,removalDate BIGINT
,temporary VARCHAR
,nbBikes INT
,nbEmptyDocks INT
,nbDocks INT
,lastUpdate BIGINT
) WITH (KAFKA_TOPIC='livecyclehireupdates_02',
VALUE_FORMAT='JSON',
TIMESTAMP='lastUpdate');
Now we can project certain fields from the topic to see the schema in action:
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS LASTUPDATE,
ID,
NAME,
NBBIKES,
NBEMPTYDOCKS,
NBDOCKS
FROM CYCLE_HIRE_SRC
EMIT CHANGES LIMIT 5;
+---------------------+-------+-------------------------------------+----------+--------------+--------+
|LASTUPDATE |ID |NAME |NBBIKES |NBEMPTYDOCKS |NBDOCKS |
+---------------------+-------+-------------------------------------+----------+--------------+--------+
|2020-10-01 14:45:00 |1 |River Street , Clerkenwell |2 |16 |19 |
|2020-10-01 14:45:00 |2 |Phillimore Gardens, Kensington |13 |24 |37 |
|2020-10-01 14:45:00 |3 |Christopher Street, Liverpool Street |6 |26 |32 |
|2020-10-01 14:45:00 |4 |St. Chad's Street, King's Cross |14 |7 |23 |
|2020-10-01 14:45:00 |5 |Sedding Street, Sloane Square |26 |0 |27 |
Limit Reached
Query terminated
A few things to note:
- The ID field is taken from the Kafka message key - in theory we could have omitted it from the payload of the message
- We’re telling ksqlDB to use the lastUpdate field as the timestamp field for the messages. By default it will simply take the timestamp of the Kafka message itself (by default, the time at which it hit the broker), so this is a useful thing to do, particularly if we do things like time-based windowing or joins. In the query above we’ve validated that it’s worked by showing the ROWTIME field in the selection.
- Whilst fields like installed and locked are boolean, they are seen as strings in the JSON model and so need declaring as such. We can fix this in subsequent processing.
At this stage we could just build a stream processing application to continually serialise the data to a new topic with something like Protobuf:
-- Make sure we process all records in the topic
SET 'auto.offset.reset' = 'earliest';
-- Populate a new stream (and thus Kafka topic) with everything from
-- the source stream, serialised to Protobuf
CREATE STREAM CYCLE_HIRE_PROTOBUF_01
WITH (KAFKA_TOPIC='livecyclehireupdates_protobuf_01',
VALUE_FORMAT='PROTOBUF') AS
SELECT * FROM CYCLE_HIRE_SRC;
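To check that the new topic has been created we can list the broker’s metadata - a quick sketch here using kafkacat’s -L (metadata listing) flag, with grep just to narrow the output to the topics we care about:
# List broker metadata and filter for our topics
kafkacat -b localhost:9092 -L | grep livecyclehireupdates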
Looking at the topics on the broker now we can see that there is a new topic, livecyclehireupdates_protobuf_01. If we try to read the data as normal it won’t work, and we can see it looks 'weird', because it’s binary data being read by something that expects just normal strings:
kafkacat -b localhost:9092 \
-t livecyclehireupdates_protobuf_01 \
-C -c1
River Street , Clerkenwell001023I@!'H'*true2false8˹%JfalsePX`h.
The correct thing to do is use a Protobuf consumer against it to validate that the data is there and correct:
kafka-protobuf-console-consumer --bootstrap-server localhost:9092 \
--from-beginning \
--topic livecyclehireupdates_protobuf_01 \
--max-messages 1
{"NAME":"River Street , Clerkenwell","TERMINALNAME":"001023","LAT":51.52916347,"LONG":-0.109970527,"INSTALLED":"true","LOCKED":"false","INSTALLDATE":"1278947280000","REMOVALDATE":"0","TEMPORARY":"false","NBBIKES":2,"NBEMPTYDOCKS":16,"NBDOCKS":19,"LASTUPDATE":"1601559900874"}
Processed a total of 1 messages
Wrangling the data #
Above I’ve shown you how to simply apply a schema to a Kafka topic that’s in JSON format (it’d work with delimited data too) and serialise it to a new topic in a format that will store the schema in the Schema Registry for use by any consumer.
There are a few things in the data though that would probably benefit from a bit of wrangling, such as:
- Casting the boolean fields ingested as VARCHAR to BOOLEAN
- Nesting the lat/long fields into a single location field
You can do that with ksqlDB here too, meaning that anyone wanting to use the data downstream can do so from a cleansed stream instead of the raw one.
CREATE STREAM CYCLE_HIRE_PROTOBUF_02
WITH (KAFKA_TOPIC='livecyclehireupdates_protobuf_02',
VALUE_FORMAT='PROTOBUF') AS
SELECT ID,
NAME,
TERMINALNAME,
LASTUPDATE,
STRUCT(LATITUDE := LAT, LONGITUDE:= LONG) AS LOCATION,
CAST(CASE
WHEN LCASE(INSTALLED)='false' THEN FALSE
WHEN LCASE(INSTALLED)='true' THEN TRUE
END AS BOOLEAN) AS INSTALLED,
CAST(CASE
WHEN LCASE(LOCKED)='false' THEN FALSE
WHEN LCASE(LOCKED)='true' THEN TRUE
END AS BOOLEAN) AS LOCKED,
INSTALLDATE,
REMOVALDATE,
CAST(CASE
WHEN LCASE(TEMPORARY)='false' THEN FALSE
WHEN LCASE(TEMPORARY)='true' THEN TRUE
END AS BOOLEAN) AS TEMPORARY,
NBBIKES,
NBEMPTYDOCKS,
NBDOCKS
FROM CYCLE_HIRE_SRC
EMIT CHANGES;
From this we now have a nice Kafka topic (livecyclehireupdates_protobuf_02) that any consumer can use however they want, with full access to its schema. The topic is driven by any changes to the source topic - call it streaming ETL, if you like.
ksql> SHOW TOPICS;
Kafka Topic | Partitions | Partition Replicas
--------------------------------------------------------------------
livecyclehireupdates_02 | 1 | 1
livecyclehireupdates_protobuf_02 | 1 | 1
--------------------------------------------------------------------
ksql> SELECT NAME, LOCATION, INSTALLED FROM CYCLE_HIRE_PROTOBUF_02 EMIT CHANGES LIMIT 5;
+-------------------------------------+-----------------------------------------------+----------+
|NAME |LOCATION |INSTALLED |
+-------------------------------------+-----------------------------------------------+----------+
|River Street , Clerkenwell |{LATITUDE=51.52916347, LONGITUDE=-0.109970527} |true |
|Phillimore Gardens, Kensington |{LATITUDE=51.49960695, LONGITUDE=-0.197574246} |true |
|Christopher Street, Liverpool Street |{LATITUDE=51.52128377, LONGITUDE=-0.084605692} |true |
|St. Chad's Street, King's Cross |{LATITUDE=51.53005939, LONGITUDE=-0.120973687} |true |
|Sedding Street, Sloane Square |{LATITUDE=51.49313, LONGITUDE=-0.156876} |true |
Limit Reached
Query terminated
Obligatory ksqlDB materialised view demo #
I can’t open up ksqlDB to show streaming ETL like the above without also showing materialised views. These are so cool because they let you take a stream of data in a Kafka topic and build it into state that you can query, and which is kept up to date automagically as new messages arrive on the underlying topic.
CREATE TABLE CYCLE_HIRE AS
SELECT ID,
LATEST_BY_OFFSET(NAME) AS NAME,
LATEST_BY_OFFSET(NBBIKES) AS NBBIKES,
LATEST_BY_OFFSET(NBEMPTYDOCKS) AS NBEMPTYDOCKS,
LATEST_BY_OFFSET(NBDOCKS) AS NBDOCKS,
LATEST_BY_OFFSET(LASTUPDATE) AS LAST_UPDATE_TS
FROM CYCLE_HIRE_PROTOBUF_02
GROUP BY ID;
ksql> SELECT TIMESTAMPTOSTRING(LAST_UPDATE_TS,'yyyy-MM-dd HH:mm:ss','Europe/London') AS TS,
NAME,
NBBIKES,
NBEMPTYDOCKS
FROM CYCLE_HIRE
WHERE ID='42';
+------------------------+---------+-------------+
|NAME |NBBIKES |NBEMPTYDOCKS |
+------------------------+---------+-------------+
|Wenlock Road , Hoxton |2 |26 |
ksql>
This is called a pull query, and you can run it from any client application using the REST API. As new messages arrive, the materialised view is updated automagically, and the new state is reflected whenever it’s subsequently queried. What if you want to know as soon as it’s updated? For that you can use a push query, in which you effectively subscribe to any changes, denoted by the EMIT CHANGES clause. As with the pull query, you can run this over the REST API too.
In the above image the pull query is in the top half - note how the query exits once complete, and can be re-run to query the current state. The push query is in the lower half, and once run will emit any changes as soon as they are received.
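If you want to try this from outside of the ksqlDB CLI, here’s a sketch of a pull query issued with curl against ksqlDB’s REST API (this assumes ksqlDB is listening on its default port of 8088 on localhost):
# Pull query over the ksqlDB REST API - returns the current state for station 42
curl -s -X POST http://localhost:8088/query \
     -H "Content-Type: application/vnd.ksql.v1+json" \
     -d "{\"ksql\": \"SELECT NAME, NBBIKES, NBEMPTYDOCKS FROM CYCLE_HIRE WHERE ID='42';\", \"streamsProperties\": {}}"
Append EMIT CHANGES to the SQL and the same call becomes a push query, streaming results back to the client as the underlying data changes.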
🤔 🧐 This sounds like a bit of a hack - what are my other options for getting XML into Kafka? #
This was option 1, or perhaps I should have called it option zero. It’s okay, it does a job, but you’d not bet your call-out rota on it, right? Shell scripts and bits of string have a habit of working great right up until the moment they don’t - usually at 0400 in the morning on Christmas Day, or just before a high-profile business event like Black Friday…
So, what are the other options to ingest XML into Kafka, and to do it properly?