/**
 * Launches 100 coroutines that each invoke [action] 1000 times and prints how
 * many actions were completed and how long that took.
 *
 * The launches happen inside `coroutineScope`, so the elapsed time is measured
 * only after every launched coroutine has finished.
 *
 * @param action the suspending operation that every coroutine repeats.
 */
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}
/**
 * Launches 100 coroutines that each invoke [action] 1000 times and prints how
 * many actions were completed and how long that took.
 *
 * The launches happen inside `coroutineScope`, so the elapsed time is measured
 * only after every launched coroutine has finished.
 *
 * @param action the suspending operation that every coroutine repeats.
 */
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}
/**
 * Launches 100 coroutines that each invoke [action] 1000 times and prints how
 * many actions were completed and how long that took.
 *
 * The launches happen inside `coroutineScope`, so the elapsed time is measured
 * only after every launched coroutine has finished.
 *
 * @param action the suspending operation that every coroutine repeats.
 */
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}
//sampleStart
// Shared counter for the "volatile" variant of the example.
// NOTE: @Volatile makes individual reads and writes visible across threads, but it
// does not make a compound read-modify-write such as `counter++` atomic.
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0
/**
 * Launches 100 coroutines that each invoke [action] 1000 times and prints how
 * many actions were completed and how long that took.
 *
 * The launches happen inside `coroutineScope`, so the elapsed time is measured
 * only after every launched coroutine has finished.
 *
 * @param action the suspending operation that every coroutine repeats.
 */
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}
/**
 * Launches 100 coroutines that each invoke [action] 1000 times and prints how
 * many actions were completed and how long that took.
 *
 * The launches happen inside `coroutineScope`, so the elapsed time is measured
 * only after every launched coroutine has finished.
 *
 * @param action the suspending operation that every coroutine repeats.
 */
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}
//sampleStart
// Dedicated single-threaded context to which all counter updates are confined.
// NOTE: a context created with newSingleThreadContext owns a thread; a real
// application should close() it once it is no longer needed.
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
// Fine-grained thread confinement: the workers run on Dispatchers.Default, but
// every single increment switches to counterContext before touching the counter.
fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // confine each increment to a single-threaded context
            withContext(counterContext) {
                counter++
            }
        }
    }
    println("Counter = $counter")
}
//sampleEnd
/**
 * Launches 100 coroutines that each invoke [action] 1000 times and prints how
 * many actions were completed and how long that took.
 *
 * The launches happen inside `coroutineScope`, so the elapsed time is measured
 * only after every launched coroutine has finished.
 *
 * @param action the suspending operation that every coroutine repeats.
 */
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}
//sampleStart
// Dedicated single-threaded context to which all counter updates are confined.
// NOTE: a context created with newSingleThreadContext owns a thread; a real
// application should close() it once it is no longer needed.
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
/**
 * Launches 100 coroutines that each invoke [action] 1000 times and prints how
 * many actions were completed and how long that took.
 *
 * The launches happen inside `coroutineScope`, so the elapsed time is measured
 * only after every launched coroutine has finished.
 *
 * @param action the suspending operation that every coroutine repeats.
 */
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}
actor 是一个实体,由一个协程、被限制并封装到这个协程中的状态以及一个与其它协程通信的通道组成。简单的 actor 可以写成函数,但具有复杂状态的 actor 更适合用类来表示。
有一个 actor 协程构造器,它可以方便地将 actor 的 mailbox channel 合并到其接收消息的作用域中,并将 send channel 合并到生成的 job 对象中,这样就可以把对 actor 的单个引用作为其句柄持有。
使用 actor 的第一步是定义一类 actor 将要处理的消息。Kotlin 的密封类非常适合这个目的。在 CounterMsg 密封类中,我们用 IncCounter 消息来递增计数器,用 GetCounter 消息来获取其值,后者需要返回值。为此,这里使用 CompletableDeferred 通信原语,它表示一个将在未来得知(并传递)的单个值。
1 2 3 4
// Message types for counterActor
sealed class CounterMsg

// One-way message to increment counter; it carries no payload and expects no reply.
object IncCounter : CounterMsg()

// A request with reply: the actor completes [response] with the current counter value.
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
然后,我们定义一个函数,该函数使用 actor 协程构造器来启动 actor:
1 2 3 4 5 6 7 8 9 10
/**
 * Launches a new counter actor in this scope.
 *
 * The counter is a local variable of the actor coroutine, so it is only touched
 * while the actor handles messages from its channel one at a time.
 */
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}
/**
 * Launches 100 coroutines that each invoke [action] 1000 times and prints how
 * many actions were completed and how long that took.
 *
 * The launches happen inside `coroutineScope`, so the elapsed time is measured
 * only after every launched coroutine has finished.
 *
 * @param action the suspending operation that every coroutine repeats.
 */
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}
// Message types for counterActor
sealed class CounterMsg

// One-way message to increment counter; it carries no payload and expects no reply.
object IncCounter : CounterMsg()

// A request with reply: the actor completes [response] with the current counter value.
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
/**
 * Launches a new counter actor in this scope.
 *
 * The counter is a local variable of the actor coroutine, so it is only touched
 * while the actor handles messages from its channel one at a time.
 */
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}
//sampleStart
// Actor-based variant: workers never touch the counter directly; they only send
// IncCounter messages, and the final value is requested with GetCounter.
fun main() = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IncCounter)
        }
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // shutdown the actor
}
//sampleEnd
就正确性而言,actor 本身在什么上下文中执行并不重要。actor 是一个协程,而协程是按顺序执行的,因此将状态限制到特定的协程可以解决共享可变状态的问题。实际上,actor 可以修改自己的私有状态,但只能通过消息相互影响(从而避免使用任何锁)。