共有可能なミュータブルステートと並行処理
コルーチンは、Dispatchers.Default のようなマルチスレッドディスパッチャーを使用して並行して実行できます。これは、典型的な並行処理におけるあらゆる問題を引き起こします。その主な問題は、共有可能なミュータブルステートへのアクセスの同期です。コルーチンの世界におけるこの問題のいくつかの解決策は、マルチスレッドの世界での解決策と類似していますが、他は独特です。
問題
100個のコルーチンを起動し、それぞれが同じアクションを1000回実行するようにしてみましょう。また、今後の比較のためにそれらの完了時間も測定します。
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
まず、マルチスレッドのDispatchers.Defaultを使用して共有可能なミュータブル変数をインクリメントする、非常にシンプルなアクションから始めます。
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
NOTE
全コードはこちらから入手できます。
最終的に何が出力されるでしょうか?100個のコルーチンが何の同期もなしに複数のスレッドからcounter
を並行してインクリメントするため、"Counter = 100000"と出力される可能性は非常に低いです。
volatileは役に立ちません
変数をvolatile
にすることで並行処理の問題が解決するというよくある誤解があります。試してみましょう。
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
NOTE
全コードはこちらから入手できます。
このコードは動作が遅くなりますが、最終的に常に"Counter = 100000"が得られるわけではありません。なぜなら、volatile変数は対応する変数への線形化可能 (これは「アトミック」という技術用語です) な読み書きを保証しますが、より大きなアクション (我々のケースではインクリメント) のアトミック性を提供しないからです。
スレッドセーフなデータ構造
スレッドとコルーチンの両方で機能する一般的な解決策は、共有ステート上で実行する必要がある対応する操作に必要なすべての同期を提供する、スレッドセーフ (同期化された、線形化可能な、またはアトミックとも呼ばれる) なデータ構造を使用することです。単純なカウンターの場合、アトミックなincrementAndGet
操作を持つAtomicInteger
クラスを使用できます。
import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
val counter = AtomicInteger()
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.incrementAndGet()
}
}
println("Counter = $counter")
}
NOTE
全コードはこちらから入手できます。
これは、この特定の問題に対する最も高速な解決策です。単純なカウンター、コレクション、キュー、その他の標準的なデータ構造やそれらに対する基本的な操作で機能します。しかし、複雑なステートや、すぐに利用できるスレッドセーフな実装がない複雑な操作には簡単にはスケールしません。
スレッドコンファインメント (きめ細かな)
_スレッドコンファインメント_は、特定の共有ステートへのすべてのアクセスが単一のスレッドに閉じ込められる、共有可能なミュータブルステートの問題へのアプローチです。これは通常、すべてのUIステートが単一のイベントディスパッチ/アプリケーションスレッドに閉じ込められるUIアプリケーションで使用されます。シングルスレッドコンテキストを使用することで、コルーチンで簡単に適用できます。
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// confine each increment to a single-threaded context
withContext(counterContext) {
counter++
}
}
}
println("Counter = $counter")
}
NOTE
全コードはこちらから入手できます。
このコードは、_きめ細かな_スレッドコンファインメントを行うため、非常に遅く動作します。個々のインクリメントごとに、withContext(counterContext)ブロックを使用して、マルチスレッドのDispatchers.Defaultコンテキストからシングルスレッドコンテキストへ切り替わります。
スレッドコンファインメント (粗粒度)
実際には、スレッドコンファインメントは大きな塊で行われます。例えば、ステートを更新するビジネスロジックの大きなまとまりが単一のスレッドに閉じ込められます。以下の例は、最初から各コルーチンをシングルスレッドコンテキストで実行することで、そのように行います。
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
// confine everything to a single-threaded context
withContext(counterContext) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
NOTE
全コードはこちらから入手できます。
これにより、はるかに高速に動作し、正しい結果が生成されます。
相互排他
問題に対する相互排他的な解決策は、共有ステートのすべての変更を、決して並行して実行されない_クリティカルセクション_で保護することです。ブロッキングの世界では、通常、そのためにsynchronized
やReentrantLock
を使用するでしょう。コルーチンの代替手段はMutexと呼ばれます。これはクリティカルセクションを区切るためのlock関数とunlock関数を持っています。主な違いは、Mutex.lock()
がサスペンド関数であることです。これはスレッドをブロックしません。
また、mutex.lock(); try { ... } finally { mutex.unlock() }
パターンを便利に表現するwithLock拡張関数もあります。
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// protect each increment with lock
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}
NOTE
全コードはこちらから入手できます。
この例でのロックはきめ細かいため、その代償を払います。しかし、何らかの共有ステートを定期的にどうしても変更する必要があるが、このステートが閉じ込められる自然なスレッドがないような状況では、良い選択肢です。