본문 바로가기
System/Kafka

zookeeper-kafka cluster install

0. 사전작업 


테스트 환경은 아래와 같다.

hostname
ip address
resource
kafka-01172.16.194.24memory: 8G hdd:50G
kafka-02172.16.194.12memory: 8G hdd:50G
kafka-03172.16.194.16memory: 8G hdd:50G
*사내오픈스택 구성 


Kafka  cluester  를 구성 하기 위한 ZooKeeper,kafka 모두 java기반으로 동작 하기 위해서 아래와 같이 jdk를 모든 노드에 설치 한다.

openjdk install(ALL NODE)
yum install java-1.8.0-openjdk.x86_64 -y
java -version
    openjdk version "1.8.0_171"
    OpenJDK Runtime Environment (build 1.8.0_171-b10)
    OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)


1-1. ZooKeeper?

 본래 hadoop 의 서브 프로젝트 였던 ZooKeeper는 hadoop의 분산 어플리케이션을 관리 하는 코디네이션 어플리케이션으로 사용 되었다.

(ZooKeeper는 한국 말로 사육사를 의미 하는데, hadoop 생태계의 프로젝트들이 대부분 동물의 이름으로 네이밍 되어 있어서 동물을 관리 하는 사육사로 ZooKeeper라는 네이밍 되었다는 이야기가 있다.)

분산 어플리케이션을 개발하면서 동시에 코디네이션 어플리케이션을 개발 하려면 추가적인 개발 리소스가 필요하기 때문에 ,hadoop에서 검증된 ZooKeeper를 분산 어플리케이션을 코디네이션 하기 위해서 많이 사용 한다.

kafka도 동일한 분산 어플리케이션으로 안정적인 코디네이션 서비스를 하기 위하여 zookeeper 를 사용 한다.  kafka뿐만 아니라  storm,hbase 등으로 많은 어플리케이션에서 사용 가능 하다.

코디네이션 서비스는 분산 되어 있는 어플리케이션을 안정적인 서비스를 할 수 있도록 각 어플리케이션의 분산동기화,환경정보 유지 보수 등을 집중화 서비스 한다.

ZooKeeper 는 앙상블이라는 이름의 클러스터를 구성 한다. 일반적으로 3대 이상의 홀수 단위로 서버를 구성 한다.

만약  ZooKeeper 서버들간의 데이터 불일치가 발생할 경우 과반수의 룰을 적용 하기 때문에 데이터정합성 측면에서 유리하다.

앙상블을 구성하게 되면 각 ZooKeeper 서버들은  Leader와  follower을 선출 하며 실제 데이터는 client가 follwer를 통하여 받고, leader로 전달 그리고 leader가  모든  follower에 전달 하여 모두 동일한 데이터를 갖을 수 있도록 유지 한다.


https://zookeeper.apache.org/doc/r3.3.3/zookeeperOver.html


각 상태정보는 znode 에 key-value 형식으로 저장 하며  아래 그림과 같이 계층적 구조로 정보를 저장 한다.

그리고 저장되는 데이터를 메모리에 저장 하여 빠른 처리 속도를 나타낸다.

https://zookeeper.apache.org/doc/r3.3.3/zookeeperOver.html



1-2. ZooKeeper install 

 아래와 같이 ZooKeeper 를 설치 한다.

zookeeper install(ALL NODE)
cd /opt
tar xvzf zookeeper-3.4.12.tar.gz
ln -s zookeeper-3.4.12 zookeeper
cd zookeeper
cp -aR conf/zoo_sample.cfg conf/zoo.cfg


zoo.cfg 설정 하기에 앞서 별도의 데이터 디렉토리를 설정 해야 한다. 데이터 디렉토리는  ZooKeeper의 recovery를 위하여 사용 된다.
recovery는 ZooKeeper데몬이 재시작 되거, 서버가 재시작 되더라도 복구 할 수 있도록 트렌젝션 로그를 파일시스템에 저장 한다. 만약 재시작 할경우 해당 파일시스템에 있는 트렌젝션로그를 읽어서 

이전의 상태를 보장 할 수 있도록 한다.

만약 트렌젝션 로그의 크기가 클 경우 재시작 하기 위해서 시간이 많이 소요가 될 수 있어서 ,ZooKeeper에서는 일정 이상의 사이즈의 트렌젝션 로그가 생성이 되면 스냅샷데이터로 만들고 새로운 트랜젝션 로그를 만들어서 빠른 재시작을 보장 

할 수 있도록 한다.


Create DataDirectory (ALL NODE)
mkdir -p /DATA


앙상블 구성시 ZooKeeper 노드를 구분 하기 위하여 각각 파일 시스템별로 myid 파일이라는 이름의 ID 파일을 생성 한다.

CreateID (kafka-01)
echo 1 > /DATA/myid


CreateID (kafka-02)
echo 2> /DATA/myid
CreateID (kafka-03)
echo 3 > /DATA/myid


이제, zoo.cfg 파일을 수정 한다. 

dataDir는 이전에 생성한 디렉토리로 설정하여 스냅샷 파일과 트렌젝션 로그가 저장 될 수 있도록 설정한다.

앙상블 설정을 위하여  server.n 구조로 n에는 앞서 설정한 id 번호를 설정 하여 각 앙상블 호스트 이름을 설정 하며 뒤에 2888:3888 포트는 앙상블 안에 있는 노드들 끼리 연결 하는데 사용 한다.

- tickTime millisecond 단위로 heartbeat과 최소 세션 타임아웃 시간으로 사용
- dataDir :in-memory 데이터베이스 스냅샷을 저장하는 위치로 지정하지 않는경우 메모리에 트랜잭션 업데이트 로그를 저장
- clientPort : Listen하는 포트
/opt/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/DATA
clientPort=2181
 
server.1=kafka-01:2888:3888
server.2=kafka-02:2888:3888
server.3=kafka-03:2888:3888


 위에서 설정파일로 ZooKeeper를 실행 하고 확인 한다.


Start ZooKeeper
# /opt/zookeeper/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
 
 
# netstat  -nltp | grep 2181
tcp6       0      0 :::2181                 :::*                    LISTEN      14528/java
 
 
# ls /DATA/
myid  version-2  zookeeper_server.pid


관리의 편의서을 위한 systemd 파일을 생성 한다. 

/etc/systemd/system/zookeeper.service
[Unit]
 
Description=ZooKeeper Service
Documentation=http://zookeeper.apache.org
 
 
[Service]
 
Type=forking
User=root
Group=root
ExecStart=/opt/zookeeper/bin/zkServer.sh start /opt/zookeeper/conf/zoo.cfg
ExecStop=/opt/zookeeper/bin/zkServer.sh stop /opt/zookeeper/conf/zoo.cfg
ExecReload=/opt/zookeeper/bin/zkServer.sh restart /opt/zookeeper/conf/zoo.cfg
WorkingDirectory=/DATA
 
[Install]
WantedBy=default.target


아래와 같이 systemd 에 등록 , 실행하여 상태를 확인 한다.

configure systemctl
# systemctl  daemon-reload
# systemctl  start zookeeper.service
# systemctl  enable zookeeper.service
# systemctl  status zookeeper.service
● zookeeper.service - ZooKeeper Service
   Loaded: loaded (/etc/systemd/system/zookeeper.service; enabled; vendor preset: disabled)
   Active: active (running) since Fri 2018-07-06 05:49:30 UTC; 1min 18s ago
     Docs: http://zookeeper.apache.org
 Main PID: 14685 (java)
           └─14685 java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /opt/zookeeper/...
 
Jul 06 05:49:29 kafka-02.novalocal systemd[1]: Starting ZooKeeper Service...
Jul 06 05:49:29 kafka-02.novalocal zkServer.sh[14676]: ZooKeeper JMX enabled by default
Jul 06 05:49:29 kafka-02.novalocal zkServer.sh[14676]: Using config: /opt/zookeeper/conf/zoo.cfg
Jul 06 05:49:30 kafka-02.novalocal zkServer.sh[14676]: Starting zookeeper ... STARTED
Jul 06 05:49:30 kafka-02.novalocal systemd[1]: Started ZooKeeper Service.



2-1. kafka install 


 아래와 같이 kafka 를 설치 한다.

kafka install(ALL NODE)
cd /opt
tar xvzf kafka_2.11-1.1.0.tgz
ln -s kafka_2.11-1.1.0 kafka


KAFKA_LOG 라는 log가 쌓일 디렉토리를 생성 한다.
Create Directory(ALL NODE)
mkdir /KAFKA_LOG

server.properties 파일을 수정 한다.

기본 설정 파일에서 변경할 부분은 아래와 같다.

broker.id는 브로커간 구분 해야할id로, 서로 다른 값을 준다. 편의상 각노드 숫자로 입력 한다.
log.dirs 은 위에서 생성한 디렉토리로 설정 하며
zookeeper.connect 은 ZooKeeper의 접속정보를 수정하며, 브로커가 시작할 때 지노드가 주키퍼에 없으면 자동으로 생성 한다.

delete.topic.enable 은 topic을 삭제 하기 위해서 추가 해야 한다(optional)

/opt/kafka/config/server.properties(ALL NODE)

아래와 같이 모든 노드에서 kafka를 실행 한다.

Start Kafka (ALL NODE)
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

중지는 아래와 같이 가능 하다.

Stop Kafka (ALL NODE)
/opt/kafka/bin/kafka-server-stop.sh

Kafka도 관리의 편의성을 위하여 systemd에 등록 하기 위하여 kafka.service  파일을 생성 한다.

/etc/systemd/system/kafka.service (ALL NODE)
[Unit]
Description=Apache Kafka server (broker)
 
[Service]
Type=simple
User=root
Group=root
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
 
[Install]

아래와 같이 systemd 에 등록 , 실행하여 상태를 확인 한다.

configure systemctl
# systemctl enable kafka.service
Created symlink from /etc/systemd/system/multi-user.target.wants/kafka.service to /etc/systemd/system/kafka.service.
# systemctl stop kafka.service
# systemctl start kafka.service
# systemctl status kafka.service
● kafka.service - Apache Kafka server (broker)
   Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: disabled)
   Active: active (running) since Fri 2018-07-06 06:49:39 UTC; 4s ago
 Main PID: 17938 (java)
           └─17938 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapO...
 
Jul 06 06:49:41 kafka-02.novalocal kafka-server-start.sh[17938]: [2018-07-06 06:49:41,160] INFO [Grou...)
Jul 06 06:49:41 kafka-02.novalocal kafka-server-start.sh[17938]: [2018-07-06 06:49:41,160] INFO [Grou...)
Jul 06 06:49:41 kafka-02.novalocal kafka-server-start.sh[17938]: [2018-07-06 06:49:41,174] INFO [Prod...)
Jul 06 06:49:41 kafka-02.novalocal kafka-server-start.sh[17938]: [2018-07-06 06:49:41,193] INFO [Tran...)
Jul 06 06:49:41 kafka-02.novalocal kafka-server-start.sh[17938]: [2018-07-06 06:49:41,194] INFO [Tran...)
Jul 06 06:49:41 kafka-02.novalocal kafka-server-start.sh[17938]: [2018-07-06 06:49:41,205] INFO [Tran...)
Jul 06 06:49:41 kafka-02.novalocal kafka-server-start.sh[17938]: [2018-07-06 06:49:41,236] INFO [/con...)
Jul 06 06:49:41 kafka-02.novalocal kafka-server-start.sh[17938]: [2018-07-06 06:49:41,248] INFO Kafka...)
Jul 06 06:49:41 kafka-02.novalocal kafka-server-start.sh[17938]: [2018-07-06 06:49:41,248] INFO Kafka...)
Jul 06 06:49:41 kafka-02.novalocal kafka-server-start.sh[17938]: [2018-07-06 06:49:41,249] INFO [Kafk...)

2-2. kafka test


cy 라는 이름의 replication 1,partition 1 로 설정 된 topic을 생성한다 .

create topic
# /opt/kafka/bin/kafka-topics.sh --create --zookeeper kafka-01:2181,kafka-02:2181,kafka-03:2181 --replication-factor 1 --partitions 1 --topic cy
Created topic "cy".


삭제는 아래와 같이  --delete옵션을 통하여 가능 하다.

create topic
#  /opt/kafka/bin/kafka-topics.sh --zookeeper kafka-01:2181,kafka-02:2181,kafka-03:2181 --topic cy --delete


삭제한 topic을 재 생성후, topic리스트를 확인 한다.

recreate topic,topic list
# /opt/kafka/bin/kafka-topics.sh --create --zookeeper kafka-01:2181,kafka-02:2181,kafka-03:2181 --replication-factor 1 --partitions 1 --topic cy
Created topic "cy".
# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper kafka-01:2181,kafka-02:2181,kafka-03:2181
Topic:cy PartitionCount:1 ReplicationFactor:1 Configs:
Topic: cy Partition: 0 Leader: 1 Replicas: 1 Isr: 1


console-producer 에서 쉘 환경으로 메세지입력 
console-producer
# /opt/kafka/bin/kafka-console-producer.sh --broker-list  kafka-01:9092,kafka-02:9092,kafka-03:9092  --topic cy
>123
>123
>123
>123
>123


다른 쉡을 띄어서 console-consume 를 통하여 메세지가 확인 되는것을 통하여 테스트 진행 

console-consumer
# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092 --topic cy_Test --from-beginning
 
123
123
123
123
123



반응형

'System > Kafka' 카테고리의 다른 글

kafka manager  (0) 2018.07.08
kafka partition, replication(ISR)  (0) 2018.07.07
Kafka Default install test  (0) 2017.09.30