ksqlDB is a fantastically powerful tool for processing and analysing streams of data in Apache Kafka. But sometimes you just want a quick way to profile the data in a Kafka topic. I wrote about this previously with a convoluted (but effective) set of bash commands pipelined together to perform a GROUP BY on data. Then someone introduced me to visidata, which makes it all a lot quicker!
Let’s imagine we have data in Kafka, and we’re going to go and build some cool stuff with it. We’re going to process it and build a pipeline, and we need to know something about the data we’re working with. Visidata is a command-line tool for working with data in all sorts of formats, including from stdin. Coupled with kafkacat for consuming data from a topic to stdout, they make a perfect pairing.
This samples the latest 100000 JSON records from a topic and pipes them into visidata:
kafkacat -b localhost:9092 -t my_topic -C -e -o-100000 | \
vd --filetype jsonl
Once visidata is open, use the arrow keys to move to the column on which you want to build a histogram and press Shift-F. Since it works with pipes, if you leave the -e off the kafkacat arguments you get a live stream of messages from the Kafka topic, and visidata will continue to update as messages arrive (although I think you need to replot the histogram if you want it to refresh).
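For comparison, the kind of GROUP BY that the histogram replaces can be sketched in plain shell. This is a minimal illustration rather than the exact commands from the earlier post: freq is a helper name made up here, and my_field in the usage comment is a placeholder for whichever field you want to profile.

```shell
# freq: count how often each distinct input line occurs, most frequent first.
# Output is "<count><TAB><value>", one line per distinct value.
freq() {
  awk '{ counts[$0]++ } END { for (v in counts) printf "%d\t%s\n", counts[v], v }' |
    sort -rn
}

# Hypothetical usage (my_field is a placeholder for a field in your messages):
#   kafkacat -b localhost:9092 -t my_topic -C -e -o-100000 | \
#     jq -r '.my_field' | freq
```

It works, but you have to pick the field up front and rerun the pipeline for each one, whereas in visidata you can move between columns interactively.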
If your data is in Avro instead, you can use kafkacat’s support for Avro conversion (-s avro) and JSON output (-J):
kafkacat -b localhost:9092 -t my_topic -C -e -o-100000 \
-r http://schema-registry:8081 -s avro -J | \
jq -c '.payload' | \
vd --filetype jsonl
The fields may well be nested; use g( in visidata to expand them.
Using it with Confluent Cloud
Thank you! Maybe you could also write an article on how to use tools with Confluent Cloud? Everything is simple on localhost, but if you add Broker- and AVRO SchemaRegistry authentication the list of command line arguments explodes. Would like to learn how to use it efficiently.
— Victor Cazacov (@vcazacov) March 4, 2021
Here you go:
- Raw JSON messages:
  kafkacat -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
  -b BROKER.gcp.confluent.cloud:9092 \
  -X sasl.username="CCLOUD_API_KEY" \
  -X sasl.password="CCLOUD_API_PASSWORD" \
  -t my_topic -C -e -o-10000 | \
  vd --filetype jsonl
- Avro data (Schema Registry on Confluent Cloud):
  kafkacat -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
  -b BROKER.gcp.confluent.cloud:9092 \
  -X sasl.username="CCLOUD_API_KEY" \
  -X sasl.password="CCLOUD_API_PASSWORD" \
  -s avro \
  -r https://SR_API_KEY:SR_API_SECRET@SR_ENDPOINT.gcp.confluent.cloud \
  -t my_avro_topic -C -e -o-10000 | \
  vd --filetype jsonl
Note: You need to URL encode your credentials when supplying them in the Schema Registry URL (thanks to a6kme for this tip!). If you don’t, you may well get the error
Avro/Schema-registry message deserialization: REST request failed (code -1): HTTP request failed: URL using bad/illegal format or missing URL : terminating
So if your Schema Registry API key and secret were key123! and S3cr3t/kjna%$!%dsf£, you’d URL encode them and use https://key123%21:S3cr3t%2Fkjna%25%24%21%25dsf%C2%A3@SR_ENDPOINT.gcp.confluent.cloud
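If you'd rather not encode the credentials by hand, something like the following bash function can do it. This is a sketch, not part of kafkacat: urlencode is a helper name made up here, and it assumes that percent-encoding every byte outside the RFC 3986 unreserved set (letters, digits, and - . _ ~) is acceptable for the user-info part of the URL.

```shell
# urlencode: percent-encode every byte of $1 except RFC 3986 unreserved
# characters (A-Z a-z 0-9 - . _ ~). Works byte-by-byte, so multi-byte
# UTF-8 characters such as £ become several %XX sequences.
urlencode() {
  local hex out=""
  # od prints each input byte as two lowercase hex digits
  for hex in $(printf '%s' "$1" | od -An -v -tx1); do
    case "$hex" in
      3[0-9]|4[1-9a-f]|5[0-9a]|6[1-9a-f]|7[0-9a]|2d|2e|5f|7e)
        # unreserved character: emit the byte itself
        out+=$(printf "\\x$hex") ;;
      *)
        # anything else: emit %XX with uppercase hex digits
        out+=$(printf '%%%02X' "0x$hex") ;;
    esac
  done
  printf '%s\n' "$out"
}

# Example with the key and secret from the note above
urlencode 'key123!'
urlencode 'S3cr3t/kjna%$!%dsf£'
```

The two outputs can then be joined with a colon and placed before the @ in the value passed to -r. Single-quote the arguments so the shell doesn't expand $ or ! before the function sees them.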