集群四部曲(四):完美的Kafka集群搭建

    之前写过一篇关于Kafka消息的发布-订阅,只不过是基于一台服务器,不够全面,下面我要说下Kafka集群环境的搭建和消息的发布-订阅,希望大家喜欢。

     一、环境:虚拟机CentOs7系统,完整的环境,请确认已安装JDK,Zookeeper(这里可参考之前的Zookeeper集群搭建:https://my.oschina.net/u/3747963/blog/1635507),可通过克隆已配置好的虚拟机环境,此次仍然沿用之前使用过的服务器,所以,使用的节点为3个。

    二、环境配置(Kafka的解压不说了)

    (1)进入config目录,修改server.properties文件内容如下:

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=slave01

//中间省略,默认配置即可
############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=slave01:2181,slave02:2181,slave03:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

    网上的版本不尽相同,但殊途同归,主要修改的点只有几个:broker.id,zookeeper.connect,注意:broker.id和Zookeeper集群配置时myid文件中的值保持一致。同时,还要修改一处地方(这里可能被大多数人忽略了,导致之后启动报错),手动修改meta.properties文件,在配置的log.dir目录下,若不修改此处会报下面的错误:

FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
    kafka.common.InconsistentBrokerIdException: Configured brokerId 2 doesn't match stored brokerId 1 in meta.properties
            at kafka.server.KafkaServer.getBrokerId(KafkaServer.scala:630)
            at kafka.server.KafkaServer.startup(KafkaServer.scala:175)
            at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:99)
            at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:45)

    总结一下,修改broker.id需要注意 :

    server.prorperties文件;

    meta.properties文件;

    (2)拷贝修改配置

    可以通过scp拷贝的方式将kafka安装目录放置到另外两个节点上,注意需要修改对应的值:broker.id和host.name,其他一致。

    (3)启动Kafka集群

    启动之前请确保关闭防火墙(详见之前的集群配置)。

    在三个节点上,均执行下面的命令:

./bin/kafka-server-start.sh config/server.properties 

    如无意外,均可启动成功。

    (4)创建topic 

    在slave01节点上,执行以下命令创建一个topic:

bin/kafka-topics.sh --create --topic kafkatopictest --replication-factor 3 --partitions 2 --zookeeper slave01:2181

    创建成功显示:

Created topic "kafkatopictest"

    (5)发送消息至kafka 

    在slave02节点上执行以下命令向kafka发送消息:

[hadoop@slave02 bin]$ ./kafka-console-producer.sh --broker-list slave02:9092 --sync --topic kafkatopictest
>hello world, 下班了,everyone!

    (6)接收kafka发送来的消息

    在slave03节点上执行以下命令接收kafka发送消息:

[hadoop@slave03 bin]$ ./kafka-console-consumer.sh --zookeeper slave01:2181 --topic kafkatopictest --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
hello world下班了,everyone!

    好了,到此Kafka集群搭建和测试完成了。

文章来源:

Author:海岸线的曙光
link:https://my.oschina.net/u/3747963/blog/1647943