/**
 * Performs a single `select` over the two channels and prints the value
 * received from whichever clause is selected.
 *
 * @param fizz channel whose values are printed with the `fizz -> ` prefix
 * @param buzz channel whose values are printed with the `buzz -> ` prefix
 */
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value -> // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value -> // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}
/**
 * Starts a producer in this scope that loops indefinitely, waiting 300 ms
 * and then sending the string "Fizz" on each iteration.
 */
fun CoroutineScope.fizz() = produce<String> {
    while (true) { // sends "Fizz" every 300 ms
        delay(300)
        send("Fizz")
    }
}
/**
 * Starts a producer in this scope that loops indefinitely, waiting 500 ms
 * and then sending the string "Buzz!" on each iteration.
 */
fun CoroutineScope.buzz() = produce<String> {
    while (true) { // sends "Buzz!" every 500 ms
        delay(500)
        send("Buzz!")
    }
}
/**
 * Performs a single `select` over the two channels and prints the value
 * received from whichever clause is selected.
 *
 * @param fizz channel whose values are printed with the `fizz -> ` prefix
 * @param buzz channel whose values are printed with the `buzz -> ` prefix
 */
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value -> // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value -> // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}
/**
 * Selects over channels [a] and [b] and returns a description of the outcome.
 *
 * For each clause, a `null` value is reported as the corresponding channel
 * being closed; otherwise the received value is formatted with the channel name.
 *
 * NOTE(review): `onReceiveOrNull` is deprecated in newer kotlinx.coroutines
 * releases in favor of `onReceiveCatching` — confirm the library version in use
 * before migrating.
 *
 * @param a first channel (listed first in the select)
 * @param b second channel
 * @return either "a -> '...'" / "b -> '...'" or a "Channel '...' is closed" message
 */
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'a' is closed"
            else
                "a -> '$value'"
        }
        b.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'b' is closed"
            else
                "b -> '$value'"
        }
    }

/**
 * Demo entry point: starts two producers that each send four strings,
 * prints the result of eight [selectAorB] calls, then cancels the child coroutines.
 */
fun main() = runBlocking<Unit> {
//sampleStart
    val a = produce<String> {
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String> {
        repeat(4) { send("World $it") }
    }
    repeat(8) { // print first eight results
        println(selectAorB(a, b))
    }
    coroutineContext.cancelChildren()
//sampleEnd
}
这段代码的运行结果非常有趣,所以我们来详细分析一下:
1 2 3 4 5 6 7 8
a -> 'Hello 0' a -> 'Hello 1' b -> 'World 0' a -> 'Hello 2' a -> 'Hello 3' b -> 'World 1' Channel 'a' is closed Channel 'a' is closed
从中可以观察到以下几点:
首先,select 偏向于第一个子句:当多个子句同时可被选择时,会选中其中排在最前面的那个。在这里,两个通道都在不断地产生字符串,因此作为 select 中第一个子句的通道 a 获胜。但是,因为我们使用的是无缓冲通道,所以 a 在调用 send 时会不时地被挂起,从而给了 b 发送的机会。
第二个观察结果是,当通道已经关闭时,onReceiveOrNull 子句会立即被选中。
三、Selecting to send
select 表达式还提供了 onSend 子句,它可以与选择的偏向性很好地结合使用。让我们写一个整数生产者的例子:当主通道上的消费者跟不上时,它会把值发送到 side 通道:
1 2 3 4 5 6 7 8 9
/**
 * Starts a producer in this scope that emits the integers 1 through 10,
 * one every 100 ms. Each number is offered via `select` to the producer's
 * own (primary) channel first and to [side] second, and is sent to
 * whichever clause is selected.
 *
 * @param side alternative channel that may receive a number instead of the primary channel
 */
fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) {} // Send to the primary channel
            side.onSend(num) {} // or to the side channel
        }
    }
}
/**
 * Starts a producer in this scope that emits the integers 1 through 10,
 * one every 100 ms. Each number is offered via `select` to the producer's
 * own (primary) channel first and to [side] second, and is sent to
 * whichever clause is selected.
 *
 * @param side alternative channel that may receive a number instead of the primary channel
 */
fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) {} // Send to the primary channel
            side.onSend(num) {} // or to the side channel
        }
    }
}
/**
 * Demo entry point: launches a consumer that prints everything arriving on the
 * side channel, then consumes the primary channel of `produceNumbers` with a
 * 250 ms pause per element, and finally cancels the child coroutines.
 */
fun main() = runBlocking<Unit> {
//sampleStart
    val side = Channel<Int>() // allocate side channel
    launch { // this is a very fast consumer for the side channel
        side.consumeEach { println("Side channel has $it") }
    }
    produceNumbers(side).consumeEach {
        println("Consuming $it")
        delay(250) // let us digest the consumed number properly, do not hurry
    }
    println("Done consuming")
    coroutineContext.cancelChildren()
//sampleEnd
}
让我们看看会发生什么:
1 2 3 4 5 6 7 8 9 10 11
Consuming 1 Side channel has 2 Side channel has 3 Consuming 4 Side channel has 5 Side channel has 6 Consuming 7 Side channel has 8 Side channel has 9 Consuming 10 Done consuming
/**
 * Starts a producer that consumes a channel of deferred strings and emits the
 * result of the current deferred, switching to a newer deferred whenever one
 * arrives on [input] before the current one completes.
 *
 * The loop ends (after printing "Channel was closed") when the select yields
 * `null`, i.e. when no further deferred could be obtained from [input].
 *
 * NOTE(review): `onReceiveOrNull` / `receiveOrNull` are deprecated in newer
 * kotlinx.coroutines releases in favor of `onReceiveCatching` /
 * `receiveCatching` — confirm the library version in use before migrating.
 *
 * @param input channel of deferred values to await
 */
fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
    var current = input.receive() // start with first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>?> { // return next deferred value from this select or null
            input.onReceiveOrNull { update ->
                update // replaces next value to wait
            }
            current.onAwait { value ->
                send(value) // send value that current deferred has produced
                input.receiveOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // out of loop
        } else {
            current = next
        }
    }
}
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

/**
 * Starts a producer that consumes a channel of deferred strings and emits the
 * result of the current deferred, switching to a newer deferred whenever one
 * arrives on [input] before the current one completes.
 *
 * The loop ends (after printing "Channel was closed") when the select yields
 * `null`, i.e. when no further deferred could be obtained from [input].
 *
 * NOTE(review): `onReceiveOrNull` / `receiveOrNull` are deprecated in newer
 * kotlinx.coroutines releases in favor of `onReceiveCatching` /
 * `receiveCatching` — confirm the library version in use before migrating.
 *
 * @param input channel of deferred values to await
 */
fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
    var current = input.receive() // start with first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>?> { // return next deferred value from this select or null
            input.onReceiveOrNull { update ->
                update // replaces next value to wait
            }
            current.onAwait { value ->
                send(value) // send value that current deferred has produced
                input.receiveOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // out of loop
        } else {
            current = next
        }
    }
}
/**
 * Demo entry point: feeds four deferred strings into a test channel at staggered
 * intervals while a launched coroutine prints everything emitted by
 * `switchMapDeferreds`, then closes the channel and waits briefly.
 *
 * `asyncString` is not defined in this section; it is expected to be declared
 * elsewhere in the file.
 */
fun main() = runBlocking<Unit> {
//sampleStart
    val chan = Channel<Deferred<String>>() // the channel for test
    launch { // launch printing coroutine
        for (s in switchMapDeferreds(chan))
            println(s) // print each received string
    }
    chan.send(asyncString("BEGIN", 100))
    delay(200) // enough time for "BEGIN" to be produced
    chan.send(asyncString("Slow", 500))
    delay(100) // not enough time to produce slow
    chan.send(asyncString("Replace", 100))
    delay(500) // give it time before the last one
    chan.send(asyncString("END", 500))
    delay(1000) // give it time to process
    chan.close() // close the channel ...
    delay(500) // and wait some time to let it finish
//sampleEnd
}