kafka数据存储目录间迁移

生产环境kafka集群,在数据量大的情况下,会出现单机各个磁盘间的占用不均匀情况,经常出现“一边倒”的情形。

原因探究

这是因为kafka只保证分区数量在各个磁盘上均匀分布,但它无法统计每个分区实际占用磁盘空间。因此很有可能出现某些分区消息数量巨大导致占用大量磁盘空间的情况。在1.1版本之前,用户对此基本没有优雅的处理方法,即便手动迁移日志文件和offset信息,也需要重启生效,风险极高。因为1.1之前kafka只支持分区数据在不同broker间的重分配,而无法做到在同一个broker下的不同磁盘间做重分配。1.1版本正式支持副本在不同路径间的迁移,具体的实现细节详见kafka官方wikiKIP-113

目录间迁移步骤

假设我在server.properties文件中配置了多个日志存储路径(代表多块磁盘),如下所示:

123
# A comma seperated list of directories under which to store log fileslog.dirs=/data1/kafka-logs,/data2/kafka-logs,/data3/kafka-logs

然后我创建了一个9分区的topic,并发送了900W条消息。查询这些目录发现Kafka均匀地将9个分区分布到这三个路径上,如下所示:

1234567891011121314
> ll /data1/kafka-logs/ |grep test-topicdrwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-3drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-4drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-5> ll /data2/kafka-logs/ |grep test-topicdrwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-0drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-1drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-2> ll /data3/kafka-logs/ |grep test-topicdrwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-6drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-7drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-8

假设由于还有其他topic数据分布等原因,导致磁盘存储不均衡。需要将test-topic678分区全部迁移到/data2路径下,并且把test-topic1分区迁移到/data1下。若要实现这个需求,我们首先需要写一个JSON文件,migrate-replica.json:

123456789101112131415161718192021222324252627282930313233343536373839404142434445
{    "partitions": [        {            "topic": "test-topic",            "partition": 1,            "replicas": [                0            ],            "log_dirs": [                "/data1/kafka-logs"            ]        },        {            "topic": "test-topic",            "partition": 6,            "replicas": [                0            ],            "log_dirs": [                "/data2/kafka-logs"            ]        },        {            "topic": "test-topic",            "partition": 7,            "replicas": [                0            ],            "log_dirs": [                "/data2/kafka-logs"            ]        },        {            "topic": "test-topic",            "partition": 8,            "replicas": [                0            ],            "log_dirs": [                "/data2/kafka-logs"            ]        }    ],    "version": 1}

其中,replicas中的0表示broker ID,由于本文只启动了一个broker,且broker.id = 0,故这里只写0即可。实际上你可以指定多个broker实现为多个broker同时迁移副本的功能。另外当前的version固定是1。

保存好这个JSON后,我们执行以下命令执行副本迁移:

12345678
> bin/kafka-reassign-partitions.sh  --zookeeper localhost:2181 --bootstrap-server localhost:9092 --reassignment-json-file ../migrate-replica.json --executeCurrent partition replica assignment{"version":1,"partitions":[{"topic":"test-topic","partition":8,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":4,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":5,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":2,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":6,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":3,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":7,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":0,"replicas":[0],"log_dirs":["any"]}]}Save this to use as the --reassignment-json-file option during rollbackSuccessfully started reassignment of partitions.

迁移结果

执行完成后,我们再次查看存储目录副本分布:

123456789101112131415
> ll /data1/kafka-logs/ |grep test-topicdrwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-1drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-3drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-4drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-5> ll /data2/kafka-logs/ |grep test-topicdrwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-0drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-1drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-2drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-6drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-7drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-8> ll /data3/kafka-logs/ |grep test-topic

可以看到,678已经被成功地迁移到/data2下,而分区1也迁移到了/data1下。值得一提的是,不仅所有的日志段、索引文件被迁移,实际上分区外层的checkpoint文件也会被更新。比如我们检查/data2下的replication-offset-checkpoint文件可以发现,现在该文件已经包含了678分区的位移数据,如下所示:

12345678910
> cat replication-offset-checkpoint 07test-topic 8 1000000test-topic 2 1000000test 0 1285714test-topic 6 1000000test-topic 7 1000000test-topic 0 1000000test 2 1285714

文章来源:

Author:hyperxu
link:http://www.hyperxu.com/2019/12/13/kafka-2/