Commands¶
The following commands create, alter and delete Kafka topics within the LogMan.io environment. All Kafka topics managed by LogMan.io, apart from the internal ones, are specified in the event lane *.yaml declarations inside the /EventLanes folder in the library.
Prerequisites¶
All commands should be run from inside a Kafka Docker container. It is convenient to create an auxiliary temporary container for admin commands:
docker run --network=host --rm -it confluentinc/cp-kafka:<version> bash
This creates a new Kafka container that is removed when you exit it. The container is equipped with the Kafka command-line interface, which is documented here: Kafka Command-Line Interface (CLI) Tools
Create a topic¶
In order to create a topic, specify the topic name, number of partitions and replication factor. The replication factor should be set to 1 and partitions to 6, which is the default for LogMan.io Kafka topics.
/usr/bin/kafka-topics --bootstrap-server localhost:9092 --create --topic "events.tenant.fortigate" --replication-factor 1 --partitions 6
Replace events.tenant.fortigate
with your topic name.
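When several topics need the same settings, it can help to print the create commands first and review them before running anything. The following is a minimal sketch, not part of LogMan.io: the function name print_create_commands and the second topic name are hypothetical, and the bootstrap server, partition count and replication factor are the values used in this section.

```shell
# Values taken from the example above; adjust them for your cluster.
BOOTSTRAP="localhost:9092"
PARTITIONS=6
REPLICATION=1

# Hypothetical helper: print (do not run) one create command per topic name.
print_create_commands() {
  for topic in "$@"; do
    printf '/usr/bin/kafka-topics --bootstrap-server %s --create --topic "%s" --replication-factor %s --partitions %s\n' \
      "$BOOTSTRAP" "$topic" "$REPLICATION" "$PARTITIONS"
  done
}

# Illustrative topic names only; replace them with your own.
print_create_commands "events.tenant.fortigate" "others.tenant"
```

Once the printed commands look right, they can be copied into the Kafka container shell and executed one by one.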
Kafka version less than 7.0
For Kafka versions less than 7.0, the command does not use the --bootstrap-server
option but instead uses the --zookeeper
option. Here is the command for older Kafka versions:
/usr/bin/kafka-topics --zookeeper localhost:2181 --create --topic "events.tenant.fortigate" --replication-factor 1 --partitions 6
Configure a topic¶
Retention¶
The following command changes the data retention of the Kafka topic to 86400000
milliseconds, that is 1 day. This means that data older than 1 day will be deleted from Kafka to save storage space:
/usr/bin/kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name "events\.tenant\.fortigate" --alter --add-config retention.ms=86400000
Replace events\.tenant\.fortigate
with your topic name.
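The retention.ms value is plain arithmetic: days × 24 × 60 × 60 × 1000. A small sketch of that conversion follows; the function name days_to_ms is a hypothetical helper introduced here for illustration.

```shell
# Hypothetical helper: convert a retention period in whole days
# to the millisecond value expected by retention.ms.
days_to_ms() {
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

days_to_ms 1   # prints 86400000
days_to_ms 7   # prints 604800000
```

The result can then be substituted into the command above, for example --add-config retention.ms=$(days_to_ms 7).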
Info
All Kafka topics in LogMan.io should have a data retention set.
Info
When editing a topic setting in Kafka, special characters such as the dot (.) should be escaped with a backslash (\).
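Escaping topic names by hand is error-prone, so the escaped and unescaped forms can be derived with sed. This is a sketch under the assumption that the dot is the only special character in the topic name; escape_topic and unescape_topic are hypothetical helper names.

```shell
# Hypothetical helper: put a backslash in front of every dot.
escape_topic() {
  printf '%s\n' "$1" | sed 's/\./\\./g'
}

# Hypothetical helper: remove the backslash in front of every dot.
unescape_topic() {
  printf '%s\n' "$1" | sed 's/\\\././g'
}

escape_topic 'events.tenant.fortigate'      # prints events\.tenant\.fortigate
unescape_topic 'events\.tenant\.fortigate'  # prints events.tenant.fortigate
```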
Resetting a consumer group offset for a given topic¶
In order to reset the reading position, or the offset, for the given group ID (consumer group), use the following command:
/usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --group "my-console-client" --topic "events\.tenant\.fortigate" --reset-offsets --to-datetime 2020-12-20T00:00:00.000 --execute
Replace events\.tenant\.fortigate
with your topic name.
Replace my-console-client
with the given group ID.
Replace 2020-12-20T00:00:00.000
with the time to reset the reading offset to.
Hint
To reset the group to the current offset, use --to-current instead of --to-datetime 2020-12-20T00:00:00.000.
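The timestamp for --to-datetime can be computed instead of typed, and the reset can be previewed with --dry-run in place of --execute. The sketch below makes several assumptions: GNU date is available (its -d option is not portable to BSD or macOS), the tool interprets a value without a time zone as UTC (verify this for your Kafka version), and hours_ago is a hypothetical helper name. It only prints the command so that it can be reviewed first.

```shell
# Hypothetical helper: print a UTC timestamp N hours in the past,
# in the YYYY-MM-DDTHH:MM:SS.sss format used by --to-datetime.
# Requires GNU date because of the -d option.
hours_ago() {
  date -u -d "$1 hours ago" +%Y-%m-%dT%H:%M:%S.000
}

# Example values from this section; replace them with your own.
GROUP="my-console-client"
TOPIC='events\.tenant\.fortigate'

# Print the reset command with --dry-run, which shows the planned
# offsets without changing the consumer group.
printf '/usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --group "%s" --topic "%s" --reset-offsets --to-datetime %s --dry-run\n' \
  "$GROUP" "$TOPIC" "$(hours_ago 24)"
```

After checking the dry-run output, rerun the same command with --execute to apply the reset.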
Deleting a consumer group offset for a given topic¶
The offset for the given topic can be deleted from the consumer group, which effectively disconnects the consumer group from the topic. Use the following command:
/usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --group "my-console-client" --topic "events\.tenant\.fortigate" --delete-offsets
Replace events\.tenant\.fortigate
with your topic name.
Replace my-console-client
with the given group ID.
Deleting the consumer group¶
A consumer group can be deleted for ALL topics, together with its offset information, using the following command:
/usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --delete --group my-console-client
Replace my-console-client
with the given group ID.
Alter a topic¶
Change the number of partitions¶
The following command increases the number of partitions within the given topic.
/usr/bin/kafka-topics --bootstrap-server localhost:9092 --alter --partitions 6 --topic "events\.tenant\.fortigate"
Replace events\.tenant\.fortigate
with your topic name.
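Kafka only allows the partition count of a topic to grow, so a request that keeps or lowers the count is rejected. A simple guard before calling --alter can catch this early. The sketch below assumes you already know the current count (for example from kafka-topics --describe); can_increase_partitions is a hypothetical helper name.

```shell
# Hypothetical helper: succeed only if the requested partition count
# is strictly larger than the current one.
can_increase_partitions() {
  current="$1"
  requested="$2"
  [ "$requested" -gt "$current" ]
}

if can_increase_partitions 6 12; then
  echo "ok: 6 -> 12 is an increase"
fi

if ! can_increase_partitions 6 3; then
  echo "refused: 6 -> 3 would reduce the number of partitions"
fi
```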
Specify ZooKeeper node
Kafka reads and alters data stored in ZooKeeper. If you have configured Kafka to store its data in a specific ZooKeeper node and that node is not part of the --zookeeper argument, you will get this error:
Error while executing topic command : Topic 'events.tenant.fortigate' does not exist as expected
[2024-05-06 10:16:36,207] ERROR java.lang.IllegalArgumentException: Topic 'events.tenant.fortigate' does not exist as expected
at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:539)
at kafka.admin.TopicCommand$ZookeeperTopicService.alterTopic(TopicCommand.scala:408)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)
Adjust the --zookeeper
argument accordingly. For example, if Kafka data is stored in the kafka
node of ZooKeeper:
/usr/bin/kafka-topics --zookeeper lm11:2181/kafka --alter --partitions 6 --topic 'events\.tenant\.fortigate'
Try removing the escape characters (\) if the topic name is still not recognized.
Delete a topic¶
The topic can be deleted using the following command. Please keep in mind that Kafka topics are created automatically when any service produces new data to them, so a deleted topic can reappear.
/usr/bin/kafka-topics --bootstrap-server localhost:9092 --delete --topic "events\.tenant\.fortigate"
Replace events\.tenant\.fortigate
with your topic name.
Troubleshooting¶
There are many logs in others and I cannot find the ones with "interface" attribute inside¶
Kafka Console Consumer can be used to obtain events from multiple topics, here from all topics starting with events.
Next, it is possible to grep for the field in double quotes:
/usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --whitelist "events.*" | grep '"interface"'
This command gives you all incoming logs with "interface"
attribute from all events topics.
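The double quotes inside the grep pattern matter: they restrict the match to the quoted attribute name, so events that only mention the word in free text are skipped. The sketch below shows this on two made-up sample lines (illustrative data, not real LogMan.io events); filter_attribute is a hypothetical helper name.

```shell
# Hypothetical helper: keep only lines from standard input that
# contain the given name wrapped in double quotes.
filter_attribute() {
  grep -F "\"$1\""
}

# Made-up sample events for illustration only.
printf '%s\n' \
  '{"interface": "eth0", "message": "link up"}' \
  '{"message": "interface eth1 is down"}' \
  | filter_attribute interface
# Only the first line is printed; in the second one the word
# interface is not wrapped in double quotes.
```

The same function can be placed after the console consumer command in place of the plain grep call.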