2021년 1월 31일 일요일

[Android] Thread, Callable, Executor(framework) 쓰레드 다루기

최근 coroutine , rx 로 인해 쓰레드 체 관한 처리가 조금 쉬워지긴했다.

그래도 옛날 소스들을 보면 java thread, Executor 또는 AsyncTask를 이용해서 만들어진게 많다.

이중 주로 외부랑 httpd 통신할때 쓰면 Executor 에 대해 알아보자.

그전에 어차피 나올용어로서 Callable 과 Runnable을 알아보자

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
    static class MyCallable implements Callable<String> {

        @Override
        public String call() throws Exception {
            return "hehe";
        }
    }
    /* 사용 


        ExecutorService mPool = Executors.newFixedThreadPool(5);

        mPool.execute(new MyRunnable());
    */


    static class MyRunnable implements Runnable {

        @Override
        public void run(){
        }
    }
    /* 사용 


        Future<String> mFuture = mPool.submit(new MyCallable());
        try {
            // 미래(future)에서 데이터가 오기까지 여기는 런타임 블럭처리된다.
            Log.e("from Future", mFuture.get());
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    */

  • Callable : 보면 알겠지만  리턴값을 지정할수 있고 call() 로 실행한다. 또 에러처리를 할수 있어서 쓰레드실행시에 문제가 생겼을때 사용자가 적절한 처리를 할수있다.
  • Runnale : run으로 실행한다.
Executor Framework는 자바 1.5 부터 제공해주는 것으로 기존의 단순 Thread 를 프레임워크화 해서 좀덛 쓰레드를 유연하게 사용하도록 해 준다.
대표 기능으로 thread pool, 생명주기 관리, Task 관리, 병렬처리(?), 비동기(Future)처리지원 등이다.

아래는 Future를 사용한 비동기 처리 예제이다.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
   static class MyCallable implements Callable<String> {
        CountDownLatch howMuchCounting;
        String s = "";

        @Override
        public String call() throws Exception {
            howMuchCounting.countDown();
            return "http://ohmy.god?" + s;
        }

        public void setS(String argS) {
            s = argS;
        }

        public void setCountDowner(CountDownLatch argHowMuchCounting) {
            howMuchCounting = argHowMuchCounting;
        }
    }


    private void runExe() {
        CountDownLatch howMuchCounting = new CountDownLatch(3);
        
        ExecutorService mPool = Executors.newFixedThreadPool(5);


        List<String> yourInputDatas = Arrays.asList("page=1", "page=2", "page=3");
        List<Future<String>> futures = new ArrayList<Future<String>>();
        for (final String dataItem : yourInputDatas) {
            MyCallable myCallable = new MyCallable();
            myCallable.setS(dataItem);
            myCallable.setCountDowner(howMuchCounting);
            futures.add(mPool.submit(myCallable));
        }

        // 바로 아래의 await로 쓰레드가 모두 실행될때까지 기다린다.
        try {
            howMuchCounting.await();
        } catch (InterruptedException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

        // 모두 끝났으면 셧다운
        mPool.shutdown();


        List<String> results = new ArrayList<String>();
        for (Future<String> future : futures) {
            // future에 있는 데이터는 그래도 사용 하기 번거로우니까 따로 담는다.
            try {
                results.add(future.get());
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        for (String s : results) {
            Log.i("fure", s);
        }

    }
    /*
    fure: http://ohmy.god?page=1
    fure: http://ohmy.god?page=2
    fure: http://ohmy.god?page=3
    */

CountDownLatch 클래스는 동시성관련 유틸 클래스로서 초기에 몇개의 쓰레드가 동시에 이루질수 있는지 지정하고, 쓰레드의 각 처리가 끝나는 순간 하나씩 countDown하여 0 이 될때까지 await 함수를 통해 비동기작업이 끝나기를 기다리게 해준다.
이를 이용하여 모든 쓰레드를 강제로 닫는 shutdown 을 사용해도 문제가 없도록 구현할수 있게 해준다.

2021년 1월 30일 토요일

[Android] WorkManager 백그라운드 태스크 다루기

  Android 에서 백그라운드 서비스, 브로드캐스트등을을 하기위해서 Thread, Async, JobScheduler,알람매니져등이 있지만  최근에는 WorkManager를 이용하기를 권장한다.

WorkManager는 주로 백단 에서 작업해야하는 것들에 대해 사용하는데, 앱이 종료되거나 다시시작되더  WorkManager가 작업을 다시 시작해주기 때문에 안정적인 서비스 개시가 가능하도록 해준다.

그렇다고 모든 서비스, 쓰레드에 WorkManager를 쓰라는건 아니고 ,  즉각실행해야 될거는 coroutines(또는 rxjava등) , 정확한 시간에 가동되야 하는거는 AlarmManager,  기기가 다시 시작되어도 실행되어야 할 백그라운드 작업등은 WorkManager를 쓰라는 얘기다.

WorkerManager 는 API레벨과 앱상태같은 요건에 근거해서 내부적으로 적절한 방법으로 백그라운작업을 어떻게 할지 자동으로 선택하다. 

https://developer.android.com/guide/background

WorkerManger에는 다음과 같은 개념이 있다.

  • Worker : Abstract클래스로서 이 클래스를 확장받은 일할놈(?)이 어떤일을 할지를 구체적으로 기술한다. 자신의 일이 잘 끝났다 못끝냈다는 Success, Failure, Retry 3개의 값을 반환함으로서 workmanager가 전반적인  work를 관리할수 있도록 한다.
  • WorkRequest : 작업할놈이 결정되었다면 이 클래스를 통해서 작업요청을 해야하는데, OneTimeWorkrequest, PeriodicWorkRequest가 있다.
  •  WorkerManager : WorkRequest를 받은 Worker가 를 큐에 넣거나 빼는 등의 전체 work 에 대한 작업상태를 모니터링 한다.


1. 앱이 백그라운드에서 log 를 출력하기.

https://github.com/sugoigroup/android_workmanager_example/commit/d9662cc3e3fa38c01079a7bac25003b5b42fd46f

1
2
3
4
5
6
7
dependencies {
...

    def work_version = "2.5.0"
    implementation "androidx.work:work-runtime:$work_version"

}
build.gradle (Module:... .app)에 workerManager 의존성추가
 
2. 샘플워커 클래스를 만들고,  workRequest->workerManager에 등록

https://github.com/sugoigroup/android_workmanager_example/commit/d9662cc3e3fa38c01079a7bac25003b5b42fd46f
---UploadWorker.java (일할놈)를 만들어서 등록
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import android.util.Log;

import androidx.annotation.NonNull;
import androidx.work.Worker;
import androidx.work.WorkerParameters;

public class UpladWorker extends Worker {
    private int count = 0;

    public UpladWorker(@NonNull Context context, @NonNull WorkerParameters workerParams) {
        super(context, workerParams);
    }

    @NonNull
    @Override
    public Result doWork() {

        while (count < 10) {
            count++;
            try {
                Log.i("worker", "now background" + count);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        return Result.success();

    }
}

---workerRequest -> workerManager에 worker
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        // 충전중일때만 worker 가 실행되도록 제한을 건다.
        Constraints constraints = new Constraints.Builder()
                .setRequiresCharging(true)
                .build();

        // 이번 한번만 일하도록 한다.
        OneTimeWorkRequest oneTimeWorkRequest =  new OneTimeWorkRequest.Builder(UpladWorker.class)
                .setConstraints(constraints)
                .build();

        // 매니져에게 일할거라고 등록한다.
        WorkManager.getInstance(this).enqueue(oneTimeWorkRequest);
    }
I/worker: now background1
I/worker: now background2
I/worker: now background3
I/worker: now background4 --> 이 시점에서 Home 버튼을눌러 앱을 보이지않는 백그라운드상태로 전환했다.
I/worker: now background5 --> 백그라운드상태이지만 워커는 계속움직여 로그가 찍힌다.
I/worker: now background6
I/worker: now background7
I/worker: now background8
I/worker: now background9
I/worker: now background10 


2. 백그라운드 상테에서 알림창으로 통지가 오는지 확인해보자.

https://github.com/sugoigroup/android_workmanager_example/commit/3dbb0496cf0427cbdb6c710987f8efb3a1fa1d4e
--worker (일할놈)이 로그찍는일에서 통지하는일로 역활을 바꾸어 보자.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class UpladWorker extends Worker {
    private int count = 0;

    public UpladWorker(@NonNull Context context, @NonNull WorkerParameters workerParams) {
        super(context, workerParams);
    }

    @NonNull
    @Override
    public Result doWork() {

        while (count < 10) {
            count++;
            try {
               // Log.i("worker", "now background" + count);
                showNotification("worker", "now background" + count);
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        return Result.success();

    }

    private void showNotification(String task, String desc) {
        NotificationManager manager = (NotificationManager) getApplicationContext().getSystemService(
                Context.NOTIFICATION_SERVICE);
        String channelId = "my_channel";
        String channelName = "my_name";
        // Oreo 부터는 노티에 알림하려면 채널이 있어야 한다.
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
            NotificationChannel channel = new
                    NotificationChannel(channelId, channelName, NotificationManager.IMPORTANCE_DEFAULT);
            manager.createNotificationChannel(channel);
        }
        NotificationCompat.Builder builder =
                new NotificationCompat.Builder(getApplicationContext(), channelId)
                        .setContentTitle(task)
                        .setContentText(desc)
                        .setSmallIcon(R.mipmap.ic_launcher);
        manager.notify(1, builder.build());
    }
}
핸폰 알림창에 3초마다 알림이 갱신된다.. 

3. workManager에게 특정태그로 요청된 workRequest를 통해 중지명령을 내려서 worker가 명령이후에는 작업하지않도록 해보자.
일단 간단하게 hello textview 를 클릭하면 클릭한 시점부터 worker가 일을 안하도록 하자.

https://github.com/sugoigroup/android_workmanager_example/commit/b44eaa1059d44fa718d089053c63f2ad81005fe6

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
-------------
UploadWorker.java

    public Result doWork() {

        while (count < 10) {
            if (isStopped()) {
                continue;
            }
            count++;
            try {
               // Log.i("worker", "now background" + count);
                showNotification("worker", "now background" + count);
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        return Result.success();

    }
-------------------
MainActivity.java


        // 충전중일때만 worker 가 실행되도록 제한을 건다.
        Constraints constraints = new Constraints.Builder()
                .setRequiresCharging(true)
                .build();

        // 이번 한번만 일하도록 한다.
        OneTimeWorkRequest oneTimeWorkRequest =  new OneTimeWorkRequest.Builder(UpladWorker.class)
                .setConstraints(constraints)
                .addTag(CANCEL_ME)
                .build();

        // 매니져에게 일할거라고 등록한다.
        WorkManager.getInstance(this).enqueue(oneTimeWorkRequest);

Hello world라는 텍스트뷰를 클릭하면 알림창이 더이상 안온다.. 

4. WorkerManager는 각 workRequest에 대해 모두 기록하고 중간에 멈춰질때 다시 복귀하도록 관리하고 있다. 따라서 이번 예제에서 setRequiresCharging(work의 일하는 제약을 충전중일경우에만) 이라고 한정지었기 때문에 충전중이 아닌상태로 앱을 시작했다가 , 충전중상태로 다시 앱을 재가동하면 충전중이 아닐때의 work 까지 총 두개의 work 가 일하게 된다. 
때문에 이를 막기 위해서 beginUniqueWork라는 함수로 workManager를 가동하면 KEEP, REPLACE, APPEND 라는 3개의 옵션을 통해 같은 이름을 가진 작업 queue가 있을때, 나중에 들어온 같은 이름의 작업을 무시할건지, 중간에 교체할건지, 순서대로 진행할건지를 결정할수 있도록 해준다

https://github.com/sugoigroup/android_workmanager_example/commit/4bd3b10667b4f09879a317f124d68066b44c1f3e

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
    protected void onCreate(Bundle savedInstanceState) {

        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        TextView tv = (TextView) findViewById(R.id.hello);
        tv.setOnClickListener(e -> {
            startQueue();
        });
        startQueue();
    }

    private void startQueue() {

        // 충전중일때만 worker 가 실행되도록 제한을 건다.
        Constraints constraints = new Constraints.Builder()
                .setRequiresCharging(true)
                .build();

        // 이번 한번만 일하도록 한다.
        final OneTimeWorkRequest  oneTimeWorkRequest =  new OneTimeWorkRequest.Builder(UpladWorker.class)
                .setConstraints(constraints)
                .addTag(CANCEL_ME)
                .build();

        // 매니져에게 일할거라고 등록한다.
        //
        WorkManager.getInstance(this)
                .beginUniqueWork("iamUnique", ExistingWorkPolicy.APPEND_OR_REPLACE, oneTimeWorkRequest)
                .enqueue();
    }

같은 WorkRequest를 A ,실행중에 B 실행 에  대해 

KEEP : B는 무시된다.(단 A가 끝난상태면 B는 진행한다.)
REPLACE : B로 새롭게 시작한다.(A가 어디까지 진행된지 상관없다)
APPEND : A가끝날때 까지 기다렸다가 B를 진행한다.
APPEND OR REPLACE : A가진행중이면 B를 뒤에 추가하고 시작하되, A가 실패로 끝난상태거나 취소가 이루어졌다면 B로 새롭게 시작한다.

hello world text 뷰를 클릭하면서 위의 네가지를 바꾸어 보면 알림창에 해당 옵션에 따라서 적절하게 queue가 진행될거다.   

5. 서로 다른 workRequest를 순차적으로 일시키자.

https://github.com/sugoigroup/android_workmanager_example/commit/255afd504af3c17417a39f4f00801e408f896c26
 
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
------
LogWorker.java
public class LogWorker  extends Worker {

    public LogWorker(@NonNull Context context, @NonNull WorkerParameters workerParams) {
        super(context, workerParams);
    }

    @NonNull
    @Override
    public Result doWork() {
         Log.i("worker", "Wow All Done!");
        return Result.success();
    }
}

-------
MainActivity.java


        // 이번 한번만 일하도록 한다.
        final OneTimeWorkRequest  oneTimeWorkRequest =  new OneTimeWorkRequest.Builder(UpladWorker.class)
                .setConstraints(constraints)
                .addTag(CANCEL_ME)
                .build();

        // 이번 한번만 일하도록 한다.
        final OneTimeWorkRequest  logWorkerRequest =  new OneTimeWorkRequest.Builder(LogWorker.class)
                .setConstraints(constraints)
                .addTag(CANCEL_ME)
                .build();

        // 매니져에게 일할거라고 등록한다.
        // then 으로 순차적인 workRequest를 실행할수 있다.
        WorkManager.getInstance(this)
                .beginUniqueWork("iamUnique", ExistingWorkPolicy.APPEND_OR_REPLACE, oneTimeWorkRequest)
                .then(logWorkerRequest)
                .enqueue();
    }



6. workRequest(일요청)에 자료를 넣고(input), 일이 끝나면 처리한 결과(output)을 받아서 textview에 뿌려보자.
백그라운드 쓰레드에서 작동하는 workManager 에 데이터를 전달하고 이를 다시 받아서 UI쓰레드에서 ui를 변경하는것은 참 귀찮은 작업인데, 화면변경은 livedata 를 통해서 전달 할수 있어서 간편하다. 이밖에도 Future 인터페이스를 통해 해당 workRequest 작업이 끝나는 시점에만 특정한 로직을 실행한다든지 할수도있다.

https://github.com/sugoigroup/android_workmanager_example/commit/b97b03e7743c6d86001a7df8a821a80bd115325f



 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
------
LogWorker.java
public class LogWorker  extends Worker {

    public LogWorker(@NonNull Context context, @NonNull WorkerParameters workerParams) {
        super(context, workerParams);
    }

    @NonNull
    @Override
    public Result doWork() {

        String detector = getInputData().getString(KEY_DETECTOR);

        Log.i("worker",   detector + " detector is founding a key!");

        //내보낼 값을 정하자
        Data outputData = new Data.Builder()
            .putString(FOUND_OUT_KEY, "elzzup")
            .build();
        return Result.success(outputData);
}

-------
MainActivity.java


        private void startQueue() {

        // 충전중일때만 worker 가 실행되도록 제한을 건다.
        Constraints constraints = new Constraints.Builder()
                .setRequiresCharging(true)
                .build();

        // 이번 한번만 일하도록 한다.
        final OneTimeWorkRequest  oneTimeWorkRequest =  new OneTimeWorkRequest.Builder(UpladWorker.class)
                .setConstraints(constraints)
                .addTag(CANCEL_ME)
                .build();

        //보낼값을 정하자.
        Data whoIsTheDetector = new Data.Builder().putString(KEY_DETECTOR, "kim").build();

        // 이번 한번만 일하도록 한다.
        final OneTimeWorkRequest  logWorkerRequest =  new OneTimeWorkRequest.Builder(LogWorker.class)
                .setConstraints(constraints)
                .setInputData(whoIsTheDetector)
                .addTag(CANCEL_ME)
                .build();

        // 매니져에게 일할거라고 등록한다.
        // then 으로 순차적인 workRequest를 실행할수 있다.
        WorkManager.getInstance(this)
                .beginUniqueWork("iamUnique", ExistingWorkPolicy.APPEND_OR_REPLACE, oneTimeWorkRequest)
                .then(logWorkerRequest)
                .enqueue();

        // 결과를
        foundKeyByWorkerGuy(logWorkerRequest.getId());
    }

    private void foundKeyByWorkerGuy(UUID workerUUID) {
        LiveData<WorkInfo> lf = WorkManager.getInstance(this).getWorkInfoByIdLiveData(workerUUID);

        lf.observe(this, workInfo -> {
            if (workInfo.getOutputData().getString(FOUND_OUT_KEY) != null) {
                tv.setText("The Key is  " + workInfo.getOutputData().getString(FOUND_OUT_KEY));
            } else {
                tv.setText("Finding the key");
            }
        });
    }



workManager 는 백그라운드 서비스용도로 사용되기 위한 기능이므로써 쓰레드비슷한 형식으로 자겁해서 UI에 변화를 줘야되는 즉, 화면이 떠있는 상태로 foreground작업에는 쓰레드나 coroutine을 사용하는게 맞다.

[RxAndroid] RxAndroid 사용해보자.

 RxJava가 대충(잉?) 알았으니 그걸 Android에서 어케 쓸까 참고해보자.

0. gradle에 라이브러리 추가


build.gradle (Module:...app) 파일에 rxjava2 추가
dependencies {
...
//Rx Utils dependencies
implementation 'io.reactivex.rxjava2:rxjava:2.2.19'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

}

1.프로젝트 만들때 생성된 Activity에 간단히 RxJava(RxAndroid)를 사용해보자.

https://github.com/sugoigroup/rxandroid_sample/commit/8dbc80e9686850c20fa15003ef450218a0c99458

MainActivity의 onCreate

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected void onCreate(Bundle savedInstanceState) {
....
TextView helloTv = (TextView) findViewById(R.id.hello);
Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext(getJest());
    emitter.onComplete();
})
        .subscribeOn(Schedulers.io()) //스케쥴러로 몰래 뒤에서 실행시키고
        .observeOn(AndroidSchedulers.mainThread())  //결과는 화면 메인쓰레드에서 하자.
        .subscribe(s -> helloTv.setText(s));


//just 는 아이템을 바로바로 서브스크들한테 방출한다.
Observable.just("Hello ! from just").subscribe(s -> helloTv.setText(s));
}
private String getJest() {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello! from create";
}

2.버튼을 눌렀을때  한번에 두개의 텍스트뷰를 바꾸도록 해보자. Thread 대신 timer operator로 바꾸어보자.

https://github.com/sugoigroup/rxandroid_sample/commit/1d5171889342e24c78836b607563ee04c4fac396

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        helloTv = (TextView) findViewById(R.id.hello);
        helloTv2 = (TextView) findViewById(R.id.hello2);
        Button btn = (Button) findViewById(R.id.button);
        btn.setOnClickListener(e -> btnBlicked());
    }
    private void btnBlicked() {

        //timer 를 이용하면 앞서 했던 thread 를 안써도 일정시간 지난후 구독이 이루어지게 할수 있다..
        Observable.timer(3, TimeUnit.SECONDS)
                .observeOn(AndroidSchedulers.mainThread())  //결과는 화면 메인쓰레드에서 하자.
                .subscribe(d -> {
                    changeText(getJest());
                });


        //just 는 아이템을 바로바로 서브스크들한테 방출한다.
        Observable.just("Hello ! from just").subscribe(s -> changeText(s));
    }

    private void changeText(String txt) {
        helloTv.setText(txt);
        helloTv2.setText(txt);
    }

3. 옵서버에 구독아이템을 동적으로 추가해 보자

https://github.com/sugoigroup/rxandroid_sample/commit/23007802b49b21097228e1adec35f667259b9ee5

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class MainActivity extends AppCompatActivity {
    private ConnectableObservable<String> dataStream;
    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        TextView helloTv = (TextView) findViewById(R.id.hello);
        Button btn = (Button) findViewById(R.id.button);
        btn.setOnClickListener(e -> btnBlicked());

        dataStream = Observable.just("Hello ! from scriber" )
                .observeOn(AndroidSchedulers.mainThread())
                .publish();
        dataStream.subscribe(s -> helloTv.setText(s));

    }

    public void addMe(View view) {
        TextView helloTv2 = (TextView) findViewById(R.id.hello2);
        dataStream.subscribe(s -> helloTv2.setText(s));
    }

    private void btnBlicked() {
        disposable = dataStream.connect();
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}


참고 사이트들

버스도착정보 파싱 예제 : https://nittaku.tistory.com/215?category=727978

https://beomseok95.tistory.com/category/Rx




















2021년 1월 29일 금요일

[RxJava] RxJava 프로그래밍

 ReactiveX Android 프로그래밍이 최근 유행이다.

예전방식 프로그래머  (나와 같은 늙은) 들은 항상 함수에 정의하고 , 반복루틴에 아이템들을 명시하고 그걸 처리하여 값을 돌려받는 순서대로 로직이 돌아가는게 정상이라 생각하고 살았는데, 이제는 컴퓨터가 너무 빨라져서 꼭 그렇게 순차적으로 하지 않아도 동시에 다른 일을 하면서도 처리할수 있도록 유연성있게 처리하는 방식이 리액티브이다.

Imperative 명령(강제적) 방식으로 프로그램한다면

for( i=0; i < [0,1,2,3,4]; i ++) {

   sum = i *2;

print sum

이런식일거다.

Declarative 선언식 방식은 

sum = numbers.map(function(n) {

return n*2})

처럼 내가하고자 하는 것만 명시하면 컴퓨터가 객체를 찾아서 반복해주는 것들은 알아서 해주도록 하는거다. 

명령형방식은 데이터를 어떻게 반복할것인지, 어떤 조건을 넣을것인지, 어떤형식으로 변환하여 처리할건지를 일일히 매번 짜야된다. 

반면 선언식은 저 뻔한 작업을 좀더 단순명료하게 처리할수 있도록 미리 선언된 함수에 집어 넣기만 하면 원하는 결과가 나오도록 하는 방식이다.

이를 가능케 하는 프로그램기법으로 일급함수, 람다 가 최근의 프로그램에서 거의 모두 지원하기 때문이다.

Rx 프로그래밍에서는 

  • Observer (관찰자, 내가 어떤 데이터를 넣는지 몇개인지 관찰하는 녀석, Hot Observable 과 Cold Observable가 있는데 뜨거운 것은 지금 바로 바로 구독자에게 전달하는 방식이고, 차가운 것은 구독자가 구독하는 시점이전의 데이터 까지 발행하는방식이다.)
  • subscribe (관찰자가 흘려보내준 데이터를 구독자는 구독하게 되고 데이터를 추가가공하거나 비지니스 로직을 전개한다. 구독자는 여러개일수 있다. 옵서버가 한번에 많은 데이터를 구독자에게 모두 준다고 해도 구독자는 선택적으로 데이터를 가질수 있다. 예를들어 날씨정보에서 기온구독자, 습도구독자,지역별날씨정보구독자 등 나뉠수 있다.)
  • schedule (구독자에게 순서대로 줄건지 모두 모아 줄건지 등을 결정한다)

이상의 3개의 기본개념으로 움직인다.

이들기본 개념에 operator 와 일급함수를 이용하여 데이터의처리조건과 비지니스로직을 적용한다. 

Observer는 Rx 에만 있는게 아니고 프로그래밍의 한방법(옵서버패턴)이다. 최근 유행인 binding (자바스크립트, 안드로이드 등에서 쓰이는) 기능도 옵서버패턴을 이용한 방식이다.

*옵서버패턴이기 때문에 주의할점은 구독을 시작한다음 구독해제를 해야 옵서버가 구독자를 계속쳐다보지 않게된다. 

기본 사용법은 다음과 같다.

Observable.just("kim", "park", "lee")
.filter(s -> s.equals("park"))
.subscribe(s -> result=s);
assertTrue(result.equals("park"));
//filter 로 park 만 골라냈으니 결과는 true

Observable이 할당된 각 요소를 잘 찾는지 확인하자.

Observable.just("kim", "park", "lee")
.doOnNext(s -> System.out.println("Who is the next ? " + s))
.filter(s -> s.equals("kim"))
.subscribe(s -> result=s);
assertTrue(result.equals("kim"));
Who is the next ? kim
Who is the next ? park
Who is the next ? lee

이대로는 그냥 for 문 으로 돌리는거랑 무슨 차이냐? 메인쓰레드는 자기일하게 냅두고, Observable 이 조용히 몰래 처리하도록 하자(사실 이게 Rx프로그래밍이다)

System.out.println("Thread:" + Thread.currentThread().getName());
Observable.just("kim", "park", "lee")
.doOnNext(s -> System.out.println("Thread:" + Thread.currentThread().getName()))
.subscribeOn(Schedulers.io())
.filter(s -> s.equals("park"))
.subscribe(s -> result=s);
Thread.sleep(1000);
System.out.println(result);
assertTrue(result.equals("park"));
Thread:main
Thread:RxCachedThreadScheduler-1
Thread:RxCachedThreadScheduler-1
Thread:RxCachedThreadScheduler-1
park

Thread를 보면 프로그램은 main 에서 시작해서 Observable처리는RxCachedThreadScheduler-1 에서 실행된다. 즉, Rx루틴을 처리하는동안 main쓰레드는 놀고 있었다라는거다.

이대로는 그냥 for 문 으로 돌리는거랑 무슨 차이냐? 메인쓰레드는 자기일하게 냅두고, Observable 이 조용히 몰래 처리하도록 하자(사실 이게 Rx프로그래밍이다)
이렇게 스케쥴기능을 이용하여 별도의 쓰레드에서 처리하도록 할수 있는데 처리결과도 마친가지로 별도의 쓰레드에서 처리하도록 할수 있다.
System.out.println("Thread:" + Thread.currentThread().getName());
Observable.just("kim", "park", "lee")
.doOnNext(s -> System.out.println("Thread:" + Thread.currentThread().getName()))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.filter(s -> s.equals("park"))
.subscribe(s -> {
result=s;
System.out.println("Thread:" + Thread.currentThread().getName());
});
Thread.sleep(1000);
System.out.println(result);
assertTrue(result.equals("park"));
Thread:main
Thread:RxCachedThreadScheduler-1
Thread:RxCachedThreadScheduler-1
Thread:RxCachedThreadScheduler-1
Thread:RxComputationThreadPool-1 <----결과를 위한 쓰레드
park

doOnError, doOnComplete등으로 처리중 에러나 성공시 추가처리를 할수도 있다.
System.out.println("Thread:" + Thread.currentThread().getName());
Observable.just("kim", "park", "lee")
.doOnNext(s -> System.out.println("Thread:" + Thread.currentThread().getName()))
.doOnComplete(() -> assertTrue(result.equals("park")))
.doOnError(e -> System.out.println("Error:" + e.getMessage()))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.filter(s -> s.equals("park"))
.subscribe(s -> {
result=s;
System.out.println("Thread:" + Thread.currentThread().getName());
});

RxJava에서 데이터를 전달하기 위해서는 팩토리 함수를 이용하는데 버젼에 따라 다르다. rx2에서는 배열, 컬렉션 등이 구분되어있다.
팩토리함수함수
RxJava 1.x의 기본 팩토리 함수create(),just(),from()
RxJava 2.x의 기본 팩토리 함수fromArray(),fromlterable(),fromCallable(),fromFuture(),
fromPublisher()
기타 팩토리 함수interval(),range(),timer(),defer()
- fromCallable : 외부 인터페이스에 지정된 함수결과(Callable)형식을 옵서버가 받아서 구독자에게 전달한다.
Callable<String> callable = ()->{
            Thread.sleep(1000);
            return "Hello Callable";
};
Observable<String> source=Observable.fromCallable(callable);
source.subscribe(System.out::println);
- fromFuture : Java5 부터지원되는 Future (비동기) 기능을 이용하여 지정된 함수가 끝나서 Future의 get메소드를 통해  데이터가 전달되는 순간에  옵서버가 데이터를 구독자에게 전달하다.
        Future<String> future =Executors.newSingleThreadExecutor().submit(()->{
            Thread.sleep(1000);
            return "Hello Future";
        });
        Observable source =Observable.fromFuture(future);
        source.subscribe(System.out::println);

MethodFunctional Interface
doOnSubscribe()Action0
doOnUnsubscribe()Action0
doOnNext()Action1<T>
doOnCompleted()Action0
doOnError()Action1<T>
doOnTerminate()Action0
finallyDo()Action0
doOnEach()Action1<Notification<T>>
doOnRequest()Action1<Long>

이밖에

Filter : 데이터를 조건으로 구분해서 구독자에게 전달하기

Map : 데이터를 가공

Cast : 데이터의 형식을 바꿈

Buffer : 데이터를 모아서 줌

Merger : 여러개의 옵서버가 있을때, 구독자에게 하나로 합쳐서 줌

Zip : 지정된 개수만큼 옵서버들에게서 데이터를 받았을때만 구독자에게 전달해준다. 만일 두개의 옵서버로부터 데이터를 받아야 구독자에게 전달하는 걸 만든다면 이걸 사용한다. ( 두개의 버튼을 클릭시 반응하는 것 또는 두개의 url에서 응답이 있어야 데이터 처리를 하고 싶을떄)


* JAVA8 에서부터는 STREAM 이라는 것을 지원해서 배열 또는 컬렉션의 요소를 순차적으로 돌면서 필터링,정렬,병합등을 할수 있도록 지원해준다. 


귀찮다. 일단 여기를 참고.

  • http://reactivex.io/documentation/ko/operators/flatmap.html
  • RXJava 반응형 프로그래밍-1,2,3
    https://velog.io/@chan33344/03.-RXJava-%EB%B0%98%EC%9D%91%ED%98%95-%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D-1
  • RxJava on Android
    https://www.slideshare.net/dcgraham7/rxjava-on-android
  • https://nittaku.tistory.com/category/%EC%95%88%EB%93%9C%EB%A1%9C%EC%9D%B4%EB%93%9C
  • https://jeongupark-study-house.tistory.com/39?category=820719