Skipping bad records with the Kafka Connect JDBC sink connector
The Kafka Connect framework provides generic error handling and dead-letter queue capabilities, which are available for problems with [de]serialisation and Single Message Transforms. When it comes to errors that a connector may encounter doing the actual pull or put of data from the source/target system, it’s down to the connector itself to implement logic around that. For example, the Elasticsearch sink connector provides configuration (behavior.on.malformed.documents) that can be set so that a single bad record won’t halt the pipeline. Others, such as the JDBC Sink connector, don’t provide this yet. That means that if you hit this problem, you need to unblock it yourself. One way is to manually move the offset of the consumer on past the bad message.
TL;DR: You can use kafka-consumer-groups --reset-offsets --to-offset <x> to manually move the connector past a bad message.
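As a rough sketch of what that command looks like in full, the Python helper below only assembles the argument list; it does not run anything. The connector name, topic, partition and broker address are hypothetical placeholders. It relies on the fact that a sink connector consumes with the group id connect-<connector name>, and it assumes the offset you pass is the one after the bad message. The tool can only reset a group with no active members, so the connector needs to be stopped first.

```python
def reset_offset_command(connector, topic, partition, offset,
                         bootstrap_server="localhost:9092", execute=False):
    """Build kafka-consumer-groups arguments that move a sink connector's
    consumer group to a given offset on one topic partition.

    All argument values are illustrative; nothing is executed here.
    """
    args = [
        "kafka-consumer-groups",
        "--bootstrap-server", bootstrap_server,
        # Sink connectors use the consumer group "connect-<connector name>"
        "--group", f"connect-{connector}",
        # topic:partition limits the reset to the partition with the bad record
        "--topic", f"{topic}:{partition}",
        "--reset-offsets",
        "--to-offset", str(offset),
    ]
    # --dry-run only prints the planned change; --execute applies it
    args.append("--execute" if execute else "--dry-run")
    return args


# Hypothetical example: the bad message sits at offset 42, so resume from 43
bad_offset = 42
print(" ".join(reset_offset_command("sink-jdbc-00", "orders", 0, bad_offset + 1)))
```

Defaulting to a dry run is a deliberate choice in this sketch: you get to see the offsets the tool would set before committing to them.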
Kafka Connect and Elasticsearch
I use the Elastic stack for a lot of my talks and demos because it complements Kafka brilliantly. A few things have changed in recent releases and this blog is a quick note on some of the errors that you might hit and how to resolve them. It was inspired by a lot of the comments and discussion here and here.
Copying data between Kafka clusters with Kafkacat
kafkacat gives you Kafka super powers 😎
I’ve written before about kafkacat and what a great tool it is for doing lots of useful things as a developer with Kafka. I used it too in a recent demo that I built in which data needed manipulating in a way that I couldn’t easily do elsewhere. Today I want to share a very simple but powerful use for kafkacat as both a consumer and producer: copying data from one Kafka cluster to another. In this instance it’s getting data from Confluent Cloud down to a local cluster.
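The consumer-piped-into-producer idea can be sketched as below. This Python snippet only builds the shell pipeline as a string, and the topic name and broker addresses are hypothetical. It assumes a ":" key delimiter (-K) is safe for your data, which it will not be if keys or values contain that character or newlines. Any security settings a managed cluster needs (passed to kafkacat with -X) are left out of this sketch.

```python
import shlex


def copy_topic_pipeline(topic, source_broker, target_broker, delimiter=":"):
    """Return a shell pipeline that reads a topic from one cluster with
    kafkacat and writes the same keys and values to another cluster."""
    consumer = [
        "kafkacat", "-b", source_broker,
        "-C", "-t", topic,      # consumer mode on the source topic
        "-K", delimiter,        # print key and value separated by the delimiter
        "-e",                   # exit when the end of the topic is reached
        "-q",                   # quiet, so only messages reach stdout
    ]
    producer = [
        "kafkacat", "-b", target_broker,
        "-P", "-t", topic,      # producer mode on the target topic
        "-K", delimiter,        # split each input line back into key and value
    ]
    return f"{shlex.join(consumer)} | {shlex.join(producer)}"


print(copy_topic_pipeline("orders", "source-cluster:9092", "localhost:9092"))
```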
Kafka Summit GoldenGate bridge run/walk
Coming to Kafka Summit in San Francisco next week? Inspired by similar events at Oracle OpenWorld in past years, I’m proposing an unofficial run (or walk) across the GoldenGate bridge on the morning of Tuesday 1st October. We should be up and out and back in plenty of time to still attend the morning keynotes. Some people will run, some may prefer to walk, it’s open to everyone :)
Staying sane on the road as a Developer Advocate
Where I’ll be on the road for the remainder of 2019
Reset Kafka Connect Source Connector Offsets
Starting a Kafka Connect sink connector at the end of a topic
When you create a sink connector in Kafka Connect, by default it will start reading from the beginning of the topic and stream all of the existing (and new) data to the target. The setting that controls this behaviour is auto.offset.reset, and you can see its value in the worker log when the connector runs:
[2019-08-05 23:31:35,405] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
…
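One detail worth spelling out is that auto.offset.reset only comes into play when the consumer group has no committed offset for a partition. The function below is a simplified Python model of that decision, not Kafka client code; the function and parameter names are made up for illustration, and it ignores the case of a committed offset that has fallen out of range.

```python
def starting_offset(committed, beginning, end, auto_offset_reset="earliest"):
    """Model where a consumer starts reading on one partition.

    committed: the group's committed offset, or None if there is none
    beginning/end: the first and next-to-be-written offsets of the partition
    """
    if committed is not None:
        # An existing committed offset always wins over auto.offset.reset
        return committed
    if auto_offset_reset == "earliest":
        return beginning
    if auto_offset_reset == "latest":
        return end
    # "none" means: fail rather than guess a starting position
    raise LookupError("no committed offset and auto.offset.reset=none")


# A brand new sink connector (no committed offset) on a partition holding 0..999
print(starting_offset(None, 0, 1000, "earliest"))
print(starting_offset(None, 0, 1000, "latest"))
# The same connector after a restart, having committed offset 250
print(starting_offset(250, 0, 1000, "latest"))
```

This is why changing the setting on a connector that has already run has no visible effect: its consumer group already has committed offsets.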
Resetting a Consumer Group in Kafka
Migrating Alfred Clipboard to New Laptop
So how DO you make those cool diagrams? July 2019 update
Taking the Vienna-Munich sleeper train
Manually delete a connector from Kafka Connect
Kafka Connect has a REST API through which all config should be done, including removing connectors that have been created. Sometimes though, you might have reason to want to do this manually, and since Kafka Connect running in distributed mode uses Kafka as its persistent data store, you can achieve this by writing to the topic yourself.
Automatically restarting failed Kafka Connect tasks
Here’s a hacky way to automatically restart Kafka Connect connectors if they fail. Restarting automatically only makes sense if it’s a transient failure; if there’s a problem with your pipeline (e.g. bad records or a mis-configured server) then you don’t gain anything from this. You might want to check out Kafka Connect’s error handling and dead letter queues too.
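A minimal sketch of the "find what failed, then restart it" logic is shown here in Python. It assumes you have already fetched the body of GET /connectors/<name>/status from the Kafka Connect REST API, and it returns the URLs that a restart script would POST to. The connector name, the sample status document and the base URL are hypothetical, and no HTTP calls are made.

```python
import json


def failed_task_restart_urls(connector, status_json,
                             base_url="http://localhost:8083"):
    """Given the JSON from GET /connectors/<name>/status, return the
    restart endpoints (to be called with POST) for every FAILED task."""
    status = json.loads(status_json)
    return [
        f"{base_url}/connectors/{connector}/tasks/{task['id']}/restart"
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]


# Hypothetical status document with one healthy and one failed task
sample = json.dumps({
    "name": "sink-jdbc-00",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "connect:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "connect:8083"},
    ],
})
for url in failed_task_restart_urls("sink-jdbc-00", sample):
    print("POST", url)
```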
Putting Kafka Connect passwords in a separate file / externalising secrets
Kafka Connect configuration is easy - you just write some JSON! But what if you’ve got credentials that you need to pass? Embedding those in a config file is not always such a smart idea. Fortunately with KIP-297 which was released in Apache Kafka 2.0 there is support for external secrets. It’s extendable to use your own ConfigProvider, and ships with its own for just putting credentials in a file - which I’ll show here. You can read more here.
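To give a feel for the shape of this, here is a small Python sketch that builds the two pieces involved: the worker properties that register the file-based provider, and a connector config that refers to secrets by placeholder instead of by value. The file path, the property keys inside that file (DB_USER, DB_PASSWORD) and the connector settings shown are hypothetical examples.

```python
import json


def file_secret(path, key):
    """Build a ${file:<path>:<key>} placeholder, which the worker resolves
    at runtime from the properties file at <path>."""
    return "${file:" + path + ":" + key + "}"


# Worker-level settings that enable the FileConfigProvider under the name "file"
worker_properties = {
    "config.providers": "file",
    "config.providers.file.class":
        "org.apache.kafka.common.config.provider.FileConfigProvider",
}

# Connector config with placeholders instead of literal credentials
# (path and key names are made up for this example)
secrets_file = "/data/credentials.properties"
connector_config = {
    "connection.user": file_secret(secrets_file, "DB_USER"),
    "connection.password": file_secret(secrets_file, "DB_PASSWORD"),
}

print(json.dumps(connector_config, indent=2))
```

The credentials file itself is a plain properties file (one KEY=value per line) that needs to be readable on each Kafka Connect worker.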
Deleting a Connector in Kafka Connect without the REST API
Kafka Connect exposes a REST interface through which all config and monitoring operations can be done. You can create connectors, delete them, restart them, check their status, and so on. But, I found a situation recently in which I needed to delete a connector and couldn’t do so with the REST API. Here’s another way to do it, by amending the configuration Kafka topic that Kafka Connect in distributed mode uses to persist configuration information for connectors. Note that this is not a recommended way of working with Kafka Connect—the REST API is there for a good reason :)
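The essence of the trick is a tombstone: a message with the connector's key and a null value, written to the topic named by the worker's config.storage.topic setting. The Python sketch below only describes which records that would be; it does not produce anything to Kafka. The key prefixes (connector- and target-state-) reflect my understanding of how the framework names its config records, so treat them as an assumption and check the keys in your own config topic first. The connector name is hypothetical.

```python
def connector_tombstones(connector):
    """Return (key, value) pairs describing the tombstone records that
    would remove a connector from the Kafka Connect config topic.

    A value of None stands for a null message value (a tombstone).
    The key prefixes are an assumption to verify against your own topic.
    """
    return [
        (f"connector-{connector}", None),      # the connector's configuration
        (f"target-state-{connector}", None),   # its running/paused target state
    ]


for key, value in connector_tombstones("file-sink"):
    print(key, "->", value)
```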
A poor man’s KSQL EXPLODE/UNNEST technique
There is an open issue for support of EXPLODE/UNNEST functionality in KSQL, and if you need it then do up-vote the issue. Here I detail a hacky, but effective, workaround for exploding arrays into multiple messages, so long as you know the upper-bound on your array.
When a Kafka Connect converter is not a converter
Kafka Connect is an API within Apache Kafka and its modular nature makes it powerful and flexible. Converters are part of the API but not always fully understood. I’ve written previously about Kafka Connect converters, and this post is just a hands-on example to show even further what they are (and are not) about.
Note: To understand more about Kafka Connect in general, check out my talk from Kafka Summit London, From Zero to Hero with Kafka Connect.
Reading Kafka Connect Offsets via the REST Proxy
When you run Kafka Connect in distributed mode it uses a Kafka topic to store the offset information for each connector. Because it’s just a Kafka topic, you can read that information using any consumer.
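As a sketch of what reading that topic involves: the offsets topic (named by the worker's offset.storage.topic setting) holds source connector offsets, with a JSON key of the form ["<connector name>", {<source partition>}] and a JSON value holding the offset. When consumed through the REST Proxy in its binary format, keys and values arrive base64-encoded. The Python function below decodes one such record; the sample connector name and offset fields are hypothetical, and the record is constructed locally rather than fetched.

```python
import base64
import json


def decode_offset_record(record):
    """Decode one base64-encoded record from the Kafka Connect offsets topic
    into the connector name, source partition and stored offset."""
    connector, source_partition = json.loads(base64.b64decode(record["key"]))
    raw_value = record.get("value")
    # A null value is a tombstone, meaning the stored offset was removed
    offset = None if raw_value is None else json.loads(base64.b64decode(raw_value))
    return {"connector": connector, "partition": source_partition, "offset": offset}


def b64(obj):
    """Helper for the example: JSON-encode then base64-encode, as text."""
    return base64.b64encode(json.dumps(obj).encode("utf-8")).decode("ascii")


# Hypothetical record, shaped like one item of a REST Proxy binary response
sample_record = {
    "key": b64(["jdbc-source-01", {"table": "demo.accounts"}]),
    "value": b64({"timestamp": 1547030056000}),
}
print(decode_offset_record(sample_record))
```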
Pivoting Aggregates in KSQL
Prompted by a question on StackOverflow, the requirement is to take a series of events related to a common key and for each key output a series of aggregates derived from a changing value in the events. I’ll use the data from the question, based on ticket statuses. Each ticket can go through various stages, and the requirement was to show, per customer, how many tickets are currently at each stage.
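To pin down what is being asked for, here is the requirement modelled in plain Python rather than KSQL. It is not the solution from the post, just a sketch of the expected result. It assumes events arrive in time order as (customer, ticket, status) tuples; the field names and sample data are invented for illustration.

```python
from collections import Counter, defaultdict


def tickets_per_stage(events):
    """Count, per customer, how many tickets are currently at each stage.

    events: iterable of (customer, ticket_id, status) in time order.
    Only the most recent status of each ticket is counted.
    """
    latest = {}
    for customer, ticket, status in events:
        # Later events overwrite earlier ones, leaving the current status
        latest[ticket] = (customer, status)

    counts = defaultdict(Counter)
    for customer, status in latest.values():
        counts[customer][status] += 1
    return {customer: dict(stages) for customer, stages in counts.items()}


events = [
    ("alice", "T1", "OPEN"),
    ("alice", "T2", "OPEN"),
    ("bob", "T3", "OPEN"),
    ("alice", "T1", "CLOSED"),   # T1 moves on, so it no longer counts as OPEN
]
print(tickets_per_stage(events))
```

The key point the model makes explicit is that a ticket changing stage must decrement the count for its old stage as well as increment the new one, which is what makes this awkward as a simple streaming aggregate.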