1. 서론
안녕하세요.
저희 조직은 DB cdc를 kafka 로 흘려주며 데이터의 스키마는 schema registry를 사용하여 관리하고 있습니다.
최근 spring kafka + schema registry + gradle plugin 를 사용하여 application 에서 cdc 데이터를 consume 하는 작업을 하였는데
다른 분들에게 공유하기에 좋다고 생각들어 포스팅하였습니다.ㅎㅎ
2. gradle plugin
schema registry 를 사용하여 kafka record를 consume하는 과정은 아래 절차로 이루어져야 합니다.
- schema registry 를 통해 avro 스키마 DOWNLOAD
- DOWNLOAD 한 avro 스키마를 기반으로 java class 생성
- 생성된 java class 를 kafka consume record의 모델로 사용
이때, 1~2번의 과정은 빌드 도구인 gradle에서 제공하는 plugin이 있습니다.
1) com.github.imflog:kafka-schema-registry-gradle-plugin
schema registry에서 avsc file을 다운로드 하는 plugin 입니다.
plugin 적용 방법은 아래와 같습니다.
1. repositories 추가
build.gradle 파일에 repository를 등록해주세요.
2. dependencies classpath 추가
repository를 등록했으니 필요한 dependencies classpath를 추가합니다.
이때, 버전 호환이 중요합니다.
kafka-schema-registry-gradle-plugin 과 gradle-avro-plugin 는 모두 org.apache.avro:avro를 참조하고 있습니다.
org.apache.avro:avro 의 1.8.x 버전은 bug 도 존재할 뿐더러 time 관련된 부분을 java.utile.time 이 아닌 joda를 사용하고 있는데요.
때문에, 가능하다면schema-registry-plugin:5.5.1 , gradle-avro-plugin:0.21.0 의 조합으로 사용하는것을 권장합니다.
권장하는 조합의 plugin 버전은 org.apache.avro:avro 1.10.x 를 사용하며
miner 버전 조합인 schema-registry-plugin:5.3.4 , gradle-avro-plugin:0.16.0 은 org.apache.avro:avro 1.8.x를 사용하고 있습니다.
3. gradle task 추가
아래와 같이 plugin은 apply 하게 되면 2번째 사진과 같이 schemaRegistry 구문을 사용할 수 있습니다.
download.subject는 subjectName, download path 로 이해하시면 됩니다.
저의 경우 consume만 하므로 download task만 적용하였습니다.
plugin에 대한 자세한 내용은 여기를 참고해주세요.
2) com.commercehub.gradle.plugin:gradle-avro-plugin
avsc file 을 사용하여 java class를 생성해주는 plugin 입니다.
plugin 적용 방법은 아래와 같습니다.
1. repository 및 dependencies 추가
buildscript {
repositories {
jcenter()
}
dependencies {
classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:VERSION"
}
}
2. gradle plugin apply 추가
apply plugin: "com.commercehub.gradle.plugin.avro-base"
위 1~2번까지 완료하신 후 gradle sync를 한번 해주신다면 generateAvro라는 task 를 사용할 수 있습니다.
해당 task 를 수행하게 되면 avsc 파일을 사용하여 java class를 생성하게 됩니다.
avro file path, gerate java class path 같은 부분은 gradle 파일에서 custom 가능합니다.
3. 적용 시 겪은 문제점
저의 경우 2개의 plugin 모두 적용 후 실제 app에서 kafka record를 가져가는데 문제가 있었습니다.
1) stringType
문제에 대해 설명하기 전 avro의 string type은 3가지를 지원하고 있습니다.
Utf8의 경우에는 python과 같은 java 뿐만이 아닌 언어의 string type까지 지원하기 위해서 인것 같습니다.
- CharSequence (java.lang.CharSequence)
- String (java.lang.String)
- Utf8 (org.apache.avro.util.Utf8)
기본은 String 이며 아래와 같이 변경도 가능합니다.
문제는 com.commercehub.gradle.plugin:gradle-avro-plugin 를 통해 생성한 java class에 kafka record를 setting 할때 발생했습니다.
kafka record를 java class에 세팅하는 작업은 plugin을 통해 만든 java class의 put method를 통해 이루어 집니다.
put 메서드는 아래와 같이 type casting을 (java.lang.String) 으로 하게 되는데요.
제가 consume할 kafka record는 python code 로 publishing을 하고 있어 type casting exception이 발생하게 됩니다.
하지만, 이 문제는 다행히 plugin version up으로 해결되었습니다.
최신 version인 gradle-avro-plugin:0.21.0 를 사용하면 아래와 같이 put 메서드가 만들어집니다.
.toString으로 type casting 하도록 수정되어 더이상 type casting 예외는 발생하지 않게됩니다.
2) java.time.Instant
저희는 kafka record에서 timestamp 관련 데이터는 long type의 logicalType은 timestamp-millis로 정의해서 사용하고 있습니다.
이 경우, put method는 아래와 같이 만들어집니다.
type이 long인데 , java.time.Instant 타입을 사용하는게 의아해 할 수 있는데요.
이유는 logicalType으로 timestamp-millis 를 지정했기 때문입니다.
참고 - TimeConversions
하지만, 실제로 데이터를 consume 과정에서는 아래 예외가 발생하게 됩니다.
java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.time.Instant
이유는, 역직렬화시 long type의 데이터를 logicalType 명시에 맞도록 casting 되지 않아서 입니다.
이 문제는, kafka consumer의 설정으로 해결할 수 있었습니다.
kafka + schema registry 연계를 많이 사용하고 있어 kafka 측에서는 consumer에 아래와 같은 옵션을 제공합니다.
기본은 false이며, avro schema를 사용하여 consume 하는 경우에는 해당 옵션을 true로 변경하여 사용하면 됩니다.
4. 마무리
이번 포스팅은 spring kafka + schema registry + gradle plugin 적용에 대해서 진행했습니다.
모두, 저같이 삽질을 하지 않았으면 좋겠다는 마음으로 작성하게 되었는데요.
도움이 되었으면 좋겠네요ㅎㅎ
감사합니다.
'MQ > Kafka' 카테고리의 다른 글
(7) Kafka Trouble Shooting (0) | 2021.11.19 |
---|---|
(5) 카프카 사용 시 고려사항 (0) | 2020.02.26 |
(4) 카프카 매니저 & 스키마 레지스트리 (0) | 2020.02.26 |
(3) 카프카 사용 예제 (0) | 2020.02.25 |
(2) 카프카 설치 (0) | 2020.02.25 |