CentOS 7环境下Kafka消息的发布-订阅

    这段时间一直在学习大数据相关的知识,从Spark,Spark Streaming,Scala到Kafka等等,涉及到的知识面很多,总体看下来,觉得大数据还是很好玩的,在现在及以后的方方面面都很适用。下面说下Kafka消息的发布-订阅。

    (一)基本环境准备

    本人在虚拟机下安装了CentOs7,这里自行略过,必备的环境jdk,zookeeper,kafka

    (二)环境搭建(若不是root登录,则以下操作需加上sudo)

    1、JDK安装及环境配置(最好jdk8以上,此处略过)

    2、Zookeeper安装及环境配置

    (1)解压及移动至其他目录

#解压Zookeeper并重命名
sudo tar -zxvf zookeeper-3.3.6.tar.gz
sudo mv zookeeper-3.3.6 zookeeper
#将zookeeper移动到/usr/local/目录下,按自己喜好
sudo mv zookeeper /usr/local

    (2)编辑Zookeeper的配置文件

# 复制一份zoo_sample.cfg文件并改名为zoo.cfg
sudo cp /opt/zookeeper/zoo_sample.cfg zoo.cfg
# 编辑zoo.cfg 文件
sudo vim /opt/zookeeper/zoo.cfg
#主要修改dataDir和server.1=127.0.0.1:2888:3888这2处
# the directory where the snapshot is stored.
dataDir=/usr/local/zookeeper/data
# the port at which the clients will connect
clientPort=2181
server.1=127.0.0.1:2888:3888

     上面配置中的参数可参考:https://www.linuxidc.com/Linux/2017-06/144950.htm

    (3)配置Zookeeper环境变量

sudo vim /etc/profile
#修改如下
JAVA_HOME=/usr/java/jdk1.8.0_161
JRE_HOME=/usr/java/jdk1.8.0_161/jre
SCALA_HOME=/usr/local/scala
ZOOKEEPER_HOME=/usr/local/zookeeper
KAFKA_HOME=/usr/local/kafka
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$SCALA_HOME/bin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME SCALA_HOME ZOOKEEPER_HOME KAFKA_HOME PATH CLASSPATH

    注意,配置完成后必须执行:source /etc/profile,否则不生效。

    (4)启动Zookeeper

#cd 到Zookeeper/bin目录下
./zkServer.sh start

    启动成功如下:

ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

    关闭Zookeeper同上,只需修改部分:

#cd 到Zookeeper/bin目录下
./zkServer.sh stop

     3、Kafka安装及环境配置

    (1)解压及移动至其他目录

# 解压及重命名为kafka
sudo tar -zxvf kafka_2.12-1.0.0.tgz
sudo mv kafka_2.12-1 kafka
# 移动至/usr/local/目录下
sudo mv kafka /usr/local

    (2)编辑Kafka的配置文件

#创建日志存放目录
cd /usr/local/kafka
mkdir logs
#修改配置文件/usr/local/kafka/config/server.properties
sudo vim /usr/local/kafka/config/server.properties
#主要修改下面几项内容如下:
broker.id=0
delete.topic.enable=true
listeners = PLAINTEXT://127.0.0.1:9092
log.dirs=/usr/local/kafka/logs/
zookeeper.connect=127.0.0.1:2181

    上面配置中的参数可参考:https://www.cnblogs.com/wangb0402/p/6187503.html

    (3)配置Kafka环境变量:详见上面的Zookeeper配置

    (4)启动Kafka

# cd到kafka/bin目录下
./kafka-server-start.sh /usr/local/kafka/config/server.properties 

    (三)Kafka的消息发布-订阅:打开四个终端

    (1)创建一个Topic

# kafka/bin目录下
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test

     显示如下:

[hadoop@bogon bin]$ ./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

    若已存在该test的Topic,可以通过delete删除:

#利用命令删除需要删除的topic(配置中已经设置为true)
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

    (2)Producer向Topic发送消息

./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

     显示如下:

[hadoop@bogon bin]$ ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
>producer send message
>hello kafka
>hello world
>spark
>heihei
>send everything for people
>

    (3)Consumer读取Topic的消息

./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning

    显示如下,注意开始的警告信息不影响使用,需稍等片刻:

[hadoop@bogon bin]$ ./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 -topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
producer send message
hello kafka
hello world
spark
heihei
send everything for people

    

文章来源:

Author:海岸线的曙光
link:https://my.oschina.net/u/3747963/blog/1627946