1. 서론
이번 포스팅에서는 Chapter15의 CompletableFuture와 리액티브 프로그래밍의 컨셉의 기초에 대해 진행하도록 하겠습니다.
2. 동시성을 구현하는 자바 지원의 진화
초기 자바는 Runnable과 Thread를 동기화된 클래스와 메서드를 이용해 잠갔습니다.
그 후, 자바 5에서는 ExecutorService 인터페이스를 제공하며 스레드 실행과 태스크를 분리하였고,
값을 반환하는 Callable도 ExecutorService로 사용할 수 있게 되었습니다.
ExecutorService는 Runnable, Callable 둘 다 인자로 받을 수 있습니다.
자바 8에서는 Future의 진화 단계인 CompletableFuture를 제공, 자바 9에서는 발행-구독 메커니즘을 위한 Flow를 제공하게 되었습니다.
CompletableFuture, Flow 의 제공 목표는 가능한한 동시에 블록킹 되지 않게 실행할 수 있도록 제공하기 위함입니다.
1) Executor와 쓰레드 풀
쓰레드의 문제
쓰레드는 자바 뿐만이 아닌 하드웨어, 운영체제에서도 최대 사용가능한 갯수가 있습니다.
만약 자바 어플리케이션의 쓰레드가 운영체제가 지원하는 쓰레드 수를 초과하면 에러가 발생 할 수 있습니다.
쓰레드 풀 그리고 쓰레드 풀이 더 좋은 이유
쓰레드 풀은 쓰레드를 계속 생성하는 것이 아니라 사용이 끝나면 반환하게 하여 재사용을 할 수 있도록 하는 방법입니다.
장점은 위의 쓰레드 문제같이 에러 나는 경우를 예방할 수 있으며, 쓰레드를 재사용할 수 있다는 점입니다.
쓰레드 풀 그리고 쓰레드 풀이 나쁜 이유
쓰레드 풀이 장점만 있는 것은 아닙니다.
쓰레드 풀을 사용 시 유휴상태의 쓰레드가 풀에 없다면 태스크는 블록킹되기 때문입니다.
3. 동기 API와 비동기 API
동기 API는 쓰레드가 CPU 자원을 점유한 상태로 아무런 일을 하지 않는 상황을 만듭니다.
이러한 동기 API의 문제점을 아래 예제 코드를 통해 보겠습니다.
public class ThreadExample {
public static void main(String[] args) throws InterruptedException {
int x = 1337;
Result result = new Result();
Thread t1 = new Thread(() -> {result.left = f(x);});
Thread t2 = new Thread(() -> {result.right = g(x);});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(result.left + result.right);
}
private static class Result {
private int left;
private int right;
}
}
위 예제는 f, g 라는 연산을 수행 후 두 결과를 합쳐 출력하는 예제입니다.
쓰레드를 2개 만들어 사용하지만 join 메서드로 인해 이들은 동시성이라고 할 수 없습니다.
1) Future 형식 API
위 예제를 Future 를 사용한다면 조금 개선이 될 수 있습니다.
Future는 태스크의 작업을 비동기로 쓰레드에서 수행하도록 합니다.
아래는 Future를 적용한 예제입니다.
int x = 1337;
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<Integer> y = executorService.submit(() -> f(x));
Future<Integer> z = executorService.submit(() -> g(x));
System.out.println(y.get() + z.get());
2) 리액티브 형식 API
위 문제에서 Future 를 적용하였더라도 get() 메서드는 블로킹 되기 때문에 완벽한 해결책은 아닙니다.
리액티브 형식으로는 처리가 완료 되었을때의 콜백함수를 인자로 넣어 사용하는 것입니다.
아래는 예제입니다.
int x = 1337;
Result result = new Result();
f(x, (int y) -> {
result.left = y;
System.out.println((result.left + result.right));
});
g(x, (int z) -> {
result.right = z;
System.out.println((result.left + result.right));
});
위 같이 수행한다면 블록킹 되는 부분은 없을 겁니다.
하지만, f 와 g의 합계가 정확하게 출력되지 않으며 2번이나 출력이 될 것입니다.
이런 경우, 일반적으로 if-then-else 를 이용하여 해결 할 수 있습니다.
3) 잠자기(그리고 기타 블로캉 동작)는 해로운 것으로 간주
블로킹 코드가 있다면 쓰레드는 자원을 점유한 채 아무일도 하지 않을 것입니다.
이러한, 작업들이 조금씩 쌓이게되면 프로그램 전체에 영향이 가게 됩니다.
그러므로, 블로킹 동작은 최대한 배제해야하며 해로운 것으로 간주해야 합니다.
4) 비동기 API에서 예외는 어떻게 처리하는가?
비동기 API에서 호출된 메서드는 별도의 쓰레드에서 수행되기 때문에 호출자에서 예외를 핸들링 할 수 없습니다.
때문에, CompletableFuture에서는 런타임 get() 메서드에 예외를 처리할 수 있는 기능을 제공하였고,
exceptionally() 로 예외에서 회복할 수 있는 메서드도 제공하였습니다.
리액티브 형식에서는 값을 콜백형식으로 처리하기 때문에 예외 또한 콜백으로 처리하게됩니다.
플로를 예로 든다면 아래와 같이 예외에 대한 콜백 함수가 있습니다.
void onError(Throwable throwable)
4. CompletableFuture와 콤비네이터를 이용한 동시성
CompletableFuture는 complete 메서드를 통해 나중에 어떤 값을 이용해 다른 쓰레드가 이를 완료할 수 있게 허용합니다.
아래는 위 예제를 CompletableFuture를 이용한 예제로 바꾼 코드입니다.
ExecutorService executorService = Executors.newFixedThreadPool(2);
int x = 1337;
CompletableFuture<Integer> a = new CompletableFuture<>();
executorService.submit(() -> a.complete(f(x)));
int b = g(x);
System.out.println(a.get() + b);
위 예제의 경우 CompletableFuture를 사용하기만 했지 Future로 했을때처럼 블로킹은 존재합니다.
CompletableFuture에서는 이러한 문제를 해결하기 위해 thenCombine 메서드를 제공합니다.
아래는 예제입니다.
ExecutorService executorService = Executors.newFixedThreadPool(10);
int x = 1337;
CompletableFuture<Integer> a = new CompletableFuture<>();
CompletableFuture<Integer> b = new CompletableFuture<>();
CompletableFuture<Integer> c = a.thenCombine(b, (y, z) -> y + z);
executorService.submit(() -> a.complete(f(x)));
executorService.submit(() -> b.complete(g(x)));
System.out.println(c.get());
c의 경우 a와 b의 결과가 반환되기 전까지 쓰레드로 수행되지 않습니다.
때문에, 이제 블로킹이 없는 코드가 완성되었습니다.
5. 발행-구독 그리고 리액티브 프로그래밍
Future는 한번만 실행하여 결과를 제공합니다.
반면, 리액티브 프로그래밍은 시간이 흐르면서 여러 Future 같은 객체를 통해 여러 결과를 제공합니다.
자바 9에서는 java.util.concurrent.Flow 인터페이스에 발행-구독 모델을 적용하여 리액티브 프로그래밍을 제공합니다.
플로 API는 아래와 같이 3가지로 정리할 수 있습니다.
- 구독자가 구독할 수 있는 발행자
- 이 연결을 구독이라 합니다.
- 이 연결을 이용해 메시지 또는 이벤트를 전송합니다.
1) 발행 구독 예제
아래는 발행-구독의 간단한 예제 입니다.
public class SimpleCell implements Publisher<Integer>, Subscriber<Integer> {
private int value = 0;
private String name;
private List<Subscriber> subscriberList = new ArrayList<>();
public SimpleCell(String name) {
this.name = name;
}
@Override
public void subscriber(Subscriber<? super Integer> subscriber) {
subscriberList.add(subscriber);
}
private void notifyAllSubscribers() {
subscriberList.forEach(subscriber -> subscriber.onNext(this.value));
}
@Override
public void onNext(Integer newValue) {
this.value = newValue;
System.out.println(this.name + ":" + this.value);
notifyAllSubscribers();
}
}
public class FlowTest {
public static void main(String[] args) {
SimpleCell c3 = new SimpleCell("C3");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");
c1.subscriber(c3);
c1.onNext(10);
c1.onNext(20);
}
}
위 예제의 결과는 아래와 같이 출력됩니다.
C1 : 10
C3 : 10
C2 : 20
2) 역압력
발행-구독 모델에서는 구독자가 처리할 수 있는 양에 비해 발행자가 무수히 많은 데이터를 전달할 수 있으며, 이러한 상황을 압력이라고 합니다.
압력 현상이 발생 시 구독자는 처리할 양만 늘어나며 나중에는 많은 작업으로 어떤 문제가 발생할 수 있을지 모릅니다.
때문에, 구독자가 처리할 수 있을때만 발행자에게 데이터를 전달받도록 조절하는 것이 필요하며, 이를 역압력이라고 합니다.
플로 API에서는 Subscription을 통해 이 역압력을 지원합니다.
아래는 Subscription 입니다.
public static interface Subscription {
public void request(long n);
public void cancel();
}
Subscription 은 발행자와 구독자의 중간에서 소통을 해주는 역할로 이해하시면 됩니다.
아래는 Publisher, Subscriber, Subscription에 대해 그림으로 나타낸 것입니다.
6. 마무리
이번 포스팅에서는 Chapter15 CompletableFuture와 리액티브 프로그래밍의 컨셉의 기초에 대해 진행하였습니다.
다음에는 Chapter16 CompletableFuture : 안정적 비동기 프로그래밍에 대해 포스팅하겠습니다.
'Programming > ModernJavaInAction' 카테고리의 다른 글
(17) 리액티브 프로그래밍 (0) | 2020.05.24 |
---|---|
(16) CompletableFuture : 안정적 비동기 프로그래밍 (0) | 2020.05.24 |
(14) 자바 모듈 시스템 (0) | 2020.05.23 |
(13) 디폴트 메서드 (0) | 2020.05.23 |
(12) 새로운 날짜와 시간 API (0) | 2020.05.02 |