Commands

The following commands create, alter, and delete Kafka topics within the LogMan.io environment. All Kafka topics managed by LogMan.io, other than the internal ones, are specified in event lane *.yaml declarations inside the /EventLanes folder in the library.

Prerequisites

Run all commands from a Kafka Docker container. A convenient approach is to create a temporary auxiliary container for admin commands:

docker run --network=host --rm -it confluentinc/cp-kafka:<version> bash

This creates a new Kafka container that is removed when you exit it. The container includes the Kafka command-line interface, which is documented here: Kafka Command-Line Interface (CLI) Tools

Create a topic

To create a topic, specify the topic name, the number of partitions, and the replication factor. Set the replication factor to 1 and the number of partitions to 6, which are the defaults for LogMan.io Kafka topics.

/usr/bin/kafka-topics --bootstrap-server localhost:9092 --create --topic "events.tenant.fortigate" --replication-factor 1 --partitions 6

Replace events.tenant.fortigate with your topic name.
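After creating a topic, it can be useful to confirm that it exists with the expected settings. The following is a minimal sketch, assuming the same broker address and topic name as in the command above; the variable names TOPIC and BOOTSTRAP are illustrative only. The guard skips the call when the Kafka CLI is not installed, for example outside the container.

```shell
# Assumed values; replace with your own topic name and broker address.
TOPIC="events.tenant.fortigate"
BOOTSTRAP="localhost:9092"

# Print the partition count, replication factor and partition assignment of the topic.
# The check below skips the call when the Kafka CLI is not present on this machine.
if [ -x /usr/bin/kafka-topics ]; then
  /usr/bin/kafka-topics --bootstrap-server "$BOOTSTRAP" --describe --topic "$TOPIC"
fi
```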

Kafka versions earlier than 7.0

For Kafka versions earlier than 7.0, the command uses the --zookeeper option instead of the --bootstrap-server option. Here is the command for older Kafka versions:

/usr/bin/kafka-topics --zookeeper localhost:2181 --create --topic "events.tenant.fortigate" --replication-factor 1 --partitions 6

Configure a topic

Retention

The following command sets the data retention of a Kafka topic to 86400000 milliseconds, that is, 1 day. Data older than 1 day is then deleted from Kafka to save storage space:

/usr/bin/kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name "events\.tenant\.fortigate" --alter --add-config retention.ms=86400000

Replace events\.tenant\.fortigate with your topic name.
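The retention.ms value is easy to mistype, so it can help to derive it from a number of days. This is a minimal sketch, not a LogMan.io tool: the variable names are hypothetical, and the topic name is written in the same escaped form as in the command above. The alter command runs only when the Kafka CLI is present.

```shell
# Assumed inputs; replace with your own values.
RETENTION_DAYS=1
TOPIC='events\.tenant\.fortigate'   # escaped form, as in the command above
BOOTSTRAP="localhost:9092"

# Convert days to milliseconds: days * 24 h * 60 min * 60 s * 1000 ms.
RETENTION_MS=$(( RETENTION_DAYS * 24 * 60 * 60 * 1000 ))
echo "retention.ms=${RETENTION_MS}"

# Apply the value only when the Kafka CLI is available (for example inside the container).
if [ -x /usr/bin/kafka-configs ]; then
  /usr/bin/kafka-configs --bootstrap-server "$BOOTSTRAP" --entity-type topics \
    --entity-name "$TOPIC" --alter --add-config "retention.ms=${RETENTION_MS}"
fi
```

With RETENTION_DAYS=7, the computed value is 604800000.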

Info

All Kafka topics in LogMan.io should have a retention for data set.

Info

When editing a topic setting in Kafka, special characters such as the dot (.) should be escaped with a backslash (\).

Resetting a consumer group offset for a given topic

In order to reset the reading position, or the offset, for the given group ID (consumer group), use the following command:

/usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --group "my-console-client" --topic "events\.tenant\.fortigate" --reset-offsets --to-datetime 2020-12-20T00:00:00.000 --execute

Replace events\.tenant\.fortigate with your topic name.

Replace my-console-client with the given group ID.

Replace 2020-12-20T00:00:00.000 with the time to reset the reading offset to.

Hint

To reset the group to the current offset, use --to-current instead of --to-datetime 2020-12-20T00:00:00.000.
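Before running a reset with --execute, the planned offsets can be previewed with the --dry-run option of kafka-consumer-groups, which prints the result without changing the group. Kafka only resets offsets for a group that has no active consumers, so the consuming service needs to be stopped first. The sketch below also checks that the timestamp has the YYYY-MM-DDTHH:mm:SS.sss shape used above; the variable names are illustrative assumptions.

```shell
# Assumed values; replace with your own group ID, topic and timestamp.
GROUP="my-console-client"
TOPIC='events\.tenant\.fortigate'   # escaped form, as in the command above
RESET_TO="2020-12-20T00:00:00.000"
BOOTSTRAP="localhost:9092"

# Check the timestamp shape (YYYY-MM-DDTHH:mm:SS.sss) before calling Kafka.
case "$RESET_TO" in
  [0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]:[0-9][0-9]:[0-9][0-9].[0-9][0-9][0-9])
    FORMAT_OK=yes ;;
  *)
    FORMAT_OK=no ;;
esac
echo "timestamp format ok: ${FORMAT_OK}"

# Preview the reset without applying it; runs only when the Kafka CLI is present.
if [ "$FORMAT_OK" = yes ] && [ -x /usr/bin/kafka-consumer-groups ]; then
  /usr/bin/kafka-consumer-groups --bootstrap-server "$BOOTSTRAP" --group "$GROUP" \
    --topic "$TOPIC" --reset-offsets --to-datetime "$RESET_TO" --dry-run
fi
```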

Deleting a consumer group offset for a given topic

The offset for a given topic can be deleted from a consumer group, which effectively disconnects the consumer group from that topic. Use the following command:

/usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --group "my-console-client" --topic "events\.tenant\.fortigate" --delete-offsets

Replace events\.tenant\.fortigate with your topic name.

Replace my-console-client with the given group ID.

Deleting the consumer group

A consumer group can be deleted, together with its offset information for ALL topics, using the following command:

/usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --delete --group my-console-client

Replace my-console-client with the given group ID.
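Because deleting a group removes its offsets for every topic, it may be worth inspecting the group first. The following sketch uses the --list and --describe options of kafka-consumer-groups; the group ID and broker address are the example values from above, and the variable names are illustrative.

```shell
# Assumed values; replace with your own group ID and broker address.
GROUP="my-console-client"
BOOTSTRAP="localhost:9092"

# Runs only when the Kafka CLI is present (for example inside the container).
if [ -x /usr/bin/kafka-consumer-groups ]; then
  # List all consumer groups known to the broker.
  /usr/bin/kafka-consumer-groups --bootstrap-server "$BOOTSTRAP" --list
  # Show the topics, partitions, committed offsets and lag of one group.
  /usr/bin/kafka-consumer-groups --bootstrap-server "$BOOTSTRAP" --describe --group "$GROUP"
fi
```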

Alter a topic

Change the number of partitions

The following command increases the number of partitions of the given topic. Kafka does not allow the number of partitions to be decreased.

/usr/bin/kafka-topics --bootstrap-server localhost:9092 --alter --partitions 6 --topic "events\.tenant\.fortigate"

Replace events\.tenant\.fortigate with your topic name.

Specify ZooKeeper node

Kafka reads and alters data stored in ZooKeeper. If Kafka is configured to store its data under a specific ZooKeeper node and the --zookeeper argument does not point to that node, you will get this error:

Error while executing topic command : Topic 'events.tenant.fortigate' does not exist as expected
[2024-05-06 10:16:36,207] ERROR java.lang.IllegalArgumentException: Topic 'events.tenant.fortigate' does not exist as expected
at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:539)
at kafka.admin.TopicCommand$ZookeeperTopicService.alterTopic(TopicCommand.scala:408)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)

Adjust the --zookeeper argument accordingly. For example, if Kafka data is stored in the kafka node of ZooKeeper:

/usr/bin/kafka-topics --zookeeper lm11:2181/kafka --alter --partitions 6 --topic 'events\.tenant\.fortigate'

If the topic name is still not recognized, try removing the escape characters (\).

Delete a topic

A topic can be deleted using the following command. Keep in mind that Kafka automatically creates a topic again if any service produces new data to it.

/usr/bin/kafka-topics --bootstrap-server localhost:9092 --delete --topic "events\.tenant\.fortigate"

Replace events\.tenant\.fortigate with your topic name.

Troubleshooting

There are many logs in others and I cannot find the ones with the "interface" attribute

Kafka Console Consumer can be used to obtain events from multiple topics, in this example from all topics whose names start with events.

The output can then be filtered with grep for the field name in double quotes:

/usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --whitelist "events.*" | grep '"interface"'

This command prints all incoming logs that contain the "interface" attribute, from all events topics.
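Including the double quotes in the grep pattern matters: it matches the attribute name as a JSON key or value rather than the bare word anywhere in the text. The following self-contained sketch illustrates this on a few made-up sample events (the SAMPLE data is hypothetical and does not come from Kafka):

```shell
# Hypothetical sample events, one JSON object per line.
SAMPLE='{"interface":"eth0","action":"accept"}
{"user":"alice"}
{"note":"interface mentioned in text only"}'

# Count the lines that contain the quoted attribute name "interface".
MATCHES=$(printf '%s\n' "$SAMPLE" | grep -c '"interface"')

# Count the lines that contain the bare word interface, for comparison.
LOOSE_MATCHES=$(printf '%s\n' "$SAMPLE" | grep -c 'interface')

echo "quoted pattern: ${MATCHES}, bare word: ${LOOSE_MATCHES}"
```

In this sample, the quoted pattern matches only the first event, while the bare word also matches the third one.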