Skip to content

共有ミュータブルステートと並行処理

コルーチンは、Dispatchers.Default のようなマルチスレッドディスパッチャーを使用して並行して実行できます。これは、 通常の並行処理に関するすべての問題を引き起こします。主な問題は、共有ミュータブルステートへのアクセス同期です。 コルーチンにおけるこの問題の解決策には、マルチスレッドの世界での解決策と似ているものもありますが、コルーチン特有のものもあります。

問題

100個のコルーチンを起動し、それぞれが同じアクションを1000回実行してみましょう。 さらに比較するために、それらの完了時間も計測します。

kotlin
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

まずは、マルチスレッドの Dispatchers.Default を使用して共有ミュータブル変数をインクリメントするという非常にシンプルなアクションから始めます。

kotlin
import kotlinx.coroutines.*
import kotlin.system.*    

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

完全なコードはこちらから入手できます。

最終的に何が出力されるでしょうか?100個のコルーチンが同期なしに複数のスレッドからcounterを並行してインクリメントするため、「Counter = 100000」と出力されることはまずありません。

volatileは役に立たない

変数を volatile にすることで並行処理の問題が解決されるという誤解がよくあります。試してみましょう。

kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

@Volatile // in Kotlin `volatile` is an annotation 
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

完全なコードはこちらから入手できます。

このコードは動作が遅くなりますが、最終的に「Counter = 100000」が常に得られるわけではありません。volatile変数は、対応する変数への線形化可能(これは「アトミック」の技術用語です)な読み書きを保証しますが、より大きなアクション(この場合はインクリメント)のアトミシティは提供しないためです。

スレッドセーフなデータ構造

スレッドとコルーチンの両方で機能する一般的な解決策は、共有ステートで実行する必要がある対応する操作に必要なすべての同期を提供する、スレッドセーフ(同期、線形化可能、またはアトミックとも呼ばれます)なデータ構造を使用することです。 単純なカウンターの場合、アトミックな incrementAndGet 操作を持つ AtomicInteger クラスを使用できます。

kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

val counter = AtomicInteger()

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}

完全なコードはこちらから入手できます。

これは、この特定の問題に対する最速の解決策です。これは、単純なカウンター、コレクション、キュー、その他の標準的なデータ構造、およびそれらに対する基本的な操作で機能します。しかし、複雑なステートや、すぐに使用できるスレッドセーフな実装がない複雑な操作には、容易にスケールしません。

スレッド隔離(きめ細かい)

_スレッド隔離(Thread confinement)_とは、特定の共有ステートへのすべてのアクセスを単一のスレッドに限定する、共有ミュータブルステートの問題に対するアプローチです。これは通常、すべてのUIステートが単一のイベントディスパッチ/アプリケーションスレッドに限定されるUIアプリケーションで使用されます。コルーチンでは、単一スレッドコンテキストを使用することで簡単に適用できます。

kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // confine each increment to a single-threaded context
            withContext(counterContext) {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

完全なコードはこちらから入手できます。

このコードは非常に遅く動作します。それは_きめ細かい(fine-grained)_スレッド隔離を行っているためです。各インクリメントは、マルチスレッドの Dispatchers.Default コンテキストから、withContext(counterContext) ブロックを使用して単一スレッドコンテキストに切り替わります。

スレッド隔離(粗い)

実際には、スレッド隔離は大きなチャンクで行われます。たとえば、ステートを更新するビジネスロジックの大きなまとまりが単一のスレッドに限定されます。以下の例では、各コルーチンを最初に単一スレッドコンテキストで実行することで、これを実現しています。

kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    // confine everything to a single-threaded context
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

完全なコードはこちらから入手できます。

これにより、はるかに高速に動作し、正しい結果が生成されます。

相互排他

この問題に対する相互排他による解決策は、共有ステートのすべての変更を、並行して実行されることのない_クリティカルセクション_で保護することです。ブロッキングの世界では、通常synchronizedReentrantLockを使用します。コルーチンの代替はMutexと呼ばれます。これは、クリティカルセクションを区切るためのlock関数とunlock関数を持っています。重要な違いは、Mutex.lock()がサスペンド関数であることです。これはスレッドをブロックしません。

また、mutex.lock(); try { ... } finally { mutex.unlock() }パターンを便利に表現するwithLock拡張関数もあります。

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // protect each increment with lock
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

完全なコードはこちらから入手できます。

この例のロックはきめ細かいため、パフォーマンスの代償を払います。しかし、定期的に共有ステートをどうしても変更する必要があるものの、そのステートが限定される自然なスレッドがない状況においては、良い選択肢となります。