1. 서론
이번 포스팅에서는 Chapter16의 CompletableFuture : 안정적 비동기 프로그래밍에 대해 진행하도록 하겠습니다.
2. Future의 단순 활용
Future 인터페이스는 비동기 계산을 모델링 하는데 쓰이며, 계산이 끝났을 때 결과에 접근할 수 있는 참조를 제공합니다.
Future 이용시에는 시간이 오래걸리는 작업을 Callable 객체 내부로 감싼 다음 ExecutorService에 제출하면 됩니다.
아래는 예제 코드입니다.
ExecutorService executorService = Executors.newCachedThreadPool();
Future<Double> future = executorService.submit(new Callable<Double>() {
Override
public Double call() throws Exception {
return doSomeThingLongComputation();
}
});
doSomeThingElse();
try {
double result = future.get(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// 현재 스레드에서 대기 중 인터럽트 발생
} catch (ExecutionException e) {
// 계산 중 예외
} catch (TimeoutException e) {
// Future가 완료되기 전에 타임아웃 발생
}
위 예제에서 doSomeThingLongComputation 연산은 별도 쓰레드에서 수행하게 됩니다.
수행 후, 값을 얻기 위해 get 메서드를 호출해야합니다.
연산이 끝난 경우에는 바로 반환되지만 연산이 진행중이라면 get 메서드에서 블로킹 됩니다.
1) Future의 제한
Future는 일련의 동시 실행 코드를 구현하기에 충분하지 않습니다.
예를들어, '오래 걸리는 A라는 계산이 끝나면 그 결과를 다른 오래 걸리는 계산 B로 전달하시오 그리고 B의 결과가 나오면 다른 질의의 결과와 B의 결과를 조합하시오' 와 같은 요구사항을 쉽게 구현하기가 어렵다는 점입니다.
이러한 제한적인 부분을 자바 8에서 제공하는 CompletableFuture로 해결할 수 있습니다.
3. 비동기 API 구현
CompletableFuture를 이용하여 최저가격 검색 어플리케이션 예제를 진행하겠습니다.
public class Shop {
private static final Random random = new Random();
private String name;
public Shop(String name) {
this.name = name;
}
public double getPrice(String product) {
return calculatePrice(product);
}
private double calculatePrice(String product) {
delay();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
private static void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
위 예제에서는 상품명을 받아 1초의 작업 후 연산 결과를 반환하는 getPrice 메서드가 있습니다.
1) 동기 메서드를 비동기 메서드로 변환
위 예제의 경우 getPrice를 호출한 쓰레드는 1초 간 블로킹 되어집니다.
이 부분을 해소하기 위해 CompletableFuture를 이용한 getPriceAsync 메서드를 추가하겠습니다.
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
double price = calculatePrice(product);
futurePrice.complete(price);
}).start();
return futurePrice;
}
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long invocationTime = ((System.nanoTime() - start) / 1000000);
System.out.println("Invocation returned after " + invocationTime + "msecs");
doSomeThingElse(); // 가격 정보를 가져오는 동안 다른 일 수행
try {
double price = futurePrice.get();
System.out.printf("Price is %.2f%n", price);
} catch (Exception e) {
throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime() - start) / 1000000);
System.out.println("Price returned after " + retrievalTime + " msecs");
위 코드의 getPriceAsync는 가격 계산이 끝나기 전에 return을 받습니다.
단, doSomeThingElse 의 작업이 먼저 끝난다면 get 메서드시 블로킹 되는 문제는 여전히 가지고 있습니다.
이 문제는, CompletableFuture의 기능을 활용하면 해결 가능하며 아래에서 더 살펴보겠습니다.
2) 에러 처리 방법
비동기로 처리시에는 별도의 쓰레드에서 동작하기에 에러 처리가 매우 까다롭습니다.
하지만, CompletableFuture 의 completeExceptionally 메서드를 사용하면 매우 간단해집니다.
아래는 예제 코드입니다.
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);
} catch (Exception e) {
futurePrice.completeExceptionally(e);
}
}).start();
return futurePrice;
}
위와 같이 코드 추가 후 예외가 실제로 발생되면, 클라이언트는 ExecutionException을 받게 됩니다.
3) 팩토리 메서드 supplyAsync로 CompletableFuture 만들기
위 CompletableFuture를 만드는 코드가 매우 긴게 가독성이 좋지 않습니다.
이를 위해, CompletableFuture는 팩터리 메서드로 supplyAsync를 제공합니다.
위 getPriceAsync 메서드를 팩터리 메서드 supplyAsync를 적용한 코드는 아래와 같습니다.
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
이 방법은 위 에러처리 방법에서 본 completeExceptionally 까지 포함한 메서드입니다.
4. 비블록 코드 만들기
이제 위 예제를 좀 더 확장하여 여러 가게에서 가격정보를 가져와 노출하는 경우를 추가하겠습니다.
코드는 아래와 같습니다.
public class ShopTest {
private static List<Shop> shopList = Arrays.asList(
new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShp["),
new Shop("BuyItAll")
);
public static void main(String[] args) {
long start = System.nanoTime();
System.out.println(findPrices("myPhone27S"));
long duration = (System.nanoTime() - start) / 1000000;
System.out.println("Done in " + duration + "msecs");
}
public static List<String> findPrices(String product) {
return shopList.stream()
.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.collect(Collectors.toList());
}
}
위 예제는 각 shop 마다 delay가 있어 최소 4초 이상의 시간이 걸립니다.
1) 병렬 스트림으로 요청 병렬화 하기
위 예제를 앞장에서 배웠던 병렬 스트림을 통해서 성능을 개선시킬 수 있습니다.
코드는 아래와 같습니다.
public static List<String> findPrices(String product) {
return shopList.parallelStream()
.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.collect(Collectors.toList());
}
위와 같이 병렬로 변경 시, 작업은 대략 1/4로 줄어듭니다.
2) CompletableFuture로 비동기 호출 구현하기
위 병렬 스트림으로 성능이 개선되었지만 비동기를 입혀 블로킹까지 없애 성능을 더욱 올리는게 바람직합니다.
병렬스트림으로 처리하더라도, 위와 같은 경우 한 쓰레드가 shop을 2개 이상 가지게 된다면 블로킹은 여전히 존재합니다.
코드는 아래와 같습니다.
public static List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures = shopList.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))
))
.collect(Collectors.toList());
return priceFutures.stream().map(CompletableFuture::join)
.collect(Collectors.toList());
}
3) 커스텀 Executor 사용하기
위 병렬스트림과 CompletableFuture 를 적용한 성능은 내부적으로 Runtime.getRuntime().availableProcessors() 가 반환하는 스레드 수로 동작합니다.
따라서, 쓰레드 풀을 직접 생성하여 동작시킨다면 CompletableFuture를 이용한 비동기 프로그래밍은 더욱 유연해지고 성능 향상이 일어나게 됩니다.
아래는 예제입니다.
public static List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures = shopList.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))
, executor
))
.collect(Collectors.toList());
return priceFutures.stream().map(CompletableFuture::join)
.collect(Collectors.toList());
}
간단히 비동기 처리에 쓸 Executor를 두번째 인자로 추가만 해주면 됩니다.
5. 비동기 작업 파이프라인 만들기
위 예제에서 할인율을 더해 출력하는 요구사항을 추가해보도록 하겠습니다.
아래는 getPrice 메서드를 shop 이름, 가격, DisCount 정보를 가진 문자열로 반환하도록 변경한 예제입니다.
public class Discount {
public enum Code {
NONE(0),
SILVER(5),
GOLD(10),
PLATINUM(15),
DIAMOND(20)
;
private final int percentage;
Code(int percentage) {
this.percentage = percentage;
}
}
}
public String getPrice(String product) {
double price = calculatePrice(product);
Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
return String.format("%s:%.2f:%s", name, price, code);
}
1) 할인 서비스 구현
위 getPrice의 문자열을 파싱하여 사용하는 클래스는 아래 Quote로 정의하였습니다.
추가로, 이제 Quote 객체를 Discount에 넘겨 할인이 적용된 값을 반환하는 메서드를 추가했습니다.
public class Quote {
private final String shopName;
private final double price;
private final Discount.Code code;
public Quote(String shopName, double price, Discount.Code code) {
this.shopName = shopName;
this.price = price;
this.code = code;
}
public static Quote parse(String s) {
String[] split = s.split(":");
String shopName = split[0];
double price = Double.parseDouble(split[1]);
Discount.Code code = Discount.Code.valueOf(split[2]);
return new Quote(shopName, price, code);
}
public String getShopName() {
return shopName;
}
public double getPrice() {
return price;
}
public Discount.Code getCode() {
return code;
}
}
public class Discount {
public enum Code {
NONE(0),
SILVER(5),
GOLD(10),
PLATINUM(15),
DIAMOND(20)
;
private final int percentage;
Code(int percentage) {
this.percentage = percentage;
}
}
public static String applyDiscount(Quote quote) {
return quote.getShopName() + "price is " +
Discount.apply(quote.getPrice(), quote.getCode());
}
private static double apply(double price, Code code) {
delay();
return price * (100 - code.percentage) / 100;
}
}
2) 할인 서비스 사용
이제 위 추가된 내용을 사용하기에 가장 쉬운 방법은 아래와 같이 stream으로 처리하는 것입니다.
public static List<String> findPrices(String product) {
return shopList.stream()
.map(shop -> shop.getPrice(product))
.map(Quote::parse)
.map(Discount::applyDiscount)
.collect(Collectors.toList());
}
위 코드는 처리만 할 뿐 성능 최적화와는 거리가 멀게 됩니다.
2) 동기 작업과 비동기 작업 조합하기
위 코드를 비동기를 이용한 코드로 바꿔보겠습니다.
예제는 아래와 같습니다.
public static List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures = shopList.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor)
)
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(
quote -> CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor
)
)
)
.collect(Collectors.toList());
return priceFutures.stream().map(CompletableFuture::join)
.collect(Collectors.toList());
}
위 코드에서 thenApply 는 CompletableFuture가 끝날때까지 블록하지 않습니다.
따라서 블로킹 없이 CompletableFuture<String> 에서 CompletableFuture<Quote>로 변환됩니다.
추가로, thenCompose 메서드는 첫번째 연산의 결과를 두번째 연산으로 전달하는 메서드입니다.
위에서는 quote의 결과를 받으면 Discount.applyDiscount 메서드를 한번 더 비동기로 수행시키는 코드입니다.
3) 독립 CompletableFuture 와 비독립 CompletableFuture 합치기
개발을 하다보면 두개의 독립적인 비동기 작업을 합쳐야 하는 경우가 있습니다.
이런 경우 thenCombine 메서드를 활용하시면 됩니다.
예제는 아래와 같습니다.
Future<Double> futurePriceInUSD =
CompletableFuture.supplyAsync(() -> shop.getPrice(prduct))
.thenCombine(CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate);
아래는 두 Future를 합치는 작업을 도식화 한것입니다.
출처 : 모던 자바 인 액션
4) 타임아웃 효과적으로 사용하기
Future의 단점으로는 계산이 길어지는 경우 무한정 대기할 수도 있다는 부분입니다.
CompletableFuture에서는 이를 위해 orTimeout 메서드를 제공합니다.
아래는 예제입니다.
Future<Double> futurePriceInUSD =
CompletableFuture.supplyAsync(() -> shop.getPrice(prduct))
.thenCombine(CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate)
.orTimeout(3, TimeUnit.SECONDS);
6. CompletableFuture의 종료에 대응하는 방법
CompletableFuture는 thenAccept라는 메서드를 제공합니다.
이 메서드는 연산 결과를 소비하는 Consumer를 인수로 받아 사용합니다.
아래는 예제입니다.
CompletableFuture[] futures = shopList.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor)
)
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(
quote -> CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor
)
)
)
.map(f -> f.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
7. 마무리
이번 포스팅에서는 Chapter16 CompletableFuture : 안정적 비동기 프로그래밍에 대해 진행하였습니다.
다음에는 Chapter17 리액티브 프로그래밍에 대해 포스팅하겠습니다.
'Programming > ModernJavaInAction' 카테고리의 다른 글
(17) 리액티브 프로그래밍 (0) | 2020.05.24 |
---|---|
(15) CompletableFuture와 리액티브 프로그래밍의 컨셉의 기초 (0) | 2020.05.23 |
(14) 자바 모듈 시스템 (0) | 2020.05.23 |
(13) 디폴트 메서드 (0) | 2020.05.23 |
(12) 새로운 날짜와 시간 API (0) | 2020.05.02 |