CentOS 7环境下Kafka消息的发布-订阅
这段时间一直在学习大数据相关的知识,从Spark,Spark Streaming,Scala到Kafka等等,涉及到的知识面很多,总体看下来,觉得大数据还是很好玩的,在现在及以后的方方面面都很适用。下面说下Kafka消息的发布-订阅。
(一)基本环境准备
本人在虚拟机下安装了CentOs7,这里自行略过,必备的环境jdk,zookeeper,kafka
(二)环境搭建(若不是root登录,则以下操作需加上sudo)
1、JDK安装及环境配置(最好jdk8以上,此处略过)
2、Zookeeper安装及环境配置
(1)解压及移动至其他目录
#解压Zookeeper并重命名
sudo tar -zxvf zookeeper-3.3.6.tar.gz
sudo mv zookeeper-3.3.6 zookeeper
#将zookeeper移动到/usr/local/目录下,按自己喜好
sudo mv zookeeper /usr/local
(2)编辑Zookeeper的配置文件
# 复制一份zoo_sample.cfg文件并改名为zoo.cfg
sudo cp /opt/zookeeper/zoo_sample.cfg zoo.cfg
# 编辑zoo.cfg 文件
sudo vim /opt/zookeeper/zoo.cfg
#主要修改dataDir和server.1=127.0.0.1:2888:3888这2处
# the directory where the snapshot is stored.
dataDir=/usr/local/zookeeper/data
# the port at which the clients will connect
clientPort=2181
server.1=127.0.0.1:2888:3888
上面配置中的参数可参考:https://www.linuxidc.com/Linux/2017-06/144950.htm
(3)配置Zookeeper环境变量
sudo vim /etc/profile
#修改如下
JAVA_HOME=/usr/java/jdk1.8.0_161
JRE_HOME=/usr/java/jdk1.8.0_161/jre
SCALA_HOME=/usr/local/scala
ZOOKEEPER_HOME=/usr/local/zookeeper
KAFKA_HOME=/usr/local/kafka
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$SCALA_HOME/bin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME SCALA_HOME ZOOKEEPER_HOME KAFKA_HOME PATH CLASSPATH
注意,配置完成后必须执行:source /etc/profile,否则不生效。
(4)启动Zookeeper
#cd 到Zookeeper/bin目录下
./zkServer.sh start
启动成功如下:
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
关闭Zookeeper同上,只需修改部分:
#cd 到Zookeeper/bin目录下
./zkServer.sh stop
3、Kafka安装及环境配置
(1)解压及移动至其他目录
# 解压及重命名为kafka
sudo tar -zxvf kafka_2.12-1.0.0.tgz
sudo mv kafka_2.12-1 kafka
# 移动至/usr/local/目录下
sudo mv kafka /usr/local
(2)编辑Kafka的配置文件
#创建日志存放目录
cd /usr/local/kafka
mkdir logs
#修改配置文件/usr/local/kafka/config/server.properties
sudo vim /usr/local/kafka/config/server.properties
#主要修改下面几项内容如下:
broker.id=0
delete.topic.enable=true
listeners = PLAINTEXT://127.0.0.1:9092
log.dirs=/usr/local/kafka/logs/
zookeeper.connect=127.0.0.1:2181
上面配置中的参数可参考:https://www.cnblogs.com/wangb0402/p/6187503.html
(3)配置Kafka环境变量:详见上面的Zookeeper配置
(4)启动Kafka
# cd到kafka/bin目录下
./kafka-server-start.sh /usr/local/kafka/config/server.properties
(三)Kafka的消息发布-订阅:打开四个终端
(1)创建一个Topic
# kafka/bin目录下
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
显示如下:
[hadoop@bogon bin]$ ./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
若已存在该test的Topic,可以通过delete删除:
#利用命令删除需要删除的topic(配置中已经设置为true)
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
(2)Producer向Topic发送消息
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
显示如下:
[hadoop@bogon bin]$ ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
>producer send message
>hello kafka
>hello world
>spark
>heihei
>send everything for people
>
(3)Consumer读取Topic的消息
./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning
显示如下,注意开始的警告信息不影响使用,需稍等片刻:
[hadoop@bogon bin]$ ./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 -topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
producer send message
hello kafka
hello world
spark
heihei
send everything for people
文章来源:
Author:海岸线的曙光
link:https://my.oschina.net/u/3747963/blog/1627946