반응형

1. 서론

이번 포스팅에서는 Chapter7의 병렬 데이터 처리와 성능 에 대해 진행하도록 하겠습니다.

 

 

2. 병렬 스트림

java 8 에서는 병렬 처리를 간편하게 제공하고 있습니다.

예로 컬렉션에서 parallelStream를 통해 간편히 병렬 스트림 처리가 가능합니다.

 

1) 순차 스트림을 병렬 스트림으로 변환하기

 

순차 스트림에 parallel 메서드를 통해 병렬스트림으로 변경이 가능합니다.

 

아래는 그 예제입니다.

 

public long parallelSum(long n) {
    return Stream.iterate(1L, i -> i +1)
            .limit(n)
            .parallel()
            .reduce(0L, Long::sum);
}

 

parallel 처리 동작방식은 각 쓰레드에게 분할한 청크를 전달하여 병렬로 수행하도록 하는 것입니다.

 

아래는 그림으로 수행과정을 나타낸 것입니다.

 

 

추가로, 병렬에서 순차 스트림으로 다시 바꿀때에는 sequential 메서드를 사용하면 됩니다.

 

만약, parallel과 sequential 두개를 모두 사용했을 때에는 최종적으로 호출된 메서드로 전체 스트림 파이프라인에 영향이 미치게 됩니다.

 

병렬스트림으로 사용하는 쓰레드는 ForkJoinPool 을 사용하며 갯수는 Runtime.getRuntime().availableProcessors() 의 반환값으로 결정됩니다.
전역적으로 쓰레드수를 변경하고 싶을때는 아래와 같이 시스템 설정을 해주시면 됩니다.
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12")

 

 

 

2) 병렬 스트림 효과적으로 사용하기

 

병렬 프로그래밍은 항상 유심히 문제점이 없는지 파악한 후 개발해야 합니다.

 

java에서 제공해주는 병렬 스트림도 마찬가지입니다.

parallel을 사용하는 부분은 올바른 처리가 이루어지는지와 성능은 올라갔는지 테스트를 해야합니다.

java의 스트림은 내부로직이 복잡하기 때문에, 무분별하게 사용하게 되면 성능이 더욱 악화가 될 수 있습니다.

 

Collectors를 통해 리듀싱 작업을 수행하는 경우에는 toConcurrentMap과 같이 ConcurrentMap 클래스가 리턴타입일때만 병렬로 수행이 됩니다.
결국, 다른 toList와 같은 리턴타입은 병렬로 수행되지 않으므로 parallelStream으로 처리할 경우 성능이 악화 될 수 있습니다.

 

병렬로 처리해야하는지에 대한 결정은 아래와 같은 부분이 도움이 될 수 있습니다.

 

  1. 확신이 서지 않는다면 직접 성능 측정.
  2. 박싱 타입 유의
  3. 순차스트림보다 병렬 스트림에서 성능이 떨어지는 연산 고려 - ex) limit과 같이 순서에 의존하는 연산.
  4. 스트림 전체 파이프라인 연산 비용 고려
  5. 소량 데이터의 경우 병렬 스트림 제외
  6. 스트림으로 구성하는 자료구조가 적절한지 고려
  7. 최종 연산의 병합 과정 비용 고려

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

3. 포크/조인 프레임워크

포크/조인 프레임워크는 자바 7에서 추가되었으며, 자바 8의 병렬스트림에서는 내부적으로 포크/조인 프레임워크로 처리하고 있습니다.

 

포크/조인 프레임워크는 재귀를 통해 병렬화 작업을 작게 분할한 다음 ForkJoinPool의 작업자 스레드에 분산 할당하는 방식입니다.

 

1) RecursiveTask 활용

 

스레드 풀을 이용하려면 RecursiveTask<R>의 서브클래스를 만들어야 합니다.

여기서 R은 제네릭으로 결과타입 또는 결과가 없을때의 타입을 의미합니다.

 

RecursiveTask를 구현하기 위해서는 추상메서드인 compute메서드를 구현해야합니다.

compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의해야 합니다.

 

아래는 compute 메서드의 의사코드입니다.

 

if (태스크가 충분히 작거나 더 이상 분할 할 수 없으면) {
	순차적으로 태스크 계산
} else {
    태스크를 두 서브태스크로 분할
    태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출함
    모든 서브태스크의 연산이 완료될 때까지 기다림
    각 서브태스크의 결과를 합침
}

 

아래는 long[] 의 sum을 구하는 것을 포크/조인 프레임워크를 사용하는 예제입니다.

 

public static long forkJoinSum(long n) {
    long[] numbers = LongStream.rangeClosed(1, n).toArray();
    ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
    return new ForkJoinPool().invoke(task);
}

public class ForkJoinSumCalculator extends RecursiveTask<Long> {
    private final long[] numbers;
    private final int start;
    private final int end;
    public static final long THRESHOLD = 10_000;
    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }
    @Override
    protected Long compute() {
        int length = end - start;
        if(length <= THRESHOLD) {
            return computeSequentially();
        }
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
        leftTask.fork();
        
        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
        
        long rightResult = rightTask.compute();
        long leftResult = leftTask.join();
        
        return leftResult + rightResult;
    }
    private long computeSequentially() {
        long sum = 0;
        for(int i = start; i< end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}

 

위 ForkJoinSumCalculator의 경우에는 배열의 길이가 10,000 보다 큰것은 반으로 자르면서 분할시키고 있습니다.

그리고 분할을 재귀로 계속 수행 후 결과를 모아서 반환하고 있습니다.

 

재귀를 통해 분할된 작업들은 ForkJoinPool에 넘겨져 병렬로 수행되어 집니다.

 

위 예제와 같이, 병렬로 수행 시 결과에 영향이 가지 않는 연산에서만 사용해야 합니다.

 

2) 포크/조인 프레임워크를 제대로 사용하는 방법

 

아래는 포크/조인 프레임워크를 효과적으로 사용하기 위해 알아야 할 점입니다.

 

  1. 두 서브 태스크가 모두 시작된 다음에 join을 호출해야 합니다.
  2. RecursiveTask 내에서는 ForkJoinPool의 invoke메서드 대신 fork나 compute 메서드를 직접 호출해야 합니다.
  3. 서브 태스크에서 fork나 compute를 통해 ForkJoinPool의 일정을 조절할 수 있습니다.
  4. 포크/조인 프레임워크를 이용하는 병렬계산은 디버깅하기 어렵습니다.
  5. 멀티코어에 포크/조인 프레임워크를 사용하는 것이 순차 처리보다 무조건 빠르지는 않습니다.

 

3) 작업 훔치기

 

멀티 쓰레드로 처리를 하다보면 고루 처리량을 할당했더라도, 각 쓰레드마다 완료 시점이 다릅니다.

이 경우, 노는 쓰레드가 발생하게되며 성능이 생각한것만큼 좋아지지 않게됩니다.

 

이를 위해, 포크/조인 프레임워크는 작업훔치기라는 기법을 사용하고 있습니다.

간단히 말해서, task들을 큐가 아닌 덱에 넣고 노는 쓰레드는 일하는 쓰레드의 덱의 꼬리에서 task가 있다면 훔쳐와 동작하는 것입니다.

 

그렇기 때문에, task는 적절히 작은 양으로 분배가 되도록 해야합니다.

 

4. Spliterator 인터페이스

자바 8은 Spliterator라는 새로운 인터페이스를 제공합니다.

 

Spliterator는 스트림을 분할하여 처리할때 사용하며,

자바 8은 컬렉션 프레임워크에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator 구현을 제공하고 있습니다.

 

아래는 Spliterator 인터페이스에서 필수로 구현해야하는 메서드만을 모아놓은 명세입니다.

 

public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action);
    Spliterator<T> trySplit();
    long estimateSize();
    int characteristics();
}

 

tryAdvance 는 Spliterator의 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아 있으면 참을 반환합니다.

trySplit Spliterator의 일부 요소를 분할해서 두 번째 Spliterator를 생성하는 메서드입니다.

estimateSize 는 메서드로 탐색해야 할 요소 수 정보를 제공할 수 있습니다.

characteristicsSpliterator에서 정의한 int형으로 각 값은 의미하고 있는것이 있습니다. - ex) 16 = Spliterator.ORDERED

 

 

1) 분할 과정

 

Spliterator는 trySplit 메서드를 통해 스트림 요소를 재귀적으로 분할합니다.

 

아래는 Spliterator를 통해 분할하는 과정의 그림입니다.

 

 

이 과정은 characteristics 메서드로 정의하는 Spliterator의 특성에 영향을 받습니다.

 

2) Spliterator 특성

 

characteristics 메서드는 Spliterator 자체의 특성 집합을 포함하는 int를 반환합니다.

 

아래는 Spliterator 특성에 대한 표입니다.

 

특성 의미
ORDERED 리스트처럼 요소에 정해진 순서가 있으므로 순서에 유의해야 함.
DISTINCT x, y 두 요소를 방문했을 시 x.equals(y) 는 항상 false를 반환해야 함.
SORTED 탐색된 요소는 미리 정의된 정렬 순서를 따라야 함.
SIZED 크기가 알려진 소스로 Spliterator를 생성했으므로 estimateSize는 정확한 값을 반환함.
NON-NULL 탐색하는 모든 요소는 null이 아님.
IMMUTABLE Spliterator는 불변. 즉 탐색 중간에 추가, 삭제가 불가능함.
CONCURRENT 동기화 없이 Spliterator의 소스를 여러 쓰레드에서 동시에 고칠 수 없음.
SUBSIZED Spliterator 그리고 분할되는 모든 Spliterator는 SIZED 특성을 갖고 있음.

 

5. 마무리

이번 포스팅에서는 Chapter 7인 병렬 데이터 처리와 성능에 대해 진행하였습니다.

다음에는 Chapter 8인 컬렉션 API 개선에 대해 포스팅하겠습니다.

반응형

'Programming > ModernJavaInAction' 카테고리의 다른 글

(9) 리팩터링, 테스팅, 디버깅  (0) 2020.04.13
(8) 컬렉션 API 개선  (0) 2020.04.13
(6) 스트림으로 데이터 수집  (0) 2020.04.04
(5) 스트림 활용  (0) 2020.04.04
(4) 스트림 소개  (0) 2020.03.28

+ Recent posts