/**
 * Channel basics: a launched coroutine sends the squares of 1..5 into a channel
 * while the main coroutine receives and prints exactly five values, then "Done!".
 */
fun main() = runBlocking {
    //sampleStart
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic,
        // we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
    //sampleEnd
}
输出结果是:
1 2 3 4 5 6
1 4 9 16 25 Done!
二、关闭和迭代通道
与队列不同,通道可以关闭,以此来表明元素已发送完成。在接收方,使用常规的 for 循环从通道接收元素是比较方便的
从概念上讲,close 类似于向通道发送一个特殊的 close 标记。一旦接收到这个 close 标记,迭代就会停止,因此可以保证接收到 close 之前发送的所有元素:
/**
 * Closing and iterating a channel: the sender closes the channel after sending
 * the squares of 1..5, and the receiver consumes it with a `for` loop that ends
 * once the channel is closed, then prints "Done!".
 */
fun main() = runBlocking {
    //sampleStart
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
    //sampleEnd
}
/**
 * Pipeline demo: connects the number producer to the squaring stage, prints the
 * first five squares and "Done!", then cancels the child coroutines so the
 * otherwise endless pipeline lets `runBlocking` return.
 */
fun main() = runBlocking {
    //sampleStart
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    repeat(5) {
        println(squares.receive()) // print first five
    }
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
    //sampleEnd
}
/**
 * Starts a producer coroutine that sends an ascending stream of integers
 * beginning at 1.
 *
 * The loop has no exit condition of its own, so the stream only stops when the
 * producer coroutine is cancelled.
 */
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
/**
 * Pipeline stage: starts a producer coroutine that reads every value from
 * [numbers] and sends its square on the returned channel.
 *
 * The stage finishes when iteration over [numbers] finishes.
 */
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (value in numbers) {
        val squared = value * value
        send(squared)
    }
}
/**
 * Prime-number pipeline: starting from the stream 2, 3, 4, ..., repeatedly takes
 * the head of the current stream as a prime, prints it, and wraps the stream in
 * a filter stage that drops multiples of that prime. Prints ten primes, then
 * cancels all children so `runBlocking` can finish.
 */
fun main() = runBlocking {
    //sampleStart
    var cur = numbersFrom(2)
    repeat(10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime) // each round adds one more filtering stage
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
    //sampleEnd
}
/**
 * Starts a producer coroutine that sends an ascending stream of integers
 * beginning at [start].
 *
 * The loop has no exit condition of its own, so the stream only stops when the
 * producer coroutine is cancelled.
 */
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
/**
 * Pipeline stage: starts a producer coroutine that forwards only those values
 * from [numbers] that are not divisible by [prime].
 */
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (candidate in numbers) {
        val divisible = candidate % prime == 0
        if (divisible) continue // multiples of the prime are dropped
        send(candidate)
    }
}
/**
 * Starts a producer coroutine that sends integers starting from 1, pausing
 * 100 ms after each send.
 *
 * The loop has no exit condition of its own, so the stream only stops when the
 * producer coroutine is cancelled.
 */
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
然后我们可以有多个处理器(processor)协程。在本例中,他们只需打印他们的 id 和接收的数字:
1 2 3 4 5
/**
 * Launches a processor coroutine that prints every message it receives from
 * [channel], tagged with its own [id].
 *
 * The coroutine runs until iteration over [channel] ends.
 */
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (number in channel) {
        val report = "Processor #$id received $number"
        println(report)
    }
}
/**
 * Fan-out demo: five processor coroutines receive from the same producer
 * channel. After 950 ms the producer is cancelled, which also ends the
 * processors' iteration over its channel.
 */
fun main() = runBlocking<Unit> {
    //sampleStart
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
    //sampleEnd
}
/**
 * Starts a producer coroutine that sends integers starting from 1, pausing
 * 100 ms after each send.
 *
 * The loop has no exit condition of its own, so the stream only stops when the
 * producer coroutine is cancelled.
 */
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
/**
 * Launches a processor coroutine that prints each message taken from [channel]
 * together with the processor's [id].
 *
 * The coroutine runs until the channel's iterator reports no further elements.
 */
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    val incoming = channel.iterator()
    while (incoming.hasNext()) {
        println("Processor #$id received ${incoming.next()}")
    }
}
尽管接收每个特定整数的处理器 id 可能不同,但运行结果将类似于以下输出:
1 2 3 4 5 6 7 8 9 10
Processor #2 received 1 Processor #4 received 2 Processor #0 received 3 Processor #1 received 4 Processor #3 received 5 Processor #2 received 6 Processor #4 received 7 Processor #0 received 8 Processor #1 received 9 Processor #3 received 10
/**
 * Fan-in demo: two coroutines send strings into the same channel and the main
 * coroutine prints the first six values received, then cancels all children so
 * `runBlocking` can finish.
 *
 * NOTE(review): `sendString` is declared outside this snippet; presumably it
 * sends the given string repeatedly, pausing for the given number of
 * milliseconds - confirm against its definition.
 */
fun main() = runBlocking {
    //sampleStart
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
    //sampleEnd
}
/**
 * Ping-pong demo: two player coroutines share one channel (the "table"). The
 * main coroutine serves a ball, lets the game run for one second, then cancels
 * both players.
 *
 * NOTE(review): `Ball` is declared outside this snippet; it is constructed from
 * an Int here and its `hits` member is incremented by `player`.
 */
fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}
/**
 * One ping-pong player: repeatedly takes the ball from [table], counts the hit,
 * prints "<name> <ball>", waits 300 ms and sends the ball back.
 *
 * The loop runs until iteration over [table] ends or the coroutine is cancelled.
 *
 * @param name label printed in front of the ball on every hit
 * @param table channel used both to receive the ball and to return it
 */
suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball") // space restored between the player name and the ball
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
//sampleEnd
计时器通道是一种特殊的会合(rendezvous)通道,自该通道的最后一次消耗以来,每次给定的延迟时间结束后都将返回 Unit 值。尽管它看起来是无用处的,但它是一个有用的构建块,可以创建复杂的基于时间的 produce 管道和进行窗口化操作以及其它时间相关的处理。计时器通道可用于 select 执行 “on tick” 操作
/**
 * Ticker channel demo: creates a ticker with a 100 ms period and no initial
 * delay, then probes it with short `withTimeoutOrNull` windows to show when the
 * next element is (or is not yet) available, including after a 150 ms consumer
 * pause. Each probe yields `Unit` when a tick arrived in time and `null` otherwise.
 */
fun main() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // initialDelayMillis = 0, so no wait

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have a 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")

    // 50 ms already elapsed in the previous probe, so 60 ms more crosses the 100 ms mark
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}
运行结果:
1 2 3 4 5 6
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit