Apache Zeppelin이란 Spark를 Web 기반 NoteBook으로 간편히 수행할 수 있게 제공하는 어플리케이션입니다.
ide를 통해 할 수도 있지만 Web 기반으로 어디서든 접근하여 간편히 Spark 로직을 생산할 때 많이 사용하는 도구입니다.
3. 설치 및 CDH와 연동
CDH ( = Cloudera's Distribution for Hadoop )에서 사용하는 CM (= Cloudera Manager)에서는 안타깝게도 Apache Zeppelin 설치를 제공하고 있지 않습니다.
HDP ( = Hortonworks Data Platform ) 에서 사용하는 Ambari 에서는 지원하고 있습니다. Ambari에서 설치 법은 아래 URL에 나와있습니다. https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/installing-zeppelin/content/installing_apache_zeppelin.html
때문에, 수동으로 설치하여 CDH와 연동하여 사용하여야 합니다.
1) 다운로드
저는 CDH 중 호스트 한개를 선택하여 설치를 진행하였습니다.
먼저 아래와 같이 zeppelin 을 다운받습니다.
( 현재 포스팅 시점에서는 zeppelin-0.9.0-preview1 가 최신 버전이지만 많이 사용하는 0.8.1 버전으로 진행하도록 하겠습니다. )
대게, 스트리밍이라하면 실시간 처리로 알고 있습니다. 스파크는 마이크로 배치로 짧게 여러번 수행하여 스트리밍 처리를 제공합니다.
2. 주요 용어
1) 스트리밍 컨텍스트
스파크 스트리밍을 수행하기 위해서는 스트리밍 모듈에서 제공하는 스트리밍 컨텍스트를 사용해야 합니다.
아래는 스트리밍 컨텍스트를 생성하고 사용하는 예제입니다. - (스칼라)
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("StreamingSample")
conf.set("spark.driver.host", "127.0.0.1")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(3))
val rdd1 = sc.parallelize(List("Spark Streaming Sample ssc"))
val rdd2 = sc.parallelize(List("Spark Queue Spark API"))
val inputQueue = Queue(rdd1, rdd2)
val lines = ssc.queueStream(inputQueue, true)
val words = lines.flatMap(_.split(" "))
words.countByValue().print()
ssc.start()
ssc.awaitTermination()
예제에서 볼 수 있듯이 StreamingContext는 SparkContext에서 생성 할 수 있습니다.
추가로, 어느 주기로 수행할 지의 정보도 같이 넘겨야 합니다.
예제에서는 Seconds(3)을 통해 3초에 한번씩 수행되도록 하였습니다.
그리고, 마지막에 있는 start 메소드를 실행해야만 스파크 스트리밍은 수행됩니다.
또한, awaitTermination 를 통해 임의로 애플리케이션이 종료되지 않도록 합니다.
종료는 개발자가 직접 어떤 시점 혹은 상황에 하도록 추가하여야 합니다.
2) DStream (Discretized Streams)
DStream은 RDD 와 같이 스파크에서 스트리밍을 위해 제공하는 데이터 모델입니다.
단순하게 RDD의 시퀀스로 이해하시면 되며, 지정한 배치 간격마다 input 에서 데이터를 가져와 DStream으로 변경하면서 처리하게 됩니다.
설정값은 SparkConf 인스턴스를 통해 setting 할 수 있습니다. 다만, 코드에 항상 포함되어야 하는 단점이 있습니다.
이를 해결하기 위해서 아래의 방법들이 있습니다.
스파크 쉘 혹은 spark-submit을 이용하면 해결
스파크 홈에 spark-defaults.conf 파일에 정의를 하여 해결
그럼 각 설정값들이 무엇이 있는지 알아 보도록 하겠습니다.
1) 어플리케이션 관련 설정
property 명
의미
default
spark.app.name
어플리케이션 이름
X(필수로 세팅 필요)
spark.driver.cores
드라이버가 사용할 코어 수
1
spark.driver.maxResultSize
액션연산으로 생성된 값의 최대 크기
1G
spark.driver.memory
드라이버가 사용할 메모리 크기 (클라이언트 모드 시 SparkConf가 아닌 --driver-memory로 지정해야합니다.)
1G
spark.executor.memory
익스큐터 하나의 메모리 크기
1G
spark.local.dir
RDD 데이터 저장 혹은 셔플 시 매퍼의 데이터 저장을 하는 경로
/tmp
spark.master
클러스터 매니저 정보
-
spark.submit.deployMode
디플로이 모드 지정(client 혹은 cluster)
-
2) 실행환경 관련 설정
property 명
의미
default
spark.driver.extraClassPath
드라이버 클래스패스에 추가할 항목(SparkConf가 아닌 --driver-class-path로 지정해야 합니다)
-
spark.executor.extraClassPath
익스큐터의 클래스패스에 추가할 항목
-
spark.files, spark.jars
각 익스큐터의 실행 dir에 위치한 파일, jars (, 를 사용하여 여러개 지정 가능합니다.)
-
spark.submit.pyFiles
PYTHON_PATH에 추가될 .zip, .egg, .py 파일 (, 를 사용하여 여러개 지정 가능합니다.)
-
spark.jars.pachages
익스큐터와 드라이버의 클래스패스에 추가될 의존성 jar 정보
-
3) 셔플 관련 설정
property 명
의미
default
spark.reducer.maxSizeFlight
셔플시 각 리듀서가 읽어갈 때 사용할 버퍼 사이즈
48m
spark.reducer.maxReqslnFlight
리듀서에서 매퍼 결과를 가져갈때 동시에 수행가능항 최대 요청 수
int.MaxValue(2147483647)
spark.shuffle.compress
매퍼의 결과를 압축 유무 (true시 spark.io.compress.codec 지정해야 합니다.)
false
spark.shuffle.service.enabled
외부 셔플 서비스 사용 유무
false
4) 스파크 UI 관련 설정
property 명
의미
default
spark.eventLog.enabled
스파크 이벤트 로그 수행 유무 (true시 spark.eventLog.dir에 로깅 경로 지정해야합니다 - 스파크 UI에서 확인 가능합니다.)
false
spark.ui.port
스파크 UI 포트
4040
spark.ui.killEnabled
스파크 UI를 통해 job kill 가능 여부
true
spark.ui.retainedJobs
종료된 잡 정보 유지 갯수
-
5) 압축 및 직렬화 관련 설정
property 명
의미
default
spark.broadcast.compress
브로드 캐스트 변수값을 압축할지 유무
true
spark.io.compression.codec
스파크 내부에서 사용할 압축 방법
lz4
spark.kyro.classesToRegister
Kyro 직렬화에 사용할 클래스 지정
-
spark.serializer
스파크에서 사용할 객체 직렬화 방식 (스파크에서는 JavaSerializer, KyroSerializer 클래스 제공합니다.)
-
6) 메모리 관련 설정
property 명
의미
default
spark.memory.fraction
스파크 여유/가용 메모리 비율 설정
0.6
spark.memory.storageFraction
스파크 가용공간에서 저장에 사용할 메모리 비용
0.5
spark.memory.offHeap.enabled
off 힙 메모리 사용 유무
false
7) 익스큐터 관련 설정
property 명
의미
default
spark.executor.cores
익스큐터에 할당할 코어 수 (얀 경우 1, 나머지는 사용가능한 코어 수)
-
spark.default.parallelism
스파크에서 사용할 파티션 수
-
spark.files.fetchTimeout
sparkContext.addFile() 메소드 사용 시 파일 받아오는 limit 시간
60s
8) 네트워크 관련 설정
property 명
의미
default
spark.driver.host, spark.driver.port
드라이버 프로세스의 호스트와 포트
-
spark.network.timeout
스파크의 default 네트워크 타임아웃 값
-
9) 보안 관련 설정
property 명
의미
default
spark.acls.enable
스파크 acl 활성화 여부
false
spark.admin.acls
스파크 잡에 접근가능 user, admin 설정 ( , 를 사용하여 다수 등록 가능합니다. group으로 설정할 시 spark.admin.acls.group 속성을 사용합니다)
-
spark.authenticate
스파크에서 사용자 인증 여부 확인 유무
false
spark.authenticate.secret
잡 실행 시 시크릿 키 정보 설정
-
spark.ui.view.acls, spark.ui.view.acls.groups
스파크 UI에서 잡 조회 acl 정보
-
spark.ui.filters
스파크 UI에 적용할 서블릿 필터 지정 ( , 를 사용하여 다수 등록 가능합니다.)
-
10) 우선순위
스파크 프로퍼티가 적용되는 우선순위들은 아래와 같습니다.
코드 상 SparkConf
spark-shell, spark-submit
spark-defaults.conf 파일
반응형
3. 환경변수
스파크 어플리케이션이 아닌 각 서버마다 적용해야하는 정보는 서버의 환경변수를 사용해야 합니다.
환경변수로 설정 가능한 항목은 아래와 같습니다.
JAVA_HOME : 자바 설치 경로
PYSPARK_PYTHON : 파이썬 경로
PYSPARK_DRIVER_PYTHON : 파이썬 경로(드라이버에만 적용)
SPARK_DRIVER_R : R경로
SPARK_LOCAL_IP : 사용할 ip 경로
SPARK_PUBLIC_DNS : 애플리케이션 호스트명
SPARK_CONF_DIR : spark-defaults.conf, spark-env.sh, log4j.properties 등 설정 파일이 놓인 디렉터리 위치
클러스터 매니저에 따라 각 설정 방법이 달라 현재 어떤것을 사용하고 있는지 확인 후 설정하는것을 권장합니다.
얀으로 클러스터 모드 사용 시 환경변수는 spark-defaults.conf 파일의 spark.yarn.appMasterEnv.[환경변수명] 이용해야 합니다.
4. 로깅설정
로깅은 log4j.properties 파일로 설정합니다. -> log4j.properties.template 파일을 복사하여 사용하시면 됩니다.
아래는 각 클러스터 매니저 별 로깅파일이 저장되는 경로입니다.
클러스터 매니저
로깅 저장 경로
스탠드 얼론
각 슬레이브 노드의 spark 홈 아래 work 디렉토리
메소스
/var/log/mesos
얀
기본 각 노드의 로컬 파일 시스템 (yarn.log-aggregation-enable이 true의 경우 yarn.nodemanager/remote-app-log-dir에 설정된 경로입니다.)
5. 스케쥴링
스케쥴링이란 클러스터내 자원을 각 Job에게 할당하는 작업입니다.
클러스터에서 수행되는 작업은 적당한 cpu, memory를 주는 것이 성능을 최대화 시킵니다.
과도하게 주는 경우 GC, IO, 네트워크 등의 경합이 더 비효율적일 수 있습니다.
그렇기 때문에, 하나의 클러스터에서 다수의 잡이 실행되는 경우 스케쥴링을 적절히 선택 및 이용하여 최적의 성능을 맞추어야합니다.
2개 이상의 어플리케이션이 한 클러스터에서 동작할 시, 스케쥴링은 크게 고정 자원 할당 방식과 동적 자원 할당 방식이 있습니다.
1)고정 자원 할당 방식
고정 자원 할당 방식은 각 애플리케이션마다 할당할 자원을 미리 결정합니다.
사용 방법은 위에서 설명한 spark-shell, spark-submit 을 사용할 수 있습니다.
단기간이 아닌 웹같은 장기간 동작하며 이벤트 발생이 있을때, 수행되는 경우에는 비효율적입니다.
2) 동적 자원 할당 방식
동적 자원 할당 방식은 상황에 따라 자원을 할당 및 회수하는 방식입니다.
클러스터 마다 동적 자원 할당 방식이 다르며, 공통으로는 spark.dynamicAllocation.enabled 속성을 true로 해야합니다.
클러스터 모드
동적 자원 할당 방식
스탠드얼론
spark.shuffle.service.enabled 속성 true 사용
메소스
1. spark.mesos.coarse, spark.shuffle.service.enabled 속성을 true로 설정 2. 각 워커노드마다 start-mesos-shuffle-service.sh 수행
얀
1. spark--yarn-shuffle.jar를 모든 노드매니저 클래스패스에 등록 2. 각 노드 매니저의 yarn-site.xml 파일에 아래와 같이 속성 설정 2-1. spark_shuffle=yarn.nodemanager.aux-services 2-2. yarn.nodemanager.aux-services.spark_shuffle.class=org.apache.spark.network.yarn.YarnShuffleService 2-3 park.shuffle.service.enabled=true
1개의 어플리케이션에서 2개 이상의 Job이 수행되는 경우, FIFO, FAIR 스케쥴링 방법이 있습니다.
1) FIFO
FIFO는 기본 설정값이며, 수행요청대로 Job이 리소스를 점유하게됩니다.
단시간에 끝나는 잡이 오래 걸리는 잡 뒤에 있을때는 단점인 스케쥴링 방식입니다.
2) FAIR
FAIR는 의미 그대로 공유하는 설정입니다.
사용은 sparkConf에 spark.scheduler.mode=FAIR 로 설정하여 가능합니다.
FAIR에서도 우선순위를 조절하고 싶은 경우 pool을 사용 가능하며,
pool 설정은 conf 디렉토리에 fairscheduler.xml 파일에 기재하면 됩니다.
Hadoop eco system에서 많이 사용하는 Spark에 대해 공부한 내용을 공유하고자 합니다.
책은 빅데이터 분석을 위한 스파크 2 프로그래밍 을 통해 공부하였습니다.
이번 포스팅에서는 1장인 스파크 소개부분을 진행하도록 하겠습니다.
2. 스파크 소개
1) 스파크란?
스파크는 하둡의 mapreduce를 보완하고자 나온 메모리 기반 대용량 처리 프레임워크입니다.
spark는 스칼라로 개발되었습니다.
특징으로는 아래와 같습니다.
하둡과 달리 파일이 아닌 메모리를 이용하여 데이터 저장방식 제공.
자바, 파이썬, 스칼라 언어 지원 및 다른 오픈소스들과의 플러그인이 많아 유용.
스트리밍, 머신러닝에서도 활용할 수 있도록 다양한 라이브러리 제공.
2) RDD, 데이터 프레임, 데이터셋
스파크에서는 데이터를 처리하기 위한 모델로 RDD, 데이터 프레임, 데이터셋 3가지를 제공합니다.
RDD란? 스파크에서 정의한 분산 데이터 모델로서 병렬처리가 가능하고 스스로 에러를 복구할 수 있는 모델. input 데이터를 이 RDD라는 모델로 만들어 데이터 핸들링을 하게됩니다.
데이터 복구가 가능한 이유로는 RDD생성작업을 기록하기 때문입니다.
RDD를 생성할 수 있는 방법은 3가지가 존재
프로그램의 memory에 있는 데이터.
로컬, hdfs에 있는 외부 파일
또 다른 RDD로부터.
데이터 프레임이란? DataSet[Row]를 데이터 프레임이라고 합니다. 여기서 Row는 스파크 lib에서 정의한 클래스라고 생각하면 됩니다.
데이터 셋이란? = 데이터프레임의 진화형 모델. DataSet[CustomTypeModel] 과 같이 데이터 모델을 custom하게 정의하여 사용할 수 있도록 typed 모델입니다.
RDD를 low api라고 하면 데이터 프레임은 high api라고 보면 됩니다. 또한, 데이터 프레임을 사용한다면 언어의 성능 이슈를 해결가능합니다. (스파크가 JVM 기반 언어인 스칼라로 만들어져 있기 때문에 파이썬과 같은 다른 언어의 이종 프로세스간의 성능 이슈가 발생하게 됩니다. 하지만, 데이터 프레임은 개발자가 정의한 모델이 아닌 스파크가 제공하는 모델이기 때문에 성능 이슈가 없습니다.)
반응형
3) 트랜스포메이션 연산과 액션연산
스파크에서는 데이터 처리를 위해 정의한 연산이 2가지가 있습니다.
트랜스포메이션 연산이란? = 어떤 RDD에 변형을 가해 새로운 RDD를 생성하는 연산.
액션 연산이란? = 연산의 결과로 RDD가 아닌 다른 값을 반환하는 연산.
lazy 실행 = 스파크는 트랜스포메이션의 meta만을 가지고 있습니다. 액션연산 수행 시 실제 트랜스포메이션 연산의 meta 순서대로 연산을 시작합니다. 이점이 바로 연산의 최적화를 찾아 수행하는 이유이며, 에러 시 복구가 가능한 이유입니다.
4) sparkContext
컨텍스트라고 일컬어지는것은 대부분 어떠한 일을 대신 수행해주는 것을 의미합니다.
그렇기 때문에 스파크 컨텍스트는 아래의 일을 담당하게됩니다.
스파크 애플리케이션과 클러스터의 연결을 관리
RDD를 생성
5) partition
partition이란 스파크 클러스터에서 데이터를 관리하는 단위입니다.
HDFS 기반으로 사용하게 된다면 데이터 block당 한 개의 partition이 생성됩니다.
6) 드라이버 프로그램, 워커 노드,Job, Executor
1. 드라이버 프로그램
= 스파크 컨텍스트를 생성한 프로그램을 의미합니다.
2. 워커 노드
= 실제 데이터 처리를 수행하는 서버입니다.
3. Job
= 데이터 핸들링을 하는 일련의 작업을 의미합니다.
4. Executor
= 워커 노드에서 실행되는 프로세스입니다. 쓰레드가 아닌 프로세스이기 때문에 각 executor들 끼리는 영향을 끼칠 수 없습니다.
3. 스파크 구성도
일반적인 구성도는 아래와 같습니다.
1. sparkContext를 통해 클러스터에게 Job 제출.
2. 각 워커노드에서 Executor 생성
3. Executor에서 제출한 Job 수행
4. DAG
DAG는 Directed Acyclic Graph의 약어로 cycle이 발생하지 않는 그래프입니다.
spark에서는 데이터 처리를 이 DAG 형식으로 스케쥴링이 이루어집니다.
그렇기 때문에 실패가 일어나게 되면 일어난 지점에서 다시 DAG를 수행하여 데이터 손실을 없도록 합니다.