Kafka 常用命令介绍
提示
从 Kafka 2.2 版本开始,社区推荐用 --bootstrap-server 参数替换 --zookeeper 参数,以下命令基于 kafka 1.1.1 版本,Kafka 3.5 版本已经开始标记弃用 zookeeper,预计在 Kafka 4.0 之后将完全移除 zookeeper,将通过 KRaft 管理元数据
# 一、主题
- 执行
./bin/kafka-topics.sh
,必须存在的动作是--list/--create/--describe/--alter/--delete
之一;
# 1,列出主题
# 列出所有可用的主题
./bin/kafka-topics.sh --zookeeper 127.0.0.1:12181 --list
1
2
2
# 2,创建主题
# 创建主题,并设置分区数量和副本因子
./bin/kafka-topics.sh --zookeeper 127.0.0.1:12181 --create --topic my_topic --partitions 3 --replication-factor 2
1
2
2
# 3,显示主题信息
# 显示所有主题信息
./bin/kafka-topics.sh --zookeeper 127.0.0.1:12181 --describe
# 显示指定主题的分区信息
./bin/kafka-topics.sh --zookeeper 127.0.0.1:12181 --describe --topic my_topic
1
2
3
4
2
3
4
# 4,修改主题
# 修改指定主题的分区信息
./bin/kafka-topics.sh --zookeeper 127.0.0.1:12181 --alter --topic my_topic --partitions 4
1
2
2
# 5,删除主题
# 删除指定主题
./bin/kafka-topics.sh --zookeeper 127.0.0.1:12181 --delete--topic my_topic
1
2
2
# 二、动态配置
- 执行
./bin/kafka-configs.sh
,必须存在的动作是--describe/--alter
之一;
# 1,显示配置
--entity-type <String>
实体类型,topics/clients/users/brokers 之一;--entity-name <String>
实体类型对应的名字,主题名、客户端id、节点id、用户名 之一;
# 显示所有主题的配置
./bin/kafka-configs.sh --zookeeper 127.0.0.1:12181 --describe --entity-type topics
# 显示指定主题的配置
./bin/kafka-configs.sh --zookeeper 127.0.0.1:12181 --describe --entity-type topics --entity-name my_topic
1
2
3
4
5
2
3
4
5
# 2,修改配置
# 增加或是修改指定主题的配置
# --add-config 'k1=v1,k2=[v1,v2,v2],k3=v3',key已经存在则是直接覆盖旧值
./bin/kafka-configs.sh --zookeeper 127.0.0.1:12181 --entity-type topics --entity-name my_topic --alter --add-config 'flush.ms=520,max.message.bytes=2048'
# 删除配置
# --add-config 'k1,k2'
./bin/kafka-configs.sh --zookeeper 127.0.0.1:12181 --entity-type topics --entity-name my_topic --alter --delete-config 'max.message.bytes'
1
2
3
4
5
6
7
2
3
4
5
6
7
# 三、消费者组
- 执行
./bin/kafka-consumer-groups.sh
,用于列出消费者组、显示消费者组信息、删除消费者组、重置消费者组位移;
# 1,列出消费者组
--bootstrap-server
使用该选项时,只显示使用 API 的消费者组,不显示使用ZooKeeper进行消费者管理的消费者组;--zookeeper
,使用该选项时,使用的是ZooKeeper进行消费者管理信息的消费者组才会被展示,不包括使用Java消费者API的消费者组;
# 列出所有的消费者组
./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --list
1
2
2
# 2,显示消费者信息
# 显示消费者信息,包括分区、offset、堆积、客户端id等
./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --describe --group go_part_auto_discover_test1_sarama
1
2
2
# 3,删除消费者组
./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --delete --group go_part_auto_discover_test1_sarama
1
# 4,重置消费者组位移
- 消费位移重置的主题:
- --all-topics:所有主题;
- --topic <String: topic>:指定主题,默认所有分区,还可以指定分区形如:
topic1:0,1,2
;
- 消费位移重置的位置:
- --to-current:重置到当前 offset 处;
- --to-earliest:重置到最早的 offset 处;
- --to-latest:重置到最晚的 offset 处;
- --to-offset <Long: offset>:重置到指定的 offset 处;
- --to-datetime <String: datetime>:重置到指定时间处,形如:'YYYY-MM-DDTHH:mm:SS.sss';
- --excute 执行真正的位移调整,如果不加则是默认选项 --dry-run,只是打印位移调整方案,不实际执行;
# 重置指定消费者组、指定主题、指定分区的消费位移到最早处
./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --reset-offsets --group go_part_auto_discover_test1_sarama --topic 'go_part_auto_discover_test1:0,1,2' --to-earliest --execute
1
2
2
# 四、控制台消费者
- 执行
./bin/kafka-console-consumer.sh
命令读取消息,并输出到标准输出中; - --offset:消费位移,可以是非负整数或是、earliest、latest;
# 指定主题从头消费
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:19092 --topic go_part_auto_discover_test1
# 指定主题、指定分区、指定位移开始消费,并指定消费者组
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:19092 --topic go_part_auto_discover_test1 --group my_group --partition 0 --offset '100'
1
2
3
4
5
2
3
4
5
# 五、控制台生产者
- 执行
./bin/kafka-console-producer.sh
命令从标准输入中读取消息并发送到kafka;
# 向指定的节点和主题建立标准输入
./bin/kafka-console-producer.sh --broker-list '127.0.0.1:19092,127.0.0.1:29092,127.0.0.1:39092' --topic my_topic
1
2
2
编辑 (opens new window)
上次更新: 2024/08/26, 11:21:36