반응형

1. 서론

Hadoop eco system에서 비관계형 분산 데이터 베이스로 많이 사용하는 Hbase에 대해 공부한 내용을 공유하고자 합니다.

책은 HBase 완벽 가이드 를 통해 공부하였습니다.

 

이번 포스팅에서는 1장인 소개부분을 진행하도록 하겠습니다.

 

2. 관계형 데이터 베이스 시스템의 문제점

 

책에서는 RDB에 대한 확장성 측면에서 문제점을 얘기하고 있습니다.

물론, RDB의 대표적인 MySql, Oracle, Postgres 등은 실제 서비스에서도 많이 사용되고 있는 DB이며,
현재는 확장성을 고려해 MySql, Postgres 등은 분산 클러스터 기능도 제공하고 있습니다.

 

DB 기반 서비스는 확장됨에 따라 DB 성능이 매우 중요해지게 됩니다.

 

이 성능을 올리기 위해 일반적으로 아래와 같은 방법을 사용할 수 있습니다.

 

  1. Read 질의를 위한 슬레이브 데이터 베이스 사용
  2. 멤캐시드와 같은 캐시 추가

하지만, 위 2가지 방법의 경우에는 Read를 위한 방법으로 Write에 대한 성능 향상은 아니며,

마스터/ 슬레이브 구조에서는 마스터와 슬레이브의 컴퓨팅 성능의 차이가 클수록 성능에도 악영향을 끼칠수 있게 됩니다.

 

결국, DB를 구성한 서버의 자원을 모두 올려야하는 비용이 들며, 캐시를 사용하게 되면 순간적인 데이터 일관성이 깨지게 되는 문제점이 발생할 수 밖에 없게 됩니다.

 

최종적으로는 RDB의 장점인 정규화를 없애고, 성능을 위해 비정규화를 하게 됩니다.

정규화는 결국 데이터를 분리하게 되는것입니다. 그렇다는것은 데이터를 가져오기 위해서 join연산을 수행해야 합니다.
하지만 RDB에서는 이 join 연산이 비싸기 때문에 성능을 올리기 위해서는 일반적으로 비정규화를 하게 됩니다.

 

3. RDBMS vs NoSql

 

RDBMS와 NoSql은 정반대의 관계가 아닙니다.

 

그 이유로는 상용에 있는 RDBMS와 NoSql 종류에 따라 저장 방식도 모두 다르고 지향하는 점이 다르기 때문입니다.

개인적으로, RDBMS와 NoSql의 구분은 스키마의 자유도로 나누면 된다고 생각합니다.
트랜잭션 강도를 제외한 이유로는 DBMS의 제품에 따라 NoSql 이라도 강한 제품이 있기 때문입니다.

 

그렇기에, 서비스에 맞는 DBMS를 선택해야 합니다. 책에서는 선택할 때 고려해야할 기준을 소개하고 있습니다.

 

1) 데이터 모델

 

데이터 모델로는 대표적으로 아래와 같이 있습니다.

 

  • 키/값 방식
  • 반 구조적
  • 컬럼지향 방식
  • 문서 지향 방식

 

2) 저장 모델

 

인메모리 방식인지, 영구저장 방식인지도 기준으로 들 수 있습니다.

 

영구저장의 경우 디스크에 쓰게되며, 이 경우 성능에 어떤 영향을 미치는지도 고려해야 합니다.

 

3) 일관성 모델

 

일관성 정책의 엄격함도 고려해야 합니다.

 

각 DBMS에 따라 느슨한경우와 엄격한 경우가 있기 때문입니다.

 

4) 물리적 모델

 

물리 장치가 단일한지 분산인지도 고려해야 합니다.

 

단일 장치의 장/단점과 분산 장치의 장/단점이 존재하기 때문입니다.

 

간단한 예로 분산의 경우는 네트워크 비용이 어쩔수 없이 들게 된다는 단점이 있지만,
수평 확장이 가능하여 저장 부분에서는 무한하다는 장점도 가지고 있습니다.

 

5) 읽기/쓰기 성능

 

현재 서비스는 DB에 읽기와 쓰기 중에 비율이 어떤지도 고려해야합니다.

 

아래 크게 3가지에 따라서 선택해야 하는 DBMS도 다르지만 DBMS의 정책도 달라지기 때문입니다.

 

  1. 읽기 > 쓰기
  2. 읽기 = 쓰기
  3. 읽기 < 쓰기

 

6) 보조색인

 

보조색인의 필요성도 고려대상에 포함됩니다.

 

보조색인이 필요 없다고 판단될 땐 확장성을 생각하여 어플리케이션에서 충분히 커버가 가능한지도 알아봐야 합니다.

 

7) 장애 처리

 

장애처리에 대해서 각 DBMS들은 어떠한 대처를 하는지도 알아봐야 합니다.

 

데이터가 인메모리 저장 방식의 경우 각 DB 서버는 graceful shutdown으로 디스크에 저장되도록 되어 있지 않다면 데이터 유실이 발생할 것 입니다.

 

8) 압축

 

압축이 기본적으로 제공되는지 혹은 필요시 플러그인으로 압축 기법을 사용할 수 있는지도 고려 대상에 포함됩니다.

 

9) 부하 분산

 

읽기/쓰기에 대한 부하를 DB 자체적으로 분산해주는지도 고려대상입니다.

 

(처리량이 많은 어플리케이션 구조에서 고려해보면 됩니다.)

 

10) 원자적 읽기, 갱신, 쓰기

 

원자적인 CRUD가 가능한지도 고려 대상입니다.

 

DB에서 제공하는지의 여부에 따라 클라이언트 측 어플리케이션의 복잡도에 영향이 가기 때문입니다.

 

11) 락걸기, 대기, 데드락

 

데이터 접근 시 어떤 유형의 락 모델을 제공하는지도 고려대상입니다.

 

이것은 성능에도 직접적인 영향이 있기 때문에 놓치지 않아야 하는 기준입니다.

 

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

4. 구성 요소

이제 Hbase에 대한 구성요소를 간단히 소개하겠습니다.

 

1) 테이블

 

Hbase에는 RDBMS와 동일하게 테이블이라는 구성 요소가 존재합니다.

 

2) 로우

 

로우는 유일한 키인 로우 키를 가지고 있습니다.

또한, 이 로우가 다수 모여 테이블을 이루게 됩니다.

 

추가로,로우는 로우키를 기준으로 사전 편찬식으로 저장되어 집니다.

 

아래는 scan했을때의 예입니다.

 

row-1 column=cf1:, timestamp=11111
row-10 column=cf1:, timestamp=11111
row-11 column=cf1:, timestamp=11111
row-2 column=cf1:, timestamp=11111

 

사전편찬식 정렬에서는 2진수 수준에서 바이트 단위로 왼쪽부터 비교하게 됩니다.

그로인해, 위와 같이 row-10, row-11이 row-2보다 위에 있게 됩니다.

 

3) 컬럼

 

컬럼은 상위에 컬럼패밀리라는 것을 가져야 합니다.

 

컬럼패밀리는 데이터를 의미적으로 분류하게 해주는 것으로, 이 컬럼 패밀리 단위로 압축이나 메모리 상주같은 설정이 가능하게 됩니다.

 

또한, 컬럼패밀리 안의 모든 컬럼은 HFile이라는 하나의 Hbase에서 관리하는 저수준 저장 파일에 함께 저장됩니다.

 

컬럼패밀리는 테이블 생성될 때 최소 하나 이상은 정의해야하며, 갯수가 많아서는 안되는 제약사항이 있습니다.

 

그래서, Hbase에서의 컬럼은 '컬럼패밀리:퀄리파이어' 로 표현할 수 있습니다.

여기서 퀄리파이어는 바이트 배열로 패밀리안에 속한 컬럼키라고 보시면 됩니다.

 

퀄리파이어의 경우 컬럼패밀리와 달리 갯수에 제약사항이 없어, 하나의 컬럼패밀리안에는 수백만개의 퀄리파이어를 저장할 수 있습니다.

또한, 데이터 타입이나 길이에도 제한이 없습니다.

 

4) 셀

 

셀은 컬럼의 값으로서 타임스탬프를 가지고 있습니다.

 

타임스탬프를 가지고 있다는 것은 컬럼의 값을 타임스탬프 기준으로 버저닝한다는 의미입니다.

 

타임스탬프는 사용자가 직접 지정할 수도 있으며, Hbase 내부적으로 부여할 수도 있습니다.

또한, 동일한 값은 타임스탬프 기준으로 정렬되어 항상 최신의 값을 먼저 읽을 수 있도록 되어 있습니다.

 

추가로, 셀의 경우 바이트 배열로 되어 있어, 클라이언트 측에서 어떻게 처리해야 할지 알고 있어야 합니다. 


아래는 Hbase의 전체적인 구조를 나타낸 것입니다.

 

(Table, RowKey, Family, Column, Timestamp) -> Value

 

정렬의 기능을 추가하여 프로그래밍적으로 나타내게 된다면 아래와 같습니다.

 

SortedMap<RowKey, List<SortedMap<Column, List<Value, Timestamp>>>>

 

5) 원자성

 

 

Hbase는 로우 단위로 컬럼 수와는 무관하게 원자성을 보장하고 있습니다.

 

단, 여러 로우나 테이블에 걸친 원자성이나 트랜잭션을 보장하지 않습니다.

 

 

6) 자동 샤딩

 

Hbase에서는 확장성, 로드밸런싱을 위해 리전이라는 단위를 사용하고 있습니다.

 

리전은 단순히 특정 범위의 로우 집합으로 이해하면 되며,

리전은 사이즈가 커지게 되면 자동으로 분할하게 되며, 반대로 합쳐지기도 합니다.

 

최초 테이블 생성 시에는 하나의 리전이 존재하고, 시스템이 모니터링을 하다 특정 기준이 넘어가면 둘로 분리하게 됩니다.

 

각 리전은 하나의 리전서버에서 운용되며 각 서버는 수많은 리전을 운용할 수 있습니다.

 

리전의 특징으로는 아래와 같습니다.

 

  • 서버 고장 시 리전을 다른 리전 서버로 이동시켜 재빨리 복구 가능
  • 세밀한 로드밸런싱

 

5. Hbase 내부 동작 방식

hbase는 데이터 색인 방식으로 LSM(= Log Structured Merge) Tree 을 사용합니다.

 

hbase는 데이터를 HFile 이라는 파일에 저장하게 되고,

이 파일은 영구 저장, 정렬, 고정 불변의 키/값 쌍의 맵이라고 보시면 됩니다.

 

1) HFile

 

HFile은 연속적인 블록이며 블록에 대한 색인은 블록 끝에 저장되어 있습니다.

색인은 HFile이 열릴때 메모리로 로드되어 사용하게 됩니다.

 

또한, HFile은 위에서 설명한 LSM Tree를 기반으로하여 특정값 혹은 시작값~끝값의 range 스캔이 가능합니다.

 

추가로, 모든 HFile은 블록 색인을 갖고 있어 검색은 단 한번의 디스크 판독으로 수행될 수 있습니다.

검색하고자하는 키를 통해 블록 색인에서 이진탐색이 이루어지게 됩니다.

 

 

2) WAL 

 

WAL은 Write-Ahead-Log로 Hbase는 데이터 갱신시 여기에 먼저 씌어지게 됩니다.

Hbase에서는 커밋로그 라고도 불립니다.

 

WAL에 먼저 쓰여지기 때문에 장애가 났을시에도 데이터의 유실은 있지 않습니다.

 

3) 멤스토어

 

WAL에 씌어진 다음에는 메모리인 멤스토어에 저장이 됩니다.

 

멤스토어에 있는 설정한 값을 넘게 되면 그때야 비로소 HFile에 쓰여지게 됩니다.

HFile에 쓰여진 이후에는 WAL에 있던 쓰여진 데이터도 삭제되어 집니다.

 

데이터를 읽을때에는 HFile과 멤스토어간의 데이터 정합이 안맞을 수 있어, 두 곳에서 데이터를 통합하여 반환하게 됩니다.

 

LSM 트리 구조상 삭제는 바로 할 수 없습니다. 하지만, 삭제표시를 해두어 읽기 연산 시 삭제된것처럼 데이터를 감출 수 있습니다. 

 

4) 컴팩션

 

멤스토어에서 HFile로 write 할 때마다 파일의 수는 증가하기 때문에 Hbase는 내부적으로 컴팩션이라는 것을 수행하게 됩니다.

 

이 컴팩션은 부 컴팩션 과 주 컴팩션으로 구분할 수 있습니다.

 

1. 부 컴팩션

 

부 컴팩션은 작은 파일의 내용을 큰 파일에 병합시켜 파일의 갯수를 줄이는 행위입니다.

 

HFile은 내부적으로 정렬되어 있기 때문에 병합속도는 빠르게 이루어지며, 오직 디스크 입출력 성능에 영향을 받습니다.

 

2. 주 컴팩션

 

주 컴팩션은 하나의 리전안의 컬럼패밀리 하나를 구성하는 모든 파일을 새로운 파일 하나로 다시 쓰는 작업입니다.

 

부 컴팩션과의 차이로는 위에 설명한 삭제표시가 달린 데이터를 다시 쓰는 과정에서 제외시켜 영구적으로 삭제할 수 있다는 점 입니다.

 

6. Hbase 클러스터 구성

Hbase 클러스터는 마스터 서버 한대, 리전 서버 다수로 이루어져 있습니다.

 

1) 마스터서버

 

마스터 서버는 아래와 같은 역할을 수행합니다.

 

  1. 리전 서버에 리전 할당
  2. 리전 서버간의 리전의 부하 분산 처리
  3. 테이블 및 컬럼패밀리의 생성 같은 스키마 변경 사항 및 기타 메타데이터 작업 수행

위 역할들을 마스터 서버가 수행하기 위해서는 주키퍼라는 서비스를 필수로 사용해야 합니다.

 

2) 리전 서버

 

리전 서버는 클라이언트와 통신하며 데이터의 읽기와 쓰기를 담당하는 서버입니다.

 

쓰기도 담당하기 때문에 리전 분할의 역할도 이루어 집니다.

 

6. 마무리

이번에는 간략히 Hbase 소개에 대해서 포스팅하였습니다.

다음에는 2장인 설치 챕터이지만 해당 챕터는 Hadoop 설치 에서 사용한 cloudera manager를 사용하여 Hbase 서비스를 추가하는 것으로 대체하도록 하겠습니다.

반응형

'BigData > Hbase' 카테고리의 다른 글

(5) 클라이언트 API : 관리 기능  (0) 2020.06.02
(4) 클라이언트 API : 고급 기능  (0) 2020.04.19
(3) 클라이언트 API : 기본 기능  (0) 2020.04.09
(2) 설치  (0) 2020.04.08
반응형

1. 서론

이번 포스팅에서는 스트럭처 스트리밍에 대해 알아 보도록하겠습니다.

 

들어가기에 앞서 간략히 스트럭처 스트리밍에 대해 소개하겠습니다.

 

스트럭처 스트리밍은 스파크 2.0에서 제공하는 또 하나의 스트리밍 처리 API 입니다.

 

스트럭처 스트리밍의 경우 앞장에서 본 스파크 스트리밍과는 약간 다릅니다.

 

스파크 스트리밍의 경우, 데이터를 일정 구간과 간격을 두고 마이크로 배치로 처리하는것에 반해

스트럭처 스트리밍은 꾸준히 생성되는 데이터를 무한히 증가하는 하나의 커다란 데이터셋으로 간주하고 처리하는 방식입니다.

즉, 배치처리와 유사한 방법으로 스트리밍 데이터를 처리하도록 제공한다고 이해하시면 됩니다.

 

정확히 말하면 무한히 커지는 데이터셋을 처리하진 않습니다.
앞에서 얘기했던 reduceByKeyAndWindow와 같이 스트리밍 데이터의 상태값, 결과값을 저장하며
다음 스트리밍 데이터를 적절히 합해서 처리하는 방식입니다.

하지만 이러한 복잡한 로직을 스파크가 담당하여 처리해준다는 것에 의미가 있으며,
스파크에게 위임함에 따라 오류 복구처리도 제공합니다.

 

2. 데이터프레임과 데이터 셋 생성

스트럭처 스트리밍에서 사용하는 데이터 모델은 데이터 셋입니다.

이 데이터 셋을 생성하는 방법으로는 기존 DataFrameReader를 사용했었지만,

스트럭처 스트리밍 처리를 위해서는 DataStreamReader를 통해 사용해야 합니다

 

DataStreamReader의 경우는 스파크 세션의 readStream()을 통해 생성이 가능하며,

스파크 2.3.0 버전 기준으로 아래와 같이 지원합니다.

 

  • 파일 소스
  • 카프카 소스
  • 소켓 소스
  • Rate 소스
소켓, Rate 소스의 경우에는 데이터 중복이나 누락이 생길 수 있어 테스트 용도 외에는 사용하지 않는것을 권장합니다.

 

아래는 각 소스별 간단한 사용법입니다.

 

1) 파일 소스

 

path라는 옵션이 있으며, 디렉터리 정보를 넘기면 됩니다.

해당 디렉터리에 새로운 파일이 생길때마다 파일을 open하여 읽고 처리합니다.

 

glob과 같이 패턴 지정이 가능하지만 단 하나만 사용가능합니다.

 

그외 대표적인 옵션으로는 아래와 같습니다.

 

  • maxFilesTrigger = 한 번에 처리 가능한 최대 파일 수
  • lastestFirst = 가장 마지막에 들어온 파일을 가장 먼저 처리할 것인지 설정 여부
  • filenameOnly = 동일 파일 여부를 판단할 때 디렉터리 경로는 무시하고 파일 이름만 비교할 것인지 설정 여부

 

아래는 지원하는 대표적인 파일 포맷입니다.

 

  • csv
  • json
  • parquet
  • text
단, csv와 parquet은 반드시 스키마 정보를 같이 입력해야하며 
생략을 원할땐 spark.sql.streaming.schemaInterface 값을 true로 주어야 합니다.

 

2) 카프카 소스

 

카프카 토픽으로부터 데이터를 읽어 스트림을 생성합니다.

카프카의 경우 스파크와 연계성이 좋으며, 데이터 유실이나 중복이 발생하지 않는 연동 방식을 사용합니다.

 

 

3) 소켓 소스

 

소켓을 통해 데이터를 읽어 스트림을 생성합니다.

간단히 테스트용으로 많이 사용됩니다.

 

 

4) Rate 소스

 

timestamp, value 칼럼을 가지고 있는 데이터 소스이며, 스파크 세션의 readStream 메서드로 생성가능합니다.

소켓소스와 같이 테스트용으로 주로 사용됩니다.

 

 

아래는 파일 소스를 처리하는 간단한 예제 코드입니다. (python)

파일소스의 경우 스파크는 디렉터리를 모니터링하기 때문에,
기존 파일에 append가 아닌 파일을 새로 추가하는 방식으로 작업해야 합니다.

 

source = spark\
	.readStream\
        .schema(StructType().add("name", "string").add("age", "integer"))\
        .json(<path_to_dir>)

 

 

 

 

 

반응형

 

 

 

 

 

3. 스트리밍 연산

스트럭처 스트리밍은 무한한 크기를 가진 데이터셋 또는 데이터프레임으로 간주해서 처리합니다.

때문에, 데이터프레임 또는 데이터 셋 API에서 제공하는 대부분의 연산을 그대로 사용 가능합니다.

 

아래는 스트럭처 스트리밍 연산의 종류와 내용입니다.

 

 

1) 기본 연산 및 집계 연산

 

스트럭처 스트리밍의 경우 고정된 데이터와 실시간으로 생성되는 스트리밍에도 적용 가능한 API입니다.

그로인해, 연산이 가능할때가 있고 가능하지 않을때가 있습니다.

 

그 중, 스트리밍 데이터와 크기가 고정된 데이터 모두 사용 가능한 일반 연산으로 map, flatMap, reduce, select 등이 있습니다.

 

아래는 유의 사항이 있는 연산들입니다.

 

  • groupBy, agg = 스트리밍 데이터의 경우 하나 이상의 집계연산을 중복 적용할 수 없습니다.
  • limit, take, distinct = 스트리밍 데이터를 대상으로 limit, take, distinct 메서드는 호출할 수 없습니다.
  • sorting = 스트리밍 데이터에 대한 정렬은 집계 연산 적용 후 complete 출력 모드의 경우에만 가능합니다.
  • count = count 연산 역시 집계 연산의 결과에만 가능합니다.
  • foreach = 스트리밍 데이터셋의 경우 ds.writeStream.foreach 형태로만 사용 가능합니다.
  • show = 스트리밍 데이터셋의 경우 show가 아닌 DataStreamWriter의 출력 포맷을 콘솔로하여 출력해야 합니다.

 

2) 윈도우 연산

 

윈도우 연산은 스트리밍 데이터를 다룰 대 자주 사용되는 데이터 처리 기법 중 하나입니다.

스트럭처 스트리밍 역시 이 윈도우 연산을 지원합니다.

 

단, 앞의 스파크 스트리밍의 윈도우 연산과는 다른 방식으로 동작합니다.

이유는, 스트럭처 스트리밍은 윈도우를 데이터를 읽어들이는 단위가 아닌 그룹키로 사용하기 때문입니다.

 

아래는 스트럭처 스트리밍에서 윈도우 연산늘 통해 5분마다 단어 수 세기 예제 입니다.

 

spark = SparkSession.builder...

lines = spark\
	.readStream\
        .format("socket")\
        .option("host", "localhost")
        .option("port", "9999")\
        .option("includeTimestamp", False)\
        .load()\
        .select(col("value").alias("words"), current_timestamp().alias("ts"))

words = lines.select(explode(split(col("words"), " ")).alias("word"), window(col("ts"), "10 minute", "5 minute").alias("time"))
wordCount = words.groupBy("time", "word").count()

query = wordCount.writeStream...
query.awaittermination()

 

위 예제는 5분마다 10분동안의 데이터를 하나의 Row로 간주하여 처리하는 예제입니다.

그로인해, 실제 12:00:00~12:10:00 과 같은 칼럼 값이 그룹키로 할당되고 이를 통해 groupBy 연산이 가능하게 됩니다.

 

아래는 출력문을 간단히 테이블로 표시했습니다.

 

time word count
[2018-03-05 22:45:00, 2018-03-05 22:55:00] test 1
[2018-03-05 22:50:00, 2018-03-05 23:00:00] test 1

 

정각 기준으로 스트림 배치가 시작되는것을 원하지 않는다면, window 메서드의 4번째 인자로 시작 시간값을 설정하면 됩니다.

 

 

3) 워터마킹

 

스트럭처 스트리밍은 스트림 데이터의 이전 상태와 변경 분을 찾아내 필요한 부분만 수정하는 기능을 제공한다고 했습니다.

하지만 사실상, 위 같은 기능을 구현하기 위해서는 데이터를 메모리에 계속 들고 있으면서 스트림으로 유입되는 데이터를 무한정 기다리면서 처리해야합니다.

 

하지만, 스파크에서도 무한정 기다릴수는 없기 때문에 워터마킹 기법을 제공합니다.

 

워터마킹은 스파크 2.1.0 버전에서 새롭게 제공된 기능입니다.

워터 마킹은 간단하게 유입되는 이벤트의 유효 시간을 주는 것 입니다.

 

아래는 spark 공식 가이드에 나와 있는 워터마크 그림입니다.

 

 

워터 마크를 사용하는 경우 지정한 워터마크 시간동안 데이터가 유입될 수 있습니다.

이 말은, 지정한 윈도우안에 있는 값이 변경가능하게 된다는 의미이며, 워터마크시간까지 지나야 해당 윈도우에 있는 값은 불변 데이터로 변경되어집니다.

 

그로인해, 워터마크를 적용하게 되면 윈도우의 데이터는 실시간으로 결과를 출력할 수는 없습니다.

 

아래는 워터마크를 적용한 예제입니다.

워터마크 사용시에는 워터마크로 사용한 timestamp칼럼으로 집계함수의 키로 사용해야 합니다.

 

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.functions._
import spark.implicits._

val spark = SparkSession.builder() ...
val lines = spark.
	.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .option("includeTimestamp", false)
        .load()
        .select('value as "words", current_timestamp as "timestamp")

val words = lines.select(explode(split("words", " ")).as("word"), 'timestamp)

val wordCount = words.withWatermark("timestamp", "10 minutes").groupBy(window('timestamp, "10 monutes", "5 minutes"), 'word).count

val query = wordCount.writeStream
	.outputMode(OutputMode.Append())
        .format("console")
        .option("truncate", false)
        .start()

query.awaitTermination()

 

워터마크 시간을 길게 할수록 처리 결과가 늦게 출력되는 것을 유념하여 사용해야 합니다.

 

 

4) 조인 연산

 

스트럭처 스트리밍이 처음 소개 되었을때는 조인연산에 제약이 많았습니다.

하지만 2.3.0 버전부터는 대폭 개선되어 다양한 방식의 조인 연산이 가능해졌습니다.

 

  • inner join = inner join의 경우 특별한 제약없이 사용 가능합니다.
    • 단, 무한정 보관하는 것은 불가능한 일이기 때문에 워터마크를 사용하여 특정시점이 지난데이터는 지우도록해야합니다.
  • outer join = outer join의 경우 반드시 스트림 데이터셋을 기준으로 하는 outer join 만 가능합니다.

 

5) 스트리밍 쿼리

 

현재 까지는 스트리밍 데이터를 처리하는것을 소개했습니다.

하지만 이 처리 연산을 실행하기 위해서는 DataStreamWriter를 이용한 쿼리 작업이 필요합니다.

 

1. DataStreamWriter

 

DataStreamWriter는 데이터 셋의 writeStream 메서드로 생성할 수 있습니다.

 

사용용도 로는 아래와 같습니다.

 

  • 저장 모드
  • 쿼리명
  • 트리거 주기
  • 체크포인트

 

2. 스트리밍 쿼리

 

DataStreamWriter는 스트리밍 애플리케이션을 시작하기 위한 start 메서드를 제공합니다.

 

start 메서드의 반환값은 StreamingQuery 타입입니다.

이를 통해, 실행 중인 어플리케이션의 모니터링 및 관리가 가능합니다.

 

대표적으로는 아래와 같습니다.

 

  • 쿼리 이름, id 조회
  • 쿼리 중지
  • 소스와 싱크의 상태 정보 조회
  • awaitTermination 을 통한 실행 동작 제어

 

3. 저장 모드

 

DataStreamWriter의 저장모드에는 3가지가 있습니다.

 

아래와 같습니다.

 

모드 설명
Append(추가) 기존에 처리된 결과를 제외한 새롭게 추가된 데이터만 싱크로 전달하는 방법입니다.


이 경우는, 기존 데이터의 수정이 발생할 가능성이 0%가 전제되어야 하기 때문에 시간의 흐름에 따라 데이터 수정이 일어나는 케이스에는 사용이 불가능합니다.
Complete(완전) 데이터 프레임이 가지고 있는 전체 데이터를 모두 출력하는 방법입니다.


항상 모든 데이터를 출력하기 때문에 데이터 사이즈가 작은 경우에 사용해야 합니다.
Update(수정) 마지막 출력이 발생된 시점부터 다음 출력이 발생하는 시점 동안 변경된 데이터만 출력하는 모드입니다.


변경분만 출력하기 때문에 조인 연산을 사용하는 경우에는 사용이 불가능합니다.

 

 

4. 싱크 (Sink)

 

스트럭처 스트리밍은 필요한 경우 ForEachWriter라는 방식을 통해 출력을 커스터마이징이 가능합니다.

 

ForEachWriter를 사용하는 경우 open, process, close 메서드를 구현해야 합니다.

 

단, ForEachWriter는 드라이버가 아닌 익스큐터에서 수행되며,
드라이버에서 수행하도록 하고 싶은 경우에는 ForEachWriter 객체의 생성자 본문에 초기화 코드를 작성하면 됩니다.

 

아래는 ForEachWriter 사용 예제 입니다.

 

val query = wordCount
	.writerStream
        .outputMode(OutputMode.Complete)
        .foreach(new ForeachWriter[Row] {

            def open(partitionId: Long, version: Long): Boolean = {
                println(s"partitionId:${partitionId}, version:${version}")
		true
            }

            def process(record: Row) = {
                println("process:" + record.mkString(", "))
            }

            def close(errorOrNull: Throwable): Unit = {
                println("close")
            }
        }).start()

 

4. 마무리

이번 포스팅에서는 스트럭처 스트리밍에 대해 포스팅 하였습니다.

 

다음에는 스파크를 비교적 간단하게 Web 기반의 NoteBook에서 수행할 수 있도록 제공하는 제플린에 대해 포스팅하겠습니다.

반응형

'BigData > Spark' 카테고리의 다른 글

(8) CDH와 스파크 클라이언트 연동  (0) 2020.04.21
(7) Apache Zeppelin  (0) 2020.04.20
(5) 스파크 스트리밍  (0) 2020.03.19
(4) 스파크 SQL  (0) 2020.03.17
(3) 스파크 설정  (0) 2020.03.13
반응형

1. 서론

 

이번 포스팅에서는 스파크 스트리밍에 대해 알아 보도록하겠습니다.

 

대게, 스트리밍이라하면 실시간 처리로 알고 있습니다.
스파크는 마이크로 배치로 짧게 여러번 수행하여 스트리밍 처리를 제공합니다. 

 

2. 주요 용어

 

1) 스트리밍 컨텍스트

 

스파크 스트리밍을 수행하기 위해서는 스트리밍 모듈에서 제공하는 스트리밍 컨텍스트를 사용해야 합니다.

 

아래는 스트리밍 컨텍스트를 생성하고 사용하는 예제입니다. - (스칼라)

 

val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("StreamingSample")
conf.set("spark.driver.host", "127.0.0.1")

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(3))
val rdd1 = sc.parallelize(List("Spark Streaming Sample ssc"))
val rdd2 = sc.parallelize(List("Spark Queue Spark API"))
val inputQueue = Queue(rdd1, rdd2)
val lines = ssc.queueStream(inputQueue, true)
val words = lines.flatMap(_.split(" "))
words.countByValue().print()

ssc.start()
ssc.awaitTermination()

 

예제에서 볼 수 있듯이 StreamingContext는 SparkContext에서 생성 할 수 있습니다.

추가로, 어느 주기로 수행할 지의 정보도 같이 넘겨야 합니다. 

예제에서는 Seconds(3)을 통해 3초에 한번씩 수행되도록 하였습니다.

 

그리고, 마지막에 있는 start 메소드를 실행해야만 스파크 스트리밍은 수행됩니다.

또한, awaitTermination 를 통해 임의로 애플리케이션이 종료되지 않도록 합니다.

 

종료는 개발자가 직접 어떤 시점 혹은 상황에 하도록 추가하여야 합니다.

 

 

2) DStream (Discretized Streams)

 

DStream은 RDD 와 같이 스파크에서 스트리밍을 위해 제공하는 데이터 모델입니다.

단순하게 RDD의 시퀀스로 이해하시면 되며, 지정한 배치 간격마다 input 에서 데이터를 가져와 DStream으로 변경하면서 처리하게 됩니다.

 

3. 데이터 읽기

스파크 스트리밍으로 처리할 수 있는 input 종류는 대표적으로 아래와 같습니다.

 

  • 소켓
  • 파일
  • RDD 큐
  • 카프카

 

1) 소켓

 

스파크 스트리밍은 TCP 소켓을 이용해 데이터를 수신할 수 있도록 지원하고 있습니다.

 

소켓을 통해 데이터를 읽는 예제는 아래와 같습니다. - (파이썬)

 

conf = SparkConf()
conf.set("spark.driver.host", "127.0.0.1")

sc = SparkConf()
ssc = StreamingContext(sc ,3)

ds = ssc.socketTextStream("localhost", 9000)
ds.pprint()

ssc.start()
ssc.awaitTermination()

 

socketTextStream 함수를 통해 데이터를 읽을 host와 port를 지정하여 사용하는것을 확인할 수 있습니다.

추가로, 데이터가 문자열이 아닌경우에는 convert() 함수를 사용하여 변환할 수 있습니다.

 

 

2) 파일

 

스파크는 하둡과 연계해서 자주 사용합니다.

그로인해, 파일에 대해서도 스트리밍 처리를 지원하고 있습니다.

 

사용법은 아래와 같습니다.

ds = ssc.textFileStream(<path>)

 

위의 path는 읽어 들일 파일의 디렉터리 위치입니다.

 

파일을 input으로 사용하는 경우, 파일의 변경내용을 추적하지 못한다는 것을 염두하고 사용해야 합니다.

 

 

3) RDD 큐

 

RDD 큐의 경우에는 위 예제에서도 보셨겠지만,  일반적으로 테스트용으로 많이 사용하는 input 종류입니다.

 

사용법은 아래와 같습니다.

 

queue = Queue(rdd1, rdd2)
ds = ssc.queueStream(queue)

 

 

4) 카프카

 

카프카는 스파크와의 연계가 가능한 분산 메시징 큐 시스템입니다.

카프카에 대한 자세한 내용은 아래를 참고해주시면 되겠습니다.

(https://geonyeongkim-development.tistory.com/category/MQ/Kafka)

 

카프카를 input으로 사용할때는 스파크에서 제공하는 KafkaUtils의 createDirectStream 함수를 사용할 수 있습니다.

 

예제는 아래와 같습니다. (스칼라 , spark-streaming-kafka-0-10 버전)

val ssc = new StreamingContext(sc, Seconds(3))

val params = Map(
	ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
	ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
	ConsumerConfig.GROUP_ID_CONFIG -> "test-group-1",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"
)

val topics = Array("test")

val ds = KafkaUtils.createDirectStream[String, String](
	ssc,
    PreferConsistent,
    Subscribe[String, String](topics, params)
)

ds.flatMap(record => record.value.split(" "))
	.map((_, 1))
    .reduceByKey(_ + _)
    .print

ssc.start()
ssc.awaitTermination()

 

위 예제에서 볼 수 있듯이 카프카를 input으로 사용하는 경우에는 PreferConsistent, Subscribe 인자를 추가로 주어야 합니다.

 

Subsribe 인자는 스파크가 아닌 카프카 옵션으로 컨슈머가 수신할 토픽, 역직렬화 종류, 컨슈머 그룹 id 등이 있습니다.

 

PreferConsistent 인자는 스파크 내부에서 카프카 컨슈머를 어떻게 관리할지에 대한 설정입니다.

 

아래와 같이 총 3가지가 있으며 하나를 지정하시면 됩니다.

 

1 - PreferBrokers

 

스파크에서 카프카 컨슈머의 역할은 각 익스큐터에서 일어나게 됩니다.

매번 마이크로 배치가 수행될 때마다 컨슈머를 새로 생성하기보다는 캐싱해둔 컨슈머를 사용하는것이 성능상 유리합니다.

컨슈머는 prefetch라는 특징이 있어 현재 처리할 데이터 뿐만이 아니라 다음 데이터까지 가져와 버퍼에 저장합니다.
이 버퍼에 있는것을 사용하기 위해서는 전에 사용한 컨슈머를 사용해야 하기 때문입니다.

이를 위해, 파티션의 리더 서버와 같은 서버에서 동작중인 익스큐터에게 해당 파티션의 수신을 맡기는 옵션입니다.

( 브로커 서버와 익스큐터가 동일 서버에서 동작한다는 가정하 입니다 )

 위와 같은 설정은 카프카 클러스터와 스파크 클러스터를 동일한 서버에서 운용한다는 전제입니다.
개인적으로, 리소스를 카프카, 스파크 모두 사용해야 하므로 추천하지 않는 설정이며 클러스터 구축도 별도로 하길 추천합니다.

 

2 - PreferConsistent

 

익스큐터들에게 균등하게 파티션의 점유권을 할당하는 옵션입니다.

특별한 이유가 없다면 가장 적합한 옵션입니다.

 

3 - PreferFixed

 

이름에서 알 수 있듯이, 의도적으로 특정 익스큐터를 지정하여 데이터를 처리하도록 하는 옵션입니다. 

 

 

 

 

 

반응형

 

 

 

 

 

 

4. 데이터 다루기 (기본 연산)

DStream을 생성하는 방법을 알아봤으니, 이제 DStream을 이용하여 데이터를 가공할 수 있는 연산에 대해 알아보겠습니다.

 

연산 설명
print() DStream에 포함된 각 RDD를 출력합니다.
map(func) DStream의 RDD에 포함된 각 원소에 func함수를 적용하여 새로운 DStream을 반환합니다.
(RDD 연산과 동일하게 mapPartitions(), mapPartitionsWithIndex()도 가능합니다)
flatMap(func) RDD의 flatMap 함수와 동일합니다.
count(), countByValue() DStream에 포함된 데이터 갯수를 반환합니다.
RDD와 다른점은 반환 값이 DStream 인 점입니다. 
reduce(func), reduceByKey(func) DStream에 포함된 RDD값들을 집계해서 한개의 값으로 변환합니다.
RDD가 키와 값의 형태라면 reduceByKey를 통해 키 별로 집계도 가능합니다.
반환값은 DStream 입니다.
filter(func) DStream의 모든 요소에 func를 적용하여 결과가 true인 요소만 포함한 DStream을 반환합니다.
union() 두개의 DStream을 합해서 반환합니다.
join() 키와 값으로 구성된 두 개의 DStream을 키를 이용해 조인한 결과를 DStream으로 반환합니다.
RDD연산과 동일하게 leftOuterJoin, rightOuterJoin, fullOuterJoin 모두 가능합니다.

 

5. 데이터 다루기 (고급 연산)

4번에서 알아본 연산들은 이전장에서 배운 RDD 연산과 거의 비슷합니다.

 

이번에는 DStream에 특화된 연산에 대해 알아보겠습니다.

 

1) transform(func)

 

DStream 내부의 RDD에 func 함수를 적용 후 새로운 DStream을 반환합니다.
RDD에 접근하여 적용하는것이기 때문에, RDD 클래스에서만 제공되는 메서드도 사용가능합니다.

 

2) updateStateByKey()

 

스트림으로 처리하다 보면 이전 상태를 알아야하는 경우가 있습니다.

이를 위해 updateStateByKey()를 사용할 수 있습니다.

 

updateStateByKey는 배치가 수행될때마다 이전 배치의 결과를 같이 전달하여 처리할 수 있도록 제공하는 연산입니다.


이전 배치의 결과를 알고 있어야 하기 때문에 필수적으로 checkpoint라는 함수로 이전 결과를 영속화해야합니다.

 

아래는 예제와 결과입니다.

 

val ssc = new StreamContext(conf, Seconds(3))

val t1 = ssc.sparkContext.parallelize(List("a", "b", "c"))
val t2 = ssc.sparkContext.parallelize(List("b", "c"))
val t3 = ssc.sparkContext.parallelize(List("a", "a", "a"))
val q6 = mutable.Queue(t1, t2, t3)

val ds6 = ssc.queueStream(q6, true)

ssc.checkpoint(".") // 현재 디렉터리에 영속화

val updateFunc = (newValues: Seq[Long], currentValue: Option[Long]) => Option(currentValue.getOrElse(0L) + newValues.sum)

ds6.map((_, 1L)).updateStateByKey(updateFunc).print

 

[배치 1차]

(c, 1)

(a, 1)

(b, 1)

 

[배치 2차]

(c, 2)

(a, 1)

(b, 2)

 

[배치 3차]

(c, 2)

(a, 4)

(b, 2)

 

 

3) 윈도우 연산

 

스트림 처리를 하다보면 마지막 배치의 결과가 아닌 그 이전 배치의 결과까지도 필요한 경우가 있습니다.

이를 위해, 스파크 스트리밍에서는 윈도우 연산들을 제공합니다.

 

윈도우 연산은 아래 그림과 같이 동작하기 때문에 윈도우 길이(= window length)와, 배치주기(= sliding interval)가 추가로 필요합니다.

 

이 윈도우 길이와 배치주기값은 스트리밍 컨텍스트를 생성할때 지정한 배치 간격의 배수로 지정해야합니다.

예를 들어 배치 간격을 3초로 했다면 윈도우 길이와 배치주기는 6초, 9초, 12초 ... 여야 합니다.

 

 

3-1) window(windowLength, slideInterval)

 

윈도우 연산은 slideInterval 시간마다 windowLength 만큼의 시간동안 발생된 데이터를 포함한 DStream을 생성합니다.

 

3-2) countByWindow(windowLength, slideInterval)

 

윈도우 크기만큼 DStream의 요소 개수를 포함한 DStream을 생성합니다.

 

3-3) reduceByWindow(func, windowLength, slideInterval)

 

윈도우 크기만큼의 DStream에 func를 적용한 결과의 DStream을 생성합니다.

 

3-4) reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

 

키와 값의 형태에서 적용할 수 있는 연산입니다. 다만, invFunc 라는 함수가 추가로 받을 수 있습니다.

invFunc 함수에 대해 설명하기 이전에 윈도우 연산의 단점에 대해 설명하겠습니다.

 

윈도우에는 중복된 데이터가 있습니다.

그렇다는것은 처리한 데이터를 다시 처리할 수 있으며, 쓸모없는곳에 성능을 낭비할 수 있게 됩니다.

 

이를, 해결하기 위해 reduceByKeyAndWindow 연산에서는 invFunc인자를 받으며,

invFunc는 기존에 처리한 데이터는 빼도록 하는 역 리듀스함수입니다.

 

3-5) countByValueAndWindow(windowLength, slideInterval, [numTasks]))

 

윈도우 내에 포함된 데이터들을 Value 기준으로 카운트하여 새로운 DStream을 생성합니다.

 

6. 데이터 저장

DStream 역시, RDD와 같이 데이터를 저장해야하며 이를 위해 제공되는 함수들은 아래와 같습니다.

 

  • saveAsTextFiles()
  • saveAsObjectFiles()
  • saveAsHadoopFiles()

RDD와 동일하게 DStream 역시 파일, 객체, 하둡에 데이터를 저장할 수 있습니다.

 

다만, RDD 함수와 다른점은 인자로 접두어와 접미어를 받으며 실제 저장은 "접두어 + -  + 시간 + . + 접미어" 으로 된다는 점입니다.

 

아래는 예제와 같이 함수를 사용하게 된다면 결과는 <저장위치/test-시간(ms).txt> 로 저장하게 됩니다.

 

ds.saveAsTextFile("저장위치/test", "txt")

 

7. CheckPoint

체크포인팅은 분산 클러스터 환경처럼 오류가 발생할 가능성이 높은 환경에서 장시간 수행되는 시스템들이 주시로 상태값을 안전성이 높은 저장소에 저장하여, 장애가 발생할 경우 복구를 위해 사용하는 용어입니다.

 

스파크 스트리밍에서는 이 체크포인팅을 크게 데이터 체크포인팅과 메타데이터 체크포인팅으로 분리할 수 있습니다.

 

1) 메타데이터 체크포인팅

 

메타데이터 체크포인팅이란 드라이버 프로그램을 복구하는 용도로 사용됩니다.

포함 데이터로는 아래와 같습니다.

  • 스트리밍컨텍스트를 생성할 시에 사용했던 설정 값
  • DStream에 적용된 연산 히스토리
  • 장애로 인해 처리되지 못한 배치 작업

 

2) 데이터 체크포인팅

 

데이터 체크포인팅은 최종 상태의 데이터를 빠르게 복구하기 위해 사용합니다.

 

이를 위해서는 상태를 저장과 읽어야하며 각 제공 메서드는 아래와 같습니다.

 

// 저장
ssc.checkpoint("저장 할 디렉터리 경로")

// 읽기
StreamingContext.getOrCreate("체크포인팅 경로", 스트리밍 컨텍스트 생성 함수)

 

읽기의 경우는 체크포인팅 경로에 데이터가 있다면 데이터를 가진 채 스트리밍 컨텍스트가 반환되며, 없는경우에는 새 스트리밍 컨텍스트를 반환합니다.

 

8. 캐시

 

DStream의 경우에도 RDD와 동일하게 cache() 기능을 제공합니다.

 

updateStateByKey, reduceByWindow 와 같은 연산은 이전 배치 결과를 알아야하기 때문에 명시하지 않아도 캐시를 내부적으로 수행합니다.

 

9. 마무리

이번에는 데이터 스트리밍 대해서 포스팅하였습니다.

다음에는 스트럭쳐 스트리밍에 대해 포스팅하겠습니다.

반응형

'BigData > Spark' 카테고리의 다른 글

(7) Apache Zeppelin  (0) 2020.04.20
(6) 스트럭처 스트리밍  (0) 2020.03.31
(4) 스파크 SQL  (0) 2020.03.17
(3) 스파크 설정  (0) 2020.03.13
(2) RDD  (0) 2020.03.12
반응형

1. 서론

이번 포스팅에서는 스파크 SQL에 대해서 알아보겠습니다.

 

2. 스파크 SQL

스파크 SQL은 RDD에서 표현하지 못하는 스키마 정보를 표현가능하도록 보완된 모델입니다.

 

대표적으로 데이터 프레임과 데이터셋이 있습니다.

 

스파크 SQL은 언어별 사용유무는 아래와 같습니다.

 

스파크 SQL 스칼라 자바 파이썬
데이터 프레임 O X O
데이터 셋 O O X

 

1) 연산 종류

 

  • 트랜스포메이션 연산
  • 액션연산

 

2) 프로그래밍 구성요소

 

스파크 SQL을 사용하여 프로그래밍 시 필요한 구성요소는 아래와 같습니다.

 

구성요소 의미
스파크 세션 1. 데이터 프레임을 생성하기 위해 이용.
2. 인스턴스 생성을 위한 build()메서드 제공
3. 하이브 지원 default
데이터 셋 RDD와 같은 타입 기반연산 + SQL과 같은 비타입 연산 제공
데이터 프레임 ROW 타입으로 구성된 데이터 셋
DataFrameReader 1. SparkSession의 read()메서드를 통해 접근 가능
2. ["jdbc", "json", "parquet"] 등 다양한 input 제공.
DataFrameWriter 1. SparkSession의 write()메서드를 통해 접근 가능
로우, 칼럼 데이터 프레임을 구성하는 요소(한 로우에 1개 이상의 칼럼 존재)
functions 데이터를 처리할떄 사용할 수 있는 각종 함수를 제공하는 오브젝트
StructType, StructField 데이터에 대한 스키마 정보를 나타내는 API
GroupedData, GroupedDataSet 1. groupBy() 메서드로 인해 사용
2. 집계와 관련된 연산 제공

 

3) 코드 작성 절차

 

  1. 스파크 세션 생성
  2. 스파크 세션으로부터 데이터셋 또는 데이터 프레임 생성
  3. 생성된 데이터셋 또는 데이터프레임을 이용해 데이터 처리
  4. 처리된 결과 데이터를 외부 저장소에 저장
  5. 스파크 세션 종료

 

4) 스파크 세션

 

스파크 세션의 사용목적은 아래와 같습니다.

 

  1. 데이터 프레임, 데이터 셋 생성.
  2. 사용자 정의 함수(UDF)를 등록.

다음은 스파크 세션의 예제입니다.

 

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("sample")\
    .master("local[*]")\
    .getOrCreate()

 

 

 

 

 

반응형

 

 

 

 

 

 

3. 데이터프레임

데이터 프레임은 Row라는 스파크 데이터 모델을 사용하는 wrapper 모델입니다.

 

RDD와의 차이점으로는 데이터 값뿐만이 아닌 스키마 정보까지 함께 다룬다는 차이가 있습니다.

 

1) 데이터 프레임 생성 방법

 

1. 외부의 데이터 소스

  • DataFrameReader의 read()메서드 사용
  • DataFrameReader가 제공하는 주요 메서드는 아래와 같습니다.

 

메소드 의미
format() 1. input 데이터의 유형을 지정
2. orc, libsvm, kafka, csv, jdbc, json, parquet, text, console, socket 등 사용 가능
3. 써드파티 라이브러리 존재
option/options() 키와 값형태로 설정 정보 지정.
load() input으로 부터 실제 데이터 프레임을 생성.
jdbc() 데이터베이스를 input으로 사용하기위한 간편 메서드
json() json을 input으로 사용하기 위한 간편 메서드(파일의 경우 각 라인 단위로 json이 구성되어야 함)
orc() orc 형식 데이터 다룰수 있음
parquet() parquet형식 데이터를 가져와 데이터 프레임 생성
schema() 사용자 정의 스키마 지정가능
table() 테이블 데이터를 데이터프레임으로 생성
text() 텍스트 파일을 읽어 데이터 프레임 생성
csv() 1. csv 파일을 읽어 데이터 프레임 생성
2. 2.0버전부터 제공

 

2. 기존 RDD 및 로컬 컬렉션으로부터

 

  • RDD와 달리 스키마정보를 함께 지정하여 생성해야 합니다.
  • 스키마 지정 방법
    1. 리플렉션 = 스키마 정의를 추가하지 않고 데이터 값으로부터 알아서 스키마 정보를 추출하여 사용하는 방법입니다.
    2. 명시적 타입 지정 = 말 그대로 명시적으로 스키마를 정의하는것입니다. -> (StructField, StructType 사용)
    3. 이미지 파일을 이용한 데이터 생성
      • 2.3부터 지원
      • ImageSchema.readImages를 사용하여 생성 가능합니다.
      • 이미지 파일 경로, 가로, 세로, 채널정보 등 추출 가능합니다.

 

2) 주요 연산 및 사용법

 

1. 액션 연산

 

연산 설명
show() 1. 데이터를 화면에 출력하는 연산
2. 3개의 인자 존재(레코드 수, 표시할 값의 길이, 세로 출력 여부)
3. 기본값은 20개 레코드수, 20바이트 길이 제한
head(), first() 데이터셋의 첫번째 ROW를 반환.
take() 첫 n개의 ROW 반환
count() 데이터셋의 로우 갯수 반환
collect(), collectAsList() 데이터셋의 모든 데이터를 컬렉션 형태로 반환(Array, List)
decribe() 숫자형 칼럼에 대해 기초 통계값 제공(건수, 평균값, 표준편차, 최솟값, 최댓값)

 

2. 기본 연산

 

연산 설명
cache(), persist() 1. 데이터를 메모리에 저장.
2. [NONE, DISK_ONLY, DISK_ONLY_2, MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_ONLY_SER, MEMORY_ONLY_SER_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER, MEMORY_AND_DISK_SER_2, OFF_HEAP]옵션 가능
3. cache는 MEMORY_AND_DISK 옵션의 persist
createOrReplaceTempView() 1. 데이터프레임을 테이블로 변환
2. 스파크 세션인 종료되면 사라짐.
explain() 데이터 프레임 처리와 관련된 실행 계획 정보를 출력.

 

3. 비타입 트랜스포메이션 연산

 

연산 설명
alias(), as() 칼럼이름에 별칭 부여
isin() 칼러의 값이 인자로 지정된 값에 포함되어 있는지 여부 확인
when() 1. if~else와 같은 분기 처리 연산 수행
2. functions, Column 모두 제공(최초 사용시에는 functions의 메소드를 사용해야함)
max(), mean() 칼럼의 최댓값, 평균값 계산.
collect_list(), collect_set() 특정 칼럼을 모아서 list 혹은 set을 반환
count(), countDistinct() 특정 칼럼에 속한 데이터의 갯수, 중복을 제거한 데이터의 갯수
sum() 특정 칼럼 값의 합계
grouping(), grouping_id() 소계 결과에 그룹화 수준을 파악하기 위해 사용.
array_contains(), size(), sort_array() 특정값이 배열에 존재하는지, 배열의 사이즈, 배열 정렬 기능
explode(), posexplode() 1. 배열, map 데이터를 여러개의 row로 변환
2. posexplode의 경우 순서정보도 부여
current_data(), unix_timestamp(), to_date() 1. current_data = 현재 시간값
2. unix_timestamp = string을 Date로 변환(yyyy-MM-dd HH:mm:ss)
3. to_date = string을 Date로 변환(yyyy-MM-dd)
add_months(), date_add(), last_day() 1. 달 더하는 연산
2. 일 더하는 연산
3. 해당 달의 마지막 날짜.
window() 1. DateType 칼럼을 대상으로 적용 가능.
2. 일정크기의 시간을 기준으로 윈도우를 생성하여 각종 집계 연산 수행.
round(), sqrt() 반올림, 제곱근 값 계산
array() 여러 칼럼값을 배열로 만듬.
desc(), asc() sort() 메소드와 함께 사용하는 정렬 방법 메소드
desc_nulls_first, desc_nulls_last, asc_nulls_first, asc_nulls_last 정렬시 null값 기준 정의 메소드.
split(), length() 문자열 분리, 길이 반환 메소드
rownum(), rank() 오라클의 partitionBy개념의 Window 범위 안에서 사용하는 메소드
udf() 1. 사용자 정의 함수
2. 파이썬에서 자바 UDF사용하고 싶을 시 spark.udf.registerJavaFunction 메소드 사용
select(), drop() 특정 칼럼을 포함/ 제외한 데이터 프레임 생성
filter(), where() 특정 조건을 만족하는 데이터프레임만을 반환
agg() 특정칼럼에 대해 sum, max와 같은 집합연산을 수행하기 위해 사용.
apply(), col() 데이터프레임의 칼럼 객체 생성
groupBy() SQL문의 groupBy 연산 수행, 집합연산 수행 가능
cube() 데이터의 소계를 반환( = 소계대상인 칼럼들의 모든 가능한 조합의 부분합)
distinct(), dropDuplicates() 1. distinct의 경우 모든 컬럼값이 같을때 중복 판단
2. dropDuplicates는 중복을 제거하고자 하는 컬럼 지정 가능
intersect() 두개의 데이터프레임의 교집합
except() 두개의 데이터프레임의 차집합
join() 1. 두 개의 데이터프레임의 join
2. inner, outer, left_outer, right_outer, leftsemi 등 join 종류 모두 지원
3. 조인 칼럼에 대해서는 중복으로 포함되지 않는 특징 존재
crossjoin() 두 데이터프레임의 카테시안곱 지원
na() 1. null, NaN값이 포함되는 경우 사용
2. DataFrameNaFunctions 인스턴스 반환
3. DataFrameNaFunctions가 가지고 있는 메서드 사용(drop, fill, replace 등등)
orderBy() SQL의 orderBy와 같은 동작 수행
stat() 1. 특정 칼럼값의 통계수치를 제공하는 DataFrameStatFunctions 인스턴스 제공
2. corr, cov, crosstab, freqItems, sampleBy 등의 메소드 사용 가능
withColumn(), withColumnRenamed() 새로운 칼럼 추가 혹은 이름을 변경하는 메서드
repartitionByRange() repartition시 데이터 프레임의 갯수를 기준으로 균등하게 분포(2.3부터 가능)
colRegax() 정규식을 이용하여 칼럼을 선택할 수 있는 기능(2.3부터 가능)
unionByName() 1. union할 시 데이터프레임의 칼럼 이름을 기준으로 수행
2. 그냥 union은 데이터프레임의 데이터 순서로 수행되기 때문에 데이터가 엉킬수 있음.
to_json, from_json 칼럼 수준에서 데이터를 json으로 저장 및 가져오는 기능 제공.

 

 

3) 데이터 저장

 

DataFrameWriter.write 사용하여 데이터프레임 데이터를 저장합니다.

 

DataFrameWriter의 주요 메소드는 아래와 같습니다.

 

메소드 의미
save 데이터를 저장하는 메소드
format 저장하는 데이터의 format을 지정.
partitionBy 특정 칼럼값을 기준으로 파티션 설정
options 데이터 저장시 추가 설정 시 사용
mode 저장 모드 설정(Append, Overwrite, ErrorExists, Ignore)
saveAsTable 테이블 형태로 저장 및 영구 저장 가능

 

4) Pandas 연동

 

PyArrow 라이브러리를 사용하여 스파크의 데이터프레임과 파이썬의 Pandas의 상호 변환이 가능합니다.

스파크 2.3버전부터 가능합니다.
추가로, spark.sql.execution.arrow.enabled 옵션을 true로 하여야 합니다.

 

 

4. 데이터 셋

 

데이터 셋은 Row가 아닌 개발자 지정 클래스를 Type으로 쓰는 wrapper 모델입니다.

 

데이터셋의 장단점은 아래와 같습니다.

 

  • 장점 = 지정 클래스를 사용함으로 인해 컴파일 시 타입오류 검증이 가능합니다.
  • 단점 = 사용자 정의 타입 클래스를 사용하기 때문에 Spark SQL 최적화가 안될 수 있습니다.

1) 데이터 셋 생성

 

  • 생성 방법 = 자바 객체, 기존 RDD, 데이터 프레임, 외부 source 가 있습니다.
  • 생성 시 인코더 지정(필수) = 성능 최적화를 위해 기존 오브젝트를 스파크 내부 최적화 포맷으로 변환해야 하는데 그때 사용합니다.  (데이셋은 내부적으로 InternalRow클래스를 사용합니다)
  • 스칼라의 경우, 기본타입의 경우에는 import를 통하여 암묵적 인코더 사용 가능합니다.
  • 자바의 경우, 데이터 프레임을 만든 후 이것을 데이터 셋으로 변환하여 사용해야 합니다.
  • 데이터 프레임으로부터 생성시에는 type을 지정해야하기 때문에 as() 메서드를 통하여 생성할 수 있습니다.
  • range() 메서드를 통하여 간단한 샘플데이터 또한 생성 가능합니다.

 

2) 타입 트랜스포메이션 연산

 

연산 설명
select() 1. 데이터 프레임의 select 메서드와 동일
2. 단, as() 메서드를 통해 type을 지정해야함.
as() 1. 데이터 셋에 별칭 부여
2. column에 부여하는 것이 아닌 데이터 셋에 타입을 부여.
distinct() 중복을 제외한 요소만으로 데이터셋 반환
dropDuplicates() distonct() 메서드에서 칼럼을 지정할 수 있는 기능 추가.
filter() 사용자 정의 조건을 만족하는 데이터만으로 구성된 데이터 셋 반환.
map(), flatMap() 1. RDD의 map(), flatMap() 메서드와 동일
2. 단 데이터 셋의 타입을 인자로 사용 가능.
3. 자바의 경우 인코더 지정 필수
groupByKey() 1. RDD의 groupBy와 기능 동일
2. KeyValueGroupedDataset 클래스 반환
agg() 1. 데이터 프레임에서 본 agg() 메서드와 동일
2. 단, 사용하는 칼럼은 반드시 TypeedColumn 클래스여야 한다.
3. 사용가능한 집계 연산 = avg(), count(), sum(), sumLong()
mapValues(), reduceGroups() 1. KeyValueGroupedDataset클래스의 매서드
2. 맵연산, 리듀스 연산을 수행.

 

 

5. QueryExecution

 

QueryExecution은 Spark SQL 사용시 최적화 과정에서 일어나는 일들을 확인할 수 있는 API입니다.

 

Spark Sql의 Query 실행은 아래와 같은 순으로 진행되고 최적화됩니다.

 

순서 Plan 종류 확인 메소드 종류 
1 LogicalPlan QueryExecution.logical
2 SessionState의 Analyzer가 적용된 LogicalPlan  QueryExecution.analyzed
3 SessionState의 Optimizer가 적용된 LogicalPlan QueryExecution.optimizedPlan
4 SessionState의 SparkPlanner가 적용된 SparkPlan QueryExecution.sparkPlanner
5 SparkPlan에 추가적인 최적화 과정 적용하여 SparkPlan QueryExecution.executedPlan

 

6. 마무리

이번에는 스파크 SQL, 데이터 프레임, 데이터 셋 대해서 포스팅하였습니다.

다음에는 스파크 스트리밍에 대해 포스팅하겠습니다.

 

반응형

'BigData > Spark' 카테고리의 다른 글

(6) 스트럭처 스트리밍  (0) 2020.03.31
(5) 스파크 스트리밍  (0) 2020.03.19
(3) 스파크 설정  (0) 2020.03.13
(2) RDD  (0) 2020.03.12
(1) 스파크 소개  (0) 2020.03.11
반응형

1. 서론

이번 포스팅에서는 스파크 실행에 있어 다양한 설정값들에 대해 포스팅하도록 하겠습니다.

2. 스파크 프로퍼티

스파크 프로퍼티는 스파크 어플리케이션 실행과 관련한 설정값을 의미합니다.

 

설정값은 SparkConf 인스턴스를 통해 setting 할 수 있습니다. 다만, 코드에 항상 포함되어야 하는 단점이 있습니다.

 

이를 해결하기 위해서 아래의 방법들이 있습니다.

 

  • 스파크 쉘 혹은 spark-submit을 이용하면 해결
  • 스파크 홈에 spark-defaults.conf 파일에 정의를 하여 해결

 

그럼 각 설정값들이 무엇이 있는지 알아 보도록 하겠습니다.

 

1) 어플리케이션 관련 설정

 

property 명 의미 default
spark.app.name 어플리케이션 이름 X(필수로 세팅 필요)
spark.driver.cores 드라이버가 사용할 코어 수 1
spark.driver.maxResultSize 액션연산으로 생성된 값의 최대 크기 1G
spark.driver.memory 드라이버가 사용할 메모리 크기
(클라이언트 모드 시 SparkConf가 아닌 --driver-memory로 지정해야합니다.)
1G
spark.executor.memory 익스큐터 하나의 메모리 크기 1G
spark.local.dir RDD 데이터 저장 혹은 셔플 시 매퍼의 데이터 저장을 하는 경로 /tmp
spark.master 클러스터 매니저 정보 -
spark.submit.deployMode 디플로이 모드 지정(client 혹은 cluster) -

 

2) 실행환경 관련 설정

 

property 명 의미 default
spark.driver.extraClassPath 드라이버 클래스패스에 추가할 항목(SparkConf가 아닌 --driver-class-path로 지정해야 합니다) -
spark.executor.extraClassPath 익스큐터의 클래스패스에 추가할 항목 -
spark.files, spark.jars 각 익스큐터의 실행 dir에 위치한 파일, jars
(, 를 사용하여 여러개 지정 가능합니다.)
-
spark.submit.pyFiles PYTHON_PATH에 추가될 .zip, .egg, .py 파일 
(, 를 사용하여 여러개 지정 가능합니다.)
-
spark.jars.pachages 익스큐터와 드라이버의 클래스패스에 추가될 의존성 jar 정보 -

 

3) 셔플 관련 설정

 

property 명 의미 default
spark.reducer.maxSizeFlight 셔플시 각 리듀서가 읽어갈 때 사용할 버퍼 사이즈 48m
spark.reducer.maxReqslnFlight 리듀서에서 매퍼 결과를 가져갈때 동시에 수행가능항 최대 요청 수 int.MaxValue(2147483647)
spark.shuffle.compress 매퍼의 결과를 압축 유무
(true시 spark.io.compress.codec 지정해야 합니다.)
false
spark.shuffle.service.enabled 외부 셔플 서비스 사용 유무 false

 

4) 스파크 UI 관련 설정

 

property 명 의미 default
spark.eventLog.enabled 스파크 이벤트 로그 수행 유무
(true시 spark.eventLog.dir에 로깅 경로 지정해야합니다 - 스파크 UI에서 확인 가능합니다.)
false
spark.ui.port 스파크 UI 포트 4040
spark.ui.killEnabled 스파크 UI를 통해 job kill 가능 여부 true
spark.ui.retainedJobs 종료된 잡 정보 유지 갯수 -

 

 

5) 압축 및 직렬화 관련 설정

 

property 명 의미 default
spark.broadcast.compress 브로드 캐스트 변수값을 압축할지 유무 true
spark.io.compression.codec 스파크 내부에서 사용할 압축 방법 lz4
spark.kyro.classesToRegister Kyro 직렬화에 사용할 클래스 지정 -
spark.serializer 스파크에서 사용할 객체 직렬화 방식
(스파크에서는 JavaSerializer, KyroSerializer 클래스 제공합니다.)
-

 

 

6) 메모리 관련 설정

 

property 명 의미 default
spark.memory.fraction 스파크 여유/가용 메모리 비율 설정 0.6
spark.memory.storageFraction 스파크 가용공간에서 저장에 사용할 메모리 비용 0.5
spark.memory.offHeap.enabled off 힙 메모리 사용 유무 false

 

 

7) 익스큐터 관련 설정

 

property 명 의미 default
spark.executor.cores 익스큐터에 할당할 코어 수
(얀 경우 1, 나머지는 사용가능한 코어 수)
-
spark.default.parallelism 스파크에서 사용할 파티션 수 -
spark.files.fetchTimeout sparkContext.addFile() 메소드 사용 시 파일 받아오는 limit 시간 60s

 

 

8) 네트워크 관련 설정

 

property 명 의미 default
spark.driver.host, spark.driver.port 드라이버 프로세스의 호스트와 포트 -
spark.network.timeout 스파크의 default 네트워크 타임아웃 값 -

 

 

9) 보안 관련 설정

 

property 명 의미 default
spark.acls.enable 스파크 acl 활성화 여부 false
spark.admin.acls 스파크 잡에 접근가능 user, admin 설정
( , 를 사용하여 다수 등록 가능합니다.
group으로 설정할 시 spark.admin.acls.group 속성을 사용합니다)
-
spark.authenticate 스파크에서 사용자 인증 여부 확인 유무 false
spark.authenticate.secret 잡 실행 시 시크릿 키 정보 설정 -
spark.ui.view.acls, spark.ui.view.acls.groups 스파크 UI에서 잡 조회 acl 정보 -
spark.ui.filters 스파크 UI에 적용할 서블릿 필터 지정
( , 를 사용하여 다수 등록 가능합니다.)
-

 

10) 우선순위

 

스파크 프로퍼티가 적용되는 우선순위들은 아래와 같습니다.

 

  1. 코드 상 SparkConf
  2. spark-shell, spark-submit
  3. spark-defaults.conf 파일

 

 

 

 

반응형

 

 

 

 

 

3. 환경변수

스파크 어플리케이션이 아닌 각 서버마다 적용해야하는 정보는 서버의 환경변수를 사용해야 합니다.

 

환경변수로 설정 가능한 항목은 아래와 같습니다.

 

  • JAVA_HOME : 자바 설치 경로
  • PYSPARK_PYTHON : 파이썬 경로
  • PYSPARK_DRIVER_PYTHON : 파이썬 경로(드라이버에만 적용)
  • SPARK_DRIVER_R : R경로
  • SPARK_LOCAL_IP : 사용할 ip 경로
  • SPARK_PUBLIC_DNS : 애플리케이션 호스트명
  • SPARK_CONF_DIR : spark-defaults.conf, spark-env.sh, log4j.properties 등 설정 파일이 놓인 디렉터리 위치

클러스터 매니저에 따라 각 설정 방법이 달라 현재 어떤것을 사용하고 있는지 확인 후 설정하는것을 권장합니다.

얀으로 클러스터 모드 사용 시 환경변수는 spark-defaults.conf 파일의 spark.yarn.appMasterEnv.[환경변수명] 이용해야 합니다.

 

4. 로깅설정

로깅은 log4j.properties 파일로 설정합니다. -> log4j.properties.template 파일을 복사하여 사용하시면 됩니다.

 

아래는 각 클러스터 매니저 별 로깅파일이 저장되는 경로입니다.

 

클러스터 매니저 로깅 저장 경로
스탠드 얼론 각 슬레이브 노드의 spark 홈 아래 work 디렉토리
메소스 /var/log/mesos
기본 각 노드의 로컬 파일 시스템
(yarn.log-aggregation-enable이 true의 경우 yarn.nodemanager/remote-app-log-dir에 설정된 경로입니다.)

 

5. 스케쥴링

 

스케쥴링이란 클러스터내 자원을 각 Job에게 할당하는 작업입니다.

 

클러스터에서 수행되는 작업은 적당한 cpu, memory를 주는 것이 성능을 최대화 시킵니다.

과도하게 주는 경우 GC, IO, 네트워크 등의 경합이 더 비효율적일 수 있습니다.

 

그렇기 때문에, 하나의 클러스터에서 다수의 잡이 실행되는 경우 스케쥴링을 적절히 선택 및 이용하여 최적의 성능을 맞추어야합니다.

 

2개 이상의 어플리케이션이 한 클러스터에서 동작할 시, 스케쥴링은 크게 고정 자원 할당 방식과 동적 자원 할당 방식이 있습니다.

 

1) 고정 자원 할당 방식

 

고정 자원 할당 방식은 각 애플리케이션마다 할당할 자원을 미리 결정합니다.

 

사용 방법은 위에서 설명한 spark-shell, spark-submit 을 사용할 수 있습니다.

단기간이 아닌 웹같은 장기간 동작하며 이벤트 발생이 있을때, 수행되는 경우에는 비효율적입니다.

 

2) 동적 자원 할당 방식

 

동적 자원 할당 방식은 상황에 따라 자원을 할당 및 회수하는 방식입니다.

 

클러스터 마다 동적 자원 할당 방식이 다르며, 공통으로는 spark.dynamicAllocation.enabled 속성을 true로 해야합니다.

 

클러스터 모드 동적 자원 할당 방식
스탠드얼론 spark.shuffle.service.enabled 속성 true 사용
메소스 1. spark.mesos.coarse, spark.shuffle.service.enabled 속성을 true로 설정
2. 각 워커노드마다 start-mesos-shuffle-service.sh 수행
1. spark--yarn-shuffle.jar를 모든 노드매니저 클래스패스에 등록
2. 각 노드 매니저의 yarn-site.xml 파일에 아래와 같이 속성 설정
    2-1. spark_shuffle=yarn.nodemanager.aux-services
    2-2. yarn.nodemanager.aux-services.spark_shuffle.class=org.apache.spark.network.yarn.YarnShuffleService
    2-3 park.shuffle.service.enabled=true

 

1개의 어플리케이션에서 2개 이상의 Job이 수행되는 경우, FIFO, FAIR 스케쥴링 방법이 있습니다.

 

1) FIFO

 

FIFO는 기본 설정값이며, 수행요청대로 Job이 리소스를 점유하게됩니다.

단시간에 끝나는 잡이 오래 걸리는 잡 뒤에 있을때는 단점인 스케쥴링 방식입니다.

 

2) FAIR

 

FAIR는 의미 그대로 공유하는 설정입니다.

 

사용은 sparkConf에 spark.scheduler.mode=FAIR 로 설정하여 가능합니다.

 

FAIR에서도 우선순위를 조절하고 싶은 경우 pool을 사용 가능하며,

pool 설정은 conf 디렉토리에 fairscheduler.xml 파일에 기재하면 됩니다.

 

아래는 예시입니다.

 

<?xml version="1.0"?>
<allocations>
    <pool name="pool1">
        <schedulingMode>FAIR</schedulingMode>
        <weight>1</weight>
        <minShare>FAIR</minShare>
    </pool>
    <pool name="pool2">
        <schedulingMode>FIFO</schedulingMode>
        <weight>2</weight>
        <minShare>3</minShare>
    </pool>
</allocations>

 

pool 지정 시 사용하는 속성은 아래와 같습니다.

 

속성 의미
schedulingMode 스케쥴링 방법
weight pool간의 우선순위(크기가 큰값이 높습니다.)
minShare pool이 가져야 하는 CPU 코어 수

 

사용은 아래와 같이 하시면 됩니다.

conf.set("spark.scheduler.allocation.file", "pool 설정 파일 경로")

 

6. 마무리

이번에는 스파크 설정에 대해서 포스팅하였습니다.

다음에는 스파크 SQL에 대해 포스팅하겠습니다.

  •  
반응형

'BigData > Spark' 카테고리의 다른 글

(6) 스트럭처 스트리밍  (0) 2020.03.31
(5) 스파크 스트리밍  (0) 2020.03.19
(4) 스파크 SQL  (0) 2020.03.17
(2) RDD  (0) 2020.03.12
(1) 스파크 소개  (0) 2020.03.11
반응형

1. 서론

이번 포스팅에서는 스파크 데이터 모델 중 하나인 RDD에 대해 진행하겠습니다.

 

2. RDD 액션

RDD는 스파크에서 제공하는 데이터 모델입니다.

이 모델은 다양한 데이터처리를 위한 함수를 제공합니다.

 

함수는 크게 트랜스포메이션 연산과 액션 연산으로 나눌 수 있고,

각 연산에 대해 소개하도록 하겠습니다.

 

1) 트랜스포메이션 연산

 

1. map = RDD에 있는 데이터에 지정 연산을 수행한 후 RDD를 반환

2. flatMap = map연산과 비슷하지만 반환 타입이 iterate인 RDD 반환

3. mapPartitions = map연산을 파티션 단위로 연산

4. mapPartiitonsWithIndex = mapPartitions에서 각 파티션의 index정보도 같이 넘기는 연산

5. mapValues = (key, value) 형태의 RDD에만 적용가능하며 value에만 map 연산을 수행 후 RDD를 반환하는 연산

6. flatMapValues = RDD가 (key, value) 형태일 때 위의 flatMap 연산을 value에 적용하고자 할때 사용하는 연산

7. zip = 2개의 RDD를 (key, value)로 묶어주는 연산. RDD의 데이터 크기가 같아야 합니다..

8. zipPartitions = 파티션 단위로 zip연산을 수행하는 연산.

  • zipPartitions연산은 zip 연산과 다르게 최대 4개까지 지정 가능
  • 파이썬에서는 사용 불가.
  • 파티션의 갯수가 같아야 합니다.

9. groupby = RDD의 데이터를 지정한 group 집합으로 묶어 RDD를 반환하는 연산

10. groupbyKey = (key, value) 형태의 RDD를 키 기준으로 group 하여 반환하는 연산

  • (key, sequence[value])로 구성.

11. cogroup = (key, value) 형태의 여러 RDD를 key 기준으로 group 한 후 각 RDD의 sequence[value]를 Tuple로 반환하는 연산

 

ex) = (키, Tuple(RDD1의 sequence[value], RDD2의 sequence[value]))
최대 3개까지 RDD를 group 할 수 있습니다.

 

12. distinct = RDD에서 중복을 제거한 뒤 RDD를 반환하는 연산

13. certesian = 2개 RDD의 카테시안 곱을 한 RDD를 반환하는 연산(M x N)

14. subtract = 2개 RDD에서 1개의 RDD값을 제외한 값들을 반환하는 연산(M - N)

15. union = 2개 RDD의 합집합한 결과를 반환하는 연산(M + N)

16. intersection = 2개의 RDD의 교집합을 반환하는 연산 (M ∩ N)

17. join = 2개의 (key, value) 형태의 RDD를 키 기반으로 join하여 RDD를 반환하는 연산

  • ex ) (키, Tuple(첫번째 RDD의 요소, 두번째 RDD의 요소))
  • join은 inner join입니다. 한마디로 join이 되지 않으면 반환 RDD에 포함 X

18. leftOuterJoin, rightOuterJoin = sql의 left(right)OuterJoin과 비슷한 연산

  • 조인결과가 없을 수도 있어, 주최 RDD가 아니라면 Optional 값으로 반환.
  • ex) (a, (1, None)), (b, (1, Some(2))), (c, (1, Some(2)))

19. subtractByKey = 2개의 (key, value) 형태의 RDD에서 같은 key를 제외하고 RDD를 반환하는 연산.

20. reduceByKey = 2개의 RDD에서 같은 key로 병합하여 RDD를 반환하는 연산.

21. foldByKey = reduceByKey 연산에서 초기값을 부여하는 옵션이 추가된 연산.

22. combineByKey = 반환 RDD의 값 타입이 변경될수 있는 연산

  •  첫번째 인자(초기값을 위한 함수)
  •  두번째 인자(RDD의 각 파티션에서 수행할 함수)
  •  세번째 인자(각 파티션들을 결합하는 함수)

23. aggregateByKey = combineByKey연산의 첫번째 인자가 함수가 아닌 값으로만 변경된 연산.

24. pipe = map연산에서 외부 프로세스를 사용하는 연산.

  • ex) rdd.pipe("cut -f 1,3 -d ,") -> cut 유틸 사용.

25. repartitionAndSortWithinPartitions = 파티션 갯수 조절 후 각 파티션에서 정렬한 뒤 RDD를 반환하는 연산.

26. partitionBy = RDD의 값들을 특정 파티션으로 옮기고 싶을때 사용하는 연산

  • 셔플링이 발생하는 연산으로 성능에 크게 영향이 가므로 잘 판단하여 사용해야 합니다.

27. sortByKey = 키 값을 기준으로 정렬한 후 RDD 를 반환하는 연산.

28. keys, values = 자바 map의 keys, values와 같은 의미의 연산

 

 

 

 

 

반응형

 

 

 

 

 

2) 액션연산

 

1. collect = RDD 데이터를 배열로 반환하는 연산

2. count = RDD 데이터의 갯수를 반환하는 연산

3. first = RDD의 첫번째 요소를 반환하는 연산

4. take = RDD의 첫번째 요소부터 n개를 반환하는 연산

5. countByValue = 값을 각 카운팅하여 map으로 반환하는 연산

  • [1, 1, 2, 3, 3] => Map({1:2, 2:1, 3:2})

6. reduce = 2개의 RDD요소를 하나로 합치는 연산

  • 입력과 출력의 타입이 같아야 합니다.

7. fold = reduct 연산에서 초기값을 지정할 수 있는 연산

  • 입력과 출력의 타입이 같아야 합니다.

8. aggregate = reduce와 fold를 합친 연산

  • 첫번째 인자(초기값)
  • 두번째 인자(RDD의 각 파티션에서 수행할 병합함수)
  • 세번째 인자(각 파티션들을 결합하는 함수)
  • 입력과 출력의 타입이 달라도 됩니다.

9. sum = RDD의 요소의 합을 반환하는 연산

  • int, double, long 등 숫자 타입의 RDD에서만 가능합니다.

10. forEach, forEachPartition = map, mapPartition과 기능은 동일하나 반환값이 없는 점이 다른 연산

  • 이 연산은 각 개별 노드에서 수행됩니다.

11. toDebugString = RDD의 세부 정보를 파악하기 위한 연산

 

3. 기타 액션 

1. coalesce, repartition = 두 연산 모두 RDD의 파티션 크기를 조정하는 연산

  • coalesce = 파티션 갯수를 줄이기만 가능합니다.
  • repartition = 파티션 갯수를 늘리고 줄이기 모두 가능합니다.
  • repartition은 무조건 셔플이 발생되며, coalesce은 지정했을때만 셔플이 발생합니다.
  • 셔플 연산은 비용이 큰 연산이니 고려하여 사용해야 합니다.

2. cache, persist, unpersist =

  • cache, persist는 첫 액션연산 후 RDD 정보를 메모리 또는 디스크에 저장하는 메소드입니다.
  • unpersist는 필요없는 데이터를 캐시에서 제거할때 사용합니다.
  • cache, persist는 정보를 메모리에 올리게 되면 빠른 연산을 할 수 있게 도와줍니다.
    • 단, 너무 많이 올리게 되면 GC대상이 되어 악영향을 줄 수도 있으니 이 메서드도 함부로 남발해서는 안됩니다.

4. 마무리

이번에는 스파크 데이터 모델 중 하나인 RDD 대해서 포스팅하였습니다.

다음에는 스파크 설정에 대해 포스팅하겠습니다.

기존 책에서는 3장이 클러스터 구축이나, Hadoop 카테고리에서 Hadoop 설치 를 따라하시면 구축 및 스파크도 설치되어 있습니다. 

 

반응형

'BigData > Spark' 카테고리의 다른 글

(6) 스트럭처 스트리밍  (0) 2020.03.31
(5) 스파크 스트리밍  (0) 2020.03.19
(4) 스파크 SQL  (0) 2020.03.17
(3) 스파크 설정  (0) 2020.03.13
(1) 스파크 소개  (0) 2020.03.11
반응형

1. 서론

Hadoop eco system에서 많이 사용하는 Spark에 대해 공부한 내용을 공유하고자 합니다.

책은 빅데이터 분석을 위한 스파크 2 프로그래밍 을 통해 공부하였습니다.

 

이번 포스팅에서는 1장인 스파크 소개부분을 진행하도록 하겠습니다.

 

2. 스파크 소개

 

1) 스파크란?

 

스파크는 하둡의 mapreduce를 보완하고자 나온 메모리 기반 대용량 처리 프레임워크입니다.

spark는 스칼라로 개발되었습니다.

 

특징으로는 아래와 같습니다.

 

  • 하둡과 달리 파일이 아닌 메모리를 이용하여 데이터 저장방식 제공.
  • 자바, 파이썬, 스칼라 언어 지원 및 다른 오픈소스들과의 플러그인이 많아 유용.
  • 스트리밍, 머신러닝에서도 활용할 수 있도록 다양한 라이브러리 제공.

 

2) RDD, 데이터 프레임, 데이터셋

 

스파크에서는 데이터를 처리하기 위한 모델로 RDD, 데이터 프레임, 데이터셋 3가지를 제공합니다.

 

  • RDD란?
    스파크에서 정의한 분산 데이터 모델로서 병렬처리가 가능하고 스스로 에러를 복구할 수 있는 모델.
    input 데이터를 이 RDD라는 모델로 만들어 데이터 핸들링을 하게됩니다.
    • 데이터 복구가 가능한 이유로는 RDD생성작업을 기록하기 때문입니다.
    • RDD를 생성할 수 있는 방법은 3가지가 존재
      1. 프로그램의 memory에 있는 데이터.
      2. 로컬, hdfs에 있는 외부 파일
      3. 또 다른 RDD로부터.
  • 데이터 프레임이란?
     DataSet[Row]를 데이터 프레임이라고 합니다.
    여기서 Row는 스파크 lib에서 정의한 클래스라고 생각하면 됩니다.
  • 데이터 셋이란?
    = 데이터프레임의 진화형 모델.
    DataSet[CustomTypeModel] 과 같이 데이터 모델을 custom하게 정의하여 사용할 수 있도록 typed 모델입니다.
RDD를 low api라고 하면 데이터 프레임은 high api라고 보면 됩니다.
또한, 데이터 프레임을 사용한다면 언어의 성능 이슈를 해결가능합니다.
(스파크가 JVM 기반 언어인 스칼라로 만들어져 있기 때문에 파이썬과 같은 다른 언어의 이종 프로세스간의 성능 이슈가 발생하게 됩니다.
하지만, 데이터 프레임은 개발자가 정의한 모델이 아닌 스파크가 제공하는 모델이기 때문에 성능 이슈가 없습니다.)

 

 

 

 

 

반응형

 

 

 

 

 

 

3) 트랜스포메이션 연산과 액션연산

 

스파크에서는 데이터 처리를 위해 정의한 연산이 2가지가 있습니다.

 

  • 트랜스포메이션 연산이란?
    = 어떤 RDD에 변형을 가해 새로운 RDD를 생성하는 연산.
  • 액션 연산이란?
    = 연산의 결과로 RDD가 아닌 다른 값을 반환하는 연산.
  • lazy 실행
    = 스파크는 트랜스포메이션의 meta만을 가지고 있습니다.
    액션연산 수행 시 실제 트랜스포메이션 연산의 meta 순서대로 연산을 시작합니다.
    이점이 바로 연산의 최적화를 찾아 수행하는 이유이며, 에러 시 복구가 가능한 이유입니다.

 

4) sparkContext

 

컨텍스트라고 일컬어지는것은 대부분 어떠한 일을 대신 수행해주는 것을 의미합니다.

그렇기 때문에 스파크 컨텍스트는 아래의 일을 담당하게됩니다.

 

  • 스파크 애플리케이션과 클러스터의 연결을 관리
  • RDD를 생성

 

5) partition

 

partition이란 스파크 클러스터에서 데이터를 관리하는 단위입니다.

HDFS 기반으로 사용하게 된다면 데이터 block당 한 개의 partition이 생성됩니다.

 

6) 드라이버 프로그램, 워커 노드, Job, Executor

 

1. 드라이버 프로그램

= 스파크 컨텍스트를 생성한 프로그램을 의미합니다.

 

2. 워커 노드

= 실제 데이터 처리를 수행하는 서버입니다.

 

3. Job

= 데이터 핸들링을 하는 일련의 작업을 의미합니다.

 

4. Executor

= 워커 노드에서 실행되는 프로세스입니다.
쓰레드가 아닌 프로세스이기 때문에 각 executor들 끼리는 영향을 끼칠 수 없습니다.

 

3. 스파크 구성도

일반적인 구성도는 아래와 같습니다.

 

 

1. sparkContext를 통해 클러스터에게 Job 제출.

2. 각 워커노드에서 Executor 생성

3. Executor에서 제출한 Job 수행

4. DAG

DAG는 Directed Acyclic Graph의 약어로 cycle이 발생하지 않는 그래프입니다.

spark에서는 데이터 처리를 이 DAG 형식으로 스케쥴링이 이루어집니다.

그렇기 때문에 실패가 일어나게 되면 일어난 지점에서 다시 DAG를 수행하여 데이터 손실을 없도록 합니다.

5. 마무리

이번에는 간략히 스파크 소개에 대해서 포스팅하였습니다.

다음에는 소개에서 언급한 RDD에 대해 좀 더 깊게 포스팅하겠습니다.

 

반응형

'BigData > Spark' 카테고리의 다른 글

(6) 스트럭처 스트리밍  (0) 2020.03.31
(5) 스파크 스트리밍  (0) 2020.03.19
(4) 스파크 SQL  (0) 2020.03.17
(3) 스파크 설정  (0) 2020.03.13
(2) RDD  (0) 2020.03.12
반응형

1. 서론

이번 포스팅에서는 Hadoop 시스템 서비스 중 하나인 Hive에 대해 설명 및 간단한 예제를 진행하려고 합니다.

 

2. Hive

Hive는 페이스 북에서 개발 되었습니다.

Hive는 하둡에 저장된 데이터를 sql 기반으로 간편히 mapreduce를 수행할 수 있으며,

추가로 elasticsearch, hbase와 같은 저장소와 hdfs상의 중간 인터페이스 역할도 제공합니다.

 

hdfs에 있는 파일은 일반적으로 tab 혹은 , 으로 구분자를 통해 각 자리에 의미있는 데이터가 있습니다.

이러한 구분자를 기반으로 테이블을 생성해주고, select, insert쿼리를 수행할 수 있게 해줍니다.

단, update 와 delete쿼리는 지원되지 않습니다.
이유는, 결국 데이터는 구분자를 통한 파일의 한줄이기 때문입니다.
그 말은, DB와 같이 인덱싱이 걸려있지도, key가 있지도 않다는 의미입니다.

 

테이블은 아래와 같이 2가지가 존재합니다.

 

  • managed table
  • external table

 

1) managed table

 

hdfs의 데이터를 Hive에서 관리한다는 의미를 가지고 있습니다.

그렇기 때문에, drop table 질의를 하는 경우 hdfs의 데이터도 사라지게 됩니다.

 

2) external table

 

hdfs의 데이터와 hive 테이블과 심볼릭 링크가 걸린다고 생각하시면 됩니다.

그렇기 때문에, drop table 질의를 하는 경우에도 hdfs의 데이터는 유지가 됩니다.

 

3. 예제

예제로는 hdfs에 있는 데이터를 external 테이블로 만든 후, sql를 통하여 managed table로 만들고 두 테이블을 모두 drop 했을때의 hdfs 데이터 상태를 봐보도록 하겠습니다.

 

1) 데이터 준비

 

저는 아래와 같이 \t 구분자로 데이터를 hdfs:/user/young에 놓았습니다.

 

kim     1
geon    2
young   3
kim     1
geon    2
yeong   3

 

2) external table 생성

 

Hive에 접속하기 위해 hive 명령어를 통해 접속합니다.

그 후, 기본적으로 있는 default DB를 사용하도록 하겠습니다.

 

 

이제 아래와 같이 external 테이블을 생성합니다.

CREATE EXTERNAL TABLE temp
 (
    text string,
    count int
 )
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY '\t'
 STORED AS TEXTFILE
 LOCATION '/user/young';

ROW FORMAT DELIMITED FIELDS TERMINATED BY 에는 한 라인에서 데이터를 나눌 구분자를 지정합니다.

저의 경우 \t으로 데이터를 만들었기 때문에 '\t'를 지정합니다.

 

LOCATION 에는 연결을 원하는 hdfs상 디렉토리경로를 지정합니다.

 

아래와 같이 테이블을 생성하게 되면 테이블 목록에 보이게 되며,

select 쿼리를 통하여 데이터를 조회할수도 있습니다.

 

 

 

 

 

 

반응형

 

 

 

 

 

 

3) managed table 생성

 

이젠 이 external table을 사용하여 새로운 managed table을 만들어보겠습니다.

 

쿼리는 아래와 같습니다.

 

CREATE TABLE temp_managed
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/user/young2'
AS
SELECT text, sum(count)
FROM temp
GROUP BY text
;

 

text로 group by 로 묶어서 count를 더하는 wordcount의 reducer의 역할을 하도록 쿼리하였고,

결과를 /user/young2 디렉토리에 놓도록 지정하였습니다.

managed 테이블은 단순히 external 이 아닌 create table로 테이블을 만들면 됩니다.

 

쿼리가 수행 후에는 아래와 같이 테이블 목록 및 hdfs에도 파일이 생성된것을 확인할 수 있습니다.

 

 

4) external table drop

 

이번에는 처음 만들었던 external table인 temp를 drop 시킨 후 hdfs의 파일을 봐보도록 하겠습니다.

 

쿼리는 아래와 같습니다.

drop table temp;

 

결과로는 아래와 같이 hdfs상의 데이터에는 영향이 없는것을 볼 수 있습니다.

이것은, 단순히 hdfs와 hive table간의 link만 끊은 것이기 때문입니다.

 

 

5) managed table drop

 

이번에는 managed table인 temp_managed 를 drop 시킨 후 hdfs 파일을 보도록 하겠습니다.

 

쿼리는 아래와 같습니다.

 

drop table temp_managed;

 

결과는 아래와 같이 hdfs의 데이터도 모두 삭제되어 No such File 구문이 나온것을 볼 수 있습니다.

 

 

추가로, Hive는 sql을 통해 간편히 데이터를 핸들링하기 위한 인터페이스이고, 모든 작업은 mapreduce로 이루어집니다.

 

 

4. 하이브 지원 SQL

 

하이브에서만 지원되는 sql 들이 있습니다.

 

그 중, 알아두면 유용할 정렬에 관련해서 아래 3개에 대해 정리하였습니다.

 

1) sort by

 

order by 와 비슷하게 느낄수도 있습니다.

하지만 order by의 경우에는 전체 결과를 하나의 리듀서가 정렬을 하게되는 반면,

sort by의 경우에는 각 리듀서에서 정렬을하게 됩니다.

 

그로인해, 리듀서 갯수 multiple이기 때문에 성능에 이점을 볼 수 있습니다.

단, 각 리듀서에서만 정렬이 이루어지기 때문에 정확히 정렬된  결과물은 보장되지 않습니다.

>리듀서 갯수를 한개로 한다면 order by 와 동일한 결과가 나옵니다.

 

2) distribute by

 

distribute by로 지정한 칼럼 기준으로 한 리듀서에 지정한 동일한 칼럼값의 데이터가 가도록 나누는 역할을 합니다.

 

예로, 아래와 같은 데이터가 있을 시

name age
kim 1
geon 2
yeong 2

distribute by age 를 하게되면 한 리듀서에는 (kim, 1)를 받게되고, 다른 리듀서는 (geon, 2), (yeong, 2)를 받게됩니다.

 

3) cluster by

 

cluster by는 distribute by와 sort by를 모두 한다고 보면 됩니다.

 

cluster by age를 하게되면, 각 리듀서는 age 기준으로 데이터 묶음을 받게되고 각 리듀서는 age기준으로 정렬하게 됩니다.

5. 마무리

이번 포스팅에서는 Hive에 대해 포스팅 하였습니다.

 

이렇게, Hadoop에 대한 포스팅을 완료했습니다.

다음 포스팅에서는 Hadoop eco system에서 많이 쓰이는 Spark에 대해 진행하도록하겠습니다.

 

감사합니다.

 

반응형

'BigData > Hadoop' 카테고리의 다른 글

(3) Hadoop 사용법  (0) 2020.03.09
(2) Hadoop 설치  (0) 2020.03.05
(1) Hadoop에 대해  (0) 2020.03.04
반응형

1. 서론

이번 포스팅에서는 간단히 Hadoop의 hdfs와 mapreduce를 진행해보겠습니다.

2. hdfs

hdfs 는 기본적으로 알고계신 파일 시스템이라고 보셔도 무방합니다.

단, hdfs는 설정값에 따라 replication과 파일을 block 단위로 관리합니다.

간단히 진행할 hdfs 의 명령어는 아래와 같습니다.

 

  • cat 
  • appendToFile
  • ls
  • copyFromLocal
  • copyToLocal
  • count
  • cp
  • du
  • expunge
  • get
  • getfacl
  • getmerge
  • mkdir
  • moveFromLocal
  • mv
  • put
  • rm
  • rmr
  • setfacl
  • setfattr
  • setrep
  • stat
  • tail
  • test
  • text
  • touchz

먼저 위 명령어들을 알아보기 전에 간편히 작업하기 위해

아래 사진과 같이 hdfs 권한 검사의 체크를 제거해주세요.

파일 권한 체크를 해제하는 옵션입니다.

 

1) cat

 

리눅스의 cat과 동일합니다.

 

아래는 sample.txt 파일을 cat으로 확인한 예제입니다.

 

2) appendToFile

 

Local 파일을 hdfs 파일에 append 하기위한 명령어

 

아래는 로컬의 young이 들어있는 sample2.txt 파일을 hdfs의 sample.txt 에 append한 후 cat으로 확인하는 예제입니다.

 

3) ls

 

특정 디렉토리의 파일, 디렉토리를 보여주는 명령어

 

아래는 ls 명령어 예제입니다.

 

 

4) copyFromLocal

 

로컬 파일을 hdfs 로 복사하는 명령어 입니다.

 

아래는 로컬 sample3.txt 파일을 hdfs에 복사하는 예제입니다.

 

 

5) copyToLocal

 

이번엔 반대로 hdfs에 있는 파일을 로컬로 복사하는 명령어 입니다.

 

아래는 hdfs의 sample.txt 파일을 로컬로 복사하는 예제입니다.

 

6) count

 

hdfs의 디렉토리, 파일의 갯수를 카운트하여 보여주는 명령어 입니다.

 

아래는 /user/young의 count한 결과 사진입니다.

 

count는 [디렉토리 갯수, 파일 갯수, 전체 사이즈] 로 보여줍니다.

 

7) cp

 

cp는 hdfs내에서 복사하는 경우 사용하는 명령어입니다.

 

아래는 /user.young의 sample.txt 파일을 /user/geonyeong 디렉토리로 복사한 예제입니다.

 

 

8) du

 

특정 디렉토리 혹은 파일의 사이즈를 보여줍니다.

 

아래는 /user/young의 사이즈를 보여주는 예제입니다.

각 파일의 사이즈와 replication * 파일 size 인 총합도 보여줍니다.
아래 예제의 경우에는 replication이 3으로 filesize * 3이 노출되게 됩니다.

 

9) expunge

 

완전 삭제입니다.

 

hdfs dfs -expunge

 

 

 

 

 

반응형

 

 

 

 

 

 

 

10) get

 

copyToLocal과 같은 명령어입니다.

 

아래는 get 명령어의 예제입니다.

 

 

11) getfacl

 

hdfs의 특정 디렉토리 혹은 파일의 ACLs를 보여주는 명령어입니다.

 

아래는 getfacl 명령어의 예제입니다.

 

 

12) getmerge

 

hdfs의 파일을 append한 후 로컬로 다운로드받는 명령어입니다.

 

아래는 예제입니다.

 

 

13) mkdir

 

디렉토리를 생성하는 명령어

 

아래는 예제입니다.

 

 

14) moveFromLocal

 

로컬파일을 hdfs에 올립니다.

copy가 아닌 mv로 로컬 파일은 삭제됩니다.

 

아래는 예제입니다.

 

 

15) mv

 

hdfs 상에서 mv 명령어 입니다.

 

아래는 예제 입니다.

 

 

 

16) put

 

copyFromLocal 명령어와 동일합니다.

 

 

17) rm

 

hdfs 파일을 삭제 합니다.

 

아래는 예제입니다.

 

18) setfacl

 

hdfs의 파일, 디렉토리의 ACLs를 setting 하는 명령어입니다.

 

19) test

 

파일이 존재하는지 체크하는 명령어입니다.

아래는 명령어 옵션값입니다.

 

-e : 파일이 존재하면 return 0

-z : 파일 길이가 0이면 return 0

-d : 디렉토리면 return 0

 

아래는 예제입니다.

 

 

3. mapreduce

mapreduce를 수행하기 위해선 mapper와 reducer가 있는 빌드된 fat jar가 있어야합니다.

1. mapper만 존재해도 됩니다.
2. spring의 boot jar는 안됩니다. 이유는 맵리듀스가 mapper와 reducer의 full packager경로로 찾아야 하는데 boot jar의 경우에는 찾을 수 없기 때문입니다.

저는 mapreduce의 대표적인 예제인 wordcount를 진행하도록 하겠습니다.

 

우선 아래와 같이 예제 jar를 다운 및 압축을 해제합니다.

 

wget http://www.java2s.com/Code/JarDownload/hadoop-examples/hadoop-examples-1.2.1.jar.zip
unzip hadoop-examples-1.2.1.jar.zip

 

그후, 저는 아래와 같은 파일을 hdfs:/user/young에 올렸습니다.

 

 

이제 아래와 같이 wordcount 작업을 yarn에게 제출합니다.

 

hadoop jar <jar path> <main class> [input file path] [output directory path]

 

hadoop jar hadoop-examples-1.2.1.jar wordcount /user/young/sample.txt /user/young2

 

그럼 이제 아래와 같이 mapreduce 작업이 진행됩니다.

 

 

작업이 모두 완료되면 /user/young2에 결과 파일이 생성된것을 볼 수 있습니다.

 

 

파일 내용은 아래와 같이 wordcount 결과입니다.

 

 

4. 마무리

이번 포스팅에서는 hadoop의 hdfs와 mapreduce에 대해 간단한 예제를 진행하였습니다.

다음 포스팅에서는 hadoop 서비스 중 하나인 hive에 대해 간랸히 소개 및 예제를 진해하도록 하겠습니다.

 

반응형

'BigData > Hadoop' 카테고리의 다른 글

(4) Hive  (0) 2020.03.10
(2) Hadoop 설치  (0) 2020.03.05
(1) Hadoop에 대해  (0) 2020.03.04
반응형

1. 서론

이번 포스팅에서는 Hadoop Cluster 설치를 진행하겠습니다.

 

Hadoop 설치는 직접 각 서버에 들어가, Hadoop 패키지를 다운받아 모든 설정파일을 직접 수정하여 설치 할 수 있습니다.

 

하지만, 너무 번거롭고 설정 파일을 잘못 세팅하면 제대로 Hadoop Cluster 설치가 되지 않습니다.

 

이런 까다로운 설치로 인해, Hadoop Cluster 설치 및 관리를 간편하게 제공하는 서비스가 별도로 있습니다.

 

해당 서비스를 제공하는 기업은 아래 2가지가 있습니다.

 

  • Cloudera
  • Hortonworks

 

저는 위 2가지 중 Cloudera 기업의 Cloudera Manager를 통해 Hadoop Cluster 구축을 진행하도록 하겠습니다.

 

참고로, 2019년 1월 Hortonworks는 Cloudera로 합병이 되었습니다.

 

2. Hadoop Cluster 설치

저는 총 4대로 Hadoop Cluster를 구축하도록 하겠습니다.

추가로, sudo가 가능한 계정으로 진행하겠습니다.

 

1) 터널링

 

먼저, 각 서버들을 터널링해야 합니다.

 

ssh-keygen

 

1. ssh-keygen 명령어를 통해 각 서버에 public_key (id_rsa.pub)와 private_key (id_rsa)를 만들어 줍니다. (key 는 ~/.ssh 디렉토리에 생성됩니다.)

 

2. 각 서버의 public_key를 ~/.ssh/authorized_keys 파일에 추가해줍니다.

 

아래와 같이 authorized_keys 파일에 각 서버의 public_key 정보를 넣어주었습니다.

각 서버에도 동일하게 적용하여 줍니다.
mr 수행 시 각 데이터 노드는 연결해야 하기때문에 모든 노드는 터널링이 되어있어야합니다.

 

 

3. 수동 터널링 진행

 

처음 ssh 터널링 진행시 yes or no를 물어보는 구문이 나오며, yes를 통해 터널링이 이루어져야 합니다.

이를 위해서, 최초 한번 수동으로 터널링을 진행해야 합니다.

 

아래와 같이 스크립트를 만들어 진행하였습니다.

 

2)  symbolic link

 

cloudera manager은 hdfs, mapreduce 등 서비스들의 default 경로를 root directory 하위에 잡게됩니다.

 

대부분의 서버는, root가 아닌 별도 디렉토리에 disk가 넉넉히 마운트가 되어 있습니다.

 

저의 경우에도 아래와 같이 root에는 10GB만이 마운트되어 있는 조그만 VM을 사용하고 있어,

symbolic link를 걸어주도록 하겠습니다.

 

 

저는 아래와 같이 스크립트를 만들었습니다.

sudo mkdir -p /home1/irteam/opt/cloudera
sudo mkdir -p /home1/irteam/var/log
sudo mkdir -p /home1/irteam/var/lib
sudo mkdir -p /home1/irteam/dfs
sudo mkdir -p /home1/irteam/yarn
sudo mkdir -p /home1/irteam/impala
sudo mkdir -p /home1/irteam/var/log/cloudera-scm-alertpublisher
sudo mkdir -p /home1/irteam/var/log/cloudera-scm-eventserver
sudo mkdir -p /home1/irteam/var/lib/cloudera-scm-eventserver
sudo mkdir -p /home1/irteam/var/log/cloudera-scm-firehose
sudo mkdir -p /home1/irteam/var/lib/cloudera-host-monitor
sudo mkdir -p /home1/irteam/var/lib/cloudera-service-monitor
sudo mkdir -p /home1/irteam/var/log/hadoop-hdfs
sudo mkdir -p /home1/irteam/var/log/hive
sudo mkdir -p /home1/irteam/var/log/hue
sudo mkdir -p /home1/irteam/var/log/oozie
sudo mkdir -p /home1/irteam/var/log/spark
sudo mkdir -p /home1/irteam/var/log/hadoop-mapreduce
sudo mkdir -p /home1/irteam/var/lib/hadoop-yarn/yarn-nm-recovery
sudo mkdir -p /home1/irteam/var/log/hadoop-yarn
sudo mkdir -p /home1/irteam/var/lib/zookeeper
sudo mkdir -p /home1/irteam/var/log/zookeeper
sudo mkdir -p /home1/irteam/tmp

sudo ln -s /home1/irteam/opt/cloudera /opt/cloudera
sudo ln -s /home1/irteam/var/log/ /var/log
sudo ln -s /home1/irteam/var/lib/ /var/lib
sudo ln -s /home1/irteam/dfs/ /dfs
sudo ln -s /home1/irteam/yarn/ /yarn
sudo ln -s /home1/irteam/impala/ /impala
sudo ln -s /home1/irteam/var/log/cloudera-scm-alertpublisher /var/log/cloudera-scm-alertpublisher
sudo ln -s /home1/irteam/var/log/cloudera-scm-eventserver /var/log/cloudera-scm-eventserver
sudo ln -s /home1/irteam/var/lib/cloudera-scm-eventserver /var/lib/cloudera-scm-eventserver
sudo ln -s /home1/irteam/var/log/cloudera-scm-firehose /var/log/cloudera-scm-firehose
sudo ln -s /home1/irteam/var/lib/cloudera-host-monitor /var/lib/cloudera-host-monitor
sudo ln -s /home1/irteam/var/lib/cloudera-service-monitor /var/lib/cloudera-service-monitor
sudo ln -s /home1/irteam/var/log/hadoop-hdfs /var/log/hadoop-hdfs
sudo ln -s /home1/irteam/var/log/hive /var/log/hive
sudo ln -s /home1/irteam/var/log/hue /var/log/hue
sudo ln -s /home1/irteam/var/log/oozie /var/log/oozie
sudo ln -s /home1/irteam/var/log/spark /var/log/spark
sudo ln -s /home1/irteam/var/log/hadoop-mapreduce /var/log/hadoop-mapreduce
sudo ln -s /home1/irteam/var/lib/hadoop-yarn/ /var/lib/hadoop-yarn
sudo ln -s /home1/irteam/var/log/hadoop-yarn /var/log/hadoop-yarn
sudo ln -s /home1/irteam/var/lib/zookeeper /var/lib/zookeeper
sudo ln -s /home1/irteam/var/log/zookeeper /var/log/zookeeper

sudo chmod -R 777 /home1/irteam

 

cloudera에서 default경로로 사용하고 있는 /var/log, /var/lib 등을 home1/irteam 디렉터리 하위로 link를 생성했습니다.

 

3) /etc/hosts 수정

 

/etc/hosts에 각 서버의 ip와 hostname을 등록해 줍니다.

 

저는 아래와 같이 등록해주었습니다.

 

 

 

 

 

반응형

 

 

 

 

 

 

 

 

4) cloudera manager installer binary 다운로드 및 실행

 

이제 사전준비는 끝났습니다.

실제로 Cloudera Manager를 다운받아 실행하도록 하겠습니다.

 

저는 6.3.0 버전을 다운받아 진행하도록 하겠습니다.

(저는 4번 서버에서 실행하도록 하겠습니다.)

 

아래와 같이 다운로드 및 실행하여 주세요.

wget https://archive.cloudera.com/cm6/6.3.0/cloudera-manager-installer.bin
chmod 755 cloudera-manager-installer.bin
sudo ./cloudera-manager-installer.bin

 

 

실행한다면 아래와 같이 뜰것입니다.

 

모두 next 및 yes를 눌러 설치를 진행해 주세요.

 

 

위의 설치 진행이 끝난다면 7180 port로 web ui에 접근할 수 있습니다. ( default id, password는 admin/admin 입니다. )

 

아래는 ui에 최초 접근했을때의 화면입니다.

 

 

admin/admin으로 로그인 후 next를 눌러 넘어가줍니다. ( edition을 선택하는 화면에서는 무료를 선택합니다. )

 

계속 next를 누르다 보면 Specify Hosts 부분이 나오게됩니다.

 

여기에서는 각 서버들을 입력해줍니다.

 

저의 경우는 1~3번 서버만 설치진행하도록 하겠습니다. ( 4번 서버는 manager 전용 서버로 사용하기 위해서 입니다. )

 

 

각 서버를 line by line으로 입력하여 검색버튼을 누르면 검색됩니다.

 

그 후, cloudera manager에서 default로 권장하는 parcel와 cdh 버전을 선택하여 next 해줍니다.( 기본적으로 선택되어져 있습니다. )

 

이제 설치하기 위해 각 서버에 접근하기 위해 위에서 ssh 터널링 했던 정보를 넣습니다.

 

아래와 같이 계정은 sudo가 가능한 계정을 기입하며, cloudera manager가 설치된 서버의 private key 파일을 업로드합니다.

 

 

입력 후 next 버튼을 누릅니다.

 

아래와 같이 각 서버에 설치가 진행되는 화면으로 이동하게 됩니다.

 

 

agent가 설치가 되었으면 next를 눌러 parcel도 설치를 진행합니다.

parcel까지 설치가 완료되었다면 Inspect Cluster가 나옵니다.

 

여기서 I understand the risks, let me continue with cluster setup. 를 선택하여 설치를 진행해주세요.

 

이제 Select Services화면이 나오고, 여기는 제가 설치할 서비스들을 선택하는 화면입니다.

 

저의 경우, 향후 spark를 사용할것을 고려하여 Data engineering을 선택하겠습니다.

 

 

다음은 각 호스트별로 어떤 서비스를 실행시킬지 선택하는 화면입니다.

 

아래와 같습니다.

 

 

이후, 계속 next를 눌러 서비스를 설치 및 실행시켜줍니다.

(중간에 DB 설정이 있는데 기본으로 내장되어 있는 posrgresql을 사용하였습니다.) 

 

이제 설치가 완료되었습니다.

 

설치가 완료된다면 아래와 같이 주의 표시들이 있을 텐데요.

 

이것은 heap log 디렉토리를 cloudera manager가 default로 /tmp로 잡아서입니다.

이경우, 아래와 같이 해당 서비스의 구성 텝에서 바라보는 디렉토리를 변경할 수 있습니다.

 

 

hdfs의 경우 default block replication 수는 3입니다.
저의 경우 데이터 노드를 2개만 지정하였기 때문에, 계속 경고가 떠있을 텐데요.
이를 위해, 추가로 4번 호스트를 hdfs의 데이터 노드, yarn의 node manager의 역할로 추가하였습니다.

3. 마무리

이번 포스팅에서는 간략히 Hadoop 설치를 진행해봤습니다.

 

다음 포스팅에서는 간단한 Hadoop 사용법을 진행하도록 하겠습니다.

반응형

'BigData > Hadoop' 카테고리의 다른 글

(4) Hive  (0) 2020.03.10
(3) Hadoop 사용법  (0) 2020.03.09
(1) Hadoop에 대해  (0) 2020.03.04

+ Recent posts