반응형

 

이번 문제는 주어진 트리의 각 depth 별로 수를 수집하여 List<List> 형식으로 반환해야합니다.

 

반환 값은 left -> right 로 숫자가 있어야 하며, 맨 아래 depth 의 값부터 들어있어야합니다.

 

저는 depth 값을 같이 넘겨주며, 트리를 모두 탐색하도록 하였습니다.

그리고, 마지막으로는 완성된 List를 reverse하도록 했습니다.

 

코드는 아래와 같습니다.

 

from typing import List


class TreeNode:
    def __init__(self, x):
        self.val = x
        self.right = None
        self.left = None

class Solution:

    def levelOrderBottom(self, root: TreeNode) -> List[List[int]]:
        if root == None: return []
        if root.right == None and root.left == None: return [[root.val]]

        res = [[root.val]]
        depth = 0
        self.find(root.left, depth + 1, res)
        self.find(root.right, depth + 1, res)
        res.reverse()
        return res

    def find(self, root: TreeNode, depth: int, res: List[List[int]]):
        if root == None: return
        if root != None:
            if len(res) <= depth:
                res.insert(depth, [root.val])
            else:
                res[depth].append(root.val)

        self.find(root.left, depth + 1, res)
        self.find(root.right, depth + 1, res)

 

감사합니다.

반응형
반응형

이번 문제는 주어진 트리의 max depth를 찾는 문제입니다.

 

완전탐색을 통해 트리의 모든 노드를 들리며 depth의 max를 구했습니다.

 

코드는 아래와 같습니다.

 

class TreeNode:
    def __init__(self, x):
        self.val = x
        self.left = None
        self.right = None

class Solution:
    def maxDepth(self, root: TreeNode) -> int:

        if root == None: return 0
        if root.right == None and root.left == None: return 1
        depth = 1
        depth = max(self.find(root.right, depth), self.find(root.left, depth))
        return depth

    def find(self, root: TreeNode, depth: int):
        if root == None: return depth
        return max(self.find(root.right, depth+1), self.find(root.left, depth+1))

 

감사합니다.

반응형
반응형

 

이번 문제는 1개의 tree를 인자로 받아 대칭인지 판단하는 문제입니다.

 

대칭트리의 경우에는, 트리의 왼쪽부분은 왼쪽 우선으로, 오른쪽 부분은 오른쪽 우선으로 탐색 시 만나는 값의 순서가 같습니다.

 

그렇기 때문에, 저는 아래와 같이 풀이했습니다.

 

class TreeNode:
    def __init__(self, x):
        self.val = x
        self.left = None
        self.right = None

class Solution:

    def isSymmetric(self, root: TreeNode) -> bool:
        if root == None: return True
        left_list = [root.val]
        right_list = [root.val]

        self.searchPriorLeft(root.left, left_list)
        self.searchPriorRight(root.right, right_list)
        
        if len(right_list) != len(left_list): return False

        for i in range(len(right_list)):
            if right_list[i] != left_list[i]: return False
        return True

    def searchPriorLeft(self, root: TreeNode, list):
        if root == None:
            list.append(None)
            return True
        list.append(root.val)
        if root.left == None and root.right == None: return True
        self.searchPriorLeft(root.left, list)
        self.searchPriorLeft(root.right, list)

    def searchPriorRight(self, root: TreeNode, list):
        if root == None:
            list.append(None)
            return True
        list.append(root.val)
        if root.left == None and root.right == None: return True
        self.searchPriorRight(root.right, list)
        self.searchPriorRight(root.left, list)

 

감사합니다.

반응형
반응형

1. 서론

이번 포스팅에서는 Chapter2의 동작 파라미터화 코드 전달하기 에 대해 진행하도록 하겠습니다.

 

진행하기에 앞서 동작 파라미터화의 의미를 설명하자면 아래와 같습니다.

 

아직은 어떻게 실행할것인지 결정하지 않은 코드 블록

 

2. 변화하는 요구사항에 대응하기

책에서는 사과 농부의 요구사항을 예로 들고 있습니다.

 

요구사항으로는 녹색인 사과 혹은 특정 무게 이상인 사과와 같이 filter 류의 요구사항들입니다.

 

1) 첫 번째 시도 : 녹색 사과 필터링

 

private enum Color {
    GREEN,
    RED
}

@Getter
@RequiredArgsConstructor
private static class Apple {
    private final Color color;
    private final int weight;
}

private static List<Apple> filterGreenApples(List<Apple> apples) {
    List<Apple> result = new ArrayList<>();
    for(Apple apple: apples) {
        if(Color.GREEN.equals(apple.getColor())) {
            result.add(apple);
        }
    }
    return result;
}

 

아직, 색이 다양해짐에 따라 메소드들이 늘어나야 하는 불편함이 존재합니다.

아래는 이를 해결하기 위해 값을 파라미터화한 케이스입니다.

 

 

2) 두 번째 시도 : 색을 파라미터화

 

private enum Color {
    GREEN,
    RED
}

@Getter
@RequiredArgsConstructor
private static class Apple {
    private final Color color;
    private final int weight;
}

private static List<Apple> filterApplesByColor(List<Apple> apples, Color color) {
    List<Apple> result = new ArrayList<>();
    for(Apple apple: apples) {
        if(color.equals(apple.getColor())) {
            result.add(apple);
        }
    }
    return result;
}

private static List<Apple> filterApplesByWeight(List<Apple> apples, int weight) {
    List<Apple> result = new ArrayList<>();
    for(Apple apple: apples) {
        if(apple.getWeight() > weight) {
            result.add(apple);
        }
    }
    return result;
}

 

 

값을 파라미터화한 이 코드가 나쁘진 않습니다. 다만, 중복되는 코드가 존재하는것을 볼 수 있습니다.

아래는 그것을 해결하기 위해 flag 값을 두어 필터링하는 케이스입니다.

 

 

3) 세 번째 시도 : 가능한 모든 속성으로 필터링

 

private static List<Apple> filterApples(List<Apple> apples, Color color, int weight, boolean flag) {
    List<Apple> result = new ArrayList<>();
    for(Apple apple: apples) {
        if((flag && apple.getColor().equals(color)) 
        || (!flag && apple.getWeight() > weight)) {
            result.add(apple);   
        }
    }
    return result;
}

public static void main(String[] args) {
    List<Apple> appleList = Arrays.asList(new Apple(Color.RED, 200), new Apple(Color.GREEN, 30), new Apple(Color.GREEN, 300));
    final List<Apple> greenAppleList = filterApples(appleList, Color.GREEN, 0, true);
    final List<Apple> heavyAppleList = filterApples(appleList, null, 150, false);
}

 

한개의 메서드로 줄어지긴 했지만 코드가 너무 지저분하며 이해하기가 어렵습니다.

이를 해결하기 위해, 이제 동작 파라미터화를 진행해보겠습니다.

 

 

 

 

 

반응형

 

 

 

 

3. 동작 파라미터화

private interface ApplePredicate {
    boolean test (Apple apple);
} 

private static class AppleHeavyWeightPredicate implements ApplePredicate {
    public boolean test(Apple apple) {
        return apple.getWeight() > 150;
    }
}

private static class AppleGreenColorPredicate implements ApplePredicate {
    public boolean test(Apple apple) {
        return Color.GREEN.equals(apple.getColor());
    }
}

 

위와 같이 인터페이스를 하나 두어 각 필터링을 담당하는 클래스들을 선언합니다.

 

1) 네 번째 시도 : 추상적 조건으로 필터링

 

private static List<Apple> filterApples(List<Apple> apples, ApplePredicate p) {
    List<Apple> result = new ArrayList<>();

    for(Apple apple: apples) {
        if(p.test(apple)) {
            result.add(apple);
        }
    }
    return result;
}

 

동작 파라미터화로 지저분한 코드를 깔끔하게 만들었으며 이해하기도 쉬워졌습니다.

다만, 요구사항이 늘어남에 따라 클래스를 생성해야 하니 코드수가 많이 늘게 됩니다.

또한, 한번만 사용하는 소스도 굳이 클래스를 생성하여 써야하는 불편함이 생겼습니다.

이를 해결하기 위해 아래와 같이 간소화 작업을 진행하겠습니다.

 

 

4. 복잡한 과정 간소화

1) 다섯 번째 시도 : 익명 클래스 사용

 

private static List<Apple> filterApples(List<Apple> apples, ApplePredicate p) {
    List<Apple> result = new ArrayList<>();

    for(Apple apple: apples) {
        if(p.test(apple)) {
            result.add(apple);
        }
    }
	return result;
}

final List<Apple> redAppleList = filterApples(appleList, new ApplePredicate() {
    public boolean test(Apple apple) {
        return Color.RED.equals(apple.getColor());
    }
});

 

익명클래스를 통해 클래스 선언을 제거했습니다.

하지만, 위 두번째 시도인 색을 파라미터화와 같이 중복되는 코드들이 많게됩니다.

 

2) 여섯 번째 시도 : 람다 표현식 사용

 

final List<Apple> redAppleList = filterApples(appleList, apple -> Color.RED.equals(apple.getColor()));

 

자바 8의 기능인 람다를 사용하여 중복되는 코드를 줄였으며 이해하기도 편하게 되었습니다.

 

5. 마무리

이번 포스팅에서는 동작 파라미터화 코드 전달하기에 대해 간단한 예제를 진행해보았습니다.

다음에는 람다 표현식에 대해 포스팅하겠습니다.

반응형

'Programming > ModernJavaInAction' 카테고리의 다른 글

(6) 스트림으로 데이터 수집  (0) 2020.04.04
(5) 스트림 활용  (0) 2020.04.04
(4) 스트림 소개  (0) 2020.03.28
(3) 람다 표현식  (0) 2020.03.28
(1) 자바 8, 9, 10, 11 : 무슨 일이 일어나고 있는가?  (0) 2020.03.21
반응형

1. 서론

 

이번 포스팅에서는 Modern Java In Action 의 1장인 자바 8, 9, 10, 11 : 무슨 일이 일어나고 있는가? 에 대해 알아보도록 하겠습니다.

 

2. 왜 아직도 자바는 변화하는가?

프로그래밍 언어 생태계는 끊임없이 변화하고 있습니다.

이러한 생태계에서 살아남기위해 프로그래밍 언어들은 계속 발전을 해왔고, 자바도 그 중 하나입니다.

 

자바는 살아남기 위해 무엇을 발전했는지 아래 설명하도록 하겠습니다.

 

 

1) 스트림 처리

 

자바8에서 java.util.stream 패키지가 추가되었습니다.

 

이 패키지에 있는 기능들은 리눅스의 파이프라인과 동일하다고 생각하시면 편합니다.

 

아래는 리눅스의 파일을 읽어 소문자로 변경하고 정렬하여 마지막 3줄만을 출력하는 command입니다.

 

cat file1 | tr "[A-Z]" "[a-z]" | sort | tail -3

 

각 파일읽기, 소문자 치환, 정렬 같은 일련의 작업을 ' | ' 사용하여 연결하는 것을 볼 수 있습니다.

 

이와 같이 자바 8에서도 Stream 패키지를 통해 각각의 작업을 하나의 일련의 작업으로 만들 수 있게 제공하고 있습니다.

 

 

2) 동작 파라미터화로 메서드에 코드 전달하기

 

자바 8에서는 메서드를 다른 메서드로 전달할 방법이 없었습니다.

 

하지만 자바 8에서 이를 지원하도록 개선되었고 이를 통해 복잡한 코드구조가 사라지게 되었습니다.

 

아래는 대표적인 메서드에 코드를 전달할 수 있는 함수형 인터페이스입니다.

패키지는 java.util.function 입니다.

 

  • Predicate
  • Consumer
  • Supplier

 

3) 병렬성과 공유 가변 데이터

 

자바 8에서는 스트림API를 지원하면서 간단히 병렬성을 가져갈수 있도록 제공하고 있습니다.

하지만, 이를 위해서는 멀티 쓰레드 환경에서도 서로 코드를 동시에 수행하더라도 안전하게 바꿔야하는 번거로움이 존재합니다.

 

스트림 함수 사용 시 외부접근 데이터를 Atomic한 객체로 만들어 사용해야 합니다.
이는, 위의 안전한 코드여야 하기 때문입니다.

 

 

 

 

 

반응형

 

 

 

 

 

3. 자바 함수

 

프로그래밍 언어에서 함수라는 용어는 메서드와 같은 의미로 사용됩니다.

자바 8에서는 이러한 메서드를 일급시민으로 올리도록 제공하고 있습니다.

 

일급시민이란, 각 구조체의 값을 전달할 수 있는 것을 의미합니다.

그러므로, 위의 메서드를 일급시민으로 올린다는 의미는 메서드라는 구조체의 값을 다른 구조체에게 전달할 수 있다는 의미입니다.

 

1) 메서드와 람다를 일급시민으로

 

1 - 메서드 참조

 

자바8 에서는 메서드참조를 제공합니다.

 

아래는 숨겨진 파일들만을 filter하여 얻어오는 코드입니다.

File[] hiddenFiles = new File(".").listFiles(new FileFilter() {
	@Override
	public boolean accept(File pathname) {
		return pathname.isHidden();
	}
});

 

위의 코드는 자바 8이 나오기전의 코드입니다.

 

자바 8에서는 람다와 메서드 참조를 제공하였고 그 결과 아래와 같이 더욱 readable 있는 코드로 리팩토링이 가능해졌습니다.

 

File[] hiddenFiles = new File(".").listFiles(File::isHidden);

 

람다를 통해 익명클래스를 간단하게 바꾸었으며, ::isHidden 인 메서드 참조를 통해 한결 간단해진 코드가 되었습니다.

 

 

2- 코드 넘겨주기

 

 

아래는 사과 리스트에서 초록색이며, 무게가 200이 넘는 사과만 분류하는 코드입니다. - 자바 8 이전

 

private enum Color {
    GREEN,
    RED
}

public static void main(String[] args) {
    List<Apple> appleList = Arrays.asList(new Apple(Color.RED, 200), new Apple(Color.GREEN, 30), new Apple(Color.GREEN, 300));
    final List<Apple> filterdGreenAppleList = filterGreenApples(appleList);
    final List<Apple> filterdGreenWithHeavyAppleList = filterHeavyApples(filterdGreenAppleList, 200);
}

@Getter
@RequiredArgsConstructor
private static class Apple {
    private final Color color;
    private final int weight;
}

private static List<Apple> filterGreenApples(List<Apple> apples) {
    List<Apple> result = new ArrayList<>();
    for(Apple apple: apples) {
        if(Color.GREEN.equals(apple.getColor())) {
            result.add(apple);
        }
    }
    return result;
}

private static List<Apple> filterHeavyApples(List<Apple> apples, int weight) {
    List<Apple> result = new ArrayList<>();
    for(Apple apple: apples) {
        if(apple.getWeight() > weight) {
            result.add(apple);
        }
    }
    return result;
}

 

하지만, 자바 8의 스트림 함수와 람다를 사용하면 아래와 같이 간단하게 바뀔 수 있습니다.

 

public static void main(String[] args) {
    List<Apple> appleList = Arrays.asList(new Apple(Color.RED, 200), new Apple(Color.GREEN, 30), new Apple(Color.GREEN, 300));
    final List<Apple> filterdGreenWithHeavyAppleList = appleList
            .stream()
            .filter(apple -> Color.GREEN.equals(apple.getColor()))
            .filter(apple -> apple.getWeight() > 200)
            .collect(Collectors.toList());
}

 

단, filter 에 있는 조건들이 일회성이 아니라면 별도 함수로 추출해서 사용해야 합니다.

 

4. 스트림

이전 옛날 자바에서는 한개의 CPU만을 사용하는 단점이 있었습니다.

하지만 자바 8에서는 한개가 아닌 멀티 CPU를 점유해서 사용하도록 변경되었고 이는 성능의 극대화를 가져다 주었습니다

 

대표적으로 parallelStream 을 들 수 있습니다.

parallelStream은 멀티 CPU를 통해 분할로 처리할때 사용합니다.

 

단, Parallel Stream 작업이 독립적이면서 CPU사용이 높은 작업에 사용해야합니다.

 

5. 디폴트 메서드와 자바 모듈

자바 8에서는 인터페이스에 디폴트 메서드를 제공합니다.

 

디폴트 메서드는 기존의 코드를 수정하지 않고 확장하기 위해서 만들어진 것이며,

간단하게 인터페이스에도 기본적으로 정의된 구현 메서드가 있을 수 있다고 생각하면 됩니다.

 

아래는 List의 default sort 함수입니다.

 

default void sort(Comparator<? super E> c) {
    Object[] a = this.toArray();
    Arrays.sort(a, (Comparator) c);
    ListIterator<E> i = this.listIterator();
    for (Object e : a) {
        i.next();
        i.set((E) e);
    }
}

 

6. 함수형 프로그래밍에서 가져온 다른 유용한 아이디어

 

자바 8에서 제공되는 기능들은 대체로 함수형 프로그래밍 특성을 가져다 주는 것입니다.

이런 함수형 프로그래밍의 패러다임을 가져오면서 많은 프로그램에 도움이 되는 아이디어가 나왔습니다.

 

한가지 예로, null을 회피하는 방법이며 자바에서는 Optional<T> 라는 컨테이너 클래스를 통해 제공하고 있습니다.

 

7. 마무리

이번 포스팅에서는 자바 8부터의 변화에 대해 간단한 소개와 설명을 했습니다.

다음에는 동작 파라미터화 코드 전달에 대해 포스팅하겠습니다.

반응형

'Programming > ModernJavaInAction' 카테고리의 다른 글

(6) 스트림으로 데이터 수집  (0) 2020.04.04
(5) 스트림 활용  (0) 2020.04.04
(4) 스트림 소개  (0) 2020.03.28
(3) 람다 표현식  (0) 2020.03.28
(2) 동작 파라미터화 코드 전달하기  (0) 2020.03.21
반응형

 

이번 문제는 2개의 tree를 인자로 받아 같은지 체크하는 문제입니다.

 

Tree의 경우 문제에서 아래와 같은 클래스로 정의하고 있습니다.

 

class TreeNode:
    def __init__(self, x):
        self.val = x
        self.left = None
        self.right = None

 

저의 경우에는 아래와 같은 로직을 재귀를 통해 완전 탐색하도록 하여 문제를 풀었습니다.

 

0. 2개의 현재 focus되고 있는 노드 객체가 맞는지 체크

1. 현재 값이 같은지 체크

2. 왼쪽 노드 체크

3. 오른쪽 노드 체크

 

코드는 아래와 같습니다.

 

class TreeNode:
    def __init__(self, x):
        self.val = x
        self.left = None
        self.right = None

class Solution:
    def isSameTree(self, p: TreeNode, q: TreeNode) -> bool:
        if p == q == None: return True
        elif p == None or q == None: return False
        return (p.val == q.val and self.isSameLeftRight(p.left, q.left) and self.isSameLeftRight(p.right, q.right))

    def isSameLeftRight(self, p: TreeNode, q: TreeNode):
        if p == q == None: return True
        elif p == None or q == None: return False
        else: return (p.val == q.val and self.isSameLeftRight(p.left, q.left) and self.isSameLeftRight(p.right, q.right))

 

감사합니다.

반응형
반응형

1. 서론

 

이번 포스팅에서는 스파크 스트리밍에 대해 알아 보도록하겠습니다.

 

대게, 스트리밍이라하면 실시간 처리로 알고 있습니다.
스파크는 마이크로 배치로 짧게 여러번 수행하여 스트리밍 처리를 제공합니다. 

 

2. 주요 용어

 

1) 스트리밍 컨텍스트

 

스파크 스트리밍을 수행하기 위해서는 스트리밍 모듈에서 제공하는 스트리밍 컨텍스트를 사용해야 합니다.

 

아래는 스트리밍 컨텍스트를 생성하고 사용하는 예제입니다. - (스칼라)

 

val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("StreamingSample")
conf.set("spark.driver.host", "127.0.0.1")

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(3))
val rdd1 = sc.parallelize(List("Spark Streaming Sample ssc"))
val rdd2 = sc.parallelize(List("Spark Queue Spark API"))
val inputQueue = Queue(rdd1, rdd2)
val lines = ssc.queueStream(inputQueue, true)
val words = lines.flatMap(_.split(" "))
words.countByValue().print()

ssc.start()
ssc.awaitTermination()

 

예제에서 볼 수 있듯이 StreamingContext는 SparkContext에서 생성 할 수 있습니다.

추가로, 어느 주기로 수행할 지의 정보도 같이 넘겨야 합니다. 

예제에서는 Seconds(3)을 통해 3초에 한번씩 수행되도록 하였습니다.

 

그리고, 마지막에 있는 start 메소드를 실행해야만 스파크 스트리밍은 수행됩니다.

또한, awaitTermination 를 통해 임의로 애플리케이션이 종료되지 않도록 합니다.

 

종료는 개발자가 직접 어떤 시점 혹은 상황에 하도록 추가하여야 합니다.

 

 

2) DStream (Discretized Streams)

 

DStream은 RDD 와 같이 스파크에서 스트리밍을 위해 제공하는 데이터 모델입니다.

단순하게 RDD의 시퀀스로 이해하시면 되며, 지정한 배치 간격마다 input 에서 데이터를 가져와 DStream으로 변경하면서 처리하게 됩니다.

 

3. 데이터 읽기

스파크 스트리밍으로 처리할 수 있는 input 종류는 대표적으로 아래와 같습니다.

 

  • 소켓
  • 파일
  • RDD 큐
  • 카프카

 

1) 소켓

 

스파크 스트리밍은 TCP 소켓을 이용해 데이터를 수신할 수 있도록 지원하고 있습니다.

 

소켓을 통해 데이터를 읽는 예제는 아래와 같습니다. - (파이썬)

 

conf = SparkConf()
conf.set("spark.driver.host", "127.0.0.1")

sc = SparkConf()
ssc = StreamingContext(sc ,3)

ds = ssc.socketTextStream("localhost", 9000)
ds.pprint()

ssc.start()
ssc.awaitTermination()

 

socketTextStream 함수를 통해 데이터를 읽을 host와 port를 지정하여 사용하는것을 확인할 수 있습니다.

추가로, 데이터가 문자열이 아닌경우에는 convert() 함수를 사용하여 변환할 수 있습니다.

 

 

2) 파일

 

스파크는 하둡과 연계해서 자주 사용합니다.

그로인해, 파일에 대해서도 스트리밍 처리를 지원하고 있습니다.

 

사용법은 아래와 같습니다.

ds = ssc.textFileStream(<path>)

 

위의 path는 읽어 들일 파일의 디렉터리 위치입니다.

 

파일을 input으로 사용하는 경우, 파일의 변경내용을 추적하지 못한다는 것을 염두하고 사용해야 합니다.

 

 

3) RDD 큐

 

RDD 큐의 경우에는 위 예제에서도 보셨겠지만,  일반적으로 테스트용으로 많이 사용하는 input 종류입니다.

 

사용법은 아래와 같습니다.

 

queue = Queue(rdd1, rdd2)
ds = ssc.queueStream(queue)

 

 

4) 카프카

 

카프카는 스파크와의 연계가 가능한 분산 메시징 큐 시스템입니다.

카프카에 대한 자세한 내용은 아래를 참고해주시면 되겠습니다.

(https://geonyeongkim-development.tistory.com/category/MQ/Kafka)

 

카프카를 input으로 사용할때는 스파크에서 제공하는 KafkaUtils의 createDirectStream 함수를 사용할 수 있습니다.

 

예제는 아래와 같습니다. (스칼라 , spark-streaming-kafka-0-10 버전)

val ssc = new StreamingContext(sc, Seconds(3))

val params = Map(
	ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
	ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
	ConsumerConfig.GROUP_ID_CONFIG -> "test-group-1",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"
)

val topics = Array("test")

val ds = KafkaUtils.createDirectStream[String, String](
	ssc,
    PreferConsistent,
    Subscribe[String, String](topics, params)
)

ds.flatMap(record => record.value.split(" "))
	.map((_, 1))
    .reduceByKey(_ + _)
    .print

ssc.start()
ssc.awaitTermination()

 

위 예제에서 볼 수 있듯이 카프카를 input으로 사용하는 경우에는 PreferConsistent, Subscribe 인자를 추가로 주어야 합니다.

 

Subsribe 인자는 스파크가 아닌 카프카 옵션으로 컨슈머가 수신할 토픽, 역직렬화 종류, 컨슈머 그룹 id 등이 있습니다.

 

PreferConsistent 인자는 스파크 내부에서 카프카 컨슈머를 어떻게 관리할지에 대한 설정입니다.

 

아래와 같이 총 3가지가 있으며 하나를 지정하시면 됩니다.

 

1 - PreferBrokers

 

스파크에서 카프카 컨슈머의 역할은 각 익스큐터에서 일어나게 됩니다.

매번 마이크로 배치가 수행될 때마다 컨슈머를 새로 생성하기보다는 캐싱해둔 컨슈머를 사용하는것이 성능상 유리합니다.

컨슈머는 prefetch라는 특징이 있어 현재 처리할 데이터 뿐만이 아니라 다음 데이터까지 가져와 버퍼에 저장합니다.
이 버퍼에 있는것을 사용하기 위해서는 전에 사용한 컨슈머를 사용해야 하기 때문입니다.

이를 위해, 파티션의 리더 서버와 같은 서버에서 동작중인 익스큐터에게 해당 파티션의 수신을 맡기는 옵션입니다.

( 브로커 서버와 익스큐터가 동일 서버에서 동작한다는 가정하 입니다 )

 위와 같은 설정은 카프카 클러스터와 스파크 클러스터를 동일한 서버에서 운용한다는 전제입니다.
개인적으로, 리소스를 카프카, 스파크 모두 사용해야 하므로 추천하지 않는 설정이며 클러스터 구축도 별도로 하길 추천합니다.

 

2 - PreferConsistent

 

익스큐터들에게 균등하게 파티션의 점유권을 할당하는 옵션입니다.

특별한 이유가 없다면 가장 적합한 옵션입니다.

 

3 - PreferFixed

 

이름에서 알 수 있듯이, 의도적으로 특정 익스큐터를 지정하여 데이터를 처리하도록 하는 옵션입니다. 

 

 

 

 

 

반응형

 

 

 

 

 

 

4. 데이터 다루기 (기본 연산)

DStream을 생성하는 방법을 알아봤으니, 이제 DStream을 이용하여 데이터를 가공할 수 있는 연산에 대해 알아보겠습니다.

 

연산 설명
print() DStream에 포함된 각 RDD를 출력합니다.
map(func) DStream의 RDD에 포함된 각 원소에 func함수를 적용하여 새로운 DStream을 반환합니다.
(RDD 연산과 동일하게 mapPartitions(), mapPartitionsWithIndex()도 가능합니다)
flatMap(func) RDD의 flatMap 함수와 동일합니다.
count(), countByValue() DStream에 포함된 데이터 갯수를 반환합니다.
RDD와 다른점은 반환 값이 DStream 인 점입니다. 
reduce(func), reduceByKey(func) DStream에 포함된 RDD값들을 집계해서 한개의 값으로 변환합니다.
RDD가 키와 값의 형태라면 reduceByKey를 통해 키 별로 집계도 가능합니다.
반환값은 DStream 입니다.
filter(func) DStream의 모든 요소에 func를 적용하여 결과가 true인 요소만 포함한 DStream을 반환합니다.
union() 두개의 DStream을 합해서 반환합니다.
join() 키와 값으로 구성된 두 개의 DStream을 키를 이용해 조인한 결과를 DStream으로 반환합니다.
RDD연산과 동일하게 leftOuterJoin, rightOuterJoin, fullOuterJoin 모두 가능합니다.

 

5. 데이터 다루기 (고급 연산)

4번에서 알아본 연산들은 이전장에서 배운 RDD 연산과 거의 비슷합니다.

 

이번에는 DStream에 특화된 연산에 대해 알아보겠습니다.

 

1) transform(func)

 

DStream 내부의 RDD에 func 함수를 적용 후 새로운 DStream을 반환합니다.
RDD에 접근하여 적용하는것이기 때문에, RDD 클래스에서만 제공되는 메서드도 사용가능합니다.

 

2) updateStateByKey()

 

스트림으로 처리하다 보면 이전 상태를 알아야하는 경우가 있습니다.

이를 위해 updateStateByKey()를 사용할 수 있습니다.

 

updateStateByKey는 배치가 수행될때마다 이전 배치의 결과를 같이 전달하여 처리할 수 있도록 제공하는 연산입니다.


이전 배치의 결과를 알고 있어야 하기 때문에 필수적으로 checkpoint라는 함수로 이전 결과를 영속화해야합니다.

 

아래는 예제와 결과입니다.

 

val ssc = new StreamContext(conf, Seconds(3))

val t1 = ssc.sparkContext.parallelize(List("a", "b", "c"))
val t2 = ssc.sparkContext.parallelize(List("b", "c"))
val t3 = ssc.sparkContext.parallelize(List("a", "a", "a"))
val q6 = mutable.Queue(t1, t2, t3)

val ds6 = ssc.queueStream(q6, true)

ssc.checkpoint(".") // 현재 디렉터리에 영속화

val updateFunc = (newValues: Seq[Long], currentValue: Option[Long]) => Option(currentValue.getOrElse(0L) + newValues.sum)

ds6.map((_, 1L)).updateStateByKey(updateFunc).print

 

[배치 1차]

(c, 1)

(a, 1)

(b, 1)

 

[배치 2차]

(c, 2)

(a, 1)

(b, 2)

 

[배치 3차]

(c, 2)

(a, 4)

(b, 2)

 

 

3) 윈도우 연산

 

스트림 처리를 하다보면 마지막 배치의 결과가 아닌 그 이전 배치의 결과까지도 필요한 경우가 있습니다.

이를 위해, 스파크 스트리밍에서는 윈도우 연산들을 제공합니다.

 

윈도우 연산은 아래 그림과 같이 동작하기 때문에 윈도우 길이(= window length)와, 배치주기(= sliding interval)가 추가로 필요합니다.

 

이 윈도우 길이와 배치주기값은 스트리밍 컨텍스트를 생성할때 지정한 배치 간격의 배수로 지정해야합니다.

예를 들어 배치 간격을 3초로 했다면 윈도우 길이와 배치주기는 6초, 9초, 12초 ... 여야 합니다.

 

 

3-1) window(windowLength, slideInterval)

 

윈도우 연산은 slideInterval 시간마다 windowLength 만큼의 시간동안 발생된 데이터를 포함한 DStream을 생성합니다.

 

3-2) countByWindow(windowLength, slideInterval)

 

윈도우 크기만큼 DStream의 요소 개수를 포함한 DStream을 생성합니다.

 

3-3) reduceByWindow(func, windowLength, slideInterval)

 

윈도우 크기만큼의 DStream에 func를 적용한 결과의 DStream을 생성합니다.

 

3-4) reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

 

키와 값의 형태에서 적용할 수 있는 연산입니다. 다만, invFunc 라는 함수가 추가로 받을 수 있습니다.

invFunc 함수에 대해 설명하기 이전에 윈도우 연산의 단점에 대해 설명하겠습니다.

 

윈도우에는 중복된 데이터가 있습니다.

그렇다는것은 처리한 데이터를 다시 처리할 수 있으며, 쓸모없는곳에 성능을 낭비할 수 있게 됩니다.

 

이를, 해결하기 위해 reduceByKeyAndWindow 연산에서는 invFunc인자를 받으며,

invFunc는 기존에 처리한 데이터는 빼도록 하는 역 리듀스함수입니다.

 

3-5) countByValueAndWindow(windowLength, slideInterval, [numTasks]))

 

윈도우 내에 포함된 데이터들을 Value 기준으로 카운트하여 새로운 DStream을 생성합니다.

 

6. 데이터 저장

DStream 역시, RDD와 같이 데이터를 저장해야하며 이를 위해 제공되는 함수들은 아래와 같습니다.

 

  • saveAsTextFiles()
  • saveAsObjectFiles()
  • saveAsHadoopFiles()

RDD와 동일하게 DStream 역시 파일, 객체, 하둡에 데이터를 저장할 수 있습니다.

 

다만, RDD 함수와 다른점은 인자로 접두어와 접미어를 받으며 실제 저장은 "접두어 + -  + 시간 + . + 접미어" 으로 된다는 점입니다.

 

아래는 예제와 같이 함수를 사용하게 된다면 결과는 <저장위치/test-시간(ms).txt> 로 저장하게 됩니다.

 

ds.saveAsTextFile("저장위치/test", "txt")

 

7. CheckPoint

체크포인팅은 분산 클러스터 환경처럼 오류가 발생할 가능성이 높은 환경에서 장시간 수행되는 시스템들이 주시로 상태값을 안전성이 높은 저장소에 저장하여, 장애가 발생할 경우 복구를 위해 사용하는 용어입니다.

 

스파크 스트리밍에서는 이 체크포인팅을 크게 데이터 체크포인팅과 메타데이터 체크포인팅으로 분리할 수 있습니다.

 

1) 메타데이터 체크포인팅

 

메타데이터 체크포인팅이란 드라이버 프로그램을 복구하는 용도로 사용됩니다.

포함 데이터로는 아래와 같습니다.

  • 스트리밍컨텍스트를 생성할 시에 사용했던 설정 값
  • DStream에 적용된 연산 히스토리
  • 장애로 인해 처리되지 못한 배치 작업

 

2) 데이터 체크포인팅

 

데이터 체크포인팅은 최종 상태의 데이터를 빠르게 복구하기 위해 사용합니다.

 

이를 위해서는 상태를 저장과 읽어야하며 각 제공 메서드는 아래와 같습니다.

 

// 저장
ssc.checkpoint("저장 할 디렉터리 경로")

// 읽기
StreamingContext.getOrCreate("체크포인팅 경로", 스트리밍 컨텍스트 생성 함수)

 

읽기의 경우는 체크포인팅 경로에 데이터가 있다면 데이터를 가진 채 스트리밍 컨텍스트가 반환되며, 없는경우에는 새 스트리밍 컨텍스트를 반환합니다.

 

8. 캐시

 

DStream의 경우에도 RDD와 동일하게 cache() 기능을 제공합니다.

 

updateStateByKey, reduceByWindow 와 같은 연산은 이전 배치 결과를 알아야하기 때문에 명시하지 않아도 캐시를 내부적으로 수행합니다.

 

9. 마무리

이번에는 데이터 스트리밍 대해서 포스팅하였습니다.

다음에는 스트럭쳐 스트리밍에 대해 포스팅하겠습니다.

반응형

'BigData > Spark' 카테고리의 다른 글

(7) Apache Zeppelin  (0) 2020.04.20
(6) 스트럭처 스트리밍  (0) 2020.03.31
(4) 스파크 SQL  (0) 2020.03.17
(3) 스파크 설정  (0) 2020.03.13
(2) RDD  (0) 2020.03.12
반응형

1. 서론

이번 포스팅에서는 스파크 SQL에 대해서 알아보겠습니다.

 

2. 스파크 SQL

스파크 SQL은 RDD에서 표현하지 못하는 스키마 정보를 표현가능하도록 보완된 모델입니다.

 

대표적으로 데이터 프레임과 데이터셋이 있습니다.

 

스파크 SQL은 언어별 사용유무는 아래와 같습니다.

 

스파크 SQL 스칼라 자바 파이썬
데이터 프레임 O X O
데이터 셋 O O X

 

1) 연산 종류

 

  • 트랜스포메이션 연산
  • 액션연산

 

2) 프로그래밍 구성요소

 

스파크 SQL을 사용하여 프로그래밍 시 필요한 구성요소는 아래와 같습니다.

 

구성요소 의미
스파크 세션 1. 데이터 프레임을 생성하기 위해 이용.
2. 인스턴스 생성을 위한 build()메서드 제공
3. 하이브 지원 default
데이터 셋 RDD와 같은 타입 기반연산 + SQL과 같은 비타입 연산 제공
데이터 프레임 ROW 타입으로 구성된 데이터 셋
DataFrameReader 1. SparkSession의 read()메서드를 통해 접근 가능
2. ["jdbc", "json", "parquet"] 등 다양한 input 제공.
DataFrameWriter 1. SparkSession의 write()메서드를 통해 접근 가능
로우, 칼럼 데이터 프레임을 구성하는 요소(한 로우에 1개 이상의 칼럼 존재)
functions 데이터를 처리할떄 사용할 수 있는 각종 함수를 제공하는 오브젝트
StructType, StructField 데이터에 대한 스키마 정보를 나타내는 API
GroupedData, GroupedDataSet 1. groupBy() 메서드로 인해 사용
2. 집계와 관련된 연산 제공

 

3) 코드 작성 절차

 

  1. 스파크 세션 생성
  2. 스파크 세션으로부터 데이터셋 또는 데이터 프레임 생성
  3. 생성된 데이터셋 또는 데이터프레임을 이용해 데이터 처리
  4. 처리된 결과 데이터를 외부 저장소에 저장
  5. 스파크 세션 종료

 

4) 스파크 세션

 

스파크 세션의 사용목적은 아래와 같습니다.

 

  1. 데이터 프레임, 데이터 셋 생성.
  2. 사용자 정의 함수(UDF)를 등록.

다음은 스파크 세션의 예제입니다.

 

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("sample")\
    .master("local[*]")\
    .getOrCreate()

 

 

 

 

 

반응형

 

 

 

 

 

 

3. 데이터프레임

데이터 프레임은 Row라는 스파크 데이터 모델을 사용하는 wrapper 모델입니다.

 

RDD와의 차이점으로는 데이터 값뿐만이 아닌 스키마 정보까지 함께 다룬다는 차이가 있습니다.

 

1) 데이터 프레임 생성 방법

 

1. 외부의 데이터 소스

  • DataFrameReader의 read()메서드 사용
  • DataFrameReader가 제공하는 주요 메서드는 아래와 같습니다.

 

메소드 의미
format() 1. input 데이터의 유형을 지정
2. orc, libsvm, kafka, csv, jdbc, json, parquet, text, console, socket 등 사용 가능
3. 써드파티 라이브러리 존재
option/options() 키와 값형태로 설정 정보 지정.
load() input으로 부터 실제 데이터 프레임을 생성.
jdbc() 데이터베이스를 input으로 사용하기위한 간편 메서드
json() json을 input으로 사용하기 위한 간편 메서드(파일의 경우 각 라인 단위로 json이 구성되어야 함)
orc() orc 형식 데이터 다룰수 있음
parquet() parquet형식 데이터를 가져와 데이터 프레임 생성
schema() 사용자 정의 스키마 지정가능
table() 테이블 데이터를 데이터프레임으로 생성
text() 텍스트 파일을 읽어 데이터 프레임 생성
csv() 1. csv 파일을 읽어 데이터 프레임 생성
2. 2.0버전부터 제공

 

2. 기존 RDD 및 로컬 컬렉션으로부터

 

  • RDD와 달리 스키마정보를 함께 지정하여 생성해야 합니다.
  • 스키마 지정 방법
    1. 리플렉션 = 스키마 정의를 추가하지 않고 데이터 값으로부터 알아서 스키마 정보를 추출하여 사용하는 방법입니다.
    2. 명시적 타입 지정 = 말 그대로 명시적으로 스키마를 정의하는것입니다. -> (StructField, StructType 사용)
    3. 이미지 파일을 이용한 데이터 생성
      • 2.3부터 지원
      • ImageSchema.readImages를 사용하여 생성 가능합니다.
      • 이미지 파일 경로, 가로, 세로, 채널정보 등 추출 가능합니다.

 

2) 주요 연산 및 사용법

 

1. 액션 연산

 

연산 설명
show() 1. 데이터를 화면에 출력하는 연산
2. 3개의 인자 존재(레코드 수, 표시할 값의 길이, 세로 출력 여부)
3. 기본값은 20개 레코드수, 20바이트 길이 제한
head(), first() 데이터셋의 첫번째 ROW를 반환.
take() 첫 n개의 ROW 반환
count() 데이터셋의 로우 갯수 반환
collect(), collectAsList() 데이터셋의 모든 데이터를 컬렉션 형태로 반환(Array, List)
decribe() 숫자형 칼럼에 대해 기초 통계값 제공(건수, 평균값, 표준편차, 최솟값, 최댓값)

 

2. 기본 연산

 

연산 설명
cache(), persist() 1. 데이터를 메모리에 저장.
2. [NONE, DISK_ONLY, DISK_ONLY_2, MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_ONLY_SER, MEMORY_ONLY_SER_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER, MEMORY_AND_DISK_SER_2, OFF_HEAP]옵션 가능
3. cache는 MEMORY_AND_DISK 옵션의 persist
createOrReplaceTempView() 1. 데이터프레임을 테이블로 변환
2. 스파크 세션인 종료되면 사라짐.
explain() 데이터 프레임 처리와 관련된 실행 계획 정보를 출력.

 

3. 비타입 트랜스포메이션 연산

 

연산 설명
alias(), as() 칼럼이름에 별칭 부여
isin() 칼러의 값이 인자로 지정된 값에 포함되어 있는지 여부 확인
when() 1. if~else와 같은 분기 처리 연산 수행
2. functions, Column 모두 제공(최초 사용시에는 functions의 메소드를 사용해야함)
max(), mean() 칼럼의 최댓값, 평균값 계산.
collect_list(), collect_set() 특정 칼럼을 모아서 list 혹은 set을 반환
count(), countDistinct() 특정 칼럼에 속한 데이터의 갯수, 중복을 제거한 데이터의 갯수
sum() 특정 칼럼 값의 합계
grouping(), grouping_id() 소계 결과에 그룹화 수준을 파악하기 위해 사용.
array_contains(), size(), sort_array() 특정값이 배열에 존재하는지, 배열의 사이즈, 배열 정렬 기능
explode(), posexplode() 1. 배열, map 데이터를 여러개의 row로 변환
2. posexplode의 경우 순서정보도 부여
current_data(), unix_timestamp(), to_date() 1. current_data = 현재 시간값
2. unix_timestamp = string을 Date로 변환(yyyy-MM-dd HH:mm:ss)
3. to_date = string을 Date로 변환(yyyy-MM-dd)
add_months(), date_add(), last_day() 1. 달 더하는 연산
2. 일 더하는 연산
3. 해당 달의 마지막 날짜.
window() 1. DateType 칼럼을 대상으로 적용 가능.
2. 일정크기의 시간을 기준으로 윈도우를 생성하여 각종 집계 연산 수행.
round(), sqrt() 반올림, 제곱근 값 계산
array() 여러 칼럼값을 배열로 만듬.
desc(), asc() sort() 메소드와 함께 사용하는 정렬 방법 메소드
desc_nulls_first, desc_nulls_last, asc_nulls_first, asc_nulls_last 정렬시 null값 기준 정의 메소드.
split(), length() 문자열 분리, 길이 반환 메소드
rownum(), rank() 오라클의 partitionBy개념의 Window 범위 안에서 사용하는 메소드
udf() 1. 사용자 정의 함수
2. 파이썬에서 자바 UDF사용하고 싶을 시 spark.udf.registerJavaFunction 메소드 사용
select(), drop() 특정 칼럼을 포함/ 제외한 데이터 프레임 생성
filter(), where() 특정 조건을 만족하는 데이터프레임만을 반환
agg() 특정칼럼에 대해 sum, max와 같은 집합연산을 수행하기 위해 사용.
apply(), col() 데이터프레임의 칼럼 객체 생성
groupBy() SQL문의 groupBy 연산 수행, 집합연산 수행 가능
cube() 데이터의 소계를 반환( = 소계대상인 칼럼들의 모든 가능한 조합의 부분합)
distinct(), dropDuplicates() 1. distinct의 경우 모든 컬럼값이 같을때 중복 판단
2. dropDuplicates는 중복을 제거하고자 하는 컬럼 지정 가능
intersect() 두개의 데이터프레임의 교집합
except() 두개의 데이터프레임의 차집합
join() 1. 두 개의 데이터프레임의 join
2. inner, outer, left_outer, right_outer, leftsemi 등 join 종류 모두 지원
3. 조인 칼럼에 대해서는 중복으로 포함되지 않는 특징 존재
crossjoin() 두 데이터프레임의 카테시안곱 지원
na() 1. null, NaN값이 포함되는 경우 사용
2. DataFrameNaFunctions 인스턴스 반환
3. DataFrameNaFunctions가 가지고 있는 메서드 사용(drop, fill, replace 등등)
orderBy() SQL의 orderBy와 같은 동작 수행
stat() 1. 특정 칼럼값의 통계수치를 제공하는 DataFrameStatFunctions 인스턴스 제공
2. corr, cov, crosstab, freqItems, sampleBy 등의 메소드 사용 가능
withColumn(), withColumnRenamed() 새로운 칼럼 추가 혹은 이름을 변경하는 메서드
repartitionByRange() repartition시 데이터 프레임의 갯수를 기준으로 균등하게 분포(2.3부터 가능)
colRegax() 정규식을 이용하여 칼럼을 선택할 수 있는 기능(2.3부터 가능)
unionByName() 1. union할 시 데이터프레임의 칼럼 이름을 기준으로 수행
2. 그냥 union은 데이터프레임의 데이터 순서로 수행되기 때문에 데이터가 엉킬수 있음.
to_json, from_json 칼럼 수준에서 데이터를 json으로 저장 및 가져오는 기능 제공.

 

 

3) 데이터 저장

 

DataFrameWriter.write 사용하여 데이터프레임 데이터를 저장합니다.

 

DataFrameWriter의 주요 메소드는 아래와 같습니다.

 

메소드 의미
save 데이터를 저장하는 메소드
format 저장하는 데이터의 format을 지정.
partitionBy 특정 칼럼값을 기준으로 파티션 설정
options 데이터 저장시 추가 설정 시 사용
mode 저장 모드 설정(Append, Overwrite, ErrorExists, Ignore)
saveAsTable 테이블 형태로 저장 및 영구 저장 가능

 

4) Pandas 연동

 

PyArrow 라이브러리를 사용하여 스파크의 데이터프레임과 파이썬의 Pandas의 상호 변환이 가능합니다.

스파크 2.3버전부터 가능합니다.
추가로, spark.sql.execution.arrow.enabled 옵션을 true로 하여야 합니다.

 

 

4. 데이터 셋

 

데이터 셋은 Row가 아닌 개발자 지정 클래스를 Type으로 쓰는 wrapper 모델입니다.

 

데이터셋의 장단점은 아래와 같습니다.

 

  • 장점 = 지정 클래스를 사용함으로 인해 컴파일 시 타입오류 검증이 가능합니다.
  • 단점 = 사용자 정의 타입 클래스를 사용하기 때문에 Spark SQL 최적화가 안될 수 있습니다.

1) 데이터 셋 생성

 

  • 생성 방법 = 자바 객체, 기존 RDD, 데이터 프레임, 외부 source 가 있습니다.
  • 생성 시 인코더 지정(필수) = 성능 최적화를 위해 기존 오브젝트를 스파크 내부 최적화 포맷으로 변환해야 하는데 그때 사용합니다.  (데이셋은 내부적으로 InternalRow클래스를 사용합니다)
  • 스칼라의 경우, 기본타입의 경우에는 import를 통하여 암묵적 인코더 사용 가능합니다.
  • 자바의 경우, 데이터 프레임을 만든 후 이것을 데이터 셋으로 변환하여 사용해야 합니다.
  • 데이터 프레임으로부터 생성시에는 type을 지정해야하기 때문에 as() 메서드를 통하여 생성할 수 있습니다.
  • range() 메서드를 통하여 간단한 샘플데이터 또한 생성 가능합니다.

 

2) 타입 트랜스포메이션 연산

 

연산 설명
select() 1. 데이터 프레임의 select 메서드와 동일
2. 단, as() 메서드를 통해 type을 지정해야함.
as() 1. 데이터 셋에 별칭 부여
2. column에 부여하는 것이 아닌 데이터 셋에 타입을 부여.
distinct() 중복을 제외한 요소만으로 데이터셋 반환
dropDuplicates() distonct() 메서드에서 칼럼을 지정할 수 있는 기능 추가.
filter() 사용자 정의 조건을 만족하는 데이터만으로 구성된 데이터 셋 반환.
map(), flatMap() 1. RDD의 map(), flatMap() 메서드와 동일
2. 단 데이터 셋의 타입을 인자로 사용 가능.
3. 자바의 경우 인코더 지정 필수
groupByKey() 1. RDD의 groupBy와 기능 동일
2. KeyValueGroupedDataset 클래스 반환
agg() 1. 데이터 프레임에서 본 agg() 메서드와 동일
2. 단, 사용하는 칼럼은 반드시 TypeedColumn 클래스여야 한다.
3. 사용가능한 집계 연산 = avg(), count(), sum(), sumLong()
mapValues(), reduceGroups() 1. KeyValueGroupedDataset클래스의 매서드
2. 맵연산, 리듀스 연산을 수행.

 

 

5. QueryExecution

 

QueryExecution은 Spark SQL 사용시 최적화 과정에서 일어나는 일들을 확인할 수 있는 API입니다.

 

Spark Sql의 Query 실행은 아래와 같은 순으로 진행되고 최적화됩니다.

 

순서 Plan 종류 확인 메소드 종류 
1 LogicalPlan QueryExecution.logical
2 SessionState의 Analyzer가 적용된 LogicalPlan  QueryExecution.analyzed
3 SessionState의 Optimizer가 적용된 LogicalPlan QueryExecution.optimizedPlan
4 SessionState의 SparkPlanner가 적용된 SparkPlan QueryExecution.sparkPlanner
5 SparkPlan에 추가적인 최적화 과정 적용하여 SparkPlan QueryExecution.executedPlan

 

6. 마무리

이번에는 스파크 SQL, 데이터 프레임, 데이터 셋 대해서 포스팅하였습니다.

다음에는 스파크 스트리밍에 대해 포스팅하겠습니다.

 

반응형

'BigData > Spark' 카테고리의 다른 글

(6) 스트럭처 스트리밍  (0) 2020.03.31
(5) 스파크 스트리밍  (0) 2020.03.19
(3) 스파크 설정  (0) 2020.03.13
(2) RDD  (0) 2020.03.12
(1) 스파크 소개  (0) 2020.03.11
반응형

 

인자로 받은 문자열 중 나란히 있는 중복문자를 반복적으로 제거하고, 남은 문자열을 반환하는 문제입니다.

위 예제를 보면 아래와 같이 처리가 흐르게 됩니다.

 

1. abbaca -> bb 제거

2. aaca -> aa 제거

3. ca 반환

 

저는 아래와 같이 풀이하였습니다.

 

class Solution:
    def removeDuplicates(self, S: str) -> str:
        stack = []
        stack.append(S[0])

        for i in S[1:]:
            if stack and stack[-1] == i:
                stack.pop()
            else:
                stack.append(i)
        return ''.join(stack)

 

stack으로 이용할 list를 하나 만들었습니다.

그 후, 문자열을 iterate하며 stack에서 peek한 결과와 동일하면 pop 아니면 append 하였습니다.

 

감사합니다.

반응형

'Algorithm > DataStructure' 카테고리의 다른 글

leetcode - Tree - Maximum Depth of Binary Tree  (0) 2020.03.24
leetcode - Tree - Symmetric Tree  (0) 2020.03.23
leetcode - Tree - Same Tree  (0) 2020.03.19
leetcode - Stack - Min Stack  (1) 2020.03.13
leetcode - Stack - Valid Parentheses  (0) 2020.03.13
반응형

 

이번 문제는 간단히 Stack의 기능인 push, pop, top에서 추가로 getMin 함수를 구현하는 클래스를 작성하면 됩니다.

 

저는 아래와 같이 풀었습니다.

 

class MinStack:

    def __init__(self):
        self.stack = []

    def push(self, x: int) -> None:
        self.stack.append(x)

    def pop(self) -> None:
        self.stack.pop()

    def top(self) -> int:
        return self.stack[-1]

    def getMin(self) -> int:
        return min(self.stack)

 

감사합니다.

반응형

+ Recent posts