반응형

1. 서론

이번 포스팅에서는 Apache Impala에 대해 소개하려고 합니다.

 

Cloudera 도큐먼트를 보며 개인적으로 6.x 버전부터 Impala 와 Kudu 조합을 추천하고 있다고 느꼈습니다.
때문에, Hadoop에 관심이 있으신 분이라면 Impala와 Kudu에 대해 알아보는 것을 추천합니다.

물론, Impala는 Kudu가 아니더라도 File에 대해서도 SQL을 수행할 수 있는 서비스입니다.

 

2. Impala란?

임팔라는 Hadoop Eco에 저장되어 있는 데이터를 실시간 병렬 조회가 가능한 SQL 쿼리 엔진입니다.

 

Impala는 아래와 같이 병렬성을 가질수 있는 아키텍처로 이루어져 있습니다.

 

 

 

1) Impalad

 

Impalad는 impala daemon으로 실제 쿼리를 수행하는 역할을 담당합니다.

 

Impalad는 그림과 같이 Query Compiler, Query Coordinator, Query Executor로 이루어져 있습니다.

 

Compiler는 말그대로 client에서 요청받은 SQL 질의를 Compile 한 후 어떻게 처리할지 결정합니다.

Coordinator는 Compiler가 전달한 요청서를 받아 각각 다른 호스트에 있는 Impalad의 Executor에 요청합니다.

이는 데이터의 locality를 보장하며 이로인해 분산 질의 처리가 가능해지며 성능이 향상됩니다.

 

마지막으로 Executor는 실제로 자신이 담당하는 데이터에 대해서 SQL 질의를 수행하는 역할을 합니다.

질의 수행이 끝난 Executor는 Coordinator에게 결과를 반환합니다.

 

 

여기서, 눈치 채셨겠지만 각 Impalad는 metadata를 가지고 있습니다.

metadata는 필요한 데이터가 어느 호스트(노드)에 있는지에 대한 mapping 정보가 담겨져 있습니다.

 

Impalad가 metadata를 가지고 있기 때문에, Client의 요청은 모든 Impalad가 수신 할 수 있습니다.

 

 

2) StateStore

 

StateStore는 각 Impalad의 상태를 관리하며 데이터 일관성을 위해 Metadata의 변경 사항을 모든 Impalad에 브로드캐스팅하는 역할을 담당합니다.

 

StateStore는 Impala 서비스에서 없다고 동작이 안하진 않습니다.

다만, 클러스터 내에서 Impalad가 제외된 경우 쿼리 가능한 daemon 그룹에서 제외를 못하며, metadata 갱신이 안되는 경우가 발생할 수 있습니다.

 

때문에, 사실상 운영시 StateStore도 띄어야 합니다.

 

3) Catalog Service

 

Catalog Service는 Impala 쿼리로 데이터의 CUD 혹은 DDL 작업 시,

이를 impalad의 metadata에 반영하기 위해 StateStore에 브로드캐스팅해달라고 요청하는 역할을 담당합니다.

브로드캐스팅은 StateStore를 통해서 수행되므로 Catalog Service와 StateStore는 같은 호스트에서 수행하는 것이 좀 더 효율적입니다.

그림에서와 같이 Catalog Service도 metadata가 있으며, 이 metadata가 원본이라고 생각하시면 됩니다.

 

3. Impala 사용시 주의점

Impala는 HMS( Hive MetaStore ) 가 떠있어야 가능합니다.

 

그 이유는 Catalog Service가 가지고 있는 Metadate 관리는 실제로 HMS에서 관리하는 데이터를 복사한것이기 때문입니다.

 

때문에, Hive와 Impala를 같이 사용시 주의할 점이 생기게 됩니다.

 

주의할 점은 HiveQL로 인한 metadata 변경은 impala에서 알 수 없다는 점입니다.

HiveQL로 변경된 데이터는 HMS에는 반영되나 impala에서 원본 metadata를 관리하는 Catalog Service에는 반영이 안되기 때문입니다.

 

하지만, 위에서 말씀드린것처럼 Catalog Service의 metadata 은 HMS에서 Copy한 것입니다.

그러니, HiveQL로 변경된 부분을 Impala metadata에도 반영하고 싶은 경우에는 HMS에서 Copy를 다시 하면 됩니다.

 

다시 변경 부분만을 Copy 해와 Impalad에 브로드캐스팅할 수 있도록 Impala는 REFRESH, INVALIDATE METADATA 쿼리를 제공합니다.

 

그럼, Hive에서도 Impala 쿼리로 인한 변경사항을 반영해야하는 문제가 있을까요?

결국 Metadata의 원본은 HMS이며, Impala에서 변경된 부분은 HMS에도 반영됩니다.

또한, Hive는 매 질의시 HMS에서 Metadata를 조회해 처리하기 때문에 Hive는 REFRESH,INVALIDATE METADATA 와 같은 작업을 하지 않아도 됩니다.

 

반응형

 

4. Hive 와 Impala 차이점

Hive와 Impala의 차이점은 동작의 차이로 인해 발생합니다.

Hive는 SQL을 통해 편하게 MR을 수행하는 서비스입니다.

Impala는 MR로 동작하지 않습니다. 위에서 소개한것과 같이 Impala는 MR이 아닌 SQL 엔진을 통해 동작합니다.

 

이로인해, Impala는 Hive에 비해 처리속도가 빠릅니다.

그렇다고, Hive를 모두 Impala 로 대체하는것은 옳바르진 않습니다.

 

우선, Hive, Impala 모두 OLAP 적합한 서비스입니다. 다만, Impala는 SQL 처리가 메모리상에서 이루어집니다.

때문에, 호스트에 메모리가 부족하거나 메모리를 너무 과도하게 차지하는 쿼리의 경우에는

Hive를 사용하는것이 클러스터에 부담도 주지않으며 좋은 선택일 수 있습니다.

물론, Hive는 MR이기에 Spark로 대체하는 방법도 좋은 방안입니다.
하지만, Spark도 기본은 메모리 연산이기에 각 요구사항에 맞게 선택하여 사용해야 합니다.

 

또한, Hive, Impala 는 OLAP에 적합한 서비스라고 했습니다. 하지만 이는 Hdfs 파일에 대해서 쿼리를 날리는 경우입니다.

 

Impala 를 SQL 인터페이스로 사용하지만 실제 데이터는 Kudu 혹은 Hbase에 있다면, 쿼리는 Kudu, Hbase 특성에 맞게 작성해야합니다.

예를들어, Kudu의 경우에는 PK에 대해서 B+Tree로 index를 가지고 있기 때문에 Where 절에 PK 넣는 쿼리의 경우에는 데이터가 대용량이 아니더라도 응답속도가 빠르게 나오게 됩니다.

impalad 가 Kudu Master 혹은 Hbase Master에 요청하여 데이터를 가져오기 때문입니다.
이런 경우에는 metadata도 사실상 무의미하며 해당 데이터를 담당하는 Kudu, Hbase 영역으로 넘어가게 됩니다.

하지만, 이런 저장소에서 제공하는 별도의 클라이언트를 사용하지 않고 Impala 를 통해 간편하게 SQL로 데이터를 Handling할 수 있다는 점이 Impala의 장점입니다.

 

5. HAProxy

Client의 요청은 모든 Impalad가 수신 할 수 있다고 했습니다.

때문에, 관리자 입장에서는 하나의 Impalad에만 요청이 쏠리지 않도록 HAProxy를 구축해야 합니다.

 

HAProxy를 구축하면 얻는 이점은 아래와 같습니다.

 

  1. 부하 분산
  2. Client의 요청을 단일 endpoint 로 관리 가능
    1. Impalad가 추가된다면 HAProxy에 추가된 Impalad 호스트만 추가하면 됩니다.
Cloudera Impala Document 에서는 무료 오픈소스인 haproxy 를 소개하고 있습니다.

 

6. 마무리

이번 포스팅에서는 Impala에 대해 알아봤습니다.

 

부족한 설명이였지만 읽어주셔서 감사합니다.

반응형

'BigData > Impala' 카테고리의 다른 글

Impala - Hibernate  (1) 2021.02.24
Impala - Mybatis  (0) 2021.02.24
반응형

1. 서론

이번 포스팅에서는 Apache Hadoop Eco 저장소 중 하나인 Kudu 에 대해 소개하려고 합니다.

개인적으로 최근 Kudu 를 사용하는 회사들이 늘어나는거 같습니다.
아무래도, cloudera 측에서 impala와의 연계 저장소로 추천하고 있다는 점이 크다고 봅니다.

 

소개 목차로는 아래와 같습니다.

 

  • Kudu란?
  • Hbase와의 차이점
  • Kudu 지원 & 미지원

2. Kudu란?

Kudu는 Hadoop Eco 저장소 중 하나이며, Columnar Storage 입니다.

Columnar Storage인 이유는 Mongo, Hbase와 같이 schemaless 여서가 아니며, 물리적으로 Column 별로 파일에 저장하기 때문입니다.

 

Kudu 는 분산 플랫폼으로 아래와 같은 아키텍처를 가지고 있습니다.

 

 

1) Tablet

 

Kudu에서 테이블은 파티셔닝 테이블입니다.

파티셔닝 테이블이란 특정 column을 기준으로 데이터를 나눠 저장한다는 의미입니다.

 

바로 이 파티셔닝 테이블에서 하나의 파티셔닝이 Tablet 입니다.

데이터를 샤딩하는것과 같으며, 하나의 샤딩이라고 보시면 됩니다.

 

이 Tablet은 Leader, Follower로 이루어져 있습니다.

모든 Write 요청은 Leader가 받으며, Read 요청은 Leader, Follower 모두 받습니다.

 

Leader, Follower에서 Read가 가능하게 하기 위해 Write의 동작방식은 아래와 같이 이루어지게 됩니다.

 

 

 

 

데이터 유실을 방지하기 위해 가장 먼저 Leader의 WAL에 쓰기 작업을 하며,

그 이후는 각 Follwer에게 쓰기를 요청하여 모두 성공한 경우 클라이언트에게 성공 응답을 보내게 됩니다.

개인적으로 이런 부분은 분산 플랫폼 중 Kafka와 유사하다고 볼 수 있습니다.

 

2) Tablet Server

 

Tablet Server는 Tablet 을 가지고 있는 서버를 말하며, 여러개의 Tablet 을 가지고 있습니다.

 

Tablet Server는 크게 Master 서버와 Tablet 서버로 나누어지며,

각 역할은 Hdfs의 Name 노드와 Data 노드와 같이 Tablet의 메타데이터는 Master Server, 실제 데이터는 Tablet Server에 있습니다.

Master 서버의 메타데이터 또한 Leader, Follwer 구조로 이루어져 있습니다.

 

3) MRS

 

MRS는 MemRowSet으로 WAL에 데이터 Write 후 써지는 저장소입니다.

메모리로 이루어져 있으며 B+ tree로 구성되어져 있습니다.

 

아래는 MRS 그림입니다.

 

 

 

4) DRS

 

MRS가 일정 시간 혹은 크기가 넘어가게 되면 Disk에 Write를 하게됩니다.

바로 Write하는 곳이 DiskRowSet이며 DRS 라고 합니다.

 

5) DeltaMemeStore

 

DeltaMemStore는 Update 요청으로 데이터 변경분을 주기적으로 Redo 파일에 Write하는 메모리 영역입니다.

 

Kudu는 MRS, DRS, DeltaMemStore와 같이 중간에 메모리와 디스크를 통해 영구적인 데이터 보존과 함께, 속도까지 겸비한것을 알 수 있습니다.

 

하지만, 조회를 하는 클라이언트 입장에서는 MRS, DRS, RedoFile 등등 조회할 부분이 많습니다.

이를 위해, Kudu는 compaction 작업을 통해 조회시 접근할 메모리 영역과 디스크 영역을 최소화합니다.

 

 

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

 

3. Hbase와의 차이점

 

Kudu와 Hbase는 비슷한 점이 많습니다.

비슷한 부분은 Tablet <-> Region, Tablet Server <-> Region Server, Columnar Storage 등이 있습니다.

 

하지만 apache 깃헙을 가보시면 Hbase, Kudu는 별도의 프로젝트이며, 엄연히 사용처가 다른 것을 짐작할 수 있습니다.

 

Hbase와 Kudu의 가장 큰 차이점은 자료구조입니다.

Hbase는 LSM인 Log Structured Merge 방식을 사용하며, Kudu는 B+tree 방식의 MRS, DRS를 사용합니다.

 

이로인해, key를 통해 read하는 경우 Kudu가 Hbase에 비해 속도가 우수합니다.

물론, Hbase의 경우에도 Read 성능을 올리기위해 저장파일인 HFile이 Multi-layerd index 방식으로 이루어져 있습니다. 

 

두번째 차이점으로는 저장 파일단위입니다.

Hbase는 HFile, Kudu는 CFile로 데이터를 저장합니다.

 

이로인해, table의 컬럼별 집계와 같은 aggregation 쿼리의 경우 Kudu가 File IO가 적게 들어 우수한 성능이 나옵니다.

 

이러한 차이점으로 인해 Kudu가 우수한 부분도 있지만 Hbase가 우수한 부분도 있습니다.

 

첫째 Write 성능

 

Hbase는 MemStore, LSM 의 방식으로 인해 Write의 경우 Kudu보다 성능이 좀 더 우수합니다.

 

두번째 Scan 성능

 

Hbase의 경우 내부적으로 SortedMap 과 같이 정렬하여 데이터를 저장하고 있습니다.

이는, Scan 연산시 kudu에 비해 유리하게 작동할 수 있습니다.

 

세번째 컬럼별 버저닝

 

Hbase는 컬럼별 버저닝을 지원하고 있습니다. 이는 kudu에 비해 우수한 부분이기 보단 차이점으로,

이력 데이터를 관리하기에 용이한 저장소입니다.

 

4. Kudu 지원 & 미지원

kudu는 현재 지원되는 부분과 지원되지 않는 부분이 있습니다.

 

지원되는 부분으로는 아래와 같습니다.

 

  1. compound primary key
  2. single row transaction

미지원 항목은 아래와 같습니다.

 

  1. secondary indexes
  2. multi row transaction
  3. TTL(Time-to-Live)

 

5. 마무리

이번 포스팅에서는 Kudu에 대해 알아봤습니다.

 

Kudu가 Hbase의 대체제는 아니며

서비스 특성에 따라 Kudu 가 잘 맞을수도, Hbase가 잘 맞을수도 있습니다.

 

좋은 서비스를 위해서는 각 저장소의 특징을 알고, 적절하게 선택하여 사용해야 합니다.

 

감사합니다.

반응형
반응형

1. 서론

안녕하세요.

 

저희 조직은 DB cdc를 kafka 로 흘려주며 데이터의 스키마는 schema registry를 사용하여 관리하고 있습니다.

 

최근 spring kafka + schema registry + gradle plugin 를 사용하여 application 에서 cdc 데이터를 consume 하는 작업을 하였는데

다른 분들에게 공유하기에 좋다고 생각들어 포스팅하였습니다.ㅎㅎ

 

2. gradle plugin

schema registry 를 사용하여 kafka record를 consume하는 과정은 아래 절차로 이루어져야 합니다.

 

  1. schema registry 를 통해 avro 스키마 DOWNLOAD
  2. DOWNLOAD 한 avro 스키마를 기반으로 java class 생성
  3. 생성된 java class 를 kafka consume record의 모델로 사용

이때, 1~2번의 과정은 빌드 도구인 gradle에서 제공하는 plugin이 있습니다.

 

1) com.github.imflog:kafka-schema-registry-gradle-plugin

 

schema registry에서 avsc file을 다운로드 하는 plugin 입니다.

 

plugin 적용 방법은 아래와 같습니다.

 

1. repositories 추가

 

build.gradle 파일에 repository를 등록해주세요.

 

 

2. dependencies classpath 추가

 

repository를 등록했으니 필요한 dependencies classpath를 추가합니다.

 

 

이때, 버전 호환이 중요합니다.

 

kafka-schema-registry-gradle-plugin 과 gradle-avro-plugin 는 모두 org.apache.avro:avro를 참조하고 있습니다.

 

org.apache.avro:avro 의 1.8.x 버전은 bug 도 존재할 뿐더러 time 관련된 부분을 java.utile.time 이 아닌 joda를 사용하고 있는데요.

때문에, 가능하다면schema-registry-plugin:5.5.1 , gradle-avro-plugin:0.21.0 의 조합으로 사용하는것을 권장합니다.

권장하는 조합의 plugin 버전은 org.apache.avro:avro 1.10.x 를 사용하며
miner 버전 조합인 schema-registry-plugin:5.3.4 , gradle-avro-plugin:0.16.0 은 org.apache.avro:avro 1.8.x를 사용하고 있습니다.

 

3. gradle task 추가

 

아래와 같이 plugin은 apply 하게 되면 2번째 사진과 같이 schemaRegistry 구문을 사용할 수 있습니다.

download.subject는 subjectName, download path 로 이해하시면 됩니다.

저의 경우 consume만 하므로 download task만 적용하였습니다.

 

 

plugin에 대한 자세한 내용은 여기를 참고해주세요.

 

 

2) com.commercehub.gradle.plugin:gradle-avro-plugin

 

avsc file 을 사용하여 java class를 생성해주는 plugin 입니다.

 

plugin 적용 방법은 아래와 같습니다.

 

1. repository 및 dependencies 추가

 

buildscript { 
	repositories { 
    	jcenter() 
    } 
    dependencies { 
    	classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:VERSION" 
    } 
}

 

2. gradle plugin apply 추가

 

apply plugin: "com.commercehub.gradle.plugin.avro-base"

 

위 1~2번까지 완료하신 후 gradle sync를 한번 해주신다면 generateAvro라는 task 를 사용할 수 있습니다.

 

해당 task 를 수행하게 되면 avsc 파일을 사용하여 java class를 생성하게 됩니다.

avro file path, gerate java class path 같은 부분은 gradle 파일에서 custom 가능합니다.

 

 

반응형

 

 

3. 적용 시 겪은 문제점

저의 경우 2개의 plugin 모두 적용 후 실제 app에서 kafka record를 가져가는데 문제가 있었습니다.

 

1) stringType

 

문제에 대해 설명하기 전 avro의 string type은 3가지를 지원하고 있습니다.

Utf8의 경우에는 python과 같은 java 뿐만이 아닌 언어의 string type까지 지원하기 위해서 인것 같습니다.
  • CharSequence (java.lang.CharSequence)
  • String (java.lang.String)
  • Utf8 (org.apache.avro.util.Utf8)
기본은 String 이며 아래와 같이 변경도 가능합니다.

 

문제는 com.commercehub.gradle.plugin:gradle-avro-plugin 를 통해 생성한 java class에 kafka record를 setting 할때 발생했습니다.

 

kafka record를 java class에 세팅하는 작업은 plugin을 통해 만든 java class의 put method를 통해 이루어 집니다.

 

put 메서드는 아래와 같이 type casting을 (java.lang.String) 으로 하게 되는데요.

제가 consume할 kafka record는 python code 로 publishing을 하고 있어 type casting exception이 발생하게 됩니다.

 

 

하지만, 이 문제는 다행히 plugin version up으로 해결되었습니다.

 

최신 version인 gradle-avro-plugin:0.21.0 를 사용하면 아래와 같이 put 메서드가 만들어집니다.

 

.toString으로 type casting 하도록 수정되어 더이상 type casting 예외는 발생하지 않게됩니다.

 

 

2) java.time.Instant

 

저희는 kafka record에서 timestamp 관련 데이터는 long type의 logicalType은 timestamp-millis로 정의해서 사용하고 있습니다.

 

이 경우, put method는 아래와 같이 만들어집니다.

 

 

type이 long인데 , java.time.Instant 타입을 사용하는게 의아해 할 수 있는데요.

이유는 logicalType으로 timestamp-millis 를 지정했기 때문입니다.

참고 - TimeConversions

 

하지만, 실제로 데이터를 consume 과정에서는 아래 예외가 발생하게 됩니다.

java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.time.Instant 

 

이유는, 역직렬화시 long type의 데이터를 logicalType 명시에 맞도록 casting 되지 않아서 입니다.

 

이 문제는, kafka consumer의 설정으로 해결할 수 있었습니다.

kafka + schema registry 연계를 많이 사용하고 있어 kafka 측에서는 consumer에 아래와 같은 옵션을 제공합니다.

 

 

기본은 false이며, avro schema를 사용하여 consume 하는 경우에는 해당 옵션을 true로 변경하여 사용하면 됩니다.

 

4. 마무리

이번 포스팅은 spring kafka + schema registry + gradle plugin 적용에 대해서 진행했습니다.

 

모두, 저같이 삽질을 하지 않았으면 좋겠다는 마음으로 작성하게 되었는데요.

도움이 되었으면 좋겠네요ㅎㅎ

 

감사합니다.

반응형

'MQ > Kafka' 카테고리의 다른 글

(7) Kafka Trouble Shooting  (0) 2021.11.19
(5) 카프카 사용 시 고려사항  (0) 2020.02.26
(4) 카프카 매니저 & 스키마 레지스트리  (0) 2020.02.26
(3) 카프카 사용 예제  (0) 2020.02.25
(2) 카프카 설치  (0) 2020.02.25

+ Recent posts