반응형

1. 서론

이번 포스팅에서는 Chapter17의 리액티브 프로그래밍에 대해 진행하도록 하겠습니다.

 

2. 리액티브 매니패스토

리액티브 매니패스토란 리액티브 어플리케이션과 시스템 개발의 핵심 원칙을 공식적으로 정의한 내용입니다.

 

  1. 반응성 : 리액티브 시스템은 빠를 뿐 아니라 더 중요한 특징으로 일정하고 예상할 수 있는 반응 시간을 제공합니다.
  2. 회복성 : 장애가 발생해도 시스템은 반응해야 합니다.
  3. 탄력성 : 어플리케이션의 생명주기 동안 다양한 작업 부하를 받게 되는데 이 다양한 작업 부하로 어플리케이션의 반응성이 위협받을 수 있습니다. 이를 대비해, 자동으로 할당 된 자원 수를 늘립니다.
  4. 메시지 주도 : 회복성과 탄력성을 위해 메시지 기반의 통신으로 이루어 지도록 합니다.

아래는 위 4가지가 어떤 관계로 얽혀있는지 보여줍니다.

 

 

3. 리액티브 스트림과 플로 API

리액티브 프로그래밍은 리액티브 스트림을 사용하는 프로그래밍입니다.

 

리액티브 스트림은 무한의 비동기 데이터를 순서대로 블록하지 않은 역압력을 전제하여 처리하는 기술입니다.

스트림의 비동기가 기반이기 때문에 역압력 기술은 필수입니다.

 

역압력이란, 데이터 소모하는 쪽에서 데이터 발행자에게 요청한 경우에만 처리하도록 하는 기법입니다.

 

 

1) Flow 클래스 소개

 

자바 9 에서는 java.util.concurrent.Flow를 추가했습니다.

 

Flow 클래스는 내부적으로 4개의 인터페이스를 가지고 있습니다.

 

  • Publisher
  • Subscriber
  • Subscription
  • Processor

아래는 각 4개의 인터페이스 정의입니다.

 

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}

public static interface Subscription {
    public void request(long n);
    public void cancel();
}

public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {

}

 

여기서 Processor는 리액티브 스트림에서 처리하는 이벤트의 변환을 담당합니다.

 

 

아래는 이 4개의 인터페이스의 동작 원칙입니다.

 

  • Publisher는 반드시 Subscription의 request 메서드에 정의된 개수 이하의 요소만 Subscriber에 전달해야 합니다.
  • Subscriber 는 요소를 받아 처리할 수 있음을 Publisher에게 알려야 합니다.
  • Publisher 와 Subscriber 는 Subscription을 공유해야 합니다.

 

 

1) 첫번째 리액티브 어플리케이션 만들기

 

아래는 매초 온도를 보고하는 예제입니다.

 

public class TempInfo {

    public static final Random random = new Random();

    private final String town;
    private final int temp;

    public TempInfo(String town, int temp) {
        this.town = town;
        this.temp = temp;
    }
    
    public static TempInfo fetch(String town) {
        if(random.nextInt(10) == 0) {
            throw new RuntimeException("ERROR!!");
        }
        return new TempInfo(town, random.nextInt(100));
    }
}

 

public class TempSubscriber implements Flow.Subscriber<TempInfo> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(TempInfo item) {
        System.out.println(item);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println(throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Done!");
    }
}

 

public class TempSubscription implements Flow.Subscription {

    private final Flow.Subscriber<? super TempInfo> subscriber;
    private final String town;

    public TempSubscription(Flow.Subscriber<? super TempInfo> subscriber, String town) {
        this.subscriber = subscriber;
        this.town = town;
    }

    @Override
    public void request(long n) {
        for(long i = 0L; i < n; i++) {
            try {
                subscriber.onNext(TempInfo.fetch(town));
            } catch (Exception e) {
                subscriber.onError(e);
                break;
            }
        }
    }

    @Override
    public void cancel() {
        subscriber.onComplete();
    }
}

 

public class Main {

    public static void main(String[] args) {
        getTemperatures("New York").subscribe(new TempSubscriber());
    }

    private static Flow.Publisher<TempInfo> getTemperatures(String town) {
        return subscriber -> subscriber.onSubscribe(
                new TempSubscription(subscriber, town)
        );
    }
}

 

 

위 예제에서는 Subscription의 request를 호출하면, Subscriber가 Subscription을 또 호출하는 재귀 문제가 있습니다.

이를 해결하기 위해서는 별도 Executor를 사용할 수 있습니다.

 

Executor 소스를 추가한 코드는 아래와 같습니다.

 

public class TempSubscription implements Flow.Subscription {
    
    private static final ExecutorService executor = Executors.newSingleThreadExecutor(); 
    private final Flow.Subscriber<? super TempInfo> subscriber;
    private final String town;

    public TempSubscription(Flow.Subscriber<? super TempInfo> subscriber, String town) {
        this.subscriber = subscriber;
        this.town = town;
    }

    @Override
    public void request(long n) {
        executor.submit(() -> {
            for(long i = 0L; i < n; i++) {
                try {
                    subscriber.onNext(TempInfo.fetch(town));
                } catch (Exception e) {
                    subscriber.onError(e);
                    break;
                }
            }    
        });
    }

    @Override
    public void cancel() {
        subscriber.onComplete();
    }
}

 

 

 

2) Processor로 데이터 변환하기

 

Processor의 목적은 Publisher를 구독한 다음 수신한 데이터를 가공해 다시 제공하는 것입니다.

 

아래는 위 예제에 Processor를 적용한 코드입니다.

 

public class TempSubscription implements Flow.Subscription {

    private static final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Flow.Processor<? super TempInfo, ? super TempInfo> processor;
    private final String town;

    public TempSubscription(Flow.Processor<? super TempInfo, ? super TempInfo> processor, String town) {
        this.processor = processor;
        this.town = town;
    }

    @Override
    public void request(long n) {
        executor.submit(() -> {
            for(long i = 0L; i < n; i++) {
                try {
                    processor.onNext(TempInfo.fetch(town));
                } catch (Exception e) {
                    processor.onError(e);
                    break;
                }
            }
        });
    }

    @Override
    public void cancel() {
        processor.onComplete();
    }
}

 

public class TempProcessor implements Flow.Processor<TempInfo, TempInfo> {

    private Flow.Subscriber<? super TempInfo> subscriber;

    @Override
    public void subscribe(Flow.Subscriber<? super TempInfo> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscriber.onSubscribe(subscription);
    }

    @Override
    public void onNext(TempInfo item) {
        subscriber.onNext(
                new TempInfo(
                        item.getTown(),
                        (item.getTemp() -32) * 5 /9
                )
        );

    }

    @Override
    public void onError(Throwable throwable) {
        subscriber.onError(throwable);
    }

    @Override
    public void onComplete() {
        subscriber.onComplete();
    }
}

 

public class Main {

    public static void main(String[] args) {
        getTemperatures("New York").subscribe(new TempSubscriber());
    }

    private static Flow.Publisher<TempInfo> getTemperatures(String town) {
        return subscriber -> {
            TempProcessor processor = new TempProcessor();
            processor.subscribe(subscriber);
            processor.onSubscribe(new TempSubscription(processor, town));
        };
    }
}

 

Processor를 이용해 onNext 메서드에서 화씨 온도를 섭씨 온도로 변경하여 Subscriber 에게 전달하는 것을 볼 수 있습니다.

 

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

4. 리액티브 라이브러리 RxJava

RxJava는 자바로 리액티브 어플리케이션을 구현하는데 사용하는 라이브러리입니다.

compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.2.19'

 

RxJava는 io.reactivex 패키지하위의 Observable, Observer 를 사용할 수 있습니다.

 

간단히 자바 플로에서 Publisher 가 RxJava에서는 Observable, Subscriber가 Observer라고 보시면 됩니다.

 

 

1) Observable 만들고 사용하기

 

아래는 간단한 Observable 을 만드는 예제 코드입니다.

 

Observable<String> strings = Observable.just("fisrt", "second");

 

just 팩토리 메서드는 한개 이상의 요소를 이용해 방출하는 Observable 을 만듭니다.

 

Observable 구독자는 onNext("first"), onNext("second"), onComplete() 순으로 메시지를 받게 됩니다.

 

특정 시간단위로 상호작용할때는 아래와 같이 interval 팩토리 메서드를 사용할 수 있습니다.

 

Observable<Long> onePerSec = Observable.interval(1, TimeUnit.SECONDS);

 

추가로 RxJava는 역압력을 지원하지 않아, Observable에는 Subscription의 request 메서드 같은 기능이 없습니다.

 

Observer 인터페이스는 위의 Subscriber와 정의도 비슷합니다.

 

아래는 Observer 인터페이스 정의입니다.

 

public interface Observer<T> {
    void onSubscriber(Disposable d);
    void onNext(T t);
    void onCompleted();
    void onError(Throwable e);
}

 

RxJava는 위의 메서드를 모두 구현할 필요가 없고, onNext 만 구현해도 괜찮습니다.

 

예제는 아래와 같습니다.

 

onePerSec.subscribe(i -> System.out.println(TempInfo.fetch("New York")));

 

 

이 예제는 초마다 뉴욕의 온도를 출력하는 예제입니다.

 

하지만 실제로 동작시키면 아무런 출력이 되지 않습니다.

이유는, Observable이 RxJava의 연산 쓰레드 풀 즉 데몬 쓰레드에서 실행되기 때문입니다.

 

RxJava는 호출 쓰레드에서 값을 받는 메서드도 제공합니다.

 

예제는 아래와 같습니다.

 

onePerSec.blockingSubscribe(i -> System.out.println(TempInfo.fetch("New York")));

 

이제 위 예제에 RxJava를 적용해보겠습니다.

 

private static Observable<TempInfo> getTemperatures(String town) {
    return Observable.create(emitter -> {
        Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(i -> {
                    if (!emitter.isDisposed()) {
                        if (i >= 5) {
                            emitter.onComplete();
                        } else {
                            try {
                                emitter.onNext(TempInfo.fetch(town));
                            } catch (Exception e) {
                                emitter.onError(e);
                            }

                        }
                    }
                });
    });
}

 

public class TempObserver implements Observer<TempInfo> {
    
    @Override
    public void onCompleted() {
        System.out.println("Done!");    
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Got problem : " + e.getMessage());

    }

    @Override
    public void onNext(TempInfo tempInfo) {
        System.out.println(tempInfo);
    }
}

 

public static void main(String[] args) {
    Observable<TempInfo> observable = getTemperatures("New York");
    observable.blockingSubscribe(new TempObserver());
}

 

 

2) Observable 변환하고 합치기

 

Observable에는 스트림과 비슷한 메서드들을 제공합니다.

 

  • map
  • filter
  • merge

 

map

 

map은 스트림의 map 과 같이 요소를 변환하는 메서드입니다.

자바 플로의 Processor라고 보시면 됩니다.

 

아래는 섭씨로 변환하는 작업을 Observable의 map을 사용하여 만드는 예제입니다.

 

private static Observable<TempInfo> getCelsiusTemperature(String town) {
    return getTemperatures(town).map(item -> new TempInfo(
            item.getTown(),
            (item.getTemp() -32) * 5 /9
    ));
}

 

filter

 

filter도 스트림과 동일하게 특정 조건의 데이터만을 추출하는 용도의 메서드입니다.

 

아래는 filter 예제 코드입니다.

 

private static Observable<TempInfo> getNegativeTemperature(String town) {
    return getCelsiusTemperature(town).filter(temp -> temp.getTemp() < 0);
}

 

merge

 

merge는 여러 Observable를 하나의 Observable로 만드는 메서드입니다.

 

아래는 merge 예제 코드입니다.

 

private static Observable<TempInfo> getCelsiusTemperatures(String... towns) {
    return Observable.merge(
            Arrays.asList(towns).stream()
            .map(Main::getCelsiusTemperature)
            .collect(Collectors.toList())
    );
}

 

public static void main(String[] args) {
    Observable<TempInfo> observable = getCelsiusTemperatures("New York", "Chicago", "San Francisco");
    observable.blockingSubscribe(new TempObserver());
}

 

merge 메서드는 Observable의 Iterator을 인수로 받아 한개의 Observable처럼 동작합니다.

 

5. 마무리

이번 포스팅에서는 Chapter17 리액티브 프로그래밍 대해 진행하였습니다.

이렇게, 모던 자바 인 액션에 대한 포스팅은 완료했습니다.

반응형

+ Recent posts