A Short Guide to Removing Messages from a Kafka Topic
Apache Kafka is a powerful data streaming platform that has become a key part of the data fabric at many companies. Of the many features that make it a core component of reactive architectures, one of the most powerful is its ability to keep a persistent log of messages even after downstream consumers have processed them.
Unfortunately, the very thing that makes Kafka so powerful, its ability to retain messages, can also make it difficult to work with. If you are developing a Faust consumer, for example, you may need to replay the same set of messages many times or check how your program behaves from a clean state. Re-deploying Kafka is one option, as is removing and re-creating the topic, but both can be time-consuming and may destroy specialized topic configurations. So what other options are there for cleaning up or removing old messages from Kafka while keeping both your deployment and your topics intact?
In this article, we'll look at two strategies, plus deleting the topic as a last resort:
- using message expiry settings to have Kafka automatically flush messages after a certain amount of time
- using a special tool called kafka-delete-records.sh to remove messages by specifying the topic, partition, and offset
Option 1: Message Expiry
The intended way of removing data from Kafka is to use one of several configurable options for message expiry. Expiry conditions are controlled by configuration parameters and can be based on how old messages are (sometimes called time to live, or TTL) or on the size of the topic. Kafka's performance is effectively constant with respect to the amount of data stored, so retaining a lot of data is not a problem in itself.
Here's how the lifecycle of a message works when expiry conditions have been enabled: a producer sends the message to the Kafka cluster, where it is appended to the end of a topic; consumers read and process the message; the message stays in the topic until the expiry conditions are met, after which it is removed. Strictly speaking, Kafka deletes whole log segments rather than individual messages, so a message is removed together with the segment that contains it.
Expiry conditions apply to all messages within a given topic and can be set when the topic is first created or modified later for topics that already exist. There are three time-based configuration parameters that can be used. Higher-precision values such as ms take precedence over lower-precision values such as hours.
- log.retention.hours: number of hours the message will be saved
- log.retention.minutes: number of minutes
- log.retention.ms: number of milliseconds
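To make the precedence rule concrete, here is a minimal bash sketch. The function effective_retention_ms is hypothetical (it is not part of Kafka); it assumes an empty argument means "not set" and simply mimics how the broker picks the winning value: ms if present, else minutes, else hours. It ignores the special value -1 (no time limit).

```shell
#!/usr/bin/env bash
# Hypothetical helper, not a Kafka tool: print the retention in milliseconds
# that applies given log.retention.hours, .minutes and .ms.
# Usage: effective_retention_ms HOURS MINUTES MS   (pass "" for unset values)
# Note: this sketch does not handle the special value -1 (no time limit).
effective_retention_ms() {
  local hours="${1:-}" minutes="${2:-}" ms="${3:-}"
  if [[ -n "$ms" ]]; then
    echo "$ms"                           # ms takes precedence
  elif [[ -n "$minutes" ]]; then
    echo $(( minutes * 60 * 1000 ))      # then minutes
  elif [[ -n "$hours" ]]; then
    echo $(( hours * 60 * 60 * 1000 ))   # hours is the fallback
  else
    echo "no time-based retention set"
  fi
}

effective_retention_ms 168 "" ""      # prints 604800000 (7 days)
effective_retention_ms 168 30 ""      # prints 1800000 (minutes win)
effective_retention_ms 168 30 60000   # prints 60000 (ms wins)
```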
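Size-based parameters include log.retention.bytes, which limits how large each partition's log may grow; once a partition exceeds that size, its oldest log segments are deleted. Retention is only unlimited when these limits are set to -1. log.retention.bytes defaults to -1, but log.retention.hours defaults to 168 (seven days), so out of the box Kafka keeps messages for a week.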
Viewing and Modifying Expiry Configuration Values
The default expiry configuration for the entire cluster can be set in the config/server.properties file. The kafka-topics.sh and kafka-configs.sh shell scripts are used to view and modify the values for specific topics.
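As a quick way to see which broker-wide retention defaults a server.properties file actually sets, the following bash sketch greps for uncommented log.retention.* lines. The function name show_retention_settings is hypothetical, and it only reads the file; it does not know about values overridden elsewhere (for example, per-topic configs set with kafka-configs.sh).

```shell
#!/usr/bin/env bash
# Hypothetical helper, not a Kafka tool: list the uncommented
# log.retention.* settings (including log.retention.check.interval.ms)
# found in a broker properties file.
# Usage: show_retention_settings config/server.properties
show_retention_settings() {
  local props_file="$1"
  # Lines starting with '#' are comments and are skipped by the anchor
  grep -E '^[[:space:]]*log\.retention\.' "$props_file"
}
```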
Example: Create a Topic and Set Expiration Options
In this example, we'll show how to create a new topic, view the default options, and modify the retention configuration.
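Topics are created using kafka-topics.sh --create. The examples in this article pass --zookeeper; Kafka 3.0 removed that option from kafka-topics.sh, so on a recent cluster use --bootstrap-server kafka:9092 instead.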
```shell
# Create new topic with 1 partition and a replication factor of 1
kafka-topics.sh --zookeeper zookeeper:2181 \
  --create --topic test-topic --if-not-exists \
  --partitions 1 --replication-factor 1
```
Once a topic has been created, you can retrieve its details using kafka-topics.sh --describe:
```shell
# Retrieve topic details
$ kafka-topics.sh --zookeeper zookeeper:2181 \
  --describe --topic test-topic
Topic:test-topic  PartitionCount:1  ReplicationFactor:1  Configs:
  Partition: 0  Leader: 1001  Replicas: 1001  Isr: 1001
```
The expiry configuration is set using kafka-configs.sh --alter. The example below sets retention.ms to 60000 ms (one minute). Note that the topic-level parameter name omits the log. prefix used in the broker-wide configuration (retention.ms rather than log.retention.ms). A full list of the available topic config values is available from the Kafka documentation.
```shell
# Set the message retention time to one minute (60000 ms)
kafka-configs.sh --alter --zookeeper zookeeper:2181 \
  --entity-type topics --entity-name test-topic \
  --add-config retention.ms=60000
```
You can verify that the configuration has been applied to the topic by running the kafka-topics.sh --describe command from above. The custom configuration will be visible in the Configs section.
```shell
# Retrieve topic details
$ kafka-topics.sh --zookeeper zookeeper:2181 \
  --describe --topic test-topic
Topic:test-topic  PartitionCount:1  ReplicationFactor:1  Configs:retention.ms=60000
  Partition: 0  Leader: 1001  Replicas: 1001  Isr: 1001
```
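A common way to use expiry for the "clean state" scenario from the introduction is to lower retention.ms temporarily, wait for the broker to delete the old segments, and then remove the override. The bash sketch below wraps the kafka-configs.sh calls shown above in a hypothetical function, purge_topic_via_retention. The 1000 ms retention and the 360-second default wait are assumptions, and setting DRY_RUN=1 prints the commands instead of running them.

```shell
#!/usr/bin/env bash
# Hypothetical helper, not a Kafka tool: purge a topic by temporarily
# lowering retention.ms, waiting, then removing the topic-level override.
# Usage: purge_topic_via_retention ZOOKEEPER TOPIC [WAIT_SECONDS]
# Set DRY_RUN=1 to print the commands instead of executing them.
purge_topic_via_retention() {
  local zk="$1" topic="$2" wait_secs="${3:-360}"
  local run="${DRY_RUN:+echo}"   # "echo" only when DRY_RUN is non-empty

  # 1. Make existing messages eligible for deletion almost immediately
  $run kafka-configs.sh --alter --zookeeper "$zk" \
    --entity-type topics --entity-name "$topic" \
    --add-config retention.ms=1000

  # 2. Give the broker time to run its periodic retention check
  $run sleep "$wait_secs"

  # 3. Drop the override so the topic falls back to the broker default
  $run kafka-configs.sh --alter --zookeeper "$zk" \
    --entity-type topics --entity-name "$topic" \
    --delete-config retention.ms
}
```

The broker only checks retention every log.retention.check.interval.ms (five minutes by default) and deletes whole segments, so the wait is a guess you may need to lengthen. If the topic already had its own retention.ms, step 3 removes it rather than restoring it; note the original value first and re-apply it with --add-config instead.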
Option 2: Record Deletion
While message expiry will meet many use cases, sometimes we want to delete records directly. This can be done with the kafka-delete-records.sh tool, which reads a configuration file (JSON format) listing topic partitions and offsets. For each listed partition, all records with an offset lower than the one given are deleted.
The example in the listing below can be used to remove all messages from the test-topic created earlier. Save the contents of the file as delete-test.config. It specifies that all messages (indicated by an offset of -1) should be removed from test-topic partition 0.
```json
{
  "partitions": [
    { "topic": "test-topic", "partition": 0, "offset": -1 }
  ],
  "version": 1
}
```
To delete the records, run kafka-delete-records.sh:
```shell
# Delete topic messages using an offset JSON file
$ kafka-delete-records.sh --bootstrap-server kafka:9092 \
  --offset-json-file delete-test.config
Executing records delete operation
Records delete operation completed:
partition: test-topic-0  low_watermark: 5
```
The low_watermark in the output is the partition's new log start offset, the earliest offset that remains available to consumers. You can experiment with kafka-delete-records.sh by first populating test-topic with messages using kafka-console-producer.sh and then reading them with kafka-console-consumer.sh. Refer to "Hello Kafka: Getting Acquainted With Data Streaming" for details and sample commands.
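For a topic with several partitions, writing the offset file by hand gets tedious. The bash sketch below generates it with a hypothetical function, make_delete_records_json, which assumes every listed partition should be truncated to the same offset (use -1 to remove everything, as in the example above).

```shell
#!/usr/bin/env bash
# Hypothetical helper, not a Kafka tool: print an offset JSON file for
# kafka-delete-records.sh covering several partitions of one topic.
# Usage: make_delete_records_json TOPIC OFFSET PARTITION [PARTITION...]
make_delete_records_json() {
  local topic="$1" offset="$2"
  shift 2
  (( $# > 0 )) || { echo "usage: make_delete_records_json TOPIC OFFSET PARTITION..." >&2; return 1; }

  local entries=() p
  for p in "$@"; do
    entries+=("{\"topic\": \"$topic\", \"partition\": $p, \"offset\": $offset}")
  done

  # Join the entries with commas (first character of IFS)
  local IFS=,
  printf '{"partitions": [%s], "version": 1}\n' "${entries[*]}"
}
```

For example, make_delete_records_json test-topic -1 0 1 2 > delete-test.config produces a file that can be passed to kafka-delete-records.sh --offset-json-file exactly as before.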
Option 3: Remove the Topic
If the first two options in this article don't quite meet your needs, you can also delete the topic and recreate it. Important: this is only possible if the delete.topic.enable property is set to true in the Kafka server configuration (true has been the default since Kafka 1.0).
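Before relying on topic deletion, it can help to check what the broker's properties file says about delete.topic.enable. The bash sketch below uses a hypothetical function, delete_topic_setting, that prints the last uncommented value in the file, or reports that the broker default applies when the property is absent. It only inspects the file, not the running broker.

```shell
#!/usr/bin/env bash
# Hypothetical helper, not a Kafka tool: report the delete.topic.enable
# value set in a broker properties file.
# Usage: delete_topic_setting config/server.properties
delete_topic_setting() {
  local props_file="$1" line value
  # The last uncommented occurrence wins, as with any properties file
  line="$(grep -E '^[[:space:]]*delete\.topic\.enable[[:space:]]*=' "$props_file" | tail -n 1 || true)"
  if [[ -z "$line" ]]; then
    echo "not set (broker default applies)"
  else
    value="${line#*=}"               # text after the first '='
    echo "${value//[[:space:]]/}"    # strip any whitespace
  fi
}
```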
To delete the topic, use kafka-topics.sh:
```shell
# Remove test-topic from the Kafka server
kafka-topics.sh --zookeeper zookeeper:2181 \
  --delete --topic test-topic
```