Zayd Vanderson
Dec 30, 2021

A Short Guide to Removing Messages from a Kafka Topic

Unfortunately, the very thing that makes Kafka so powerful (its ability to retain messages) can also make it difficult to work with. Sometimes you need to restore a topic to a clean state, and it's not always clear how to do that. In this article, we'll look at several options for deleting messages from a topic.

Apache Kafka is a powerful data streaming platform that has become an important part of the data fabric in many companies. While it has many features that make it an important component in reactive architectures, one of the most powerful is its ability to keep a persistent log of messages even after they have been processed by downstream consumers.

Unfortunately, the very thing that makes Kafka so powerful -- its ability to retain messages -- can also make it difficult to work with. If you're developing a Faust consumer, for example, you may need to replay the same set of messages many times or check how your program behaves from a clean state. Re-deploying Kafka is one option, as is removing and re-creating the topic, but both can be time-consuming and may destroy specialized topic configurations. So what other options can be used to clean up or remove old messages from Kafka while keeping both your deployment and topics intact?

In this article, we'll look at two strategies:

  • using message expiry settings to have Kafka automatically flush messages after a certain amount of time
  • using the kafka-delete-records.sh tool to remove messages by specifying a topic, partition, and offset

Option 1: Message Expiry

The intended way of removing data from Kafka is to use one of the several configurable options for message expiry. Expiry conditions are controlled by configuration parameters and can be based on the age of messages (sometimes called time to live, or TTL) or on the size of the topic. Kafka's performance is effectively constant with respect to the amount of data it retains, so keeping lots of messages around is not itself a problem.

Here's how the lifecycle of a message works when expiry conditions have been enabled: a producer sends a message to the Kafka cluster, where it is appended to the end of a topic; consumers read and process the message; the message then remains in the topic until the expiry conditions are met, after which it is removed.

Expiry conditions apply to all messages within a given topic and can be set when the topic is first created or modified later for topics that already exist. There are three time-based configuration parameters. Higher-precision values such as ms take precedence over lower-precision values such as hours.

  • log.retention.hours: number of hours the message will be saved
  • log.retention.minutes: number of minutes
  • log.retention.ms: number of milliseconds
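The precedence rule above can be sketched as a small helper. This is a hypothetical illustration of the resolution order, not code from Kafka itself; the function name effective_retention_ms is invented for this example.

```python
# Sketch of how time-based retention settings are resolved:
# higher-precision parameters override lower-precision ones.
# effective_retention_ms is a hypothetical helper for illustration.

def effective_retention_ms(config):
    if "log.retention.ms" in config:
        return config["log.retention.ms"]
    if "log.retention.minutes" in config:
        return config["log.retention.minutes"] * 60 * 1000
    if "log.retention.hours" in config:
        return config["log.retention.hours"] * 60 * 60 * 1000
    return 168 * 60 * 60 * 1000  # Kafka's default of 168 hours (7 days)

# log.retention.ms wins even though log.retention.hours is also set
print(effective_retention_ms({"log.retention.hours": 168,
                              "log.retention.ms": 60000}))  # → 60000
```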

Size-based parameters include log.retention.bytes. When a size-based rule has been defined, the oldest log segments are removed until the partition is below the configured size. If no expiry rules are defined, the data will be retained forever.
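To make the size-based rule concrete, here is a rough sketch of pruning oldest-first until the remaining data fits under the limit. This models the assumed behavior for illustration only; prune_segments is an invented name, and Kafka's actual segment cleanup differs in detail (for example, the active segment is never deleted).

```python
# Rough sketch of size-based retention: the oldest log segments are
# deleted until the remaining bytes fit under the configured limit.
# prune_segments is a hypothetical name used only for illustration.

def prune_segments(segment_sizes, retention_bytes):
    """segment_sizes: byte sizes of log segments, oldest first."""
    segments = list(segment_sizes)
    while segments and sum(segments) > retention_bytes:
        segments.pop(0)  # remove the oldest segment first
    return segments

# Three 100-byte segments against a 250-byte limit: the oldest goes
print(prune_segments([100, 100, 100], 250))  # → [100, 100]
```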

Viewing and Modifying Expiry Configuration Values

The default expiry configuration can be set for the entire cluster through the config/server.properties file. The kafka-topics.sh and kafka-configs.sh shell scripts are used for viewing and modifying the values for specific topics.

Example: Create a Topic and Set Expiration Options

In this example, we'll show how to create a new topic, view the default options, and modify the retention configuration.

Topics are created using kafka-topics.sh --create:

# Create new topic with 1 partition and a replication factor of 1
kafka-topics.sh --zookeeper zookeeper:2181 \
  --create --topic test-topic --if-not-exists \
  --partitions 1 --replication-factor 1

Once a topic has been created, you can retrieve its details using kafka-topics.sh --describe:

# Retrieve topic details
$ kafka-topics.sh --zookeeper zookeeper:2181 \
  --describe --topic test-topic

Topic:test-topic  PartitionCount:1  ReplicationFactor:1     Configs:
Partition: 0      Leader: 1001      Replicas: 1001          Isr: 1001

The expiry configuration is set using kafka-configs.sh --alter. The example below shows how to set retention.ms to 60000 ms (one minute). Note that the topic-level parameter name omits the log. prefix used in the broker-level configuration. A full list of the available topic config values is available from the Kafka documentation.

# Set the message retention time to one minute (60000 ms)
kafka-configs.sh --alter --zookeeper zookeeper:2181 \
  --entity-type topics --entity-name test-topic \
  --add-config retention.ms=60000

It's possible to verify that the configuration has been applied to the topic by running the kafka-topics.sh --describe command from above. The custom configuration will be visible in the Configs section.

# Retrieve topic details
$ kafka-topics.sh --zookeeper zookeeper:2181 \
  --describe --topic test-topic

Topic:test-topic  PartitionCount:1  ReplicationFactor:1  Configs:retention.ms=60000
Partition: 0      Leader: 1001      Replicas: 1001       Isr: 1001

Option 2: Record Deletion

While message expiry will meet many use cases, sometimes we want to delete records directly. This can be done using the kafka-delete-records.sh tool, which reads a JSON configuration file describing the records we want deleted.

The example in the listing below can be used to remove all messages from the test-topic created earlier. Save the contents as delete-test.config. The file specifies that all messages (indicated by an offset of -1) should be removed from partition 0 of test-topic.

{
  "partitions": [
    {
      "topic": "test-topic",
      "partition": 0,
      "offset": -1
    }
  ],
  "version": 1
}
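If you need to target several partitions, the offset file can be generated with a short script rather than written by hand. This is a sketch; build_delete_config is a hypothetical helper name, not part of any Kafka tooling.

```python
import json

# Build a kafka-delete-records.sh offset file covering several partitions.
# An offset of -1 means "delete all messages" for that partition.
# build_delete_config is a hypothetical helper for illustration.

def build_delete_config(topic, partitions, offset=-1):
    return {
        "partitions": [
            {"topic": topic, "partition": p, "offset": offset}
            for p in partitions
        ],
        "version": 1,
    }

config = build_delete_config("test-topic", [0])
with open("delete-test.config", "w") as f:
    json.dump(config, f, indent=2)

print(json.dumps(config))
```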

To delete the records, run kafka-delete-records.sh:

# Delete topic messages using an offset JSON file
$ kafka-delete-records.sh --bootstrap-server kafka:9092 \
  --offset-json-file delete-test.config
  
Executing records delete operation
Records delete operation completed:
partition: test-topic-0 low_watermark: 5

You can play with kafka-delete-records.sh by first populating the test-topic with messages using kafka-console-producer.sh and then reading those messages using kafka-console-consumer.sh. Refer to "Hello Kafka: Getting Acquainted With Data Streaming" for details and sample commands.

Option 3: Remove the Topic

If the first two options in this article don't quite meet your needs, you can also delete the topic and recreate it. Important: this is only possible if the delete.topic.enable property is set to true in the Kafka server configuration.

To delete the topic, use kafka-topics.sh:

# Remove test-topic from the Kafka server
kafka-topics.sh --zookeeper zookeeper:2181 \
  --delete --topic test-topic