반응형

1. 서론

Hadoop eco system에서 비관계형 분산 데이터 베이스로 많이 사용하는 Hbase에 대해 공부한 내용을 공유하고자 합니다.

책은 HBase 완벽 가이드 를 통해 공부하였습니다.

 

이번 포스팅에서는 1장인 소개부분을 진행하도록 하겠습니다.

 

2. 관계형 데이터 베이스 시스템의 문제점

 

책에서는 RDB에 대한 확장성 측면에서 문제점을 얘기하고 있습니다.

물론, RDB의 대표적인 MySql, Oracle, Postgres 등은 실제 서비스에서도 많이 사용되고 있는 DB이며,
현재는 확장성을 고려해 MySql, Postgres 등은 분산 클러스터 기능도 제공하고 있습니다.

 

DB 기반 서비스는 확장됨에 따라 DB 성능이 매우 중요해지게 됩니다.

 

이 성능을 올리기 위해 일반적으로 아래와 같은 방법을 사용할 수 있습니다.

 

  1. Read 질의를 위한 슬레이브 데이터 베이스 사용
  2. 멤캐시드와 같은 캐시 추가

하지만, 위 2가지 방법의 경우에는 Read를 위한 방법으로 Write에 대한 성능 향상은 아니며,

마스터/ 슬레이브 구조에서는 마스터와 슬레이브의 컴퓨팅 성능의 차이가 클수록 성능에도 악영향을 끼칠수 있게 됩니다.

 

결국, DB를 구성한 서버의 자원을 모두 올려야하는 비용이 들며, 캐시를 사용하게 되면 순간적인 데이터 일관성이 깨지게 되는 문제점이 발생할 수 밖에 없게 됩니다.

 

최종적으로는 RDB의 장점인 정규화를 없애고, 성능을 위해 비정규화를 하게 됩니다.

정규화는 결국 데이터를 분리하게 되는것입니다. 그렇다는것은 데이터를 가져오기 위해서 join연산을 수행해야 합니다.
하지만 RDB에서는 이 join 연산이 비싸기 때문에 성능을 올리기 위해서는 일반적으로 비정규화를 하게 됩니다.

 

3. RDBMS vs NoSql

 

RDBMS와 NoSql은 정반대의 관계가 아닙니다.

 

그 이유로는 상용에 있는 RDBMS와 NoSql 종류에 따라 저장 방식도 모두 다르고 지향하는 점이 다르기 때문입니다.

개인적으로, RDBMS와 NoSql의 구분은 스키마의 자유도로 나누면 된다고 생각합니다.
트랜잭션 강도를 제외한 이유로는 DBMS의 제품에 따라 NoSql 이라도 강한 제품이 있기 때문입니다.

 

그렇기에, 서비스에 맞는 DBMS를 선택해야 합니다. 책에서는 선택할 때 고려해야할 기준을 소개하고 있습니다.

 

1) 데이터 모델

 

데이터 모델로는 대표적으로 아래와 같이 있습니다.

 

  • 키/값 방식
  • 반 구조적
  • 컬럼지향 방식
  • 문서 지향 방식

 

2) 저장 모델

 

인메모리 방식인지, 영구저장 방식인지도 기준으로 들 수 있습니다.

 

영구저장의 경우 디스크에 쓰게되며, 이 경우 성능에 어떤 영향을 미치는지도 고려해야 합니다.

 

3) 일관성 모델

 

일관성 정책의 엄격함도 고려해야 합니다.

 

각 DBMS에 따라 느슨한경우와 엄격한 경우가 있기 때문입니다.

 

4) 물리적 모델

 

물리 장치가 단일한지 분산인지도 고려해야 합니다.

 

단일 장치의 장/단점과 분산 장치의 장/단점이 존재하기 때문입니다.

 

간단한 예로 분산의 경우는 네트워크 비용이 어쩔수 없이 들게 된다는 단점이 있지만,
수평 확장이 가능하여 저장 부분에서는 무한하다는 장점도 가지고 있습니다.

 

5) 읽기/쓰기 성능

 

현재 서비스는 DB에 읽기와 쓰기 중에 비율이 어떤지도 고려해야합니다.

 

아래 크게 3가지에 따라서 선택해야 하는 DBMS도 다르지만 DBMS의 정책도 달라지기 때문입니다.

 

  1. 읽기 > 쓰기
  2. 읽기 = 쓰기
  3. 읽기 < 쓰기

 

6) 보조색인

 

보조색인의 필요성도 고려대상에 포함됩니다.

 

보조색인이 필요 없다고 판단될 땐 확장성을 생각하여 어플리케이션에서 충분히 커버가 가능한지도 알아봐야 합니다.

 

7) 장애 처리

 

장애처리에 대해서 각 DBMS들은 어떠한 대처를 하는지도 알아봐야 합니다.

 

데이터가 인메모리 저장 방식의 경우 각 DB 서버는 graceful shutdown으로 디스크에 저장되도록 되어 있지 않다면 데이터 유실이 발생할 것 입니다.

 

8) 압축

 

압축이 기본적으로 제공되는지 혹은 필요시 플러그인으로 압축 기법을 사용할 수 있는지도 고려 대상에 포함됩니다.

 

9) 부하 분산

 

읽기/쓰기에 대한 부하를 DB 자체적으로 분산해주는지도 고려대상입니다.

 

(처리량이 많은 어플리케이션 구조에서 고려해보면 됩니다.)

 

10) 원자적 읽기, 갱신, 쓰기

 

원자적인 CRUD가 가능한지도 고려 대상입니다.

 

DB에서 제공하는지의 여부에 따라 클라이언트 측 어플리케이션의 복잡도에 영향이 가기 때문입니다.

 

11) 락걸기, 대기, 데드락

 

데이터 접근 시 어떤 유형의 락 모델을 제공하는지도 고려대상입니다.

 

이것은 성능에도 직접적인 영향이 있기 때문에 놓치지 않아야 하는 기준입니다.

 

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

4. 구성 요소

이제 Hbase에 대한 구성요소를 간단히 소개하겠습니다.

 

1) 테이블

 

Hbase에는 RDBMS와 동일하게 테이블이라는 구성 요소가 존재합니다.

 

2) 로우

 

로우는 유일한 키인 로우 키를 가지고 있습니다.

또한, 이 로우가 다수 모여 테이블을 이루게 됩니다.

 

추가로,로우는 로우키를 기준으로 사전 편찬식으로 저장되어 집니다.

 

아래는 scan했을때의 예입니다.

 

row-1 column=cf1:, timestamp=11111
row-10 column=cf1:, timestamp=11111
row-11 column=cf1:, timestamp=11111
row-2 column=cf1:, timestamp=11111

 

사전편찬식 정렬에서는 2진수 수준에서 바이트 단위로 왼쪽부터 비교하게 됩니다.

그로인해, 위와 같이 row-10, row-11이 row-2보다 위에 있게 됩니다.

 

3) 컬럼

 

컬럼은 상위에 컬럼패밀리라는 것을 가져야 합니다.

 

컬럼패밀리는 데이터를 의미적으로 분류하게 해주는 것으로, 이 컬럼 패밀리 단위로 압축이나 메모리 상주같은 설정이 가능하게 됩니다.

 

또한, 컬럼패밀리 안의 모든 컬럼은 HFile이라는 하나의 Hbase에서 관리하는 저수준 저장 파일에 함께 저장됩니다.

 

컬럼패밀리는 테이블 생성될 때 최소 하나 이상은 정의해야하며, 갯수가 많아서는 안되는 제약사항이 있습니다.

 

그래서, Hbase에서의 컬럼은 '컬럼패밀리:퀄리파이어' 로 표현할 수 있습니다.

여기서 퀄리파이어는 바이트 배열로 패밀리안에 속한 컬럼키라고 보시면 됩니다.

 

퀄리파이어의 경우 컬럼패밀리와 달리 갯수에 제약사항이 없어, 하나의 컬럼패밀리안에는 수백만개의 퀄리파이어를 저장할 수 있습니다.

또한, 데이터 타입이나 길이에도 제한이 없습니다.

 

4) 셀

 

셀은 컬럼의 값으로서 타임스탬프를 가지고 있습니다.

 

타임스탬프를 가지고 있다는 것은 컬럼의 값을 타임스탬프 기준으로 버저닝한다는 의미입니다.

 

타임스탬프는 사용자가 직접 지정할 수도 있으며, Hbase 내부적으로 부여할 수도 있습니다.

또한, 동일한 값은 타임스탬프 기준으로 정렬되어 항상 최신의 값을 먼저 읽을 수 있도록 되어 있습니다.

 

추가로, 셀의 경우 바이트 배열로 되어 있어, 클라이언트 측에서 어떻게 처리해야 할지 알고 있어야 합니다. 


아래는 Hbase의 전체적인 구조를 나타낸 것입니다.

 

(Table, RowKey, Family, Column, Timestamp) -> Value

 

정렬의 기능을 추가하여 프로그래밍적으로 나타내게 된다면 아래와 같습니다.

 

SortedMap<RowKey, List<SortedMap<Column, List<Value, Timestamp>>>>

 

5) 원자성

 

 

Hbase는 로우 단위로 컬럼 수와는 무관하게 원자성을 보장하고 있습니다.

 

단, 여러 로우나 테이블에 걸친 원자성이나 트랜잭션을 보장하지 않습니다.

 

 

6) 자동 샤딩

 

Hbase에서는 확장성, 로드밸런싱을 위해 리전이라는 단위를 사용하고 있습니다.

 

리전은 단순히 특정 범위의 로우 집합으로 이해하면 되며,

리전은 사이즈가 커지게 되면 자동으로 분할하게 되며, 반대로 합쳐지기도 합니다.

 

최초 테이블 생성 시에는 하나의 리전이 존재하고, 시스템이 모니터링을 하다 특정 기준이 넘어가면 둘로 분리하게 됩니다.

 

각 리전은 하나의 리전서버에서 운용되며 각 서버는 수많은 리전을 운용할 수 있습니다.

 

리전의 특징으로는 아래와 같습니다.

 

  • 서버 고장 시 리전을 다른 리전 서버로 이동시켜 재빨리 복구 가능
  • 세밀한 로드밸런싱

 

5. Hbase 내부 동작 방식

hbase는 데이터 색인 방식으로 LSM(= Log Structured Merge) Tree 을 사용합니다.

 

hbase는 데이터를 HFile 이라는 파일에 저장하게 되고,

이 파일은 영구 저장, 정렬, 고정 불변의 키/값 쌍의 맵이라고 보시면 됩니다.

 

1) HFile

 

HFile은 연속적인 블록이며 블록에 대한 색인은 블록 끝에 저장되어 있습니다.

색인은 HFile이 열릴때 메모리로 로드되어 사용하게 됩니다.

 

또한, HFile은 위에서 설명한 LSM Tree를 기반으로하여 특정값 혹은 시작값~끝값의 range 스캔이 가능합니다.

 

추가로, 모든 HFile은 블록 색인을 갖고 있어 검색은 단 한번의 디스크 판독으로 수행될 수 있습니다.

검색하고자하는 키를 통해 블록 색인에서 이진탐색이 이루어지게 됩니다.

 

 

2) WAL 

 

WAL은 Write-Ahead-Log로 Hbase는 데이터 갱신시 여기에 먼저 씌어지게 됩니다.

Hbase에서는 커밋로그 라고도 불립니다.

 

WAL에 먼저 쓰여지기 때문에 장애가 났을시에도 데이터의 유실은 있지 않습니다.

 

3) 멤스토어

 

WAL에 씌어진 다음에는 메모리인 멤스토어에 저장이 됩니다.

 

멤스토어에 있는 설정한 값을 넘게 되면 그때야 비로소 HFile에 쓰여지게 됩니다.

HFile에 쓰여진 이후에는 WAL에 있던 쓰여진 데이터도 삭제되어 집니다.

 

데이터를 읽을때에는 HFile과 멤스토어간의 데이터 정합이 안맞을 수 있어, 두 곳에서 데이터를 통합하여 반환하게 됩니다.

 

LSM 트리 구조상 삭제는 바로 할 수 없습니다. 하지만, 삭제표시를 해두어 읽기 연산 시 삭제된것처럼 데이터를 감출 수 있습니다. 

 

4) 컴팩션

 

멤스토어에서 HFile로 write 할 때마다 파일의 수는 증가하기 때문에 Hbase는 내부적으로 컴팩션이라는 것을 수행하게 됩니다.

 

이 컴팩션은 부 컴팩션 과 주 컴팩션으로 구분할 수 있습니다.

 

1. 부 컴팩션

 

부 컴팩션은 작은 파일의 내용을 큰 파일에 병합시켜 파일의 갯수를 줄이는 행위입니다.

 

HFile은 내부적으로 정렬되어 있기 때문에 병합속도는 빠르게 이루어지며, 오직 디스크 입출력 성능에 영향을 받습니다.

 

2. 주 컴팩션

 

주 컴팩션은 하나의 리전안의 컬럼패밀리 하나를 구성하는 모든 파일을 새로운 파일 하나로 다시 쓰는 작업입니다.

 

부 컴팩션과의 차이로는 위에 설명한 삭제표시가 달린 데이터를 다시 쓰는 과정에서 제외시켜 영구적으로 삭제할 수 있다는 점 입니다.

 

6. Hbase 클러스터 구성

Hbase 클러스터는 마스터 서버 한대, 리전 서버 다수로 이루어져 있습니다.

 

1) 마스터서버

 

마스터 서버는 아래와 같은 역할을 수행합니다.

 

  1. 리전 서버에 리전 할당
  2. 리전 서버간의 리전의 부하 분산 처리
  3. 테이블 및 컬럼패밀리의 생성 같은 스키마 변경 사항 및 기타 메타데이터 작업 수행

위 역할들을 마스터 서버가 수행하기 위해서는 주키퍼라는 서비스를 필수로 사용해야 합니다.

 

2) 리전 서버

 

리전 서버는 클라이언트와 통신하며 데이터의 읽기와 쓰기를 담당하는 서버입니다.

 

쓰기도 담당하기 때문에 리전 분할의 역할도 이루어 집니다.

 

6. 마무리

이번에는 간략히 Hbase 소개에 대해서 포스팅하였습니다.

다음에는 2장인 설치 챕터이지만 해당 챕터는 Hadoop 설치 에서 사용한 cloudera manager를 사용하여 Hbase 서비스를 추가하는 것으로 대체하도록 하겠습니다.

반응형

'BigData > Hbase' 카테고리의 다른 글

(5) 클라이언트 API : 관리 기능  (0) 2020.06.02
(4) 클라이언트 API : 고급 기능  (0) 2020.04.19
(3) 클라이언트 API : 기본 기능  (0) 2020.04.09
(2) 설치  (0) 2020.04.08
반응형

 

이번 문제는 인자로 binary tree와 int형 sum 값이 주어지며,

트리의 최상단에서 마지막 노드까지의 val을 sumation한 값이 인자로 주어진 sum값과 같은 경우가 있는지 판단하는 문제입니다.

 

저의 경우 leaf 노드를 만날때까지 탐색을 하였고, 리프노드에 도달했을때 값을 비교하여 boolean 값을 반환하도록 하였습니다.

 

값이 같다면 True를 반환시켰고, or 연산으로 왼쪽-오른쪽 노드 탐색을 묶어서 하나라도 sum이 같은 길을 찾으면 True로 반환되도록 하였습니다.

 

아래는 코드입니다.

 

class TreeNode:
    def __init__(self, x):
        self.val = x
        self.left = None
        self.right = None

class Solution:
    def hasPathSum(self, root: TreeNode, sum: int) -> bool:

        if not root: return False
        res = self.findTree(root, sum, 0)
        return res

    def findTree(self, root: TreeNode, sum:int, additional_sum: int):
        if root: additional_sum += root.val
        if not root: return 
        if root.left == None and root.right == None:
            if sum == additional_sum: return True
            else: return False
        return self.findTree(root.right, sum, additional_sum) or self.findTree(root.left, sum, additional_sum)
반응형
반응형

 

이번 문제는 binary tree를 인자로 받아 leaf node까지의 depth 중 최소를 찾는 문제입니다.

여기서 leaf node는 왼쪽, 오른쪽 노드가 없는 것을 의미합니다.

 

저의 경우에는 재귀를 통해 leaf node를 발견할때까지 tree를 모두 순회하도록 하였습니다.

leaf node를 발견하면 leaf node의 depth를 반환하게 하여, 최소값을 구하도록 하였습니다.

 

아래는 코드입니다.

 

import sys

class TreeNode:
    def __init__(self, x):
        self.val = x
        self.left = None
        self.right = None

class Solution:
    def minDepth(self, root: TreeNode) -> int:
        if not root: return 0
        if not root.left and not root.right: return 1
        depth = sys.maxsize

        depth = min(self.findDepth(root.right, 1), self.findDepth(root.left, 1)) + 1
        return depth

    def findDepth(self, root: TreeNode, depth: int):
        if not root: return sys.maxsize
        if not root.left and not root.right: return depth
        return min(self.findDepth(root.left, depth +1), self.findDepth(root.right, depth + 1))

 

반응형
반응형

 

이번 문제는 주어진 트리가 높이 균형 트리인지 판단하는 문제입니다.

왼쪽 노드와 오른쪽 노드의 depth 차이가 1이하여야만 합니다.

 

풀이로는 재귀를 통해 각 노드를 순회하면서 depth를 아래부터 1씩 증가시키면서 매깁니다.

depth를 매기면서 각 왼쪽과 오른쪽의 depth를 비교하여 1보다 크면 불균형으로 판단하도록 하였습니다.

 

풀이는 아래와 같습니다.

 

class TreeNode:
    def __init__(self, x):
        self.val = x
        self.left = None
        self.right = None

class Solution:

    def isBalanced(self, root: TreeNode) -> bool:
        if not root: return True

        res = self.searchDepth(root)
        if res == -1: return False
        return True

    def searchDepth(self, root: TreeNode):
        if root == None: return 0

        right_depth = self.searchDepth(root.right)
        if right_depth == -1 : return -1
        
        left_depth = self.searchDepth(root.left)
        if left_depth == -1: return -1
        
        if abs(right_depth - left_depth) > 1: return -1
        
        return max(right_depth, left_depth) + 1
반응형
반응형

1. 서론

이번 포스팅에서는 Chapter6의 스트림으로 데이터 수집 에 대해 진행하도록 하겠습니다.

 

2. 컬렉터란 무엇인가?

Collector 는 java.util.stream 에 있는 인터페이스로, 리듀싱 연산을 어떻게 구현할지 제공하는 메서드 집합입니다.

앞장에서 많이 사용한 스트림 함수 중 collect는 이 Collector 인터페이스를 인수로 받습니다.

 

그럼 앞장에서 자주 인수로 전달한 Collectors는 Collector의 구현체일까요? 아쉽지만 아닙니다.

 

코드를 깊게 파보시면 바로 아시겠지만 Collector의 구현체는 CollectorImpl 입니다.

 

그럼 대체 Collectors는 왜 가능한걸까요?

 

코드를 자세히 보시면 Collectors는 일종의 유틸 클래스로 inner class로 CollectorImpl를 가지고 있습니다.

그리고 toList와 같은 정적 메서드가 호출될때마다 이 CollectorImpl를 만들어 반환하는 형태입니다.

 

 

1) 고급 리듀싱 기능을 수행하는 컬렉터

 

스트림 함수인 collect의 동작방식은 인자로 받은 Collector로 리듀싱 연산을 수행하여 결과를 반환합니다.

 

일반적으로 많이 사용하는 toList, toSet 등등을 모아둔 유틸 클래스가 Collectors로 보시면 됩니다.

 

일반적으로 Collectors는 앞절에서 살펴본 sum, max와 같은 연산이 아닌
요소를 어떤 자료구조에 담아 반환할지의 리듀싱 기능을 제공합니다.

책에서는 이를 고급 리듀싱기능으로 분류하였습니다.

 

2) 미리 정의된 컬렉터

 

Collectors는 정적 팩토리 메서드 패턴을 통해 자주 사용하는 CollectorImpl를 제공하고 있습니다.

책에서는 이를 미리 정의된 컬렉터라고 일컫습니다.

 

Collectors는 아래와 같이 크게 3가지를 제공합니다.

 

  • 스트림 요소를 하나의 값으로 리듀스하고 요약
  • 요소 그룹화
  • 요소 분할

3. 리듀싱과 요약

Collectors에서 제공하는 정적 메서드들은 결국 스트림의 최종 연산으로 취급이 되며, reduce 작업을 하여 만들어지게 됩니다.

 

Collectors에서 제공하는 reduce 역할을 수행하는 정적 메소드들에 대해 소개하겠습니다.

 

1) counting

 

이 메서드는 Collectors가 제공하는 컬렉터로 요소의 갯수를 세어 반환합니다.

 

아래는 예제입니다.

 

long howManyDishes = menu.stream().collect(Collectors.counting());

 

 

2) maxBy, minBy

 

maxBy, minByCollectors가 제공하는 컬렉터로 요소 중 최댓값, 최솟값을 찾을때 사용합니다.

최대인지 최소인지 구별하기 위해 인자로는 Comparator를 받습니다.

 

아래는 예제입니다.

 

Comparator<Dish> dishCaloriesComparator = Comparator.comparingInt(Dish::getCalories);
Optional<Dish> mostCalorieDish = menu.stream().collect(Collectors.maxBy(dishCaloriesComparator));

 

3) 요약 연산

 

Collectors는 합계, 평균을 위한 요약 연산도 제공한다.

 

메서드는 아래와 같습니다.

단, 인수는 언박싱인 int, long, double 타입으로 반환하는 함수형 인터페이스의 구현체 혹은 람다여야 합니다.

 

  • summingInt
  • summingDouble
  • summingLong
  • averagingInt
  • averagingDouble
  • averagingLong

사용 예제는 아래와 같습니다.

 

int totalCalories = menu.stream().collect(Collectors.summingInt(Dish::getCalories));

 

만약 한번의 collect API로 합계, 평균 등의 값을 한번에 수행해야 할 경우에는 아래와 같은 메서드를 사용하면 됩니다.

 

  • summarizingInt
  • summarizingDouble
  • summarizingLong

아래는 예제 및 결과입니다.

 

IntSummaryStatistics menuStatistics = menu.stream()
            .collect(Collectors.summarizingInt(Dish::getCalories));

System.out.println(menuStatistics);

/*
 * 결과
*/
IntSummaryStatistics{count=?, sum=?, average=?, min=?, max=?}

 

 

4) 문자열 연결

 

Collectors에서 제공하는 joining 메서드를 사용하면,

스트림에 각 요소에 toString 메서드를 호출하여 하나의 문자열로 연결하여 반환합니다.

 

추가로, joining 메서드는 문자열을 연결할 때 사용할 구분자를 인자로 받을 수있습니다.

 

아래는 예제입니다.

 

String shortMenu = menu.stream().map(Dish::getName).collect(Collectors.joining(", "));

// pork, beef, chicken, ...

 

5) 범용 리듀싱 요약 연산

 

지금까지 알아본 메서드는 사실 Collectors.reducing의 메서드를 이용하여 특화된 값을 도출하도록 만들은 것입니다.

 

이는 단순히, 개발 편의와 가독성을 위해 제공한 것입니다.

 

그럼, 특화된것이 아니라 커스텀을 해야한다면 스스로 reducing 메서드를 사용해야 합니다.

 

때문에 reducing 메서드에 대해 알아보겠습니다.

 

reducing 메서드는 인자로 3개를 받고 있습니다.

 

  • 첫번째 인자 = 리듀싱 연산의 초기값입니다.
  • 두번째 인자 = 각 요소에 적용할 변환 함수입니다.
  • 세번째 인자 = 같은 종류의 요소를 하나의 요소로 합칠 BinaryOperator입니다.
reducing 메서드는 인자를 한개만 받을 수도 있습니다.
이 경우는 첫번째 인자가 스트림 요소의 첫번째가 되고, 두번째 인자는 자신을 그대로 반환하는 Function.identity()가 됩니다.
단, 스트림이 비어있는 경우가 있기 때문에 결과값으로는 Oprional이 씌어지게 됩니다.

 

아래는 summingInt 를 reducing을 통해 구현한 예제입니다.

 

int totalCalories = menu.stream().collect(Collectors.reducing(
        0, // 초기값
        Dish::getCalories, // 변환 함수
        Integer::sum // 누적 함수
));

 

 

 

 

반응형

 

 

 

 

 

4. 그룹화

데이터 처리를 하다보면 그룹핑이 필요한 경우가 있습니다.

이를 위해 Collectors 는 groupingBy와 같은 그룹핑을 할수 있도록 제공합니다.

 

아래는 각 Dish의 타입별로 그룹핑하는 예제입니다.

 

Map<Dish.Type, List<Dish>> dishesByType =
        menu.stream().collect(Collectors.groupingBy(Dish::getType));
        
// 결과
{
    FISH=[prawns, salmon], 
    OTHER=[french fries, rice, season fruit, pizza], 
    MEAT=[pork, beef, chicken]
}

 

Dish::getType과 같이 그룹화되는 기준을 제공하는 함수를 분류함수라고 합니다.

 

만약 분류함수가 복잡하다면 코드블록을 사용하여 커스텀하면 됩니다.

 

아래는 각 칼로리값에 따라 분류를 나누는 예제입니다.

 

public enum CaloricLevel {DIET, NORMAL, FAT}

Map<Dish.Type, List<Dish>> dishesByType =
        menu.stream().collect(Collectors.groupingBy(
                dish -> {
                    if (dish.getCalories() <= 400) return CaloricLevel.DIET;
                    else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
                    else return CaloricLevel.FAT;
                }));

 

 

1) 그룹화된 요소 조작

 

그룹화 진행 후에 각 그룹화된 요소를 조작해야 하는 경우가 있습니다.

이를 위해 Collectors.groupingBy는 추가로 Collector 인자를 받을수 있습니다.

 

이는, 첫번째 인자를 통해 그룹핑이 진행 된 후, 두번째 인자인 Collector를 적용하게 됩니다.

 

Collectors는 또 미리 자주 사용하는 mapping 함수를 제공합니다.

java 9 부터는 Collectors 에 filtering, flatMapping이 추가되었습니다.
[java docs 9] 참조 

 

아래는 타입별로 그룹핑한 요리 리스트가 아닌 요리명 리스트로 바꾼 예제입니다.

 

Map<Dish.Type, List<String>> dishesByType =
        menu.stream().collect(Collectors.groupingBy(
                Dish::getType,
                Collectors.mapping(Dish::getName, Collectors.toList())
                ));

 

2) 다수준 그룹화

 

위에서 설명했듯이 groupingBy는 두번째 인수로 Collecto를 받습니다.

 

그 말은, 두번째 인수로 또 groupingBy를 받을 수 있습니다.

 

결국, n차원의 그룹화가 가능하다는 말로 직결됩니다.

 

아래는 2 수준으로 그룹핑하는 예제입니다.

 

Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByType =
        menu.stream().collect(Collectors.groupingBy(
                Dish::getType,
                Collectors.groupingBy(
                        dish -> {
                            if (dish.getCalories() <= 400) {
                                return CaloricLevel.DIET;
                            } else if (dish.getCalories() <= 700) {
                                return CaloricLevel.NORMAL;
                            } else {
                                return CaloricLevel.FAT;
                            }
                        }
                )
        ));

// 결과
{
    FISH = {
        NORMAL=[salmon], 
        DIET=[prawns]
    }, 
    MEAT = {
        FAT=[pork], 
        NORMAL=[beef], 
        DIET=[chicken]
    }, 
    OTHER = {
        NORMAL=[french fries, pizza], 
        DIET=[rice, season fruit]
    }
}

 

3) 서브그룹으로 데이터 수집

 

groupingBy 의 두번째 인자로 Collector를 인자로 받는다는 것은 원하는 데이터 수집도 가능하다는 말이 됩니다.

 

대표적으로 아래와 같습니다.

 

  • counting = 그룹별 요소의 갯수를 수집
  • maxBy = 그룹별 주어진 기준의 최댓값을 가진 요소 수집
  • summingInt = 그룹별 합계 값 수집
maxBy의 경우 반환값이 optional로 감싸게 됩니다.
만약 empty가 없는 경우에는 Collectors.collectingAndThen 을 통해 Optional.get을 적용하여 감싸지 않은 형태로도 가능합니다.

 

5. 분할

스트림에서의 분할은 프레디케이트를 분류 함수로 사용하는 특수한 그룹화 기능으로 분할 함수로 일컫습니다.

 

프레디케이트가 분류함수이기 때문에 키는 Boolean 값으로 그룹핑은 최대 2개가 됩니다.

 

Collectors에서는 partitioningBy 메서드로 이를 제공합니다.

 

아래는 Collectors.partitioningBy 예제입니다.

 

Map<Boolean, List<Dish>> partitionedMenu = 
        menu.stream().collect(Collectors.partitioningBy(Dish::isVegetarian));

 

Collectors.partitioningBy 또한 groupingBy와 같이 두번째 인수로 Collector 를 받을 수 있어 다양한 커스텀 연산이 가능합니다.

 

6. Collector 인터페이스

Collector 인터페이스의 시그니처와 다섯개의 메서드 정의는 아래와 같습니다.

 

public interface Collector<T, A, R> {
    Supplier<A> supplier();
    BiConsumer<A, T> accumulator();
    BinaryOperator<A> combiner();
    Function<A, R> finisher();
    Set<Characteristics> characteristics();
}

 

각 제네릭은 아래와 같은 용도입니다.

  • T = 수집될 스트림 항목
  • A = 누적자로 수집 과정에서 중간 결과를 누적하는 객체 타입
  • R = 수집 연산의 결과 객체 타입 (대게 컬렉션 타입입니다.)

 

각 메서드의 용도는 아래와 같습니다.

 

1) supplier 메서드 : 새로운 결과 컨테이너 만들기

 

supplier 메서드는 빈 결과로 이루어진 Supplier를 반환해야 합니다.

즉, supplier는 수집 과정에서 빈 누적자 인스턴스를 만드는 파라미터 함수입니다.

 

toList와 같은 컬렉터는 ArrayList::new 가 supplier입니다.

 

2) accumulator 메서드 : 결과 컨테이너에 요소 추가하기

 

acuumulator 메서드는 리듀싱 연산을 수행하는 함수를 반환합니다.

 

함수의 반환값은 void입니다.

 

toList 컬렉터의 acuumulator는 List::add입니다.

 

3) finisher 메서드 : 최종 변환값을 결과 컨테이너로 적용하기

 

finisher 메서드는 스트림 탐색이 끝나고 누적자 객체를 최종 결과로 변환하면서 누적 과정을 끝낼 때 호출할 함수를 반환해야 합니다.

 

toList 컬렉터의 finisher는 누적자 객체가 이미 반환 형태이기 때문에 Function.identity() 입니다.

 

사실상, toList는 위 3가지로도 기능 구현이 가능합니다.
다만, 병렬성을 통한 최적화 판단 및 구현이 있어 내부적으로는 더욱 복잡하며
이를 위해 combiner 메서드와 characteristics 가 있습니다.

 

4) combiner 메서드 : 두 결과 컨테이너 병합

 

combiner는 스트림의 서로 다른 서브 파트를 병렬로 처리할때 누적자가 이 결과를 어떻게 처리할지 정의합니다.

 

toList 컬렉터에서는 비교적 간단합니다.

왜냐하면, 2개의 리스트를 이으기만 하면 되기 때문입니다.

 

아래는 toList 컬렉터의 combiner 입니다.

 

(left, right) -> { left.addAll(right); return left;

 

이 메서드를 통해 병렬로 수행이 가능하게 됩니다.

 

아래는 병렬화 리듀싱 과정을 간단하게 보여주는 그림입니다.

 

 

5) Characteristics 메서드

 

characteristics 메서드는 컬렉터의 연산을 정의하는 Characteristics 형식의 불변 집합을 반환합니다.

 

characteristics는 스트림을 병렬로 리듀스 할 것인지, 병렬로 한다면 어떤 최적화를 선택해야 하는지 힌트를 제공합니다.

 

Characteristics는 enum 클래스로 아래 3개의 값을 가지고 있습니다.

 

  • CONCURRENT = 다중 스레드에서 accumulator 함수를 동시에 호출할 수 잇으며 이 컬렉터는 병렬 리듀싱이 가능함.
    • 컬렉터 플래그에 UNORDERED를 함께 설정하지 않은 경우, 데이터 소스가 정렬되어 있지 않은 상황에서만 병렬 리듀싱 가능
  • UNORDERED = 리듀싱 결과는 스트림 요소의 방문 순서나 누적순서에 영향을 받지 않음.
  • IDENTITY_FINISH = finisher 메서드가 반환하는 함수는 단순히 identify 이므로 생략 가능.

toList의 경우에는 IDENTITY_FINISH 입니다.

 

그 이유는 accumulator로 List::add를 사용하는데 병렬로 처리 시 순서가 엉킬수 있기 때문입니다.

그 말은, 누적 순서에도 영향이 있다는 말이 됨으로 CONCURRENT와 UNORDERED는 해당 사항이 되지 않습니다.

 

7. 마무리

 

이번 포스팅에서는 Chapter 6인 스트림으로 데이터 수집에 대해 진행하였습니다.

다음에는 Chapter 7인 병렬 데이터 처리와 성능에 대해 포스팅하겠습니다.

반응형

'Programming > ModernJavaInAction' 카테고리의 다른 글

(8) 컬렉션 API 개선  (0) 2020.04.13
(7) 병렬 데이터 처리와 성능  (0) 2020.04.11
(5) 스트림 활용  (0) 2020.04.04
(4) 스트림 소개  (0) 2020.03.28
(3) 람다 표현식  (0) 2020.03.28
반응형

1. 서론

이번 포스팅에서는 Chapter5의 스트림 활용 에 대해 진행하도록 하겠습니다.

 

2. 필터링

java 8에서는 스트림의 요소를 선택하는 필터링 기능을 제공합니다.

 

소개할 기능으로는 filter와 distinct 입니다.

 

1) 프레디케이트로 필터링

 

스트림 인터페이스는 filter 메서드를 지원하고 있습니다.

이 메서드는 인자로 프레디케이트를 받고, 프레디케이트와 일치하는 요소를 포함하는 스트림을 반환합니다.

 

아래는 예제입니다.

 

@Getter
@RequiredArgsConstructor
private static class Dish {
    private final String name;
    private final boolean vegetarian;
    private final int calories;
    private final Type type;

    enum Type {
        MEAT, FISH, OTHER
    }
}

public static void main(String[] args) {
    List<Dish> menu = Arrays.asList(
            new Dish("pork", false, 800, Dish.Type.MEAT),
            new Dish("beef", false, 700, Dish.Type.MEAT),
            new Dish("chicken", false, 400, Dish.Type.MEAT),
            new Dish("french fries", true, 530, Dish.Type.OTHER),
            new Dish("rice", true, 350, Dish.Type.OTHER),
            new Dish("season fruit", true, 120, Dish.Type.OTHER),
            new Dish("pizza", true, 550, Dish.Type.OTHER),
            new Dish("prawns", false, 300, Dish.Type.FISH),
            new Dish("salmon", false, 450, Dish.Type.FISH)
    );

    List<Dish> vegetarianMenu = menu.stream()
            .filter(Dish::isVegetarian)
            .collect(Collectors.toList());
}

 

2) 고유 요소로 필터링

 

스트림에서는 distinct 메서드를 지원합니다.

 

이 메서드는 고유한 요소들로 이루어진 스트림을 반환합니다.

고유를 판단하는 기준은 java의 hashCode, equals로 결정됩니다.

 

아래는 예제입니다.

 

List<Integer> numbers = Arrays.asList(1, 2, 1, 3, 3, 2, 4);
numbers.stream()
        .filter(i-> i %2 == 0)
        .distinct()
        .forEach(System.out::println);

 

3. 스트림 슬라이싱

스트림에서는 특정 요소를 선택하거나 스킵하는 방법을 제공하며 이를 슬라이싱이라고 일컫습니다.

 

1) 프레디케이트를 이용한 슬라이싱

 

자바 9에서는 스트림의 필터측면으로 특화된 takeWhile, dropWhile를 제공하고 있습니다.

 

takeWhile는 filter와 동일하게 작동하는것 같지만, 처음으로 프레디케이트가 거짓이 되면 종료하게 됩니다.

 

예제는 아래와 같습니다.

 

List<Dish> specialMenu = Arrays.asList(
        new Dish("season fruit", true, 120, Dish.Type.OTHER),
        new Dish("prawns", false, 300, Dish.Type.FISH),
        new Dish("rice", true, 350, Dish.Type.OTHER),
        new Dish("chicken", false, 400, Dish.Type.MEAT),
        new Dish("french fries", true, 530, Dish.Type.OTHER)
);

List<Dish> sliceMenu1 =
        specialMenu.stream()
        .takeWhile(dish -> dish.getCalories() < 320)
        .collect(Collectors.toList());

 

위 예제는 칼로리가 320보다 큰 rice를 만나는 시점까지만을 스트림으로 반환하게 됩니다.

filter를 통해 모든 요소에 대해 프레디케이트를 적용하게되어 성능에 큰 이슈가 있을 시 사용하면 됩니다.
단, 위와 같이 sort가 되어 있는것처럼 데이터의 상태를 자세히 보고 사용해야 합니다.

 

dropWhile 은 takeWhile과 반대로 프레디케이트가 거짓이 되는 지점까지 발견된 요소를 버립니다.

 

예제는 아래와 같습니다.

 

List<Dish> specialMenu = Arrays.asList(
        new Dish("season fruit", true, 120, Dish.Type.OTHER),
        new Dish("prawns", false, 300, Dish.Type.FISH),
        new Dish("rice", true, 350, Dish.Type.OTHER),
        new Dish("chicken", false, 400, Dish.Type.MEAT),
        new Dish("french fries", true, 530, Dish.Type.OTHER)
);

List<Dish> sliceMenu2 =
        specialMenu.stream()
        .dropWhile(dish -> dish.getCalories() < 320)
        .collect(Collectors.toList());

 

이번에는 거짓이 되는 rice 이전 요소는 모두 버리게 됩니다. 

 

 

2) 스트림 축소

 

스트림은 특정 요소갯수만을 가진 스트림을 반환하는 limit 메서드를 제공합니다.

ansiansi sql의 limit과 같이 생각하면 됩니다.

 

예제는 아래와 같습니다.

 

List<Dish> dishes = specialMenu.stream()
        .filter(dish -> dish.getCalories() > 300)
        .limit(3)
        .collect(Collectors.toList());

 

3) 요소 건너뛰기

 

스트림은 처음 n개 요소를 제외한 스트림을 반환하는 skip 메서드를 제공합니다.

 

예제는 아래와 같습니다.

 

List<Dish> dishes = specialMenu.stream()
	    .filter(dish -> dish.getCalories() > 300)
	    .skip(2)
	    .collect(Collectors.toList());

 

4. 매핑

스트림에서는 자바 객체 타입을 변경하는 스트림을 반환하는 map, flatMap을 제공합니다.

 

1) 스트림의 각 요소에 함수 적용하기

 

스트림에 들어있는 모든 요소에 함수를 적용하고 싶다면 map을 사용하면 됩니다.

foreach도 가능하지만 foreach의 경우에는 반환값이 void인것을 염두하여 사용해야 합니다.
반면 map의 경우에는 스트림을 반환하기 때문에 다른 스트림 메서드를 붙일 수 있습니다.

 

예제는 아래와 같습니다.

 

List<Integer> dishNameLengths = menu.stream()
        .map(Dish::getName)
        .map(String::length)
        .collect(Collectors.toList());

 

2) 스트림 평면화

 

스트림 처리를 하다보면 2개이상의 스트림을 평면화된 스트림으로 만들어야 하는 경우가 있습니다.

 

이를 제공하는것이 flatMap입니다.

 

책에서는 {"Hello", "World"}와 같은 문자열 배열에서 고유문자로 이루어진 {"H", "e", "l", "o", "W", "r", "d"} 인 스트림을 만드는 예제가 있습니다.

 

처음으로 단순히 앞에서 배운 map, distinct를 사용하는 케이스를 생각할 수 있습니다.

 

코드는 아래와 같습니다.

 

List<String> words = Arrays.asList("Hello", "World");
words.stream()
        .map(word -> word.split(""))
        .distinct()
        .collect(Collectors.toList());

 

하지만 원하는 결과는 나오지 않게 됩니다.

이유로는 map으로 반환되는 스트림의 객체 타입은 String[]이지 String이 아니기 때문에 distinct가 제대로 동작하지 않기 때문입니다.

 

아래는 flatMap를 사용한 코드입니다.

 

List<String> words = Arrays.asList("Hello", "World");
words.stream()
        .map(word -> word.split(""))
        .flatMap(Arrays::stream)
        .distinct()
        .collect(Collectors.toList());

 

이번에는 flatMap을 통해 Stream<String[]>을 Stream<String>으로 스트림 안에 있는

String 배열을 평면화시킨 스트림으로 변경한 후 distinct 메소드를 수행하도록 하였습니다.

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

5. 검색과 매칭

스트림 API에서는 스트림에 특정 요소가 있는지 검색하는 기능도 제공하고 있습니다.

 

대표적으로는 아래와 같습니다.

 

  • allMatch
  • anyMatch
  • noneMatch
  • findFirst
  • findAny

1) 프레디케이트가 적어도 한 요소와 일치하는지 확인

 

주어진 스트림에서 적어도 한개 요소가 프레디케이트에서 참인지 확인하는 메서드로 anyMatch를 제공합니다.

확인 용도이기 때문에 반환값은 boolean 입니다.

 

예제는 아래와 같습니다.

 

if(menu.stream().anyMatch(Dish::isVegetarian)) {
    ...
}

 

2) 프레디케이트가 모든 요소와 일치하는지 검사

 

anyMatch와 달리 스트림의 모든 요소가 주어진 프레디케이트에서 참인지 확인하는 allMatch가 있습니다.

 

예제는 아래와 같습니다.

 

boolean isHealthy = menu.stream().allMatch(dish -> dish.getCalories() < 1000);

 

3) NONEMATCH

 

allMatch와 반대로 스트림의 요소가 모두 프레디케이트의 거짓인지 확인하는 noneMatch 메서드가 있습니다.

 

예제는 아래와 같습니다.

 

boolean isHealthy = menu.stream().noneMatch(dish -> dish.getCalories() >= 1000);

 

4) 요소 검색

 

스트림에서 확인용 혹은 샘플링을 위해 임의의 한 요소만을 필요로 할때가 있습니다.

 

이를 위해 findAny 메서드를 지원하고 있습니다.

 

이 메서드는 주어진 스트림에서 임의로 한개를 반환합니다.

 

findAny는 주어진 스트림에 요소가 한개도 없을 수 있기 때문에 Optional로 감싸 반환합니다.

 

예제는 아래와 같습니다.

 

Optional<Dish> dish = menu.stream()
        .filter(Dish::isVegetarian)
        .findAny();

 

5) 첫 번째 요소 찾기

 

 findAny는 임의로 한개를 반환하지만 첫번째 요소가 필요한 경우가 있습니다.

이를 위해 findFirst 메서드를 지원하고 있습니다.

 

이는 스트림의 첫번째 요소를 반환하며 findAny와 같이 Optional로 감싼 반환값을 반환합니다.

 

예제는 아래와 같습니다.

 

List<Integer> someNumbers = Arrays.asList(1, 2, 3, 4, 5);
Optional<Integer> firstSquareDivisibleByThree = someNumbers.stream()
        .map(n -> n * n)
        .filter(n -> n % 3 == 0)
        .findFirst();

 

6. 리듀싱

스트림에서는 모든 요소를 통해 값으로 도출하는 연산인 리듀싱을 지원합니다.

 

대표적으로 아래와 같은 일들을 할 수 있습니다.

 

  • 요소의 합/ 곱
  • 최댓값/ 최솟값

 

1) 요소의 합/ 곱

 

스트림 요소의 합 혹은 곱을 구할때는 아래와 같이 할 수 있습니다.

 

int sum = numbers.stream().reduce(0, (a, b) -> a + b);
int sum2 = numbers.stream().reduce(0, Integer::sum);
int multiple = numbers.stream().reduce(1, (a, b) -> a * b);

 

 

위에는 스트림의 합과 곱을 구하는 reduce 연산입니다.

reduce 연산의 경우 첫번째 인자로 초기값을 설정 가능합니다.

초깃값의 경우 생략이 가능합니다 .
단, 생략하는 경우 빈 스트림의 경우가 있기 때문에 반환값은 Optional을 감싼 형태가 됩니다.

 

두번째 인자로는 두 요소를 조합해 새로운 값을 만드는 BinaryOperator를 받습니다.

이 두 요소의 첫번째는 이전 BinaryOperator로 나온 결과값이며 두번째는 스트림의 값입니다.

 

결국, reduce는 점진적으로 스트림의 값에 BinaryOperator를 적용하여 값을 한개로 만들어 반환합니다.

 

2) 최댓값/ 최솟값

 

위와 같은 맥락으로 스트림 요소중에 최댓값과 최솟값도 도출할 수 있습니다.

 

코드는 아래와 같습니다.

 

int max = numbers.stream().reduce(0, Integer::max);
int min = numbers.stream().reduce(0, Integer::min);

 

 

7. 상태가 있는 스트림, 없는 스트림

스트림 연산을 보다보면 map, filter와 같이 이전 스트림의 값을 저장해야할 필요가 없는 메서드가 있습니다.

이를, 상태를 갖지 않는 스트림 연산이라고 일컫습니다.

 

하지만, reduce, sum, max, sorted, distinct와 같이 이전 스트림의 값을 알아야만 처리가 가능한 메서드가 있습니다.

이는, 상태를 갖는 스트림 연산이라고 일컫습니다.

 

이 상태를 갖는 연산의 경우에는 내부 버퍼에 값을 저장하고 사용하게 됩니다.

 

8. 숫자형 스트림

스트림은 모두 제네릭 타입을 가지고 있습니다.

제네릭에는 기본적으로 박싱타입만을 선언해야 합니다.

 

그로인해, 스트림 연산중에는 박싱 -> 언박싱의 오버헤드가 발생하게 됩니다.

스트림에서는 이를 위해 기본형 특화 스트림을 제공하고 있습니다.

 

1) 숫자 스트림으로 매핑

 

숫자 특화 스트림으로 변환시에는 mapToInt, mapToDouble, mapToLong 을 사용하면 됩니다.

3개 메서드는 각각 IntStream, DoubleStream, LongStream을 반환합니다.

 

위와 같은 메서드를 통해 아래와 같이 숫자에 특화된 스트림 메서드가 바로 가능하다는 점입니다.

  • sum
  • max
  • min
  • average

아래는 예제 코드입니다.

 

int calories = menu.stream().mapToInt(Dish::getCalories).sum();

 

2) 객체 스트림 복원

 

스트림은 반대로 기본 특화 스트림에서 박싱형 기본 스트림으로도 변경하는 boxed 메서드를 제공합니다.

 

예제는 아래와 같습니다.

 

IntStream intStream = menu.stream().mapToInt(Dish::getCalories);
Stream<Integer> stream = intStream.boxed();

 

3) 특화형 Optional

 

스트림과 같이 Optional도 기본 특화형이 있습니다.

 

대표적으로 OptionalInt, OptionalDouble, OptionalLong 이 있습니다.

 

사용 예제는 아래와 같습니다.

 

OptionalInt  maxCalories = menu.stream().mapToInt(Dish::getCalories).max();

 

4) 숫자 범위

 

개발을 하다보면 특정 범위의 숫자를 이용해야 하는경우가 있습니다.

대표적으로 for(int i=0; i< 100; i++) 와 같은 연산을 들 수 있습니다.

 

스트림에서도 이를 위해, range와 rangeClosed 함수를 제공합니다.

 

두 함수는 IntStream, LongStream에서 사용가능합니다.

 

두 함수 모두 2개의 인자를 받으며 첫번째 인자는 초기값, 두번째 인자는 종료값을 의미합니다.

 

차이 점으로는 range는 시작값과 종료값은 결과에 포함되지 않고, rangeClosed는 초기값, 종료값이 포함된다는 점입니다.

 

예제는 아래와 같습니다.

 

IntStream evenNumbers = IntStream.rangeClosed(1, 100).filter(n -> n % 2 == 0);
System.out.println(evenNumbers.count());

 

9. 스트림 만들기

스트림은 위와 같이 컬렉션에서 생성이 가능합니다.

 

하지만, 컬렉션 이외에도 일련의 값, 배열, 파일 등으로도 생성이 가능합니다.

 

이제 스트림 만드는 각 종류를 소개합니다.

 

1) 값으로 스트림 만들기

 

Stream.of를 통해 임의의 값으로 스트림을 만들 수 있습니다.

 

예제는 아래와 같습니다.

 

Stream<String> stream = Stream.of("Modern ", "Java ", "In ", "Action");
stream.map(String::toUpperCase).forEach(System.out::println);

 

요소가 없는 빈 스트림을 만들자 할때는 Stream.empty를 사용하시면 됩니다.

 

Stream<String> emptyStream = Stream.empty();

 

2) null이 될 수 있는 객체로 스트림 만들기

 

개발을 하다보면 null 값도 스트림으로 만들어야 하는 경우가 생깁니다.

이를 위해, 자바 9에서는 ofNullable 메서드를 제공합니다.

 

예제는 아래와 같습니다.

 

Stream<String> homeValueStream = Stream.ofNullable(System.getProperty("home"));

 

3) 배열로 스트림 만들기

 

컬렉션이 아닌 배열에서도 Arrays.stream을 통해 스트림을 만들 수 있습니다.

 

예제는 아래와 같습니다.

 

int[] numbers  = {1, 2, 3, 4 ,5};
int sum = Arrays.stream(numbers).sum();

 

4) 파일로 스트림 만들기

 

파일 처리의 I/O 연산을 위해 사용하는 java.nio.file 에서도 스트림 처리가 가능하도록 업데이트 되었습니다.

 

예제는 아래와 같습니다.

 

long uniqueWords = 0;
try (Stream<String> lines = Files.lines(Paths.get("data.txt"), Charset.defaultCharset())) {
    uniqueWords = lines.flatMap(line -> Arrays.stream(line.split(" ")))
            .distinct()
            .count();
} catch (IOException e) {
    e.printStackTrace();
}

 

파일의 고유 단어들이 몇개인지 찾는 예제입니다.

 

5) 함수로 무한 스트림 만들기

 

스트림 API에서는 무한 스트림을 만들 수 있는 Stream.iterate와 Stream.generate를 제공합니다.

 

무한 스트림을 만들수 는 있지만, 보통 limit을 통해 무한으로 사용하는 것을 방지합니다.

 

1. Stream.iterate

 

iterate 정적 메서드는 초깃값과 람다를 인수로 받아 새로운 값을 무한으로 생성할 수 있습니다.

이러한 스트림을 언바운드 스트림이라고 일컫습니다.

 

아래는 예제 코드입니다.

 

Stream.iterate(0, n -> n + 2)
        .limit(10)
        .forEach(System.out::println);

 

자바 9에서는 iterate 함수에 프레디케이트를 지원합니다.

 

아래는 0부터 4씩 증가하며 100보다 작은 값으로 스트림을 생성하는 예제입니다.

 

Stream.iterate(0, n -> n < 100, n -> n + 4)
	.forEach(System.out::println);

 

두번째 인자로 프레디케이트가 들어간것을 볼 수 있습니다.

 

2. Stream.generate

 

generate는 iterate와 같이 무한 스트림을 생성합니다.

 

차이점으로는 인수로 Supplier를 받는 다는 점입니다.

 

결국, generate는 무한히 지정한 Supplier를 호출하여 반환된 값으로 스트림을 생성하게 됩니다.

 

이는 iterate와 큰 차이를 가져오게 됩니다.

이유는 바로 인자로 Supplier를 받기 때문입니다.

 

Supplier는 별도로 생성하여 인자로 줄 수 있기 때문에 커스텀이 가능하게 됩니다.

그 말은, iterate와 같이 스트림으로 만드는 요소의 값이 불변이 아니라 가변이 될 수 있게 되는 것입니다.

 

아래는 그 예제 코드입니다.

 

IntSupplier fib = new IntSupplier() {
    private int previous = 0;
    private int current = 1;
    @Override
    public int getAsInt() {
        int oldPrevious = this.previous;
        int nextValue = this.previous + this.current;
        this.previous = this.current;
        this.current = nextValue;
        return oldPrevious;
    }
};

IntStream.generate(fib).limit(10).forEach(System.out::println);

 

10. 마무리

이번 포스팅에서는 Chapter 5인 스트림 활용에 대해 진행하였습니다.

다음에는 Chapter 6 스트림으로 데이터 수집에 대해 포스팅하겠습니다.

반응형

'Programming > ModernJavaInAction' 카테고리의 다른 글

(7) 병렬 데이터 처리와 성능  (0) 2020.04.11
(6) 스트림으로 데이터 수집  (0) 2020.04.04
(4) 스트림 소개  (0) 2020.03.28
(3) 람다 표현식  (0) 2020.03.28
(2) 동작 파라미터화 코드 전달하기  (0) 2020.03.21
반응형

 

이번 문제는 정렬된 List<int>를 인자로 받아 높이가 균등한 balanced binary search tree를 만드는 문제입니다.

 

트리의 사용 이유중에는 정렬을 위해서도 있지만, 문제에서는 이미 정렬된 문자열을 주기 때문에

문자열의 중간값을 찾고 중간에서부터 각 왼쪽과 오른쪽값들은 현재 노드에서 왼쪽과 오른쪽으로 넘겨주면 자연스럽게

높이가 균등한 BST가 완성되게 됩니다.

 

코드는 아래와 같습니다.

 

class TreeNode:
    def __init__(self, x):
        self.val = x
        self.left = None
        self.right = None

class Solution:
    def sortedArrayToBST(self, nums: List[int]) -> TreeNode:
        if not nums: return None

        mid = len(nums) // 2

        res = TreeNode(nums[mid])
        res.left = self.sortedArrayToBST(nums[:mid])
        res.right = self.sortedArrayToBST(nums[mid+1:])
        return res

 

감사합니다.

반응형
반응형

1. 서론

이번 포스팅에서는 스트럭처 스트리밍에 대해 알아 보도록하겠습니다.

 

들어가기에 앞서 간략히 스트럭처 스트리밍에 대해 소개하겠습니다.

 

스트럭처 스트리밍은 스파크 2.0에서 제공하는 또 하나의 스트리밍 처리 API 입니다.

 

스트럭처 스트리밍의 경우 앞장에서 본 스파크 스트리밍과는 약간 다릅니다.

 

스파크 스트리밍의 경우, 데이터를 일정 구간과 간격을 두고 마이크로 배치로 처리하는것에 반해

스트럭처 스트리밍은 꾸준히 생성되는 데이터를 무한히 증가하는 하나의 커다란 데이터셋으로 간주하고 처리하는 방식입니다.

즉, 배치처리와 유사한 방법으로 스트리밍 데이터를 처리하도록 제공한다고 이해하시면 됩니다.

 

정확히 말하면 무한히 커지는 데이터셋을 처리하진 않습니다.
앞에서 얘기했던 reduceByKeyAndWindow와 같이 스트리밍 데이터의 상태값, 결과값을 저장하며
다음 스트리밍 데이터를 적절히 합해서 처리하는 방식입니다.

하지만 이러한 복잡한 로직을 스파크가 담당하여 처리해준다는 것에 의미가 있으며,
스파크에게 위임함에 따라 오류 복구처리도 제공합니다.

 

2. 데이터프레임과 데이터 셋 생성

스트럭처 스트리밍에서 사용하는 데이터 모델은 데이터 셋입니다.

이 데이터 셋을 생성하는 방법으로는 기존 DataFrameReader를 사용했었지만,

스트럭처 스트리밍 처리를 위해서는 DataStreamReader를 통해 사용해야 합니다

 

DataStreamReader의 경우는 스파크 세션의 readStream()을 통해 생성이 가능하며,

스파크 2.3.0 버전 기준으로 아래와 같이 지원합니다.

 

  • 파일 소스
  • 카프카 소스
  • 소켓 소스
  • Rate 소스
소켓, Rate 소스의 경우에는 데이터 중복이나 누락이 생길 수 있어 테스트 용도 외에는 사용하지 않는것을 권장합니다.

 

아래는 각 소스별 간단한 사용법입니다.

 

1) 파일 소스

 

path라는 옵션이 있으며, 디렉터리 정보를 넘기면 됩니다.

해당 디렉터리에 새로운 파일이 생길때마다 파일을 open하여 읽고 처리합니다.

 

glob과 같이 패턴 지정이 가능하지만 단 하나만 사용가능합니다.

 

그외 대표적인 옵션으로는 아래와 같습니다.

 

  • maxFilesTrigger = 한 번에 처리 가능한 최대 파일 수
  • lastestFirst = 가장 마지막에 들어온 파일을 가장 먼저 처리할 것인지 설정 여부
  • filenameOnly = 동일 파일 여부를 판단할 때 디렉터리 경로는 무시하고 파일 이름만 비교할 것인지 설정 여부

 

아래는 지원하는 대표적인 파일 포맷입니다.

 

  • csv
  • json
  • parquet
  • text
단, csv와 parquet은 반드시 스키마 정보를 같이 입력해야하며 
생략을 원할땐 spark.sql.streaming.schemaInterface 값을 true로 주어야 합니다.

 

2) 카프카 소스

 

카프카 토픽으로부터 데이터를 읽어 스트림을 생성합니다.

카프카의 경우 스파크와 연계성이 좋으며, 데이터 유실이나 중복이 발생하지 않는 연동 방식을 사용합니다.

 

 

3) 소켓 소스

 

소켓을 통해 데이터를 읽어 스트림을 생성합니다.

간단히 테스트용으로 많이 사용됩니다.

 

 

4) Rate 소스

 

timestamp, value 칼럼을 가지고 있는 데이터 소스이며, 스파크 세션의 readStream 메서드로 생성가능합니다.

소켓소스와 같이 테스트용으로 주로 사용됩니다.

 

 

아래는 파일 소스를 처리하는 간단한 예제 코드입니다. (python)

파일소스의 경우 스파크는 디렉터리를 모니터링하기 때문에,
기존 파일에 append가 아닌 파일을 새로 추가하는 방식으로 작업해야 합니다.

 

source = spark\
	.readStream\
        .schema(StructType().add("name", "string").add("age", "integer"))\
        .json(<path_to_dir>)

 

 

 

 

 

반응형

 

 

 

 

 

3. 스트리밍 연산

스트럭처 스트리밍은 무한한 크기를 가진 데이터셋 또는 데이터프레임으로 간주해서 처리합니다.

때문에, 데이터프레임 또는 데이터 셋 API에서 제공하는 대부분의 연산을 그대로 사용 가능합니다.

 

아래는 스트럭처 스트리밍 연산의 종류와 내용입니다.

 

 

1) 기본 연산 및 집계 연산

 

스트럭처 스트리밍의 경우 고정된 데이터와 실시간으로 생성되는 스트리밍에도 적용 가능한 API입니다.

그로인해, 연산이 가능할때가 있고 가능하지 않을때가 있습니다.

 

그 중, 스트리밍 데이터와 크기가 고정된 데이터 모두 사용 가능한 일반 연산으로 map, flatMap, reduce, select 등이 있습니다.

 

아래는 유의 사항이 있는 연산들입니다.

 

  • groupBy, agg = 스트리밍 데이터의 경우 하나 이상의 집계연산을 중복 적용할 수 없습니다.
  • limit, take, distinct = 스트리밍 데이터를 대상으로 limit, take, distinct 메서드는 호출할 수 없습니다.
  • sorting = 스트리밍 데이터에 대한 정렬은 집계 연산 적용 후 complete 출력 모드의 경우에만 가능합니다.
  • count = count 연산 역시 집계 연산의 결과에만 가능합니다.
  • foreach = 스트리밍 데이터셋의 경우 ds.writeStream.foreach 형태로만 사용 가능합니다.
  • show = 스트리밍 데이터셋의 경우 show가 아닌 DataStreamWriter의 출력 포맷을 콘솔로하여 출력해야 합니다.

 

2) 윈도우 연산

 

윈도우 연산은 스트리밍 데이터를 다룰 대 자주 사용되는 데이터 처리 기법 중 하나입니다.

스트럭처 스트리밍 역시 이 윈도우 연산을 지원합니다.

 

단, 앞의 스파크 스트리밍의 윈도우 연산과는 다른 방식으로 동작합니다.

이유는, 스트럭처 스트리밍은 윈도우를 데이터를 읽어들이는 단위가 아닌 그룹키로 사용하기 때문입니다.

 

아래는 스트럭처 스트리밍에서 윈도우 연산늘 통해 5분마다 단어 수 세기 예제 입니다.

 

spark = SparkSession.builder...

lines = spark\
	.readStream\
        .format("socket")\
        .option("host", "localhost")
        .option("port", "9999")\
        .option("includeTimestamp", False)\
        .load()\
        .select(col("value").alias("words"), current_timestamp().alias("ts"))

words = lines.select(explode(split(col("words"), " ")).alias("word"), window(col("ts"), "10 minute", "5 minute").alias("time"))
wordCount = words.groupBy("time", "word").count()

query = wordCount.writeStream...
query.awaittermination()

 

위 예제는 5분마다 10분동안의 데이터를 하나의 Row로 간주하여 처리하는 예제입니다.

그로인해, 실제 12:00:00~12:10:00 과 같은 칼럼 값이 그룹키로 할당되고 이를 통해 groupBy 연산이 가능하게 됩니다.

 

아래는 출력문을 간단히 테이블로 표시했습니다.

 

time word count
[2018-03-05 22:45:00, 2018-03-05 22:55:00] test 1
[2018-03-05 22:50:00, 2018-03-05 23:00:00] test 1

 

정각 기준으로 스트림 배치가 시작되는것을 원하지 않는다면, window 메서드의 4번째 인자로 시작 시간값을 설정하면 됩니다.

 

 

3) 워터마킹

 

스트럭처 스트리밍은 스트림 데이터의 이전 상태와 변경 분을 찾아내 필요한 부분만 수정하는 기능을 제공한다고 했습니다.

하지만 사실상, 위 같은 기능을 구현하기 위해서는 데이터를 메모리에 계속 들고 있으면서 스트림으로 유입되는 데이터를 무한정 기다리면서 처리해야합니다.

 

하지만, 스파크에서도 무한정 기다릴수는 없기 때문에 워터마킹 기법을 제공합니다.

 

워터마킹은 스파크 2.1.0 버전에서 새롭게 제공된 기능입니다.

워터 마킹은 간단하게 유입되는 이벤트의 유효 시간을 주는 것 입니다.

 

아래는 spark 공식 가이드에 나와 있는 워터마크 그림입니다.

 

 

워터 마크를 사용하는 경우 지정한 워터마크 시간동안 데이터가 유입될 수 있습니다.

이 말은, 지정한 윈도우안에 있는 값이 변경가능하게 된다는 의미이며, 워터마크시간까지 지나야 해당 윈도우에 있는 값은 불변 데이터로 변경되어집니다.

 

그로인해, 워터마크를 적용하게 되면 윈도우의 데이터는 실시간으로 결과를 출력할 수는 없습니다.

 

아래는 워터마크를 적용한 예제입니다.

워터마크 사용시에는 워터마크로 사용한 timestamp칼럼으로 집계함수의 키로 사용해야 합니다.

 

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.functions._
import spark.implicits._

val spark = SparkSession.builder() ...
val lines = spark.
	.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .option("includeTimestamp", false)
        .load()
        .select('value as "words", current_timestamp as "timestamp")

val words = lines.select(explode(split("words", " ")).as("word"), 'timestamp)

val wordCount = words.withWatermark("timestamp", "10 minutes").groupBy(window('timestamp, "10 monutes", "5 minutes"), 'word).count

val query = wordCount.writeStream
	.outputMode(OutputMode.Append())
        .format("console")
        .option("truncate", false)
        .start()

query.awaitTermination()

 

워터마크 시간을 길게 할수록 처리 결과가 늦게 출력되는 것을 유념하여 사용해야 합니다.

 

 

4) 조인 연산

 

스트럭처 스트리밍이 처음 소개 되었을때는 조인연산에 제약이 많았습니다.

하지만 2.3.0 버전부터는 대폭 개선되어 다양한 방식의 조인 연산이 가능해졌습니다.

 

  • inner join = inner join의 경우 특별한 제약없이 사용 가능합니다.
    • 단, 무한정 보관하는 것은 불가능한 일이기 때문에 워터마크를 사용하여 특정시점이 지난데이터는 지우도록해야합니다.
  • outer join = outer join의 경우 반드시 스트림 데이터셋을 기준으로 하는 outer join 만 가능합니다.

 

5) 스트리밍 쿼리

 

현재 까지는 스트리밍 데이터를 처리하는것을 소개했습니다.

하지만 이 처리 연산을 실행하기 위해서는 DataStreamWriter를 이용한 쿼리 작업이 필요합니다.

 

1. DataStreamWriter

 

DataStreamWriter는 데이터 셋의 writeStream 메서드로 생성할 수 있습니다.

 

사용용도 로는 아래와 같습니다.

 

  • 저장 모드
  • 쿼리명
  • 트리거 주기
  • 체크포인트

 

2. 스트리밍 쿼리

 

DataStreamWriter는 스트리밍 애플리케이션을 시작하기 위한 start 메서드를 제공합니다.

 

start 메서드의 반환값은 StreamingQuery 타입입니다.

이를 통해, 실행 중인 어플리케이션의 모니터링 및 관리가 가능합니다.

 

대표적으로는 아래와 같습니다.

 

  • 쿼리 이름, id 조회
  • 쿼리 중지
  • 소스와 싱크의 상태 정보 조회
  • awaitTermination 을 통한 실행 동작 제어

 

3. 저장 모드

 

DataStreamWriter의 저장모드에는 3가지가 있습니다.

 

아래와 같습니다.

 

모드 설명
Append(추가) 기존에 처리된 결과를 제외한 새롭게 추가된 데이터만 싱크로 전달하는 방법입니다.


이 경우는, 기존 데이터의 수정이 발생할 가능성이 0%가 전제되어야 하기 때문에 시간의 흐름에 따라 데이터 수정이 일어나는 케이스에는 사용이 불가능합니다.
Complete(완전) 데이터 프레임이 가지고 있는 전체 데이터를 모두 출력하는 방법입니다.


항상 모든 데이터를 출력하기 때문에 데이터 사이즈가 작은 경우에 사용해야 합니다.
Update(수정) 마지막 출력이 발생된 시점부터 다음 출력이 발생하는 시점 동안 변경된 데이터만 출력하는 모드입니다.


변경분만 출력하기 때문에 조인 연산을 사용하는 경우에는 사용이 불가능합니다.

 

 

4. 싱크 (Sink)

 

스트럭처 스트리밍은 필요한 경우 ForEachWriter라는 방식을 통해 출력을 커스터마이징이 가능합니다.

 

ForEachWriter를 사용하는 경우 open, process, close 메서드를 구현해야 합니다.

 

단, ForEachWriter는 드라이버가 아닌 익스큐터에서 수행되며,
드라이버에서 수행하도록 하고 싶은 경우에는 ForEachWriter 객체의 생성자 본문에 초기화 코드를 작성하면 됩니다.

 

아래는 ForEachWriter 사용 예제 입니다.

 

val query = wordCount
	.writerStream
        .outputMode(OutputMode.Complete)
        .foreach(new ForeachWriter[Row] {

            def open(partitionId: Long, version: Long): Boolean = {
                println(s"partitionId:${partitionId}, version:${version}")
		true
            }

            def process(record: Row) = {
                println("process:" + record.mkString(", "))
            }

            def close(errorOrNull: Throwable): Unit = {
                println("close")
            }
        }).start()

 

4. 마무리

이번 포스팅에서는 스트럭처 스트리밍에 대해 포스팅 하였습니다.

 

다음에는 스파크를 비교적 간단하게 Web 기반의 NoteBook에서 수행할 수 있도록 제공하는 제플린에 대해 포스팅하겠습니다.

반응형

'BigData > Spark' 카테고리의 다른 글

(8) CDH와 스파크 클라이언트 연동  (0) 2020.04.21
(7) Apache Zeppelin  (0) 2020.04.20
(5) 스파크 스트리밍  (0) 2020.03.19
(4) 스파크 SQL  (0) 2020.03.17
(3) 스파크 설정  (0) 2020.03.13
반응형

1. 서론

 

이번 포스팅에서는 Chapter4의 스트림 소개 에 대해 진행하도록 하겠습니다.

 

2. 스트림이란 무엇인가?

스트림은 자바 8에서 추가된 새로운 기능입니다.

 

이 스트림을 이용하여 SQL과 같이 선언형으로 컬렉션 데이터를 처리할 수 있으며, 

부가적으로 투명하게 멀티쓰레드로 처리도 할 수 있습니다.

 

아래는 자바 7의 코드를 자바 8의 스트림으로 변경하는 예제입니다.

 

@Getter
@RequiredArgsConstructor
private static class Dish {
	private final String name;
	private final boolean vegetarian;
	private final int calories;
	private final Type type;

	enum Type {
		MEAT, FISH, OTHER
	}
}

public static void main(String[] args) {
	List<Dish> menu = Arrays.asList(
				new Dish("pork", false, 800, Dish.Type.MEAT),
				new Dish("beef", false, 700, Dish.Type.MEAT),
				new Dish("chicken", false, 400, Dish.Type.MEAT),
				new Dish("french fries", true, 530, Dish.Type.OTHER),
				new Dish("rice", true, 350, Dish.Type.OTHER),
				new Dish("season fruit", true, 120, Dish.Type.OTHER),
				new Dish("pizza", true, 550, Dish.Type.OTHER),
				new Dish("prawns", false, 300, Dish.Type.FISH),
				new Dish("salmon", false, 450, Dish.Type.FISH)
		);
	List<Dish> lowCaloricDishes = new ArrayList<>();
	for(Dish dish : menu) {
		if(dish.getCalories() < 400) {
			lowCaloricDishes.add(dish);
		}
	}
	Collections.sort(lowCaloricDishes, new Comparator<Dish>() {
		@Override
		public int compare(Dish o1, Dish o2) {
			return Integer.compare(o1.getCalories(), o2.getCalories());
		}
	});
	List<String> lowCaloricDishesName = new ArrayList<>();
	for(Dish dish : lowCaloricDishes) {
		lowCaloricDishesName.add(dish.getName();
	}
}

 

위 코드는 400 칼로리보다 작은 음식들을 칼로리 순으로 정렬하여 이름만 담은 List를 만드는 자바7 코드입니다.

 

이것을 자바 8의 스트림을 사용하여 아래와 같이 변경할 수 있습니다.

 

@Getter
@RequiredArgsConstructor
private static class Dish {
	private final String name;
	private final boolean vegetarian;
	private final int calories;
	private final Type type;

	enum Type {
		MEAT, FISH, OTHER
	}
}

public static void main(String[] args) {
	List<Dish> menu = Arrays.asList(
				new Dish("pork", false, 800, Dish.Type.MEAT),
				new Dish("beef", false, 700, Dish.Type.MEAT),
				new Dish("chicken", false, 400, Dish.Type.MEAT),
				new Dish("french fries", true, 530, Dish.Type.OTHER),
				new Dish("rice", true, 350, Dish.Type.OTHER),
				new Dish("season fruit", true, 120, Dish.Type.OTHER),
				new Dish("pizza", true, 550, Dish.Type.OTHER),
				new Dish("prawns", false, 300, Dish.Type.FISH),
				new Dish("salmon", false, 450, Dish.Type.FISH)
		);
	List<String> lowCaloricDishesName = menu.stream()
			.filter(d -> d.getCalories() < 400)
			.sorted(Comparator.comparing(Dish::getCalories))
			.map(Dish::getName)
			.collect(Collectors.toList());
}

 

선언형으로 좀 더 readable 한 코드가 되었습니다.

또한, filter, sorted와 같이 여러 연산을 연결하여 처리 파이프라인을 만들었습니다.

 

이것을 멀티코어 아키텍쳐로 처리하고 싶다면 아래와 같이 stream()을 parallelStream()으로 변경하면 됩니다.

 

@Getter
@RequiredArgsConstructor
private static class Dish {
	private final String name;
	private final boolean vegetarian;
	private final int calories;
	private final Type type;
    
	enum Type {
		MEAT, FISH, OTHER
	}
}

public static void main(String[] args) {
	List<Dish> menu = Arrays.asList(
				new Dish("pork", false, 800, Dish.Type.MEAT),
				new Dish("beef", false, 700, Dish.Type.MEAT),
				new Dish("chicken", false, 400, Dish.Type.MEAT),
				new Dish("french fries", true, 530, Dish.Type.OTHER),
				new Dish("rice", true, 350, Dish.Type.OTHER),
				new Dish("season fruit", true, 120, Dish.Type.OTHER),
				new Dish("pizza", true, 550, Dish.Type.OTHER),
				new Dish("prawns", false, 300, Dish.Type.FISH),
				new Dish("salmon", false, 450, Dish.Type.FISH)
		);
	List<String> lowCaloricDishesName = menu.parallelStream()
			.filter(d -> d.getCalories() < 400)
			.sorted(Comparator.comparing(Dish::getCalories))
			.map(Dish::getName)
			.collect(Collectors.toList());
}

 

스트림 API의 특징은 아래와 같이 요약할 수 있습니다.

  • 선언형 : 더 간결하고 가독성이 좋습니다.
  • 조립할 수 있음 : 유연성이 좋습니다.
  • 병렬화 : 성능이 좋습니다.

 

 

 

 

반응형

 

 

 

 

 

3. 스트림 시작하기

스트림을 다시 재정의하자면 '데이터 처리 연산을 지원하도록소스에서 추출된 연속된 요소' 로 정의할 수 있습니다.

 

이 정의에서 나온 단어에 대해 하나씩 살펴보겠습니다.

 

1. 연속된 요소

 

컬렉션과 마찬가지로 스트림은 특정 요소 형식으로 이루어진 연속된 값 집합의 인터페이스를 제공합니다.

여기서 스트림은 filter, sorted와 같이 데이터의 계산에 더 중점을 두고 있습니다.

 

2. 소스

 

스트림은 컬렉션, 배열, I/O 자원 등의 데이터 제공 소스로부터 데이터를 소비합니다.

정렬된 컬렉션으로 스트림을 생성하면 정렬이 그대로 유지됩니다.

즉, 리스트로 스트림을 만들면 스트림의 요소는 리스트의 요소와 같은 순서를 유지합니다.

 

3. 데이터 처리 연산

 

스트림은 함수형 연산과 데이터베이스와 비슷한 연산을 지원합니다.

예로 filter, map, reduce, find, match 등이 있습니다.

 

이런 스트림에는 2가지 특징이 있습니다.

 

1. 파이프라이닝

 

대부분의 스트림연산은 스트림 연산끼리연결해서 파이프라이 구성이 가능합니다.

그로 인해 laziness, short-circuiting과 같은 최적화도 얻을 수 있습니다.

 

2. 내부 반복

 

스트림은 컬렉션과 달리 내부 반복을 사용합니다.

 

 

 

아래는 스트림을 사용한 하나의 예제입니다.

 

List<String> threeHighCaloricDishNames =
        menu.stream()
                .filter(d -> d.getCalories() > 300)
                .map(Dish::getName)
                .limit(3)
                .collect(Collectors.toList());

 

이 예제에서는 데이터 소스로 요리 리스트를 사용하고 있습니다.

또한 filter, map, limit, collect로 일련의 파이프 라인을 구성하여 데이터 처리를 하고 있습니다.

 

여기서 스트림 함수는 collect가 호출되기 전까지는 메서드 호출을 저장만 하고 실제로 수행하진 않습니다.

 

4. 스트림과 컬렉션

스트림과 컬렉션은 모두 연속된 요소 형식의 값을 저장하는 자료구조의 인터페이스를 제공합니다.

 

하지만, 스트림과 컬렉션도 차이점은 있습니다.

 

컬렉션의 경우 적극적 생성으로 모든 요소를 메모리에 올려서 사용하는 반면 스트림은 요청할때만 요소를 사용한다는 차이점이 있습니다.

 

1. 딱 한번만 탐색 가능

 

스트림도 반복자와 마찬가지로 딱 한번만 요소를 탐색하고 소비한다.

 

만약, 한번 더 탐색하고 싶다면 새로운 스트림을 열어야 합니다.

 

아래는 하나의 스트림에서 2번을 탐색하려고 하는 케이스로 오류가 나게 됩니다.

 

List<String> title = Arrays.asList("Java8", "In", "Action");
Stream<String> s = title.stream();
s.forEach(System.out::println);
s.forEach(System.out::println);

 

2. 외부 반복과 내부 반복

 

컬렉션 인터페이스는 사용자가 직접 요소를 반복해야합니다. 이를 외부 반복이라고 부릅니다.

반면, 스트림의 경우에는 내부 반복을 사용합니다.

 

외부 반복과 내부 반복의 차이점으로는 2가지가 있습니다.

 

첫째, 관리측면입니다.

내부 반복의 경우 병렬성을 안전하게 제공하고 있습니다.

하지만, 외부 반복의 경우에는 병렬성을 스스로 관리해야합니다.

 

둘째, 최적화입니다.

외부 반복의 경우에는 컬렉션에서 데이터 요소를 일일히 하나씩 가져와 처리를하게 되지만,

내부반복의 경우에는 내부적으로 함수들의 최적화 순서로 실행하게 됩니다.

 

아래는 외부 반복과 내부 반복의 예시 입니다.

 

// 외부 반복
List<Dish> menu = Arrays.asList(
        new Dish("pork", false, 800, Dish.Type.MEAT),
        new Dish("beef", false, 700, Dish.Type.MEAT),
        new Dish("chicken", false, 400, Dish.Type.MEAT),
        new Dish("french fries", true, 530, Dish.Type.OTHER),
        new Dish("rice", true, 350, Dish.Type.OTHER),
        new Dish("season fruit", true, 120, Dish.Type.OTHER),
        new Dish("pizza", true, 550, Dish.Type.OTHER),
        new Dish("prawns", false, 300, Dish.Type.FISH),
        new Dish("salmon", false, 450, Dish.Type.FISH)
);
List<String> names = new ArrayList<>();

for(Dish dish : menu) {
    names.add(dish.getName());
}

Iterator<Dish> iterator = menu.iterator();
while(iterator.hasNext()) {
    Dish dish = iterator.next();
    names.add(dish.getName());
}

// 내부 반복
List<String> names2 = menu.stream()
                .map(Dish::getName)
                .collect(Collectors.toList());

 

5. 스트림 연산

스트림 연산은 크게 2개로 나눌수 있습니다.

 

  • 중간 연산 - 스트림과 스트림을 연결할 수 있는 연산
  • 최종 연산 - 스트림을 닫는 연산

 

1. 중간 연산

 

중간 연산은 filter, sorted와 같이 다른 스트림을 반환합니다.

 

특징으로는 스트림 파이프라인을 실행하기 전까지는 아무 연산도 수행되지 않는다는 점입니다.

 

이것은 lazy연산으로 최적화로 이끌게 해주는 기법입니다.

 

2. 최종연산

 

최종연산은 반환값이 스트림이 아닌것을 의미하며 스트림 파이프라인을 수행시키는 트리거 역할을 합니다.

대표적으로 foreach, collect등이 있습니다.

 

3. 스트림 이용하기

 

스트림 이용과정은 아래의 3가지로 요약할 수 있습니다.

 

  • 질의를 수행할 데이터 소스
  • 스트림 파이프라인을 구성할 중간 연산 연결
  • 스트림 파이프라인을 실행하고 결과를 만들 최종 연산

6. 마무리

이번 포스팅에서는 Chapter 4인 스트림 소개에 대해 진행하였습니다.

다음에는 Chapter 5인 스트림 활용에 대해 포스팅하겠습니다.

반응형
반응형

1. 서론

이번 포스팅에서는 Chapter3의 람다 표현식 에 대해 진행하도록 하겠습니다.

 

2. 람다란 무엇인가?

람다 표현식은 메서드로 전달할 수 있는 익명함수를 단순화한 것입니다.

 

아래는 람다의 특징입니다.

 

  1. 익명 : 보통 메서드와 달리 이름이 없어 익명함수입니다.
  2. 함수 : 람다는 메서드처럼 특정 클래스에 종속되지 않아 함수라고 부릅니다.
  3. 전달 : 람다는 메서드 인수로 전달하거나 변수로 저장이 가능합니다.
  4. 간결성 : 익명 클래스처럼 자질구레한 코드가 필요 없습니다.

람다는 자바 8 이전에 할 수 없었던 것을 제공하는 것이 아니라 간결한 코드를 만들어 준다고 이해하면 되겠습니다.

 

아래는 Comparator의 compare 메서드를 람다로 표현한 예제 입니다.

  • 파라미터 리스트 :  compare 메서드 파라미터입니다.
  • 화살표 : 람다의 파라미터 리스트와 바디를 구분하는 역할입니다.
  • 람다 바디 : 람다의 반환값에 해당하는 표현식입니다.

메서드는 인자 파라미터가 없을수도 있고 반환값이 없거나 반환값을 위해서 다양한 일을 수행해야 할수도 있습니다.

람다는 이를 모두 지원하기 위해 아래와 같이 총 5가지의 형태가 모두 가능합니다.

 

// 1 - String 파라미터 하나에 int 반환을 하는 람다
(String s) -> s.length()

// 2 - Apple 파라미터 하나에 boolean 반환을 하는 람다
(Apple a) -> a.getWeight() > 150

// 3 - int 파라미터 두개에 반환값이 없는 람다
(int x, int y) -> {
	System.out.println("Result: ");
    System.out.println(x+y);
}

// 4 - 파라미터는 없고 int를 반환하는 람다
() -> 42

// 5 - Apple 파라미터 두개에 int를 반환하는 람다
(Apple a1, Apple a2) -> a1.getWeight().compareTo(a2.getWeight())

 

3. 어디에, 어떻게 람다를 사용할까?

람다는 함수형 인터페이스라는 문맥에서 사용할 수 있습니다.

 

1. 함수형 인터페이스

 

함수형 인터페이스는 정확히 하나의 추상 메서드를 지정하는 인터페이스를 의미합니다.

 

인터페이스에는 많은 디폴트 메서드가 있을 수 있습니다.

하지만 추상 메서드가 오직 하나라면 이도 함수형 인터페이스라고 부를 수 있습니다.

 

이 함수형 인터페이스의 추상 메서드를 람다 표현식을 통해 간결하게 만들 수 있는 것입니다.

 

2. 함수 디스크립터

 

함수 디스크립터란 람다 표현식의 시그니처를 서술하는 메서드입니다.

 

여기서 시그니처란 함수의 설명이라고 이해하시면 됩니다.

 

예를 들어, Runnable 인터페이스는 인수와 반환값이 없는 시그니처 라고 말할 수 있습니다.

 

3. @FunctionallInterface

 

이 어노테이션은 함수형 인터페이스를 가리키는 어노테이션입니다.

만약 함수형 인터페이스가 아니라면 컴파일단에서 에러를 내뱉게 되어있습니다.

 

4. 람다 활용 : 실행 어라운드 패턴

람다를 활용하는 예제를 간단하게 진행해 보겠습니다.

 

아래와 같이 파일을 한줄씩 읽는 작업을 하는 메소드가 있습니다.

 

public String processFile() throws IOException {
	try(BufferedReader br = new BufferedReader(new FileReader("data.txt"))) {
		return br.readLine();
	}
}

 

 

현재는 파일에서 한번에 한줄만 읽을 수 있습니다.

만약, 파일에서 두줄씩 읽어야하는 요구사항이 생긴다면?

파일을 open해서 읽는 작업은 동일하되, processFile의 읽는 코드만 수정하면 좋을 것입니다.

 

이를 위해 동작 파라미터화를 적용하겠습니다.

 

1. 동작 파라미터화

 

String result = processFile((BufferedReader br) -> br.readLine() + br.readLine());

 

2. 함수형 인터페이스를 이용해서 동작 전달

 

위 정의한 동작을 함수형 인터페이스를 통해 전달하도록 수정해보겠습니다.

 

코드는 아래와 같습니다.

 

@FunctionalInterface
public interface BufferedReaderProcessor {
	String process(BufferedReader b) throws IOException;
}

public String processFile(BufferedReaderProcessor p) throws IOException {
	
}

 

3. 동작 실행

 

위 처럼 구조를 잡고 이젠 processFile의 메서드를 구현해 보겠습니다.

 

public String processFile(BufferedReaderProcessor p) throws IOException {
	try(BufferedReader br = new BufferedReader(new FileReader("data.txt"))) {
		return p.process(br);
	}
}

 

4. 람다 전달

 

이제 processFile을 사용하는 쪽에서는 람다를 통해 실행을 전달할 수 있습니다.

 

String oneLine = processFile((BufferedReader br) -> br.readLine());
String twoLines = processFile((BufferedReader br) -> br.readLine() + br.readLine());

 

위와 같은 작업을 적용한 패턴을 실행 어라운드 패턴이라고 부릅니다.

 

 

 

 

 

반응형

 

 

 

 

 

5. 함수형 인터페이스 사용

자바 8 에서는 기본적인 함수형 인터페이스를 java.util.function 패키지에 제공하고 있습니다.

 

대표적인 Predicate, Consumer, Function 에 대해 소개하도록 하겠습니다.

 

1. Predicate

 

test 라는 추상메서드가 있고, 시그니처는 아래와 같습니다.

제네릭 타입의 인자 한개를 받아 boolean 값을 반환.

 

아래는 Predicate를 사용하는 예제 코드입니다.

 

@FunctionalInterface
public interface Predicate<T> {
    boolean test(T t);
}

public <T> List<T> filter(List<T> list, Predicate<T> p) {
	List<T> results = new ArrayList<>();
	for (T t: list) {
		if(p.test(t)) {
			results.add(t);
		}
	}
	return results;
}

Predicate<String> nonEmptyStringPredicate = (String s) -> !s.isEmpty();
List<String> nonEmpty = filter(listOfStrings, nonEmptyStringPredicate);

 

2. Consumer

 

accept라는 추상메서드가 있고, 시그니처는 아래와 같습니다.

제네릭 타입의 인자 한개를 받고 반환 값은 없는 void.

 

아래는 Consumer를 사용하는 예제 코드입니다.

 

@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}

public <T> void forEach(List<T> list, Consumer<T> c) {
	for (T t: list) {
		c.accept(t);
	}
}

forEach(
    Arrays.asList(1,2,3,4,5,),
    (Integer i) -> System.out.println(i)
);

 

3. Function

 

apply라는 추상메서드가 있고, 시그니처는 아래와 같습니다.

제네릭 타입의 인자 한개를 받고 제네릭 타입을 반환.

 

아래는 Function을 사용하는 예제 코드입니다.

 

@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
}

public <T, R> List<R> map(List<T> list, Function<T, R> f) {
	List<R> result = new ArrayList<>();
	for (T t: list) {
		result.add(f.apply(t));
	}
	return result;
}

List<Integer> l = map(
		Arrays.asList("lambdas", "in", "action"),
		(String s) -> s.length()
);

 

대표적인 3개의 함수형 인터페이스를 알아봤습니다.

 

java.util.function에는 이 3개 말고도 아래와 같이 더 있습니다.

 

  • Supplier<T>
  • UnaryOperator<T>
  • BinaryOperator<T>
  • BiPredicate<L, R>
  • BiConsumer<T, U>
  • BiFunction<T, U, R>

6. 형식 검사, 형식 추론, 제약

이번에는 자바 컴파일러가 람다를 어떻게 처리하는지 알아보도록 하겠습니다.

 

1. 형식 검사

 

람다가 사용되는 콘텍스트를 통해 람다의 형식을 추론할 수 있습니다.

 

이전에 예제로 했던 filter 를 통해 설명하겠습니다.

 

List<Apple> heavierThan150g = filter(inventory, (Apple apple) -> apple.getWeight() > 150);

 

 

 

위 예제의 람다는 아래와 같은 순으로 형식검사가 이루어 집니다.

 

  1. 람다가 사용된 콘텍스트인 filter 메서드를 확인
  2. 확인 결과 Predicate<Apple>로 대상 형식 확인
  3. Predicate<Apple>의 추상메서드 확인
  4. 람다의 디스크립터와 Predicate<Apple>의 추상메서드가 동일한지 확인

 

 

2. 같은 람다, 다른 함수형 인터페이스

 

위 형식검사에서 본듯이 하나의 람다는 여러개의 함수형 인터페이스와 호환이 가능합니다.

 

아래는 그 예제입니다.

 

Comparator<Apple> c1 = (Apple a1, Apple a2) -> a1.getWeight().compareTo(a2.getWeight());
ToIntBiFunction<Apple, Apple> c2 = (Apple a1, Apple a2) -> a1.getWeight().compareTo(a2.getWeight());
BiFunction<Apple, Apple, Integer> c3 = (Apple a1, Apple a2) -> a1.getWeight().compareTo(a2.getWeight());

 

3. 형식 추론

 

자바 컴파일러는 람다 표현식이 사용된 콘텍스트를 이용해 람다 표현식과 관련된 함수형 인터페이스를 추론 할 수 있습니다.

 

그로인해, 람다 사용시 이 추론을 이용해 더욱 간결한 코드작성이 가능합니다.

 

아래는 그 예제입니다.

 

Comparator<Apple> c1 = (a1, a2) -> a1.getWeight().compareTo(a2.getWeight());

 

컴파일러는 파라미터인 Apple을 추론하기 때문에 위와 같이 생략이 가능합니다.

 

4. 지역변수 사용

 

람다에서는 외부 변수를 조작하는 경우도 있습니다.

이 경우 가능은 하지만 외부 변수는 final로 선언되거나 final처럼 취급되는 것에 한해서 가능합니다.

 

이유로는 인스턴수 변수는 힙에, 지역 변수는 스택에 저장되기 때문입니다.

람다가 실행되는 스레드에서 지역 변수를 참조할 때는 지역변수의 복사본(읽기 전용)을 생성하고 참조하게 됩니다.

하지만, 람다가 직접 지역변수가 위치한 스택영역에 접근하게 되면 지역변수를 할당한 스레드가 끝나면 변수 할당이 해제되는 시점과 겹칠 수 있습니다.

그래서, 람다는 복사본(읽기 전용)을 참조하기 때문에 해당 변수의 값은 변경되어서는 안되는 제약사항이 생기게 된것입니다.

 

7. 메서드 참조

메서드 참조는 특정 메서드만을 호출하는 람다의 축약형입니다.

 

메서드 참조는 코드의 가독성을 높인다는 장점을 가지고 있습니다.

 

또한, 람다의 축약형으로 동일하게 콘텍스트의 형식과 일치해야 하며, 생성자 참조도 지원하고 있습니다.

 

아래는 생성자 참조의 예제 입니다.

 

Supplier<Apple> c1 = Apple::new

 

8. 람다, 메서드 참조 활용하기

이제 배운 람다와 메서드 참조를 통해 사과를 무게순으로 정렬하는 코드를 단계별로 적용하는 예제를 진행하겠습니다.

 

1. 코드 전달

 

@Getter
class Apple {
	private Integer weight;
}

public class AppleComparator implements Comparator<Apple> {
	@Override
	public int compare(Apple o1, Apple o2) {
		return o1.getWeight().compareTo(o2.getWeight());
	}
} 
inventory.sort(new AppleComparator());

 

2. 익명 클래스 사용

 

inventory.sort(new Comparator<Apple>() {
	@Override
	public int compare(Apple o1, Apple o2) {
		return o1.getWeight().compareTo(o2.getWeight());
	}
});

 

3. 람다 표현식 사용

 

inventory.sort((Comparator<Apple>) (o1, o2) -> o1.getWeight().compareTo(o2.getWeight()));

 

4. 메서드 참조 사용

 

Arrays.asList().sort(Comparator.comparing(Apple::getWeight));

 

9. 람다 표현식을 조합할 수 있는 유용한 메서드

람다 표현식을 조합할 수 있는 유용한 메서드는 바로 디폴트 메서드입니다.

 

함수형 인터페이스는 디폴트 메서드가 있더라도 추상 메서드만 한개를 가지고 있으면 되기 때문에, 이 규칙에도 어긋나지 않습니다.

 

대표적인 함수형 인터페이스들의 디폴트 메서드를 소개하겠습니다.

 

1. Comparator

 

  • comparing - 비교에 사용할 키를 추출하는 Function
  • reversed - 오름차순이 아니라 내림차순으로 정렬.
  • thenComparing - 정렬을 위한 비교자 추가

2. Predicate

 

  • negate - 기존 Predicate 객체 결과를 반전시킨 객체 생성
  • and - 두 Predicate를 and로 연결하여 새로운 Predicate 객체 생성
  • or - 두 Predicate를 or로 연결하여 새로운 Predicate 객체 생성

3. Function

 

  • andThan - 주어진 함수를 먼저 적용한 결과를 다른 함수의 입력으로 전달하는 함수
  • compose - 인수로 주어진 함수를 먼저 실행한 다음에 그 결과를 외부 함수의 인수로 제공

 

10. 마무리

이번 포스팅에서는 Chapter 3인 람다 표현식에 대해 진행하였습니다.

다음에는 Chapter 4인 스트림 소개에 대해 포스팅하겠습니다.

반응형

+ Recent posts