Skip to content

코루틴과 채널 − 튜토리얼

이 튜토리얼에서는 IntelliJ IDEA에서 코루틴을 사용하여 기본 스레드나 콜백을 차단하지 않고 네트워크 요청을 수행하는 방법을 배웁니다.

코루틴에 대한 사전 지식은 필수가 아니지만, 기본적인 Kotlin 문법에 익숙해야 합니다.

다음 내용을 배울 것입니다:

  • 네트워크 요청을 수행하기 위해 중단 함수(suspending function)를 사용해야 하는 이유와 방법.
  • 코루틴을 사용하여 요청을 동시적으로 보내는 방법.
  • 채널을 사용하여 다른 코루틴 간에 정보를 공유하는 방법.

네트워크 요청에는 Retrofit 라이브러리가 필요하지만, 이 튜토리얼에서 보여주는 접근 방식은 코루틴을 지원하는 다른 모든 라이브러리에도 유사하게 작동합니다.

모든 작업의 솔루션은 프로젝트 저장소solutions 브랜치에서 찾을 수 있습니다.

시작하기 전에

  1. 최신 버전의 IntelliJ IDEA를 다운로드하여 설치합니다.

  2. 환영 화면에서 Get from VCS를 선택하거나 File | New | Project from Version Control을 선택하여 프로젝트 템플릿을 복제합니다.

    명령줄에서도 복제할 수 있습니다:

    Bash
    git clone https://github.com/kotlin-hands-on/intro-coroutines

GitHub 개발자 토큰 생성

프로젝트에서 GitHub API를 사용할 것입니다. 접근하려면 GitHub 계정 이름과 암호 또는 토큰을 제공해야 합니다. 2단계 인증이 활성화되어 있다면 토큰만으로 충분합니다.

귀하의 계정에서 GitHub API를 사용할 새 GitHub 토큰을 생성하십시오:

  1. 토큰의 이름을 지정합니다 (예: coroutines-tutorial):

    새 GitHub 토큰 생성

  2. 어떤 스코프도 선택하지 마십시오. 페이지 하단의 Generate token을 클릭합니다.

  3. 생성된 토큰을 복사합니다.

코드 실행

이 프로그램은 주어진 조직(기본적으로 "kotlin") 아래의 모든 저장소에 대한 기여자를 로드합니다. 나중에 기여 수를 기준으로 사용자를 정렬하는 로직을 추가할 것입니다.

  1. src/contributors/main.kt 파일을 열고 main() 함수를 실행합니다. 다음 창이 나타날 것입니다:

    첫 번째 창

    글꼴이 너무 작으면 main() 함수의 setDefaultFontSize(18f) 값을 변경하여 조정하십시오.

  2. 해당 필드에 GitHub 사용자 이름과 토큰(또는 암호)을 제공합니다.

  3. Variant 드롭다운 메뉴에서 BLOCKING 옵션이 선택되어 있는지 확인합니다.

  4. _Load contributors_를 클릭합니다. UI가 잠시 멈춘 다음 기여자 목록이 표시될 것입니다.

  5. 데이터가 로드되었는지 확인하기 위해 프로그램 출력을 엽니다. 기여자 목록은 각 성공적인 요청 후에 로깅됩니다.

이 로직을 구현하는 여러 가지 방법이 있습니다: 블로킹 요청 또는 콜백을 사용하는 것입니다. 이러한 솔루션을 코루틴을 사용하는 솔루션과 비교하고, 채널을 사용하여 여러 코루틴 간에 정보를 공유하는 방법을 살펴볼 것입니다.

블로킹 요청

Retrofit 라이브러리를 사용하여 GitHub에 HTTP 요청을 수행할 것입니다. 이 라이브러리를 사용하면 주어진 조직 아래의 저장소 목록과 각 저장소에 대한 기여자 목록을 요청할 수 있습니다:

kotlin
interface GitHubService {
    @GET("orgs/{org}/repos?per_page=100")
    fun getOrgReposCall(
        @Path("org") org: String
    ): Call<List<Repo>>

    @GET("repos/{owner}/{repo}/contributors?per_page=100")
    fun getRepoContributorsCall(
        @Path("owner") owner: String,
        @Path("repo") repo: String
    ): Call<List<User>>
}

이 API는 loadContributorsBlocking() 함수에서 주어진 조직에 대한 기여자 목록을 가져오는 데 사용됩니다.

  1. src/tasks/Request1Blocking.kt를 열어 구현을 확인합니다:

    kotlin
    fun loadContributorsBlocking(
        service: GitHubService,
        req: RequestData
    ): List<User> {
        val repos = service
            .getOrgReposCall(req.org)   // #1
            .execute()                  // #2
            .also { logRepos(req, it) } // #3
            .body() ?: emptyList()      // #4
    
        return repos.flatMap { repo ->
            service
                .getRepoContributorsCall(req.org, repo.name) // #1
                .execute()                                   // #2
                .also { logUsers(repo, it) }                 // #3
                .bodyList()                                  // #4
        }.aggregate()
    }
    • 먼저 주어진 조직 아래의 저장소 목록을 가져와 repos 목록에 저장합니다. 그런 다음 각 저장소에 대한 기여자 목록을 요청하고, 모든 목록을 하나의 최종 기여자 목록으로 병합합니다.
    • getOrgReposCall()getRepoContributorsCall()은 모두 *Call 클래스의 인스턴스를 반환합니다( #1). 이 시점에서는 어떤 요청도 보내지지 않습니다.
    • *Call.execute()가 호출되어 요청을 수행합니다( #2). execute()는 기본 스레드를 블로킹하는 동기 호출입니다.
    • 응답을 받으면 특정 logRepos()logUsers() 함수를 호출하여 결과를 로깅합니다( #3). HTTP 응답에 오류가 포함되어 있으면 해당 오류가 여기에 로깅됩니다.
    • 마지막으로 필요한 데이터가 포함된 응답의 본문을 가져옵니다. 이 튜토리얼에서는 오류가 발생한 경우 빈 목록을 결과로 사용하고 해당 오류를 로깅합니다( #4).
  2. .body() ?: emptyList()의 반복을 피하기 위해 확장 함수 bodyList()가 선언됩니다:

    kotlin
    fun <T> Response<List<T>>.bodyList(): List<T> {
        return body() ?: emptyList()
    }
  3. 프로그램을 다시 실행하고 IntelliJ IDEA의 시스템 출력을 살펴보십시오. 다음과 유사한 내용이 표시되어야 합니다:

    text
    1770 [AWT-EventQueue-0] INFO  Contributors - kotlin: loaded 40 repos
    2025 [AWT-EventQueue-0] INFO  Contributors - kotlin-examples: loaded 23 contributors
    2229 [AWT-EventQueue-0] INFO  Contributors - kotlin-koans: loaded 45 contributors
    ...
    • 각 줄의 첫 번째 항목은 프로그램 시작 이후 경과한 밀리초 수이고, 그 다음은 대괄호 안의 스레드 이름입니다. 로딩 요청이 어느 스레드에서 호출되었는지 확인할 수 있습니다.
    • 각 줄의 마지막 항목은 실제 메시지입니다: 로드된 저장소 또는 기여자의 수.

    이 로그 출력은 모든 결과가 메인 스레드에서 로깅되었음을 보여줍니다. BLOCKING 옵션으로 코드를 실행하면 로딩이 완료될 때까지 창이 멈추고 입력에 반응하지 않습니다. 모든 요청은 loadContributorsBlocking()이 호출된 스레드와 동일한 스레드(Swing에서는 AWT 이벤트 디스패칭 스레드)에서 실행됩니다. 이 메인 스레드가 블로킹되어 UI가 멈추게 됩니다:

    블로킹된 메인 스레드

    기여자 목록이 로드되면 결과가 업데이트됩니다.

  4. src/contributors/Contributors.kt에서 기여자가 로드되는 방식을 선택하는 loadContributors() 함수를 찾아 loadContributorsBlocking()이 어떻게 호출되는지 확인합니다:

    kotlin
    when (getSelectedVariant()) {
        BLOCKING -> { // Blocking UI thread
            val users = loadContributorsBlocking(service, req)
            updateResults(users, startTime)
        }
    }
    • updateResults() 호출은 loadContributorsBlocking() 호출 바로 다음에 옵니다.
    • updateResults()는 UI를 업데이트하므로 항상 UI 스레드에서 호출되어야 합니다.
    • loadContributorsBlocking()도 UI 스레드에서 호출되므로 UI 스레드가 블로킹되고 UI가 멈춥니다.

작업 1

첫 번째 작업은 작업 도메인에 익숙해지는 데 도움이 됩니다. 현재 각 기여자의 이름이 참여한 모든 프로젝트에 대해 한 번씩 여러 번 반복됩니다. 각 기여자가 한 번만 추가되도록 사용자를 결합하는 aggregate() 함수를 구현하십시오. User.contributions 속성은 주어진 사용자가 모든 프로젝트에 기여한 총 수를 포함해야 합니다. 결과 목록은 기여 수에 따라 내림차순으로 정렬되어야 합니다.

src/tasks/Aggregation.kt를 열고 List<User>.aggregate() 함수를 구현하십시오. 사용자는 총 기여 수에 따라 정렬되어야 합니다.

해당 테스트 파일 test/tasks/AggregationKtTest.kt는 예상 결과의 예를 보여줍니다.

IntelliJ IDEA 단축키 Ctrl+Shift+T / ⇧ ⌘ T를 사용하여 소스 코드와 테스트 클래스 사이를 자동으로 이동할 수 있습니다.

이 작업을 구현한 후 "kotlin" 조직의 결과 목록은 다음과 유사해야 합니다:

kotlin 조직 목록

작업 1 솔루션

  1. 사용자를 로그인별로 그룹화하려면 groupBy()를 사용하십시오. 이 함수는 로그인을 키로, 이 로그인으로 여러 저장소에 나타나는 모든 사용자 인스턴스를 값으로 하는 맵을 반환합니다.

  2. 각 맵 항목에 대해 각 사용자의 총 기여 수를 계산하고, 주어진 이름과 총 기여 수로 User 클래스의 새 인스턴스를 생성합니다.

  3. 결과 목록을 내림차순으로 정렬합니다:

    kotlin
    fun List<User>.aggregate(): List<User> =
        groupBy { it.login }
            .map { (login, group) -> User(login, group.sumOf { it.contributions }) }
            .sortedByDescending { it.contributions }

대안 솔루션은 groupBy() 대신 groupingBy() 함수를 사용하는 것입니다.

콜백

이전 솔루션은 작동하지만, 스레드를 블로킹하여 UI를 멈추게 합니다. 이를 피하는 전통적인 접근 방식은 _콜백_을 사용하는 것입니다.

작업 완료 직후 호출되어야 하는 코드를 직접 호출하는 대신, 이를 별도의 콜백(종종 람다)으로 추출하여 호출자에게 전달하여 나중에 호출되도록 할 수 있습니다.

UI를 반응형으로 만들려면 전체 연산을 별도의 스레드로 옮기거나, 블로킹 호출 대신 콜백을 사용하는 Retrofit API로 전환할 수 있습니다.

백그라운드 스레드 사용

  1. src/tasks/Request2Background.kt를 열어 구현을 확인합니다. 먼저 전체 연산을 다른 스레드로 옮깁니다. thread() 함수는 새 스레드를 시작합니다:

    kotlin
    thread {
        loadContributorsBlocking(service, req)
    }

    이제 모든 로딩이 별도의 스레드로 옮겨졌으므로 메인 스레드는 자유로워 다른 작업을 수행할 수 있습니다:

    자유로워진 메인 스레드

  2. loadContributorsBackground() 함수의 시그니처가 변경됩니다. 모든 로딩이 완료된 후 호출할 updateResults() 콜백을 마지막 인수로 받습니다:

    kotlin
    fun loadContributorsBackground(
        service: GitHubService, req: RequestData,
        updateResults: (List<User>) -> Unit
    )
  3. 이제 loadContributorsBackground()가 호출되면 updateResults() 호출은 이전처럼 즉시 뒤따라오는 것이 아니라 콜백에서 발생합니다:

    kotlin
    loadContributorsBackground(service, req) { users ->
        SwingUtilities.invokeLater {
            updateResults(users, startTime)
        }
    }

    SwingUtilities.invokeLater를 호출함으로써 결과를 업데이트하는 updateResults() 호출이 메인 UI 스레드(AWT 이벤트 디스패칭 스레드)에서 발생하도록 보장합니다.

그러나 BACKGROUND 옵션을 통해 기여자를 로드하려고 하면 목록은 업데이트되지만 아무것도 변경되지 않는 것을 볼 수 있습니다.

작업 2

결과 목록이 UI에 표시되도록 src/tasks/Request2Background.kt 파일의 loadContributorsBackground() 함수를 수정하십시오.

작업 2 솔루션

기여자를 로드하려고 하면 로그에서 기여자가 로드되지만 결과가 표시되지 않는 것을 볼 수 있습니다. 이를 수정하려면 결과 사용자 목록에 대해 updateResults()를 호출하십시오:

kotlin
thread {
    updateResults(loadContributorsBlocking(service, req))
}

콜백으로 전달된 로직을 명시적으로 호출해야 합니다. 그렇지 않으면 아무 일도 일어나지 않습니다.

Retrofit 콜백 API 사용

이전 솔루션에서 전체 로딩 로직은 백그라운드 스레드로 옮겨졌지만, 여전히 리소스 사용에 최선은 아닙니다. 모든 로딩 요청은 순차적으로 진행되며 스레드는 로딩 결과를 기다리는 동안 블로킹되어 다른 작업에 사용될 수 있는 기회를 잃습니다. 특히, 스레드는 전체 결과를 더 빨리 받기 위해 다른 요청을 로드하기 시작할 수 있습니다.

각 저장소에 대한 데이터 처리는 두 부분으로 나뉘어져야 합니다: 로딩 및 결과 응답 처리. 두 번째 처리 부분은 콜백으로 추출되어야 합니다.

각 저장소에 대한 로딩은 이전 저장소의 결과가 수신되기 전(그리고 해당 콜백이 호출되기 전)에 시작될 수 있습니다:

콜백 API 사용

Retrofit 콜백 API는 이를 달성하는 데 도움이 될 수 있습니다. Call.enqueue() 함수는 HTTP 요청을 시작하고 인수로 콜백을 받습니다. 이 콜백에서 각 요청 후에 무엇을 해야 할지 지정해야 합니다.

src/tasks/Request3Callbacks.kt를 열어 이 API를 사용하는 loadContributorsCallbacks()의 구현을 확인합니다:

kotlin
fun loadContributorsCallbacks(
    service: GitHubService, req: RequestData,
    updateResults: (List<User>) -> Unit
) {
    service.getOrgReposCall(req.org).onResponse { responseRepos ->  // #1
        logRepos(req, responseRepos)
        val repos = responseRepos.bodyList()

        val allUsers = mutableListOf<User>()
        for (repo in repos) {
            service.getRepoContributorsCall(req.org, repo.name)
                .onResponse { responseUsers ->  // #2
                    logUsers(repo, responseUsers)
                    val users = responseUsers.bodyList()
                    allUsers += users
                }
            }
        }
        // TODO: Why doesn't this code work? How to fix that?
        updateResults(allUsers.aggregate())
    }
  • 편의를 위해 이 코드 조각은 같은 파일에 선언된 onResponse() 확장 함수를 사용합니다. 이 함수는 객체 표현식 대신 람다를 인수로 받습니다.
  • 응답을 처리하는 로직은 콜백으로 추출됩니다: 해당 람다는 #1#2 줄에서 시작됩니다.

그러나 제공된 솔루션은 작동하지 않습니다. 프로그램을 실행하고 CALLBACKS 옵션을 선택하여 기여자를 로드하면 아무것도 표시되지 않는 것을 볼 수 있습니다. 그러나 Request3CallbacksKtTest의 테스트는 성공적으로 통과했음을 즉시 반환합니다.

주어진 코드가 예상대로 작동하지 않는 이유를 생각해 보고 수정하거나, 아래 솔루션을 참조하십시오.

작업 3 (선택 사항)

src/tasks/Request3Callbacks.kt 파일의 코드를 다시 작성하여 로드된 기여자 목록이 표시되도록 하십시오.

작업 3의 첫 번째 시도 솔루션

현재 솔루션에서는 많은 요청이 동시적으로 시작되어 총 로딩 시간이 줄어듭니다. 그러나 결과가 로드되지 않습니다. 이는 updateResults() 콜백이 모든 로딩 요청이 시작된 직후에 호출되기 때문입니다. 즉, allUsers 목록이 데이터로 채워지기 전에 호출됩니다.

다음과 같은 변경으로 이를 수정할 수 있습니다:

kotlin
val allUsers = mutableListOf<User>()
for ((index, repo) in repos.withIndex()) {   // #1
    service.getRepoContributorsCall(req.org, repo.name)
        .onResponse { responseUsers ->
            logUsers(repo, responseUsers)
            val users = responseUsers.bodyList()
            allUsers += users
            if (index == repos.lastIndex) {    // #2
                updateResults(allUsers.aggregate())
            }
        }
}
  • 먼저 인덱스와 함께 저장소 목록을 반복합니다( #1).
  • 그런 다음 각 콜백에서 마지막 반복인지 확인합니다( #2).
  • 그리고 그렇다면 결과가 업데이트됩니다.

그러나 이 코드도 우리의 목표를 달성하지 못합니다. 직접 답을 찾거나, 아래 솔루션을 참조하십시오.

작업 3의 두 번째 시도 솔루션

로딩 요청이 동시적으로 시작되므로 마지막 요청의 결과가 마지막으로 온다는 보장은 없습니다. 결과는 어떤 순서로든 올 수 있습니다.

따라서 완료 조건으로 현재 인덱스를 lastIndex와 비교하면 일부 저장소에 대한 결과를 잃을 위험이 있습니다.

만약 마지막 저장소를 처리하는 요청이 이전 요청보다 더 빨리 반환된다면(그럴 가능성이 높음), 더 많은 시간이 걸리는 요청들의 모든 결과가 손실될 것입니다.

이를 수정하는 한 가지 방법은 인덱스를 도입하고 모든 저장소가 이미 처리되었는지 확인하는 것입니다:

kotlin
val allUsers = Collections.synchronizedList(mutableListOf<User>())
val numberOfProcessed = AtomicInteger()
for (repo in repos) {
    service.getRepoContributorsCall(req.org, repo.name)
        .onResponse { responseUsers ->
            logUsers(repo, responseUsers)
            val users = responseUsers.bodyList()
            allUsers += users
            if (numberOfProcessed.incrementAndGet() == repos.size) {
                updateResults(allUsers.aggregate())
            }
        }
}

이 코드는 동기화된 버전의 목록과 AtomicInteger()를 사용합니다. 왜냐하면 일반적으로 getRepoContributors() 요청을 처리하는 다른 콜백이 항상 동일한 스레드에서 호출된다는 보장이 없기 때문입니다.

작업 3의 세 번째 시도 솔루션

더 나은 솔루션은 CountDownLatch 클래스를 사용하는 것입니다. 이 클래스는 저장소 수로 초기화된 카운터를 저장합니다. 이 카운터는 각 저장소를 처리한 후 감소합니다. 그런 다음 래치가 0이 될 때까지 기다린 다음 결과를 업데이트합니다:

kotlin
val countDownLatch = CountDownLatch(repos.size)
for (repo in repos) {
    service.getRepoContributorsCall(req.org, repo.name)
        .onResponse { responseUsers ->
            // processing repository
            countDownLatch.countDown()
        }
}
countDownLatch.await()
updateResults(allUsers.aggregate())

결과는 메인 스레드에서 업데이트됩니다. 이는 로직을 자식 스레드에 위임하는 것보다 더 직접적인 방법입니다.

이 세 가지 솔루션 시도를 검토한 결과, 콜백으로 올바른 코드를 작성하는 것은 특히 여러 기본 스레드와 동기화가 발생할 때 사소하지 않고 오류가 발생하기 쉽다는 것을 알 수 있습니다.

추가 연습으로, RxJava 라이브러리를 사용하여 반응형 접근 방식으로 동일한 로직을 구현할 수 있습니다. 필요한 모든 종속성과 RxJava 사용을 위한 솔루션은 별도의 rx 브랜치에서 찾을 수 있습니다. 이 튜토리얼을 완료하고 제안된 Rx 버전을 구현하거나 확인하여 적절한 비교를 하는 것도 가능합니다.

중단 함수

중단 함수(suspending function)를 사용하여 동일한 로직을 구현할 수 있습니다. Call<List<Repo>>를 반환하는 대신, 다음과 같이 API 호출을 중단 함수로 정의합니다:

kotlin
interface GitHubService {
    @GET("orgs/{org}/repos?per_page=100")
    suspend fun getOrgRepos(
        @Path("org") org: String
    ): List<Repo>
}
  • getOrgRepos()suspend 함수로 정의됩니다. 중단 함수를 사용하여 요청을 수행할 때 기본 스레드는 블로킹되지 않습니다. 이것이 어떻게 작동하는지에 대한 자세한 내용은 나중에 다룰 것입니다.
  • getOrgRepos()Call을 반환하는 대신 결과를 직접 반환합니다. 결과가 실패하면 예외가 발생합니다.

대안으로, Retrofit은 결과를 Response로 래핑하여 반환할 수 있습니다. 이 경우, 결과 본문이 제공되며 수동으로 오류를 확인할 수 있습니다. 이 튜토리얼에서는 Response를 반환하는 버전을 사용합니다.

src/contributors/GitHubService.kt에서 GitHubService 인터페이스에 다음 선언을 추가합니다:

kotlin
interface GitHubService {
    // getOrgReposCall & getRepoContributorsCall declarations

    @GET("orgs/{org}/repos?per_page=100")
    suspend fun getOrgRepos(
        @Path("org") org: String
    ): Response<List<Repo>>

    @GET("repos/{owner}/{repo}/contributors?per_page=100")
    suspend fun getRepoContributors(
        @Path("owner") owner: String,
        @Path("repo") repo: String
    ): Response<List<User>>
}

작업 4

귀하의 임무는 기여자를 로드하는 함수의 코드를 변경하여 두 개의 새로운 중단 함수인 getOrgRepos()getRepoContributors()를 사용하도록 하는 것입니다. 새로운 loadContributorsSuspend() 함수는 새 API를 사용하기 위해 suspend로 표시됩니다.

중단 함수는 모든 곳에서 호출될 수 없습니다. loadContributorsBlocking()에서 중단 함수를 호출하면 "Suspend function 'getOrgRepos' should be called only from a coroutine or another suspend function" 메시지와 함께 오류가 발생할 것입니다.

  1. src/tasks/Request1Blocking.kt에 정의된 loadContributorsBlocking()의 구현을 src/tasks/Request4Suspend.kt에 정의된 loadContributorsSuspend()로 복사합니다.
  2. Call을 반환하는 함수 대신 새로운 중단 함수가 사용되도록 코드를 수정합니다.
  3. SUSPEND 옵션을 선택하여 프로그램을 실행하고 GitHub 요청이 수행되는 동안 UI가 계속 반응하는지 확인합니다.

작업 4 솔루션

.getOrgReposCall(req.org).execute().getOrgRepos(req.org)로 바꾸고 두 번째 "기여자" 요청에 대해서도 동일한 교체를 반복합니다:

kotlin
suspend fun loadContributorsSuspend(service: GitHubService, req: RequestData): List<User> {
    val repos = service
        .getOrgRepos(req.org)
        .also { logRepos(req, it) }
        .bodyList()

    return repos.flatMap { repo ->
        service.getRepoContributors(req.org, repo.name)
            .also { logUsers(repo, it) }
            .bodyList()
    }.aggregate()
}
  • loadContributorsSuspend()suspend 함수로 정의되어야 합니다.
  • 이전에는 Response를 반환했던 execute를 더 이상 호출할 필요가 없습니다. 이제 API 함수가 Response를 직접 반환하기 때문입니다. 이 세부 사항은 Retrofit 라이브러리에만 해당됩니다. 다른 라이브러리에서는 API가 다르겠지만, 개념은 동일합니다.

코루틴

중단 함수를 사용한 코드는 "블로킹" 버전과 유사하게 보입니다. 블로킹 버전과의 주요 차이점은 스레드를 블로킹하는 대신 코루틴이 중단된다는 것입니다:

text
block -> suspend
thread -> coroutine

코루틴은 종종 경량 스레드(lightweight thread)라고 불리는데, 이는 스레드에서 코드를 실행하는 방식과 유사하게 코루틴에서 코드를 실행할 수 있기 때문입니다. 이전에는 블로킹되어 피해야 했던 작업들이 이제 코루틴을 중단시킬 수 있습니다.

새 코루틴 시작

src/contributors/Contributors.kt에서 loadContributorsSuspend()가 어떻게 사용되는지 살펴보면, launch 내에서 호출되는 것을 볼 수 있습니다. launch는 람다를 인수로 받는 라이브러리 함수입니다:

kotlin
launch {
    val users = loadContributorsSuspend(req)
    updateResults(users, startTime)
}

여기서 launch는 데이터를 로드하고 결과를 표시하는 역할을 하는 새로운 연산을 시작합니다. 이 연산은 중단 가능합니다. 네트워크 요청을 수행할 때 연산은 중단되고 기본 스레드를 해제합니다. 네트워크 요청이 결과를 반환하면 연산이 재개됩니다.

이러한 중단 가능한 연산을 _코루틴_이라고 합니다. 따라서 이 경우 launch는 데이터를 로드하고 결과를 표시하는 역할을 하는 _새 코루틴을 시작_합니다.

코루틴은 스레드 위에서 실행되며 중단될 수 있습니다. 코루틴이 중단되면 해당 연산은 일시 중지되고, 스레드에서 제거되어 메모리에 저장됩니다. 그동안 스레드는 다른 작업을 수행할 수 있도록 자유로워집니다:

코루틴 중단 프로세스

연산이 계속될 준비가 되면 (반드시 같은 스레드는 아님) 스레드로 반환됩니다.

loadContributorsSuspend() 예시에서 각 "기여자" 요청은 이제 중단 메커니즘을 사용하여 결과를 기다립니다. 먼저 새 요청이 전송됩니다. 그런 다음 응답을 기다리는 동안 launch 함수에 의해 시작된 전체 "기여자 로드" 코루틴이 중단됩니다.

코루틴은 해당 응답이 수신된 후에만 재개됩니다:

요청 중단

응답을 기다리는 동안 스레드는 다른 작업을 수행할 수 있도록 자유로워집니다. 모든 요청이 메인 UI 스레드에서 이루어지고 있음에도 불구하고 UI는 계속 반응합니다:

  1. SUSPEND 옵션을 사용하여 프로그램을 실행합니다. 로그는 모든 요청이 메인 UI 스레드로 전송되었음을 확인합니다:

    text
    2538 [AWT-EventQueue-0 @coroutine#1] INFO  Contributors - kotlin: loaded 30 repos
    2729 [AWT-EventQueue-0 @coroutine#1] INFO  Contributors - ts2kt: loaded 11 contributors
    3029 [AWT-EventQueue-0 @coroutine#1] INFO  Contributors - kotlin-koans: loaded 45 contributors
    ...
    11252 [AWT-EventQueue-0 @coroutine#1] INFO  Contributors - kotlin-coroutines-workshop: loaded 1 contributors
  2. 로그는 해당 코드가 실행되는 코루틴을 보여줄 수 있습니다. 이를 활성화하려면 Run | Edit configurations를 열고 -Dkotlinx.coroutines.debug VM 옵션을 추가하십시오:

    실행 구성 편집

    이 옵션을 사용하여 main()이 실행되는 동안 코루틴 이름이 스레드 이름에 첨부됩니다. 모든 Kotlin 파일을 실행하기 위한 템플릿을 수정하고 이 옵션을 기본적으로 활성화할 수도 있습니다.

이제 모든 코드는 위에서 언급한 "@coroutine#1"로 표시된 "기여자 로드" 코루틴 하나에서 실행됩니다. 결과를 기다리는 동안 다른 요청을 보내기 위해 스레드를 재사용해서는 안 됩니다. 코드가 순차적으로 작성되었기 때문입니다. 새 요청은 이전 결과가 수신된 후에만 전송됩니다.

중단 함수는 스레드를 공정하게 처리하며 "대기"를 위해 스레드를 블로킹하지 않습니다. 그러나 이것은 아직 동시성을 가져오지 않습니다.

동시성

Kotlin 코루틴은 스레드보다 훨씬 적은 리소스를 사용합니다. 비동기적으로 새로운 연산을 시작하고 싶을 때마다 새 코루틴을 생성할 수 있습니다.

새 코루틴을 시작하려면 주요 코루틴 빌더 중 하나인 launch, async, 또는 runBlocking을 사용하십시오. 다른 라이브러리들은 추가적인 코루틴 빌더를 정의할 수 있습니다.

async는 새로운 코루틴을 시작하고 Deferred 객체를 반환합니다. DeferredFuture 또는 Promise와 같은 다른 이름으로 알려진 개념을 나타냅니다. 이것은 연산을 저장하지만, 최종 결과를 얻는 시점을 _지연_시키고; _미래_의 어느 시점에 결과를 _약속_합니다.

asynclaunch의 주요 차이점은 launch는 특정 결과를 반환하지 않을 것으로 예상되는 연산을 시작하는 데 사용된다는 것입니다. launch는 코루틴을 나타내는 Job을 반환합니다. Job.join()을 호출하여 완료될 때까지 기다릴 수 있습니다.

DeferredJob을 확장하는 제네릭 타입입니다. async 호출은 람다가 반환하는 내용(람다 내부의 마지막 표현식이 결과)에 따라 Deferred<Int> 또는 Deferred<CustomType>를 반환할 수 있습니다.

코루틴의 결과를 얻으려면 Deferred 인스턴스에서 await()를 호출할 수 있습니다. 결과를 기다리는 동안 await()가 호출된 코루틴은 중단됩니다:

kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    val deferred: Deferred<Int> = async {
        loadData()
    }
    println("waiting...")
    println(deferred.await())
}

suspend fun loadData(): Int {
    println("loading...")
    delay(1000L)
    println("loaded!")
    return 42
}

runBlocking은 일반 함수와 중단 함수 사이, 또는 블로킹 세계와 비블로킹 세계 사이의 다리(bridge) 역할을 합니다. 이는 최상위 메인 코루틴을 시작하기 위한 어댑터로 작동합니다. 주로 main() 함수 및 테스트에서 사용하도록 의도되었습니다.

코루틴에 대한 더 나은 이해를 위해 이 비디오를 시청하십시오.

지연 객체 목록이 있는 경우 awaitAll()을 호출하여 모든 결과를 기다릴 수 있습니다:

kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    val deferreds: List<Deferred<Int>> = (1..3).map {
        async {
            delay(1000L * it)
            println("Loading $it")
            it
        }
    }
    val sum = deferreds.awaitAll().sum()
    println("$sum")
}

각 "기여자" 요청이 새 코루틴에서 시작되면 모든 요청이 비동기적으로 시작됩니다. 이전 요청의 결과가 수신되기 전에 새 요청을 보낼 수 있습니다:

동시 코루틴

총 로딩 시간은 CALLBACKS 버전과 거의 동일하지만, 콜백이 필요 없습니다. 더 나아가, async는 코드에서 어떤 부분이 동시적으로 실행되는지 명시적으로 강조합니다.

작업 5

Request5Concurrent.kt 파일에서 이전 loadContributorsSuspend() 함수를 사용하여 loadContributorsConcurrent() 함수를 구현하십시오.

작업 5 팁

코루틴은 코루틴 스코프 내에서만 시작할 수 있습니다. loadContributorsSuspend()의 내용을 coroutineScope 호출로 복사하여 async 함수를 호출할 수 있도록 하십시오:

kotlin
suspend fun loadContributorsConcurrent(
    service: GitHubService,
    req: RequestData
): List<User> = coroutineScope {
    // ...
}

다음 스키마를 기반으로 솔루션을 만드십시오:

kotlin
val deferreds: List<Deferred<List<User>>> = repos.map { repo ->
    async {
        // 각 저장소의 기여자 로드
    }
}
deferreds.awaitAll() // List<List<User>>

작업 5 솔루션

각 "기여자" 요청을 async로 감싸서 저장소 수만큼 코루틴을 생성합니다. asyncDeferred<List<User>>를 반환합니다. 새 코루틴을 생성하는 것은 리소스 집약적이지 않으므로 필요한 만큼 생성할 수 있다는 점은 문제가 되지 않습니다.

  1. flatMap은 더 이상 사용할 수 없습니다. map의 결과가 이제 목록의 목록이 아니라 Deferred 객체 목록이기 때문입니다. awaitAll()List<List<User>>를 반환하므로 flatten().aggregate()를 호출하여 결과를 얻습니다:

    kotlin
    suspend fun loadContributorsConcurrent(
        service: GitHubService, 
        req: RequestData
    ): List<User> = coroutineScope {
        val repos = service
            .getOrgRepos(req.org)
            .also { logRepos(req, it) }
            .bodyList()
    
        val deferreds: List<Deferred<List<User>>> = repos.map { repo ->
            async {
                service.getRepoContributors(req.org, repo.name)
                    .also { logUsers(repo, it) }
                    .bodyList()
            }
        }
        deferreds.awaitAll().flatten().aggregate()
    }
  2. 코드를 실행하고 로그를 확인하십시오. 멀티스레딩이 아직 사용되지 않았기 때문에 모든 코루틴은 여전히 메인 UI 스레드에서 실행되지만, 코루틴을 동시적으로 실행하는 이점을 이미 볼 수 있습니다.

  3. 이 코드를 변경하여 "기여자" 코루틴을 공통 스레드 풀의 다른 스레드에서 실행하려면 async 함수의 컨텍스트 인수로 Dispatchers.Default를 지정하십시오:

    kotlin
    async(Dispatchers.Default) { }
    • CoroutineDispatcher는 해당 코루틴이 실행될 스레드를 결정합니다. 인수로 지정하지 않으면 async는 외부 스코프의 디스패처를 사용합니다.
    • Dispatchers.Default는 JVM의 공유 스레드 풀을 나타냅니다. 이 풀은 병렬 실행을 위한 수단을 제공합니다. 사용 가능한 CPU 코어 수만큼의 스레드로 구성되지만, 코어가 하나만 있어도 두 개의 스레드를 가집니다.
  4. loadContributorsConcurrent() 함수의 코드를 수정하여 공통 스레드 풀의 다른 스레드에서 새 코루틴을 시작하도록 합니다. 또한 요청을 보내기 전에 추가 로깅을 추가하십시오:

    kotlin
    async(Dispatchers.Default) {
        log("starting loading for ${repo.name}")
        service.getRepoContributors(req.org, repo.name)
            .also { logUsers(repo, it) }
            .bodyList()
    }
  5. 프로그램을 다시 실행하십시오. 로그에서 각 코루틴이 스레드 풀의 한 스레드에서 시작되어 다른 스레드에서 재개될 수 있음을 볼 수 있습니다:

    text
    1946 [DefaultDispatcher-worker-2 @coroutine#4] INFO  Contributors - starting loading for kotlin-koans
    1946 [DefaultDispatcher-worker-3 @coroutine#5] INFO  Contributors - starting loading for dokka
    1946 [DefaultDispatcher-worker-1 @coroutine#3] INFO  Contributors - starting loading for ts2kt
    ...
    2178 [DefaultDispatcher-worker-1 @coroutine#4] INFO  Contributors - kotlin-koans: loaded 45 contributors
    2569 [DefaultDispatcher-worker-1 @coroutine#5] INFO  Contributors - dokka: loaded 36 contributors
    2821 [DefaultDispatcher-worker-2 @coroutine#3] INFO  Contributors - ts2kt: loaded 11 contributors

    예를 들어, 이 로그 발췌문에서 coroutine#4worker-2 스레드에서 시작되어 worker-1 스레드에서 계속됩니다.

src/contributors/Contributors.kt에서 CONCURRENT 옵션의 구현을 확인하십시오:

  1. 코루틴을 메인 UI 스레드에서만 실행하려면 Dispatchers.Main을 인수로 지정하십시오:

    kotlin
    launch(Dispatchers.Main) {
        updateResults()
    }
    • 새 코루틴을 메인 스레드에서 시작할 때 메인 스레드가 바쁘면, 코루틴은 중단되고 이 스레드에서 실행되도록 스케줄링됩니다. 코루틴은 스레드가 자유로워질 때만 재개됩니다.
    • 각 엔드포인트에 명시적으로 지정하는 대신 외부 스코프의 디스패처를 사용하는 것이 좋은 관행으로 간주됩니다. Dispatchers.Default를 인수로 전달하지 않고 loadContributorsConcurrent()를 정의하면, 이 함수를 Default 디스패처, 메인 UI 스레드 또는 사용자 지정 디스패처와 같은 어떤 컨텍스트에서도 호출할 수 있습니다.
    • 나중에 보겠지만, 테스트에서 loadContributorsConcurrent()를 호출할 때 TestDispatcher 컨텍스트에서 호출할 수 있어 테스트를 단순화합니다. 이는 이 솔루션을 훨씬 더 유연하게 만듭니다.
  2. 호출자 측에서 디스패처를 지정하려면, loadContributorsConcurrent가 상속된 컨텍스트에서 코루틴을 시작하도록 하면서 프로젝트에 다음 변경 사항을 적용하십시오:

    kotlin
    launch(Dispatchers.Default) {
        val users = loadContributorsConcurrent(service, req)
        withContext(Dispatchers.Main) {
            updateResults(users, startTime)
        }
    }
    • updateResults()는 메인 UI 스레드에서 호출되어야 하므로 Dispatchers.Main 컨텍스트로 호출합니다.
    • withContext()는 지정된 코루틴 컨텍스트로 주어진 코드를 호출하고, 완료될 때까지 중단되며, 결과를 반환합니다. 이를 표현하는 또 다른 방법이지만 더 장황한 방법은 새 코루틴을 시작하고 완료될 때까지 명시적으로 기다리는(중단하는) 것입니다: launch(context) { ... }.join().
  3. 코드를 실행하고 코루틴이 스레드 풀의 스레드에서 실행되는지 확인하십시오.

구조화된 동시성

  • _코루틴 스코프_는 서로 다른 코루틴 간의 구조와 부모-자식 관계를 담당합니다. 새 코루틴은 일반적으로 스코프 내에서 시작되어야 합니다.
  • _코루틴 컨텍스트_는 코루틴의 사용자 지정 이름이나 코루틴이 스케줄링될 스레드를 지정하는 디스패처와 같이 주어진 코루틴을 실행하는 데 사용되는 추가 기술 정보를 저장합니다.

launch, async, 또는 runBlocking이 새 코루틴을 시작하는 데 사용되면 자동으로 해당 스코프를 생성합니다. 이 모든 함수는 리시버를 가진 람다를 인수로 받으며, CoroutineScope가 암묵적 리시버 타입입니다:

kotlin
launch { /* this: CoroutineScope */ }
  • 새 코루틴은 스코프 내에서만 시작할 수 있습니다.
  • launchasyncCoroutineScope의 확장으로 선언되었으므로, 이들을 호출할 때는 항상 암묵적 또는 명시적 리시버가 전달되어야 합니다.
  • runBlocking에 의해 시작된 코루틴은 유일한 예외입니다. runBlocking은 최상위 함수로 정의되어 있기 때문입니다. 그러나 현재 스레드를 블로킹하므로 주로 main() 함수 및 테스트에서 브릿지 함수로 사용됩니다.

runBlocking, launch, 또는 async 내에서 새 코루틴은 스코프 내에서 자동으로 시작됩니다:

kotlin
import kotlinx.coroutines.*

fun main() = runBlocking { /* this: CoroutineScope */
    launch { /* ... */ }
    // the same as:   
    this.launch { /* ... */ }
}

runBlocking 내에서 launch를 호출할 때, 이것은 CoroutineScope 타입의 암묵적 리시버에 대한 확장으로 호출됩니다. 대안으로, this.launch를 명시적으로 작성할 수도 있습니다.

(이 예시에서 launch에 의해 시작된) 중첩 코루틴은 외부 코루틴(runBlocking에 의해 시작된)의 자식으로 간주될 수 있습니다. 이 "부모-자식" 관계는 스코프를 통해 작동합니다. 자식 코루틴은 부모 코루틴에 해당하는 스코프에서 시작됩니다.

새 코루틴을 시작하지 않고도 coroutineScope 함수를 사용하여 새 스코프를 생성할 수 있습니다. suspend 함수 내에서 외부 스코프에 대한 접근 없이 구조화된 방식으로 새 코루틴을 시작하려면, 이 suspend 함수가 호출된 외부 스코프의 자식 스코프가 자동으로 되는 새 코루틴 스코프를 생성할 수 있습니다. loadContributorsConcurrent()가 좋은 예입니다.

또한 GlobalScope.async 또는 GlobalScope.launch를 사용하여 전역 스코프에서 새 코루틴을 시작할 수 있습니다. 이렇게 하면 최상위 "독립" 코루틴이 생성됩니다.

코루틴의 구조를 뒷받침하는 메커니즘을 구조화된 동시성(structured concurrency)이라고 합니다. 이는 전역 스코프에 비해 다음과 같은 이점을 제공합니다:

  • 스코프는 일반적으로 자식 코루틴을 담당하며, 자식 코루틴의 수명은 스코프의 수명에 연결됩니다.
  • 스코프는 문제가 발생하거나 사용자가 마음을 바꾸어 작업을 취소하기로 결정한 경우 자식 코루틴을 자동으로 취소할 수 있습니다.
  • 스코프는 모든 자식 코루틴의 완료를 자동으로 기다립니다. 따라서 스코프가 코루틴에 해당한다면, 부모 코루틴은 해당 스코프에서 시작된 모든 코루틴이 완료될 때까지 완료되지 않습니다.

GlobalScope.async를 사용할 경우, 여러 코루틴을 더 작은 스코프에 묶는 구조가 없습니다. 전역 스코프에서 시작된 코루틴은 모두 독립적입니다. 이들의 수명은 전체 애플리케이션의 수명에 의해서만 제한됩니다. 전역 스코프에서 시작된 코루틴에 대한 참조를 저장하고 명시적으로 완료를 기다리거나 취소할 수 있지만, 구조화된 동시성에서처럼 자동으로 발생하지는 않습니다.

기여자 로딩 취소

기여자 목록을 로드하는 함수의 두 가지 버전을 생성합니다. 부모 코루틴을 취소하려고 할 때 두 버전이 어떻게 작동하는지 비교합니다. 첫 번째 버전은 coroutineScope를 사용하여 모든 자식 코루틴을 시작하고, 두 번째 버전은 GlobalScope를 사용합니다.

  1. Request5Concurrent.kt에서 loadContributorsConcurrent() 함수에 3초 지연을 추가합니다:

    kotlin
    suspend fun loadContributorsConcurrent(
        service: GitHubService, 
        req: RequestData
    ): List<User> = coroutineScope {
        // ...
        async {
            log("starting loading for ${repo.name}")
            delay(3000)
            // 저장소 기여자 로드
        }
        // ...
    }

    이 지연은 요청을 보내는 모든 코루틴에 영향을 미치므로, 코루틴이 시작된 후 요청이 보내지기 전에 로딩을 취소할 충분한 시간을 확보합니다.

  2. 로딩 함수의 두 번째 버전을 생성합니다. loadContributorsConcurrent()의 구현을 Request5NotCancellable.ktloadContributorsNotCancellable()로 복사한 다음, 새 coroutineScope 생성을 제거합니다.

  3. 이제 async 호출이 해결되지 않으므로, GlobalScope.async를 사용하여 이들을 시작합니다:

    kotlin
    suspend fun loadContributorsNotCancellable(
        service: GitHubService,
        req: RequestData
    ): List<User> {   // #1
        // ...
        GlobalScope.async {   // #2
            log("starting loading for ${repo.name}")
            // 저장소 기여자 로드
        }
        // ...
        return deferreds.awaitAll().flatten().aggregate()  // #3
    }
    • 이제 함수는 람다 내부의 마지막 표현식이 아닌 결과를 직접 반환합니다( #1#3 줄).
    • 모든 "기여자" 코루틴은 코루틴 스코프의 자식이 아닌 GlobalScope 내에서 시작됩니다( #2 줄).
  4. 프로그램을 실행하고 CONCURRENT 옵션을 선택하여 기여자를 로드합니다.

  5. 모든 "기여자" 코루틴이 시작될 때까지 기다린 다음 _Cancel_을 클릭합니다. 로그에 새 결과가 표시되지 않는데, 이는 모든 요청이 실제로 취소되었음을 의미합니다:

    text
    2896 [AWT-EventQueue-0 @coroutine#1] INFO  Contributors - kotlin: loaded 40 repos
    2901 [DefaultDispatcher-worker-2 @coroutine#4] INFO  Contributors - starting loading for kotlin-koans
    ...
    2909 [DefaultDispatcher-worker-5 @coroutine#36] INFO  Contributors - starting loading for mpp-example
    /* 'cancel' 클릭 */
    /* 요청이 전송되지 않음 */
  6. 5단계를 반복하되, 이번에는 NOT_CANCELLABLE 옵션을 선택합니다:

    text
    2570 [AWT-EventQueue-0 @coroutine#1] INFO  Contributors - kotlin: loaded 30 repos
    2579 [DefaultDispatcher-worker-1 @coroutine#4] INFO  Contributors - starting loading for kotlin-koans
    ...
    2586 [DefaultDispatcher-worker-6 @coroutine#36] INFO  Contributors - starting loading for mpp-example
    /* 'cancel' 클릭 */
    /* 그러나 모든 요청은 여전히 전송됨: */
    6402 [DefaultDispatcher-worker-5 @coroutine#4] INFO  Contributors - kotlin-koans: loaded 45 contributors
    ...
    9555 [DefaultDispatcher-worker-8 @coroutine#36] INFO  Contributors - mpp-example: loaded 8 contributors

    이 경우 어떤 코루틴도 취소되지 않으며 모든 요청은 여전히 전송됩니다.

  7. "기여자" 프로그램에서 취소가 어떻게 트리거되는지 확인합니다. Cancel 버튼을 클릭하면 메인 "로딩" 코루틴이 명시적으로 취소되고 자식 코루틴은 자동으로 취소됩니다:

    kotlin
    interface Contributors {
    
        fun loadContributors() {
            // ...
            when (getSelectedVariant()) {
                CONCURRENT -> {
                    launch {
                        val users = loadContributorsConcurrent(service, req)
                        updateResults(users, startTime)
                    }.setUpCancellation()      // #1
                }
            }
        }
    
        private fun Job.setUpCancellation() {
            val loadingJob = this              // #2
    
            // 'cancel' 버튼이 클릭되면 로딩 작업을 취소합니다:
            val listener = ActionListener {
                loadingJob.cancel()            // #3
                updateLoadingStatus(CANCELED)
            }
            // 'cancel' 버튼에 리스너를 추가합니다:
            addCancelListener(listener)
    
            // 로딩 작업이 완료된 후 상태를 업데이트하고 리스너를 제거합니다.
        }
    }

launch 함수는 Job의 인스턴스를 반환합니다. Job은 모든 데이터를 로드하고 결과를 업데이트하는 "로딩 코루틴"에 대한 참조를 저장합니다. Job 인스턴스를 리시버로 전달하여 setUpCancellation() 확장 함수를 호출할 수 있습니다( #1 줄).

이를 표현하는 또 다른 방법은 다음과 같이 명시적으로 작성하는 것입니다:

kotlin
val job = launch { }
job.setUpCancellation()
  • 가독성을 위해 함수 내부에서 setUpCancellation() 함수의 리시버를 새로운 loadingJob 변수( #2 줄)로 참조할 수 있습니다.
  • 그런 다음 Cancel 버튼에 리스너를 추가하여 클릭될 때 loadingJob이 취소되도록 할 수 있습니다( #3 줄).

구조화된 동시성을 사용하면 부모 코루틴만 취소하면 되고, 이는 모든 자식 코루틴으로 자동으로 취소가 전파됩니다.

외부 스코프의 컨텍스트 사용

주어진 스코프 내에서 새 코루틴을 시작하면 모든 코루틴이 동일한 컨텍스트로 실행되도록 훨씬 쉽게 보장할 수 있습니다. 또한 필요한 경우 컨텍스트를 교체하기도 훨씬 쉽습니다.

이제 외부 스코프의 디스패처를 사용하는 방법을 알아볼 차례입니다. coroutineScope 또는 코루틴 빌더에 의해 생성된 새 스코프는 항상 외부 스코프로부터 컨텍스트를 상속합니다. 이 경우 외부 스코프는 suspend loadContributorsConcurrent() 함수가 호출된 스코프입니다:

kotlin
launch(Dispatchers.Default) {  // 외부 스코프
    val users = loadContributorsConcurrent(service, req)
    // ...
}

모든 중첩 코루틴은 상속된 컨텍스트로 자동으로 시작됩니다. 디스패처는 이 컨텍스트의 일부입니다. 따라서 async에 의해 시작된 모든 코루틴은 기본 디스패처의 컨텍스트로 시작됩니다:

kotlin
suspend fun loadContributorsConcurrent(
    service: GitHubService, req: RequestData
): List<User> = coroutineScope {
    // 이 스코프는 외부 스코프에서 컨텍스트를 상속합니다
    // ...
    async {   // 상속된 컨텍스트로 시작된 중첩 코루틴
        // ...
    }
    // ...
}

구조화된 동시성을 사용하면 최상위 코루틴을 생성할 때 주요 컨텍스트 요소(예: 디스패처)를 한 번 지정할 수 있습니다. 그러면 모든 중첩 코루틴은 컨텍스트를 상속받고 필요할 때만 수정합니다.

예를 들어 안드로이드와 같은 UI 애플리케이션에서 코루틴 코드를 작성할 때, 기본적으로 최상위 코루틴에 CoroutineDispatchers.Main을 사용하고, 다른 스레드에서 코드를 실행해야 할 때 명시적으로 다른 디스패처를 지정하는 것이 일반적인 관행입니다.

진행 상황 표시

일부 저장소에 대한 정보는 상당히 빠르게 로드되지만, 사용자는 모든 데이터가 로드된 후에야 최종 목록을 볼 수 있습니다. 그때까지 로더 아이콘은 진행 상황을 표시하지만, 현재 상태나 이미 로드된 기여자 정보는 없습니다.

중간 결과를 더 일찍 표시하고 각 저장소에 대한 데이터 로딩 후 모든 기여자를 표시할 수 있습니다:

데이터 로딩 중

이 기능을 구현하려면 src/tasks/Request6Progress.kt에서 UI를 업데이트하는 로직을 콜백으로 전달해야 합니다. 이렇게 하면 각 중간 상태에서 호출됩니다:

kotlin
suspend fun loadContributorsProgress(
    service: GitHubService,
    req: RequestData,
    updateResults: suspend (List<User>, completed: Boolean) -> Unit
) {
    // 데이터 로딩
    // 중간 상태에서 `updateResults()` 호출
}

Contributors.kt의 호출 사이트에서는 PROGRESS 옵션에 대해 Main 스레드에서 결과를 업데이트하기 위해 콜백이 전달됩니다:

kotlin
launch(Dispatchers.Default) {
    loadContributorsProgress(service, req) { users, completed ->
        withContext(Dispatchers.Main) {
            updateResults(users, startTime, completed)
        }
    }
}
  • updateResults() 매개변수는 loadContributorsProgress()에서 suspend로 선언됩니다. 해당 람다 인수 내에서 suspend 함수인 withContext를 호출해야 합니다.
  • updateResults() 콜백은 로딩이 완료되었고 결과가 최종적인지 여부를 지정하는 추가적인 Boolean 매개변수를 인수로 받습니다.

작업 6

Request6Progress.kt 파일에서 중간 진행 상황을 보여주는 loadContributorsProgress() 함수를 구현하십시오. Request4Suspend.ktloadContributorsSuspend() 함수를 기반으로 하십시오.

  • 동시성을 사용하지 않는 간단한 버전을 사용하십시오. 동시성은 다음 섹션에서 추가할 것입니다.
  • 기여자의 중간 목록은 각 저장소에 대해 로드된 사용자 목록이 아니라 "집계된" 상태로 표시되어야 합니다.
  • 각 사용자의 총 기여 수는 각 새 저장소에 대한 데이터가 로드될 때마다 증가해야 합니다.

작업 6 솔루션

"모든 기여자" 목록의 중간 상태를 저장하려면 사용자 목록을 저장하는 allUsers 변수를 정의한 다음, 각 새 저장소에 대한 기여자가 로드된 후 이를 업데이트하십시오:

kotlin
suspend fun loadContributorsProgress(
    service: GitHubService,
    req: RequestData,
    updateResults: suspend (List<User>, completed: Boolean) -> Unit
) {
    val repos = service
        .getOrgRepos(req.org)
        .also { logRepos(req, it) }
        .bodyList()

    var allUsers = emptyList<User>()
    for ((index, repo) in repos.withIndex()) {
        val users = service.getRepoContributors(req.org, repo.name)
            .also { logUsers(repo, it) }
            .bodyList()

        allUsers = (allUsers + users).aggregate()
        updateResults(allUsers, index == repos.lastIndex)
    }
}

순차적 vs 동시적

updateResults() 콜백은 각 요청이 완료된 후 호출됩니다:

요청 진행 상황

이 코드에는 동시성이 포함되어 있지 않습니다. 순차적이므로 동기화가 필요하지 않습니다.

가장 좋은 옵션은 요청을 동시에 보내고 각 저장소에 대한 응답을 받은 후 중간 결과를 업데이트하는 것입니다:

동시 요청

동시성을 추가하려면 _채널_을 사용하십시오.

채널

공유 가능한 가변 상태를 가진 코드를 작성하는 것은 상당히 어렵고 오류가 발생하기 쉽습니다(콜백을 사용하는 솔루션처럼). 더 간단한 방법은 공통 가변 상태를 사용하는 대신 통신을 통해 정보를 공유하는 것입니다. 코루틴은 _채널_을 통해 서로 통신할 수 있습니다.

채널은 코루틴 간에 데이터를 전달할 수 있는 통신 프리미티브입니다. 한 코루틴은 채널에 일부 정보를 보내고, 다른 코루틴은 채널에서 해당 정보를 받을 수 있습니다:

채널 사용

정보를 보내는(생산하는) 코루틴을 종종 생산자(producer)라고 부르며, 정보를 받는(소비하는) 코루틴을 소비자(consumer)라고 부릅니다. 하나 또는 여러 코루틴이 동일한 채널에 정보를 보낼 수 있으며, 하나 또는 여러 코루틴이 채널에서 데이터를 받을 수 있습니다:

다수의 코루틴과 함께 채널 사용

여러 코루틴이 동일한 채널에서 정보를 받을 때, 각 요소는 소비자 중 하나에 의해 한 번만 처리됩니다. 요소가 처리되면 즉시 채널에서 제거됩니다.

채널을 요소의 컬렉션, 더 정확히는 요소가 한쪽 끝에 추가되고 다른 쪽 끝에서 수신되는 큐와 유사하다고 생각할 수 있습니다. 그러나 중요한 차이점이 있습니다. 동기화된 버전의 컬렉션과 달리 채널은 send()receive() 작업을 _중단_시킬 수 있습니다. 이는 채널이 비어 있거나 가득 찼을 때 발생합니다. 채널 크기에 상한이 있다면 채널이 가득 찰 수 있습니다.

ChannelSendChannel, ReceiveChannel, 그리고 이 둘을 확장하는 Channel의 세 가지 인터페이스로 표현됩니다. 일반적으로 채널을 생성하고 생산자에게 SendChannel 인스턴스로 제공하여 생산자만 채널에 정보를 보낼 수 있도록 합니다. 소비자에게는 ReceiveChannel 인스턴스로 채널을 제공하여 소비자만 채널에서 데이터를 받을 수 있도록 합니다. sendreceive 메서드는 모두 suspend로 선언됩니다:

kotlin
interface SendChannel<in E> {
    suspend fun send(element: E)
    fun close(): Boolean
}

interface ReceiveChannel<out E> {
    suspend fun receive(): E
}

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

생산자는 더 이상 요소가 오지 않을 것임을 나타내기 위해 채널을 닫을 수 있습니다.

라이브러리에는 여러 유형의 채널이 정의되어 있습니다. 이들은 내부적으로 저장할 수 있는 요소 수와 send() 호출이 중단될 수 있는지 여부에서 다릅니다. 모든 채널 유형에 대해 receive() 호출은 유사하게 작동합니다. 채널이 비어 있지 않으면 요소를 수신하고, 그렇지 않으면 중단됩니다.

무제한 채널 (Unlimited channel)

무제한 채널은 큐에 가장 가까운 비유입니다. 생산자는 이 채널에 요소를 보낼 수 있으며, 채널은 무한히 커질 것입니다. send() 호출은 결코 중단되지 않습니다. 프로그램의 메모리가 부족해지면 OutOfMemoryException이 발생할 것입니다. 무제한 채널과 큐의 차이점은 소비자가 빈 채널에서 받으려고 시도할 때, 새 요소가 전송될 때까지 중단된다는 점입니다.

무제한 채널
버퍼링된 채널 (Buffered channel)

버퍼링된 채널의 크기는 지정된 숫자로 제한됩니다. 생산자는 크기 제한에 도달할 때까지 이 채널에 요소를 보낼 수 있습니다. 모든 요소는 내부적으로 저장됩니다. 채널이 가득 차면, 다음 send 호출은 더 많은 빈 공간이 사용 가능해질 때까지 중단됩니다.

버퍼링된 채널
랑데부 채널 (Rendezvous channel)

"랑데부" 채널은 버퍼가 없는 채널로, 크기가 0인 버퍼링된 채널과 같습니다. 두 함수(send() 또는 receive()) 중 하나는 다른 하나가 호출될 때까지 항상 중단됩니다.

만약 send() 함수가 호출되었고, 해당 요소를 처리할 준비가 된 중단된 receive() 호출이 없다면 send()는 중단됩니다. 유사하게, receive() 함수가 호출되었고 채널이 비어 있거나, 다른 말로, 요소를 보낼 준비가 된 중단된 send() 호출이 없다면 receive() 호출은 중단됩니다.

"랑데부"라는 이름("합의된 시간과 장소에서의 만남")은 send()receive()가 "제때 만나야" 한다는 사실을 나타냅니다.

랑데부 채널
Conflated 채널 (Conflated channel)

Conflated 채널로 전송된 새 요소는 이전에 전송된 요소를 덮어씁니다. 따라서 수신자는 항상 최신 요소만 받게 됩니다. send() 호출은 결코 중단되지 않습니다.

Conflated 채널

채널을 생성할 때, 유형이나 버퍼 크기를 지정합니다(버퍼링된 채널이 필요한 경우):

kotlin
val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)

기본적으로 "랑데부" 채널이 생성됩니다.

다음 작업에서는 "랑데부" 채널, 두 개의 생산자 코루틴, 그리고 하나의 소비자 코루틴을 생성할 것입니다:

kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        channel.send("A1")
        channel.send("A2")
        log("A done")
    }
    launch {
        channel.send("B1")
        log("B done")
    }
    launch {
        repeat(3) {
            val x = channel.receive()
            log(x)
        }
    }
}

fun log(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}

채널에 대한 더 나은 이해를 위해 이 비디오를 시청하십시오.

작업 7

src/tasks/Request7Channels.kt에서 모든 GitHub 기여자를 동시에 요청하고 동시에 중간 진행 상황을 보여주는 loadContributorsChannels() 함수를 구현하십시오.

이전 함수인 Request5Concurrent.ktloadContributorsConcurrent()Request6Progress.ktloadContributorsProgress()를 사용하십시오.

작업 7 팁

다양한 저장소에 대한 기여자 목록을 동시에 수신하는 다른 코루틴은 수신된 모든 결과를 동일한 채널로 보낼 수 있습니다:

kotlin
val channel = Channel<List<User>>()
for (repo in repos) {
    launch {
        val users = TODO()
        // ...
        channel.send(users)
    }
}

그러면 이 채널의 요소들을 하나씩 수신하여 처리할 수 있습니다:

kotlin
repeat(repos.size) {
    val users = channel.receive()
    // ...
}

receive() 호출은 순차적이므로 추가적인 동기화는 필요하지 않습니다.

작업 7 솔루션

loadContributorsProgress() 함수와 마찬가지로, "모든 기여자" 목록의 중간 상태를 저장할 allUsers 변수를 생성한 다음, 각 새 저장소에 대한 기여자가 로드된 후 이를 업데이트할 수 있습니다:

kotlin
suspend fun loadContributorsChannels(
    service: GitHubService,
    req: RequestData,
    updateResults: suspend (List<User>, completed: Boolean) -> Unit
) = coroutineScope {

    val repos = service
        .getOrgRepos(req.org)
        .also { logRepos(req, it) }
        .bodyList()

    val channel = Channel<List<User>>()
    for (repo in repos) {
        launch {
            val users = service.getRepoContributors(req.org, repo.name)
                .also { logUsers(repo, it) }
                .bodyList()
            channel.send(users)
        }
    }
    var allUsers = emptyList<User>()
    repeat(repos.size) {
        val users = channel.receive()
        allUsers = (allUsers + users).aggregate()
        updateResults(allUsers, it == repos.lastIndex)
    }
}
  • 다른 저장소의 결과는 준비되는 즉시 채널에 추가됩니다. 처음에는 모든 요청이 전송되고 데이터가 수신되지 않을 때 receive() 호출이 중단됩니다. 이 경우 전체 "기여자 로드" 코루틴이 중단됩니다.
  • 그런 다음 사용자 목록이 채널로 전송되면 "기여자 로드" 코루틴이 재개되고 receive() 호출이 이 목록을 반환하며 결과가 즉시 업데이트됩니다.

이제 프로그램을 실행하고 CHANNELS 옵션을 선택하여 기여자를 로드하고 결과를 볼 수 있습니다.

코루틴과 채널이 동시성으로 인한 복잡성을 완전히 제거하지는 않지만, 상황을 이해해야 할 때 작업을 더 쉽게 만듭니다.

코루틴 테스트하기

이제 모든 솔루션을 테스트하여 동시 코루틴을 사용하는 솔루션이 suspend 함수를 사용하는 솔루션보다 빠른지 확인하고, 채널을 사용하는 솔루션이 단순한 "진행 상황" 솔루션보다 빠른지 확인해 봅시다.

다음 작업에서는 솔루션의 총 실행 시간을 비교합니다. GitHub 서비스를 모의(mock)하고 이 서비스가 주어진 타임아웃 후에 결과를 반환하도록 할 것입니다:

text
저장소 요청 - 1000ms 지연 내에 응답 반환
repo-1 - 1000ms 지연
repo-2 - 1200ms 지연
repo-3 - 800ms 지연

suspend 함수를 사용하는 순차적 솔루션은 약 4000ms가 소요되어야 합니다 (4000 = 1000 + (1000 + 1200 + 800)). 동시 솔루션은 약 2200ms가 소요되어야 합니다 (2200 = 1000 + max(1000, 1200, 800)).

진행 상황을 보여주는 솔루션의 경우, 타임스탬프와 함께 중간 결과를 확인할 수도 있습니다.

해당 테스트 데이터는 test/contributors/testData.kt에 정의되어 있으며, Request4SuspendKtTest, Request7ChannelsKtTest 등은 모의 서비스 호출을 사용하는 간단한 테스트를 포함합니다.

그러나 여기에 두 가지 문제가 있습니다:

  • 이 테스트는 실행하는 데 너무 오래 걸립니다. 각 테스트는 약 2초에서 4초가 걸리며, 매번 결과를 기다려야 합니다. 효율적이지 않습니다.
  • 코드를 준비하고 실행하는 데 추가 시간이 걸리기 때문에 솔루션이 실행되는 정확한 시간에 의존할 수 없습니다. 상수를 추가할 수는 있지만, 그러면 기계마다 시간이 다를 것입니다. 모의 서비스 지연은 이 상수보다 높아야 차이를 볼 수 있습니다. 만약 상수가 0.5초라면, 지연을 0.1초로 만드는 것은 충분하지 않을 것입니다.

더 나은 방법은 특수 프레임워크를 사용하여 동일한 코드를 여러 번 실행하면서 타이밍을 테스트하는 것이지만(이는 총 시간을 더 증가시킵니다), 이는 배우고 설정하기에 복잡합니다.

이러한 문제를 해결하고 제공된 테스트 지연을 사용하여 솔루션이 예상대로 작동하는지, 즉 하나가 다른 것보다 빠른지 확인하려면 특수 테스트 디스패처를 사용하여 _가상 시간_을 사용하십시오. 이 디스패처는 시작 이후 경과된 가상 시간을 추적하고 모든 것을 실시간으로 즉시 실행합니다. 이 디스패처에서 코루틴을 실행하면 delay가 즉시 반환되고 가상 시간을 진행시킬 것입니다.

이 메커니즘을 사용하는 테스트는 빠르게 실행되지만, 가상 시간의 다른 순간에 무엇이 발생하는지 여전히 확인할 수 있습니다. 총 실행 시간은 급격히 감소합니다:

총 실행 시간 비교

가상 시간을 사용하려면 runBlocking 호출을 runTest로 바꾸십시오. runTestTestScope에 대한 확장 람다를 인수로 받습니다. 이 특수 스코프 내에서 suspend 함수에서 delay를 호출하면 delay는 실제 시간으로 지연시키는 대신 가상 시간을 증가시킵니다:

kotlin
@Test
fun testDelayInSuspend() = runTest {
    val realStartTime = System.currentTimeMillis() 
    val virtualStartTime = currentTime
        
    foo()
    println("${System.currentTimeMillis() - realStartTime} ms") // ~ 6 ms
    println("${currentTime - virtualStartTime} ms")             // 1000 ms
}

suspend fun foo() {
    delay(1000)    // 지연 없이 자동 진행
    println("foo") // foo()가 호출되면 즉시 실행
}

TestScopecurrentTime 속성을 사용하여 현재 가상 시간을 확인할 수 있습니다.

이 예제에서 실제 실행 시간은 몇 밀리초인 반면, 가상 시간은 지연 인수와 동일한 1000 밀리초입니다.

자식 코루틴에서 "가상" delay의 완전한 효과를 얻으려면 모든 자식 코루틴을 TestDispatcher로 시작해야 합니다. 그렇지 않으면 작동하지 않습니다. 이 디스패처는 다른 디스패처를 제공하지 않는 한 다른 TestScope에서 자동으로 상속됩니다:

kotlin
@Test
fun testDelayInLaunch() = runTest {
    val realStartTime = System.currentTimeMillis()
    val virtualStartTime = currentTime

    bar()

    println("${System.currentTimeMillis() - realStartTime} ms") // ~ 11 ms
    println("${currentTime - virtualStartTime} ms")             // 1000 ms
}

suspend fun bar() = coroutineScope {
    launch {
        delay(1000)    // 지연 없이 자동 진행
        println("bar") // bar()가 호출되면 즉시 실행
    }
}

위 예제에서 launchDispatchers.Default 컨텍스트로 호출되면 테스트가 실패할 것입니다. 작업이 아직 완료되지 않았다는 예외가 발생할 것입니다.

loadContributorsConcurrent() 함수를 이 방식으로 테스트할 수 있는 경우는, 자식 코루틴을 상속된 컨텍스트로 시작하고 Dispatchers.Default 디스패처를 사용하여 컨텍스트를 수정하지 않는 경우뿐입니다.

디스패처와 같은 컨텍스트 요소를 함수를 _정의_할 때가 아니라 _호출_할 때 지정할 수 있어, 더 유연하고 테스트하기 쉽습니다.

가상 시간을 지원하는 테스트 API는 실험적이며 향후 변경될 수 있습니다.

기본적으로 컴파일러는 실험적인 테스트 API를 사용하면 경고를 표시합니다. 이러한 경고를 억제하려면 @OptIn(ExperimentalCoroutinesApi::class)로 테스트 함수 또는 테스트를 포함하는 전체 클래스에 어노테이션을 달아주십시오. 컴파일러에 실험적 API를 사용하고 있음을 지시하는 컴파일러 인수를 추가하십시오:

kotlin
compileTestKotlin {
    kotlinOptions {
        freeCompilerArgs += "-Xuse-experimental=kotlin.Experimental"
    }
}

이 튜토리얼에 해당하는 프로젝트에서는 Gradle 스크립트에 컴파일러 인수가 이미 추가되어 있습니다.

작업 8

tests/tasks/의 다음 테스트를 실제 시간 대신 가상 시간을 사용하도록 리팩터링하십시오:

  • Request4SuspendKtTest.kt
  • Request5ConcurrentKtTest.kt
  • Request6ProgressKtTest.kt
  • Request7ChannelsKtTest.kt

리팩터링 적용 전후의 총 실행 시간을 비교하십시오.

작업 8 팁

  1. runBlocking 호출을 runTest로 바꾸고, System.currentTimeMillis()currentTime으로 바꾸십시오:

    kotlin
    @Test
    fun test() = runTest {
        val startTime = currentTime
        // action
        val totalTime = currentTime - startTime
        // testing result
    }
  2. 정확한 가상 시간을 확인하는 어설션을 주석 해제하십시오.

  3. @UseExperimental(ExperimentalCoroutinesApi::class)를 추가하는 것을 잊지 마십시오.

작업 8 솔루션

다음은 동시성 및 채널 경우에 대한 솔루션입니다:

kotlin
fun testConcurrent() = runTest {
    val startTime = currentTime
    val result = loadContributorsConcurrent(MockGithubService, testRequestData)
    Assert.assertEquals("Wrong result for 'loadContributorsConcurrent'", expectedConcurrentResults.users, result)
    val totalTime = currentTime - startTime

    Assert.assertEquals(
        "The calls run concurrently, so the total virtual time should be 2200 ms: " +
                "1000 for repos request plus max(1000, 1200, 800) = 1200 for concurrent contributors requests)",
        expectedConcurrentResults.timeFromStart, totalTime
    )
}

먼저, 예상 가상 시간에 정확히 결과가 사용 가능한지 확인한 다음, 결과 자체를 확인합니다:

kotlin
fun testChannels() = runTest {
    val startTime = currentTime
    var index = 0
    loadContributorsChannels(MockGithubService, testRequestData) { users, _ ->
        val expected = concurrentProgressResults[index++]
        val time = currentTime - startTime
        Assert.assertEquals(
            "Expected intermediate results after ${expected.timeFromStart} ms:",
            expected.timeFromStart, time
        )
        Assert.assertEquals("Wrong intermediate results after $time:", expected.users, users)
    }
}

채널을 사용하는 마지막 버전의 첫 번째 중간 결과는 진행 상황 버전보다 더 빨리 사용할 수 있으며, 가상 시간을 사용하는 테스트에서 차이를 볼 수 있습니다.

나머지 "suspend" 및 "progress" 작업에 대한 테스트는 매우 유사합니다. 프로젝트의 solutions 브랜치에서 찾을 수 있습니다.

다음 단계