Centos安装zookeeper+kafka集群 – 记忆角落

Centos安装zookeeper+kafka集群

/ 0评 / 0

系统:centos 7

软件包:

jdk-8u131-linux-x64.rpm

zookeeper-3.4.12.tar.gz

kafka_2.11-2.2.2.tgz

1.演示安装服务器地址&名称:
172.42.100.7     hadoop4
172.42.100.8     hadoop5
172.42.100.9     hadoop6
2.安装jdk(每台都需安装)
2.1.上传文件到某一个服务器,通过scp使得每一台服务器都有文件

scp -r jdk-8u131-linux-x64.rpm root@hadoop5:/home/

2.2.查找安装过的java包

rpm -qa | grep java

如果查出来有 运行 rpm -e --nodeps 查出来的包名(将已安装java全部卸载)

2.3.安装java rpm包(每台机器执行)

ps:如果已经存在jdk1.8的可忽略

rpm -ivh jdk-8u131-linux-x64.rpm

2.4.验证是否安装成功
java -version 

or 

which java
3.搭建zookeeper集群
3.1.下载地址:

https://mirrors.huaweicloud.com/apache/zookeeper/zookeeper-3.4.12/

教程用的是zookeeper-3.4.12,区别应该不大,清华源最新已经是3.4.12...

3.2.所有节点安装zookeeper

可预先在一台服务器上配置完文件,再复制到其他集群服务器上

#文件都放到/data/kafka_test/

如没有kafka_test文件需要创建

mkdir /data/kafka_test

#将zookeeper解压到安装目录

tar -zxvf zookeeper-3.4.12.tar.gz

#更换文件夹名称

mv zookeeper-3.4.12 zookeeper

#新建日志目录

mkdir -p /data/kafka_test/zookeeper/dataLog

#新建保存数据的目录

mkdir -p /data/kafka_test/zookeeper/data

#配置环境变量并刷新

#vi /etc/profile

export ZK_HOME=/data/kafka_test/zookeeper

export PATH=$PATH:$ZK_HOME/bin

#刷新一下环境变量

source /etc/profile

3.3.所有节点配置zookeeper配置文件

可预先在一台服务器上配置完文件,再复制到其他集群服务器上

#进入配置文件文件夹

cd /data/kafka_test/zookeeper/conf/

#复制文件

cp -f zoo_sample.cfg zoo.cfg

#修改文件配置

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/data/kafka_test/zookeeper/data/
dataLogDir=/data/kafka_test/zookeeper/dataLog/
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

server.0=hadoop4:2888:3888
server.1=hadoop5:2888:3888
server.2=hadoop6:2888:3888

ps:

复制文件后需要就改的地方其实就如下:

#追加或修改

dataDir=/data/kafka_test/zookeeper/data/
dataLogDir=/data/kafka_test/zookeeper/dataLog/

#追加服务器集群信息

server.0=hadoop4:2888:3888
server.1=hadoop5:2888:3888
server.2=hadoop6:2888:3888

第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认

#配置myid

# echo "0" > /data/kafka_test/zookeeper/data/myid   #server0配置,各节点不同,跟上面配置server.0的号码一样

# echo "1" > /data/kafka_test/zookeeper/data/myid   #server1配置,各节点不同,跟上面配置server.1的号码一样

# echo "2" > /data/kafka_test/zookeeper/data/myid   #server2配置,各节点不同,跟上面配置server.2的号码一样
3.4.修改启动脚本-->zkServer.sh

cd /data/kafka_test/zookeeper/bin/

vi zkServer.sh

查找到if [ ! -w "$ZOO_LOG_DIR" ] ; then前追加一段:

export ZOO_LOG_DIR="/data/kafka_test/zookeeper/dataLog/"

image-20200712234239928

3.5.启动停止zookeeper命令

#启动

zkServer.sh start

#shell脚本同时启动

#! /bin/bash

ssh root@hadoop4 zkServer.sh start
ssh root@hadoop5 zkServer.sh start
ssh root@hadoop6 zkServer.sh start

#停止

zkServer.sh stop

#shell脚本同时关闭

#! /bin/bash

ssh root@hadoop4 zkServer.sh stop
ssh root@hadoop5 zkServer.sh stop
ssh root@hadoop6 zkServer.sh stop

#连接集群

zkCli.sh

4.搭建kafka集群
4.1.下载地址:

https://mirrors.huaweicloud.com/apache/kafka/2.2.2/

教程用的是kafka_2.11-2.2.2.tgz

4.2.所有节点上搭建kafka

可预先在一台服务器上配置完文件,再复制到其他集群服务器上

#文件都放到/data/kafka_test/

如没有kafka_test文件需要创建

mkdir /data/kafka_test

#新建kafka日志目录

mkdir -p /data/kafka-data

#解压kafka

tar -zxvf kafka_2.11-2.2.2.tgz

#更换文件夹名称

mv kafka_2.11-2.2.2 kafka

#配置环境变量并刷新

#vi /etc/profile

# kafka
export KAFKA_HOME=/data/kafka_test/kafka
export PATH=$PATH:$KAFKA_HOME/bin

#刷新一下环境变量

source /etc/profile

#备份并修改kafka配置文件:server.properties

cd /data/kafka_test/kafka/config/

mv server.properties server_bak.properties

#新建server.properties 并追加一下内容:

####################
#kafka broker config
####################

#broker config
#每一个broker在集群中的唯一标示,要求是正数
broker.id=0
# 选择启用删除主题功能,默认false
delete.topic.enable=true
# 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数
default.replication.factor=2 
auto.create.topics.enable=false
# 套接字服务器连接的地址
listeners=PLAINTEXT://:9092 
num.network.threads=10
num.io.threads=36
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
message.max.bytes=10485760
# replicas每次获取数据的最大大小
replica.fetch.max.bytes=12582912 
#kafka数据的存放地址
log.dirs=/data/kafka-data 
#segment文件保留的最长时间(小时),超时将被删除
log.retention.hours=12 
log.flush.interval.messages=20000
#消息体的最大大小,单位是字节
log.segment.bytes=1073741824  
#定期检查segment文件有没有达到1G(单位毫秒)
log.retention.check.interval.ms=120000 
num.recovery.threads.per.data.dir=6

#zookeeper集群的地址,可以是多个
zookeeper.connect=hadoop4:2181,hadoop5:2181,hadoop6:2181
zookeeper.connection.timeout.ms=6000

ps:broker.id 记得修改且每一台机器不一样~

4.3.kafka内存启动

kafka节点默认需要的内存为1G,如果需要修改内存,可以修改kafka-server-start.sh的配置项。

vi /data/kafka_test/kafka/bin/kafka-server-start.sh

#找到KAFKA_HEAP_OPTS配置项,例如修改如下:export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"

默认如下图:

image-20200712235921585

4.4启动停止kafka命令

#启动kafka

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

#shell同时启动

#! /bin/bash

ssh root@hadoop4 kafka-server-start.sh -daemon  $KAFKA_HOME/config/server.properties
ssh root@hadoop5 kafka-server-start.sh -daemon  $KAFKA_HOME/config/server.properties
ssh root@hadoop6 kafka-server-start.sh -daemon  $KAFKA_HOME/config/server.properties

#停止kafka

kafka-server-stop.sh

#shell同时停止

#! /bin/bash

ssh root@hadoop4 kafka-server-stop.sh
ssh root@hadoop5 kafka-server-stop.sh
ssh root@hadoop6 kafka-server-stop.sh
5.创建测试topic

创建32分区、2备份

kafka-topics.sh --create --zookeeper hadoop4:2181 --topic TEST_JS --replication-factor 2 --partitions 32

常用命令:

1.展示topic

kafka-topics.sh --list --zookeeper hadoop4:2181

2.创建topic

kafka-topics.sh --create --zookeeper hadoop4:2181,hadoop5:2181,hadoop6:2181 --replication-factor 1 --partitions 1 --topic topic_name

3.查看描述topic

kafka-topics.sh --describe --zookeeper hadoop4:2181,hadoop5:2181,hadoop6:2181 --topic topic_name

4.生产者发送消息

kafka-console-producer.sh --broker-list hadoop5:9092 --topic topic_name

5.消费者消费消息

kafka-console-consumer.sh --bootstrap-server hadoop4:9092,hadoop5:9092,hadoop6:9092 --topic topic_name

6.删除topic

kafka-topics.sh --delete --topic topic_name --zookeeper hadoop4:2181,hadoop5:2181,hadoop6:2181

7.查看每分区consumer_offsets(可以连接到的消费主机)

kafka-topics.sh --describe --zookeeper hadoop4:2181,hadoop5:2181,hadoop6:2181 --topic __consumer_offsets

发表评论

您的电子邮箱地址不会被公开。