Ubuntu安装Kafka
系统环境
Ubuntu 16.04 LTS kafka_2.12-0.11.0.0.tgz zookeeper-3.5.2-alpha.tar.gz安装测试
1. 下载tar包并解压
tar -zxvf kafka_2.12-0.11.0.0.tgz
2. 修改Kafka服务端配置文件server.properties(最重要配置:broker.id、log.dirs、zookeeper.connect)
# 设置节点id
broker.id=0
# 设置日志文件目录
log.dirs=/opt/apps/kafka/kafka_2.12-0.11.0.0/logs
# 设置zookeeper连接地址
zookeeper.connect=172.17.0.15:2181
3. 启动kafka服务端(确保zookeeper服务已经启动),结尾添加&符号可以在启动之后离开控制台
bin/kafka-server-start.sh config/server.properties &
4. 创建主题
创建单副本单分区的主题"test"
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看主题列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
test
5. 发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
first kafka message
second kafka message
6. 消费消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
first kafka message
second kafka message
设置多broker集群
1. 添加多个配置文件
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
修改以下配置:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/opt/apps/kafka/kafka_2.12-0.11.0.0/logs-1
zookeeper.connect=172.17.0.15:2181
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/opt/apps/kafka/kafka_2.12-0.11.0.0/logs-2
zookeeper.connect=172.17.0.15:2181
2. 启动额外两个节点
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
3. 创建主题
创建三副本单分区的主题"my-replicated-topic"
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
查看主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
my-replicated-topic
test
查看主题描述
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
leader:该节点负责所有指定分区的读和写,每个节点的leader都是随机选择的。
replicas:副本节点,无论该节点是否为leader或是否存活都会显示。
isr:副本节点集合,存储存活的节点集合。
4. 发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>replicated-topic-message-1
>replicated-topic-message-2
5. 消费消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
replicated-topic-message-1
replicated-topic-message-2
6. 测试错误容错
broker 0作为leader,查询该节点进程
ps aux | grep server.properties
root 3919 1.2 20.8 2307376 399936 pts/0 Sl 11:20 0:23 /opt/apps/jdk/jdk1.8.0_131/bin/java -Xmx256M -Xms128M
杀掉leader节点进程
kill -9 3919
查看主题描述(broker 1变为leader节点)
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2
消费消息(使用broker 1节点消费)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic my-replicated-topic
replicated-topic-message-1
replicated-topic-message-2
异常问题
1. 启动kafka server显示内存不足?
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed;error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /opt/apps/kafka/kafka_2.12-0.11.0.0/hs_err_pid18185.log
解决方法:调整启动脚本kafka-server-start.sh中kafka的JVM参数
默认配置
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
修改配置
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
2. topic删除无效?
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topic_test
Topic topic_test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
解决方法:默认删除主题功能是不可用的,可在server.properties中修改配置 delete.topic.enable=true
3. 查看kafka集群中所有的broker节点?
解决方法:
# 执行zk客户端脚本
./zkCli.sh
# 显示broker节点id集合
ls /brokers/ids
4. kafka客户端无法连接远程kafka服务?
解决方法:在server.properties中添加配置advertised.host.name=<远程kafka服务器ip地址>
文章来源:
Author:LaravelShao
link:https://my.oschina.net/LaravelShao/blog/1648575