반응형

1. 서론

Kafka 를 사용하며 겪은 문제에 대해 설명하고 해결방안을 공유합니다.

 

2. 문제

Kafka 사용 중에 send한 데이터가 유실된 것을 발견되었습니다.

저는 acks 설정을 all로 했으며, send 후에는 수동으로 flush 까지 호출을 했습니다.

 

하지만 메시지는 유실이 되어 저는 매우 당황했습니다.

 

문제 파악을 위해 Kafka Producer의 flush 메서드를 살펴봤습니다.

 

 

코드를 살펴보니 요청이 실패하면 record는 유실이 될수도 있다고 친절히 나와있네요.. OMG

 

그럼, 실패로 간주하는 경우는 언제 일까요?

 

Producer 

 

먼저 Producer의 내부 동작을 알아보겠습니다.

 

Producer는 내부적으로 아래와 같이 동작합니다.

 

1. Kafka Producer send => accumulator 버퍼에 append

2. linger.ms 와 batch.size 설정을 통해 flush => 실제 broker에 쓰기 요청

3. 이때 acks 설정에 따라 응답을 받게되며, 실패 시 retries 와 delivery.timeout.ms 설정으로 재시도 여부를 판단.

 

아래는 KafkaProducer가 실제 send를 하기위해 사용하는 Sender 클래스의 일부입니다.

 

 

broker에게서 error를 받은 경우 canRetry 메서드를 호출합니다.

canRetry는 재시도가 가능한지 판단하는 메서드입니다.

아래 그림과 같이 retries, 와 delivery.timeout.ms 설정에 따라 boolean 값을 반환합니다.

 

그럼, retries와 delivery.timeout.ms default 값을 알아보겠습니다.

 

retries는 Integer.MAX로 2147483647 입니다.

delivery.timeout.ms120000 로 2분 입니다.

 

즉, 2분 동안 2147483647 횟수만큼은 재시도를 한다가 되는데요.

 

이 조건에도 실패가 되는 경우에는 사용자 정의 callback 함수를 통해 재처리하는 방향으로 구현을 해야합니다.

 

 

반응형

 

3. 해결 방안

저의 경우 실패한 이유는 broker 측에서 timeout 내에 replica가 이루어지지 않아서 인데요.

 

위와 같은 경우에는 브로커 성능상 이슈로 아래와 같은 해결방안들이 있을 수 있습니다.

 

  1. 토픽의 replica 설정 값을 내리는 방법(reduce topic's replica-factor config value)
  2. 브로커 설정 중 num.network.threads and num.io.threads 의 값을 늘리는 방법  (Increase the value of num.network.threads and num.io.threads among broker settings)
  3. broker 서버의 cgroup memory limit을 늘려, follower들의 fetch write시 disk io 를 감소시켜 복제 속도를 증가시키는 방법
  4. 2번의 broker 설정 보다 producer의 병렬 처리 갯수가 과도하게 많은지 체크.
    1. 많다면, broker 의 cpu, memory 사용률이 surge하지 않는 선에서 producer와 broker 설정 조절

4. 마무리

위 내용에서 본 것처럼, Kafka 는 여러 Config 를 제공하고 있습니다. 

여러분은 Kafka 내부동작을 자세히 알고 비즈니스 요구사항에 맞게 설정값을 세팅해서 사용하도록 해야합니다.

 

이 글이 여러분의 설정 값 세팅에 기여했으면 좋겠습니다.

 

감사합니다.

반응형
반응형

1. 서론

안녕하세요.

 

저희 조직은 DB cdc를 kafka 로 흘려주며 데이터의 스키마는 schema registry를 사용하여 관리하고 있습니다.

 

최근 spring kafka + schema registry + gradle plugin 를 사용하여 application 에서 cdc 데이터를 consume 하는 작업을 하였는데

다른 분들에게 공유하기에 좋다고 생각들어 포스팅하였습니다.ㅎㅎ

 

2. gradle plugin

schema registry 를 사용하여 kafka record를 consume하는 과정은 아래 절차로 이루어져야 합니다.

 

  1. schema registry 를 통해 avro 스키마 DOWNLOAD
  2. DOWNLOAD 한 avro 스키마를 기반으로 java class 생성
  3. 생성된 java class 를 kafka consume record의 모델로 사용

이때, 1~2번의 과정은 빌드 도구인 gradle에서 제공하는 plugin이 있습니다.

 

1) com.github.imflog:kafka-schema-registry-gradle-plugin

 

schema registry에서 avsc file을 다운로드 하는 plugin 입니다.

 

plugin 적용 방법은 아래와 같습니다.

 

1. repositories 추가

 

build.gradle 파일에 repository를 등록해주세요.

 

 

2. dependencies classpath 추가

 

repository를 등록했으니 필요한 dependencies classpath를 추가합니다.

 

 

이때, 버전 호환이 중요합니다.

 

kafka-schema-registry-gradle-plugin 과 gradle-avro-plugin 는 모두 org.apache.avro:avro를 참조하고 있습니다.

 

org.apache.avro:avro 의 1.8.x 버전은 bug 도 존재할 뿐더러 time 관련된 부분을 java.utile.time 이 아닌 joda를 사용하고 있는데요.

때문에, 가능하다면schema-registry-plugin:5.5.1 , gradle-avro-plugin:0.21.0 의 조합으로 사용하는것을 권장합니다.

권장하는 조합의 plugin 버전은 org.apache.avro:avro 1.10.x 를 사용하며
miner 버전 조합인 schema-registry-plugin:5.3.4 , gradle-avro-plugin:0.16.0 은 org.apache.avro:avro 1.8.x를 사용하고 있습니다.

 

3. gradle task 추가

 

아래와 같이 plugin은 apply 하게 되면 2번째 사진과 같이 schemaRegistry 구문을 사용할 수 있습니다.

download.subject는 subjectName, download path 로 이해하시면 됩니다.

저의 경우 consume만 하므로 download task만 적용하였습니다.

 

 

plugin에 대한 자세한 내용은 여기를 참고해주세요.

 

 

2) com.commercehub.gradle.plugin:gradle-avro-plugin

 

avsc file 을 사용하여 java class를 생성해주는 plugin 입니다.

 

plugin 적용 방법은 아래와 같습니다.

 

1. repository 및 dependencies 추가

 

buildscript { 
	repositories { 
    	jcenter() 
    } 
    dependencies { 
    	classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:VERSION" 
    } 
}

 

2. gradle plugin apply 추가

 

apply plugin: "com.commercehub.gradle.plugin.avro-base"

 

위 1~2번까지 완료하신 후 gradle sync를 한번 해주신다면 generateAvro라는 task 를 사용할 수 있습니다.

 

해당 task 를 수행하게 되면 avsc 파일을 사용하여 java class를 생성하게 됩니다.

avro file path, gerate java class path 같은 부분은 gradle 파일에서 custom 가능합니다.

 

 

반응형

 

 

3. 적용 시 겪은 문제점

저의 경우 2개의 plugin 모두 적용 후 실제 app에서 kafka record를 가져가는데 문제가 있었습니다.

 

1) stringType

 

문제에 대해 설명하기 전 avro의 string type은 3가지를 지원하고 있습니다.

Utf8의 경우에는 python과 같은 java 뿐만이 아닌 언어의 string type까지 지원하기 위해서 인것 같습니다.
  • CharSequence (java.lang.CharSequence)
  • String (java.lang.String)
  • Utf8 (org.apache.avro.util.Utf8)
기본은 String 이며 아래와 같이 변경도 가능합니다.

 

문제는 com.commercehub.gradle.plugin:gradle-avro-plugin 를 통해 생성한 java class에 kafka record를 setting 할때 발생했습니다.

 

kafka record를 java class에 세팅하는 작업은 plugin을 통해 만든 java class의 put method를 통해 이루어 집니다.

 

put 메서드는 아래와 같이 type casting을 (java.lang.String) 으로 하게 되는데요.

제가 consume할 kafka record는 python code 로 publishing을 하고 있어 type casting exception이 발생하게 됩니다.

 

 

하지만, 이 문제는 다행히 plugin version up으로 해결되었습니다.

 

최신 version인 gradle-avro-plugin:0.21.0 를 사용하면 아래와 같이 put 메서드가 만들어집니다.

 

.toString으로 type casting 하도록 수정되어 더이상 type casting 예외는 발생하지 않게됩니다.

 

 

2) java.time.Instant

 

저희는 kafka record에서 timestamp 관련 데이터는 long type의 logicalType은 timestamp-millis로 정의해서 사용하고 있습니다.

 

이 경우, put method는 아래와 같이 만들어집니다.

 

 

type이 long인데 , java.time.Instant 타입을 사용하는게 의아해 할 수 있는데요.

이유는 logicalType으로 timestamp-millis 를 지정했기 때문입니다.

참고 - TimeConversions

 

하지만, 실제로 데이터를 consume 과정에서는 아래 예외가 발생하게 됩니다.

java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.time.Instant 

 

이유는, 역직렬화시 long type의 데이터를 logicalType 명시에 맞도록 casting 되지 않아서 입니다.

 

이 문제는, kafka consumer의 설정으로 해결할 수 있었습니다.

kafka + schema registry 연계를 많이 사용하고 있어 kafka 측에서는 consumer에 아래와 같은 옵션을 제공합니다.

 

 

기본은 false이며, avro schema를 사용하여 consume 하는 경우에는 해당 옵션을 true로 변경하여 사용하면 됩니다.

 

4. 마무리

이번 포스팅은 spring kafka + schema registry + gradle plugin 적용에 대해서 진행했습니다.

 

모두, 저같이 삽질을 하지 않았으면 좋겠다는 마음으로 작성하게 되었는데요.

도움이 되었으면 좋겠네요ㅎㅎ

 

감사합니다.

반응형

'MQ > Kafka' 카테고리의 다른 글

(7) Kafka Trouble Shooting  (0) 2021.11.19
(5) 카프카 사용 시 고려사항  (0) 2020.02.26
(4) 카프카 매니저 & 스키마 레지스트리  (0) 2020.02.26
(3) 카프카 사용 예제  (0) 2020.02.25
(2) 카프카 설치  (0) 2020.02.25
반응형

1. 서론

카프카 사용시 고려사항에 대해 포스팅을 진행하겠습니다.

실제로, 카프카를 운영 및 사용하게 된다면 고려할 사항들은 생각보다 많습니다.

그 중, 3가지 정도만을 추려 공유드리고자 합니다.

공유 드릴 내용은 아래와 같습니다.

  1. idempotent (멱등성) 보장
  2. produce 역전 가능성
  3. 카프카 성능

2. idempotent (멱등성) 보장

카프카는 pub-sub 구조입니다.

이 말은, sub 하는 입장에서 동일한 메시지를 한번이 아닌 여러번 처리할 수 있다는 얘기가 됩니다.

결국, consumer를 개발할 시 idempotent (=멱등성)이 보장되어야 합니다.

idempotent란 같은 input에 대해서는 항상 동일한 결과가 나오는 것을 의미합니다.

idempotent가 보장되지 않는다면 데이터가 꼬여버리는 현상을 맞이하게 될 것입니다.

 

 

 

 

 

 

반응형

 

 

 

 

 

 

3. produce 역전 가능성

이전 포스팅에서 partition이 1인 경우 메시지의 순서가 보장된다고 한적이 있습니다.

정확하게 말씀드리면 보장이 되지 않습니다.

그 이유는 아래 그림을 보며 설명해 드리도록 하겠습니다.

카프카는 내부적으로 producer에게 받은 메시지를 buffer에 저장 후, 차례대로 broker 서버의 disk에 write 하게 됩니다.

하지만, 이 buffer에 역전되어 들어가는 상황이라면

위 그림의 2번 메시지가 먼저 write하게 되며 아래그림처럼 저장이 됩니다.

 

2번 메시지가 1번 메시지보다 offset이 작아지게 되고 결국, 개발할 경우 항상 역전에 대해 고려를 해야합니다.

 

저의 경험을 공유드리자면, 저는 메시지를 sub하여 DB CUD를 치는 상황이었습니다.

이슈는 아래와 같은 방법으로 해결하였습니다.

 

  1. 메시지에 timestamp 값 추가.
  2. 메시지를 pollSize 만큼 가져와 sorting 혹은 merge 수행.
  3. DB 테이블에 version 칼럼을 추가하여 timestamp 값 기준으로 optimistic lock CUD 수행.

DB에 칼럼까지 추가한 이유는 poll 과 poll 사이에 역전이 된 메시지가 존재할 수 있기 때문입니다.

 

4. 카프카 성능

 

카프카는 메시지 유입 시, leader에 write 후 follower가 fetch write를 하게됩니다.

 

이때, fetch write시 카프카는 os에서 제공하는 shared page cache를 사용합니다.

 

shared page cache에는 최근 메시지를 저장하게 되고, follower는 이 cache 에 메시지가 없는경우 disk 접근하여 fetch write를 수행합니다.

 

또한, 이 shared page cache는 리눅스의 cgroup의 memory limit에 영향을 받습니다.

 

그러므로, 카프카의 메시지 복제 성능을 향상하고 싶은 경우 cgroup의 memory limit을 늘려주는것도 하나의 방법입니다.

 

5. 마무리

이번 포스팅에서는 실제로 제가 겪은 카프카 사용시 고려사항들에 대해 포스팅 하였습니다.

 

이렇게, 카프카 관련 1~5의 포스팅을 완료 하였습니다.

 

감사합니다.

반응형

'MQ > Kafka' 카테고리의 다른 글

(7) Kafka Trouble Shooting  (0) 2021.11.19
(6) spring kafka + schema registry + gradle plugin 적용  (4) 2020.10.22
(4) 카프카 매니저 & 스키마 레지스트리  (0) 2020.02.26
(3) 카프카 사용 예제  (0) 2020.02.25
(2) 카프카 설치  (0) 2020.02.25
반응형

1. 서론

이번 포스팅에서는 카프카 운영에 도움이 되는 Tool인 

카프카 매니저와 스키마 레지스트리에 대해 소개하려고 합니다.

 

2. 카프카 매니저

 

2-1. 소개

 

카프카 매니저는 야후에서 오픈소스로 제공하는 카프카 관리 GUI 툴입니다.

 

아마 이전 포스팅을 보셨다면, 터미널이 아닌 사진을 보셨을 텐데요.

그 사진이 제 카프카 매니저에서 발췌한 사진입니다.ㅎㅎ

 

카프카 매니저는 아래와 같은 기능을 제공합니다.

 

  • 다수의 카프카 클러스터 상태 확인
  • 클러스터의 브로커 상태 확인
  • 토픽 리스트 조회
  • 토픽 생성, 삭제
  • 토픽 설정 변경
  • 컨슈머 그룹 offset & lag 확인
  • 토픽 상태 확인( isr, leader 등)
  • 파티션의 leader & follower 변경 ( = partition reassign )

 

2-2. 설치

 

1) 파일 다운로드 

 

파일은 아래 주소에서 원하는 버전을 선택하여 다운받을 수 있습니다.

 

저는 현재 최신 버전인 3.0.0.1 release를 다운받도록 하겠습니다.

  • 저는 홈디렉터리에서 다운받아 진행하도록 하겠습니다.
wget https://github.com/yahoo/CMAK/archive/3.0.0.1.tar.gz
tar -zxvf 3.0.0.1.tar.gz
cd CMAK-3.0.0.1

 

위 명령어를 수행하면 ~/CMAK-3.0.0.1 폴더가 생긴것을 확인할수 있습니다.

 

2) sbt 빌드

 

카프카 매니저는 스칼라 언어로 되어있어 빌드 툴인 sbt를 이용하여 빌드를 진행합니다.

sbt는 소스에 들어가있어 별도 설치는 하지않으셔도 됩니다.

 

아래와 같이 sbt 명령어를 수행합니다.

 

  • CMAK 3.0.0.1 버전에서 제공되는 sbt는 1.3.8 버전입니다.
  • dist 명령어는 컴파일 및 빌드하여 어플리케이션이 실행될수 있도록 zip 파일로 제공해주는 명령어입니다.
  • zip 파일은 target/universal 폴더에 만들어집니다.
./sbt clean dist

 

시간이 상당히 걸려 잠시 커피한잔 마시고 오시는것을 추천드립니다.ㅎㅎ

 

 

수행이 모두 끝나면 ~/CMAK-3.0.0.1/target/universal/cmak-3.0.0.1 zip 파일이 생성된것을 확인할 수 있습니다.

 

 

 

 

 

 

반응형

 

 

 

 

 

 

3) 압축 해제 & 설정 파일 수정

 

이제 ~/CMAK-3.0.0.1/target/universal/cmak-3.0.0.1 zip 파일을 압축 해제합니다. 명령어는 아래와 같습니다.

 

unzip ~/CMAK-3.0.0.1/target/universal/CMAK-3.0.0.1.zip

 

해제가 완료되었으면 cmak-3.0.0.1 폴더가 생성된것을 확인할 수 있습니다.

 

이젠 설정 파일을 수정할 차례입니다.

 

설정파일 경로는 ~/CMAK-3.0.0.1/target/universal/cmak-3.0.0.1/conf/application.conf 입니다.

 

수정 내용은 아래 cmak.zkhosts 의 값을 zookeeper 주소를 적어 주시면 됩니다.

  • 주소 기입 양식 = "<zookeeper node1 hostname>:<zookeeper node1 client port>,<zookeeper node2 hostname>:<zookeeper node2 client port>"

 

 

 

4) 실행

 

실행은 ~/CMAK-3.0.0.1/target/universal/cmak-3.0.0.1/bin/cmak 명령어를 실행하여 주시면 됩니다.

 

5) 확인

 

실행한 서버의 9000 포트로 브라우저 접속을 하시면 아래와 같은 결과를 볼 수 있습니다.

 

3. 스키마 레지스트리

 

3-1. 소개

 

카프카는 메시지의 스키마를 관리하지는 않습니다. 단순히, 메시지를 저장하고 제공하는 역할을 할 뿐입니다.

 

결국, 항상 producer와 consumer는 메시지의 스키마 약속을 해야했고,

한쪽이 약속을 어긴다면 정상처리를 못하는 상황이 일어나게 됩니다.

 

스키마 레지스트리는 이와 같은 문제점을 해결하기 위하여

각 토픽에 들어갈 메시지의 스키마를 중앙관리하는 서비스입니다.

 

스키마 레지스트리의 특징은 아래와 같습니다.

 

  • Avro를 사용하여 스키마를 정의합니다.
  • restApi로 스키마 조회/생성/삭제 기능을 제공합니다.
  • 각 스키마는 버저닝이 가능합니다.

 

 

3-2. 설치

 

저의 경우 카프카 설치 에서 소개한 confulent를 이용하여 설치하도록 하겠습니다.

 

카프카 설치 포스팅에서 말씀드린 ~/apps/confluent-5.4.0 에 다운받았다는 가정하에 진행하겠습니다.

(카프카 설치 를 안보신 분은 보고 오시는 것을 추천드립니다.)

 

1) 설정 파일 수정

 

스키마 레지스트리 설정 파일을 수정합니다.

 

  • 경로 = ~/apps/confluent-5.4.0/etc/schema-registry/schema-registry.properties
  • listeners , host.name , kafkastore.bootstrap.servers 설정을 수정합니다.
    • 아래 listeners 설정 = 모든 ip에서 8081 port에 대한 접근을 허용.
    • 아래 host.name 설정 = 현재 서버의 ip 주소.
    • 아래 kafkastore.bootstrap.servers 설정 = 카프카 브로커 주소.
listeners=http://0.0.0.0:8081
host.name=host-ip
kafkastore.bootstrap.servers=PLAINTEXT://broker-1:9092,SSL://broker-2:9092

 

2) 실행

 

스키마 레지스트리 실행은 아래와 같습니다.

 

  • bin 디렉토리에 있는 schema-registry-start 명령어를 수행합니다.
  • 명령어 수행 시 인자로는 위에서 정의한 schema-registry.properties 파일 경로를 제공합니다.
~/apps/confluent-5.4.0/bin/schema-registry-start ~/apps/confluent-5.4.0/etc/schema-registry/schema-registry.properties

 

3) 확인

 

실행한 서버의 8081 port로 request 시 {} response가 오면 정상 실행된것입니다.

 

 

 

4. schema-registry-ui

 

schema-registry-ui는 스키마 레지스트리의 restApi 기능을 GUI로 제공해주는 서비스입니다.

설치는 https://github.com/lensesio/schema-registry-ui 를 참고하시면 됩니다.

 

  • docker image 로도 제공.
  • 스키마 리스트 확인.
  • 스키마 생성, 수정, 삭제.

아래는 schema-registry-ui 의 화면입니다.

 

 

 

5. 마무리

 

이번 포스팅에서는 카프카 운영에 도움이 되는 카프카 매니저 & 스키마 레지스트리 소개를 진행하였습니다.

다음 포스팅은 카프카 사용 시 고려사항들에 대해 포스팅하도록 하겠습니다.

 

반응형

'MQ > Kafka' 카테고리의 다른 글

(6) spring kafka + schema registry + gradle plugin 적용  (4) 2020.10.22
(5) 카프카 사용 시 고려사항  (0) 2020.02.26
(3) 카프카 사용 예제  (0) 2020.02.25
(2) 카프카 설치  (0) 2020.02.25
(1) 카프카란?  (2) 2020.02.25
반응형

1. 서론

이번 포스팅에서는 CLI를 통해 간단한 사용 예제를 포스팅하려고 합니다.

 

예제로는 아래와 같습니다.

 

  1. 토픽 생성
  2. 토픽 리스트 조회
  3. 토픽 상세 조회
  4. message pub
  5. message sub
  6. 컨슈머 그룹의 offset/lag 정보 확인

2. 사용 예제

 

2-1 토픽 생성

 

  • 명령어 경로 = 카프카 설치 에서 진행한 ~/apps/confluent-5.4.0/bin 디렉토리에 있습니다.
  • 아래는 replication이 2이고, partition 갯수는 3개인 test_topic을 생성하는 명령어 입니다.
  • --bootstrap-server 에는 각 브로커 서버의 hostname과 port를 ',' 구분자로 적어주시면 됩니다.
    • 저의 경우 브로커 3대를 기입하였습니다.
    • 브로커의 default port는 9092 입니다.

 

kafka-topics --create --bootstrap-server broker-1:9092,broker-2:9092,broker-3:9092 --replication-factor 2 --partitions 3 --topic test_topic

 

2-2 토픽 조회

 

토픽을 생성하였으니 잘 생성되었는지 확인이 필요합니다.

이런 경우, 아래와 같이 토픽 리스트 조회 명령어를 수행하면 됩니다.

kafka-topics --list --bootstrap-server --bootstrap-server broker-1:9092,broker-2:9092,broker-3:9092

 

아래와 같이 test_topic을 확인할 수 있습니다.

 

 

2-3. 토픽 상세 조회

 

토픽 조회에서는 단순히 카프카에 있는 토픽명만을 리스트로 보여주게 됩니다.

토픽의 상세 내용을 조회하고 싶은 경우는 아래 명령어를 통해 확인할 수 있습니다.

kafka-topics --describe --bootstrap-server broker-1:9092,broker-2:9092,broker-3:9092 --topic test_topic

 

결과로는 아래 정보들을 확인할 수 있습니다.

  • partition 갯수
  • replication 수
  • reader broker id
  • isr

 

 

 

반응형

 

 

 

 

 

 

 

2-4. message pub

 

토픽을 생성하였으니, 이번엔 해당 토픽에 메시지를 pub 하겠습니다.

 

명령어는 아래와 같습니다.

kafka-console-producer --broker-list broker-1:9092,broker-2:9092,broker-3:9092 --topic test_topic

 

CLI의 경우 enter 기반으로 메시지를 구분합니다.

저는 아래와 같이 [hi, my name is, young!!] 이라는 3개의 메시지를 pub하였습니다.

 

 

2-5. message sub

 

이번에는 pub한 메시지를 sub을 해보겠습니다.

sub을 하기 위해서는 consumer를 통해 수행됩니다.

 

아래는 sub하는 명령어 입니다.

 

kafka-console-consumer --bootstrap-server broker-1:9092,broker-2:9092,broker-3:9092 --topic test_topic --group test_consumer_group --from-beginning

 

명령어를 보시면 --group을 하는 것을 볼 수 있습니다.

이것은 카프카란? 에서 설명드린것과 같이 consumer는 consumer group이 있어야 하기 때문입니다.

  • 저의 test_consumer_group 이란 consumer group을 가진 consumer를 통하여 sub을 하였습니다.

 

아래는 sub한 결과 사진입니다.

[hi, my name is, young!!] 순서대로 pub했지만 sub은 순서대로되지 않은것을 확인할 수 있습니다.

 

이는 topic의 partition 갯수가 1이 아니기 때문입니다.

  • 카프카에서는 메시지의 pub 순서를 보장하기 위해서는 파티션 갯수가 1이 아닌이상 보장이 되지 않습니다.

 

아래 사진의 latest Offset을 보시면 3개의 메시지들이 3개의 파티션에 각각 들어간것을 볼 수 있습니다.

따라서, consumer는 각 파티션의 메시지를 sub하였고, 위와 같이 순서가 보장되지 않은 결과를 볼 수 있습니다.

 

 

2-6. 컨슈머 그룹의 offset/lag 정보 확인

 

이번엔 특정 컨슈머 그룹의 offset/lag 정보를 확인하는 명령어 입니다.

 

명령어는 아래와 같습니다.

  • --group에 확인하고 싶은 consumer group을 기입합니다.(저의 경우 위에서 sub한 test_consumer_group을 지정하였습니다.)
kafka-consumer-groups --bootstrap-server broker-1:9092,broker-2:9092,broker-3:9092 --group test_consumer_group --describe

 

 

아래는 결과 사진입니다.

  • 위에서 테스트한 test_topic의 결과를 볼 수 있습니다.
  • pub 메시지를 모두 sub하였으니 각 파티션의 lag는 0입니다.

 

 

3. 마무리

이번 포스팅에서는 간단한 CLI 기반의 카프카 사용예제를 진행하였습니다.

다음 포스팅에서는 카프카 사용시 도움되는 Tool들에 대해 소개하도록 하겠습니다.

 

반응형

'MQ > Kafka' 카테고리의 다른 글

(6) spring kafka + schema registry + gradle plugin 적용  (4) 2020.10.22
(5) 카프카 사용 시 고려사항  (0) 2020.02.26
(4) 카프카 매니저 & 스키마 레지스트리  (0) 2020.02.26
(2) 카프카 설치  (0) 2020.02.25
(1) 카프카란?  (2) 2020.02.25
반응형

1. 서론

이번 포스팅에서는 카프카 설치하는 방법을 튜토리얼처럼 정리하려고 합니다.

중간중간, 용어가 이해가지 않는 분들은 카프카란? 을 보고오시면 좋을거 같습니다.

 

2. 설치

설치는 최근 카프카를 주도적으로 이끌고 있는 confluent를 사용하여 진행하도록 하겠습니다.

 

confluent에서는 설치 방법을 다양하게 제공합니다. 그 중 저는 tar를 이용하여 진행하도록 하겠습니다.

그리고, ~/apps 경로에서 작업하겠습니다.

 

1) confluent-community tar 파일 다운로드

 

  • 카프카, 주키퍼만을 설치할 예정으로 community를 다운받도록 하겠습니다.
  • 현재 기준 최신인 5.4.0 버전을 다운받겠습니다.
curl -O http://packages.confluent.io/archive/5.4/confluent-community-5.4.0-2.12.tar.gz

 

2) 파일 해제

tar -zxvf confluent-community-5.4.0-2.12.tar.gz

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

3) zookeeper.properties 파일 수정

  • 파일 경로 = ~/apps/confluent-5.4.0/etc/kafka/zookeeper.properties
  • 아래는 confluent에서 가이드 되어있는 부분 중 dataDir, server.* 만 수정한 사진입니다.

  • server.* 에는 앙상블을 이룰 서버의 ip 혹은 hostname을 적습니다.
    • confluent에서는 server.<myid>=<hostname>:<leaderport>:<electionport> 형식으로 가이드 되어있습니다.
  • dataDir에는 zookeeper의 데이터를 담는 경로를 지정합니다.
    • 저의 경우에는 ~/zookeeper 경로를 만들어 지정하였습니다.
    • dataDir 경로에는 server.* 에 지정한 번호값이 myid 파일로 존재해야 합니다.
    • 아래 사진의 경우에는 앙상블 1번 서버의 myid 파일을 cat으로 확인한 사진입니다.

4) server.properties 파일 수정

  • 파일 경로 = ~/apps/confluent-5.4.0/etc/kafka/server.properties
  • broker.id, log.dirs, zookeeper.connect 3개의 값을 변경해줍니다.
    • broker.id는 unique 해야합니다.
    • log.dirs는 카프카관련 로그 경로를 지정해줍니다. (저의 경우 ~에 kafka 디렉토리를 만들어 지정하였습니다.)
    • zookeeper.connect의 경우에는 앙상블 서버들을 ',' 구분자로 이어줍니다. (port는 zookeeper.properties파일에 정의한 clientPort를 지정합니다)
broker.id=1
log.dirs=~/kafka/kafka-logs
zookeeper.connect=zookeeper-server-1:2181,zookeeper-server-2:2181,zookeeper-server-3:2181

 

5) zookeeper 실행

 

주키퍼 실행은 아래와 같습니다.

  • bin 디렉토리에 있는 zookeeper-server-start 명령어를 수행합니다.
  • 명령어 수행 시 인자로는 위에서 정의한 zookeeper.properties 파일 경로를 제공합니다.
~/apps/confluent-5.4.0/bin/zookeeper-server-start ~/apps/confluent-5.4.0/etc/kafka/zookeeper.properties

 

6) broker 실행

 

브로커 실행은 아래와 같습니다.

  • bin 디렉토리에 있는 kafka-server-start 명령어를 수행합니다.
  • 명령어 수행 시 인자로는 위에서 정의한 server.properties 파일 경로를 제공합니다.
~/apps/confluent-5.4.0/bin/kafka-server-start ~/apps/confluent-5.4.0/etc/kafka/server.properties

 

 

3. 마무리

설치는 비교적 간단하게 이루어졌습니다.

다음 포스팅에서는 설치를 하였으니, 간단하게 CLI 기반의 사용예제를 진행하도록 하겠습니다.

 

* 참고

실행 시 에러 로그들이 보일텐데, 이것은 각 노드들이 아직 연결이 안되었기 때문에 나는 로그입니다.

각 노드들을 실행해도 난다면 설정 파일에 잘못된것이 있는지 다시 한번 확인하시기 바랍니다.

반응형
반응형

1. 서론

최근, MQ(=message queue) 중 많은 사용과 관심이 증가하는 카프카에 대해서 소개하려고 합니다.

 

2. 카프카란?

아파치에서 제공하는 pub-sub 기반의 분산형 메시지 큐입니다.

dispatch가 아닌, subscribe 방식으로 기존 RabbitMQ에 비해 고성능입니다.

(단, RabbitMQ에서 제공하는 전체 트랜잭션은 제공되지 않습니다)

 

3. 카프카 용어

아래 그림은 카프카의 기본 컨셉 그림입니다.

그림에 나오는 broker, zookeeper, topic, partition, leader, follower, producer, consumer, (+ consumer group)대해

각 역할을 설명하도록 하겠습니다. 

3-1. broker (브로커)

 

브로커는 실제로 메시지를 저장하는 카프카의 각 노드입니다.

 

브로커의 역할은 아래와 같습니다.

 

  • pub으로 인해 들어온 데이터 저장 (disk 기반)
  • leader, follower 개념 존재

 

3-2. zookeeper (주키퍼)

 

주키퍼는 아파치에서 제공하는 코디네이션 분산 플랫폼입니다.

주키퍼는 hadoop, hbase 연계로도 사용하지만, 카프카와의 연계로도 사용합니다.

 

카프카와의 연계 시 역할은 아래와 같습니다.

 

  • broker health check
  • leader 선출
  • znode(파일 시스템의 폴더 구조)라는 곳에 broker의 meta정보 관리

 

zookeeper 클러스터는 앙상블이라 부르며, 앙상블은 데이터 write시 과반수가 넘으면 성공으로 간주하게 됩니다.

즉, 앙상블은 2N+1인 홀수로 구성이 필요합니다.

 

 

 

 

 

반응형

 

 

 

 

 

 

 

 

3-3. topic (토픽)

 

토픽은 큐들의 집합입니다.

큐들의 집합이라는 용어를 사용한 이유는 토픽은 한개 이상의 파티션이 있어야 하고, 파티션이 한개의 큐이기 때문입니다.

 

토픽의 역할과 특징은 아래와 같습니다.

 

 

3-4. partition (파티션)

 

파티션은 토픽을 이루는 큐입니다.

각 파티션에는 leader, follower, isr 존재합니다.

 

각 용어의 의미는 아래와 같습니다.

  • leader는 메시지를 write, read하는 역할을 수행하며, 토픽의 replication 갯수만큼 follower에게 복제를 명령합니다.
  • follower는 leader의 요청을 받아 메시지를 복제하는 역할을 합니다.
  • isr는 replication group을 의미하며, leader & follwer 선출 시 이 isr 내에서 선출하게 됩니다.

아래는 파티션의 leader, follower, isr에 관련해서 보여드리기 위해 캡처한 사진입니다.

 

위 사진을 보시면 0번 파티션의 정보로는,

leader는 2번 broker, isr(in sync replicas)는 현재 2번, 3번으로 되어 있는것을 보실 수 있습니다.

 

3-5. producer (프로듀서)

 

프로듀서는 카프카에서 메시지를 발행(pub)하는 주최를 의미합니다.

 

프로듀서의 특징은 아래와 같습니다.

  • 메시지 발행 시 직렬화를 제공합니다.(string, json, avro 등)
  • partition에 특정 데이터만을 발행하기 위해 메시지의 key를 지정할 수 있습니다.
  • 메시지 발행 후 broker로 부터 ack를 받을 수 있습니다. (https://kafka.apache.org/23/documentation.html#producerapi)

 

3-6. consumer (컨슈머)

 

컨슈머는 메시지를 subscribe하는 주최입니다.

 

컨슈머의 특징은 아래와 같습니다.

 

  • commit이라는 행위로 어느 offset까지 subscribe했는지 주키퍼에게 알림.
  • 컨슈머는 살아있다면 zookeeper에게 heart beat를 전송.
  • poll이라는 행위로 메시지를 subscribe하며, 이때 시간과 최대 갯수등 설정 가능(https://kafka.apache.org/23/documentation.html#consumerconfigs)

 

여기서 offset은 각 파티션에 메시지가 유입된 순서를 의미합니다.

 

 

3-7. consumer group (컨슈머 그룹)

 

컨슈머 그룹은 컨슈머들의 논리적인 그룹을 의미합니다.

 

컨슈머 그룹은 아래와 같은 특징을 가지고 있습니다.

 

  • 토픽별 컨슈머 그룹 단위로 offset과 lag를 관리.
  • 컨슈머는 컨슈머그룹을 필수로 가져야 함.
  • 토픽의 파티션 갯수만큼 컨슈머들이 동일 컨슈머 그룹에 존재하는것이 Best!!
    • 한 파티션은 하나의 컨슈머만 점유 가능하기 때문.
  • 파티션을 점유중인 컨슈머가 down 되었을 시 컨슈머 그룹내 리밸런싱 동작.
    • down된 컨슈머가 점유한 파티션을 다른 컨슈머에게 위임하는 작업.

 

여기서, lag는 한 토픽의 (총 메시지 갯수 - 컨슈머 그룹이 subscribe한 메시지 갯수) 입니다.

간단히, 한 토픽에서 한 컨슈머 그룹이 소비해야하는 메시지 갯수입니다.

 

 

4. 마무리

이번 포스팅에서는 카프카에 대해 알아보았습니다.

다음 포스팅부터는 카프카 설치 및 cli를 통한 사용예제를 포스팅하도록 하겠습니다.

 

반응형

+ Recent posts