Skip to content

通道

延遲值(Deferred values)提供了一種在協程(coroutines)之間傳輸單個值的便捷方式。 通道(Channels)提供了一種傳輸值串流的方式。

通道基礎

一個 Channel 在概念上與 BlockingQueue 非常相似。一個主要區別是, 它沒有阻塞的 put 操作,而是有掛起(suspending)的 send 操作;也沒有 阻塞的 take 操作,而是有掛起(suspending)的 receive 操作。

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, 
        // we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}

您可以在此處獲取完整程式碼。

此程式碼的輸出為:

text
1
4
9
16
25
Done!

關閉通道和迭代通道

與佇列(queue)不同,通道可以關閉以指示不再有元素進入。 在接收端,使用常規的 for 迴圈從通道接收元素非常方便。

概念上,close 就像是向通道發送一個特殊的關閉標記(token)。 一旦接收到此關閉標記,迭代就會停止,因此保證在關閉之前發送的所有元素都已被接收:

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}

您可以在此處獲取完整程式碼。

建立通道生產者

協程產生元素序列的模式非常常見。 這是並行程式碼中常見的「生產者-消費者」(producer-consumer)模式的一部分。 您可以將這樣的生產者抽象為一個以通道作為參數的函數,但這與「函數必須返回值」的常識相悖。

有一個方便的協程建構器名為 produce,它使得在生產者端正確操作變得容易, 還有一個擴展函數 consumeEach,它取代了消費者端的 for 迴圈:

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}

您可以在此處獲取完整程式碼。

管線

管線(pipeline)是一種模式,其中一個協程產生一個可能無限的值串流:

kotlin
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

另一個或多個協程則消費該串流,進行一些處理,並產生其他結果。 在下面的範例中,數字只是被平方:

kotlin
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}

主程式碼啟動並連接整個管線:

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    repeat(5) {
        println(squares.receive()) // print first five
    }
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}

您可以在此處獲取完整程式碼。

所有建立協程的函數都定義為 CoroutineScope 的擴展, 因此我們可以依靠結構化並行(structured concurrency)來確保我們的應用程式中沒有 懸而未決的全域協程。

使用管線生成質數

讓我們透過一個使用協程管線生成質數的範例,將管線應用推向極致。 我們從一個無限的數字序列開始。

kotlin
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

以下管線階段過濾傳入的數字串流,移除所有可被給定質數整除的數字:

kotlin
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}

現在我們透過從 2 開始一個數字串流、從當前通道獲取一個質數、並為每個找到的質數啟動一個新的管線階段來建立我們的管線:

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...

以下範例印出前十個質數, 在主執行緒的上下文(context)中執行整個管線。由於所有協程都是在主 runBlocking 協程的 作用域(scope)中啟動的,我們無需保留所有已啟動協程的明確列表。 我們使用 cancelChildren 擴展函數來取消所有子協程,在印出 前十個質數之後。

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    var cur = numbersFrom(2)
    repeat(10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}

您可以在此處獲取完整程式碼。

此程式碼的輸出為:

text
2
3
5
7
11
13
17
19
23
29

請注意,您可以使用標準函式庫中的 iterator 協程建構器來建立相同的管線。 將 produce 替換為 iteratorsend 替換為 yieldreceive 替換為 nextReceiveChannel 替換為 Iterator,並去除協程作用域。您也無需 runBlocking。 然而,如上所示,使用通道的管線的好處是,如果您在 Dispatchers.Default 上下文中運行它,它實際上可以使用 多個 CPU 核心。

無論如何,這是一種極不實用的尋找質數的方法。實際上,管線確實涉及一些 其他掛起調用(例如對遠端服務的非同步呼叫),並且這些管線無法使用 sequence/iterator 建立,因為它們不允許任意掛起,這與 完全非同步的 produce 不同。

扇出(Fan-out)

多個協程可以從同一個通道接收,將工作分發給彼此。 讓我們從一個定期產生整數(每秒十個數字)的生產者協程開始:

kotlin
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

然後我們可以有多個處理器協程。在這個範例中,它們只是印出自己的 ID 和 接收到的數字:

kotlin
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }    
}

現在讓我們啟動五個處理器,讓它們工作近一秒。看看會發生什麼:

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }    
}

您可以在此處獲取完整程式碼。

輸出將類似於以下內容,儘管接收 每個特定整數的處理器 ID 可能不同:

text
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

請注意,取消生產者協程會關閉其通道,從而最終終止處理器協程正在進行的通道迭代。

此外,請注意我們如何在 launchProcessor 程式碼中明確使用 for 迴圈迭代通道以執行扇出(fan-out)。 與 consumeEach 不同,這種 for 迴圈模式在多個協程中使用是完全安全的。如果其中一個處理器 協程失敗,那麼其他協程仍會繼續處理通道,而透過 consumeEach 編寫的處理器在其正常或異常完成時總會消費(取消)底層通道。

扇入(Fan-in)

多個協程可以向同一個通道發送。 例如,讓我們有一個字串通道,以及一個重複以指定延遲向此通道發送指定字串的掛起函數:

kotlin
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

現在,讓我們看看如果我們啟動幾個發送字串的協程會發生什麼(在這個範例中,我們將它們作為主協程的子項在主執行緒的上下文(context)中啟動):

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

您可以在此處獲取完整程式碼。

輸出為:

text
foo
foo
BAR!
foo
foo
BAR!

緩衝通道

到目前為止所示的通道都沒有緩衝區。無緩衝通道在發送者和接收者 相遇(又稱會合,rendezvous)時傳輸元素。如果 send 先被調用,則會掛起直到 receive 被調用; 如果 receive 先被調用,則會掛起直到 send 被調用。

Channel() 工廠函數和 produce 建構器都接受一個可選的 capacity 參數來 指定「緩衝區大小」(buffer size)。緩衝區允許發送者在掛起之前發送多個元素, 類似於具有指定容量的 BlockingQueue,當緩衝區滿時它會阻塞。

看看以下程式碼的行為:

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine
}

您可以在此處獲取完整程式碼。

它使用容量為「四」的緩衝通道,印出「sending」字樣「五」次:

text
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4

前四個元素被添加到緩衝區,發送者在嘗試發送第五個元素時會掛起。

通道是公平的

通道上的發送和接收操作對於來自多個協程的調用順序是「公平的」(fair)。 它們以先進先出(first-in first-out)的順序服務,例如,第一個調用 receive 的協程會獲得該元素。 在以下範例中,兩個協程「ping」和「pong」正從共享的「table」通道接收「ball」物件。

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}

您可以在此處獲取完整程式碼。

「ping」協程先啟動,所以它是第一個接收到球的。儘管「ping」 協程在將球發回桌面後立即再次開始接收球,但球還是被「pong」協程接收了, 因為它已經在等待了:

text
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

請注意,有時通道可能會產生看起來不公平的執行,這是由於所使用的執行器(executor)的性質所致。 詳情請參閱此問題

時鐘通道(Ticker channels)

「時鐘通道」(Ticker channel)是一種特殊的會合通道,它在每次從該通道消費後,經過給定延遲後產生 Unit。 雖然它單獨看起來可能沒用,但它是建立複雜基於時間的 produce 管線以及執行視窗化(windowing)和其他時間相關處理的運算子(operators)的有用建構塊。 時鐘通道可以用於 select 以執行「按時鐘脈衝」(on tick)動作。

若要建立此類通道,請使用工廠方法 ticker。 為指示不再需要更多元素,請在其上使用 ReceiveChannel.cancel 方法。

現在讓我們看看它在實踐中如何運作:

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 200, initialDelayMillis = 0) // create a ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // no initial delay

    nextElement = withTimeoutOrNull(100) { tickerChannel.receive() } // all subsequent elements have 200ms delay
    println("Next element is not ready in 100 ms: $nextElement")

    nextElement = withTimeoutOrNull(120) { tickerChannel.receive() }
    println("Next element is ready in 200 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 300ms")
    delay(300)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(120) { tickerChannel.receive() }
    println("Next element is ready in 100ms after consumer pause in 300ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}

您可以在此處獲取完整程式碼。

它會印出以下幾行:

text
Initial element is available immediately: kotlin.Unit
Next element is not ready in 100 ms: null
Next element is ready in 200 ms: kotlin.Unit
Consumer pauses for 300ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 100ms after consumer pause in 300ms: kotlin.Unit

請注意,ticker 會意識到可能的消費者暫停,並且預設情況下,如果發生暫停,它會調整下一個產生元素的 延遲,試圖維持固定的元素產生速率。

(可選)可以指定一個等於 TickerMode.FIXED_DELAYmode 參數,以維持元素之間固定的延遲。