Skip to content

共有可能なミュータブルステートと並行処理

コルーチンは、Dispatchers.Default のようなマルチスレッドディスパッチャーを使用して並行して実行できます。これは、典型的な並行処理におけるあらゆる問題を引き起こします。その主な問題は、共有可能なミュータブルステートへのアクセスの同期です。コルーチンの世界におけるこの問題のいくつかの解決策は、マルチスレッドの世界での解決策と類似していますが、他は独特です。

問題

100個のコルーチンを起動し、それぞれが同じアクションを1000回実行するようにしてみましょう。また、今後の比較のためにそれらの完了時間も測定します。

kotlin
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

まず、マルチスレッドのDispatchers.Defaultを使用して共有可能なミュータブル変数をインクリメントする、非常にシンプルなアクションから始めます。

kotlin
import kotlinx.coroutines.*
import kotlin.system.*    

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

NOTE

全コードはこちらから入手できます。

最終的に何が出力されるでしょうか?100個のコルーチンが何の同期もなしに複数のスレッドからcounterを並行してインクリメントするため、"Counter = 100000"と出力される可能性は非常に低いです。

volatileは役に立ちません

変数をvolatileにすることで並行処理の問題が解決するというよくある誤解があります。試してみましょう。

kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

@Volatile // in Kotlin `volatile` is an annotation 
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

NOTE

全コードはこちらから入手できます。

このコードは動作が遅くなりますが、最終的に常に"Counter = 100000"が得られるわけではありません。なぜなら、volatile変数は対応する変数への線形化可能 (これは「アトミック」という技術用語です) な読み書きを保証しますが、より大きなアクション (我々のケースではインクリメント) のアトミック性を提供しないからです。

スレッドセーフなデータ構造

スレッドとコルーチンの両方で機能する一般的な解決策は、共有ステート上で実行する必要がある対応する操作に必要なすべての同期を提供する、スレッドセーフ (同期化された、線形化可能な、またはアトミックとも呼ばれる) なデータ構造を使用することです。単純なカウンターの場合、アトミックなincrementAndGet操作を持つAtomicIntegerクラスを使用できます。

kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

val counter = AtomicInteger()

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}

NOTE

全コードはこちらから入手できます。

これは、この特定の問題に対する最も高速な解決策です。単純なカウンター、コレクション、キュー、その他の標準的なデータ構造やそれらに対する基本的な操作で機能します。しかし、複雑なステートや、すぐに利用できるスレッドセーフな実装がない複雑な操作には簡単にはスケールしません。

スレッドコンファインメント (きめ細かな)

_スレッドコンファインメント_は、特定の共有ステートへのすべてのアクセスが単一のスレッドに閉じ込められる、共有可能なミュータブルステートの問題へのアプローチです。これは通常、すべてのUIステートが単一のイベントディスパッチ/アプリケーションスレッドに閉じ込められるUIアプリケーションで使用されます。シングルスレッドコンテキストを使用することで、コルーチンで簡単に適用できます。

kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // confine each increment to a single-threaded context
            withContext(counterContext) {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

NOTE

全コードはこちらから入手できます。

このコードは、_きめ細かな_スレッドコンファインメントを行うため、非常に遅く動作します。個々のインクリメントごとに、withContext(counterContext)ブロックを使用して、マルチスレッドのDispatchers.Defaultコンテキストからシングルスレッドコンテキストへ切り替わります。

スレッドコンファインメント (粗粒度)

実際には、スレッドコンファインメントは大きな塊で行われます。例えば、ステートを更新するビジネスロジックの大きなまとまりが単一のスレッドに閉じ込められます。以下の例は、最初から各コルーチンをシングルスレッドコンテキストで実行することで、そのように行います。

kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    // confine everything to a single-threaded context
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

NOTE

全コードはこちらから入手できます。

これにより、はるかに高速に動作し、正しい結果が生成されます。

相互排他

問題に対する相互排他的な解決策は、共有ステートのすべての変更を、決して並行して実行されない_クリティカルセクション_で保護することです。ブロッキングの世界では、通常、そのためにsynchronizedReentrantLockを使用するでしょう。コルーチンの代替手段はMutexと呼ばれます。これはクリティカルセクションを区切るためのlock関数とunlock関数を持っています。主な違いは、Mutex.lock()がサスペンド関数であることです。これはスレッドをブロックしません。

また、mutex.lock(); try { ... } finally { mutex.unlock() }パターンを便利に表現するwithLock拡張関数もあります。

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // protect each increment with lock
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

NOTE

全コードはこちらから入手できます。

この例でのロックはきめ細かいため、その代償を払います。しかし、何らかの共有ステートを定期的にどうしても変更する必要があるが、このステートが閉じ込められる自然なスレッドがないような状況では、良い選択肢です。