반응형

1. 서론

이번 포스팅에서는 스파크 실행에 있어 다양한 설정값들에 대해 포스팅하도록 하겠습니다.

2. 스파크 프로퍼티

스파크 프로퍼티는 스파크 어플리케이션 실행과 관련한 설정값을 의미합니다.

 

설정값은 SparkConf 인스턴스를 통해 setting 할 수 있습니다. 다만, 코드에 항상 포함되어야 하는 단점이 있습니다.

 

이를 해결하기 위해서 아래의 방법들이 있습니다.

 

  • 스파크 쉘 혹은 spark-submit을 이용하면 해결
  • 스파크 홈에 spark-defaults.conf 파일에 정의를 하여 해결

 

그럼 각 설정값들이 무엇이 있는지 알아 보도록 하겠습니다.

 

1) 어플리케이션 관련 설정

 

property 명 의미 default
spark.app.name 어플리케이션 이름 X(필수로 세팅 필요)
spark.driver.cores 드라이버가 사용할 코어 수 1
spark.driver.maxResultSize 액션연산으로 생성된 값의 최대 크기 1G
spark.driver.memory 드라이버가 사용할 메모리 크기
(클라이언트 모드 시 SparkConf가 아닌 --driver-memory로 지정해야합니다.)
1G
spark.executor.memory 익스큐터 하나의 메모리 크기 1G
spark.local.dir RDD 데이터 저장 혹은 셔플 시 매퍼의 데이터 저장을 하는 경로 /tmp
spark.master 클러스터 매니저 정보 -
spark.submit.deployMode 디플로이 모드 지정(client 혹은 cluster) -

 

2) 실행환경 관련 설정

 

property 명 의미 default
spark.driver.extraClassPath 드라이버 클래스패스에 추가할 항목(SparkConf가 아닌 --driver-class-path로 지정해야 합니다) -
spark.executor.extraClassPath 익스큐터의 클래스패스에 추가할 항목 -
spark.files, spark.jars 각 익스큐터의 실행 dir에 위치한 파일, jars
(, 를 사용하여 여러개 지정 가능합니다.)
-
spark.submit.pyFiles PYTHON_PATH에 추가될 .zip, .egg, .py 파일 
(, 를 사용하여 여러개 지정 가능합니다.)
-
spark.jars.pachages 익스큐터와 드라이버의 클래스패스에 추가될 의존성 jar 정보 -

 

3) 셔플 관련 설정

 

property 명 의미 default
spark.reducer.maxSizeFlight 셔플시 각 리듀서가 읽어갈 때 사용할 버퍼 사이즈 48m
spark.reducer.maxReqslnFlight 리듀서에서 매퍼 결과를 가져갈때 동시에 수행가능항 최대 요청 수 int.MaxValue(2147483647)
spark.shuffle.compress 매퍼의 결과를 압축 유무
(true시 spark.io.compress.codec 지정해야 합니다.)
false
spark.shuffle.service.enabled 외부 셔플 서비스 사용 유무 false

 

4) 스파크 UI 관련 설정

 

property 명 의미 default
spark.eventLog.enabled 스파크 이벤트 로그 수행 유무
(true시 spark.eventLog.dir에 로깅 경로 지정해야합니다 - 스파크 UI에서 확인 가능합니다.)
false
spark.ui.port 스파크 UI 포트 4040
spark.ui.killEnabled 스파크 UI를 통해 job kill 가능 여부 true
spark.ui.retainedJobs 종료된 잡 정보 유지 갯수 -

 

 

5) 압축 및 직렬화 관련 설정

 

property 명 의미 default
spark.broadcast.compress 브로드 캐스트 변수값을 압축할지 유무 true
spark.io.compression.codec 스파크 내부에서 사용할 압축 방법 lz4
spark.kyro.classesToRegister Kyro 직렬화에 사용할 클래스 지정 -
spark.serializer 스파크에서 사용할 객체 직렬화 방식
(스파크에서는 JavaSerializer, KyroSerializer 클래스 제공합니다.)
-

 

 

6) 메모리 관련 설정

 

property 명 의미 default
spark.memory.fraction 스파크 여유/가용 메모리 비율 설정 0.6
spark.memory.storageFraction 스파크 가용공간에서 저장에 사용할 메모리 비용 0.5
spark.memory.offHeap.enabled off 힙 메모리 사용 유무 false

 

 

7) 익스큐터 관련 설정

 

property 명 의미 default
spark.executor.cores 익스큐터에 할당할 코어 수
(얀 경우 1, 나머지는 사용가능한 코어 수)
-
spark.default.parallelism 스파크에서 사용할 파티션 수 -
spark.files.fetchTimeout sparkContext.addFile() 메소드 사용 시 파일 받아오는 limit 시간 60s

 

 

8) 네트워크 관련 설정

 

property 명 의미 default
spark.driver.host, spark.driver.port 드라이버 프로세스의 호스트와 포트 -
spark.network.timeout 스파크의 default 네트워크 타임아웃 값 -

 

 

9) 보안 관련 설정

 

property 명 의미 default
spark.acls.enable 스파크 acl 활성화 여부 false
spark.admin.acls 스파크 잡에 접근가능 user, admin 설정
( , 를 사용하여 다수 등록 가능합니다.
group으로 설정할 시 spark.admin.acls.group 속성을 사용합니다)
-
spark.authenticate 스파크에서 사용자 인증 여부 확인 유무 false
spark.authenticate.secret 잡 실행 시 시크릿 키 정보 설정 -
spark.ui.view.acls, spark.ui.view.acls.groups 스파크 UI에서 잡 조회 acl 정보 -
spark.ui.filters 스파크 UI에 적용할 서블릿 필터 지정
( , 를 사용하여 다수 등록 가능합니다.)
-

 

10) 우선순위

 

스파크 프로퍼티가 적용되는 우선순위들은 아래와 같습니다.

 

  1. 코드 상 SparkConf
  2. spark-shell, spark-submit
  3. spark-defaults.conf 파일

 

 

 

 

반응형

 

 

 

 

 

3. 환경변수

스파크 어플리케이션이 아닌 각 서버마다 적용해야하는 정보는 서버의 환경변수를 사용해야 합니다.

 

환경변수로 설정 가능한 항목은 아래와 같습니다.

 

  • JAVA_HOME : 자바 설치 경로
  • PYSPARK_PYTHON : 파이썬 경로
  • PYSPARK_DRIVER_PYTHON : 파이썬 경로(드라이버에만 적용)
  • SPARK_DRIVER_R : R경로
  • SPARK_LOCAL_IP : 사용할 ip 경로
  • SPARK_PUBLIC_DNS : 애플리케이션 호스트명
  • SPARK_CONF_DIR : spark-defaults.conf, spark-env.sh, log4j.properties 등 설정 파일이 놓인 디렉터리 위치

클러스터 매니저에 따라 각 설정 방법이 달라 현재 어떤것을 사용하고 있는지 확인 후 설정하는것을 권장합니다.

얀으로 클러스터 모드 사용 시 환경변수는 spark-defaults.conf 파일의 spark.yarn.appMasterEnv.[환경변수명] 이용해야 합니다.

 

4. 로깅설정

로깅은 log4j.properties 파일로 설정합니다. -> log4j.properties.template 파일을 복사하여 사용하시면 됩니다.

 

아래는 각 클러스터 매니저 별 로깅파일이 저장되는 경로입니다.

 

클러스터 매니저 로깅 저장 경로
스탠드 얼론 각 슬레이브 노드의 spark 홈 아래 work 디렉토리
메소스 /var/log/mesos
기본 각 노드의 로컬 파일 시스템
(yarn.log-aggregation-enable이 true의 경우 yarn.nodemanager/remote-app-log-dir에 설정된 경로입니다.)

 

5. 스케쥴링

 

스케쥴링이란 클러스터내 자원을 각 Job에게 할당하는 작업입니다.

 

클러스터에서 수행되는 작업은 적당한 cpu, memory를 주는 것이 성능을 최대화 시킵니다.

과도하게 주는 경우 GC, IO, 네트워크 등의 경합이 더 비효율적일 수 있습니다.

 

그렇기 때문에, 하나의 클러스터에서 다수의 잡이 실행되는 경우 스케쥴링을 적절히 선택 및 이용하여 최적의 성능을 맞추어야합니다.

 

2개 이상의 어플리케이션이 한 클러스터에서 동작할 시, 스케쥴링은 크게 고정 자원 할당 방식과 동적 자원 할당 방식이 있습니다.

 

1) 고정 자원 할당 방식

 

고정 자원 할당 방식은 각 애플리케이션마다 할당할 자원을 미리 결정합니다.

 

사용 방법은 위에서 설명한 spark-shell, spark-submit 을 사용할 수 있습니다.

단기간이 아닌 웹같은 장기간 동작하며 이벤트 발생이 있을때, 수행되는 경우에는 비효율적입니다.

 

2) 동적 자원 할당 방식

 

동적 자원 할당 방식은 상황에 따라 자원을 할당 및 회수하는 방식입니다.

 

클러스터 마다 동적 자원 할당 방식이 다르며, 공통으로는 spark.dynamicAllocation.enabled 속성을 true로 해야합니다.

 

클러스터 모드 동적 자원 할당 방식
스탠드얼론 spark.shuffle.service.enabled 속성 true 사용
메소스 1. spark.mesos.coarse, spark.shuffle.service.enabled 속성을 true로 설정
2. 각 워커노드마다 start-mesos-shuffle-service.sh 수행
1. spark--yarn-shuffle.jar를 모든 노드매니저 클래스패스에 등록
2. 각 노드 매니저의 yarn-site.xml 파일에 아래와 같이 속성 설정
    2-1. spark_shuffle=yarn.nodemanager.aux-services
    2-2. yarn.nodemanager.aux-services.spark_shuffle.class=org.apache.spark.network.yarn.YarnShuffleService
    2-3 park.shuffle.service.enabled=true

 

1개의 어플리케이션에서 2개 이상의 Job이 수행되는 경우, FIFO, FAIR 스케쥴링 방법이 있습니다.

 

1) FIFO

 

FIFO는 기본 설정값이며, 수행요청대로 Job이 리소스를 점유하게됩니다.

단시간에 끝나는 잡이 오래 걸리는 잡 뒤에 있을때는 단점인 스케쥴링 방식입니다.

 

2) FAIR

 

FAIR는 의미 그대로 공유하는 설정입니다.

 

사용은 sparkConf에 spark.scheduler.mode=FAIR 로 설정하여 가능합니다.

 

FAIR에서도 우선순위를 조절하고 싶은 경우 pool을 사용 가능하며,

pool 설정은 conf 디렉토리에 fairscheduler.xml 파일에 기재하면 됩니다.

 

아래는 예시입니다.

 

<?xml version="1.0"?>
<allocations>
    <pool name="pool1">
        <schedulingMode>FAIR</schedulingMode>
        <weight>1</weight>
        <minShare>FAIR</minShare>
    </pool>
    <pool name="pool2">
        <schedulingMode>FIFO</schedulingMode>
        <weight>2</weight>
        <minShare>3</minShare>
    </pool>
</allocations>

 

pool 지정 시 사용하는 속성은 아래와 같습니다.

 

속성 의미
schedulingMode 스케쥴링 방법
weight pool간의 우선순위(크기가 큰값이 높습니다.)
minShare pool이 가져야 하는 CPU 코어 수

 

사용은 아래와 같이 하시면 됩니다.

conf.set("spark.scheduler.allocation.file", "pool 설정 파일 경로")

 

6. 마무리

이번에는 스파크 설정에 대해서 포스팅하였습니다.

다음에는 스파크 SQL에 대해 포스팅하겠습니다.

  •  
반응형

'BigData > Spark' 카테고리의 다른 글

(6) 스트럭처 스트리밍  (0) 2020.03.31
(5) 스파크 스트리밍  (0) 2020.03.19
(4) 스파크 SQL  (0) 2020.03.17
(2) RDD  (0) 2020.03.12
(1) 스파크 소개  (0) 2020.03.11
반응형

 

문자열을 인자로 받습니다.

 

문자열은 [ , ], { , }, ( , )인 괄호들로만 이루어져 있습니다.

 

문자열 괄호들이 올바르게 열고 닫혀있는지를 체크하는 문제입니다.

 

저는 아래와 같이 풀었습니다.

 

class Solution:
    def isValid(self, s: str) -> bool:
        stack = []
        parenthesis = {"[" : "]", "{" : "}", "(" : ")"}

        for char in s:
            if char in parenthesis.keys():
                stack.append(char)
            else:
                if not stack: return False
                if char == parenthesis.get(stack[-1]):
                    stack.pop()
                else:
                    return False
        
        if stack: return False
        return True

 

1. open 괄호가 들어오면 리스트에 담습니다.

2. close 괄호가 들어오면 list에 open 괄호가 있는지 체크합니다.

3. 있다면 list에서 마지막 open 괄호를 꺼내 close와 맞는지 확인합니다. -> 확인하기 유용하도록 위에 묶음은 dict로 선언했습니다.

4. 맞지 않으면 False를 반환합니다.

5. 마지막으로 list에 문자가 남아 있다면 False를 반환하도록 합니다.

 

감사합니다.

반응형
반응형

 

인자로 문자열을 받습니다.

 

문자열에는 R 혹은 L만 존재합니다.

 

이때, R과 L의 숫자가 같도록 substring을 하여 가장 많은 substring 갯수를 구해야 합니다.

 

이 문제는, R과 L의 숫자를 바꿀수 있는 조건이 없습니다.

그렇기 때문에, 단순히 iterate를 돌며 R과 L의 갯수가 같을 때면 그냥 substring을 한다고 생각하시면 됩니다.

 

저는 아래와 같이 코드를 짰습니다.

class Solution:
    def balancedStringSplit(self, s: str) -> int:
        r_cnt = 0
        l_cnt = 0
        res = 0

        for i in s:
            if i == "L":
                l_cnt += 1
            else:
                r_cnt += 1

            if l_cnt == r_cnt:
                res += 1
                l_cnt = 0
                r_cnt = 0
        return res

 

감사합니다.

반응형

'Algorithm > greedy' 카테고리의 다른 글

leetcode - Last Stone Weight  (0) 2020.03.12
leetcode - Two City Scheduling  (0) 2020.03.12
leetcode - Maximize Sum Of Array After K Negations  (0) 2020.03.11
leetcode - Walking Robot Simulation  (2) 2020.03.10
leetcode - Lemonade Change  (0) 2020.03.05
반응형

 

List<int>를 인자로 받습니다.

이것은 각 돌멩이들의 무게입니다.

 

돌멩이들은 서로 부딪혀 작은 돌멩이로 만들 수 있습니다.

 

만약 7 돌멩이와 8돌멩이가 부딪히면 1 돌멩이로 되고, 7 돌멩이와 7 돌멩이가 부딪히면 두개는 사라집니다.

 

이런 규칙에서, 남은 돌멩이의 최소 무게를 구해야 합니다.

 

논리적으로는 돌멩이 무게가 큰것들끼리 서로 부딪히게 해서 무게를 줄여나가야 합니다.

그렇기 때문에, 저는 아래와 같이 풀었습니다.

 

class Solution:
    def lastStoneWeight(self, stones: List[int]) -> int:
        sorted_stones = sorted(stones, reverse=True)
        for i in range(len(stones) -1):
            if sorted_stones[0] == sorted_stones[1]:
                sorted_stones[0] = 0
                sorted_stones[1] = 0
            else:
                sorted_stones[1] = sorted_stones[0] - sorted_stones[1]
                sorted_stones[0] = 0
            sorted_stones = sorted(sorted_stones, reverse=True)
        return sum(sorted_stones)

 

 

부딪히는 횟수는 최대 돌멩이 갯수 -1 입니다.
이유로는, 최소가 되려면 어찌됐든 한번씩은 부딪혀야 하기 때문입니다.

 

 

  • 무게가 큰 순으로 정렬시킵니다.
  • 정렬했기 때문에 첫번째와 두번째만 비교합니다.
    • 무게가 같다면 둘다 사라지기 때문에 첫번째, 두번째 자리의 값을 0으로 넣습니다.
    • 무게가 같지 않다면, 첫번째자리에는 0, 두번째자리에는 그 차이값을 넣습니다.
  • 무게가 큰것들끼리만 부딪혀야 최소가 나오기 때문에, 마지막으로 다시 정렬시켜 반복되도록 합니다.

감사합니다.

반응형
반응형

1. 서론

이번 포스팅에서는 스파크 데이터 모델 중 하나인 RDD에 대해 진행하겠습니다.

 

2. RDD 액션

RDD는 스파크에서 제공하는 데이터 모델입니다.

이 모델은 다양한 데이터처리를 위한 함수를 제공합니다.

 

함수는 크게 트랜스포메이션 연산과 액션 연산으로 나눌 수 있고,

각 연산에 대해 소개하도록 하겠습니다.

 

1) 트랜스포메이션 연산

 

1. map = RDD에 있는 데이터에 지정 연산을 수행한 후 RDD를 반환

2. flatMap = map연산과 비슷하지만 반환 타입이 iterate인 RDD 반환

3. mapPartitions = map연산을 파티션 단위로 연산

4. mapPartiitonsWithIndex = mapPartitions에서 각 파티션의 index정보도 같이 넘기는 연산

5. mapValues = (key, value) 형태의 RDD에만 적용가능하며 value에만 map 연산을 수행 후 RDD를 반환하는 연산

6. flatMapValues = RDD가 (key, value) 형태일 때 위의 flatMap 연산을 value에 적용하고자 할때 사용하는 연산

7. zip = 2개의 RDD를 (key, value)로 묶어주는 연산. RDD의 데이터 크기가 같아야 합니다..

8. zipPartitions = 파티션 단위로 zip연산을 수행하는 연산.

  • zipPartitions연산은 zip 연산과 다르게 최대 4개까지 지정 가능
  • 파이썬에서는 사용 불가.
  • 파티션의 갯수가 같아야 합니다.

9. groupby = RDD의 데이터를 지정한 group 집합으로 묶어 RDD를 반환하는 연산

10. groupbyKey = (key, value) 형태의 RDD를 키 기준으로 group 하여 반환하는 연산

  • (key, sequence[value])로 구성.

11. cogroup = (key, value) 형태의 여러 RDD를 key 기준으로 group 한 후 각 RDD의 sequence[value]를 Tuple로 반환하는 연산

 

ex) = (키, Tuple(RDD1의 sequence[value], RDD2의 sequence[value]))
최대 3개까지 RDD를 group 할 수 있습니다.

 

12. distinct = RDD에서 중복을 제거한 뒤 RDD를 반환하는 연산

13. certesian = 2개 RDD의 카테시안 곱을 한 RDD를 반환하는 연산(M x N)

14. subtract = 2개 RDD에서 1개의 RDD값을 제외한 값들을 반환하는 연산(M - N)

15. union = 2개 RDD의 합집합한 결과를 반환하는 연산(M + N)

16. intersection = 2개의 RDD의 교집합을 반환하는 연산 (M ∩ N)

17. join = 2개의 (key, value) 형태의 RDD를 키 기반으로 join하여 RDD를 반환하는 연산

  • ex ) (키, Tuple(첫번째 RDD의 요소, 두번째 RDD의 요소))
  • join은 inner join입니다. 한마디로 join이 되지 않으면 반환 RDD에 포함 X

18. leftOuterJoin, rightOuterJoin = sql의 left(right)OuterJoin과 비슷한 연산

  • 조인결과가 없을 수도 있어, 주최 RDD가 아니라면 Optional 값으로 반환.
  • ex) (a, (1, None)), (b, (1, Some(2))), (c, (1, Some(2)))

19. subtractByKey = 2개의 (key, value) 형태의 RDD에서 같은 key를 제외하고 RDD를 반환하는 연산.

20. reduceByKey = 2개의 RDD에서 같은 key로 병합하여 RDD를 반환하는 연산.

21. foldByKey = reduceByKey 연산에서 초기값을 부여하는 옵션이 추가된 연산.

22. combineByKey = 반환 RDD의 값 타입이 변경될수 있는 연산

  •  첫번째 인자(초기값을 위한 함수)
  •  두번째 인자(RDD의 각 파티션에서 수행할 함수)
  •  세번째 인자(각 파티션들을 결합하는 함수)

23. aggregateByKey = combineByKey연산의 첫번째 인자가 함수가 아닌 값으로만 변경된 연산.

24. pipe = map연산에서 외부 프로세스를 사용하는 연산.

  • ex) rdd.pipe("cut -f 1,3 -d ,") -> cut 유틸 사용.

25. repartitionAndSortWithinPartitions = 파티션 갯수 조절 후 각 파티션에서 정렬한 뒤 RDD를 반환하는 연산.

26. partitionBy = RDD의 값들을 특정 파티션으로 옮기고 싶을때 사용하는 연산

  • 셔플링이 발생하는 연산으로 성능에 크게 영향이 가므로 잘 판단하여 사용해야 합니다.

27. sortByKey = 키 값을 기준으로 정렬한 후 RDD 를 반환하는 연산.

28. keys, values = 자바 map의 keys, values와 같은 의미의 연산

 

 

 

 

 

반응형

 

 

 

 

 

2) 액션연산

 

1. collect = RDD 데이터를 배열로 반환하는 연산

2. count = RDD 데이터의 갯수를 반환하는 연산

3. first = RDD의 첫번째 요소를 반환하는 연산

4. take = RDD의 첫번째 요소부터 n개를 반환하는 연산

5. countByValue = 값을 각 카운팅하여 map으로 반환하는 연산

  • [1, 1, 2, 3, 3] => Map({1:2, 2:1, 3:2})

6. reduce = 2개의 RDD요소를 하나로 합치는 연산

  • 입력과 출력의 타입이 같아야 합니다.

7. fold = reduct 연산에서 초기값을 지정할 수 있는 연산

  • 입력과 출력의 타입이 같아야 합니다.

8. aggregate = reduce와 fold를 합친 연산

  • 첫번째 인자(초기값)
  • 두번째 인자(RDD의 각 파티션에서 수행할 병합함수)
  • 세번째 인자(각 파티션들을 결합하는 함수)
  • 입력과 출력의 타입이 달라도 됩니다.

9. sum = RDD의 요소의 합을 반환하는 연산

  • int, double, long 등 숫자 타입의 RDD에서만 가능합니다.

10. forEach, forEachPartition = map, mapPartition과 기능은 동일하나 반환값이 없는 점이 다른 연산

  • 이 연산은 각 개별 노드에서 수행됩니다.

11. toDebugString = RDD의 세부 정보를 파악하기 위한 연산

 

3. 기타 액션 

1. coalesce, repartition = 두 연산 모두 RDD의 파티션 크기를 조정하는 연산

  • coalesce = 파티션 갯수를 줄이기만 가능합니다.
  • repartition = 파티션 갯수를 늘리고 줄이기 모두 가능합니다.
  • repartition은 무조건 셔플이 발생되며, coalesce은 지정했을때만 셔플이 발생합니다.
  • 셔플 연산은 비용이 큰 연산이니 고려하여 사용해야 합니다.

2. cache, persist, unpersist =

  • cache, persist는 첫 액션연산 후 RDD 정보를 메모리 또는 디스크에 저장하는 메소드입니다.
  • unpersist는 필요없는 데이터를 캐시에서 제거할때 사용합니다.
  • cache, persist는 정보를 메모리에 올리게 되면 빠른 연산을 할 수 있게 도와줍니다.
    • 단, 너무 많이 올리게 되면 GC대상이 되어 악영향을 줄 수도 있으니 이 메서드도 함부로 남발해서는 안됩니다.

4. 마무리

이번에는 스파크 데이터 모델 중 하나인 RDD 대해서 포스팅하였습니다.

다음에는 스파크 설정에 대해 포스팅하겠습니다.

기존 책에서는 3장이 클러스터 구축이나, Hadoop 카테고리에서 Hadoop 설치 를 따라하시면 구축 및 스파크도 설치되어 있습니다. 

 

반응형

'BigData > Spark' 카테고리의 다른 글

(6) 스트럭처 스트리밍  (0) 2020.03.31
(5) 스파크 스트리밍  (0) 2020.03.19
(4) 스파크 SQL  (0) 2020.03.17
(3) 스파크 설정  (0) 2020.03.13
(1) 스파크 소개  (0) 2020.03.11
반응형

 

List<List<int>를 인자로 받습니다.

인자는 각 사람이 A 도시로 가는데 드는 비용, B 도시로 가는데 드는 비용을 의미합니다.

 

예제를 설명드리면 첫번째 사람이 A로 가는데 드는 비용은 10, B로 가는데 드는 비용은 20입니다.

 

각 사람은 짝수로 들어오고, A도시와 B도시에 동일하게 사람을 가도록 해야합니다.

이 조건에서 가장 적은 돈으로 보내도록 구하는것이 문제입니다.

 

사람마다 모두 A, B로 가는 금액은 다를것이고, 금액의 차가 큰것부터 보내면 최소금액으로 모두 각 도시로 보낼 수 있을겁니다.

그렇기 때문에, 저의 경우는 아래와 같이 풀었습니다.

 

class Solution:
    def twoCitySchedCost(self, costs: List[List[int]]) -> int:
        res = 0
        a_count = b_count = int(len(costs) / 2)
        for cost in costs:
            cost.append(abs(cost[0] - cost[1]))
        sorted_costs = sorted(costs, key=lambda x:x[2], reverse=True)

        for cost in sorted_costs:
            if a_count == 0:
                res += cost[1]
                continue
            if b_count == 0:
                res += cost[0]
                continue
            if cost[0] < cost[1]:
                a_count -= 1
                res += cost[0]
            else:
                b_count -= 1
                res += cost[1]
        return res

 

  • 각 사람의 A, B의 cost의 차를 구해 append 시킨다.
  • 차가 큰순으로 정렬시킨다.
  • 정렬된 사람들을 iterate 돌며 적은 비용을 선택한다.
    • 단, A,B에 모두 동일한 사람수를 보내야 하기 때문에 count를 세어 A에 절반을 보냈다면 어쩔수 없이 B를 선택하도록 한다. (반대의 경우도 마찬가지)

 

감사합니다.

 

 

 

 

 

반응형
반응형

 

주어진 A 배열에서 K 번만큼 부호를 변경하여 최대 sumation을 만드는 문제입니다.

(단, 부호는 동일 값을 중복하여 해도됩니다.)

 

저는 아래와 같이 풀었습니다.

class Solution:
    def largestSumAfterKNegations(self, A: List[int], K: int) -> int:
        for i in range(K):
            if K == 0:
                break
            min_val = min(A)
            index = A.index(min_val)
            if min_val < 0:
                K -= 1
                A[index] = -min_val
            elif (K % 2) == 0:
                break
            else:
                K -= 1
                A[index] = -min_val
        return sum(A)

 

1. 배열에서 최솟값과 인덱스를 구해옵니다.

2. 음수라면 양수로 변경하여 배열의 값을 변경 및 K도 1 감소시킵니다.

3. 최솟값이 음수가 아니지만 K번이 짝수라면 현재 상태가 sumation 시 최대이기 때문에 연산을 멈춥니다.

4. 최솟값이 양수이고, K번도 홀수라면 그나마 작은 최솟값을 음수로 변경합니다.

 

감사합니다.

반응형

'Algorithm > greedy' 카테고리의 다른 글

leetcode - Last Stone Weight  (0) 2020.03.12
leetcode - Two City Scheduling  (0) 2020.03.12
leetcode - Walking Robot Simulation  (2) 2020.03.10
leetcode - Lemonade Change  (0) 2020.03.05
leetcode - Assign Cookies  (0) 2020.03.04
반응형

1. 서론

Hadoop eco system에서 많이 사용하는 Spark에 대해 공부한 내용을 공유하고자 합니다.

책은 빅데이터 분석을 위한 스파크 2 프로그래밍 을 통해 공부하였습니다.

 

이번 포스팅에서는 1장인 스파크 소개부분을 진행하도록 하겠습니다.

 

2. 스파크 소개

 

1) 스파크란?

 

스파크는 하둡의 mapreduce를 보완하고자 나온 메모리 기반 대용량 처리 프레임워크입니다.

spark는 스칼라로 개발되었습니다.

 

특징으로는 아래와 같습니다.

 

  • 하둡과 달리 파일이 아닌 메모리를 이용하여 데이터 저장방식 제공.
  • 자바, 파이썬, 스칼라 언어 지원 및 다른 오픈소스들과의 플러그인이 많아 유용.
  • 스트리밍, 머신러닝에서도 활용할 수 있도록 다양한 라이브러리 제공.

 

2) RDD, 데이터 프레임, 데이터셋

 

스파크에서는 데이터를 처리하기 위한 모델로 RDD, 데이터 프레임, 데이터셋 3가지를 제공합니다.

 

  • RDD란?
    스파크에서 정의한 분산 데이터 모델로서 병렬처리가 가능하고 스스로 에러를 복구할 수 있는 모델.
    input 데이터를 이 RDD라는 모델로 만들어 데이터 핸들링을 하게됩니다.
    • 데이터 복구가 가능한 이유로는 RDD생성작업을 기록하기 때문입니다.
    • RDD를 생성할 수 있는 방법은 3가지가 존재
      1. 프로그램의 memory에 있는 데이터.
      2. 로컬, hdfs에 있는 외부 파일
      3. 또 다른 RDD로부터.
  • 데이터 프레임이란?
     DataSet[Row]를 데이터 프레임이라고 합니다.
    여기서 Row는 스파크 lib에서 정의한 클래스라고 생각하면 됩니다.
  • 데이터 셋이란?
    = 데이터프레임의 진화형 모델.
    DataSet[CustomTypeModel] 과 같이 데이터 모델을 custom하게 정의하여 사용할 수 있도록 typed 모델입니다.
RDD를 low api라고 하면 데이터 프레임은 high api라고 보면 됩니다.
또한, 데이터 프레임을 사용한다면 언어의 성능 이슈를 해결가능합니다.
(스파크가 JVM 기반 언어인 스칼라로 만들어져 있기 때문에 파이썬과 같은 다른 언어의 이종 프로세스간의 성능 이슈가 발생하게 됩니다.
하지만, 데이터 프레임은 개발자가 정의한 모델이 아닌 스파크가 제공하는 모델이기 때문에 성능 이슈가 없습니다.)

 

 

 

 

 

반응형

 

 

 

 

 

 

3) 트랜스포메이션 연산과 액션연산

 

스파크에서는 데이터 처리를 위해 정의한 연산이 2가지가 있습니다.

 

  • 트랜스포메이션 연산이란?
    = 어떤 RDD에 변형을 가해 새로운 RDD를 생성하는 연산.
  • 액션 연산이란?
    = 연산의 결과로 RDD가 아닌 다른 값을 반환하는 연산.
  • lazy 실행
    = 스파크는 트랜스포메이션의 meta만을 가지고 있습니다.
    액션연산 수행 시 실제 트랜스포메이션 연산의 meta 순서대로 연산을 시작합니다.
    이점이 바로 연산의 최적화를 찾아 수행하는 이유이며, 에러 시 복구가 가능한 이유입니다.

 

4) sparkContext

 

컨텍스트라고 일컬어지는것은 대부분 어떠한 일을 대신 수행해주는 것을 의미합니다.

그렇기 때문에 스파크 컨텍스트는 아래의 일을 담당하게됩니다.

 

  • 스파크 애플리케이션과 클러스터의 연결을 관리
  • RDD를 생성

 

5) partition

 

partition이란 스파크 클러스터에서 데이터를 관리하는 단위입니다.

HDFS 기반으로 사용하게 된다면 데이터 block당 한 개의 partition이 생성됩니다.

 

6) 드라이버 프로그램, 워커 노드, Job, Executor

 

1. 드라이버 프로그램

= 스파크 컨텍스트를 생성한 프로그램을 의미합니다.

 

2. 워커 노드

= 실제 데이터 처리를 수행하는 서버입니다.

 

3. Job

= 데이터 핸들링을 하는 일련의 작업을 의미합니다.

 

4. Executor

= 워커 노드에서 실행되는 프로세스입니다.
쓰레드가 아닌 프로세스이기 때문에 각 executor들 끼리는 영향을 끼칠 수 없습니다.

 

3. 스파크 구성도

일반적인 구성도는 아래와 같습니다.

 

 

1. sparkContext를 통해 클러스터에게 Job 제출.

2. 각 워커노드에서 Executor 생성

3. Executor에서 제출한 Job 수행

4. DAG

DAG는 Directed Acyclic Graph의 약어로 cycle이 발생하지 않는 그래프입니다.

spark에서는 데이터 처리를 이 DAG 형식으로 스케쥴링이 이루어집니다.

그렇기 때문에 실패가 일어나게 되면 일어난 지점에서 다시 DAG를 수행하여 데이터 손실을 없도록 합니다.

5. 마무리

이번에는 간략히 스파크 소개에 대해서 포스팅하였습니다.

다음에는 소개에서 언급한 RDD에 대해 좀 더 깊게 포스팅하겠습니다.

 

반응형

'BigData > Spark' 카테고리의 다른 글

(6) 스트럭처 스트리밍  (0) 2020.03.31
(5) 스파크 스트리밍  (0) 2020.03.19
(4) 스파크 SQL  (0) 2020.03.17
(3) 스파크 설정  (0) 2020.03.13
(2) RDD  (0) 2020.03.12
반응형

1. 서론

이번 포스팅에서는 Hadoop 시스템 서비스 중 하나인 Hive에 대해 설명 및 간단한 예제를 진행하려고 합니다.

 

2. Hive

Hive는 페이스 북에서 개발 되었습니다.

Hive는 하둡에 저장된 데이터를 sql 기반으로 간편히 mapreduce를 수행할 수 있으며,

추가로 elasticsearch, hbase와 같은 저장소와 hdfs상의 중간 인터페이스 역할도 제공합니다.

 

hdfs에 있는 파일은 일반적으로 tab 혹은 , 으로 구분자를 통해 각 자리에 의미있는 데이터가 있습니다.

이러한 구분자를 기반으로 테이블을 생성해주고, select, insert쿼리를 수행할 수 있게 해줍니다.

단, update 와 delete쿼리는 지원되지 않습니다.
이유는, 결국 데이터는 구분자를 통한 파일의 한줄이기 때문입니다.
그 말은, DB와 같이 인덱싱이 걸려있지도, key가 있지도 않다는 의미입니다.

 

테이블은 아래와 같이 2가지가 존재합니다.

 

  • managed table
  • external table

 

1) managed table

 

hdfs의 데이터를 Hive에서 관리한다는 의미를 가지고 있습니다.

그렇기 때문에, drop table 질의를 하는 경우 hdfs의 데이터도 사라지게 됩니다.

 

2) external table

 

hdfs의 데이터와 hive 테이블과 심볼릭 링크가 걸린다고 생각하시면 됩니다.

그렇기 때문에, drop table 질의를 하는 경우에도 hdfs의 데이터는 유지가 됩니다.

 

3. 예제

예제로는 hdfs에 있는 데이터를 external 테이블로 만든 후, sql를 통하여 managed table로 만들고 두 테이블을 모두 drop 했을때의 hdfs 데이터 상태를 봐보도록 하겠습니다.

 

1) 데이터 준비

 

저는 아래와 같이 \t 구분자로 데이터를 hdfs:/user/young에 놓았습니다.

 

kim     1
geon    2
young   3
kim     1
geon    2
yeong   3

 

2) external table 생성

 

Hive에 접속하기 위해 hive 명령어를 통해 접속합니다.

그 후, 기본적으로 있는 default DB를 사용하도록 하겠습니다.

 

 

이제 아래와 같이 external 테이블을 생성합니다.

CREATE EXTERNAL TABLE temp
 (
    text string,
    count int
 )
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY '\t'
 STORED AS TEXTFILE
 LOCATION '/user/young';

ROW FORMAT DELIMITED FIELDS TERMINATED BY 에는 한 라인에서 데이터를 나눌 구분자를 지정합니다.

저의 경우 \t으로 데이터를 만들었기 때문에 '\t'를 지정합니다.

 

LOCATION 에는 연결을 원하는 hdfs상 디렉토리경로를 지정합니다.

 

아래와 같이 테이블을 생성하게 되면 테이블 목록에 보이게 되며,

select 쿼리를 통하여 데이터를 조회할수도 있습니다.

 

 

 

 

 

 

반응형

 

 

 

 

 

 

3) managed table 생성

 

이젠 이 external table을 사용하여 새로운 managed table을 만들어보겠습니다.

 

쿼리는 아래와 같습니다.

 

CREATE TABLE temp_managed
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/user/young2'
AS
SELECT text, sum(count)
FROM temp
GROUP BY text
;

 

text로 group by 로 묶어서 count를 더하는 wordcount의 reducer의 역할을 하도록 쿼리하였고,

결과를 /user/young2 디렉토리에 놓도록 지정하였습니다.

managed 테이블은 단순히 external 이 아닌 create table로 테이블을 만들면 됩니다.

 

쿼리가 수행 후에는 아래와 같이 테이블 목록 및 hdfs에도 파일이 생성된것을 확인할 수 있습니다.

 

 

4) external table drop

 

이번에는 처음 만들었던 external table인 temp를 drop 시킨 후 hdfs의 파일을 봐보도록 하겠습니다.

 

쿼리는 아래와 같습니다.

drop table temp;

 

결과로는 아래와 같이 hdfs상의 데이터에는 영향이 없는것을 볼 수 있습니다.

이것은, 단순히 hdfs와 hive table간의 link만 끊은 것이기 때문입니다.

 

 

5) managed table drop

 

이번에는 managed table인 temp_managed 를 drop 시킨 후 hdfs 파일을 보도록 하겠습니다.

 

쿼리는 아래와 같습니다.

 

drop table temp_managed;

 

결과는 아래와 같이 hdfs의 데이터도 모두 삭제되어 No such File 구문이 나온것을 볼 수 있습니다.

 

 

추가로, Hive는 sql을 통해 간편히 데이터를 핸들링하기 위한 인터페이스이고, 모든 작업은 mapreduce로 이루어집니다.

 

 

4. 하이브 지원 SQL

 

하이브에서만 지원되는 sql 들이 있습니다.

 

그 중, 알아두면 유용할 정렬에 관련해서 아래 3개에 대해 정리하였습니다.

 

1) sort by

 

order by 와 비슷하게 느낄수도 있습니다.

하지만 order by의 경우에는 전체 결과를 하나의 리듀서가 정렬을 하게되는 반면,

sort by의 경우에는 각 리듀서에서 정렬을하게 됩니다.

 

그로인해, 리듀서 갯수 multiple이기 때문에 성능에 이점을 볼 수 있습니다.

단, 각 리듀서에서만 정렬이 이루어지기 때문에 정확히 정렬된  결과물은 보장되지 않습니다.

>리듀서 갯수를 한개로 한다면 order by 와 동일한 결과가 나옵니다.

 

2) distribute by

 

distribute by로 지정한 칼럼 기준으로 한 리듀서에 지정한 동일한 칼럼값의 데이터가 가도록 나누는 역할을 합니다.

 

예로, 아래와 같은 데이터가 있을 시

name age
kim 1
geon 2
yeong 2

distribute by age 를 하게되면 한 리듀서에는 (kim, 1)를 받게되고, 다른 리듀서는 (geon, 2), (yeong, 2)를 받게됩니다.

 

3) cluster by

 

cluster by는 distribute by와 sort by를 모두 한다고 보면 됩니다.

 

cluster by age를 하게되면, 각 리듀서는 age 기준으로 데이터 묶음을 받게되고 각 리듀서는 age기준으로 정렬하게 됩니다.

5. 마무리

이번 포스팅에서는 Hive에 대해 포스팅 하였습니다.

 

이렇게, Hadoop에 대한 포스팅을 완료했습니다.

다음 포스팅에서는 Hadoop eco system에서 많이 쓰이는 Spark에 대해 진행하도록하겠습니다.

 

감사합니다.

 

반응형

'BigData > Hadoop' 카테고리의 다른 글

(3) Hadoop 사용법  (0) 2020.03.09
(2) Hadoop 설치  (0) 2020.03.05
(1) Hadoop에 대해  (0) 2020.03.04
반응형

로봇이 commands에 있는 숫자만큼 이동합니다.

이동 중 첫 지점과 로봇의 위치의 두 점으로 이루어진 사각형이 생기며, 사각형이 최대일때의 면적을 구하는 문제입니다.

 

아래는 조건입니다.

  • 시작 위치는 (0, 0)
  • commands에서 -1은 turn right
  • commands에서 -2은 turn left
  • commands에서 -1, -2를 제외하고는 0이상 9이하만이 존재
  • obstacles는 장애물 위치이며, 장애물이 있으면 로봇은 전진하지 못함

 

저의 경우는 아래와 같이 풀었습니다.

 

class Solution:
    def robotSim(self, commands: List[int], obstacles: List[List[int]]) -> int:

        dx = [0, 1, 0, -1]
        dy = [1, 0, -1, 0]
        direction = 0

        # position (x, y)
        position = [0,0]
        res = 0
        obstacle_set = set(map(tuple, obstacles))

        for command in commands:
            if command == -1 :
                direction = (direction + 1) % 4
            elif command == -2:
                direction = (direction - 1) % 4
            else:
                for i in range(command):
                    if (dx[direction] + position[0], dy[direction] + position[1]) not in obstacle_set:
                        position[1] += dy[direction]
                        position[0] += dx[direction]
                        res = max(res, math.pow(position[0], 2) + math.pow(position[1], 2))
        return int(res)

 

일단, 동서남북으로 움직이기 위한 dx, dy 배열을 만들어 줍니다.

direction은 배열의 인덱스로 방향을 의미합니다.

 

그리고 저는 추가로 로봇의 현재 위치를 알기 위해 position 배열을 하나 만들었습니다.

 

그후는 commands를 이터레이션하며 -1, -2일때는 맞게 방향을 설정합니다.

그리고, 그 외 숫자가 나왔을 경우엔 숫자만큼 for문을 통하여 장애물이 있는지 없는지 체크하고 없다면 전진시킵니다.

 

전진 후에는 만들어진 사각형이 최대인지 max함수를 통해 구해줍니다.

 

사실, 여기서 계속 time 초과로 떨어졌었는데요.

그 이유는 위의 obstacle_set = set(map(tuple, obstacles)) 를 안해주었기 때문이였습니다......

 

장애물들을 list가 아닌 imutable한 tuple로 변환 후 set을 통하여 중복제거하여 불필요한 작업을 하지 않도록 해야합니다.

 

감사합니다.

 

반응형

'Algorithm > greedy' 카테고리의 다른 글

leetcode - Two City Scheduling  (0) 2020.03.12
leetcode - Maximize Sum Of Array After K Negations  (0) 2020.03.11
leetcode - Lemonade Change  (0) 2020.03.05
leetcode - Assign Cookies  (0) 2020.03.04
leetcode - Is Subsequence  (0) 2020.03.03

+ Recent posts