2021년 1월 29일 금요일

[RxJava] RxJava 프로그래밍

 ReactiveX Android 프로그래밍이 최근 유행이다.

예전방식 프로그래머  (나와 같은 늙은) 들은 항상 함수에 정의하고 , 반복루틴에 아이템들을 명시하고 그걸 처리하여 값을 돌려받는 순서대로 로직이 돌아가는게 정상이라 생각하고 살았는데, 이제는 컴퓨터가 너무 빨라져서 꼭 그렇게 순차적으로 하지 않아도 동시에 다른 일을 하면서도 처리할수 있도록 유연성있게 처리하는 방식이 리액티브이다.

Imperative 명령(강제적) 방식으로 프로그램한다면

for( i=0; i < [0,1,2,3,4]; i ++) {

   sum = i *2;

print sum

이런식일거다.

Declarative 선언식 방식은 

sum = numbers.map(function(n) {

return n*2})

처럼 내가하고자 하는 것만 명시하면 컴퓨터가 객체를 찾아서 반복해주는 것들은 알아서 해주도록 하는거다. 

명령형방식은 데이터를 어떻게 반복할것인지, 어떤 조건을 넣을것인지, 어떤형식으로 변환하여 처리할건지를 일일히 매번 짜야된다. 

반면 선언식은 저 뻔한 작업을 좀더 단순명료하게 처리할수 있도록 미리 선언된 함수에 집어 넣기만 하면 원하는 결과가 나오도록 하는 방식이다.

이를 가능케 하는 프로그램기법으로 일급함수, 람다 가 최근의 프로그램에서 거의 모두 지원하기 때문이다.

Rx 프로그래밍에서는 

  • Observer (관찰자, 내가 어떤 데이터를 넣는지 몇개인지 관찰하는 녀석, Hot Observable 과 Cold Observable가 있는데 뜨거운 것은 지금 바로 바로 구독자에게 전달하는 방식이고, 차가운 것은 구독자가 구독하는 시점이전의 데이터 까지 발행하는방식이다.)
  • subscribe (관찰자가 흘려보내준 데이터를 구독자는 구독하게 되고 데이터를 추가가공하거나 비지니스 로직을 전개한다. 구독자는 여러개일수 있다. 옵서버가 한번에 많은 데이터를 구독자에게 모두 준다고 해도 구독자는 선택적으로 데이터를 가질수 있다. 예를들어 날씨정보에서 기온구독자, 습도구독자,지역별날씨정보구독자 등 나뉠수 있다.)
  • schedule (구독자에게 순서대로 줄건지 모두 모아 줄건지 등을 결정한다)

이상의 3개의 기본개념으로 움직인다.

이들기본 개념에 operator 와 일급함수를 이용하여 데이터의처리조건과 비지니스로직을 적용한다. 

Observer는 Rx 에만 있는게 아니고 프로그래밍의 한방법(옵서버패턴)이다. 최근 유행인 binding (자바스크립트, 안드로이드 등에서 쓰이는) 기능도 옵서버패턴을 이용한 방식이다.

*옵서버패턴이기 때문에 주의할점은 구독을 시작한다음 구독해제를 해야 옵서버가 구독자를 계속쳐다보지 않게된다. 

기본 사용법은 다음과 같다.

Observable.just("kim", "park", "lee")
.filter(s -> s.equals("park"))
.subscribe(s -> result=s);
assertTrue(result.equals("park"));
//filter 로 park 만 골라냈으니 결과는 true

Observable이 할당된 각 요소를 잘 찾는지 확인하자.

Observable.just("kim", "park", "lee")
.doOnNext(s -> System.out.println("Who is the next ? " + s))
.filter(s -> s.equals("kim"))
.subscribe(s -> result=s);
assertTrue(result.equals("kim"));
Who is the next ? kim
Who is the next ? park
Who is the next ? lee

이대로는 그냥 for 문 으로 돌리는거랑 무슨 차이냐? 메인쓰레드는 자기일하게 냅두고, Observable 이 조용히 몰래 처리하도록 하자(사실 이게 Rx프로그래밍이다)

System.out.println("Thread:" + Thread.currentThread().getName());
Observable.just("kim", "park", "lee")
.doOnNext(s -> System.out.println("Thread:" + Thread.currentThread().getName()))
.subscribeOn(Schedulers.io())
.filter(s -> s.equals("park"))
.subscribe(s -> result=s);
Thread.sleep(1000);
System.out.println(result);
assertTrue(result.equals("park"));
Thread:main
Thread:RxCachedThreadScheduler-1
Thread:RxCachedThreadScheduler-1
Thread:RxCachedThreadScheduler-1
park

Thread를 보면 프로그램은 main 에서 시작해서 Observable처리는RxCachedThreadScheduler-1 에서 실행된다. 즉, Rx루틴을 처리하는동안 main쓰레드는 놀고 있었다라는거다.

이대로는 그냥 for 문 으로 돌리는거랑 무슨 차이냐? 메인쓰레드는 자기일하게 냅두고, Observable 이 조용히 몰래 처리하도록 하자(사실 이게 Rx프로그래밍이다)
이렇게 스케쥴기능을 이용하여 별도의 쓰레드에서 처리하도록 할수 있는데 처리결과도 마친가지로 별도의 쓰레드에서 처리하도록 할수 있다.
System.out.println("Thread:" + Thread.currentThread().getName());
Observable.just("kim", "park", "lee")
.doOnNext(s -> System.out.println("Thread:" + Thread.currentThread().getName()))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.filter(s -> s.equals("park"))
.subscribe(s -> {
result=s;
System.out.println("Thread:" + Thread.currentThread().getName());
});
Thread.sleep(1000);
System.out.println(result);
assertTrue(result.equals("park"));
Thread:main
Thread:RxCachedThreadScheduler-1
Thread:RxCachedThreadScheduler-1
Thread:RxCachedThreadScheduler-1
Thread:RxComputationThreadPool-1 <----결과를 위한 쓰레드
park

doOnError, doOnComplete등으로 처리중 에러나 성공시 추가처리를 할수도 있다.
System.out.println("Thread:" + Thread.currentThread().getName());
Observable.just("kim", "park", "lee")
.doOnNext(s -> System.out.println("Thread:" + Thread.currentThread().getName()))
.doOnComplete(() -> assertTrue(result.equals("park")))
.doOnError(e -> System.out.println("Error:" + e.getMessage()))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.filter(s -> s.equals("park"))
.subscribe(s -> {
result=s;
System.out.println("Thread:" + Thread.currentThread().getName());
});

RxJava에서 데이터를 전달하기 위해서는 팩토리 함수를 이용하는데 버젼에 따라 다르다. rx2에서는 배열, 컬렉션 등이 구분되어있다.
팩토리함수함수
RxJava 1.x의 기본 팩토리 함수create(),just(),from()
RxJava 2.x의 기본 팩토리 함수fromArray(),fromlterable(),fromCallable(),fromFuture(),
fromPublisher()
기타 팩토리 함수interval(),range(),timer(),defer()
- fromCallable : 외부 인터페이스에 지정된 함수결과(Callable)형식을 옵서버가 받아서 구독자에게 전달한다.
Callable<String> callable = ()->{
            Thread.sleep(1000);
            return "Hello Callable";
};
Observable<String> source=Observable.fromCallable(callable);
source.subscribe(System.out::println);
- fromFuture : Java5 부터지원되는 Future (비동기) 기능을 이용하여 지정된 함수가 끝나서 Future의 get메소드를 통해  데이터가 전달되는 순간에  옵서버가 데이터를 구독자에게 전달하다.
        Future<String> future =Executors.newSingleThreadExecutor().submit(()->{
            Thread.sleep(1000);
            return "Hello Future";
        });
        Observable source =Observable.fromFuture(future);
        source.subscribe(System.out::println);

MethodFunctional Interface
doOnSubscribe()Action0
doOnUnsubscribe()Action0
doOnNext()Action1<T>
doOnCompleted()Action0
doOnError()Action1<T>
doOnTerminate()Action0
finallyDo()Action0
doOnEach()Action1<Notification<T>>
doOnRequest()Action1<Long>

이밖에

Filter : 데이터를 조건으로 구분해서 구독자에게 전달하기

Map : 데이터를 가공

Cast : 데이터의 형식을 바꿈

Buffer : 데이터를 모아서 줌

Merger : 여러개의 옵서버가 있을때, 구독자에게 하나로 합쳐서 줌

Zip : 지정된 개수만큼 옵서버들에게서 데이터를 받았을때만 구독자에게 전달해준다. 만일 두개의 옵서버로부터 데이터를 받아야 구독자에게 전달하는 걸 만든다면 이걸 사용한다. ( 두개의 버튼을 클릭시 반응하는 것 또는 두개의 url에서 응답이 있어야 데이터 처리를 하고 싶을떄)


* JAVA8 에서부터는 STREAM 이라는 것을 지원해서 배열 또는 컬렉션의 요소를 순차적으로 돌면서 필터링,정렬,병합등을 할수 있도록 지원해준다. 


귀찮다. 일단 여기를 참고.

  • http://reactivex.io/documentation/ko/operators/flatmap.html
  • RXJava 반응형 프로그래밍-1,2,3
    https://velog.io/@chan33344/03.-RXJava-%EB%B0%98%EC%9D%91%ED%98%95-%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D-1
  • RxJava on Android
    https://www.slideshare.net/dcgraham7/rxjava-on-android
  • https://nittaku.tistory.com/category/%EC%95%88%EB%93%9C%EB%A1%9C%EC%9D%B4%EB%93%9C
  • https://jeongupark-study-house.tistory.com/39?category=820719

0 comments:

댓글 쓰기