Spring

[Spring] Reactive Programming 리액티브 프로그래밍 - Reactive Steams

별토끼. 2021. 2. 10. 12:02
반응형

Reactive Streams

토비의봄TV 6화 스프링 리액티브 프로그래밍 을 바탕으로 정리한 글입니다.

.

목표

Duality(쌍대성)과 Observer Pattern을 통해 ReactiveProgramming이 무엇인지 알아보자.

Duality

Iterable과 Observable은 Duality(쌍대성) 관계를 갖고있다. 아래 Iterable로 구현한 코드(Pull방식)와 Observable(Push방식)으로 구현한 코드를 비교하며 양립하는 Duality 관계를 알아보자.

Iterable

  • for loop를 사용할 수 있는 이유는 Iterable 때문이다.
  • Pull 방식 - it.next()로 value를 끌어오는 방식
Iterable<Integer> iter = Arrays.asList(1,2,3,4,5);
for (Integer i : iter) {
    System.out.println(i);
}

즉, for Each는 Iterable 때문임을 기억하고 넘어가자.

Iterable<Integer> iter = () ->
    new Iterator<>() {
        int i=0;
        final static int MAX = 10;

        public boolean hasNext() {
            return i < MAX;
        }

        public Integer next() {
            return ++i;
        }
    };

for (Integer i : iter) {
    System.out.println(i);
}

Observable

Observer 패턴

  • Observable(Publisher, 발행자) - 새로운 정보 발생 시, 이벤트(Data)를 제공. Observer에 notify한다.
  • Observer(Subscriber, 구독자, 관찰자) - notify를 받는 역할. Observable에 해당 Observer를 등록한다.
  • push 개념 - 주는 방식.
static class IntObservable extends Observable implements Runnable{

    @Override
    public void run() {
        for (int i=1; i<=10; i++){
            setChanged();
            notifyObservers(i);     // : push방식 <--> int i = it.next(); // : pull 방식
        }
    }
}

public static void main(String[] args) {
    Observer ob = new Observer() {
        @Override
        public void update(Observable o, Object arg) {
            System.out.println(Thread.currentThread().getName()+ " " + arg);
        }
    };

    IntObservable io = new IntObservable();
    io.addObserver(ob);

    //외부 스레드에서 수행되도록 처리
    ExecutorService es = Executors.newSingleThreadExecutor();
    es.execute(io);

    System.out.println(Thread.currentThread().getName() + " EXIT");
    es.shutdown();
}

Observer패턴의 당시 문제

널리 알려진 옵저버 패턴에서 몇가지 문제를 꼽자면 이러하다.

  1. Complete
  2. Error
    이 두가지 처리를 어떻게 해결할 수 있을까에 대한 해답으로 나온 것이 Reactive Programming의 시초이다.

Reactive Streams

위와 같은 문제를 어떻게 해결하면 좋을까? 라는 질문에서 Reactive Streams가 생겨났다.

Component

  • Publisher
    이벤트(데이터)를 제공하는 쪽.
    subscribe 메서드에서 onSubscribe를 반드시 호출해주어야 한다. (코드 참조)

  • Subscriber
    이벤트(데이터)를 받아서 처리하는 쪽.
    onSubscribe, onNext, onError, onComplete 메서드가 있다. (코드참조)
    onSubscribe는 항상 호출해야한다.

  • Subscription
    Subscriber가 이벤트(데이터) 처리 할 때 속도 조절을 할 수 있도록 하는 request, cancel 등의 메서드 구현부.
    Publisher가 Subscriber에게 이 오브젝트를 전달한다.

Reactive Streams의 흐름

pubsubandbackpressure
이미지 출처 : https://jongmin92.github.io/%5D

  1. subscriber의 구독 시작 (pub.subscribe(sub))
  2. onSubscribe를 통해 구독 시작
  3. Subscriber가 publisher로 Subscription을 통해 request, cancel method 전달
  4. Publisher가 Subscription을 통해 전달 속도 조절하며 요청 전달, 완료 및 에러 여부 전달

reactive-streams GITHUB 참고

    public static void main(String[] args) {
        //publisher - 내가 데이터 100만개 줄게!
        //subscriber - 나는 처리 못해 하나당 1초걸려
        //이런 경우 backpressure로 조절한다. -> 버퍼를 일정하게 유지하여 더욱 안정적이다.
        Iterable<Integer> itr = Arrays.asList(1,2,3,4,5);
        Publisher p = new Publisher() {
            @Override
            public void subscribe(Subscriber subscriber) {
                subscriber.onSubscribe(new Flow.Subscription() {
                    //subscriber의 부탁이 쓰여있는 역할, backpressure(역압, 처리량 등을 조절하도록 하는 메서드들이 있다)

                    Iterator<Integer> it = itr.iterator();
                    @Override
                    public void request(long n) {
                        while(n-- > 0){
                            if (it.hasNext()) {
                                subscriber.onNext(it.next());
                            }else{
                                subscriber.onComplete();
                                break;
                            }
                        }
                    }
                    @Override
                    public void cancel() {

                    }
                });
            }
        };
        Subscriber<Integer> s = new Subscriber<Integer>() {
            Flow.Subscription subscription;
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                //subscriber의 부탁이 들어있는 subscription을 바탕으로 pub에게 알림  부탁하는 메서드
                System.out.println("onSubscribe");
                this.subscription = subscription;
                subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                //다음걸 줬다는 알림 역할
                System.out.println("onNext "+item);
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                //Exception 발생 대신 Object 전달 역할 (try catch 필요없음)
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                //더이상 줄 데이터 없다는 알림
                System.out.println("onComplete");
            }
        };

        p.subscribe(s);
    }

Reactive Streams - Operator

Publisher - Subscriber 의 기본 구조

public class  PubSub {
    Publisher<Integer> pub = new Publisher<Integer>() {
        @Override
        public void subscribe(Subscriber<? super Integer> sub) {
            sub.onSubscribe(new Subscription() {
                @Override
                public void request(long l) {

                }

                @Override
                public void cancel() {

                }
            });
        }
    };
    Subscriber<Integer> sub = new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription subscription) {

        }

        @Override
        public void onNext(Integer i) {

        }

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onComplete() {

        }
    };
}

Operator

mapping, filter, reducer 등 Stream Data 가공하는 작업 진행

  1. Map : Data1 -> function -> Data2
public class PubSub {

    public static void main(String[] args) {
        Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
        Publisher<Integer> mapPub = mapPub(pub, s -> s * 10);
        Publisher<Integer> map2Pub = mapPub(mapPub, s -> -s);
        map2Pub.subscribe(logSub());
    }

    private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
        return new Publisher<Integer>() {
            //map을 이용한 operator 기능 구현
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                pub.subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        sub.onSubscribe(s);
                    }

                    @Override
                    public void onNext(Integer i) {
                        sub.onNext(f.apply(i));
                    }

                    @Override
                    public void onError(Throwable t) {
                        sub.onError(t);
                    }

                    @Override
                    public void onComplete() {
                        sub.onComplete();
                    }
                });
            }
        };
    }

    private static Subscriber<Integer> logSub() {
        return new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe:");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer i) {
                log.debug("onNext:{}", i);
            }

            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}", t);
            }

            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        };
    }

    private static Publisher<Integer> iterPub(final List<Integer> iter) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                sub.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        try {
                            iter.forEach(s -> sub.onNext(s));
                            sub.onComplete();
                        } catch (Throwable t) {
                            sub.onError(t);
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                });
            }
        };
    }
}
  1. sumPub - sumPub을 통해 값을 더해서 lobSub로 로그 출력
public class PubSub {

    public static void main(String[] args) {
        Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
        Publisher<Integer> sumPub = sumPub(pub);
        sumPub.subscribe(logSub());
    }

    private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                pub.subscribe(new DelegateSub(sub){
                    int sum = 0;

                    @Override
                    public void onNext(Integer i) {
                        sum+=i;
                    }

                    @Override
                    public void onComplete() {
                        sub.onNext(sum);
                        sub.onComplete();
                    }
                });
            }
        };
    }

    private static Subscriber<Integer> logSub() {
        return new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe:");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer i) {
                log.debug("onNext:{}", i);
            }

            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}", t);
            }

            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        };
    }
}
반응형