Ubuntu安装Kafka

系统环境

Ubuntu 16.04 LTS kafka_2.12-0.11.0.0.tgz zookeeper-3.5.2-alpha.tar.gz

安装测试

1. 下载tar包并解压

tar -zxvf kafka_2.12-0.11.0.0.tgz

2. 修改Kafka服务端配置文件server.properties(最重要配置:broker.id、log.dirs、zookeeper.connect)

# 设置节点id
broker.id=0
# 设置日志文件目录
log.dirs=/opt/apps/kafka/kafka_2.12-0.11.0.0/logs
# 设置zookeeper连接地址
zookeeper.connect=172.17.0.15:2181

3. 启动kafka服务端(确保zookeeper服务已经启动),结尾添加&符号可以在启动之后离开控制台

bin/kafka-server-start.sh config/server.properties &

4. 创建主题

创建单副本单分区的主题"test"

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看主题列表

bin/kafka-topics.sh --list --zookeeper localhost:2181
test

5. 发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
first kafka message
second kafka message

6. 消费消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
first kafka message
second kafka message

设置多broker集群

1. 添加多个配置文件

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

修改以下配置:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/opt/apps/kafka/kafka_2.12-0.11.0.0/logs-1
    zookeeper.connect=172.17.0.15:2181
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/opt/apps/kafka/kafka_2.12-0.11.0.0/logs-2
    zookeeper.connect=172.17.0.15:2181

2. 启动额外两个节点

bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

3. 创建主题

创建三副本单分区的主题"my-replicated-topic"

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

查看主题

bin/kafka-topics.sh --list --zookeeper localhost:2181
my-replicated-topic
test

查看主题描述

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
    Topic: my-replicated-topic	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 0,1,2
leader:该节点负责所有指定分区的读和写,每个节点的leader都是随机选择的。 replicas:副本节点,无论该节点是否为leader或是否存活都会显示。 isr:副本节点集合,存储存活的节点集合。

4. 发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>replicated-topic-message-1
>replicated-topic-message-2

5. 消费消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
replicated-topic-message-1
replicated-topic-message-2

6. 测试错误容错

 broker 0作为leader,查询该节点进程

ps aux | grep server.properties
root      3919  1.2 20.8 2307376 399936 pts/0  Sl   11:20   0:23 /opt/apps/jdk/jdk1.8.0_131/bin/java -Xmx256M -Xms128M

杀掉leader节点进程

kill -9 3919

查看主题描述(broker 1变为leader节点)

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
    Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 0,1,2	Isr: 1,2

消费消息(使用broker 1节点消费)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic my-replicated-topic
replicated-topic-message-1
replicated-topic-message-2

异常问题

1. 启动kafka server显示内存不足?

Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed;error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /opt/apps/kafka/kafka_2.12-0.11.0.0/hs_err_pid18185.log

解决方法:调整启动脚本kafka-server-start.sh中kafka的JVM参数

默认配置

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

修改配置

export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

2. topic删除无效?

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topic_test
Topic topic_test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

解决方法:默认删除主题功能是不可用的,可在server.properties中修改配置 delete.topic.enable=true

3. 查看kafka集群中所有的broker节点?

解决方法:

# 执行zk客户端脚本
./zkCli.sh
# 显示broker节点id集合
ls /brokers/ids

4. kafka客户端无法连接远程kafka服务?

解决方法:在server.properties中添加配置advertised.host.name=<远程kafka服务器ip地址>

文章来源:

Author:LaravelShao
link:https://my.oschina.net/LaravelShao/blog/1648575