非同期フロー
停止関数は非同期に単一の値を返しますが、複数の非同期的に計算された値を返すにはどうすればよいでしょうか?ここでKotlinのFlowが登場します。
複数の値を表現する
Kotlinでは、[コレクション]を使用して複数の値を表現できます。 たとえば、3つの数値のListを返し、forEachを使用してそれらすべてをプリントするsimple
関数を作成できます。
fun simple(): List<Int> = listOf(1, 2, 3)
fun main() {
simple().forEach { value -> println(value) }
}
全体のコードはこちらから入手できます。
このコードは次を出力します。
1
2
3
シーケンス
数値がCPUを大量に消費するブロッキングコード(各計算に100ミリ秒かかる)で計算されている場合、Sequenceを使用して数値を表現できます。
fun simple(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i) // yield next value
}
}
fun main() {
simple().forEach { value -> println(value) }
}
全体のコードはこちらから入手できます。
このコードは同じ数値を出力しますが、それぞれを出力する前に100ミリ秒待機します。
停止関数
しかし、この計算はコードを実行しているメインスレッドをブロックします。 これらの値が非同期コードによって計算される場合、simple
関数をsuspend
修飾子でマークすることで、ブロックせずにその作業を実行し、結果をリストとして返すことができます。
import kotlinx.coroutines.*
suspend fun simple(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}
全体のコードはこちらから入手できます。
このコードは、1秒待機した後で数値をプリントします。
Flow
List<Int>
の結果型を使用すると、すべての値を一度にしか返せません。非同期的に計算されている値のストリームを表現するには、同期的に計算された値にSequence<Int>
型を使用するのと同じように、Flow<Int>
型を使用できます。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Launch a concurrent coroutine to check if the main thread is blocked
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
simple().collect { value -> println(value) }
}
全体のコードはこちらから入手できます。
このコードは、メインスレッドをブロックせずに各数値をプリントする前に100ミリ秒待機します。これは、メインスレッドで実行されている別のコルーチンから100ミリ秒ごとに「I'm not blocked」をプリントすることで検証されます。
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
以前の例のFlowを使用したコードの以下の違いに注目してください。
- Flow型のビルダー関数はflowと呼ばれます。
flow { ... }
ビルダーブロック内のコードは停止できます。simple
関数にはもはやsuspend
修飾子は付けられません。- 値はemit関数を使用してフローから_emit_されます。
- 値はcollect関数を使用してフローから_collect_されます。
simple
のflow { ... }
のボディでdelayをThread.sleep
に置き換えると、この場合、メインスレッドがブロックされることがわかります。
Flowはコールド
Flowはシーケンスと同様に_コールド_ストリームです。つまり、flowビルダー内のコードは、フローが収集されるまで実行されません。これは以下の例で明らかになります。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
全体のコードはこちらから入手できます。
これは次を出力します。
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
これは、simple
関数(Flowを返す関数)にsuspend
修飾子が付けられていない主な理由です。 simple()
呼び出し自体はすぐに返され、何も待機しません。フローは収集されるたびに新たに開始されるため、collect
を再度呼び出すたびに「Flow started」と表示されます。
Flowのキャンセルの基本
Flowは、コルーチンの一般的な協調キャンセルに従います。通常通り、フローがキャンセル可能な停止関数(delayなど)で停止されている場合、フローの収集はキャンセルできます。 以下の例は、withTimeoutOrNullブロックで実行されているときにフローがタイムアウトでキャンセルされ、コードの実行を停止する方法を示しています。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // Timeout after 250ms
simple().collect { value -> println(value) }
}
println("Done")
}
全体のコードはこちらから入手できます。
simple
関数内のフローによって2つの数値のみがemitされ、以下の出力が生成されることに注目してください。
Emitting 1
1
Emitting 2
2
Done
詳細については、「フローのキャンセルチェック」セクションを参照してください。
Flowビルダー
以前の例のflow { ... }
ビルダーは最も基本的なものです。フローを宣言できる他のビルダーもあります。
- flowOfビルダーは、固定された値のセットをemitするフローを定義します。
- さまざまなコレクションやシーケンスは、
.asFlow()
拡張関数を使用してフローに変換できます。
たとえば、フローから1から3までの数値をプリントするスニペットは次のように書き換えられます。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
// Convert an integer range to a flow
(1..3).asFlow().collect { value -> println(value) }
}
全体のコードはこちらから入手できます。
中間Flowオペレーター
Flowは、コレクションやシーケンスを変換するのと同じように、オペレーターを使用して変換できます。 中間オペレーターはアップストリームフローに適用され、ダウンストリームフローを返します。 これらのオペレーターは、フローと同様にコールドです。そのようなオペレーターの呼び出し自体は、停止関数ではありません。 それは高速に動作し、新しい変換されたフローの定義を返します。
基本的なオペレーターには、mapやfilterのような馴染みのある名前があります。 これらのオペレーターとシーケンスの重要な違いは、これらのオペレーター内のコードブロックが停止関数を呼び出せることです。
たとえば、受信リクエストのフローは、リクエストの実行が停止関数によって実装される長時間の操作である場合でも、mapオペレーターでその結果にマッピングできます。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
全体のコードはこちらから入手できます。
これは次の3行を生成し、それぞれが前の行から1秒後に表示されます。
response 1
response 2
response 3
transform
オペレーター
フロー変換オペレーターの中で、最も汎用的なものはtransformと呼ばれます。それはmapやfilterのような単純な変換を模倣するためにも、より複雑な変換を実装するためにも使用できます。 transform
オペレーターを使用すると、任意の値を任意の回数emitできます。
たとえば、transform
を使用して、時間のかかる非同期リクエストを実行する前に文字列をemitし、その後にレスポンスを続けることができます。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
}
全体のコードはこちらから入手できます。
このコードの出力は次のとおりです。
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
サイズ制限オペレーター
takeのようなサイズ制限中間オペレーターは、対応する制限に達するとフローの実行をキャンセルします。コルーチンでのキャンセルは常に例外をスローすることによって実行されるため、すべてのリソース管理関数(try { ... } finally { ... }
ブロックなど)はキャンセルの場合でも正常に動作します。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
全体のコードはこちらから入手できます。
このコードの出力は、numbers()
関数内のflow { ... }
ボディの実行が2番目の数値のemit後に停止したことを明確に示しています。
1
2
Finally in numbers
終端Flowオペレーター
フローの終端オペレーターは、フローの収集を開始する_停止関数_です。 collectオペレーターが最も基本的なものですが、他にも収集を容易にする終端オペレーターがあります。
例えば:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val sum = (1..5).asFlow()
.map { it * it } // squares of numbers from 1 to 5
.reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)
}
全体のコードはこちらから入手できます。
単一の数値を出力します。
55
Flowは逐次的
複数のフローを操作する特殊なオペレーターが使用されない限り、フローの個々の収集は順次実行されます。収集は、終端オペレーターを呼び出すコルーチンで直接機能します。デフォルトでは新しいコルーチンは起動されません。 emitされた各値は、アップストリームからダウンストリームまですべての中間オペレーターによって処理され、その後、終端オペレーターに届けられます。
偶数整数をフィルタリングし、文字列にマッピングする次の例を参照してください。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
}
全体のコードはこちらから入手できます。
出力:
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
Flowコンテキスト
フローの収集は常に、呼び出し元のコルーチンのコンテキストで発生します。たとえば、simple
フローがある場合、simple
フローの実装詳細に関わらず、以下のコードはこのコードの作成者によって指定されたコンテキストで実行されます。
withContext(context) {
simple().collect { value ->
println(value) // run in the specified context
}
}
このフローのプロパティは、_コンテキストの保存_と呼ばれます。
したがって、デフォルトでは、flow { ... }
ビルダー内のコードは、対応するフローのコレクターによって提供されるコンテキストで実行されます。たとえば、呼び出されたスレッドをプリントし、3つの数値をemitするsimple
関数の実装を考えてみましょう。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
全体のコードはこちらから入手できます。
このコードを実行すると、次が生成されます。
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
simple().collect
がメインスレッドから呼び出されるため、simple
のフローの本体もメインスレッドで呼び出されます。これは、実行コンテキストを気にせず、呼び出し元をブロックしない高速に実行される、または非同期コードにとって完璧なデフォルトです。
withContext
使用時の一般的な落とし穴
しかし、長時間実行されるCPUを大量に消費するコードはDispatchers.Defaultのコンテキストで実行する必要があるかもしれませんし、UI更新コードはDispatchers.Mainのコンテキストで実行する必要があるかもしれません。通常、Kotlinコルーチンを使用するコードでコンテキストを変更するにはwithContextが使用されますが、flow { ... }
ビルダー内のコードはコンテキストの保存プロパティを尊重する必要があり、異なるコンテキストからemitすることは許可されていません。
次のコードを実行してみてください。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
// The WRONG way to change context for CPU-consuming code in flow builder
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
emit(i) // emit next value
}
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
全体のコードはこちらから入手できます。
このコードは次の例外を生成します。
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
flowOn
オペレーター
この例外は、フローのemitのコンテキストを変更するために使用されるべきflowOn関数に言及しています。 フローのコンテキストを変更する正しい方法は以下の例に示されており、すべてがどのように機能するかを示すために対応するスレッド名もプリントされます。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
全体のコードはこちらから入手できます。
flow { ... }
がバックグラウンドスレッドで動作し、収集がメインスレッドで発生することに注目してください。
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
ここで注目すべきもう1つの点は、flowOnオペレーターがフローのデフォルトの逐次的な性質を変更したことです。 今や収集は1つのコルーチン(「coroutine#1」)で発生し、emitは別のスレッドで収集コルーチンと並行して実行されている別のコルーチン(「coroutine#2」)で発生します。flowOnオペレーターは、コンテキスト内のCoroutineDispatcherを変更する必要がある場合に、アップストリームフローのために別のコルーチンを作成します。
バッファリング
フローの異なる部分を異なるコルーチンで実行することは、フローを収集するのにかかる全体的な時間の観点から役立ちます。特に、長時間実行される非同期操作が関係している場合にそうです。たとえば、simple
フローによるemitが遅く、要素の生成に100ミリ秒かかる場合を考えてみましょう。そして、コレクターも遅く、要素の処理に300ミリ秒かかります。そのそのようなフローを3つの数値で収集するのにどれくらいの時間がかかるか見てみましょう。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
全体のコードはこちらから入手できます。
これは次のようなものを生成し、全体の収集には約1200ミリ秒かかります(3つの数値、それぞれに400ミリ秒)。
1
2
3
Collected in 1220 ms
フローにbufferオペレーターを使用すると、simple
フローのemitコードを収集コードと並行して実行できます(逐次的に実行するのではなく)。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
全体のコードはこちらから入手できます。
同じ数値がより速く生成されます。これは、処理パイプラインを効果的に作成したためで、最初の数値のために100ミリ秒だけ待てばよく、その後は各数値を処理するために300ミリ秒しか費やしません。この方法では、実行に約1000ミリ秒かかります。
1
2
3
Collected in 1071 ms
flowOnオペレーターはCoroutineDispatcherを変更する必要があるときに同じバッファリングメカニズムを使用しますが、ここでは実行コンテキストを変更せずに明示的にバッファリングを要求していることに注意してください。
コンフレーション
フローが操作の部分的な結果または操作ステータスの更新を表す場合、各値を処理する必要はなく、代わりに最新の値のみを処理すればよい場合があります。この場合、conflateオペレーターを使用して、コレクターが値を処理するには遅すぎる場合に中間値をスキップできます。前の例に基づいて考えてみましょう。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
全体のコードはこちらから入手できます。
最初の数値がまだ処理されている間、2番目と3番目の数値はすでに生成されていたため、2番目のものは_コンフレーション_され、最新のもの(3番目のもの)だけがコレクターに届けられたことがわかります。
1
3
Collected in 758 ms
最新の値を処理する
コンフレーションは、emit元とコレクターの両方が遅い場合に処理を高速化する1つの方法です。これは、emitされた値をドロップすることで行われます。 もう1つの方法は、遅いコレクターをキャンセルし、新しい値がemitされるたびにそれを再起動することです。 xxxLatest
オペレーターには、xxx
オペレーターと同じ本質的なロジックを実行しますが、新しい値でブロック内のコードをキャンセルする一連のものが存在します。前の例でconflateをcollectLatestに変更してみましょう。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
println("Collected in $time ms")
}
全体のコードはこちらから入手できます。
collectLatestのボディは300ミリ秒かかりますが、新しい値は100ミリ秒ごとにemitされるため、ブロックはすべての値で実行されますが、最後の値に対してのみ完了することがわかります。
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
複数のFlowの合成
複数のFlowを合成する方法はたくさんあります。
Zip
Kotlin標準ライブラリのSequence.zip拡張関数と同様に、Flowには2つのフローの対応する値を結合するzipオペレーターがあります。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
}
全体のコードはこちらから入手できます。
この例は次を出力します。
1 -> one
2 -> two
3 -> three
Combine
フローが変数または操作の最新値を表す場合(関連する「コンフレーション」セクションも参照)、対応するフローの最新値に依存する計算を実行し、いずれかのアップストリームフローが値をemitするたびにそれを再計算する必要があるかもしれません。対応するオペレーターのファミリーはcombineと呼ばれます。
たとえば、前の例の数値が300ミリ秒ごとに更新され、文字列が400ミリ秒ごとに更新される場合でも、zipオペレーターを使用してそれらをzipすると、400ミリ秒ごとに結果がプリントされるものの、同じ結果が生成されます。
この例では、各要素を遅延させ、サンプルフローをemitするコードをより宣言的かつ短くするために、onEach中間オペレーターを使用しています。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
全体のコードはこちらから入手できます。
しかし、ここでzipの代わりにcombineオペレーターを使用すると、
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
全体のコードはこちらから入手できます。
nums
またはstrs
フローのいずれかからの各emitで1行がプリントされる、かなり異なる出力が得られます。
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
Flowのフラット化
Flowは非同期に受信される値のシーケンスを表すため、各値が別の値のシーケンスへのリクエストをトリガーする状況に陥りやすいです。たとえば、500ミリ秒間隔で2つの文字列のフローを返す次の関数があるとします。
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
さて、3つの整数のフローがあり、それぞれに対してこのようにrequestFlow
を呼び出すと、
(1..3).asFlow().map { requestFlow(it) }
その後、さらなる処理のために単一のフローに_フラット化_する必要があるフローのフロー(Flow<Flow<String>>
)になってしまいます。コレクションとシーケンスには、このためのflattenおよびflatMapオペレーターがあります。しかし、フローの非同期的な性質のため、それらは異なる_フラット化モード_を必要とし、したがって、フローには一連のフラット化オペレーターが存在します。
flatMapConcat
フローのフローの連結は、flatMapConcatおよびflattenConcatオペレーターによって提供されます。それらは、対応するシーケンスオペレーターの最も直接的な類義語です。以下の例が示すように、次のフローの収集を開始する前に、内部フローが完了するのを待ちます。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
.flatMapConcat { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
全体のコードはこちらから入手できます。
flatMapConcatの逐次的な性質は、出力で明確に確認できます。
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
flatMapMerge
もう1つのフラット化操作は、受信するすべてのフローを並行して収集し、それらの値を単一のフローにマージして、可能な限り早く値がemitされるようにすることです。 それはflatMapMergeおよびflattenMergeオペレーターによって実装されます。それらは両方とも、同時に収集される並行フローの数を制限するオプションのconcurrency
パラメーターを受け入れます(デフォルトではDEFAULT_CONCURRENCYに等しいです)。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
全体のコードはこちらから入手できます。
flatMapMergeの並行的な性質は明らかです。
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
flatMapMergeはそのコードブロック(この例では
{ requestFlow(it) }
)を逐次的に呼び出しますが、結果のフローを並行して収集することに注意してください。これは、まず逐次的なmap { requestFlow(it) }
を実行し、その結果に対してflattenMergeを呼び出すことと同じです。
flatMapLatest
「最新の値を処理する」セクションで説明したcollectLatestオペレーターと同様に、新しいフローがemitされるとすぐに以前のフローの収集がキャンセルされる、対応する「Latest」フラット化モードがあります。 それはflatMapLatestオペレーターによって実装されます。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapLatest { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
全体のコードはこちらから入手できます。
この例の出力は、flatMapLatestがどのように機能するかをよく示しています。
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
flatMapLatestは、新しい値が受信されたときに、そのブロック(この例では
{ requestFlow(it) }
)内のすべてのコードをキャンセルすることに注意してください。requestFlow
の呼び出し自体は高速で、停止せず、キャンセルできないため、この特定の例では違いはありません。しかし、requestFlow
でdelay
のような停止関数を使用した場合、出力に違いが見られるでしょう。
Flowの例外
フローの収集は、emit元またはオペレーター内のコードが例外をスローした場合に例外で完了する可能性があります。これらの例外を処理する方法はいくつかあります。
コレクターのtry
とcatch
コレクターは、Kotlinのtry/catch
ブロックを使用して例外を処理できます。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
全体のコードはこちらから入手できます。
このコードはcollect終端オペレーターで例外を正常にキャッチし、ご覧のとおり、その後は値がemitされません。
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
すべてがキャッチされる
前の例では、実際にはemit元、または任意の中間オペレーターや終端オペレーターで発生するすべての例外がキャッチされます。 たとえば、emitされた値が文字列にmapされるようにコードを変更し、対応するコードが例外を生成するようにしてみましょう。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
全体のコードはこちらから入手できます。
この例外は引き続きキャッチされ、収集は停止します。
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
例外の透過性
しかし、emit元のコードはどのようにして例外処理の振る舞いをカプセル化できるのでしょうか?
Flowは_例外に対して透過的_である必要があり、flow { ... }
ビルダー内でtry/catch
ブロックの内部から値をemitすることは、例外透過性の違反です。これにより、例外をスローするコレクターは、前の例のようにtry/catch
を使用して常にそれをキャッチできることが保証されます。
emit元は、この例外透過性を保持し、例外処理のカプセル化を可能にするcatchオペレーターを使用できます。catch
オペレーターのボディは例外を分析し、どの例外がキャッチされたかに応じてさまざまな方法で反応できます。
- 例外は
throw
を使用して再スローできます。 - 例外は、catchのボディからemitを使用して値のemitに変換できます。
- 例外は無視されたり、ログに記録されたり、他のコードによって処理されたりすることができます。
たとえば、例外をキャッチしたときにテキストをemitしてみましょう。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> emit("Caught $e") } // emit on exception
.collect { value -> println(value) }
}
全体のコードはこちらから入手できます。
コードの周りにtry/catch
がなくなったにもかかわらず、例の出力は同じです。
透過的なcatch
catch中間オペレーターは、例外透過性を尊重し、アップストリームの例外のみをキャッチします(つまり、catch
よりも上にあるすべてのオペレーターからの例外であり、それより下にあるものではありません)。 collect { ... }
内のブロック(catch
の下に配置される)が例外をスローすると、それはエスケープします。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> println("Caught $e") } // does not catch downstream exceptions
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
全体のコードはこちらから入手できます。
catch
オペレーターがあるにもかかわらず、「Caught ...」メッセージはプリントされません。
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at ...
宣言的なキャッチ
catchオペレーターの宣言的な性質と、すべての例外を処理したいという要望を組み合わせるには、collectオペレーターのボディをonEachに移動し、それをcatch
オペレーターの前に置きます。このフローの収集は、パラメータなしのcollect()
の呼び出しによってトリガーされる必要があります。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
}
全体のコードはこちらから入手できます。
これで、「Caught ...」メッセージがプリントされ、try/catch
ブロックを明示的に使用せずにすべての例外をキャッチできることがわかります。
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
Flowの完了
フローの収集が完了すると(正常に、または例外的に)、アクションを実行する必要がある場合があります。 すでにお気づきかもしれませんが、これには命令的または宣言的な2つの方法があります。
命令的なfinally
ブロック
try
/catch
に加えて、コレクターはfinally
ブロックを使用してcollect
の完了時にアクションを実行することもできます。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} finally {
println("Done")
}
}
全体のコードはこちらから入手できます。
このコードは、simple
フローによって生成された3つの数値に続いて「Done」文字列をプリントします。
1
2
3
Done
宣言的な処理
宣言的なアプローチの場合、フローにはonCompletion中間オペレーターがあり、フローが完全に収集されたときに呼び出されます。
前の例はonCompletionオペレーターを使用して書き換えられ、同じ出力を生成します。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
simple()
.onCompletion { println("Done") }
.collect { value -> println(value) }
}
全体のコードはこちらから入手できます。
onCompletionの主な利点は、フローの収集が正常に完了したか、例外的に完了したかを判断するために使用できるラムダのnullableなThrowable
パラメータです。次の例では、simple
フローは数値1をemitした後に例外をスローします。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
全体のコードはこちらから入手できます。
ご想像のとおり、次がプリントされます。
1
Flow completed exceptionally
Caught exception
onCompletionオペレーターは、catchとは異なり、例外を処理しません。上記の例のコードからわかるように、例外は依然としてダウンストリームに流れます。それはさらにonCompletion
オペレーターに届けられ、catch
オペレーターで処理できます。
正常な完了
catchオペレーターとのもう1つの違いは、onCompletionがすべての例外を確認し、アップストリームフローの正常な完了(キャンセルまたは失敗なし)の場合にのみnull
例外を受け取ることです。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
全体のコードはこちらから入手できます。
フローがダウンストリームの例外によって中断されたため、完了原因がnullではないことがわかります。
1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
命令的 vs 宣言的
これで、フローを収集し、その完了と例外を命令的および宣言的の両方の方法で処理する方法がわかりました。 ここでの当然の疑問は、どちらのアプローチが推奨され、なぜかということです。 ライブラリとしては、特定のどのアプローチも推奨せず、どちらのオプションも有効であり、個人の好みとコードスタイルに応じて選択されるべきだと考えます。
Flowの起動
フローを使用して、何らかのソースから来る非同期イベントを表現するのは簡単です。 この場合、受信イベントに対する反応としてコードの一部を登録し、さらなる作業を継続するaddEventListener
関数に似たものが必要です。onEachオペレーターがこの役割を果たすことができます。 しかし、onEach
は中間オペレーターです。フローを収集するための終端オペレーターも必要です。 そうでなければ、onEach
を呼び出すだけでは効果がありません。
onEach
の後にcollect終端オペレーターを使用すると、その後のコードはフローが収集されるまで待機します。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- Collecting the flow waits
println("Done")
}
全体のコードはこちらから入手できます。
ご覧のとおり、次がプリントされます。
Event: 1
Event: 2
Event: 3
Done
ここでlaunchIn終端オペレーターが役立ちます。collect
をlaunchIn
に置き換えることで、別のコルーチンでフローの収集を起動し、その後のコードの実行がすぐに継続するようにできます。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- Launching the flow in a separate coroutine
println("Done")
}
全体のコードはこちらから入手できます。
これは次を出力します。
Done
Event: 1
Event: 2
Event: 3
launchIn
の必須パラメータは、フローを収集するためのコルーチンが起動されるCoroutineScopeを指定する必要があります。上記の例では、このスコープはrunBlockingコルーチンビルダーから来るため、フローが実行されている間、このrunBlockingスコープはその子コルーチンの完了を待機し、main関数が戻ってこの例を終了するのを防ぎます。
実際のアプリケーションでは、スコープは限られた寿命を持つエンティティから来ます。このエンティティの寿命が終了するとすぐに、対応するスコープがキャンセルされ、対応するフローの収集もキャンセルされます。このように、onEach { ... }.launchIn(scope)
のペアはaddEventListener
のように機能します。しかし、キャンセルと構造化された並行性がこの目的を果たすため、対応するremoveEventListener
関数は必要ありません。
launchInはJobも返し、これはスコープ全体をキャンセルすることなく、対応するフロー収集コルーチンをcancelしたり、それにjoinしたりするために使用できることに注意してください。
Flowのキャンセルチェック
利便性のために、flowビルダーは、emitされる各値に対して追加のensureActiveキャンセルチェックを実行します。 これは、flow { ... }
からemitするビジーループがキャンセル可能であることを意味します。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
println(value)
}
}
全体のコードはこちらから入手できます。
3までの数値と、4番目の数値をemitしようとした後にCancellationExceptionのみが取得されます。
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
しかし、他のほとんどのフローオペレーターは、パフォーマンス上の理由から、独自の追加のキャンセルチェックを実行しません。 たとえば、IntRange.asFlow拡張を使用して同じビジーループを記述し、どこも停止しない場合、キャンセルチェックは行われません。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
println(value)
}
}
全体のコードはこちらから入手できます。
1から5まですべての数値が収集され、キャンセルはrunBlocking
から戻る直前にのみ検出されます。
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
ビジーフローをキャンセル可能にする
コルーチンを使用したビジーループがある場合は、明示的にキャンセルをチェックする必要があります。 .onEach { currentCoroutineContext().ensureActive() }
を追加できますが、そのためにcancellableオペレーターが用意されています。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}
全体のコードはこちらから入手できます。
cancellable
オペレーターを使用すると、1から3までの数値のみが収集されます。
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
FlowとReactive Streams
Reactive StreamsやRxJava、Project Reactorなどのリアクティブフレームワークに精通している人にとっては、Flowの設計は非常に馴染み深く見えるかもしれません。
実際、その設計はReactive Streamsとその様々な実装に触発されています。しかし、Flowの主な目標は、可能な限りシンプルな設計を持ち、Kotlinと停止に友好的であり、構造化された並行性を尊重することです。この目標を達成することは、リアクティブの先駆者とその多大な貢献なしには不可能でした。完全な話は、「Reactive Streams and Kotlin Flows」の記事で読むことができます。
異なる点はあるものの、概念的にはFlowはリアクティブストリームであり、リアクティブ(仕様およびTCK準拠)なPublisherに変換することも、その逆も可能です。 そのような変換はkotlinx.coroutines
によってすぐに提供されており、対応するリアクティブモジュール(Reactive Streams用にはkotlinx-coroutines-reactive
、Project Reactor用にはkotlinx-coroutines-reactor
、RxJava2/RxJava3用にはkotlinx-coroutines-rx2
/kotlinx-coroutines-rx3
)で見つけることができます。 統合モジュールには、Flow
との相互変換、ReactorのContext
との統合、および様々なリアクティブエンティティを扱うための停止に友好的な方法が含まれています。