코루틴과 채널 − 튜토리얼
이 튜토리얼에서는 IntelliJ IDEA에서 코루틴을 사용하여 기본 스레드나 콜백을 차단하지 않고 네트워크 요청을 수행하는 방법을 배웁니다.
코루틴에 대한 사전 지식은 필수가 아니지만, 기본적인 Kotlin 문법에 익숙해야 합니다.
다음 내용을 배울 것입니다:
- 네트워크 요청을 수행하기 위해 중단 함수(suspending function)를 사용해야 하는 이유와 방법.
- 코루틴을 사용하여 요청을 동시적으로 보내는 방법.
- 채널을 사용하여 다른 코루틴 간에 정보를 공유하는 방법.
네트워크 요청에는 Retrofit 라이브러리가 필요하지만, 이 튜토리얼에서 보여주는 접근 방식은 코루틴을 지원하는 다른 모든 라이브러리에도 유사하게 작동합니다.
모든 작업의 솔루션은 프로젝트 저장소의
solutions
브랜치에서 찾을 수 있습니다.
시작하기 전에
최신 버전의 IntelliJ IDEA를 다운로드하여 설치합니다.
환영 화면에서 Get from VCS를 선택하거나 File | New | Project from Version Control을 선택하여 프로젝트 템플릿을 복제합니다.
명령줄에서도 복제할 수 있습니다:
Bashgit clone https://github.com/kotlin-hands-on/intro-coroutines
GitHub 개발자 토큰 생성
프로젝트에서 GitHub API를 사용할 것입니다. 접근하려면 GitHub 계정 이름과 암호 또는 토큰을 제공해야 합니다. 2단계 인증이 활성화되어 있다면 토큰만으로 충분합니다.
귀하의 계정에서 GitHub API를 사용할 새 GitHub 토큰을 생성하십시오:
토큰의 이름을 지정합니다 (예:
coroutines-tutorial
):어떤 스코프도 선택하지 마십시오. 페이지 하단의 Generate token을 클릭합니다.
생성된 토큰을 복사합니다.
코드 실행
이 프로그램은 주어진 조직(기본적으로 "kotlin") 아래의 모든 저장소에 대한 기여자를 로드합니다. 나중에 기여 수를 기준으로 사용자를 정렬하는 로직을 추가할 것입니다.
src/contributors/main.kt
파일을 열고main()
함수를 실행합니다. 다음 창이 나타날 것입니다:글꼴이 너무 작으면
main()
함수의setDefaultFontSize(18f)
값을 변경하여 조정하십시오.해당 필드에 GitHub 사용자 이름과 토큰(또는 암호)을 제공합니다.
Variant 드롭다운 메뉴에서 BLOCKING 옵션이 선택되어 있는지 확인합니다.
_Load contributors_를 클릭합니다. UI가 잠시 멈춘 다음 기여자 목록이 표시될 것입니다.
데이터가 로드되었는지 확인하기 위해 프로그램 출력을 엽니다. 기여자 목록은 각 성공적인 요청 후에 로깅됩니다.
이 로직을 구현하는 여러 가지 방법이 있습니다: 블로킹 요청 또는 콜백을 사용하는 것입니다. 이러한 솔루션을 코루틴을 사용하는 솔루션과 비교하고, 채널을 사용하여 여러 코루틴 간에 정보를 공유하는 방법을 살펴볼 것입니다.
블로킹 요청
Retrofit 라이브러리를 사용하여 GitHub에 HTTP 요청을 수행할 것입니다. 이 라이브러리를 사용하면 주어진 조직 아래의 저장소 목록과 각 저장소에 대한 기여자 목록을 요청할 수 있습니다:
interface GitHubService {
@GET("orgs/{org}/repos?per_page=100")
fun getOrgReposCall(
@Path("org") org: String
): Call<List<Repo>>
@GET("repos/{owner}/{repo}/contributors?per_page=100")
fun getRepoContributorsCall(
@Path("owner") owner: String,
@Path("repo") repo: String
): Call<List<User>>
}
이 API는 loadContributorsBlocking()
함수에서 주어진 조직에 대한 기여자 목록을 가져오는 데 사용됩니다.
src/tasks/Request1Blocking.kt
를 열어 구현을 확인합니다:kotlinfun loadContributorsBlocking( service: GitHubService, req: RequestData ): List<User> { val repos = service .getOrgReposCall(req.org) // #1 .execute() // #2 .also { logRepos(req, it) } // #3 .body() ?: emptyList() // #4 return repos.flatMap { repo -> service .getRepoContributorsCall(req.org, repo.name) // #1 .execute() // #2 .also { logUsers(repo, it) } // #3 .bodyList() // #4 }.aggregate() }
- 먼저 주어진 조직 아래의 저장소 목록을 가져와
repos
목록에 저장합니다. 그런 다음 각 저장소에 대한 기여자 목록을 요청하고, 모든 목록을 하나의 최종 기여자 목록으로 병합합니다. getOrgReposCall()
및getRepoContributorsCall()
은 모두*Call
클래스의 인스턴스를 반환합니다(#1
). 이 시점에서는 어떤 요청도 보내지지 않습니다.*Call.execute()
가 호출되어 요청을 수행합니다(#2
).execute()
는 기본 스레드를 블로킹하는 동기 호출입니다.- 응답을 받으면 특정
logRepos()
및logUsers()
함수를 호출하여 결과를 로깅합니다(#3
). HTTP 응답에 오류가 포함되어 있으면 해당 오류가 여기에 로깅됩니다. - 마지막으로 필요한 데이터가 포함된 응답의 본문을 가져옵니다. 이 튜토리얼에서는 오류가 발생한 경우 빈 목록을 결과로 사용하고 해당 오류를 로깅합니다(
#4
).
- 먼저 주어진 조직 아래의 저장소 목록을 가져와
.body() ?: emptyList()
의 반복을 피하기 위해 확장 함수bodyList()
가 선언됩니다:kotlinfun <T> Response<List<T>>.bodyList(): List<T> { return body() ?: emptyList() }
프로그램을 다시 실행하고 IntelliJ IDEA의 시스템 출력을 살펴보십시오. 다음과 유사한 내용이 표시되어야 합니다:
text1770 [AWT-EventQueue-0] INFO Contributors - kotlin: loaded 40 repos 2025 [AWT-EventQueue-0] INFO Contributors - kotlin-examples: loaded 23 contributors 2229 [AWT-EventQueue-0] INFO Contributors - kotlin-koans: loaded 45 contributors ...
- 각 줄의 첫 번째 항목은 프로그램 시작 이후 경과한 밀리초 수이고, 그 다음은 대괄호 안의 스레드 이름입니다. 로딩 요청이 어느 스레드에서 호출되었는지 확인할 수 있습니다.
- 각 줄의 마지막 항목은 실제 메시지입니다: 로드된 저장소 또는 기여자의 수.
이 로그 출력은 모든 결과가 메인 스레드에서 로깅되었음을 보여줍니다. BLOCKING 옵션으로 코드를 실행하면 로딩이 완료될 때까지 창이 멈추고 입력에 반응하지 않습니다. 모든 요청은
loadContributorsBlocking()
이 호출된 스레드와 동일한 스레드(Swing에서는 AWT 이벤트 디스패칭 스레드)에서 실행됩니다. 이 메인 스레드가 블로킹되어 UI가 멈추게 됩니다:기여자 목록이 로드되면 결과가 업데이트됩니다.
src/contributors/Contributors.kt
에서 기여자가 로드되는 방식을 선택하는loadContributors()
함수를 찾아loadContributorsBlocking()
이 어떻게 호출되는지 확인합니다:kotlinwhen (getSelectedVariant()) { BLOCKING -> { // Blocking UI thread val users = loadContributorsBlocking(service, req) updateResults(users, startTime) } }
updateResults()
호출은loadContributorsBlocking()
호출 바로 다음에 옵니다.updateResults()
는 UI를 업데이트하므로 항상 UI 스레드에서 호출되어야 합니다.loadContributorsBlocking()
도 UI 스레드에서 호출되므로 UI 스레드가 블로킹되고 UI가 멈춥니다.
작업 1
첫 번째 작업은 작업 도메인에 익숙해지는 데 도움이 됩니다. 현재 각 기여자의 이름이 참여한 모든 프로젝트에 대해 한 번씩 여러 번 반복됩니다. 각 기여자가 한 번만 추가되도록 사용자를 결합하는 aggregate()
함수를 구현하십시오. User.contributions
속성은 주어진 사용자가 모든 프로젝트에 기여한 총 수를 포함해야 합니다. 결과 목록은 기여 수에 따라 내림차순으로 정렬되어야 합니다.
src/tasks/Aggregation.kt
를 열고 List<User>.aggregate()
함수를 구현하십시오. 사용자는 총 기여 수에 따라 정렬되어야 합니다.
해당 테스트 파일 test/tasks/AggregationKtTest.kt
는 예상 결과의 예를 보여줍니다.
IntelliJ IDEA 단축키
Ctrl+Shift+T
/⇧ ⌘ T
를 사용하여 소스 코드와 테스트 클래스 사이를 자동으로 이동할 수 있습니다.
이 작업을 구현한 후 "kotlin" 조직의 결과 목록은 다음과 유사해야 합니다:
작업 1 솔루션
사용자를 로그인별로 그룹화하려면
groupBy()
를 사용하십시오. 이 함수는 로그인을 키로, 이 로그인으로 여러 저장소에 나타나는 모든 사용자 인스턴스를 값으로 하는 맵을 반환합니다.각 맵 항목에 대해 각 사용자의 총 기여 수를 계산하고, 주어진 이름과 총 기여 수로
User
클래스의 새 인스턴스를 생성합니다.결과 목록을 내림차순으로 정렬합니다:
kotlinfun List<User>.aggregate(): List<User> = groupBy { it.login } .map { (login, group) -> User(login, group.sumOf { it.contributions }) } .sortedByDescending { it.contributions }
대안 솔루션은 groupBy()
대신 groupingBy()
함수를 사용하는 것입니다.
콜백
이전 솔루션은 작동하지만, 스레드를 블로킹하여 UI를 멈추게 합니다. 이를 피하는 전통적인 접근 방식은 _콜백_을 사용하는 것입니다.
작업 완료 직후 호출되어야 하는 코드를 직접 호출하는 대신, 이를 별도의 콜백(종종 람다)으로 추출하여 호출자에게 전달하여 나중에 호출되도록 할 수 있습니다.
UI를 반응형으로 만들려면 전체 연산을 별도의 스레드로 옮기거나, 블로킹 호출 대신 콜백을 사용하는 Retrofit API로 전환할 수 있습니다.
백그라운드 스레드 사용
src/tasks/Request2Background.kt
를 열어 구현을 확인합니다. 먼저 전체 연산을 다른 스레드로 옮깁니다.thread()
함수는 새 스레드를 시작합니다:kotlinthread { loadContributorsBlocking(service, req) }
이제 모든 로딩이 별도의 스레드로 옮겨졌으므로 메인 스레드는 자유로워 다른 작업을 수행할 수 있습니다:
loadContributorsBackground()
함수의 시그니처가 변경됩니다. 모든 로딩이 완료된 후 호출할updateResults()
콜백을 마지막 인수로 받습니다:kotlinfun loadContributorsBackground( service: GitHubService, req: RequestData, updateResults: (List<User>) -> Unit )
이제
loadContributorsBackground()
가 호출되면updateResults()
호출은 이전처럼 즉시 뒤따라오는 것이 아니라 콜백에서 발생합니다:kotlinloadContributorsBackground(service, req) { users -> SwingUtilities.invokeLater { updateResults(users, startTime) } }
SwingUtilities.invokeLater
를 호출함으로써 결과를 업데이트하는updateResults()
호출이 메인 UI 스레드(AWT 이벤트 디스패칭 스레드)에서 발생하도록 보장합니다.
그러나 BACKGROUND
옵션을 통해 기여자를 로드하려고 하면 목록은 업데이트되지만 아무것도 변경되지 않는 것을 볼 수 있습니다.
작업 2
결과 목록이 UI에 표시되도록 src/tasks/Request2Background.kt
파일의 loadContributorsBackground()
함수를 수정하십시오.
작업 2 솔루션
기여자를 로드하려고 하면 로그에서 기여자가 로드되지만 결과가 표시되지 않는 것을 볼 수 있습니다. 이를 수정하려면 결과 사용자 목록에 대해 updateResults()
를 호출하십시오:
thread {
updateResults(loadContributorsBlocking(service, req))
}
콜백으로 전달된 로직을 명시적으로 호출해야 합니다. 그렇지 않으면 아무 일도 일어나지 않습니다.
Retrofit 콜백 API 사용
이전 솔루션에서 전체 로딩 로직은 백그라운드 스레드로 옮겨졌지만, 여전히 리소스 사용에 최선은 아닙니다. 모든 로딩 요청은 순차적으로 진행되며 스레드는 로딩 결과를 기다리는 동안 블로킹되어 다른 작업에 사용될 수 있는 기회를 잃습니다. 특히, 스레드는 전체 결과를 더 빨리 받기 위해 다른 요청을 로드하기 시작할 수 있습니다.
각 저장소에 대한 데이터 처리는 두 부분으로 나뉘어져야 합니다: 로딩 및 결과 응답 처리. 두 번째 처리 부분은 콜백으로 추출되어야 합니다.
각 저장소에 대한 로딩은 이전 저장소의 결과가 수신되기 전(그리고 해당 콜백이 호출되기 전)에 시작될 수 있습니다:
Retrofit 콜백 API는 이를 달성하는 데 도움이 될 수 있습니다. Call.enqueue()
함수는 HTTP 요청을 시작하고 인수로 콜백을 받습니다. 이 콜백에서 각 요청 후에 무엇을 해야 할지 지정해야 합니다.
src/tasks/Request3Callbacks.kt
를 열어 이 API를 사용하는 loadContributorsCallbacks()
의 구현을 확인합니다:
fun loadContributorsCallbacks(
service: GitHubService, req: RequestData,
updateResults: (List<User>) -> Unit
) {
service.getOrgReposCall(req.org).onResponse { responseRepos -> // #1
logRepos(req, responseRepos)
val repos = responseRepos.bodyList()
val allUsers = mutableListOf<User>()
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name)
.onResponse { responseUsers -> // #2
logUsers(repo, responseUsers)
val users = responseUsers.bodyList()
allUsers += users
}
}
}
// TODO: Why doesn't this code work? How to fix that?
updateResults(allUsers.aggregate())
}
- 편의를 위해 이 코드 조각은 같은 파일에 선언된
onResponse()
확장 함수를 사용합니다. 이 함수는 객체 표현식 대신 람다를 인수로 받습니다. - 응답을 처리하는 로직은 콜백으로 추출됩니다: 해당 람다는
#1
및#2
줄에서 시작됩니다.
그러나 제공된 솔루션은 작동하지 않습니다. 프로그램을 실행하고 CALLBACKS 옵션을 선택하여 기여자를 로드하면 아무것도 표시되지 않는 것을 볼 수 있습니다. 그러나 Request3CallbacksKtTest
의 테스트는 성공적으로 통과했음을 즉시 반환합니다.
주어진 코드가 예상대로 작동하지 않는 이유를 생각해 보고 수정하거나, 아래 솔루션을 참조하십시오.
작업 3 (선택 사항)
src/tasks/Request3Callbacks.kt
파일의 코드를 다시 작성하여 로드된 기여자 목록이 표시되도록 하십시오.
작업 3의 첫 번째 시도 솔루션
현재 솔루션에서는 많은 요청이 동시적으로 시작되어 총 로딩 시간이 줄어듭니다. 그러나 결과가 로드되지 않습니다. 이는 updateResults()
콜백이 모든 로딩 요청이 시작된 직후에 호출되기 때문입니다. 즉, allUsers
목록이 데이터로 채워지기 전에 호출됩니다.
다음과 같은 변경으로 이를 수정할 수 있습니다:
val allUsers = mutableListOf<User>()
for ((index, repo) in repos.withIndex()) { // #1
service.getRepoContributorsCall(req.org, repo.name)
.onResponse { responseUsers ->
logUsers(repo, responseUsers)
val users = responseUsers.bodyList()
allUsers += users
if (index == repos.lastIndex) { // #2
updateResults(allUsers.aggregate())
}
}
}
- 먼저 인덱스와 함께 저장소 목록을 반복합니다(
#1
). - 그런 다음 각 콜백에서 마지막 반복인지 확인합니다(
#2
). - 그리고 그렇다면 결과가 업데이트됩니다.
그러나 이 코드도 우리의 목표를 달성하지 못합니다. 직접 답을 찾거나, 아래 솔루션을 참조하십시오.
작업 3의 두 번째 시도 솔루션
로딩 요청이 동시적으로 시작되므로 마지막 요청의 결과가 마지막으로 온다는 보장은 없습니다. 결과는 어떤 순서로든 올 수 있습니다.
따라서 완료 조건으로 현재 인덱스를 lastIndex
와 비교하면 일부 저장소에 대한 결과를 잃을 위험이 있습니다.
만약 마지막 저장소를 처리하는 요청이 이전 요청보다 더 빨리 반환된다면(그럴 가능성이 높음), 더 많은 시간이 걸리는 요청들의 모든 결과가 손실될 것입니다.
이를 수정하는 한 가지 방법은 인덱스를 도입하고 모든 저장소가 이미 처리되었는지 확인하는 것입니다:
val allUsers = Collections.synchronizedList(mutableListOf<User>())
val numberOfProcessed = AtomicInteger()
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name)
.onResponse { responseUsers ->
logUsers(repo, responseUsers)
val users = responseUsers.bodyList()
allUsers += users
if (numberOfProcessed.incrementAndGet() == repos.size) {
updateResults(allUsers.aggregate())
}
}
}
이 코드는 동기화된 버전의 목록과 AtomicInteger()
를 사용합니다. 왜냐하면 일반적으로 getRepoContributors()
요청을 처리하는 다른 콜백이 항상 동일한 스레드에서 호출된다는 보장이 없기 때문입니다.
작업 3의 세 번째 시도 솔루션
더 나은 솔루션은 CountDownLatch
클래스를 사용하는 것입니다. 이 클래스는 저장소 수로 초기화된 카운터를 저장합니다. 이 카운터는 각 저장소를 처리한 후 감소합니다. 그런 다음 래치가 0이 될 때까지 기다린 다음 결과를 업데이트합니다:
val countDownLatch = CountDownLatch(repos.size)
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name)
.onResponse { responseUsers ->
// processing repository
countDownLatch.countDown()
}
}
countDownLatch.await()
updateResults(allUsers.aggregate())
결과는 메인 스레드에서 업데이트됩니다. 이는 로직을 자식 스레드에 위임하는 것보다 더 직접적인 방법입니다.
이 세 가지 솔루션 시도를 검토한 결과, 콜백으로 올바른 코드를 작성하는 것은 특히 여러 기본 스레드와 동기화가 발생할 때 사소하지 않고 오류가 발생하기 쉽다는 것을 알 수 있습니다.
추가 연습으로, RxJava 라이브러리를 사용하여 반응형 접근 방식으로 동일한 로직을 구현할 수 있습니다. 필요한 모든 종속성과 RxJava 사용을 위한 솔루션은 별도의
rx
브랜치에서 찾을 수 있습니다. 이 튜토리얼을 완료하고 제안된 Rx 버전을 구현하거나 확인하여 적절한 비교를 하는 것도 가능합니다.
중단 함수
중단 함수(suspending function)를 사용하여 동일한 로직을 구현할 수 있습니다. Call<List<Repo>>
를 반환하는 대신, 다음과 같이 API 호출을 중단 함수로 정의합니다:
interface GitHubService {
@GET("orgs/{org}/repos?per_page=100")
suspend fun getOrgRepos(
@Path("org") org: String
): List<Repo>
}
getOrgRepos()
는suspend
함수로 정의됩니다. 중단 함수를 사용하여 요청을 수행할 때 기본 스레드는 블로킹되지 않습니다. 이것이 어떻게 작동하는지에 대한 자세한 내용은 나중에 다룰 것입니다.getOrgRepos()
는Call
을 반환하는 대신 결과를 직접 반환합니다. 결과가 실패하면 예외가 발생합니다.
대안으로, Retrofit은 결과를 Response
로 래핑하여 반환할 수 있습니다. 이 경우, 결과 본문이 제공되며 수동으로 오류를 확인할 수 있습니다. 이 튜토리얼에서는 Response
를 반환하는 버전을 사용합니다.
src/contributors/GitHubService.kt
에서 GitHubService
인터페이스에 다음 선언을 추가합니다:
interface GitHubService {
// getOrgReposCall & getRepoContributorsCall declarations
@GET("orgs/{org}/repos?per_page=100")
suspend fun getOrgRepos(
@Path("org") org: String
): Response<List<Repo>>
@GET("repos/{owner}/{repo}/contributors?per_page=100")
suspend fun getRepoContributors(
@Path("owner") owner: String,
@Path("repo") repo: String
): Response<List<User>>
}
작업 4
귀하의 임무는 기여자를 로드하는 함수의 코드를 변경하여 두 개의 새로운 중단 함수인 getOrgRepos()
와 getRepoContributors()
를 사용하도록 하는 것입니다. 새로운 loadContributorsSuspend()
함수는 새 API를 사용하기 위해 suspend
로 표시됩니다.
중단 함수는 모든 곳에서 호출될 수 없습니다.
loadContributorsBlocking()
에서 중단 함수를 호출하면 "Suspend function 'getOrgRepos' should be called only from a coroutine or another suspend function" 메시지와 함께 오류가 발생할 것입니다.
src/tasks/Request1Blocking.kt
에 정의된loadContributorsBlocking()
의 구현을src/tasks/Request4Suspend.kt
에 정의된loadContributorsSuspend()
로 복사합니다.Call
을 반환하는 함수 대신 새로운 중단 함수가 사용되도록 코드를 수정합니다.- SUSPEND 옵션을 선택하여 프로그램을 실행하고 GitHub 요청이 수행되는 동안 UI가 계속 반응하는지 확인합니다.
작업 4 솔루션
.getOrgReposCall(req.org).execute()
를 .getOrgRepos(req.org)
로 바꾸고 두 번째 "기여자" 요청에 대해서도 동일한 교체를 반복합니다:
suspend fun loadContributorsSuspend(service: GitHubService, req: RequestData): List<User> {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()
return repos.flatMap { repo ->
service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}.aggregate()
}
loadContributorsSuspend()
는suspend
함수로 정의되어야 합니다.- 이전에는
Response
를 반환했던execute
를 더 이상 호출할 필요가 없습니다. 이제 API 함수가Response
를 직접 반환하기 때문입니다. 이 세부 사항은 Retrofit 라이브러리에만 해당됩니다. 다른 라이브러리에서는 API가 다르겠지만, 개념은 동일합니다.
코루틴
중단 함수를 사용한 코드는 "블로킹" 버전과 유사하게 보입니다. 블로킹 버전과의 주요 차이점은 스레드를 블로킹하는 대신 코루틴이 중단된다는 것입니다:
block -> suspend
thread -> coroutine
코루틴은 종종 경량 스레드(lightweight thread)라고 불리는데, 이는 스레드에서 코드를 실행하는 방식과 유사하게 코루틴에서 코드를 실행할 수 있기 때문입니다. 이전에는 블로킹되어 피해야 했던 작업들이 이제 코루틴을 중단시킬 수 있습니다.
새 코루틴 시작
src/contributors/Contributors.kt
에서 loadContributorsSuspend()
가 어떻게 사용되는지 살펴보면, launch
내에서 호출되는 것을 볼 수 있습니다. launch
는 람다를 인수로 받는 라이브러리 함수입니다:
launch {
val users = loadContributorsSuspend(req)
updateResults(users, startTime)
}
여기서 launch
는 데이터를 로드하고 결과를 표시하는 역할을 하는 새로운 연산을 시작합니다. 이 연산은 중단 가능합니다. 네트워크 요청을 수행할 때 연산은 중단되고 기본 스레드를 해제합니다. 네트워크 요청이 결과를 반환하면 연산이 재개됩니다.
이러한 중단 가능한 연산을 _코루틴_이라고 합니다. 따라서 이 경우 launch
는 데이터를 로드하고 결과를 표시하는 역할을 하는 _새 코루틴을 시작_합니다.
코루틴은 스레드 위에서 실행되며 중단될 수 있습니다. 코루틴이 중단되면 해당 연산은 일시 중지되고, 스레드에서 제거되어 메모리에 저장됩니다. 그동안 스레드는 다른 작업을 수행할 수 있도록 자유로워집니다:
연산이 계속될 준비가 되면 (반드시 같은 스레드는 아님) 스레드로 반환됩니다.
loadContributorsSuspend()
예시에서 각 "기여자" 요청은 이제 중단 메커니즘을 사용하여 결과를 기다립니다. 먼저 새 요청이 전송됩니다. 그런 다음 응답을 기다리는 동안 launch
함수에 의해 시작된 전체 "기여자 로드" 코루틴이 중단됩니다.
코루틴은 해당 응답이 수신된 후에만 재개됩니다:
응답을 기다리는 동안 스레드는 다른 작업을 수행할 수 있도록 자유로워집니다. 모든 요청이 메인 UI 스레드에서 이루어지고 있음에도 불구하고 UI는 계속 반응합니다:
SUSPEND 옵션을 사용하여 프로그램을 실행합니다. 로그는 모든 요청이 메인 UI 스레드로 전송되었음을 확인합니다:
text2538 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 30 repos 2729 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - ts2kt: loaded 11 contributors 3029 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin-koans: loaded 45 contributors ... 11252 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin-coroutines-workshop: loaded 1 contributors
로그는 해당 코드가 실행되는 코루틴을 보여줄 수 있습니다. 이를 활성화하려면 Run | Edit configurations를 열고
-Dkotlinx.coroutines.debug
VM 옵션을 추가하십시오:이 옵션을 사용하여
main()
이 실행되는 동안 코루틴 이름이 스레드 이름에 첨부됩니다. 모든 Kotlin 파일을 실행하기 위한 템플릿을 수정하고 이 옵션을 기본적으로 활성화할 수도 있습니다.
이제 모든 코드는 위에서 언급한 "@coroutine#1"로 표시된 "기여자 로드" 코루틴 하나에서 실행됩니다. 결과를 기다리는 동안 다른 요청을 보내기 위해 스레드를 재사용해서는 안 됩니다. 코드가 순차적으로 작성되었기 때문입니다. 새 요청은 이전 결과가 수신된 후에만 전송됩니다.
중단 함수는 스레드를 공정하게 처리하며 "대기"를 위해 스레드를 블로킹하지 않습니다. 그러나 이것은 아직 동시성을 가져오지 않습니다.
동시성
Kotlin 코루틴은 스레드보다 훨씬 적은 리소스를 사용합니다. 비동기적으로 새로운 연산을 시작하고 싶을 때마다 새 코루틴을 생성할 수 있습니다.
새 코루틴을 시작하려면 주요 코루틴 빌더 중 하나인 launch
, async
, 또는 runBlocking
을 사용하십시오. 다른 라이브러리들은 추가적인 코루틴 빌더를 정의할 수 있습니다.
async
는 새로운 코루틴을 시작하고 Deferred
객체를 반환합니다. Deferred
는 Future
또는 Promise
와 같은 다른 이름으로 알려진 개념을 나타냅니다. 이것은 연산을 저장하지만, 최종 결과를 얻는 시점을 _지연_시키고; _미래_의 어느 시점에 결과를 _약속_합니다.
async
와 launch
의 주요 차이점은 launch
는 특정 결과를 반환하지 않을 것으로 예상되는 연산을 시작하는 데 사용된다는 것입니다. launch
는 코루틴을 나타내는 Job
을 반환합니다. Job.join()
을 호출하여 완료될 때까지 기다릴 수 있습니다.
Deferred
는 Job
을 확장하는 제네릭 타입입니다. async
호출은 람다가 반환하는 내용(람다 내부의 마지막 표현식이 결과)에 따라 Deferred<Int>
또는 Deferred<CustomType>
를 반환할 수 있습니다.
코루틴의 결과를 얻으려면 Deferred
인스턴스에서 await()
를 호출할 수 있습니다. 결과를 기다리는 동안 await()
가 호출된 코루틴은 중단됩니다:
import kotlinx.coroutines.*
fun main() = runBlocking {
val deferred: Deferred<Int> = async {
loadData()
}
println("waiting...")
println(deferred.await())
}
suspend fun loadData(): Int {
println("loading...")
delay(1000L)
println("loaded!")
return 42
}
runBlocking
은 일반 함수와 중단 함수 사이, 또는 블로킹 세계와 비블로킹 세계 사이의 다리(bridge) 역할을 합니다. 이는 최상위 메인 코루틴을 시작하기 위한 어댑터로 작동합니다. 주로 main()
함수 및 테스트에서 사용하도록 의도되었습니다.
코루틴에 대한 더 나은 이해를 위해 이 비디오를 시청하십시오.
지연 객체 목록이 있는 경우 awaitAll()
을 호출하여 모든 결과를 기다릴 수 있습니다:
import kotlinx.coroutines.*
fun main() = runBlocking {
val deferreds: List<Deferred<Int>> = (1..3).map {
async {
delay(1000L * it)
println("Loading $it")
it
}
}
val sum = deferreds.awaitAll().sum()
println("$sum")
}
각 "기여자" 요청이 새 코루틴에서 시작되면 모든 요청이 비동기적으로 시작됩니다. 이전 요청의 결과가 수신되기 전에 새 요청을 보낼 수 있습니다:
총 로딩 시간은 CALLBACKS 버전과 거의 동일하지만, 콜백이 필요 없습니다. 더 나아가, async
는 코드에서 어떤 부분이 동시적으로 실행되는지 명시적으로 강조합니다.
작업 5
Request5Concurrent.kt
파일에서 이전 loadContributorsSuspend()
함수를 사용하여 loadContributorsConcurrent()
함수를 구현하십시오.
작업 5 팁
코루틴은 코루틴 스코프 내에서만 시작할 수 있습니다. loadContributorsSuspend()
의 내용을 coroutineScope
호출로 복사하여 async
함수를 호출할 수 있도록 하십시오:
suspend fun loadContributorsConcurrent(
service: GitHubService,
req: RequestData
): List<User> = coroutineScope {
// ...
}
다음 스키마를 기반으로 솔루션을 만드십시오:
val deferreds: List<Deferred<List<User>>> = repos.map { repo ->
async {
// 각 저장소의 기여자 로드
}
}
deferreds.awaitAll() // List<List<User>>
작업 5 솔루션
각 "기여자" 요청을 async
로 감싸서 저장소 수만큼 코루틴을 생성합니다. async
는 Deferred<List<User>>
를 반환합니다. 새 코루틴을 생성하는 것은 리소스 집약적이지 않으므로 필요한 만큼 생성할 수 있다는 점은 문제가 되지 않습니다.
flatMap
은 더 이상 사용할 수 없습니다.map
의 결과가 이제 목록의 목록이 아니라Deferred
객체 목록이기 때문입니다.awaitAll()
은List<List<User>>
를 반환하므로flatten().aggregate()
를 호출하여 결과를 얻습니다:kotlinsuspend fun loadContributorsConcurrent( service: GitHubService, req: RequestData ): List<User> = coroutineScope { val repos = service .getOrgRepos(req.org) .also { logRepos(req, it) } .bodyList() val deferreds: List<Deferred<List<User>>> = repos.map { repo -> async { service.getRepoContributors(req.org, repo.name) .also { logUsers(repo, it) } .bodyList() } } deferreds.awaitAll().flatten().aggregate() }
코드를 실행하고 로그를 확인하십시오. 멀티스레딩이 아직 사용되지 않았기 때문에 모든 코루틴은 여전히 메인 UI 스레드에서 실행되지만, 코루틴을 동시적으로 실행하는 이점을 이미 볼 수 있습니다.
이 코드를 변경하여 "기여자" 코루틴을 공통 스레드 풀의 다른 스레드에서 실행하려면
async
함수의 컨텍스트 인수로Dispatchers.Default
를 지정하십시오:kotlinasync(Dispatchers.Default) { }
CoroutineDispatcher
는 해당 코루틴이 실행될 스레드를 결정합니다. 인수로 지정하지 않으면async
는 외부 스코프의 디스패처를 사용합니다.Dispatchers.Default
는 JVM의 공유 스레드 풀을 나타냅니다. 이 풀은 병렬 실행을 위한 수단을 제공합니다. 사용 가능한 CPU 코어 수만큼의 스레드로 구성되지만, 코어가 하나만 있어도 두 개의 스레드를 가집니다.
loadContributorsConcurrent()
함수의 코드를 수정하여 공통 스레드 풀의 다른 스레드에서 새 코루틴을 시작하도록 합니다. 또한 요청을 보내기 전에 추가 로깅을 추가하십시오:kotlinasync(Dispatchers.Default) { log("starting loading for ${repo.name}") service.getRepoContributors(req.org, repo.name) .also { logUsers(repo, it) } .bodyList() }
프로그램을 다시 실행하십시오. 로그에서 각 코루틴이 스레드 풀의 한 스레드에서 시작되어 다른 스레드에서 재개될 수 있음을 볼 수 있습니다:
text1946 [DefaultDispatcher-worker-2 @coroutine#4] INFO Contributors - starting loading for kotlin-koans 1946 [DefaultDispatcher-worker-3 @coroutine#5] INFO Contributors - starting loading for dokka 1946 [DefaultDispatcher-worker-1 @coroutine#3] INFO Contributors - starting loading for ts2kt ... 2178 [DefaultDispatcher-worker-1 @coroutine#4] INFO Contributors - kotlin-koans: loaded 45 contributors 2569 [DefaultDispatcher-worker-1 @coroutine#5] INFO Contributors - dokka: loaded 36 contributors 2821 [DefaultDispatcher-worker-2 @coroutine#3] INFO Contributors - ts2kt: loaded 11 contributors
예를 들어, 이 로그 발췌문에서
coroutine#4
는worker-2
스레드에서 시작되어worker-1
스레드에서 계속됩니다.
src/contributors/Contributors.kt
에서 CONCURRENT 옵션의 구현을 확인하십시오:
코루틴을 메인 UI 스레드에서만 실행하려면
Dispatchers.Main
을 인수로 지정하십시오:kotlinlaunch(Dispatchers.Main) { updateResults() }
- 새 코루틴을 메인 스레드에서 시작할 때 메인 스레드가 바쁘면, 코루틴은 중단되고 이 스레드에서 실행되도록 스케줄링됩니다. 코루틴은 스레드가 자유로워질 때만 재개됩니다.
- 각 엔드포인트에 명시적으로 지정하는 대신 외부 스코프의 디스패처를 사용하는 것이 좋은 관행으로 간주됩니다.
Dispatchers.Default
를 인수로 전달하지 않고loadContributorsConcurrent()
를 정의하면, 이 함수를Default
디스패처, 메인 UI 스레드 또는 사용자 지정 디스패처와 같은 어떤 컨텍스트에서도 호출할 수 있습니다. - 나중에 보겠지만, 테스트에서
loadContributorsConcurrent()
를 호출할 때TestDispatcher
컨텍스트에서 호출할 수 있어 테스트를 단순화합니다. 이는 이 솔루션을 훨씬 더 유연하게 만듭니다.
호출자 측에서 디스패처를 지정하려면,
loadContributorsConcurrent
가 상속된 컨텍스트에서 코루틴을 시작하도록 하면서 프로젝트에 다음 변경 사항을 적용하십시오:kotlinlaunch(Dispatchers.Default) { val users = loadContributorsConcurrent(service, req) withContext(Dispatchers.Main) { updateResults(users, startTime) } }
updateResults()
는 메인 UI 스레드에서 호출되어야 하므로Dispatchers.Main
컨텍스트로 호출합니다.withContext()
는 지정된 코루틴 컨텍스트로 주어진 코드를 호출하고, 완료될 때까지 중단되며, 결과를 반환합니다. 이를 표현하는 또 다른 방법이지만 더 장황한 방법은 새 코루틴을 시작하고 완료될 때까지 명시적으로 기다리는(중단하는) 것입니다:launch(context) { ... }.join()
.
코드를 실행하고 코루틴이 스레드 풀의 스레드에서 실행되는지 확인하십시오.
구조화된 동시성
- _코루틴 스코프_는 서로 다른 코루틴 간의 구조와 부모-자식 관계를 담당합니다. 새 코루틴은 일반적으로 스코프 내에서 시작되어야 합니다.
- _코루틴 컨텍스트_는 코루틴의 사용자 지정 이름이나 코루틴이 스케줄링될 스레드를 지정하는 디스패처와 같이 주어진 코루틴을 실행하는 데 사용되는 추가 기술 정보를 저장합니다.
launch
, async
, 또는 runBlocking
이 새 코루틴을 시작하는 데 사용되면 자동으로 해당 스코프를 생성합니다. 이 모든 함수는 리시버를 가진 람다를 인수로 받으며, CoroutineScope
가 암묵적 리시버 타입입니다:
launch { /* this: CoroutineScope */ }
- 새 코루틴은 스코프 내에서만 시작할 수 있습니다.
launch
와async
는CoroutineScope
의 확장으로 선언되었으므로, 이들을 호출할 때는 항상 암묵적 또는 명시적 리시버가 전달되어야 합니다.runBlocking
에 의해 시작된 코루틴은 유일한 예외입니다.runBlocking
은 최상위 함수로 정의되어 있기 때문입니다. 그러나 현재 스레드를 블로킹하므로 주로main()
함수 및 테스트에서 브릿지 함수로 사용됩니다.
runBlocking
, launch
, 또는 async
내에서 새 코루틴은 스코프 내에서 자동으로 시작됩니다:
import kotlinx.coroutines.*
fun main() = runBlocking { /* this: CoroutineScope */
launch { /* ... */ }
// the same as:
this.launch { /* ... */ }
}
runBlocking
내에서 launch
를 호출할 때, 이것은 CoroutineScope
타입의 암묵적 리시버에 대한 확장으로 호출됩니다. 대안으로, this.launch
를 명시적으로 작성할 수도 있습니다.
(이 예시에서 launch
에 의해 시작된) 중첩 코루틴은 외부 코루틴(runBlocking
에 의해 시작된)의 자식으로 간주될 수 있습니다. 이 "부모-자식" 관계는 스코프를 통해 작동합니다. 자식 코루틴은 부모 코루틴에 해당하는 스코프에서 시작됩니다.
새 코루틴을 시작하지 않고도 coroutineScope
함수를 사용하여 새 스코프를 생성할 수 있습니다. suspend
함수 내에서 외부 스코프에 대한 접근 없이 구조화된 방식으로 새 코루틴을 시작하려면, 이 suspend
함수가 호출된 외부 스코프의 자식 스코프가 자동으로 되는 새 코루틴 스코프를 생성할 수 있습니다. loadContributorsConcurrent()
가 좋은 예입니다.
또한 GlobalScope.async
또는 GlobalScope.launch
를 사용하여 전역 스코프에서 새 코루틴을 시작할 수 있습니다. 이렇게 하면 최상위 "독립" 코루틴이 생성됩니다.
코루틴의 구조를 뒷받침하는 메커니즘을 구조화된 동시성(structured concurrency)이라고 합니다. 이는 전역 스코프에 비해 다음과 같은 이점을 제공합니다:
- 스코프는 일반적으로 자식 코루틴을 담당하며, 자식 코루틴의 수명은 스코프의 수명에 연결됩니다.
- 스코프는 문제가 발생하거나 사용자가 마음을 바꾸어 작업을 취소하기로 결정한 경우 자식 코루틴을 자동으로 취소할 수 있습니다.
- 스코프는 모든 자식 코루틴의 완료를 자동으로 기다립니다. 따라서 스코프가 코루틴에 해당한다면, 부모 코루틴은 해당 스코프에서 시작된 모든 코루틴이 완료될 때까지 완료되지 않습니다.
GlobalScope.async
를 사용할 경우, 여러 코루틴을 더 작은 스코프에 묶는 구조가 없습니다. 전역 스코프에서 시작된 코루틴은 모두 독립적입니다. 이들의 수명은 전체 애플리케이션의 수명에 의해서만 제한됩니다. 전역 스코프에서 시작된 코루틴에 대한 참조를 저장하고 명시적으로 완료를 기다리거나 취소할 수 있지만, 구조화된 동시성에서처럼 자동으로 발생하지는 않습니다.
기여자 로딩 취소
기여자 목록을 로드하는 함수의 두 가지 버전을 생성합니다. 부모 코루틴을 취소하려고 할 때 두 버전이 어떻게 작동하는지 비교합니다. 첫 번째 버전은 coroutineScope
를 사용하여 모든 자식 코루틴을 시작하고, 두 번째 버전은 GlobalScope
를 사용합니다.
Request5Concurrent.kt
에서loadContributorsConcurrent()
함수에 3초 지연을 추가합니다:kotlinsuspend fun loadContributorsConcurrent( service: GitHubService, req: RequestData ): List<User> = coroutineScope { // ... async { log("starting loading for ${repo.name}") delay(3000) // 저장소 기여자 로드 } // ... }
이 지연은 요청을 보내는 모든 코루틴에 영향을 미치므로, 코루틴이 시작된 후 요청이 보내지기 전에 로딩을 취소할 충분한 시간을 확보합니다.
로딩 함수의 두 번째 버전을 생성합니다.
loadContributorsConcurrent()
의 구현을Request5NotCancellable.kt
의loadContributorsNotCancellable()
로 복사한 다음, 새coroutineScope
생성을 제거합니다.이제
async
호출이 해결되지 않으므로,GlobalScope.async
를 사용하여 이들을 시작합니다:kotlinsuspend fun loadContributorsNotCancellable( service: GitHubService, req: RequestData ): List<User> { // #1 // ... GlobalScope.async { // #2 log("starting loading for ${repo.name}") // 저장소 기여자 로드 } // ... return deferreds.awaitAll().flatten().aggregate() // #3 }
- 이제 함수는 람다 내부의 마지막 표현식이 아닌 결과를 직접 반환합니다(
#1
및#3
줄). - 모든 "기여자" 코루틴은 코루틴 스코프의 자식이 아닌
GlobalScope
내에서 시작됩니다(#2
줄).
- 이제 함수는 람다 내부의 마지막 표현식이 아닌 결과를 직접 반환합니다(
프로그램을 실행하고 CONCURRENT 옵션을 선택하여 기여자를 로드합니다.
모든 "기여자" 코루틴이 시작될 때까지 기다린 다음 _Cancel_을 클릭합니다. 로그에 새 결과가 표시되지 않는데, 이는 모든 요청이 실제로 취소되었음을 의미합니다:
text2896 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 40 repos 2901 [DefaultDispatcher-worker-2 @coroutine#4] INFO Contributors - starting loading for kotlin-koans ... 2909 [DefaultDispatcher-worker-5 @coroutine#36] INFO Contributors - starting loading for mpp-example /* 'cancel' 클릭 */ /* 요청이 전송되지 않음 */
5단계를 반복하되, 이번에는
NOT_CANCELLABLE
옵션을 선택합니다:text2570 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 30 repos 2579 [DefaultDispatcher-worker-1 @coroutine#4] INFO Contributors - starting loading for kotlin-koans ... 2586 [DefaultDispatcher-worker-6 @coroutine#36] INFO Contributors - starting loading for mpp-example /* 'cancel' 클릭 */ /* 그러나 모든 요청은 여전히 전송됨: */ 6402 [DefaultDispatcher-worker-5 @coroutine#4] INFO Contributors - kotlin-koans: loaded 45 contributors ... 9555 [DefaultDispatcher-worker-8 @coroutine#36] INFO Contributors - mpp-example: loaded 8 contributors
이 경우 어떤 코루틴도 취소되지 않으며 모든 요청은 여전히 전송됩니다.
"기여자" 프로그램에서 취소가 어떻게 트리거되는지 확인합니다. Cancel 버튼을 클릭하면 메인 "로딩" 코루틴이 명시적으로 취소되고 자식 코루틴은 자동으로 취소됩니다:
kotlininterface Contributors { fun loadContributors() { // ... when (getSelectedVariant()) { CONCURRENT -> { launch { val users = loadContributorsConcurrent(service, req) updateResults(users, startTime) }.setUpCancellation() // #1 } } } private fun Job.setUpCancellation() { val loadingJob = this // #2 // 'cancel' 버튼이 클릭되면 로딩 작업을 취소합니다: val listener = ActionListener { loadingJob.cancel() // #3 updateLoadingStatus(CANCELED) } // 'cancel' 버튼에 리스너를 추가합니다: addCancelListener(listener) // 로딩 작업이 완료된 후 상태를 업데이트하고 리스너를 제거합니다. } }
launch
함수는 Job
의 인스턴스를 반환합니다. Job
은 모든 데이터를 로드하고 결과를 업데이트하는 "로딩 코루틴"에 대한 참조를 저장합니다. Job
인스턴스를 리시버로 전달하여 setUpCancellation()
확장 함수를 호출할 수 있습니다( #1
줄).
이를 표현하는 또 다른 방법은 다음과 같이 명시적으로 작성하는 것입니다:
val job = launch { }
job.setUpCancellation()
- 가독성을 위해 함수 내부에서
setUpCancellation()
함수의 리시버를 새로운loadingJob
변수(#2
줄)로 참조할 수 있습니다. - 그런 다음 Cancel 버튼에 리스너를 추가하여 클릭될 때
loadingJob
이 취소되도록 할 수 있습니다(#3
줄).
구조화된 동시성을 사용하면 부모 코루틴만 취소하면 되고, 이는 모든 자식 코루틴으로 자동으로 취소가 전파됩니다.
외부 스코프의 컨텍스트 사용
주어진 스코프 내에서 새 코루틴을 시작하면 모든 코루틴이 동일한 컨텍스트로 실행되도록 훨씬 쉽게 보장할 수 있습니다. 또한 필요한 경우 컨텍스트를 교체하기도 훨씬 쉽습니다.
이제 외부 스코프의 디스패처를 사용하는 방법을 알아볼 차례입니다. coroutineScope
또는 코루틴 빌더에 의해 생성된 새 스코프는 항상 외부 스코프로부터 컨텍스트를 상속합니다. 이 경우 외부 스코프는 suspend loadContributorsConcurrent()
함수가 호출된 스코프입니다:
launch(Dispatchers.Default) { // 외부 스코프
val users = loadContributorsConcurrent(service, req)
// ...
}
모든 중첩 코루틴은 상속된 컨텍스트로 자동으로 시작됩니다. 디스패처는 이 컨텍스트의 일부입니다. 따라서 async
에 의해 시작된 모든 코루틴은 기본 디스패처의 컨텍스트로 시작됩니다:
suspend fun loadContributorsConcurrent(
service: GitHubService, req: RequestData
): List<User> = coroutineScope {
// 이 스코프는 외부 스코프에서 컨텍스트를 상속합니다
// ...
async { // 상속된 컨텍스트로 시작된 중첩 코루틴
// ...
}
// ...
}
구조화된 동시성을 사용하면 최상위 코루틴을 생성할 때 주요 컨텍스트 요소(예: 디스패처)를 한 번 지정할 수 있습니다. 그러면 모든 중첩 코루틴은 컨텍스트를 상속받고 필요할 때만 수정합니다.
예를 들어 안드로이드와 같은 UI 애플리케이션에서 코루틴 코드를 작성할 때, 기본적으로 최상위 코루틴에
CoroutineDispatchers.Main
을 사용하고, 다른 스레드에서 코드를 실행해야 할 때 명시적으로 다른 디스패처를 지정하는 것이 일반적인 관행입니다.
진행 상황 표시
일부 저장소에 대한 정보는 상당히 빠르게 로드되지만, 사용자는 모든 데이터가 로드된 후에야 최종 목록을 볼 수 있습니다. 그때까지 로더 아이콘은 진행 상황을 표시하지만, 현재 상태나 이미 로드된 기여자 정보는 없습니다.
중간 결과를 더 일찍 표시하고 각 저장소에 대한 데이터 로딩 후 모든 기여자를 표시할 수 있습니다:
이 기능을 구현하려면 src/tasks/Request6Progress.kt
에서 UI를 업데이트하는 로직을 콜백으로 전달해야 합니다. 이렇게 하면 각 중간 상태에서 호출됩니다:
suspend fun loadContributorsProgress(
service: GitHubService,
req: RequestData,
updateResults: suspend (List<User>, completed: Boolean) -> Unit
) {
// 데이터 로딩
// 중간 상태에서 `updateResults()` 호출
}
Contributors.kt
의 호출 사이트에서는 PROGRESS 옵션에 대해 Main
스레드에서 결과를 업데이트하기 위해 콜백이 전달됩니다:
launch(Dispatchers.Default) {
loadContributorsProgress(service, req) { users, completed ->
withContext(Dispatchers.Main) {
updateResults(users, startTime, completed)
}
}
}
updateResults()
매개변수는loadContributorsProgress()
에서suspend
로 선언됩니다. 해당 람다 인수 내에서suspend
함수인withContext
를 호출해야 합니다.updateResults()
콜백은 로딩이 완료되었고 결과가 최종적인지 여부를 지정하는 추가적인 Boolean 매개변수를 인수로 받습니다.
작업 6
Request6Progress.kt
파일에서 중간 진행 상황을 보여주는 loadContributorsProgress()
함수를 구현하십시오. Request4Suspend.kt
의 loadContributorsSuspend()
함수를 기반으로 하십시오.
- 동시성을 사용하지 않는 간단한 버전을 사용하십시오. 동시성은 다음 섹션에서 추가할 것입니다.
- 기여자의 중간 목록은 각 저장소에 대해 로드된 사용자 목록이 아니라 "집계된" 상태로 표시되어야 합니다.
- 각 사용자의 총 기여 수는 각 새 저장소에 대한 데이터가 로드될 때마다 증가해야 합니다.
작업 6 솔루션
"모든 기여자" 목록의 중간 상태를 저장하려면 사용자 목록을 저장하는 allUsers
변수를 정의한 다음, 각 새 저장소에 대한 기여자가 로드된 후 이를 업데이트하십시오:
suspend fun loadContributorsProgress(
service: GitHubService,
req: RequestData,
updateResults: suspend (List<User>, completed: Boolean) -> Unit
) {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()
var allUsers = emptyList<User>()
for ((index, repo) in repos.withIndex()) {
val users = service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
allUsers = (allUsers + users).aggregate()
updateResults(allUsers, index == repos.lastIndex)
}
}
순차적 vs 동시적
updateResults()
콜백은 각 요청이 완료된 후 호출됩니다:
이 코드에는 동시성이 포함되어 있지 않습니다. 순차적이므로 동기화가 필요하지 않습니다.
가장 좋은 옵션은 요청을 동시에 보내고 각 저장소에 대한 응답을 받은 후 중간 결과를 업데이트하는 것입니다:
동시성을 추가하려면 _채널_을 사용하십시오.
채널
공유 가능한 가변 상태를 가진 코드를 작성하는 것은 상당히 어렵고 오류가 발생하기 쉽습니다(콜백을 사용하는 솔루션처럼). 더 간단한 방법은 공통 가변 상태를 사용하는 대신 통신을 통해 정보를 공유하는 것입니다. 코루틴은 _채널_을 통해 서로 통신할 수 있습니다.
채널은 코루틴 간에 데이터를 전달할 수 있는 통신 프리미티브입니다. 한 코루틴은 채널에 일부 정보를 보내고, 다른 코루틴은 채널에서 해당 정보를 받을 수 있습니다:
정보를 보내는(생산하는) 코루틴을 종종 생산자(producer)라고 부르며, 정보를 받는(소비하는) 코루틴을 소비자(consumer)라고 부릅니다. 하나 또는 여러 코루틴이 동일한 채널에 정보를 보낼 수 있으며, 하나 또는 여러 코루틴이 채널에서 데이터를 받을 수 있습니다:
여러 코루틴이 동일한 채널에서 정보를 받을 때, 각 요소는 소비자 중 하나에 의해 한 번만 처리됩니다. 요소가 처리되면 즉시 채널에서 제거됩니다.
채널을 요소의 컬렉션, 더 정확히는 요소가 한쪽 끝에 추가되고 다른 쪽 끝에서 수신되는 큐와 유사하다고 생각할 수 있습니다. 그러나 중요한 차이점이 있습니다. 동기화된 버전의 컬렉션과 달리 채널은 send()
및 receive()
작업을 _중단_시킬 수 있습니다. 이는 채널이 비어 있거나 가득 찼을 때 발생합니다. 채널 크기에 상한이 있다면 채널이 가득 찰 수 있습니다.
Channel
은 SendChannel
, ReceiveChannel
, 그리고 이 둘을 확장하는 Channel
의 세 가지 인터페이스로 표현됩니다. 일반적으로 채널을 생성하고 생산자에게 SendChannel
인스턴스로 제공하여 생산자만 채널에 정보를 보낼 수 있도록 합니다. 소비자에게는 ReceiveChannel
인스턴스로 채널을 제공하여 소비자만 채널에서 데이터를 받을 수 있도록 합니다. send
와 receive
메서드는 모두 suspend
로 선언됩니다:
interface SendChannel<in E> {
suspend fun send(element: E)
fun close(): Boolean
}
interface ReceiveChannel<out E> {
suspend fun receive(): E
}
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
생산자는 더 이상 요소가 오지 않을 것임을 나타내기 위해 채널을 닫을 수 있습니다.
라이브러리에는 여러 유형의 채널이 정의되어 있습니다. 이들은 내부적으로 저장할 수 있는 요소 수와 send()
호출이 중단될 수 있는지 여부에서 다릅니다. 모든 채널 유형에 대해 receive()
호출은 유사하게 작동합니다. 채널이 비어 있지 않으면 요소를 수신하고, 그렇지 않으면 중단됩니다.
무제한 채널 (Unlimited channel)
무제한 채널은 큐에 가장 가까운 비유입니다. 생산자는 이 채널에 요소를 보낼 수 있으며, 채널은 무한히 커질 것입니다. send()
호출은 결코 중단되지 않습니다. 프로그램의 메모리가 부족해지면 OutOfMemoryException
이 발생할 것입니다. 무제한 채널과 큐의 차이점은 소비자가 빈 채널에서 받으려고 시도할 때, 새 요소가 전송될 때까지 중단된다는 점입니다.

버퍼링된 채널 (Buffered channel)
버퍼링된 채널의 크기는 지정된 숫자로 제한됩니다. 생산자는 크기 제한에 도달할 때까지 이 채널에 요소를 보낼 수 있습니다. 모든 요소는 내부적으로 저장됩니다. 채널이 가득 차면, 다음 send
호출은 더 많은 빈 공간이 사용 가능해질 때까지 중단됩니다.

랑데부 채널 (Rendezvous channel)
"랑데부" 채널은 버퍼가 없는 채널로, 크기가 0인 버퍼링된 채널과 같습니다. 두 함수(send()
또는 receive()
) 중 하나는 다른 하나가 호출될 때까지 항상 중단됩니다.
만약 send()
함수가 호출되었고, 해당 요소를 처리할 준비가 된 중단된 receive()
호출이 없다면 send()
는 중단됩니다. 유사하게, receive()
함수가 호출되었고 채널이 비어 있거나, 다른 말로, 요소를 보낼 준비가 된 중단된 send()
호출이 없다면 receive()
호출은 중단됩니다.
"랑데부"라는 이름("합의된 시간과 장소에서의 만남")은 send()
와 receive()
가 "제때 만나야" 한다는 사실을 나타냅니다.

Conflated 채널 (Conflated channel)
Conflated 채널로 전송된 새 요소는 이전에 전송된 요소를 덮어씁니다. 따라서 수신자는 항상 최신 요소만 받게 됩니다. send()
호출은 결코 중단되지 않습니다.

채널을 생성할 때, 유형이나 버퍼 크기를 지정합니다(버퍼링된 채널이 필요한 경우):
val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)
기본적으로 "랑데부" 채널이 생성됩니다.
다음 작업에서는 "랑데부" 채널, 두 개의 생산자 코루틴, 그리고 하나의 소비자 코루틴을 생성할 것입니다:
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
val channel = Channel<String>()
launch {
channel.send("A1")
channel.send("A2")
log("A done")
}
launch {
channel.send("B1")
log("B done")
}
launch {
repeat(3) {
val x = channel.receive()
log(x)
}
}
}
fun log(message: Any?) {
println("[${Thread.currentThread().name}] $message")
}
채널에 대한 더 나은 이해를 위해 이 비디오를 시청하십시오.
작업 7
src/tasks/Request7Channels.kt
에서 모든 GitHub 기여자를 동시에 요청하고 동시에 중간 진행 상황을 보여주는 loadContributorsChannels()
함수를 구현하십시오.
이전 함수인 Request5Concurrent.kt
의 loadContributorsConcurrent()
와 Request6Progress.kt
의 loadContributorsProgress()
를 사용하십시오.
작업 7 팁
다양한 저장소에 대한 기여자 목록을 동시에 수신하는 다른 코루틴은 수신된 모든 결과를 동일한 채널로 보낼 수 있습니다:
val channel = Channel<List<User>>()
for (repo in repos) {
launch {
val users = TODO()
// ...
channel.send(users)
}
}
그러면 이 채널의 요소들을 하나씩 수신하여 처리할 수 있습니다:
repeat(repos.size) {
val users = channel.receive()
// ...
}
receive()
호출은 순차적이므로 추가적인 동기화는 필요하지 않습니다.
작업 7 솔루션
loadContributorsProgress()
함수와 마찬가지로, "모든 기여자" 목록의 중간 상태를 저장할 allUsers
변수를 생성한 다음, 각 새 저장소에 대한 기여자가 로드된 후 이를 업데이트할 수 있습니다:
suspend fun loadContributorsChannels(
service: GitHubService,
req: RequestData,
updateResults: suspend (List<User>, completed: Boolean) -> Unit
) = coroutineScope {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()
val channel = Channel<List<User>>()
for (repo in repos) {
launch {
val users = service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
channel.send(users)
}
}
var allUsers = emptyList<User>()
repeat(repos.size) {
val users = channel.receive()
allUsers = (allUsers + users).aggregate()
updateResults(allUsers, it == repos.lastIndex)
}
}
- 다른 저장소의 결과는 준비되는 즉시 채널에 추가됩니다. 처음에는 모든 요청이 전송되고 데이터가 수신되지 않을 때
receive()
호출이 중단됩니다. 이 경우 전체 "기여자 로드" 코루틴이 중단됩니다. - 그런 다음 사용자 목록이 채널로 전송되면 "기여자 로드" 코루틴이 재개되고
receive()
호출이 이 목록을 반환하며 결과가 즉시 업데이트됩니다.
이제 프로그램을 실행하고 CHANNELS 옵션을 선택하여 기여자를 로드하고 결과를 볼 수 있습니다.
코루틴과 채널이 동시성으로 인한 복잡성을 완전히 제거하지는 않지만, 상황을 이해해야 할 때 작업을 더 쉽게 만듭니다.
코루틴 테스트하기
이제 모든 솔루션을 테스트하여 동시 코루틴을 사용하는 솔루션이 suspend
함수를 사용하는 솔루션보다 빠른지 확인하고, 채널을 사용하는 솔루션이 단순한 "진행 상황" 솔루션보다 빠른지 확인해 봅시다.
다음 작업에서는 솔루션의 총 실행 시간을 비교합니다. GitHub 서비스를 모의(mock)하고 이 서비스가 주어진 타임아웃 후에 결과를 반환하도록 할 것입니다:
저장소 요청 - 1000ms 지연 내에 응답 반환
repo-1 - 1000ms 지연
repo-2 - 1200ms 지연
repo-3 - 800ms 지연
suspend
함수를 사용하는 순차적 솔루션은 약 4000ms가 소요되어야 합니다 (4000 = 1000 + (1000 + 1200 + 800)). 동시 솔루션은 약 2200ms가 소요되어야 합니다 (2200 = 1000 + max(1000, 1200, 800)).
진행 상황을 보여주는 솔루션의 경우, 타임스탬프와 함께 중간 결과를 확인할 수도 있습니다.
해당 테스트 데이터는 test/contributors/testData.kt
에 정의되어 있으며, Request4SuspendKtTest
, Request7ChannelsKtTest
등은 모의 서비스 호출을 사용하는 간단한 테스트를 포함합니다.
그러나 여기에 두 가지 문제가 있습니다:
- 이 테스트는 실행하는 데 너무 오래 걸립니다. 각 테스트는 약 2초에서 4초가 걸리며, 매번 결과를 기다려야 합니다. 효율적이지 않습니다.
- 코드를 준비하고 실행하는 데 추가 시간이 걸리기 때문에 솔루션이 실행되는 정확한 시간에 의존할 수 없습니다. 상수를 추가할 수는 있지만, 그러면 기계마다 시간이 다를 것입니다. 모의 서비스 지연은 이 상수보다 높아야 차이를 볼 수 있습니다. 만약 상수가 0.5초라면, 지연을 0.1초로 만드는 것은 충분하지 않을 것입니다.
더 나은 방법은 특수 프레임워크를 사용하여 동일한 코드를 여러 번 실행하면서 타이밍을 테스트하는 것이지만(이는 총 시간을 더 증가시킵니다), 이는 배우고 설정하기에 복잡합니다.
이러한 문제를 해결하고 제공된 테스트 지연을 사용하여 솔루션이 예상대로 작동하는지, 즉 하나가 다른 것보다 빠른지 확인하려면 특수 테스트 디스패처를 사용하여 _가상 시간_을 사용하십시오. 이 디스패처는 시작 이후 경과된 가상 시간을 추적하고 모든 것을 실시간으로 즉시 실행합니다. 이 디스패처에서 코루틴을 실행하면 delay
가 즉시 반환되고 가상 시간을 진행시킬 것입니다.
이 메커니즘을 사용하는 테스트는 빠르게 실행되지만, 가상 시간의 다른 순간에 무엇이 발생하는지 여전히 확인할 수 있습니다. 총 실행 시간은 급격히 감소합니다:
가상 시간을 사용하려면 runBlocking
호출을 runTest
로 바꾸십시오. runTest
는 TestScope
에 대한 확장 람다를 인수로 받습니다. 이 특수 스코프 내에서 suspend
함수에서 delay
를 호출하면 delay
는 실제 시간으로 지연시키는 대신 가상 시간을 증가시킵니다:
@Test
fun testDelayInSuspend() = runTest {
val realStartTime = System.currentTimeMillis()
val virtualStartTime = currentTime
foo()
println("${System.currentTimeMillis() - realStartTime} ms") // ~ 6 ms
println("${currentTime - virtualStartTime} ms") // 1000 ms
}
suspend fun foo() {
delay(1000) // 지연 없이 자동 진행
println("foo") // foo()가 호출되면 즉시 실행
}
TestScope
의 currentTime
속성을 사용하여 현재 가상 시간을 확인할 수 있습니다.
이 예제에서 실제 실행 시간은 몇 밀리초인 반면, 가상 시간은 지연 인수와 동일한 1000 밀리초입니다.
자식 코루틴에서 "가상" delay
의 완전한 효과를 얻으려면 모든 자식 코루틴을 TestDispatcher
로 시작해야 합니다. 그렇지 않으면 작동하지 않습니다. 이 디스패처는 다른 디스패처를 제공하지 않는 한 다른 TestScope
에서 자동으로 상속됩니다:
@Test
fun testDelayInLaunch() = runTest {
val realStartTime = System.currentTimeMillis()
val virtualStartTime = currentTime
bar()
println("${System.currentTimeMillis() - realStartTime} ms") // ~ 11 ms
println("${currentTime - virtualStartTime} ms") // 1000 ms
}
suspend fun bar() = coroutineScope {
launch {
delay(1000) // 지연 없이 자동 진행
println("bar") // bar()가 호출되면 즉시 실행
}
}
위 예제에서 launch
가 Dispatchers.Default
컨텍스트로 호출되면 테스트가 실패할 것입니다. 작업이 아직 완료되지 않았다는 예외가 발생할 것입니다.
loadContributorsConcurrent()
함수를 이 방식으로 테스트할 수 있는 경우는, 자식 코루틴을 상속된 컨텍스트로 시작하고 Dispatchers.Default
디스패처를 사용하여 컨텍스트를 수정하지 않는 경우뿐입니다.
디스패처와 같은 컨텍스트 요소를 함수를 _정의_할 때가 아니라 _호출_할 때 지정할 수 있어, 더 유연하고 테스트하기 쉽습니다.
가상 시간을 지원하는 테스트 API는 실험적이며 향후 변경될 수 있습니다.
기본적으로 컴파일러는 실험적인 테스트 API를 사용하면 경고를 표시합니다. 이러한 경고를 억제하려면 @OptIn(ExperimentalCoroutinesApi::class)
로 테스트 함수 또는 테스트를 포함하는 전체 클래스에 어노테이션을 달아주십시오. 컴파일러에 실험적 API를 사용하고 있음을 지시하는 컴파일러 인수를 추가하십시오:
compileTestKotlin {
kotlinOptions {
freeCompilerArgs += "-Xuse-experimental=kotlin.Experimental"
}
}
이 튜토리얼에 해당하는 프로젝트에서는 Gradle 스크립트에 컴파일러 인수가 이미 추가되어 있습니다.
작업 8
tests/tasks/
의 다음 테스트를 실제 시간 대신 가상 시간을 사용하도록 리팩터링하십시오:
- Request4SuspendKtTest.kt
- Request5ConcurrentKtTest.kt
- Request6ProgressKtTest.kt
- Request7ChannelsKtTest.kt
리팩터링 적용 전후의 총 실행 시간을 비교하십시오.
작업 8 팁
runBlocking
호출을runTest
로 바꾸고,System.currentTimeMillis()
를currentTime
으로 바꾸십시오:kotlin@Test fun test() = runTest { val startTime = currentTime // action val totalTime = currentTime - startTime // testing result }
정확한 가상 시간을 확인하는 어설션을 주석 해제하십시오.
@UseExperimental(ExperimentalCoroutinesApi::class)
를 추가하는 것을 잊지 마십시오.
작업 8 솔루션
다음은 동시성 및 채널 경우에 대한 솔루션입니다:
fun testConcurrent() = runTest {
val startTime = currentTime
val result = loadContributorsConcurrent(MockGithubService, testRequestData)
Assert.assertEquals("Wrong result for 'loadContributorsConcurrent'", expectedConcurrentResults.users, result)
val totalTime = currentTime - startTime
Assert.assertEquals(
"The calls run concurrently, so the total virtual time should be 2200 ms: " +
"1000 for repos request plus max(1000, 1200, 800) = 1200 for concurrent contributors requests)",
expectedConcurrentResults.timeFromStart, totalTime
)
}
먼저, 예상 가상 시간에 정확히 결과가 사용 가능한지 확인한 다음, 결과 자체를 확인합니다:
fun testChannels() = runTest {
val startTime = currentTime
var index = 0
loadContributorsChannels(MockGithubService, testRequestData) { users, _ ->
val expected = concurrentProgressResults[index++]
val time = currentTime - startTime
Assert.assertEquals(
"Expected intermediate results after ${expected.timeFromStart} ms:",
expected.timeFromStart, time
)
Assert.assertEquals("Wrong intermediate results after $time:", expected.users, users)
}
}
채널을 사용하는 마지막 버전의 첫 번째 중간 결과는 진행 상황 버전보다 더 빨리 사용할 수 있으며, 가상 시간을 사용하는 테스트에서 차이를 볼 수 있습니다.
나머지 "suspend" 및 "progress" 작업에 대한 테스트는 매우 유사합니다. 프로젝트의
solutions
브랜치에서 찾을 수 있습니다.
다음 단계
- KotlinConf의 Kotlin을 사용한 비동기 프로그래밍 워크숍을 확인하십시오.
- 가상 시간 및 실험적 테스트 패키지 사용에 대해 자세히 알아보십시오.