> CentOS > CentOS服务器 > 负载均衡 >

Centos安装Kafka集群

kafka是LinkedIn开发并开源的一个分布式MQ系统,现在是Apache的一个孵化项目。在它的主页描述kafka为一个高吞吐量的分布式(能将消息分散到不同的节点上)MQ。在这片博文中,作者简单提到了开发kafka而不选择已有MQ系统的原因。两个原因:性能和扩展性。Kafka仅仅由7000行Scala编写,据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。

安装准备

版本

Kafka版本:kafka_2.10-0.8.2.0

Zookeeper版本:3.4.6

Zookeeper 集群:hadoop104,hadoop107,hadoop108

Zookeeper集群的搭建参见:在CentOS上安装ZooKeeper集群

物理环境

安装两台物理机:

192.168.40.104  hadoop104(运行3个Broker)

192.148.40.105  hadoop105(运行2个Broker)

该集群的创建主要分为三步,单节点单Broker,单节点多Broker,多节点多Broker

 

单节点单Broker

本节以hadoop104上创建一个Broker为例

下载kafka

 

 

下载路径:http://kafka.apache.org/downloads.html

[html] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. #tar -xvf kafka_2.10-0.8.2.0.tgz  
  2. # cd kafka_2.10-0.8.2.0  

配置

修改config/server.properties

[html] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. broker.id=1    
  2. port=9092  
  3. host.name=hadoop104    
  4. socket.send.buffer.bytes=1048576    
  5. socket.receive.buffer.bytes=1048576    
  6. socket.request.max.bytes=104857600    
  7. log.dir=./kafka1-logs  
  8. num.partitions=10  
  9. zookeeper.connect=hadoop107:2181,hadoop104:2181,hadoop108:2181    


启动Kafka服务

 

[html] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. #bin/kafka-server-start.sh config/server.properties  

 

 

创建Topic

 

[html] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. #bin/kafka-topics.sh --create --zookeeper hadoop107:2181,hadoop104:2181,hadoop108:2181 --replication-factor 1 --partitions 1 --topic test  

 

查看Topic

 

[html] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. #bin/kafka-topics.sh --list --zookeeper hadoop107:2181,hadoop104:2181,hadoop108:2181  

 

输出:

 

producer发送消息

 

[html] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. #bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test   

 


 

consumer接收消息

 

[html] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. #bin/kafka-console-consumer.sh --zookeeper hadoop107:2181,hadoop104:2181,hadoop108:2181 --topic test --from-beginning  

 

 

如果要最新的数据,可以不带--from-beginning参数即可。

/bin/kafka-console-consumer.sh --zookeeper  hadoop107:2181,hadoop104:2181,hadoop108:2181  --topic test

 

单节点多个Broker

 

配置

将上个章节中的文件夹再复制两份分别为kafka_2,kafka_3

[html] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. #cp -r kafka_2.10-0.8.2.0 kafka_2  
  2.   
  3. #cp -r kafka_2.10-0.8.2.0 kafka_3  

分别修改kafka_2/config/server.properties以及kafka_3/config/server.properties 文件中的broker.id,以及port属性,确保唯一性

[html] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. kafka_2/config/server.properties  
  2. broker.id=2  
  3. port=9093  
  4. kafka_3/config/server.properties  
  5. broker.id=3  
  6. port=9094  

启动

启动另外两个Broker
[html] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. #cd kafka_2  
  2. # bin/kafka-server-start.sh config/server.properties &  
  3. #cd ../kafka_3  
  4. # bin/kafka-server-start.sh config/server.properties &  

创建一个replication factor为3的topic

[html] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. #bin/kafka-topics.sh --create --zookeeper hadoop107:2181,hadoop104:2181,hadoop108:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic  

查看Topic的状态


			
[html] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. bin/kafka-topics.sh --describe --zookeeper  hadoop107:2181,hadoop104:2181,hadoop108:2181  --topic my-replicated-topic  
从上面的内容可以看出,该topic包含1个part,replicationfactor为3,且Node3 是leador
解释如下:
  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader. 
 
再来看一下之前创建的test topic, 从下图可以看出没有进行replication
 

多个节点的多个Broker

 
在hadoop105上分别把下载的文件解压缩到kafka_4,kafka_5两个文件夹中,再将hadoop104上的server.properties配置文件拷贝到这连个文件夹中
[html] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. #scp -r config/ root@hadoop105:/root/hadoop/kafka_4/  
  2. #scp -r config/ root@hadoop105:/root/hadoop/kafka_5/  

配置

并分别修改内容如下:
[html] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. kafka_4  
  2.     brokerid=4  
  3.     port=9095  
  4.     host.name=hadoop105  
  5. kafka_5  
  6.     brokerid=5  
  7.     port=9096  
  8.     host.name=hadoop105  
启动服务
[html] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. #cd kafka_4  
  2. # bin/kafka-server-start.sh config/server.properties &  
  3. #cd ../kafka_5  
  4. # bin/kafka-server-start.sh config/server.properties &  
 
到目前为止,两台物理机上的5个Broker已经启动完毕

 

总结

在kafka的核心思路中,不需要在内存里缓存数据,因为操作系统的文件缓存已经足够完善和强大,只要不做随机写,顺序读写的性能是非常高效的。kafka的数据只会顺序append,数据的删除策略是累积到一定程度或者超过一定时间再删除。Kafka另一个独特的地方是将消费者信息保存在客户端而不是MQ服务器,这样服务器就不用记录消息的投递过程,每个客户端都自己知道自己下一次应该从什么地方什么位置读取消息,消息的投递过程也是采用客户端主动pull的模型,这样大大减轻了服务器的负担。Kafka还强调减少数据的序列化和拷贝开销,它会将一些消息组织成Message Set做批量存储和发送,并且客户端在pull数据的时候,尽量以zero-copy的方式传输,利用sendfile(对应java里的FileChannel.transferTo/transferFrom)这样的高级IO函数来减少拷贝开销。可见,kafka是一个精心设计,特定于某些应用的MQ系统,这种偏向特定领域的MQ系统我估计会越来越多,垂直化的产品策略值的考虑。

只要磁盘没有限制并且不出现损失,kafka可以存储相当长时间的消息(一周)。





(责任编辑:IT)