Kafka 常用命令之 kafka-topics.sh
kafka-topics.sh 脚本主要负责 topic 相关的操作。它的具体实现是通过 kafka-run-class 来调用 TopicCommand
类,并根据参数执行指定的功能。
场景再现
若程序报错:
[Producer clientId=producer-1] Error while fetching metadata with correlation id 312 : {logger-channel=UNKNOWN_TOPIC_OR_PARTITION}
可能原因:
topic 不存在,切自动创建失败
需要设置:
auto.create.topics.enable=true
如果不方便修改配置,则需要手动创建 topic。
查看 topic
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181/spacemanti
创建 topic
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181/spacemanti --config max.message.bytes=128000000 --config flush.messages=1 --replication-factor 1 --partitions 1 --topic logger-channel
查看 topic
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181/spacemanti --topic manti-logger-channel Topic:manti-logger-channel PartitionCount:1 ReplicationFactor:1 Configs:max.message.bytes=128000000,flush.messages=1
Topic: manti-logger-channel Partition: 0 Leader: 2 Replicas: 2 Isr: 2
PartitionCount:partition 个数。
ReplicationFactor:副本个数。
Partition:partition 编号,从 0 开始递增。
Leader:当前 partition 起作用的 breaker.id。
Replicas: 当前副本数据所在的 breaker.id,是一个列表,排在最前面的其作用。
Isr:当前 kakfa 集群中可用的 breaker.id 列表。
删除 topic
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181/spacemanti --topic logger-channel
- 若 delete.topic.enable=true:直接彻底删除该 Topic。
- 若 delete.topic.enable=false:如果当前 Topic 没有使用过即没有传输过信息:可以彻底删除。
如果当前 Topic 有使用过即有过传输过信息:并没有真正删除 Topic 只是把这个 Topic 标记为删除 (marked for deletion),重启 Kafka Server 后删除。
注:delete.topic.enable=true 配置信息位于配置文件 config/server.properties 中 (较新的版本中无显式配置,默认为 true)。
修改 Topic
增加分区数
bin/kafka-topics.sh --alter --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --partitions 3 1
修改分区数时,仅能增加分区个数。若是用其减少 partition 个数,则会报如下错误信息:
org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic hadoop currently has 3 partitions, 2 would not be an increase. 1
不能用来修改副本个数。(请使用 kafka-reassign-partitions.sh 脚本增加副本数)
增加配置
bin/kafka-topics.sh --alter --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --config flush.messages=1 1
删除配置
bin/kafka-topics.sh --alter --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --delete-config flush.messages
配置属性
当如下所示的属性配置到 Topic 上时,将会覆盖 server.properties 上对应的属性。
属性名 | 值类型 | 默认值 | 有效值 | 服务器默认属性 | 描述 |
---|---|---|---|---|---|
cleanup.policy | list | delete | delete compact | log.cleanup.policy | 过期或达到上限日志的清理策略。 delete:删除 compact:压缩 |
compression.type | string | producer | uncompressed snappy lz4 gzip producer | compression.type | 指定给该 topic 最终的压缩类型 |
delete.retention.ms | long | 86400000 | [0,…] | log.cleaner.delete.retention.ms | 压缩的日志保留的最长时间,也是客户端消费消息的最长时间。 与 log.retention.minutes 的区别在于:一个控制未压缩的数据,一个控制压缩后的数据。 |
file.delete.delay.ms | long | 60000 | [0,…] | log.segment.delete.delay.ms | 从文件系统中删除前所等待的时间 |
flush.messages | long | 9223372036854775807 | [0,…] | log.flush.interval.messages | 在消息刷到磁盘之前,日志分区收集的消息数 |
flush.ms | long | 9223372036854775807 | [0,…] | log.flush.interval.ms | 消息在刷到磁盘之前,保存在内存中的最长时间,单位是 ms |
index.interval.bytes | int | 4096 | [0,…] | log.index.interval.bytes | 执行 fetch 操作后,扫描最近的 offset 运行空间的大小。 设置越大,代表扫描速度越快,但是也更耗内存。 (一般情况下不需要设置此参数) |
message.max.bytes | int | 1000012 | [0,…] | message.max.bytes | log 中能够容纳消息的最大字节数 |
min.cleanable.dirty.ratio | double | 0.5 | [0,…,1] | log.cleaner.min.cleanable.ratio | 日志清理的频率控制,占该 log 的百分比。 越大意味着更高效的清理,同时会存在空间浪费问题 |
retention.bytes | long | -1 | log.retention.bytes | topic 每个分区的最大文件大小。 一个 topic 的大小限制 = 分区数 * log.retention.bytes。 -1 表示没有大小限制。 | |
retention.ms | int | 604800000 | [-1,…] | log.retention.minutes | 日志文件保留的分钟数。 数据存储的最大时间超过这个时间会根据 log.cleanup.policy 设置的策略处理数据 |
segment.bytes | int | 1073741824 | [14,…] | log.segment.bytes | 每个 segment 的大小 (默认为 1G) |
segment.index.bytes | int | 10485760 | [0,…] | log.index.size.max.bytes | 对于 segment 日志的索引文件大小限制 (默认为 10M) |
相关文章