

220326 Sat - Kafka & ZooKeeper Configuration


Referenced from the book “카프카, 데이터 플랫폼의 최강자” (roughly, “Kafka, the Champion of Data Platforms”)

ZooKeeper

  • ZooKeeper operates on a majority (quorum) basis, so the ensemble should be built with an odd number of servers
  • Folder for znode snapshots, transaction logs, and myid
    • ZooKeeper stores snapshots (copies of the znodes) and transaction logs in a separate data directory. When a znode changes, the change is appended to the transaction log; once the log grows large enough, a snapshot of the current state of all znodes is written to the filesystem. Because this is such an important directory, it is best placed on a path different from the installation path. (* Looking at current ZooKeeper Docker setups, they also usually seem to put the logs in a data folder.)
    • When building a ZooKeeper cluster, an ID that distinguishes each ZooKeeper node must be created in this data folder (called myid)
    • Register this folder's path as dataDir in zoo.cfg
# Create a myid file in the ZooKeeper node's data folder and store its id (here, 1)
echo 1 > ./data/myid
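
As a minimal sketch (the hostnames and the /opt/zookeeper-3.4.6/data path are assumptions borrowed from the zoo.cfg below), each node in a three-node ensemble gets its own id. With three servers the quorum is two, so the ensemble tolerates one node failure:

# Hypothetical 3-node ensemble (my_zk01, my_zk02, my_zk03);
# the id on each node must match its server.{myid} entry in zoo.cfg
echo 1 > /opt/zookeeper-3.4.6/data/myid   # run on my_zk01
echo 2 > /opt/zookeeper-3.4.6/data/myid   # run on my_zk02
echo 3 > /opt/zookeeper-3.4.6/data/myid   # run on my_zk03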

A default(?) zoo.cfg configuration

# The number of milliseconds of each tick
tickTime=2000

# The number of ticks that the initial
# synchronization phase can take
initLimit=10

# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5

# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/zookeeper-3.4.6/data

# the port at which the clients will connect
clientPort=2181

# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60

#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#

# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3

# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1

# When configuring a ZooKeeper cluster
# server.{myid}=zk_hostname:2888:3888
# server.1=my_zk01:2888:3888
# server.2=my_zk02:2888:3888
# server.3=my_zk03:2888:3888
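
Once zoo.cfg and myid are in place on every node, each node can be started and its role checked (a sketch assuming the standard scripts in the ZooKeeper bin/ directory):

# Start ZooKeeper on each node, then confirm the role it took
bin/zkServer.sh start
bin/zkServer.sh status   # prints "Mode: leader" or "Mode: follower"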


Things to watch out for in Java applications that use ZooKeeper

  • Applications that manage their nodes through ZooKeeper register each application host as an ephemeral node, and when communication with the application is lost, that host is removed from the ephemeral nodes.
  • Java-based applications occasionally run a full GC (full garbage collection) when they use a lot of memory, and the application is temporarily frozen for the duration of that GC.
  • If the application's ZooKeeper session timeout (zookeeper.session.timeout) is set too short, a GC pause can make ZooKeeper judge the node to be down, which is likely to cause unintended behavior.
  • Therefore, check the application's GC times periodically and set the session timeout to at least 3 seconds (see the sketch below).
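
A minimal sketch of both checks, assuming the Java application is a Kafka broker (KAFKA_OPTS is picked up by Kafka's launch scripts; the GC flags shown are Java 8 style and are an assumption about the JVM in use):

# Enable GC logging so full-GC pause lengths can be reviewed
export KAFKA_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"

# server.properties: keep the session timeout (in ms) well above the
# longest observed GC pause, and at least 3 seconds
zookeeper.session.timeout.ms=6000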

Kafka

  • Unlike ZooKeeper, Kafka does not have to be deployed in odd numbers (4, 5, or 10 servers are all fine).
  • Keep Kafka and ZooKeeper on separate servers. Putting them on the same machine is not a good approach when operating Kafka at scale.
  • Kafka releases carry a Scala version in their name; Scala compiles to JVM bytecode and runs on the Java Virtual Machine (JVM).
  • When entering the ZooKeeper connection settings, list every host in the ZooKeeper cluster. If only one host is listed and that ZooKeeper goes down, there will be trouble.
# List every server in the ZooKeeper ensemble
zookeeper.connect=my-zk001:2181,my-zk002:2181,my-zk003:2181 

  • Configured as above, Kafka uses the top-level path of the ZooKeeper znode tree. That limits the ZooKeeper ensemble to a single application (if different applications used the same znodes, their data could collide).
  • If you avoid the top-level path and separate the znodes, one ZooKeeper ensemble can be shared by multiple applications.
  • To separate the znodes, append the name of the znode to use after the host and port list; the separation can be verified as shown after the examples below.
# Separate them like a folder structure
zookeeper.connect=my-zk001:2181,my-zk002:2181,my-zk003:2181/my-test01
zookeeper.connect=my-zk001:2181,my-zk002:2181,my-zk003:2181/my-test02

zookeeper.connect=my-zk001:2181,my-zk002:2181,my-zk003:2181/my-test/01
zookeeper.connect=my-zk001:2181,my-zk002:2181,my-zk003:2181/my-test/02
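
To confirm that each application sees only its own subtree, the znodes can be listed with the zookeeper-shell script that ships with Kafka (hostnames and chroot names follow the examples above):

# Each application's data lives under its own chroot znode
bin/zookeeper-shell.sh my-zk001:2181 ls /my-test01
bin/zookeeper-shell.sh my-zk001:2181 ls /my-test02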


Kafka data directories

  • Unlike typical message queue services, Kafka keeps stored data for a period of time even after a consumer has taken the messages.
  • The data directory can be configured as a single directory or as several.
  • On a server with multiple disks, create as many directories as there are disks so that I/O is distributed per disk (see the layout sketch after the snippet below).
# Set log.dirs in the config
# Setting for using several data directories
# comma separated list of directories under which to store log files
log.dirs=/data1,/data2
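
A minimal sketch of the intended layout (the device names and mount points are assumptions): each entry in log.dirs sits on its own physical disk, so partition I/O is spread across the disks.

# Two physical disks, one mount point each (hypothetical devices)
mount /dev/sdb1 /data1
mount /dev/sdc1 /data2

# server.properties: one Kafka log directory per disk
log.dirs=/data1,/data2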


Checking the properties files inside the Kafka installation
server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://kafka:29092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://1.232.228.35:29092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/kafka/kafka-logs-3b91f50dabc3

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=zookeeper:2181/test01

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
port=9092
inter.broker.listener.name=PLAINTEXT
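
With this server.properties, the broker can be started and exercised roughly as follows (a sketch: my-test-topic is a hypothetical name, and this ZooKeeper-based setup still uses the --zookeeper option of kafka-topics.sh; the /test01 chroot must match zookeeper.connect above):

# Start the broker in the background with the config above
bin/kafka-server-start.sh -daemon config/server.properties

# Create a test topic under the /test01 chroot
bin/kafka-topics.sh --create --zookeeper zookeeper:2181/test01 \
  --replication-factor 1 --partitions 1 --topic my-test-topic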

consumer.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092

# consumer group id
group.id=test-consumer-group

# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
#auto.offset.reset=
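
These settings can be tried out with the console consumer that ships with Kafka (my-test-topic is the hypothetical topic from the sketch above):

# Read the topic from the beginning using consumer.properties
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --consumer.config config/consumer.properties \
  --topic my-test-topic --from-beginning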