Kotlin

코루틴 API 살펴보기

devsh 2024. 1. 1. 19:56
728x90
반응형

코루틴과 동시성

코루틴은 매우 가볍다

  • 코루틴은 JVM의 플랫폼 스레드보다 리소스 집약도가 낮다. 즉, 훨씬 적은 메모리를 사용하여 더 많은 일을 할 수 있게된다.
  • 예제는 1,000,000개의 코루틴을 만들어서 각각 5초를 기다린 다음 마침표(.)을 출력하는 예제이다.
import kotlinx.coroutines.*

fun main() = runBlocking {
    repeat(1_000_000) { // launch a lot of coroutines
        launch {
            delay(5000L)
            print(".")
        }
    }
}

스레드 예제) 테스트 PC 기준 4000개 정도 생성된 후 OOME가 발생한다.

fun main() {
    repeat(1_000_000) { // launch a lot of threads
        thread {
            Thread.sleep(5000L)
            print(".")
        }
    }
    Thread.sleep(10000L)
}

[0.344s][warning][os,thread] Failed to start thread "Unknown thread" - pthread_create failed (EAGAIN) for attributes: stacksize: 2048k, guardsize: 16k, detached.
[0.344s][warning][os,thread] Failed to start the native thread for java.lang.Thread "Thread-4074"
Exception in thread "main" java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
	at java.base/java.lang.Thread.start0(Native Method)
	at java.base/java.lang.Thread.start(Thread.java:1526)
	at kotlin.concurrent.ThreadsKt.thread(Thread.kt:42)
	at kotlin.concurrent.ThreadsKt.thread$default(Thread.kt:20)
	at Example1Kt.main(example1.kt:17)
	at Example1Kt.main(example1.kt)

코루틴 빌더

코루틴 빌더는 코루틴을 만드는 함수를 말한다.

1. runBlocking

  • runBlocking은 코루틴을 생성하는 코루틴 빌더이다.
  • runBlocking으로 감싼 코드는 코루틴 내부의 코드가 수행이 끝날때 까지 스레드가 블로킹된다
import kotlinx.coroutines.*

fun main() {
    
    runBlocking {
        println("Hello")
    }
     println("World")         
}

// Hello
// World
  • 일반적으로 코루틴은 스레드를 차단하지 않고 사용해야하므로 runBlocking을 사용하는 것은 좋지 않지만 꼭 사용해야하는 경우가 있다.
    • 코루틴을 지원하지 않는 경우 예) 테스트 코드, 스프링 배치 등
  • 실행옵션에 -Dkotlinx.coroutines.debug 을 붙여주면 코루틴에서 수행되는 스레드는 이름 뒤에 @coroutine#1 이 붙어있는 것을 볼 수 있다

2. launch

  • launch는 스레드 차단 없이 새 코루틴을 시작하고 결과로 job을 반환하는 코루틴 빌더이다
  • launch는 결과를 만들어내지 않는 비동기 작업에 적합하기 때문에 Unit을 반환하는 람다를 인자로 받는다
fun main() =  runBlocking<Unit> {
    launch {
        delay(500L)
        println("World!")
    }
    println("Hello")
}

// Hello
// World
  • delay() 함수는 코루틴 라이브러리에 정의된 일시 중단 함수이며 Thread.sleep() 과 유사하지만 현재 스레드를 차단하지 않고 일시 중단 시킨다. 이때 일시 중단 된 스레드는 코루틴내에서 다른 일시 중단 함수를 수행한다
  • launch를 사용해서 여러개의 작업을 동시에 수행할 수 있다
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

fun main() {

    runBlocking {
        launch {
            val timeMillis = measureTimeMillis {
                delay(150)
            }
            println("async task-1 $timeMillis ms")
        }

        launch {
            val timeMillis = measureTimeMillis {
                delay(100)
            }
            println("async task-2 $timeMillis ms")
        }

    }

}
  • launch가 반환하는 Job 을 사용해 현재 코루틴의 상태를 확인하거나 실행 또는 취소도 가능하다
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking<Unit> {
    val job1: Job = launch {
        val timeMillis = measureTimeMillis {
            delay(150)
        }
        println("async task-1 $timeMillis ms")
    }
    job1.cancel() // 취소 

		
    val job2: Job = launch(start = CoroutineStart.LAZY) {
        val timeMillis = measureTimeMillis {
            delay(100)
        }
        println("async task-2 $timeMillis ms")
    }

    println("start task-2")

    job2.start()

}
  • job1.cancel() 을 호출해 코루틴을 취소할 수 있다
  • launch(start = CoroutineStart.LAZY) 를 사용해서 start 함수를 호출하는 시점에 코루틴을 동작시킬 수 있다
    • start 함수를 주석처리하면 launch가 동작하지 않는다

3. async

  • async 빌더는 비동기 작업을 통해 결과를 만들어내는 경우에 적합하다
import kotlinx.coroutines.*

fun sum(a: Int, b: Int) = a + b

fun main() = runBlocking<Unit> {

    val result1: Deferred<Int> = async {
        delay(100)
        sum(1, 3)
    }

    println("result1 : ${result1.await()}")

    val result2: Deferred<Int> = async {
        delay(100)
        delay(100)
        sum(2, 5)
    }

    println("result2 : ${result2.await()}")
}
  • async는 비동기 작업의 결과로 Deferred 라는 특별한 인스턴스를 반환하는데 await 이라는 함수를 통해 async로 수행한 비동기 작업의 결과를 받아올 수 있다
  • 자바 스크립트나 파이썬과 같이 다른 언어의 async-await은 키워드 인 경우가 보통이지만 코틀린의 코루틴은 async-await이 함수인 점이 차이점이다

자바 스크립트의 async-await 예시)

async function showAvatar() {

  // JSON 읽기
  let response = await fetch('/article/promise-chaining/user.json');
  let user = await response.json();

  // github 사용자 정보 읽기
  let githubResponse = await fetch(`https://api.github.com/users/${user.name}`);
  let githubUser = await githubResponse.json();

  // 아바타 보여주기
  let img = document.createElement('img');
  img.src = githubUser.avatar_url;
  img.className = "promise-avatar-example";
  document.body.append(img);

  // 3초 대기
  await new Promise((resolve, reject) => setTimeout(resolve, 3000));

  img.remove();

  return githubUser;
}

showAvatar();

구조적 동시성

  • 동시 실행 가능한 작업을 구조화된 방식으로 관리하여 코드의 가독성을 높이고 오류 가능성을 줄인다.
  • 계층 구조로 관리되어 부모 코루틴은 자식 코루틴의 작업이 모두 끝나기 전까지 종료되지 않는다.
  • 자식 코루틴에서 발생한 에러는 부모 코루틴으로 전파된다. 이를 통해 에러 처리를 중앙화할 수있고, 동시성을 처리하는 개별 코루틴의 에러핸들링이 가능하다.

suspend 함수

  • suspend 함수는 코루틴의 핵심 요소로써 일시 중단이 가능한 함수를 말한다
  • suspend는 키워드이다
  • suspend 함수는 일반 함수를 마음껏 호출할 수 있지만 일반 함수에선 suspend 함수를 호출할 수 없다
package structuredconcurrency

fun main() {
    printHello() // 컴파일 에러
}

suspend fun printHello() = println("hello")
  • Caller 함수에서 suspend 키워드를 붙여주면 된다
package structuredconcurrency

suspend fun main() {
    printHello()
}

suspend fun printHello() = println("hello")
  • 일시 중단 함수는 IntelliJ 에서 suspension point가 표시된다.

coroutineScope

  • runBlocking은 현재 스레드를 블로킹시키고 결과를 기다리지만 coroutineScope 는 스레드가 블로킹되지 않고 결과를 기다린다.
package structuredconcurrency

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main() {
    doSomething()
}

private suspend fun doSomething() = coroutineScope {
    launch {
        delay(200)
        println("world!")
    }

    launch {
        println("hello")
    }

}

예외 처리

  • coroutineScope 내부의 자식 코루틴에서 에러가 발생하면 모든 코루틴이 종료된다.
private suspend fun doSomething() = coroutineScope {

    launch {
        delay(200)
        println("world!") // 실행안됨

    }

    launch {
        throw RuntimeException("error")
    }
}
Exception in thread "DefaultDispatcher-worker-2" java.lang.RuntimeException
	at structuredconcurrency.CoroutineScopeErrorKt$doSomething$2$2$1.invokeSuspend(coroutineScopeError.kt:21)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
	Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@4bbea199, Dispatchers.Default]

  • 코루틴 빌더 내부에서 try-catch로 핸들링
private suspend fun doSomething() = coroutineScope {

    launch {
        delay(200)
        println("world!")

    }

    launch {
        try {
            throw RuntimeException()
        } catch (e: Exception) {
            println("hello")
        }
    }
}

hello
world!
  • supervisorScope 를 사용해서 예외를 부모 코루틴으로 전파하지않는 방법
private suspend fun doSomething() = coroutineScope {

    launch {
        delay(200)
        println("world!")

    }

    supervisorScope {
        launch {
            throw RuntimeException()
        }
    }
}

Exception in thread "DefaultDispatcher-worker-2" java.lang.RuntimeException
	at structuredconcurrency.CoroutineScopeErrorKt$doSomething$2$2$1.invokeSuspend(coroutineScopeError.kt:21)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
	Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@4bbea199, Dispatchers.Default]
world!
  • 코루틴의 예외 상황을 체계적으로 관리하고 싶을때 CEH(CoroutineExceptionHandler 를 사용한다
  • supervisorScope 와 CoroutineExceptionHandler 함께 사용
private suspend fun doSomething() = coroutineScope {

    launch {
        delay(200)
        println("world!")
    }

    supervisorScope {
        launch(handler) {
            throw RuntimeException()
        }
    }

}

val handler = CoroutineExceptionHandler { _, throwable ->
    println("Caught $throwable")
}

Caught java.lang.RuntimeException
world!

코루틴 채널

  • **Channel** 은 코루틴간의 통신을 위한 파이프라인이다
  • 기본 컨셉은 BlockingQueue 와 유사한 자료구조로 볼 수 있다.
  • 데이터 발신자가 채널에 send 로 데이터를 전달하고 수신 측에서 receive 를 사용해 채널로 부터 데이터를 받아온다.

BlockingQueue 를 사용해 주식 시세를 처리하는 처리하는 예제

package channel

import kotlinx.coroutines.*
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import kotlin.random.Random

fun main() = runBlocking {
    val queue: BlockingQueue<String> = LinkedBlockingQueue()

    // 생산자와 소비자 코루틴을 실행
    producer(queue, 20)
    consumer(queue)
}

suspend fun producer(queue: BlockingQueue<String>, count: Int) = coroutineScope {
    launch {
        repeat(count) {
            val stockPrice = "APPLE : ${Random.nextInt(100, 200)}"
            queue.put(stockPrice)
            delay(100L) // 0.1초 간격으로 데이터 생성
        }
    }
}

fun consumer(queue: BlockingQueue<String>) {
    for (stockPrice in queue) {
        println("Processed $stockPrice")
    }
}
  • put, take 가 모두 블로킹으로 이뤄진다

Channel 를 사용해 주식 시세를 처리하는 처리하는 예제

package channel

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import kotlin.random.Random

fun main() = runBlocking {
    val channel: Channel<String> = Channel()

    // 생산자와 소비자 코루틴을 실행
    producer(channel, 20)
    consumer(channel)
}

suspend fun CoroutineScope.producer(channel: Channel<String>, count: Int) {
    launch {
        repeat(count) {
            val stockPrice = "APPLE : ${Random.nextInt(100, 200)}"
            channel.send(stockPrice)
            delay(100L) // 0.1초 간격으로 데이터 생성
        }
        channel.close()  // 모든 데이터를 보냈으면 채널을 닫습니다.
    }
}

suspend fun consumer(channel: Channel<String>) {
    for (stockPrice in channel) {
        println("Processed $stockPrice")
    }
}
  • 데이터 송수신 연산(send, receive)이 모두 일시중단되므로 논-블로킹으로 이뤄진다.

플로우 : 비동기 데이터 스트림

  • **Flow** 는 코루틴에서 리액티브 프로그래밍 스타일로 작성할 수 있도록 만들어진 비동기 스트림 API이다
  • 코루틴의 suspend 함수는 단일 값을 반환하지만 Flow를 사용하면 무한대 값을 반환할 수 있다

플로우 빌더

  • flow 빌더를 사용해 코드를 작성하고 emit 을 사용해 데이터를 전달한다
package flow

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val flow = simple()
    flow.collect { value -> println(value) }
}

fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}
  • 리액티브 스트림과 같이 Terminal Operator(최종 연산자) 인 collect 를 호출하지 않으면 아무런 일도 일어나지 않는다

flowOf 를 사용하면 여러 값을 인자로 전달해서 플로우를 만들 수 있다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    flowOf(1, 2, 3, 4, 5).collect { value ->
        println(value)
    }
}

asFlow 를 사용하면 존재하는 컬렉션이나 시퀀스를 플로우로 변환할 수 있다

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    listOf(1, 2, 3, 4, 5).asFlow().collect { value ->
        println(value)
    }
    (6..10).asFlow().collect {
        println(it)
    }
}

다양한 연산자 제공

package flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun filterExample() {
    (1..10)
        .asFlow()
        .filter { (it % 2) == 0 }
        .collect {
            println(it)
        }
}

suspend fun takeExample() {
    (1..10)
        .asFlow()
        .take(2)
        .collect {
            println(it)
        }
}

suspend fun mapExample() {
    (1..10)
        .asFlow()
        .map { it * it }
        .collect {
            println(it)
        }
}

suspend fun zipExample() {
    val nums = (1..3).asFlow()
    val strs = listOf("one", "two", "three").asFlow()

    nums.zip(strs) { a, b -> "$a -> $b" }
        .collect { println(it) }
}

suspend fun flattenExample() {
    val left = flowOf(1, 2, 3)
    val right = flowOf(4, 5, 6)
    val flow = flowOf(left, right)

    flow.flattenConcat().collect { println(it) }
}
728x90
반응형