How to handle poison pills in Spring Kafka?
Modern applications are increasingly built from smaller components that communicate with each other using events, a so-called event-driven architecture. Event-driven architectures have three key components: event producers, an event broker, and event consumers. It is important that the consumers can consume the events the producers have produced. When this is not the case and you are not prepared for it, a single event can bring your systems to a halt. Such an event is a poison pill.
In this blog I will discuss what poison pills are exactly, how they occur and how to deal with them. I also created an example project where you can see the effects of a poison pill and what you can do to handle one.
Serialization and deserialization
To understand what a poison pill is and how it occurs, we first need to learn what serialization and deserialization are exactly. Wikipedia has, in my opinion, one of the best explanations:
Serialization is the process of converting an object state into a format (series of bytes) that can be stored or transmitted and reconstructed later possibly in a different computer environment. The opposite operation, converting a series of bytes to an object state, is called deserialization.
In the context of Kafka, serialization is used to convert a Java object into a byte array that is subsequently produced by the producer to a topic. Deserialization is again used in the opposite manner, consuming a series of bytes from a topic and converting it into a Java object. Both the key and value of the record are serialized and later when consumed deserialized.
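To make the round trip concrete, here is a minimal sketch using only the JDK. The class and method names are hypothetical stand-ins for a string serializer and deserializer pair, not the Kafka classes themselves, and UTF-8 is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-ins for a string serializer/deserializer pair; not the Kafka classes.
final class StringRoundTrip {
    // Producer side: object state -> series of bytes.
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: series of bytes -> object state.
    static String deserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] onTheWire = serialize("order-42");
        System.out.println(Arrays.toString(onTheWire)); // the bytes a topic would store
        System.out.println(deserialize(onTheWire));     // the reconstructed value
    }
}
```

As long as both sides agree on the format, the consumer gets back exactly what the producer sent.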
What is a poison pill
A poison pill is a record that has been produced to a Kafka topic, but cannot be consumed by a consumer, regardless of how many times it is attempted. The three most likely causes of a poison pill are corrupted records, an error during the deserialization of the record or a schema mismatch. A schema is used to describe the format of the produced and consumed events. If a consumed event does not adhere to the schema, the consumer will return an error as well. Deserialization errors and schema mismatches are the most likely to occur. In the two years I have been using Kafka, I have never seen a corrupt record, but I have seen quite a few deserialization errors and schema mismatches. I never caused any of them of course😉.
Cause of a poison pill
As explained in the previous paragraph, corrupted records can be the cause of a poison pill. Luckily, a corrupted record ends up in a Kafka topic only in rare circumstances. A network hiccup is one example: if a bit is flipped during transport, the CRC checksum will fail and the data can no longer be trusted. Since corrupt records do not occur often and can have many peculiar explanations, this blog will not focus on dealing with corrupted records. The remainder of this blog will focus on deserialization errors instead. In the context of a poison pill, the terms deserialization error and schema mismatch can be used interchangeably, and the same solutions can be used to handle both. Whenever you read deserialization error, you can think of a schema mismatch as well.
If the producer and consumer are using compatible serializers and deserializers everything will work as expected. In the image below an example of compatible JSON serializers and deserializers is given. The poison pill scenario is in this case avoided.
Deserialization errors will occur when producers and consumers have incompatible serializers and deserializers. Both the key and value deserializers should be compatible in order to prevent deserialization errors. In the example below the producer is using a StringSerializer while the consumer is using the incompatible deserializer FloatDeserializer.
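A small JDK-only sketch shows why this combination goes wrong. The methods below are hypothetical stand-ins that mimic a string serializer and a float deserializer; the exception type and message are my own choice, not what Kafka produces.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins that mimic a string serializer and a float deserializer.
final class MismatchDemo {
    static byte[] serializeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // A float occupies exactly 4 bytes, so any other length cannot be decoded.
    static float deserializeFloat(byte[] data) {
        if (data.length != Float.BYTES) {
            throw new IllegalArgumentException(
                "Expected 4 bytes for a float but received " + data.length);
        }
        return ByteBuffer.wrap(data).getFloat();
    }

    public static void main(String[] args) {
        byte[] record = serializeString("order-42"); // 8 bytes on the wire
        try {
            deserializeFloat(record);
        } catch (IllegalArgumentException e) {
            System.out.println("Deserialization failed: " + e.getMessage());
        }
    }
}
```

Note that in this sketch a string of exactly four bytes would decode without an error, just to a meaningless number. A mismatch does not always announce itself with an exception.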
Poison pill in action
If you want to see for yourself what a poison pill looks like, I have created an example project that uses an incompatible serializer and deserializer to mimic a poison pill. The kafka-poison-pill example project can be found on GitHub. In case you don’t have a local Kafka environment, the Confluent documentation can help you set up Kafka locally. If you’re already running Kafka, don’t forget to update application.yml with your configuration if necessary. You can also read along to see what will happen. Consider this your spoiler alert!
After starting the application, you can run the following command or paste the URL in your browser to trigger the poison pill:
curl http://localhost:8080/produce-order
In the logs you will now see a continuous stream of error logs. Here is an example of a log message:
2021-06-13 13:21:27.196 ERROR 69190 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1598) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.1.jar:2.7.1]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition orders-0 at offset 32. If needed, please seek past the record to continue consumption.
Here is what happened:
- The consumer tries to deserialize the record. Since the record is serialized using the StringSerializer and the consumer expects JSON, the consumer is not able to deserialize the series of bytes.
- The consumer is not able to acknowledge the record, and thus the offset is not moving ahead.
- Now the consumer will try to deserialize the record again, which will fail again, and thus the consumer ends up in an infinite loop. Each deserialization failure is logged, and within a few moments thousands of messages have already been logged.
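The three steps above can be imitated without Kafka. The sketch below is a simplified, hypothetical model of a consumer loop: the class, the "JSON" check and the poll limit are all assumptions made for illustration, and the real loop has no such limit.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of a consumer loop; no Kafka classes involved.
final class StuckConsumerDemo {
    int offset = 0;   // next offset to read
    int failures = 0; // number of deserialization errors that would be logged

    // Stand-in for a JSON deserializer: rejects anything that does not start with '{'.
    static String deserialize(byte[] data) {
        String text = new String(data, StandardCharsets.UTF_8);
        if (!text.startsWith("{")) {
            throw new IllegalStateException("Cannot deserialize: " + text);
        }
        return text;
    }

    // Polls at most maxPolls times so that the demo terminates.
    void poll(List<byte[]> partition, int maxPolls) {
        for (int i = 0; i < maxPolls && offset < partition.size(); i++) {
            try {
                deserialize(partition.get(offset));
                offset++; // only a successfully handled record moves the offset
            } catch (IllegalStateException e) {
                failures++; // offset unchanged, so the next poll hits the same record
            }
        }
    }

    public static void main(String[] args) {
        List<byte[]> partition = Arrays.asList(
            "{\"id\":1}".getBytes(StandardCharsets.UTF_8),
            "plain text".getBytes(StandardCharsets.UTF_8), // the poison pill
            "{\"id\":2}".getBytes(StandardCharsets.UTF_8));
        StuckConsumerDemo consumer = new StuckConsumerDemo();
        consumer.poll(partition, 1000);
        System.out.println("offset=" + consumer.offset + ", failures=" + consumer.failures);
    }
}
```

The consumer never gets past the second record: the third, perfectly valid record is never read, while the failure counter keeps growing with every poll.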
Consequences
You can imagine that if for every deserialization failure a message is logged, many gigabytes of logs are written to disk rapidly. Many production grade systems will also ship their logs to a centralized system where flooding this system can lead to huge costs. Also, finding other non-Kafka logs will become basically impossible. Depending on the size of the storage you have in place, all the storage could be filled by Kafka logs which will probably result in a non-functioning machine. So, in short, a poison pill could have quite a severe impact.
How to deal with poison pills?
There are four options for dealing with poison pills; however, I would strongly recommend only one.
The first option is to wait until the retention period of the Kafka topic has passed, which is 7 days by default. After the retention period the poison pill is discarded. Waiting for 7 days until the consumer can consume messages again is far from ideal. It is possible to set the retention period to a lower number, which will discard poison pills earlier. However, records that are produced shortly after the poison pill will likely be discarded as well. Depending on the situation this can be even more damaging.
The second option is to manually update the offset to after the offset of the poison pill. The advantage of this option is that you have a lot of control. You can set the offset to exactly the offset after the poison pill. The disadvantage is that it is not straightforward. You must have access to the production Kafka cluster - which is never a good sign - and you will need to have knowledge about the Kafka binaries.
In case you do need to reset the offset manually, you can use the following command:
kafka-consumer-groups --bootstrap-server $BOOTSTRAP_SERVER --group $CONSUMER_GROUP --reset-offsets --to-offset $OFFSET --topic $TOPIC --execute
Note: The consumer should be stopped before resetting the offset.
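The only value you have to work out yourself is the target offset: one past the offset of the poison pill. The hypothetical helper below just assembles the command shown above; the bootstrap server and group name in main are assumptions, while the topic orders and offset 32 come from the log message earlier in this blog.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that assembles the arguments of the reset command shown above.
final class OffsetResetCommand {
    static List<String> build(String bootstrapServer, String group, String topic, long poisonPillOffset) {
        long target = poisonPillOffset + 1; // first offset after the poison pill
        return Arrays.asList(
            "kafka-consumer-groups",
            "--bootstrap-server", bootstrapServer,
            "--group", group,
            "--reset-offsets",
            "--to-offset", Long.toString(target),
            "--topic", topic,
            "--execute");
    }

    public static void main(String[] args) {
        // Assumed values: a local broker and a made-up consumer group name.
        System.out.println(String.join(" ", build("localhost:9092", "order-consumers", "orders", 32)));
    }
}
```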
The third option is to change the consumer group and start consuming from the latest offset. It is easier than the second option and does not require the execution of commands. The disadvantage of this ‘solution’ is that messages between the poison pill and the last record produced to the topic won’t be consumed and will be lost. Should another poison pill occur in the future, the consumer group will need to be changed again. The image below shows what happens after the consumer group is changed.
The last and recommended option is to configure an ErrorHandlingDeserializer using Spring Kafka.
ErrorHandlingDeserializer setup
The first thing we have to do is to replace the deserializers with the ErrorHandlingDeserializer for both the key and value. Since the ErrorHandlingDeserializer will delegate the deserialization to the real deserializers, we need to tell it which classes it can delegate to. The delegate classes can be added to the config by adding the spring.deserializer.key.delegate.class and spring.deserializer.value.delegate.class properties:
application.yml
spring:
  kafka:
    consumer:
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
How does the ErrorHandlingDeserializer work in practice? The ErrorHandlingDeserializer will try to deserialize the key and value using the delegate class. If no DeserializationException is thrown, the record is passed to the consumer and everything works as normal. However, if a DeserializationException is thrown, the record is no longer passed to the consumer. Instead, the configured ErrorHandler is called with the thrown exception and the failing record. After the ErrorHandler has handled the error, the offset is moved forward and the consumer resumes consuming as if nothing has happened. The ErrorHandler has swallowed the poison pill, and there are no more continuous streams of error messages.
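The delegation idea can be sketched in plain Java. This is a hypothetical, Kafka-free model and not Spring Kafka's implementation: the wrapper catches the delegate's exception, records it (a list stands in for the error handler) and returns null, so the loop can move on to the next offset.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, Kafka-free model of a delegating deserializer; not Spring Kafka's implementation.
final class SafeDeserializerDemo {
    // Wraps a delegate so that a failure is recorded and turned into a null value.
    static <T> Function<byte[], T> errorHandling(Function<byte[], T> delegate, List<String> errors) {
        return data -> {
            try {
                return delegate.apply(data);
            } catch (RuntimeException e) {
                errors.add(e.getMessage()); // stand-in for calling the error handler
                return null;
            }
        };
    }

    // Stand-in for a JSON deserializer: rejects anything that does not start with '{'.
    static String strictJson(byte[] data) {
        String text = new String(data, StandardCharsets.UTF_8);
        if (!text.startsWith("{")) {
            throw new IllegalStateException("Cannot deserialize: " + text);
        }
        return text;
    }

    // Returns the values handed to the listener; failed records are skipped.
    static List<String> consume(List<byte[]> partition, List<String> errors) {
        Function<byte[], String> deserializer = errorHandling(SafeDeserializerDemo::strictJson, errors);
        List<String> delivered = new ArrayList<>();
        for (int offset = 0; offset < partition.size(); offset++) {
            String value = deserializer.apply(partition.get(offset));
            if (value != null) {
                delivered.add(value);
            }
            // the offset advances whether or not the record could be deserialized
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<byte[]> partition = Arrays.asList(
            "{\"id\":1}".getBytes(StandardCharsets.UTF_8),
            "plain text".getBytes(StandardCharsets.UTF_8), // the poison pill
            "{\"id\":2}".getBytes(StandardCharsets.UTF_8));
        List<String> errors = new ArrayList<>();
        System.out.println("delivered: " + consume(partition, errors));
        System.out.println("errors: " + errors);
    }
}
```

Both valid records reach the listener and the poison pill produces exactly one error entry instead of an endless stream.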
Optionally, a simple bean can be configured that will log some additional information about the failing record compared to the default SeekToCurrentErrorHandler:
@Bean
public LoggingErrorHandler errorHandler() {
    return new LoggingErrorHandler();
}
In case you have multiple consumers, it is also possible to programmatically configure the ErrorHandlingDeserializer. The following example demonstrates how to configure a JsonDeserializer with the LoggingErrorHandler:
private final KafkaProperties kafkaProperties;

private ConsumerFactory<String, String> jsonConsumerFactory() {
    JsonDeserializer jsonDeserializer = new JsonDeserializer();
    jsonDeserializer.addTrustedPackages("your.package");
    ErrorHandlingDeserializer errorHandlingDeserializer = new ErrorHandlingDeserializer(jsonDeserializer);
    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), errorHandlingDeserializer);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> jsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(jsonConsumerFactory());
    factory.setErrorHandler(new LoggingErrorHandler());
    return factory;
}
I have also created an example project in which the ErrorHandlingDeserializer is configured, which can be found on GitHub as well. In this project you can again trigger the poison pill by running the following command or pasting the URL in your browser:
curl http://localhost:8080/produce-order
With the ErrorHandlingDeserializer now configured you should see only one log message with the DeserializationException. Crisis averted 😃 !
Dead letter topics
You might have noticed that the LoggingErrorHandler does not actually log the value of the poison pill, but instead only shows the sequence of bytes. It is possible to log the actual value of the poison pill by publishing the poison pill to a so-called dead letter topic (DLT). You can think of a dead letter topic as a backup topic to which records that could not be consumed are sent.
You can configure a DeadLetterPublishingRecoverer that will send the poison pill to the dead letter topic. It can be configured as follows:
@Bean
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
}

@Bean
public DeadLetterPublishingRecoverer publisher() {
    DefaultKafkaProducerFactory<String, byte[]> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties(), new StringSerializer(), new ByteArraySerializer());
    KafkaTemplate<String, byte[]> bytesKafkaTemplate = new KafkaTemplate<>(defaultKafkaProducerFactory);
    return new DeadLetterPublishingRecoverer(bytesKafkaTemplate);
}
Since you’ve most likely already configured a serializer and deserializer that do not use the byte[] type, we’ll need to configure a separate serializer and deserializer for the DeadLetterPublishingRecoverer:
@Bean
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
}

@Bean
public DeadLetterPublishingRecoverer publisher() {
    DefaultKafkaProducerFactory<String, byte[]> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties(), new StringSerializer(), new ByteArraySerializer());
    KafkaTemplate<String, byte[]> bytesKafkaTemplate = new KafkaTemplate<>(defaultKafkaProducerFactory);
    return new DeadLetterPublishingRecoverer(bytesKafkaTemplate);
}

private ConsumerFactory<String, byte[]> bytesArrayConsumerFactory() {
    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new ByteArrayDeserializer());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> bytesArrayListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(bytesArrayConsumerFactory());
    return factory;
}
You can now create a separate consumer that is able to consume messages from the dead letter topic:
@Service
public class DltConsumer {

    @KafkaListener(topics = {"orders.DLT"}, containerFactory = "bytesArrayListenerContainerFactory")
    public void recoverDLT(@Payload ConsumerRecord<String, byte[]> consumerRecord) {
        log.info("Poison pill value: {}", new String(consumerRecord.value()));
    }
}
In the example project the DLT logic can be enabled by setting the property spring.kafka.dlt.enable to true. With the ErrorHandlingDeserializer and the DeadLetterPublishingRecoverer now configured you should see two log messages: one for the deserialization exception and one that displays the value of the poison pill.
Note: In the consumer above the poison pill's value is converted from bytes to a string. This conversion only works if the event value is a serialized string. In case the event value is of a different type, the poison pill consumer should convert it to this type instead.
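The flow of this section can be imitated with a few lines of plain Java. Everything below is a hypothetical in-memory model, not the Spring Kafka classes: a map stands in for the broker, and the .DLT suffix mirrors the orders and orders.DLT topic names used above. Decoding with UTF-8 is an assumption about how the value was originally serialized.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of dead letter routing; no Kafka or Spring involved.
final class DeadLetterDemo {
    // topic name -> raw record values, standing in for the broker
    final Map<String, List<byte[]>> topics = new HashMap<>();

    void publish(String topic, byte[] value) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(value);
    }

    // Called for a record that failed deserialization: forwards the untouched bytes.
    void recover(String sourceTopic, byte[] rawValue) {
        publish(sourceTopic + ".DLT", rawValue);
    }

    // Stand-in for the DLT consumer: decodes the raw bytes as UTF-8 text.
    List<String> readDeadLetters(String sourceTopic) {
        List<String> values = new ArrayList<>();
        for (byte[] raw : topics.getOrDefault(sourceTopic + ".DLT", Collections.emptyList())) {
            values.add(new String(raw, StandardCharsets.UTF_8));
        }
        return values;
    }

    public static void main(String[] args) {
        DeadLetterDemo broker = new DeadLetterDemo();
        broker.recover("orders", "plain text".getBytes(StandardCharsets.UTF_8));
        System.out.println("Poison pill value: " + broker.readDeadLetters("orders"));
    }
}
```

Because the bytes are forwarded untouched, nothing about the poison pill is lost; the dead letter consumer is free to decode them in whatever way fits the original value.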
In short
A poison pill is a record that cannot be consumed, no matter how many times it is attempted by the consumer. Poison pills occur because of a corrupt record, a deserialization error or a schema mismatch. Without a configured ErrorHandlingDeserializer, many gigabytes of logs can be rapidly written to disk, which can cause the system to halt. The recommended way to handle a deserialization error is to configure an ErrorHandlingDeserializer, which will make sure that the consumer can continue processing new records. In case you need the actual value of a poison pill, a DeadLetterPublishingRecoverer can be configured, which allows you to log the actual value of the record.
I also provided an example project in which you could see the effects of a poison pill and I showed how you can survive a poison pill. If you have any questions or improvements, feel free to start a new discussion on GitHub.
Follow my blog for more upcoming posts about Kafka and related topics!