2022년 12월 2일 금요일

Kotiln Flow

Kotiln Flow

Kotlin Coroutine Flow

Kotlin Coroutine 의 데이터 흐름 작업을 수행하기 위해서 Kotlin Coroutines 1.3.2 릴리스 이후에 Flow가 추가되었습니다.
Flow는 Rx계열의 구독/발행 하는 방식과 거의 같고 Kotlin Coroutine에 맞게 제작된 것입니다. 또한 다양한 연산자가 Rx랑 거의 동일하게 보이기 때문에 디자인이 매우 친숙해 보일 수 있습니다.

Flow와 Sequences의 차이점은 Flow는 메인 스레드의 실행을 차단하지 않는 반면 Sequences는 메인 스레드의 실행을 차단한다는 것입니다. 이름이세 알수 있듯이 순차적처리 vs 흐름처리 의 차이라고 이해하면 됩니다.

Flow 기본사용법

Flow는 다음과 같이 비동기적으로 계산된 여러 값을 반환할 수 있습니다.

flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}.collect{
    println(it)
}

RxJava에 익숙하다면 collect() subscribe() 가 emit( ) onNext()에 해당한다는 것을 이해할 수 있습니다.
Flow가 처음이거나 익숙치 않은 사람들은 아래처럼 간단히 이해하면됩니다.

  • flow : 누군가 요청할때 처리할 내용들을 흘려보낼 준비를 하는것으로 스스로는 아무것도 흘려보내지 않으며 collect(수집) 요청이 있을때만 괄호 내용을 처리합니다.
  • emit: 흐름처리시에 필요한 때에 지정한 데이터를 밖으로 내뱉게 하는것입니다. flow는 그냥 흐름만 보낼뿐 밖으로 아무것도 내보내지 않기 때문에 emit으로 밖으로 토해내는? 명령어를 이용하여 흐름에서 처리한 데이터를 밖에 전달해줍니.
  • collect : 아무리 수도물을 호스에 흘려보내도 수도꼭지를 틀지 않으면 물이 안나오듯이, collect( 수집 ) 을 해야 flow에서 처리하여 emit한 데이터를 받을수 있습니다.
    이렇게 flow, emit, collect가 기본한쌍으로 실행되게 됩니다.

Cold, Hot 스트림

흐름에는 Cold(차가운)과 Hot(뜨거운) 이 있습니다. 직관적인 단어 그대로 설명됩니다.

  • Cold : 예를 들어 물통에 담겨져 있는 물을 배출하는것과 같습니다. 물나오는 버튼을 누르면 (collect) 이미 담겨져 있는 물통의 물이 나오기 때문에 아무리 뜨거운 물을 넣었다 하더라도 약간 식은 물이 나오는 것을 말합니다. 프로그래밍으로 말한다면 collect가 요청때마다 flow안에 담겨진 모든 데이터가 전부 받아지는 것을 말합니다. 당연한듯 생각되겠지만 흐름(Strea or Flow)이라는 관점에서 보면 흐르는 시냇물 보다는 어딘가 담긴 물 같은 개념입니다.
  • Hot : Cold는 그렇다 치고 Hot은 뭐가 다를까요? 뜨겁다. 네. 위의 콜드 처럼 어딘가 물을 담기지 않고 정말 뜨거운 물을 계속 흘려 보내는 것이고 수도 꼭지를 틀면 지금 지나가는 뜨거운 물이 줄줄 나오는 것을 말합니다.
    프로그래밍으로 말한다면 collect가 요청할때 flow가 처리한 지금의 데이터만을 받아지는 것을 말합니다.
  • Hot vs Cold : Hot 스트림은 24시간 뉴스를 생각하면 됩니다. 23시간뉴스는 누가 듣든 말든 계속 Hot한 기사를 흘려보낼것이며 듣고자하는 시청자들은 뉴스를 보는 순간의 뉴스부터 보게 됩니다. 따라서Hot 스트림은 이전의 뉴스는 어떤게 있었는지 전혀 알지 못하게 됩니다. 콜드 뉴스라면 유투브 뉴스처럼 뉴스를 클릭해서 처음부터 보게되겠죠? 당연히 모든 뉴스를 다 볼수 있습니다. 실무에서도 이러한 차이점을 이해하고 적절한 방식으로 작성해야합니다.
    데이터기준으로 설명하면 데이터가 Observable 자체에서 생성되면 Cold (당연히 요청시에 데이터는 생성해서 내뱉으니까요) , 데이터가 외부에서 생성되면 Hot(당연히 외부의 데이터를 흘려보내기만 하니까요) 으로 구분합니다.

1. Flow 생성

flow, flowof(), asflow()등이 있고 모두 콜드스트림입니다. (요청할떄마다 flow안의 내용이 매번 다시 처리됩니다.)

    flowOf(1,2,3,4,5)
        .onEach {
            delay(100)
        }
        .collect{
            println(it)
        }

    listOf(1, 2, 3, 4, 5).asFlow()
        .onEach {
            delay(100)
        }.collect {
            println(it)
        }
        
    channelFlow {
        for (i in 1..5) {
            delay(100)
            send(i)
        }
    }.collect{
        println(it)
    }

흐름은 콜드 스트림입니다. 생산자와 소비자는 스레드를 전환하지 않고 동기식이다.
흐름의 코드 블록은 RxJava에 의해 생성된 Observable이 subscribe()가 호출될 때까지 실행되지 않는 것과 마찬가지로 collect()가 호출될 때까지 실행되지 않습니다.

채널은 핫 스트림입니다. 그리고 channelFlow는 생산자와 소비자의 비동기 비차단 모델을 구현합니다.

콜드스트림을 예를 들자면, 아래는 emit할때 colleter 도 100ms 기다리기 때문에 전체 수행시간이 1초가 걸립니다. 동기식으로 collect하는 순간 emit->collect->emit->collect…순으로 실행됩니다.

fun main() = runBlocking {

    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect{
            delay(100)
            println(it)
        }
    }

    print("cost $time")
}

핫스트림인channelFlow는 emitter가 100ms초 마다 방출하고 있고, 그 사이에 collector가 100ms 마다 데이터를 수집하여 결국 앞뒤로 200ms 추가되어 700ms 가 걸립니다.

fun main() = runBlocking {

    val time = measureTimeMillis{
        channelFlow {
            for (i in 1..5) {
                delay(100)
                send(i)
            }
        }.collect{
            delay(100)
            println(it)
        }
    }

    print("cost $time")
}

쓰레드 간에 플로우를 전환하는 경우 소요되는 시간은 약 700밀리초로 channelFlow 빌더를 사용한 효과와 유사합니다.

fun main() = runBlocking {

    val time = measureTimeMillis{
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.IO)
            .collect {
                delay(100)
                println(it)
            }
    }

    print("cost $time")
}

flow

flow 또는 flow<데이터형식> 으로 사용합니다. emit 으로 데이터를 배출합니다.

flow<Int>{
    emit(1)
    emit(2)
    emit(flowOf(1,2,3))
}

flowOf

여러개의 데이터를 list 정의하듯이 지정하여 flow를 생성합니다.

val testFLow = flowOf(1,2,3)
launch{
    testFLow.collect{ value->
        print(value)
    }
}

asFlow

기존의 데이터를 flow형식으로 바꾸어 생성합니다.

listOf(1,2,3).asFlow()

emptyFlow

빈 flow를 생성합니다.

emptyFlow<Int>()

2. Collect( Flow 수집)

Flow를 수집하기 위해서는 collect 블럭을 사용합니다. Flow는 내부적으로 하나씩 주어진 요소에새대 하나씩 수행하기 때문에 collect에서도 받은 데이터를 하나씩 받아서 처리합니다.

collect

flow에 수집을 요청하여 배출(emit)하는 데이터를 받아서 처리합니다.

flowOf(1,2,3).collect{ value->
        print(value)
    }

collectIndexed

collect와 같으나 순서대로 데이터를 처리합니다.

flowOf(1,2,3).collectIndexed{ value->
        print(value)
    }

collectLastest

collect를 여러번 요청할경우 이전의 수집이 완료도지 않았다면 이전값을 취소하고 현재 요쳥한 결과값을 최후의 데이터로 가집니다.

flow {
 emit(1)
    delay(50)
    emit(2)
 } .collectLatest { value ->

    delay(100)
    println("$value collected")

 }


2 collected

first(), firstOrNull()

스트림이 내보낸 첫번쨰 값만 요청합니다. 첫번쨰 값이 없을경우는 에러가 나게되는데 이때는 firstOrNull 을 사용하여 데이터가 없을 때는 null이라도 받을수 있도록합니다.

val myFlow= flow {
   emit(1)
   emit(2)
 }
launch{
    print(myFlow.first())
    //또는 print(myFlow.firstOrNull())
}

last(), lastOrNull()

스트림이 내보낸 마지막 값만 요청합니다. 마지막 값이 없을경우는 에러가 나게되는데 이때는 lastOrNull 을 사용하여 데이터가 없을 때는 null이라도 받을수 있도록합니다.

val myFlow= flow {
   emit(1)
   emit(2)
 }
launch{
    print(myFlow.last())
    //또는 print(myFlow.lastOrNull())
}

single(), singleOrNull()

스트림이 내보낸 데이터가 하나임을 증명할때 사용합니다. 두개이거나 없을때는 에러가 됩니다.

val myFlow= flow {
   emit(1)
   emit(2)
 }
launch{
    print(myFlow.single()) // Error
    //또는 print(myFlow.singleOrNull())
}

count()

스트림이 내보내는 데이터의 개수를 반환합니다. 단 버퍼가 있는 SharedFlow에서는 의미가 앖습니다.

val myFlow= flow {
   emit(1)
   emit(2)
 }
launch{
    print(myFlow.count()) 
}

3. 스레드 전환

flowOn을 사용하여 flow처리중 스레드를 전환 할수 있습니다. flowOn은 flow에 대해 특정 스레드에서 수행할것을 지정합니다. 즉 flowOn을 만나는 시점까지는 flowOn에 지정된 스레드에서 수행합니다.

    • flowOn고유한 이전 컨텍스트가 없는 연산자에만 영향을 줍니다. 이미 컨텍스트가 있는 연산자는 나중에 영향을 받지 않습니다
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO)
        .collect {
            println(it)
        }

흐름의 스레드를 전환하기 위해 withContext()를 사용하지 않는다는 점은 주목할 가치가 있습니다.

4. 흐름취소

흐름이 일시 중단 기능 내에서 일시 중단되면 흐름을 취소할 수 있으며 그렇지 않으면 취소할 수 없습니다.

fun main() = runBlocking {

    withTimeoutOrNull(2500) {
        flow {
            for (i in 1..5) {
                delay(1000)
                emit(i)
            }
        }.collect {
            println(it)
        }
    }

    println("Done")
}

5. 흐름연산자

https://flowmarbles.com/ 에 연산자 샘플과 설명이있습니다.

filter , filterNot, filterNotNull

이름에서 알 수 있듯이 fliter연산자는 주로 데이터에 대한 필터를 수행하여 주어진 원래 스트림과 일치하는 값만 포함하는 스트림을 반환합니다.

fun test() {
    lifecycleScope.launch {
        (1..3).asFlow().filter {
            it < 2
        }.collect { value ->
            Log.d(TAG, "collect :${value}")
        }
    }
}

transform

변환 연산자를 사용할 때 원하는 만큼 방출을 호출할 수 있습니다.이것이 변환과 맵의 가장 큰 차이점입니다.
flow 블럭에서는 아이템을 하나씩 꺼내서 처리하도록 되어 있습니다. 이때 꺼낸 아이템을 흘려보내기 전에 추가적인 처리를 하고 싶다면 transform { } 블럭으로 지정해야 합니다. 즉 , flow내의 아이템을 방출(emit) 하기전에 아이템의 값과 형식등에 변화를 주고 싶을때 사용합니다.

val paramFlow = flowOf("kim", "blabla")  
runBlocking {  
  paramFlow  
        .searchFromTwoApi()  
        .collect {  // 아이템의 it 타입은 Any타입이된다.
                println("$it")  
        }  
  println("done")  
}
  
fun Flow<String>.searchFromTwoApi() = transform { request ->  
  emit("length of $request is ")  
    emit(request.length)  
}
fun main() = runBlocking {

    (1..5).asFlow()
        .transform {
            emit(it * 2)
            delay(100)
            emit(it * 4)
        }
        .collect { println(it) }
}
fun main() = runBlocking {

    (1..5).asFlow()
        .transform {
            emit(it * 2)
            delay(100)
            emit("emit $it")
        }
        .collect { println(it) }
}

take, takeWhile

take 연산자는 처음 몇 개의 방출에 의해 방출된 값만 취합니다.
takeWhile은 조건을 추가하여 조건에 맞는 데이터만 취합니.

fun main() = runBlocking {

    (1..5).asFlow()
        .take(2)
        //또는 .takewhile{ it == 3 }
        .collect { println(it) }
}

drop, dropWhile

drop연산자는 take정반대이며 지정된 양을 버리고 count후속 스트림을 실행합니다.

fun test() {
   lifecycleScope.launch {
       (1..3).asFlow().drop(2)
           .collect { value ->
           Log.d(TAG, "collect :${value}")
       }
   }
}

dropWhile 은 조건을 지정하여 조건에 맞을때까지는 모두 버리고 맞는 순간다음번부터 방출합니다. 만일 첫번쨰부터 조건에 맞지 않으면 첫번째도 포함하여 모두 방출합니다.

flow {
 emit(3)
 emit(1) // 1 이기 때문에 조건에 안맞는 순간.
 emit(2)
 emit(4)
}. dropWhile { it == 3  } .collect { value ->
  print(value)
}
//1,2,4


flow {
 emit(1) // 처름부터 조건에 안맞는 순간.
 emit(3) 
 emit(2)
 emit(4)
}. dropWhile { it == 3  } .collect { value ->
  print(value)
}
//1,3, 2,4

reduce

reduce 는 현재의 결과값을 다음 순회의 매개변수로 사용합니다.
예를 들어 1~5 의 계승을 계산하려면 다음을 수행합니다.

fun main() = runBlocking {

    val sum = (1..5).asFlow().reduce { a, b -> a * b }

    println(sum)
}

runningReduce 를 사용하면 중간 값을 이용가능합니다.

flowOf(1, 2, 3).runningReduce { a, b ->
    a + b
}.collect {
    println(it)
}

1
3
6

fold

Kotlin 컬렉션의 fold 함수와 유사하게 fold도 초기 값을 설정해야 합니다.
위의 코드에서 초기 값 0은 제곱 급수의 합을 얻기 위해 reduce 함수를 사용하는 것과 유사합니다.

fun main() = runBlocking {

    val sum = (1..5).asFlow()
        .map { it * it }
        .fold(0) { a, b -> a + b }

    println(sum)
}

fun main() = runBlocking {

    val sum = (1..5).asFlow().fold(1) { a, b -> a * b }

    println(sum)
}

때로는 결과 값이 필요하지 않지만 흐름을 계속 작동해야 하는 경우 runningFold를 사용할 수 있습니다.

flowOf(1, 2, 3).runningFold("a") { a, b ->
    a + b
}.collect {
    println(it)
}

a
a1
a12
a123

debounce

Debounce는 밀리초 값 매개변수를 전달해야 합니다. 기능은 지정된 시간에 도달한 후에만 데이터가 전송되고 마지막 데이터가 전송 됩니다.

flow {
    emit(1)
    delay(590)
    emit(2)
    delay(590)
    emit(3)
    delay(1010)
    emit(4)
    delay(1010)
}.debounce(
    1000
).collect {
    println(it)
}


3
4

Sample

샘플은 디바운스와 매우 유사하며 기능은 다음과 같습니다. 지정된 시간 내에 하나의 데이터만 전송합니다.

flow {
    repeat(4) {
        emit(it)
        delay(50)
    }
}.sample(100).collect {
    println(it)
}


1
3

Buffering

Flow은 기본적으로 아이템을 하나씩 꺼내서 방출하는 방식인데, 버퍼링을 이용하면 flow안의 아이템들을 버퍼(버퍼크기만큼)에 담아두고 한번에 버퍼만큼 방출하도록 한다.

 runBlocking {  
    val time = measureTimeMillis {  
    simple()  
      .buffer() // buffer emissions, don't wait  
      .collect { value ->  
              delay(300) // pretend we are processing it for 300 ms  
              println(value)  
       }  
 }  
 println("Collected in $time ms")  
}

fun simple(): Flow<Int> = flow {  
  
  for (i in 1..3) {  
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i)  
    }  
}

위와 같이 한다면 버퍼의기본값(64개) 아이템만큼 버퍼에 담아서 방출하기 때문에, 방출시 100ms 지연했던것이 한번만 적용되고 64개를 모아서 방출(emit)하게 되어 총 실행시간이 1000ms (100 +300*3)정도가 됩니다
만인 버퍼링을 안했다면 300 +900 = 1200ms 정도가 되었을것입니다.
버퍼의 종류는 channel과 같습니다 ( UNLINITED, BUFFERED, CONFLATED, RENDEZVOUS, 사용자정의 ) 채널부분 설명을 참고하세요.

Composing multiple flows

여러개의 flow를 조건에 따라 복합시켜 처리하는 복합연산자도 있습니다. 예를 들어 다수의 조건이 모두 화면표시가능할 경우에만 해당 메뉴가 보이도록 할떄는 Combine으로, 다수의 API를 동시에 요청하여 모든 데이터가 성공일때 화면처리를 하기위해서는 Zip 등을 사용합니다.

Zip

서로 다른 두개이상의 flow 를 하나로 묶어서(zip) 수집(collect)하도록 도와줍니다. flow는 서로 아이템의 개수가 짝을 이루어야 한다. 짝이 맞지 않을 때는 적은 수에 맞추어 방출됩니다.

val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time 
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
// 묶어서 전송하기 때문에 가장 긴 시간 400ms 에 송출이되어 수집됩니다.
fun main() = runBlocking {

    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three","four","five")
    flowA.zip(flowB) { a, b -> "$a and $b" }
        .collect { println(it) }
}

zip 연산자는 flowA의 항목을 flowB의 해당 항목과 병합합니다. flowB의 각 항목이 delay() 함수를 사용하더라도 병합 프로세스는 병합 전에 delay()가 완료될 때까지 기다립니다.

fun main() = runBlocking {

    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(100) }

    val time = measureTimeMillis {
        flowA.zip(flowB) { a, b -> "$a and $b" }
            .collect { println(it) }
    }

    println("Cost $time ms")
}

flowA의 항목 수가 flowB의 항목 수보다 큰 경우:
병합이 실행된 후 새 흐름의 항목 수 = 더 작은 흐름의 항목 수입니다.

fun main() = runBlocking {

    val flowA = (1..6).asFlow()
    val flowB = flowOf("one", "two", "three","four","five")
    flowA.zip(flowB) { a, b -> "$a and $b" }
        .collect { println(it) }
}
Combine

Zip은 두개의 아이템이 일치되는 시점으로 방출시간을 정하지만, Combine은 각각의 flow들이 묶여서 방출가능한지 알아보고 가능하다면 순서에 상관없이 무조건 묶어서 방출을 하는 방식입니다.

val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms          
val startTime = System.currentTimeMillis() // remember the start time 
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
1 -> one at 452 ms from start 
2 -> one at 651 ms from start 
2 -> two at 854 ms from start 
3 -> two at 952 ms from start 
3 -> three at 1256 ms from start

위와 같은 결과가 되는 이유는 nums(300ms) 와 strs(400ms) 각각 방출시간이 달라서 최최 1회때 nums가 방출하려고 하지만 처음에는 strs(400ms)를 기다려야 묶여서 방출될수 있었습니다. 그후 nums는 300ms후에 두번째아이템을 방출하는데 strs는 아직 다음방출시간이 안되었기떄문에 첫번쨰 방출아이템을 들고있습니다.
하지만 combine에서는 일단 짝을 이루면 내보내기 때문에 일단 방출이 이루어집니다. nums의 3번째 방출시간이 되기 전에 strs가 두번째 방출시간이 되어 또 nums의 두번째 방출아이템을 묶어서 방출한다. 이후 900ms 시간이 되면 다시 nums의 방출시간이 되고 strs 와 묶어서 방출합니다.

Combine을 사용하여 병합할 때 새 항목이 flowA에서 방출될 때마다 flowB의 최신 항목과 병합됩니다.

fun main() = runBlocking {

    val flowA = (1..5).asFlow().onEach { delay(100)  }
    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200)  }
    flowA.combine(flowB) { a, b -> "$a and $b" }
        .collect { println(it) }
}

Flatterning flows

Flow를 처리하다보면 Flow<Flow> 와 같이 중첩된 flow를 처리해야하는경우가 있는데 이를 평평하게 펴주어 하나의 값으로 처리할수 있도록 해줍니다.

flatMapConcat, flattern, flatMap, flatMapMerge, flattenConcat, flatMapConcat 등의 함수가 있습니다…

flattenConcat, flatMapConcat

[1]Flow<[2]Flow> 와 같은 flow가 있을때, [1]Flow의 요소하나가 방출될때 [2]Flow의 전체요소를 같이붙여서(Concat) 평면화(flat)시켜 방출한다.
[1]1,[2]a,b,c
[1]2,[2]a,b,c
[1]3,[2]a,b,c

만일 flatten을 하지않았다면
[1]1,[2]Flow
[1]2,[2]Flow
[1]3,[2]Flow

가 됩니다.

실제로 flattenConcat는 여러 흐름을 결합하지 않고 단일 흐름으로 실행합니다.

fun main() = runBlocking {

    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three","four","five")

    flowOf(flowA,flowB)
        .flattenConcat()
        .collect{ println(it) }
}

flatMapConcat는 호출한 후 collect 함수는 새 값을 수집하기 전에 flatMapConcat 내부의 흐름이 완료될 때까지 기다립니다.

fun currTime() = System.currentTimeMillis()

var start: Long = 0

fun main() = runBlocking {

    (1..5).asFlow()
        .onStart { start = currTime() }
        .onEach { delay(100) }
        .flatMapConcat {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("$it at ${System.currentTimeMillis() - start} ms from start")
        }
}
flatMapMerge

flatMapMerge는 내부 코드 블록을 순차적으로 호출하고 수집 기능을 병렬로 실행합니다.

fun currTime() = System.currentTimeMillis()

var start: Long = 0

fun main() = runBlocking {

    (1..5).asFlow()
        .onStart { start = currTime() }
        .onEach { delay(100) }
        .flatMapMerge {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("$it at ${System.currentTimeMillis() - start} ms from start")
        }
}
flatMapLatest

새 값이 내보내지면 이전 흐름이 취소됩니다.

fun currTime() = System.currentTimeMillis()

var start: Long = 0

fun main() = runBlocking {

    (1..5).asFlow()
        .onStart { start = currTime() }
        .onEach { delay(100) }
        .flatMapLatest {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("$it at ${System.currentTimeMillis() - start} ms from start")
        }
}
distinctUntilChanged

메소드명에 적힌대로 이전 데이터와 다른 경우에만 수신이 가능하며, 이전 데이터로 파악되면 필터링 됩니다.

flowOf(1, 1, 2, 2, 3, 1).distinctUntilChanged().collect {
    println(it)
}

1
2
3
1
distinctUntilChangedBy

distinctUntilChanged와 비슷하지만 By키워드가 붙어있어서 해당 키를 기준으로 데이터가 변경된다면 값을 방출합니다.

flowOf(
    Cart(name = "Snack", price = 80),
    Cart(name = "Snack", price = 120),
    Cart(name = "Snack", price = 120)
).distinctUntilChangedBy { it.name } .collect { value ->
     print("${value.name} : ${value.price} ")
}
Snack : 80


flowOf(
    Cart(name = "Snack", price = 80),
    Cart(name = "Beer", price = 120),
    Cart(name = "Snack", price = 130)
).distinctUntilChangedBy { it.name } .collect { value ->
     print("${value.name} : ${value.price} ")
}
Snack : 80
Beer : 120
Snack : 130

flowOf(
    Cart(name = "Snack", price = 80),
    Cart(name = "Snack", price = 130),
    Cart(name = "Beer", price = 120)
).distinctUntilChangedBy { it.name } .collect { value ->
     print("${value.name} : ${value.price} ")
}
Snack : 80
Beer : 120

6. 예외처리

흐름은 전통적인 try…catch를 사용하여 예외를 잡을 수 있습니다.
또는 catch 연산자를 사용하여 예외를 catch할 수 있습니다.


fun main() = runBlocking {
    flow {
        emit(1)
        try {
            throw RuntimeException()
        } catch (e: Exception) {
            e.stackTrace
        }

    }.onCompletion { println("Done") }
        .collect { println(it) }
}

catch 연산자

catch 연산자는 업스트림에서 예외를 catch할 수 있습니다.

fun main() = runBlocking {
    flow {
        emit(1)
        throw RuntimeException()
    }.onCompletion { cause ->
        if (cause != null)
            println("Flow completed exceptionally")
        else
            println("Done")
    }.collect { println(it) }
}
fun main() = runBlocking {
    flow {
        emit(1)
        throw RuntimeException()
    }
    .onCompletion { cause ->
        if (cause != null)
            println("Flow completed exceptionally")
        else
            println("Done")
    }
    .catch{ println("catch exception") }
    .collect { println(it) }
}

재시도 연산자 ( retry, retryWHEN)

업스트림에서 예외가 발생하고 재시도 연산자가 사용되는 경우 재시도는 Flow가 재시도에 지정된 횟수까지 재시도하도록 합니다.
예를 들어 다음은 “Emitting 1”, "Emitting 2"를 세 번 인쇄하고 마지막 두 개는 재시도 연산자에 의해 인쇄됩니다.

fun main() = runBlocking {

    (1..5).asFlow().onEach {
        if (it == 3) throw RuntimeException("Error on $it")
    }.retry(2) {

        if (it is RuntimeException) {
            return@retry true
        }
        false
    }
    .onEach { println("Emitting $it") }
    .catch { it.printStackTrace() }
    .collect()
}

7. Flow 생명주기

Flow는 수명 주기의 다양한 단계를 모니터링할 수 있는 연산자가 많지 않으며 현재는 onStart 및 onCompletion만 Flow의 생성 및 종료를 모니터링하는 데 사용됩니다.

fun main() = runBlocking {

    (1..5).asFlow().onEach {
        if (it == 3) throw RuntimeException("Error on $it")
    }
    .onStart { println("Starting flow") }
    .onEach { println("On each $it") }
    .catch { println("Exception : ${it.message}") }
    .onCompletion { println("Flow completed") }
    .collect()
}

사용 시나리오의 예: 예를 들어 Android 개발에서 Flow를 사용하여 네트워크 요청을 생성할 때 로딩 애니메이션은 onStart 연산자를 통해 호출되고 애니메이션은 네트워크 요청이 종료된 후 onCompletion 연산자를 통해 취소됩니다.

다른 예를 들어, 이러한 연산자를 사용하여 일부 로그 인쇄를 수행합니다.

fun <T> Flow<T>.log(opName: String) = onStart {
    println("Loading $opName")
}.onEach {
    println("Loaded $opName : $it")
}.onCompletion { maybeErr ->
    maybeErr?.let {
        println("Error $opName: $it")
    } ?: println("Completed $opName")
}

Flow가 데이터를 전송하기 위해 emit()을 사용하면 현재 Flow가 활성화되어 있는지 여부를 감지하기 위해 sureActive()를 사용하고, 활성화되어 있지 않으면 직접 CancellationException을 발생시키고 전체 Flow를 취소합니다.Flow가 취소 감지를 수행해야 함을 나타내는 cancelable() 함수를 사용해야 합니다.

fun main() {
    runBlocking {
        flow {
            (1..5).forEach {
                emit(it)
            }
        }.cancellable().collect {
            println(it)
            if (it == 3) cancel()
        }
    }
}

job을 이용한 취소(안드로이드 라이프사이클에 연계)

val job= flowOf(1,3,5,7).cancellable().onEach { value->
     print(value)
 } .launchIn(lifecycleScope)
 
 job.cancel()

8. Flow 쓰레드 작업

flowOn 으로 가능합니다.

fun main() = runBlocking {

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
            it * it
        }.flowOn(Dispatchers.IO)
        .collect {
            println("${Thread.currentThread().name}: $it")
        }
}
병렬작동

Flow에는 병렬 처리를 위한 해당 연산자 flatMapMerge도 있습니다.


fun main() = runBlocking {

    val result = arrayListOf<Int>()
    for (index in 1..100){
        result.add(index)
    }

    result.asFlow()
        .flatMapMerge {
            flow {
                emit(it)
            }
            .flowOn(Dispatchers.IO)
        }
        .collect { println("$it") }
}

0 comments:

댓글 쓰기