Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.
Reactive Streams는 논블로킹 백프레셔를 이용해 비동기 스트림 처리를 위한 표준을 제공한다. (리액티브 프로그래밍에 대한 인터페이스만을 제공)
Reactive Streams 인터페이스
Reactive Streams의 주요 인터페이스는 다음과 같다.
1) Publisher
데이터를 생성하고, Subscriber에게 데이터를 발행한다.
subscribe
- subscribe메서드에 인자로 Subcriber를 전달하여 등록한다.
- Publisher는 Subscriber에게 순차적으로 데이터를 전달한다.
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
2) Subscriber
구독한 Publisher로 부터 데이터를 전달받아 처리한다.
onSubscribe
- Publisher.subscribe()를 호출한 뒤에 호출된다.
- Subscription.reqeust()가 호출되기 전까지는 데이터가 흐르지 않는다.
- Publisher는 Subscription.request() 요청에 대한 응답으로 데이터를 전달한다.
OnNext
- Publisher로 부터 새로운 데이터가 도착할 때마다 호출된다.
- 전달 받은 데이터에 대한 처리를 수행한다.
OnError
- 오류가 발생했을 때 호출된다.
- 오류가 발생하면 스트림은 종료되고, Subscriber는 더이상 데이터를 전달받지 않는다.
onComplete
- 오류 없이 모든 데이터가 전달되었을 때 호출되고, 스트림이 종료된다.
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
3) Subscription
Publisher에게 전달받은 데이터 개수를 요청하고, 구독을 취소한다.
request
- Publisher에게 전달받을 데이터의 개수를 요청한다.
cancle
- Subscription이 종료되고, Publisher는 더이상 Subscriber에게 데이터를 전달하지 않는다.
public interface Subscription {
public void request(long n);
public void cancel();
}
4) Processor
Subscriber와 Publisher의 역할을 동시에 수행한다.
데이터를 받고(Subscriber) 처리하여 다른 Subscriber에게 전달하는(Publisher) 중개자 역할을 한다.
즉, Subscriber로서 다른 Publisher를 구독할 수 있고, Publisher로서 다른 Subscriber에게 데이터를 발행할 수 있다.
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Publisher와 Subscriber 동작 과정
백 프레셔(Back-Pressure)
백 프레셔는 소비자가 공급자에게 전달 받을 데이터 개수를 요청하는 것이다.
Reactive Streams에서는 Subscription.request()를 통해 전달 받을 데이터 개수를 요청하고 있다.
왜 백 프레셔가 필요할까?
여기 요리사(Publisher)와 웨이터(Subscriber)가 있다.
웨이터는 요리사에게 음식을 전달 받고, 전달 받은 음식을 고객에게 서빙한다.
웨이터가 1분에 2번의 서빙을 할 수 있으며 요리사는 1분에 8개의 음식을 만든다.
만약 백 프레셔가 없는 상황이라면, 시간이 지날수록 서빙되지 않는 음식이 많아지고 음식은 식어갈 것이다.
하지만, 웨이터가 자신이 처리할 수 있는 수량 만큼만 요리사에게 요청한다면(백 프레셔) 갓 만든 요리를 서빙할 수 있고, 웨이터도 힘들지 않을 것이다.
이처럼 Publisher가 데이터를 전달하는 속도가 Subscriber가 데이터를 처리하는 속도보다 빠르다면, 이 데이터가 쌓이게 되어 시스템 부하가 커질 수 있다.
결과적으로, 백 프레셔를 사용하면 다음과 같은 이점이 있다.
1) 과부화 방지
2) 시스템 전체 효율 향상
3) 리소스 효율적 활용
구현
public class ReactiveStreamsTest {
public static void main(String[] args) {
MyPublisher publisher = new MyPublisher();
Subscriber<Integer> subscriber = new MySubscriber();
publisher.subscribe(subscriber);
}
public static class MyPublisher implements Publisher<Integer> {
private Queue<Integer> queue = new LinkedList<>();
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
System.out.println("subscribe");
IntStream.range(0,10).forEach(queue::add);
MySubscription subscription = new MySubscription(subscriber, queue);
subscriber.onSubscribe(subscription);
}
}
public static class MySubscriber implements Subscriber<Integer> {
private Subscription subscription;
private final int SIZE = 3;
private int receivedItem = 0;
@Override
public void onSubscribe(Subscription s) {
System.out.println("onSubscribe");
subscription = s;
subscription.request(SIZE);
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext: " + integer);
receivedItem++;
if (receivedItem % SIZE == 0) {
subscription.request(SIZE);
}
}
@Override
public void onError(Throwable t) {
System.out.println("onError:" + t.getMessage());
}
@Override
public void onComplete() {
System.out.println("Stream Complete");
}
}
public static class MySubscription implements Subscription {
private Subscriber<? super Integer> subscriber;
private Queue<Integer> queue;
public MySubscription(Subscriber<? super Integer> subscriber, Queue<Integer> queue) {
this.subscriber = subscriber;
this.queue = queue;
}
@Override
public void request(long n) {
System.out.println("=request:" + n + "=");
for (int i = 0; i < n; i++) {
if (queue.isEmpty()) {
subscriber.onComplete();
return;
}
subscriber.onNext(queue.poll());
}
}
@Override
public void cancel() {
System.out.println("==cancel==");
}
}
}
'Spring' 카테고리의 다른 글
[Spring] Spring Boot3.x Docker Compose로 ElasticSearch 8.x+Kibana 구성 (Local) (0) | 2024.08.11 |
---|---|
[Spring] Spring Boot 프로젝트 이름 변경하기 (0) | 2024.08.08 |
[Spring] 의존성 주입(DI: Dependency Injection) (0) | 2023.07.22 |
[Spring] Junit5 테스트 No tests found for given includes 오류 해결 (0) | 2023.06.21 |
[Spring] 아임포트 사용한 결제 구현 + JavaScript/React 코드 (0) | 2023.06.20 |