Kotlin
코루틴 API 살펴보기
devsh
2024. 1. 1. 19:56
728x90
반응형
코루틴과 동시성
코루틴은 매우 가볍다
- 코루틴은 JVM의 플랫폼 스레드보다 리소스 집약도가 낮다. 즉, 훨씬 적은 메모리를 사용하여 더 많은 일을 할 수 있게된다.
- 예제는 1,000,000개의 코루틴을 만들어서 각각 5초를 기다린 다음 마침표(.)을 출력하는 예제이다.
import kotlinx.coroutines.*
fun main() = runBlocking {
repeat(1_000_000) { // launch a lot of coroutines
launch {
delay(5000L)
print(".")
}
}
}
스레드 예제) 테스트 PC 기준 4000개 정도 생성된 후 OOME가 발생한다.
fun main() {
repeat(1_000_000) { // launch a lot of threads
thread {
Thread.sleep(5000L)
print(".")
}
}
Thread.sleep(10000L)
}
[0.344s][warning][os,thread] Failed to start thread "Unknown thread" - pthread_create failed (EAGAIN) for attributes: stacksize: 2048k, guardsize: 16k, detached.
[0.344s][warning][os,thread] Failed to start the native thread for java.lang.Thread "Thread-4074"
Exception in thread "main" java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
at java.base/java.lang.Thread.start0(Native Method)
at java.base/java.lang.Thread.start(Thread.java:1526)
at kotlin.concurrent.ThreadsKt.thread(Thread.kt:42)
at kotlin.concurrent.ThreadsKt.thread$default(Thread.kt:20)
at Example1Kt.main(example1.kt:17)
at Example1Kt.main(example1.kt)
코루틴 빌더
코루틴 빌더는 코루틴을 만드는 함수를 말한다.
1. runBlocking
- runBlocking은 코루틴을 생성하는 코루틴 빌더이다.
- runBlocking으로 감싼 코드는 코루틴 내부의 코드가 수행이 끝날때 까지 스레드가 블로킹된다
import kotlinx.coroutines.*
fun main() {
runBlocking {
println("Hello")
}
println("World")
}
// Hello
// World
- 일반적으로 코루틴은 스레드를 차단하지 않고 사용해야하므로 runBlocking을 사용하는 것은 좋지 않지만 꼭 사용해야하는 경우가 있다.
- 코루틴을 지원하지 않는 경우 예) 테스트 코드, 스프링 배치 등
- 실행옵션에 -Dkotlinx.coroutines.debug 을 붙여주면 코루틴에서 수행되는 스레드는 이름 뒤에 @coroutine#1 이 붙어있는 것을 볼 수 있다
2. launch
- launch는 스레드 차단 없이 새 코루틴을 시작하고 결과로 job을 반환하는 코루틴 빌더이다
- launch는 결과를 만들어내지 않는 비동기 작업에 적합하기 때문에 Unit을 반환하는 람다를 인자로 받는다
fun main() = runBlocking<Unit> {
launch {
delay(500L)
println("World!")
}
println("Hello")
}
// Hello
// World
- delay() 함수는 코루틴 라이브러리에 정의된 일시 중단 함수이며 Thread.sleep() 과 유사하지만 현재 스레드를 차단하지 않고 일시 중단 시킨다. 이때 일시 중단 된 스레드는 코루틴내에서 다른 일시 중단 함수를 수행한다
- launch를 사용해서 여러개의 작업을 동시에 수행할 수 있다
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis
fun main() {
runBlocking {
launch {
val timeMillis = measureTimeMillis {
delay(150)
}
println("async task-1 $timeMillis ms")
}
launch {
val timeMillis = measureTimeMillis {
delay(100)
}
println("async task-2 $timeMillis ms")
}
}
}
- launch가 반환하는 Job 을 사용해 현재 코루틴의 상태를 확인하거나 실행 또는 취소도 가능하다
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis
fun main() = runBlocking<Unit> {
val job1: Job = launch {
val timeMillis = measureTimeMillis {
delay(150)
}
println("async task-1 $timeMillis ms")
}
job1.cancel() // 취소
val job2: Job = launch(start = CoroutineStart.LAZY) {
val timeMillis = measureTimeMillis {
delay(100)
}
println("async task-2 $timeMillis ms")
}
println("start task-2")
job2.start()
}
- job1.cancel() 을 호출해 코루틴을 취소할 수 있다
- launch(start = CoroutineStart.LAZY) 를 사용해서 start 함수를 호출하는 시점에 코루틴을 동작시킬 수 있다
- start 함수를 주석처리하면 launch가 동작하지 않는다
3. async
- async 빌더는 비동기 작업을 통해 결과를 만들어내는 경우에 적합하다
import kotlinx.coroutines.*
fun sum(a: Int, b: Int) = a + b
fun main() = runBlocking<Unit> {
val result1: Deferred<Int> = async {
delay(100)
sum(1, 3)
}
println("result1 : ${result1.await()}")
val result2: Deferred<Int> = async {
delay(100)
delay(100)
sum(2, 5)
}
println("result2 : ${result2.await()}")
}
- async는 비동기 작업의 결과로 Deferred 라는 특별한 인스턴스를 반환하는데 await 이라는 함수를 통해 async로 수행한 비동기 작업의 결과를 받아올 수 있다
- 자바 스크립트나 파이썬과 같이 다른 언어의 async-await은 키워드 인 경우가 보통이지만 코틀린의 코루틴은 async-await이 함수인 점이 차이점이다
자바 스크립트의 async-await 예시)
async function showAvatar() {
// JSON 읽기
let response = await fetch('/article/promise-chaining/user.json');
let user = await response.json();
// github 사용자 정보 읽기
let githubResponse = await fetch(`https://api.github.com/users/${user.name}`);
let githubUser = await githubResponse.json();
// 아바타 보여주기
let img = document.createElement('img');
img.src = githubUser.avatar_url;
img.className = "promise-avatar-example";
document.body.append(img);
// 3초 대기
await new Promise((resolve, reject) => setTimeout(resolve, 3000));
img.remove();
return githubUser;
}
showAvatar();
구조적 동시성
- 동시 실행 가능한 작업을 구조화된 방식으로 관리하여 코드의 가독성을 높이고 오류 가능성을 줄인다.
- 계층 구조로 관리되어 부모 코루틴은 자식 코루틴의 작업이 모두 끝나기 전까지 종료되지 않는다.
- 자식 코루틴에서 발생한 에러는 부모 코루틴으로 전파된다. 이를 통해 에러 처리를 중앙화할 수있고, 동시성을 처리하는 개별 코루틴의 에러핸들링이 가능하다.
suspend 함수
- suspend 함수는 코루틴의 핵심 요소로써 일시 중단이 가능한 함수를 말한다
- suspend는 키워드이다
- suspend 함수는 일반 함수를 마음껏 호출할 수 있지만 일반 함수에선 suspend 함수를 호출할 수 없다
package structuredconcurrency
fun main() {
printHello() // 컴파일 에러
}
suspend fun printHello() = println("hello")
- Caller 함수에서 suspend 키워드를 붙여주면 된다
package structuredconcurrency
suspend fun main() {
printHello()
}
suspend fun printHello() = println("hello")
- 일시 중단 함수는 IntelliJ 에서 suspension point가 표시된다.
coroutineScope
- runBlocking은 현재 스레드를 블로킹시키고 결과를 기다리지만 coroutineScope 는 스레드가 블로킹되지 않고 결과를 기다린다.
package structuredconcurrency
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
suspend fun main() {
doSomething()
}
private suspend fun doSomething() = coroutineScope {
launch {
delay(200)
println("world!")
}
launch {
println("hello")
}
}
예외 처리
- coroutineScope 내부의 자식 코루틴에서 에러가 발생하면 모든 코루틴이 종료된다.
private suspend fun doSomething() = coroutineScope {
launch {
delay(200)
println("world!") // 실행안됨
}
launch {
throw RuntimeException("error")
}
}
Exception in thread "DefaultDispatcher-worker-2" java.lang.RuntimeException
at structuredconcurrency.CoroutineScopeErrorKt$doSomething$2$2$1.invokeSuspend(coroutineScopeError.kt:21)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@4bbea199, Dispatchers.Default]
- 코루틴 빌더 내부에서 try-catch로 핸들링
private suspend fun doSomething() = coroutineScope {
launch {
delay(200)
println("world!")
}
launch {
try {
throw RuntimeException()
} catch (e: Exception) {
println("hello")
}
}
}
hello
world!
- supervisorScope 를 사용해서 예외를 부모 코루틴으로 전파하지않는 방법
private suspend fun doSomething() = coroutineScope {
launch {
delay(200)
println("world!")
}
supervisorScope {
launch {
throw RuntimeException()
}
}
}
Exception in thread "DefaultDispatcher-worker-2" java.lang.RuntimeException
at structuredconcurrency.CoroutineScopeErrorKt$doSomething$2$2$1.invokeSuspend(coroutineScopeError.kt:21)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@4bbea199, Dispatchers.Default]
world!
- 코루틴의 예외 상황을 체계적으로 관리하고 싶을때 CEH(CoroutineExceptionHandler 를 사용한다
- supervisorScope 와 CoroutineExceptionHandler 함께 사용
private suspend fun doSomething() = coroutineScope {
launch {
delay(200)
println("world!")
}
supervisorScope {
launch(handler) {
throw RuntimeException()
}
}
}
val handler = CoroutineExceptionHandler { _, throwable ->
println("Caught $throwable")
}
Caught java.lang.RuntimeException
world!
코루틴 채널
- **Channel** 은 코루틴간의 통신을 위한 파이프라인이다
- 기본 컨셉은 BlockingQueue 와 유사한 자료구조로 볼 수 있다.
- 데이터 발신자가 채널에 send 로 데이터를 전달하고 수신 측에서 receive 를 사용해 채널로 부터 데이터를 받아온다.
BlockingQueue 를 사용해 주식 시세를 처리하는 처리하는 예제
package channel
import kotlinx.coroutines.*
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import kotlin.random.Random
fun main() = runBlocking {
val queue: BlockingQueue<String> = LinkedBlockingQueue()
// 생산자와 소비자 코루틴을 실행
producer(queue, 20)
consumer(queue)
}
suspend fun producer(queue: BlockingQueue<String>, count: Int) = coroutineScope {
launch {
repeat(count) {
val stockPrice = "APPLE : ${Random.nextInt(100, 200)}"
queue.put(stockPrice)
delay(100L) // 0.1초 간격으로 데이터 생성
}
}
}
fun consumer(queue: BlockingQueue<String>) {
for (stockPrice in queue) {
println("Processed $stockPrice")
}
}
- put, take 가 모두 블로킹으로 이뤄진다
Channel 를 사용해 주식 시세를 처리하는 처리하는 예제
package channel
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import kotlin.random.Random
fun main() = runBlocking {
val channel: Channel<String> = Channel()
// 생산자와 소비자 코루틴을 실행
producer(channel, 20)
consumer(channel)
}
suspend fun CoroutineScope.producer(channel: Channel<String>, count: Int) {
launch {
repeat(count) {
val stockPrice = "APPLE : ${Random.nextInt(100, 200)}"
channel.send(stockPrice)
delay(100L) // 0.1초 간격으로 데이터 생성
}
channel.close() // 모든 데이터를 보냈으면 채널을 닫습니다.
}
}
suspend fun consumer(channel: Channel<String>) {
for (stockPrice in channel) {
println("Processed $stockPrice")
}
}
- 데이터 송수신 연산(send, receive)이 모두 일시중단되므로 논-블로킹으로 이뤄진다.
플로우 : 비동기 데이터 스트림
- **Flow** 는 코루틴에서 리액티브 프로그래밍 스타일로 작성할 수 있도록 만들어진 비동기 스트림 API이다
- 코루틴의 suspend 함수는 단일 값을 반환하지만 Flow를 사용하면 무한대 값을 반환할 수 있다
플로우 빌더
- flow 빌더를 사용해 코드를 작성하고 emit 을 사용해 데이터를 전달한다
package flow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val flow = simple()
flow.collect { value -> println(value) }
}
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
- 리액티브 스트림과 같이 Terminal Operator(최종 연산자) 인 collect 를 호출하지 않으면 아무런 일도 일어나지 않는다
flowOf 를 사용하면 여러 값을 인자로 전달해서 플로우를 만들 수 있다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5).collect { value ->
println(value)
}
}
asFlow 를 사용하면 존재하는 컬렉션이나 시퀀스를 플로우로 변환할 수 있다
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
listOf(1, 2, 3, 4, 5).asFlow().collect { value ->
println(value)
}
(6..10).asFlow().collect {
println(it)
}
}
다양한 연산자 제공
- https://kotlinlang.org/docs/flow.html
- 리액티브 스트림 수준으로 다양한 연산자를 제공하진 못하고 있지만 일반적인 상황에선 충분하다.
package flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun filterExample() {
(1..10)
.asFlow()
.filter { (it % 2) == 0 }
.collect {
println(it)
}
}
suspend fun takeExample() {
(1..10)
.asFlow()
.take(2)
.collect {
println(it)
}
}
suspend fun mapExample() {
(1..10)
.asFlow()
.map { it * it }
.collect {
println(it)
}
}
suspend fun zipExample() {
val nums = (1..3).asFlow()
val strs = listOf("one", "two", "three").asFlow()
nums.zip(strs) { a, b -> "$a -> $b" }
.collect { println(it) }
}
suspend fun flattenExample() {
val left = flowOf(1, 2, 3)
val right = flowOf(4, 5, 6)
val flow = flowOf(left, right)
flow.flattenConcat().collect { println(it) }
}
728x90
반응형