kafka提交偏移量,kafka集群扩容并且分区迁移

  kafka提交偏移量,kafka集群扩容并且分区迁移

  转自:https://juejin.im/post/5a65b2df518825732a6d9ff1

  本文重点介绍kafka的两种常用数据迁移方法:1 .broker中不同数据磁盘间的分区数据迁移;2.不同代理之间分区数据迁移。

  一、broker 1.1后台介绍不同数据磁盘间分区数据迁移近日,腾讯云某重要客户发现kafka broker中主题分区数据存储分布不均,导致部分磁盘100%耗尽,而部分磁盘只有40%的消耗。

  分析显示,topic的部分分区数据集中在一些磁盘上,例如下面截图所示的/data5数据磁盘。

  根据分布式系统的特点,很容易想到数据迁移的方法,将broker中不同数据磁盘的分区数据进行迁移。在线集群数据迁移之前,为了保证生产集群的数据完整性和安全性,需要在测试集群中进行测试。

  1.2测试代理中不同数据磁盘分区数据迁移1.2.1建立测试主题,验证生产和消费正常。我们建立了一个测试集群。卡夫卡有三个经纪人,他们的主机名分别是tbds-172-16-16-11,TBDS-172-16-12,TBDS-172-16-12。每个代理配置两个数据磁盘,缓存的数据分别存储在/data/kafka-logs/和/data1/kafka-logs/中。

  首先,确立测试主题:/Kafka-topics . sh-Create-Zookeeper TBDS-172-16-16-11:2181-Replication-Factor 2-Partitions 3-Topic Test _ Topic复制代码然后发送500条数据到Topic production,同时消耗数据。然后检查主题的分区数据:

  组主题分区当前-偏移日志-结束-偏移滞后所有者组id1测试_主题0 172 172 0 Kafka-python-1 . 3 . 1 _ tbds-172-16-16-3/12.16.16.3组id 1测试_主题1 156 156 0 Kafka-python-1 . 3 . 1 _ tbds-172-16-3/172 . 16 . 16 . 3组id 1测试_主题2 172 172 0 Kafka-python-1

  1.2.2在磁盘之间迁移分区数据。现在,登录到代理节点tbds-172-16-16-12,并将分区数据目录test _ topic/data1/Kafka-logs/test _ topic-0/移到/data/kafka-logs/:

  在mv/data1/Kafka-logs/test_topic-0//data/Kafka-logs/copy code view/data/Kafka-logs/目录下,分区test _ topic-0的数据:

  1.2.3再次发送500条数据到测试题目的生产和消费数据,同时消费数据。然后检查数据:

  组主题分区当前-偏移日志-结束-偏移滞后所有者组id1测试_主题0 337 337 0 Kafka-Python-1 . 3 . 1 _ TBDS-172-16-16-3/172 . 16 . 16 . 3组id1测试_主题1 304 304 0 Kafka-Python-1 . 3 . 1 _ TBDS-172-16-16-3/72.16.16.3组ID 1测试_主题2 359 359 0 Kafka-Python-1再次检查tbds-172-16-16-12。

  发现从/data1/kafka-logs/到/data/kafka-logs/目录下的分区数据目录test_topic-0/(即编号为0的分区)的缓存数据没有增加。

  因为test_topic在每个分区中有两个副本,所以我找到了另一个编号为0的分区的副本数据,并将其存储在代理节点tbds-172-16-16-16中。登录代理节点TBDS-172-16-16,打开编号为0的分区缓存数据目录,获得以下信息:

  发现代理节点TBDS-172-16-16的分区数据目录test_topic-0/中的缓存数据量在增加,即缓存了二产发送的消息数据。

  可以看出,移动后的broker节点TBDS-172-16-12的编号为0的分区数据缓存目录中没有新的缓存数据。相应地,作为没有分区数据移动的代理节点的tbds-172-16-16-16在编号为0的分区缓存数据目录中有新添加的要再次发送的数据。

  这是否意味着分区数据不能在代理磁盘之间移动?

  1.2.4调用重启方案:重启kafka,重启kafka集群。重启后发现代理节点tbds-172-16-16-12的分区缓存数据目录中编号为0的数据也增加到了正常水平。

  表示重启后,broker不同磁盘间的迁移数据已经生效。

  1.2.5验证磁盘间迁移的分区数据生效,再次向test_topic发送500条数据,同时消耗数据,然后检查数据情况:

  组主题分区当前-偏移日志-结束-偏移滞后所有者组id1测试_主题0 521 521 0 Kafka-Python-1 . 3 . 1 _ TBDS-172-16-16-3/172 . 16 . 16 . 3组id1测试_主题1 468 468 0 Kafka-Python-1 . 3 . 1 _ TBDS-172-16-16-3/12.16.16.3组id 1测试_主题2 511 511 0 Kafka-Python-1

  发现两个复制品是相同的。

  1.3结论在Kafka broker中,分区数据目录可以在不同的数据盘之间自由迁移。迁移完成后,重启kafka即可生效。

  二、不同代理之间的分区数据传输kafka集群扩展时,新扩展的代理没有缓存数据,容易导致系统中数据分布不均匀。因此,有必要将原始集群代理的分区数据迁移到新扩展的代理节点。

  通过使用kafka自带的kafka-reasign-partitions.sh脚本工具,可以实现不同broker之间转移分区数据。

  在kafka测试集群原有三个经纪人的基础上,我们扩充了一个经纪人。

  2.1获取test_topic的分区分布,执行命令:/Kafka-topics . sh-Zookeeper 172 . 16 . 16 . 11:2181-topic test_topic-describe复制代码得到test _ topic的3个分区(每个分区有2个副本)在3个broker节点的分布:

  主题:测试主题分区计数:3复制因子:2配置:主题:测试主题分区:0主分区:1002副本:1002 Isr:1002,1001主题:测试主题分区:1主分区:1003副本:1003 Isr:1003,1002主题:测试主题分区:2主分区:1001副本:1001,1003 Isr: 1001编写分配脚本:move_kafka_topic.json,内容如下:

  { topics :[{ topic : test _ topic }], version: 1}

  执行分配计划生成脚本:/kafka-reassign-partitions . sh-zookeeper tbds-172-16-16-11:2181-topics-to-move-JSON-file/tmp/move _ Kafka _ topic . JSON-broker-list 1001,1002,1003,1004 -生成副本代码命令中的broker-list填写Kafka集群中四个代理的id。由于部署方法不同,不同的kafka集群具有不同的代理id。我们的测试集群代理id是1001、1002、1003和1004。读者需要根据自己的kafka集群设置的经纪人id来填写。

  执行该命令后,将获得以下结果:

  当前分区副本分配#当前分区的副本分配{version: 1, partitions :[{topic: test _ topic , partition: 0, replicas: [1002,1001]},{topic:test_topic , partition:2, replicas:[1001,1003]},{ topic : test _ topic , partition:1, replicas:[1003,1002]}]}建议的分区分配配置#建议的分区分配{version: 1, partitions :[{ partitions将建议的分区分配配置复制并保存到文件move_kafka_topic_result.json:

  {version:1, partitions :[{topic:test_topic , partition:0, replicas:[1001,1002]},{ topic : test _ topic , partition:2, replicas:[1003,1004]},{ topic: test _ topic , partition:1, replicas:[1002,1003]}]}

  2.3重新分发主题分区数据并执行重新分发命令:/Kafka-重新分配-分区。sh-zookeeper tbds-172-16-16-11:2181-reassign-JSON-file/tmp/move _ Kafka _ topic _ result。JSON-执行复制代码得到如下结果:

  当前分区副本分配{version:1, partitions :[{topic:test_topic , partition:0, replicas:[1002,1001]},{ topic : test _ topic , partition:2, replicas:[1001,1003]},{ topic: test _ topic , partition:1, replicas:[1003,1002]}]}保存此选项,以便在回滚成功开始重新分配分区时用作-赋值-JSON-file选项{ 复制代码从返回结果来看,分区数据重新分布任务已经启动成功。

  2.4 查看分区数据重新分布进度检查分配的状态,执行命令:/Kafka-重新分配-分区。sh-zookeeper tbds-172-16-16-11:2181-reassign-JSON-file/tmp/move _ Kafka _ topic _ result。JSON-验证复制代码得到结果:

  分区重新分配的状态:分区[测试主题,0]的重新分配成功完成分区[测试_主题,2]的重新分配成功完成分区[测试_主题,1]的重新分配成功完成复制代码表明分区数据重新分步任务已经完成。

  2.5 再次获取测试_主题的分区分布情况再次查看各个分区的分布情况,执行命令:/卡夫卡-话题。动物园管理员172。16 .16 .11:2181-话题测试_话题-描述复制代码得到返回结果:

  主题:测试主题分区计数:3复制因子:2配置:主题:测试主题分区:0领导者:1002副本:1001,1002 Isr: 1002,1001主题:测试主题分区:1领导者:1003副本:1002,1003 Isr: 1003,1002主题:测试主题分区:2领导者:1003副本:1003,1004 Isr: 1003,1003复制代码从结果看出,测试_主题的分区数据已经由原来的3个经纪人,重新分布到四个经纪人。

  三、测试结论卡夫卡经纪人内部不同数据盘之间可以自由迁移分区数据目录。迁移完成后,重启卡夫卡即可生效;

  卡夫卡不同经纪人之前可以迁移数据,使用卡夫卡自带的kafka-reassign-partitions.sh脚本工具实现。

  四、修复客户的卡夫卡集群故障我们采用本文测试的方法,对该客户的卡夫卡集群进行经纪人节点内部不同磁盘间的数据迁移,对多个主题均进行了数据迁移,最终实现磁盘间的数据缓存分布均匀化。

  同时,我们又对客户的卡夫卡集群进行扩容,扩容之后采用本文描述的不同经纪人之间迁移分区数据方法,对多个主题均进行了数据迁移,保证新扩容节点也有缓存数据,原来的经纪人节点存储压力减小。

kafka提交偏移量,kafka集群扩容并且分区迁移