If we compute the numbers with some CPU-consuming blocking code (each computation taking 100 ms), we can represent the numbers using a Sequence:
fun foo(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    foo().forEach { value -> println(value) }
}
fun foo(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    foo().collect { value -> println(value) }
}
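This code waits 100 ms before printing each number, but it does not block the main thread. This is verified by a separate coroutine, running in the main thread, that prints "I'm not blocked" every 100 ms: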
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
fun main() = runBlocking<Unit> {
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
}
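The pipeline above ends in `reduce`, a terminal operator. As a rough sketch, the same computation can be wrapped in suspending functions; the names `sumOfSquares` and `sumOfSquaresWithFold` are made up for illustration. `fold` is shown as an alternative that takes an explicit initial value, so it also works on an empty flow, where `reduce` throws `NoSuchElementException`.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking

// Hypothetical helper: same map + reduce pipeline as above, wrapped in a function.
suspend fun sumOfSquares(range: IntRange): Int =
    range.asFlow()
        .map { it * it }          // square each number
        .reduce { a, b -> a + b } // terminal operator; throws on an empty flow

// Hypothetical helper: fold starts from an explicit initial value,
// so an empty flow yields 0 instead of throwing.
suspend fun sumOfSquaresWithFold(range: IntRange): Int =
    range.asFlow()
        .map { it * it }
        .fold(0) { acc, value -> acc + value }

fun main() = runBlocking {
    println(sumOfSquares(1..5))                   // 55
    println(sumOfSquaresWithFold(1..5))           // 55
    println(sumOfSquaresWithFold(IntRange.EMPTY)) // 0
}
```

Both functions are suspending because terminal operators start the collection of the flow; nothing runs until one of them is called.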
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
    Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
    but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
    Please refer to 'flow' documentation or use 'flowOn' instead
    at ...
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
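The flow above changes only the context of its emissions. A minimal sketch of a collector for it follows; the `main` function is an assumption, since this section does not show one. `log` and `foo` are repeated so that the sketch compiles on its own. The "Emitting" and "Collected" lines should carry different thread names, although the exact names depend on the runtime.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // blocking work, runs on Dispatchers.Default because of flowOn
        log("Emitting $i")
        emit(i)
    }
}.flowOn(Dispatchers.Default)

// Assumed collector: runs in the runBlocking context (the main thread),
// while the flow body above runs on a background dispatcher thread.
fun main() = runBlocking<Unit> {
    foo().collect { value ->
        log("Collected $value")
    }
}
```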
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        foo().collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            println(value)
        }
    }
    println("Collected in $time ms")
}
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        foo()
            .buffer() // buffer emissions, don't wait
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
}
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        foo()
            .conflate() // conflate emissions, don't process each one
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
}
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        foo()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value")
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value")
            }
    }
    println("Collected in $time ms")
}
fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
We get a completely different output:
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
The sequential nature of flatMapConcat is clearly visible in the output:
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
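The flattening examples in this part call `requestFlow`, whose definition does not appear in this section. The following is a minimal sketch of what it might look like, inferred from the printed output: each call emits two strings, "First" and "Second", roughly 500 ms apart. The body and the 500 ms delay are assumptions, not a definition taken from this text.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Assumed shape of requestFlow: two emissions per request,
// separated by a 500 ms suspension (inferred from the timings in the output).
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // assumed delay between the two emissions
    emit("$i: Second")
}

fun main() = runBlocking {
    // Collect a single request to see the two emissions in order.
    requestFlow(1).collect { value -> println(value) }
}
```

With a definition like this, each number in the upstream flow becomes a flow of two strings, which is what makes a flattening operator necessary.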
fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
The concurrent nature of flatMapMerge is obvious:
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
The output of this example is a good demonstration of how flatMapLatest works:
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
For those who are familiar with Reactive Streams or reactive frameworks such as RxJava and Project Reactor, the design of Flow may look very familiar.
Indeed, its design was inspired by Reactive Streams and its various implementations. But the main goal of Flow is to have as simple a design as possible, to be Kotlin- and suspension-friendly, and to respect structured concurrency. Achieving this goal would have been impossible without the reactive pioneers and their tremendous work. You can read the complete story in the article "Reactive Streams and Kotlin Flows".
Although it differs in design, Flow is conceptually a reactive stream, and it can be converted to a reactive (spec- and TCK-compliant) Publisher and vice versa. Such converters are provided by kotlinx.coroutines out of the box and can be found in the corresponding reactive modules (kotlinx-coroutines-reactive for Reactive Streams, kotlinx-coroutines-reactor for Project Reactor, and kotlinx-coroutines-rx2 for RxJava2). The integration modules include conversions from and to Flow, integration with Reactor’s Context, and suspension-friendly ways to work with various reactive entities.
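As an illustration of the conversions mentioned above, here is a small sketch of a round trip through a Reactive Streams `Publisher`. It assumes the kotlinx-coroutines-reactive module is on the classpath and uses its `asPublisher` and `asFlow` extensions; the helper name `roundTrip` is made up for this example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Hypothetical helper: Flow -> Publisher -> Flow, then collect into a list.
suspend fun roundTrip(source: Flow<Int>): List<Int> {
    val publisher: Publisher<Int> = source.asPublisher() // Flow exposed as a Publisher
    val back: Flow<Int> = publisher.asFlow()             // Publisher consumed as a Flow
    return back.toList()                                 // terminal operator starts collection
}

fun main() = runBlocking {
    println(roundTrip(flowOf(1, 2, 3))) // [1, 2, 3]
}
```

The Reactor and RxJava2 modules named above offer analogous conversions for their own types.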