728x90
반응형

코루틴과 동시성

코루틴은 매우 가볍다

  • 코루틴은 JVM의 플랫폼 스레드보다 리소스 집약도가 낮다. 즉, 훨씬 적은 메모리를 사용하여 더 많은 일을 할 수 있게된다.
  • 예제는 1,000,000개의 코루틴을 만들어서 각각 5초를 기다린 다음 마침표(.)을 출력하는 예제이다.
import kotlinx.coroutines.*

fun main() = runBlocking {
    repeat(1_000_000) { // launch a lot of coroutines
        launch {
            delay(5000L)
            print(".")
        }
    }
}

스레드 예제) 테스트 PC 기준 4000개 정도 생성된 후 OOME가 발생한다.

fun main() {
    repeat(1_000_000) { // launch a lot of threads
        thread {
            Thread.sleep(5000L)
            print(".")
        }
    }
    Thread.sleep(10000L)
}

[0.344s][warning][os,thread] Failed to start thread "Unknown thread" - pthread_create failed (EAGAIN) for attributes: stacksize: 2048k, guardsize: 16k, detached.
[0.344s][warning][os,thread] Failed to start the native thread for java.lang.Thread "Thread-4074"
Exception in thread "main" java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
	at java.base/java.lang.Thread.start0(Native Method)
	at java.base/java.lang.Thread.start(Thread.java:1526)
	at kotlin.concurrent.ThreadsKt.thread(Thread.kt:42)
	at kotlin.concurrent.ThreadsKt.thread$default(Thread.kt:20)
	at Example1Kt.main(example1.kt:17)
	at Example1Kt.main(example1.kt)

코루틴 빌더

코루틴 빌더는 코루틴을 만드는 함수를 말한다.

1. runBlocking

  • runBlocking은 코루틴을 생성하는 코루틴 빌더이다.
  • runBlocking으로 감싼 코드는 코루틴 내부의 코드가 수행이 끝날때 까지 스레드가 블로킹된다
import kotlinx.coroutines.*

fun main() {
    
    runBlocking {
        println("Hello")
    }
     println("World")         
}

// Hello
// World
  • 일반적으로 코루틴은 스레드를 차단하지 않고 사용해야하므로 runBlocking을 사용하는 것은 좋지 않지만 꼭 사용해야하는 경우가 있다.
    • 코루틴을 지원하지 않는 경우 예) 테스트 코드, 스프링 배치 등
  • 실행옵션에 -Dkotlinx.coroutines.debug 을 붙여주면 코루틴에서 수행되는 스레드는 이름 뒤에 @coroutine#1 이 붙어있는 것을 볼 수 있다

2. launch

  • launch는 스레드 차단 없이 새 코루틴을 시작하고 결과로 job을 반환하는 코루틴 빌더이다
  • launch는 결과를 만들어내지 않는 비동기 작업에 적합하기 때문에 Unit을 반환하는 람다를 인자로 받는다
fun main() =  runBlocking<Unit> {
    launch {
        delay(500L)
        println("World!")
    }
    println("Hello")
}

// Hello
// World
  • delay() 함수는 코루틴 라이브러리에 정의된 일시 중단 함수이며 Thread.sleep() 과 유사하지만 현재 스레드를 차단하지 않고 일시 중단 시킨다. 이때 일시 중단 된 스레드는 코루틴내에서 다른 일시 중단 함수를 수행한다
  • launch를 사용해서 여러개의 작업을 동시에 수행할 수 있다
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

fun main() {

    runBlocking {
        launch {
            val timeMillis = measureTimeMillis {
                delay(150)
            }
            println("async task-1 $timeMillis ms")
        }

        launch {
            val timeMillis = measureTimeMillis {
                delay(100)
            }
            println("async task-2 $timeMillis ms")
        }

    }

}
  • launch가 반환하는 Job 을 사용해 현재 코루틴의 상태를 확인하거나 실행 또는 취소도 가능하다
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking<Unit> {
    val job1: Job = launch {
        val timeMillis = measureTimeMillis {
            delay(150)
        }
        println("async task-1 $timeMillis ms")
    }
    job1.cancel() // 취소 

		
    val job2: Job = launch(start = CoroutineStart.LAZY) {
        val timeMillis = measureTimeMillis {
            delay(100)
        }
        println("async task-2 $timeMillis ms")
    }

    println("start task-2")

    job2.start()

}
  • job1.cancel() 을 호출해 코루틴을 취소할 수 있다
  • launch(start = CoroutineStart.LAZY) 를 사용해서 start 함수를 호출하는 시점에 코루틴을 동작시킬 수 있다
    • start 함수를 주석처리하면 launch가 동작하지 않는다

3. async

  • async 빌더는 비동기 작업을 통해 결과를 만들어내는 경우에 적합하다
import kotlinx.coroutines.*

fun sum(a: Int, b: Int) = a + b

fun main() = runBlocking<Unit> {

    val result1: Deferred<Int> = async {
        delay(100)
        sum(1, 3)
    }

    println("result1 : ${result1.await()}")

    val result2: Deferred<Int> = async {
        delay(100)
        delay(100)
        sum(2, 5)
    }

    println("result2 : ${result2.await()}")
}
  • async는 비동기 작업의 결과로 Deferred 라는 특별한 인스턴스를 반환하는데 await 이라는 함수를 통해 async로 수행한 비동기 작업의 결과를 받아올 수 있다
  • 자바 스크립트나 파이썬과 같이 다른 언어의 async-await은 키워드 인 경우가 보통이지만 코틀린의 코루틴은 async-await이 함수인 점이 차이점이다

자바 스크립트의 async-await 예시)

async function showAvatar() {

  // JSON 읽기
  let response = await fetch('/article/promise-chaining/user.json');
  let user = await response.json();

  // github 사용자 정보 읽기
  let githubResponse = await fetch(`https://api.github.com/users/${user.name}`);
  let githubUser = await githubResponse.json();

  // 아바타 보여주기
  let img = document.createElement('img');
  img.src = githubUser.avatar_url;
  img.className = "promise-avatar-example";
  document.body.append(img);

  // 3초 대기
  await new Promise((resolve, reject) => setTimeout(resolve, 3000));

  img.remove();

  return githubUser;
}

showAvatar();

구조적 동시성

  • 동시 실행 가능한 작업을 구조화된 방식으로 관리하여 코드의 가독성을 높이고 오류 가능성을 줄인다.
  • 계층 구조로 관리되어 부모 코루틴은 자식 코루틴의 작업이 모두 끝나기 전까지 종료되지 않는다.
  • 자식 코루틴에서 발생한 에러는 부모 코루틴으로 전파된다. 이를 통해 에러 처리를 중앙화할 수있고, 동시성을 처리하는 개별 코루틴의 에러핸들링이 가능하다.

suspend 함수

  • suspend 함수는 코루틴의 핵심 요소로써 일시 중단이 가능한 함수를 말한다
  • suspend는 키워드이다
  • suspend 함수는 일반 함수를 마음껏 호출할 수 있지만 일반 함수에선 suspend 함수를 호출할 수 없다
package structuredconcurrency

fun main() {
    printHello() // 컴파일 에러
}

suspend fun printHello() = println("hello")
  • Caller 함수에서 suspend 키워드를 붙여주면 된다
package structuredconcurrency

suspend fun main() {
    printHello()
}

suspend fun printHello() = println("hello")
  • 일시 중단 함수는 IntelliJ 에서 suspension point가 표시된다.

coroutineScope

  • runBlocking은 현재 스레드를 블로킹시키고 결과를 기다리지만 coroutineScope 는 스레드가 블로킹되지 않고 결과를 기다린다.
package structuredconcurrency

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main() {
    doSomething()
}

private suspend fun doSomething() = coroutineScope {
    launch {
        delay(200)
        println("world!")
    }

    launch {
        println("hello")
    }

}

예외 처리

  • coroutineScope 내부의 자식 코루틴에서 에러가 발생하면 모든 코루틴이 종료된다.
private suspend fun doSomething() = coroutineScope {

    launch {
        delay(200)
        println("world!") // 실행안됨

    }

    launch {
        throw RuntimeException("error")
    }
}
Exception in thread "DefaultDispatcher-worker-2" java.lang.RuntimeException
	at structuredconcurrency.CoroutineScopeErrorKt$doSomething$2$2$1.invokeSuspend(coroutineScopeError.kt:21)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
	Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@4bbea199, Dispatchers.Default]

  • 코루틴 빌더 내부에서 try-catch로 핸들링
private suspend fun doSomething() = coroutineScope {

    launch {
        delay(200)
        println("world!")

    }

    launch {
        try {
            throw RuntimeException()
        } catch (e: Exception) {
            println("hello")
        }
    }
}

hello
world!
  • supervisorScope 를 사용해서 예외를 부모 코루틴으로 전파하지않는 방법
private suspend fun doSomething() = coroutineScope {

    launch {
        delay(200)
        println("world!")

    }

    supervisorScope {
        launch {
            throw RuntimeException()
        }
    }
}

Exception in thread "DefaultDispatcher-worker-2" java.lang.RuntimeException
	at structuredconcurrency.CoroutineScopeErrorKt$doSomething$2$2$1.invokeSuspend(coroutineScopeError.kt:21)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
	Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@4bbea199, Dispatchers.Default]
world!
  • 코루틴의 예외 상황을 체계적으로 관리하고 싶을때 CEH(CoroutineExceptionHandler 를 사용한다
  • supervisorScope 와 CoroutineExceptionHandler 함께 사용
private suspend fun doSomething() = coroutineScope {

    launch {
        delay(200)
        println("world!")
    }

    supervisorScope {
        launch(handler) {
            throw RuntimeException()
        }
    }

}

val handler = CoroutineExceptionHandler { _, throwable ->
    println("Caught $throwable")
}

Caught java.lang.RuntimeException
world!

코루틴 채널

  • **Channel** 은 코루틴간의 통신을 위한 파이프라인이다
  • 기본 컨셉은 BlockingQueue 와 유사한 자료구조로 볼 수 있다.
  • 데이터 발신자가 채널에 send 로 데이터를 전달하고 수신 측에서 receive 를 사용해 채널로 부터 데이터를 받아온다.

BlockingQueue 를 사용해 주식 시세를 처리하는 처리하는 예제

package channel

import kotlinx.coroutines.*
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import kotlin.random.Random

fun main() = runBlocking {
    val queue: BlockingQueue<String> = LinkedBlockingQueue()

    // 생산자와 소비자 코루틴을 실행
    producer(queue, 20)
    consumer(queue)
}

suspend fun producer(queue: BlockingQueue<String>, count: Int) = coroutineScope {
    launch {
        repeat(count) {
            val stockPrice = "APPLE : ${Random.nextInt(100, 200)}"
            queue.put(stockPrice)
            delay(100L) // 0.1초 간격으로 데이터 생성
        }
    }
}

fun consumer(queue: BlockingQueue<String>) {
    for (stockPrice in queue) {
        println("Processed $stockPrice")
    }
}
  • put, take 가 모두 블로킹으로 이뤄진다

Channel 를 사용해 주식 시세를 처리하는 처리하는 예제

package channel

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import kotlin.random.Random

fun main() = runBlocking {
    val channel: Channel<String> = Channel()

    // 생산자와 소비자 코루틴을 실행
    producer(channel, 20)
    consumer(channel)
}

suspend fun CoroutineScope.producer(channel: Channel<String>, count: Int) {
    launch {
        repeat(count) {
            val stockPrice = "APPLE : ${Random.nextInt(100, 200)}"
            channel.send(stockPrice)
            delay(100L) // 0.1초 간격으로 데이터 생성
        }
        channel.close()  // 모든 데이터를 보냈으면 채널을 닫습니다.
    }
}

suspend fun consumer(channel: Channel<String>) {
    for (stockPrice in channel) {
        println("Processed $stockPrice")
    }
}
  • 데이터 송수신 연산(send, receive)이 모두 일시중단되므로 논-블로킹으로 이뤄진다.

플로우 : 비동기 데이터 스트림

  • **Flow** 는 코루틴에서 리액티브 프로그래밍 스타일로 작성할 수 있도록 만들어진 비동기 스트림 API이다
  • 코루틴의 suspend 함수는 단일 값을 반환하지만 Flow를 사용하면 무한대 값을 반환할 수 있다

플로우 빌더

  • flow 빌더를 사용해 코드를 작성하고 emit 을 사용해 데이터를 전달한다
package flow

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val flow = simple()
    flow.collect { value -> println(value) }
}

fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}
  • 리액티브 스트림과 같이 Terminal Operator(최종 연산자) 인 collect 를 호출하지 않으면 아무런 일도 일어나지 않는다

flowOf 를 사용하면 여러 값을 인자로 전달해서 플로우를 만들 수 있다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    flowOf(1, 2, 3, 4, 5).collect { value ->
        println(value)
    }
}

asFlow 를 사용하면 존재하는 컬렉션이나 시퀀스를 플로우로 변환할 수 있다

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    listOf(1, 2, 3, 4, 5).asFlow().collect { value ->
        println(value)
    }
    (6..10).asFlow().collect {
        println(it)
    }
}

다양한 연산자 제공

package flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun filterExample() {
    (1..10)
        .asFlow()
        .filter { (it % 2) == 0 }
        .collect {
            println(it)
        }
}

suspend fun takeExample() {
    (1..10)
        .asFlow()
        .take(2)
        .collect {
            println(it)
        }
}

suspend fun mapExample() {
    (1..10)
        .asFlow()
        .map { it * it }
        .collect {
            println(it)
        }
}

suspend fun zipExample() {
    val nums = (1..3).asFlow()
    val strs = listOf("one", "two", "three").asFlow()

    nums.zip(strs) { a, b -> "$a -> $b" }
        .collect { println(it) }
}

suspend fun flattenExample() {
    val left = flowOf(1, 2, 3)
    val right = flowOf(4, 5, 6)
    val flow = flowOf(left, right)

    flow.flattenConcat().collect { println(it) }
}
728x90
반응형
728x90
반응형

1. 비동기-논블로킹 프로그래밍

전통적 웹 방식 (스프링 MVC)

  • 전통적 웹 방식은 1개 요청당 1개의 스레드를 사용하는 Thread per Request 모델
  • 요청 당 스레드를 사용하는 방식은 DB, Network IO 등이 발생할 경우 결과를 받기전까지 해당 스레드는 블로킹 상태가 됨


  • 요청을 처리하기 위해 스레드를 생성하고 전환하는데 따른 Context Switching 비용이 발생
  • 요청에 따라 무한정 스레드를 생성할 수 없기 때문에 스레드풀을 사용해 스레드를 재사용(톰캣 기본 200개)

 

 

비동기-논블로킹 방식

  • IO(DB, File, Network) 처리 시 스레드가 대기하지 않고 처리가 완료되면 다양한 방식으로 이벤트를 통지받는 방식 (콜백, 프로미스, async-await 등)
  • 더 적은 스레드로 같은 일을 할 수 있기 때문에 Context Switching에 따른 비용이 훨씬 적다. 즉 하드웨어 자원을 더 적게 사용한다
  • 순차적으로 동작하지 않으므로 코드의 흐름을 파악하기 어렵고 콜백헬(Callback Hell)로 인해 가독성과 유지보수성이 현저히 떨어짐


 

C10K 문제

  • 컴퓨터 과학에서 C10K 문제란 1만명의 사용자가 동시에 접속했을때(1만개의 소켓이 열림) 하드웨어가 충분함에도 기존의 I/O 통지 방식(e.g. select)으로는 제대로 처리하지 못하는 것을 말함
  • Event-Driven 모델을 사용해 해결한다.
    • 커널 레벨에선 OS에서 제공하는 epoll, kqueue와 같은 통지 모델을 사용해 해결할 수 있다.
    • 프로그래머는 로우레벨을 직접 다루지 않고 Netty, Node.js, Nginx, Vert.x 등을 사용한다.


 

2. 리액티브 프로그래밍

  • 비동기-논블로킹을 처리하는 새로운 패러다임
  • 2010년 에릭마이어에 의해 마이크로소프트 .NET 에코 시스템으로 정의됨
  • 리액티브 프로그래밍은 기본적으로 옵저버 패턴 처럼 데이터 제공자가 소비하는 측에 데이터를 통지하는 푸시 기반(Push-Based)
  • 구현체에 따라서 풀과 푸시를 모두 제공하는 경우도 있음 (Pull-Push Hybrid)
  • 데이터의 통지, 완료, 에러 처리를 옵저버 패턴에 영감을 받아 설계되었고 데이터의 손쉬운 비동기 처리를 위해 함수형 언어의 접근 방식을 사용

 

옵저버 패턴

  • 관찰 대상이 되는 객체가 변경되면 대상 객체를 관찰하고 있는 옵저버(Observer)에게 변경사항을 통지하는 디자인 패턴

직접 구현한 옵저버 패턴의 예시

import java.util.*

class Coffee(val name: String)


// 데이터 제공자
class Barista : Observable() {

    private fun makeCoffee(name: String) = Coffee(name)

    fun serve(name: String) {
         setChanged()
         notifyObservers(makeCoffee(name))
    }
}

// 데이터 구독자
class Customer : Observer {

    fun orderCoffee() = "Iced Americano"

    override fun update(o: Observable?, arg: Any?) {
         val coffee = arg as Coffee
         println("I got a cup of ${coffee.name}")
    }
}


fun main(args: Array<String>) {
    val barista = Barista()
    val customer = Customer()

    barista.addObserver(customer)
    barista.serve(customer.orderCoffee())
}

--------------------
출력 결과)
--------------------
I got a cup of Iced Americano

 

리액티브 스트림

  • JVM 환경에서 리액티브 프로그래밍의 표준 API 사양으로 비동기 데이터 스트림논블로킹-백프레셔(Back-Pressure)에 대한 사양을 제공
  • 리액티브 스트림 이전의 비동기식 애플리케이션에서는 멀티 코어를 제대로 활용하기 위해 복잡한 병렬 처리 코드가 필요
  • 처리할 데이터가 무한정 많아져서 시스템의 한계를 넘어서는 경우 애플리케이션은 병목 현상(bottleneck)이 발생하거나 심각한 경우 애플리케이션이 정지되는 경우도 발생할 수 있음(논블로킹-백프레셔로 해결)
  • Netflix, Vmware(Pivotal), Lightbend, Red Hat과 같은 유명 회사들이 표준화에 참여 중
  • 리액티브 스트림 인터페이스

  • Publisher : 데이터를 생성하고 구독자에게 통지
  • Subscriber : 데이터를 구독하고 통지 받은 데이터를 처리
  • Subscription : Publisher, Subscriber간의 데이터를 교환하도록 연결하는 역할을 하며 전달받을 데이터의 개수와 구독을 해지할 수 있다
  • Processor : Publisher, Subscriber을 모두 상속받은 인터페이스
  • 리액티브 스트림에서 Publisher와 Subscriber 간의 데이터 처리 흐름

리액티브 스트림을 표준 사양을 채택한 대표적인 구현체들

  • Project Reactor
  • RxJava
  • JDK9 Flow
  • Akka Streams
  • Vert.x
  • Hibernate Reactive(Vert.x 사용)

 

3. 프로젝트 리액터

  • 프로젝트 리액터(Project Reactor)는 리액티브 스트림의 구현체 중 하나로 스프링의 에코시스템 범주에 포함된 프레임워크이다
  • 리액티브 스트림 사양을 구현하고 있으므로 리액티브 스트림에서 사용하는 용어와 규칙을 그대로 사용한다
  • 리액터를 사용하면 애플리케이션에 리액티브 프로그래밍을 적용할 수 있고 비동기-논블로킹을 적용할 수 있다
  • 함수형 프로그래밍의 접근 방식을 사용해서 비동기-논블로킹 코드의 난해함을 해결한다
  • 백프레셔(Backpressure) 를 사용해 시스템의 부하를 효율적으로 조절할 수 있다
  • 리액터는 리액티브 스트림의 Publisher 인터페이스를 구현하는 모노(Mono)플럭스(Flux)라는 두 가지 핵심 타입을 제공한다
  • 두 타입 모두 리액티브 스트림 데이터 처리 프로토콜대로 onComplete 또는 onError 시그널이 발생할 때 까지 onNext를 사용해 구독자에게 데이터를 통지한다

모노

  • 0..1개의 단일 요소 스트림을 통지하는 발행자(Publisher)

플럭스

  • 0..N개로 이뤄진 무한대 요소를 통지하는 발행자

  • 연산자를 사용하기 위해서는 터미널 오퍼레이터인 구독(subcribe)이 필수이다. 구독하지 않으면 연산자는 실행되지 않는다. (이 개념은 Java8 스트림과 유사하며 WebFlux에선 컨트롤러에서 Mono,Flux 타입 반환시 자동으로 해줌)
  • CompletableFuture, DefferedResult, ListenableFuture 등 기존의 비동기 처리 방식은 구독 형태가 아니며, 단순한 기능만 가지고 있다 (async, join, callback 등)

 

4. 스프링 웹플럭스

  • 기본적으로 Project Reactor 기반 (RxJava와 같은 다른 구현체도 사용 가능함)
  • 서블릿 기반인 스프링 MVC와 대비되는 리액티브 기반의 웹 스택
  • 스프링 MVC와 베타적으로 동작하지만 부분적으로 상호 운용이 가능함 예) WebClient, 애노테이션 등

  • 내부적으로 블로킹 API가 있으면 성능이 현저하게 떨어짐
  • 예시로 Spring Data JPA는 내부에서 JDBC API를 사용하는데 JDBC 드라이버는 블로킹 API (Thread per Connection)이므로 JPA를 쓰는 경우 Spring MVC 쓰는게 더 나은 옵션이다
  • 꼭 블로킹 API 써야한다면 아래와 같이 별도의 스케쥴러로 동작시켜야한다
val blockingWrapper = Mono.fromCallable {

        /* make a remote synchronous call */ 

}.subscribeOn(Schedulers.boundedElastic())

  • 그 외에도 깊게 공부하면 연산자 융합(operator-fusion), Hot-Cold 퍼블리셔 등등 공부할게 정말 많다.

 

스프링 웹플럭스의 코루틴 지원

  • 리액터의 연산자 스타일에 비해 직관적인 스타일(진리의 사바사)
  • 리액터에 비해 러닝커브가 적은 편
  • 공식 레퍼런스 문서의 코틀린 예제들을 보면 코루틴 기반으로 소개하고 있다


코루틴이란?

  • 코루틴(Coroutine)은 코틀린에서 비동기-논블로킹 프로그래밍을 명령형 스타일로 작성할 수 있도록 도와주는 라이브러리이다
  • 코루틴은 멀티 플랫폼을 지원하여 코틀린을 사용하는 안드로이드, 서버 등 여러 환경에서 사용할 수 있다
  • 코루틴은 일시 중단 가능한 함수(suspend function) 를 통해 스레드가 실행을 잠시 중단했다가 중단한 지점부터 다시 재개(resume) 할 수 있다
  • 다른 언어와는 다르게 코틀린의 코루틴은 특별하다

 

코루틴을 사용한 구조적 동시성 예시

suspend fun combineApi() = coroutineScope {
	val response1 = async { getApi1() }
	val response2 = async { getApi2() }
	
	return ApiResult (
		response1.await()
		response2.await()
	)
}

 

 

리액티브가 코루틴으로 변환되는 방식

//Mono → suspend 
fun handler(): Mono<Void> -> suspend fun handler()

//Flux → Flow
fun handler(): Flux<T> -> fun handler(): Flow<T>
  • 모노는 서스펜드 함수로 플럭스는 플로우로 변환되는 걸 알 수 있다. 반대도 성립된다.

 

코루틴을 적용한 컨트롤러 코드

@RestController
class UserController(
	private val userService : UserService,
	private val userDetailService: UserDetailService
) {
	
	@GetMapping("/{id}")
	suspend fun get(@PathVariable id: Long) : User {
		return userService.getById(id)
	}
	
	@GetMapping("/users")
	suspend fun gets() = withContext(Dispatchers.IO) {
			val usersDeffered = async { userService.gets() }
			val userDetailsDeffered = async { userDetailService.gets() }
				
			return UserList(usersDeffered.await(), userDetailsDeffered.await())	
	}

}

 

  • Spring Data R2DBC는 아래와 같이 CoroutineCrudRepository를 지원한다

기존 리액티브 스택의 Spring Data R2DBC, Spring Data Mongo Reactive 등에서 ReactiveRepository 로 개발된 코드에서 코루틴으로 변환한 예제

interface ContentReactiveRepository : ReactiveCrudRepository<Content, Long> {

    fun findByUserId(userId: Long) : Mono<Content>

    fun findAllByUserId(userId: Long): Flux<Content>
}


class ContentService (
	val repository : ContentReactiveRepository
) {

	
	fun findByUserIdMono(userId: Long) : Mono<Content> {
		return repository.findByUserId(userId)
	}	

	suspend findByUserId (userId: Long) : Content {
		return repository.findByUserId(userId).awaitSingle()
	}

}

더 좋은 방법 : CoroutineCrudRepository 를 사용하면 awaitXXX 확장 함수 없이 사용 가능

interface ContentCouroutineRepository : CoroutineCrudRepository<Content, Long> {

    suspend fun findByUserId(userId:Long) : Content?

    fun findAllByUserId(userId: Long): Flow<Content>
}


class ContentService (
	val repository : ContentCouroutineRepository
) {


	suspend findByUserId (userId: Long) : Content {
			return repository.findByUserId(userId)
	}

}

 

 

 

Flow

  • Flow 는 코루틴에서 리액티브 프로그래밍 스타일로 작성할 수 있도록 만들어진 API이다
  • 코루틴의 suspend 함수는 단일 값을 비동기로 반환하지만 Flow를 사용하면 무한대 값을 반환할 수 있다
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    val flow = simple()
    flow.collect { value -> println(value) }
}

fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

// Flow started
// 1
// 2
// 3

 

  • 리액티브 스트림과 같이 Terminal Operator(최종 연산자)collect 를 호출하지 않으면 아무런 일도 일어나지 않는다
  • 코루틴의 플로우는 리액티브 스트림에서 영감을 받아 제작
    • 리액터의 플럭스(Flux)는 Pull-Push Hybrid
    • 코틀린의 플로우(Flow)는 Push Only
    • 아직은 리액터 플럭스에 비해 기능이 적다

 

스프링 MVC에서도 리액티브와 코루틴을 쓸수 있다

  • 코루틴, 리액티브를 쓰고 싶은 경우 의존성 추가
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
  • WebClient와 같은 비동기-논블로킹 라이브러리가 필요하다면 의존성 추가
implementation("org.springframework.boot:spring-boot-starter-webflux")
  • 웹플럭스를 바로 도입하기 어렵거나 MVC에서 비동기-논블로킹 구현을 우아하게 작성하고 싶은 경우 이 방법을 추천

 

 

DEMO

예제 코드

https://github.com/digimon1740/backend-techtalk

부하 테스트 시나리오

  1. Mock 데이터를 응답하는 유저 API 서버가 존재함
  2. 유저 API 서버는 API 서버로 부터 요청을 받으면 응답하는데까지 2초가 소요된다
  3. 유저 API 서버가 최초 실행되면 균형있는 테스트를 위해 warm-up을 수행한다
  4. 10초 동안 MVC, WebFlux 서버에 아래 조건으로 요청을 보내고 Vegeta 리포트와, VisualVM 모니터를 확인한다
  5. 100/s 요청 전송
  6. 200/s 요청 전송
  7. 300/s 요청 전송
  8. 1000/s 요청 전송

 

# MVC test
echo 'GET http://localhost:8080/mvc/users/1?delay=2000' | vegeta attack -duration=10s -rate=300/s | vegeta report

 

 

# WebFlux test
echo 'GET http://localhost:8081/webflux/users/1?delay=2000' | vegeta attack -duration=10s -rate=300/s | vegeta report

 

6. 정리

  • 전통적 블로킹 방식에 비해 비동기-논블로킹 모델은 더 적은 리소스로 같거나 더 많은 일을 할 수 있다
  • 필연적으로 비동기-논블로킹 모델은 코드의 가독성이 떨어지고 디버깅이 힘들 수 있다
  • 리액티브 프로그래밍은 비동기-논블로킹 개발을 더 쉽게 해주는 패러다임이다
  • 스프링 MVC는 느리고 구리다? 스프링 웹플럭스는 빠르고 멋지다?
  • 일반적으론 스프링 MVC도 충분하지만 한정된 자원에서 대량의 트래픽이 요구되는 서비스거나 비동기-논블로킹 IO가 필요한 경우 추천
  • 기존 스프링 MVC 프로젝트에 스프링 웹플럭스 의존성을 추가해서 써보는 것도 좋은 방법이다
  • 블로킹 API(JDBC, RestTemplate 등)을 쓸거면 그냥 마음 편히 스프링 MVC로 개발하자
  • 코틀린 + 웹플럭스 조합의 경우 메인은 코루틴을 사용하고 필요에 따라 리액티브를 같이 적용하는 것을 추천
  • 코루틴의 지원 범위를 넘어선 기능을 찾거나 연산자에 대한 학습이 충분하다면 리액티브는 여전히 매력적인 기술이다
  • 새로운 패러다임을 공부하는 것은 항상 어렵지만 도전했을때 얻는 것도 많다
  • 은탄환은 없다.

레퍼런스

https://devsh.tistory.com/category/Reactive Programming

https://tech.lezhin.com/2020/07/15/kotlin-webflux

https://github.com/digimon1740/webflux-demo

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/index.html

https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html

https://docs.spring.io/spring-framework/docs/current/reference/html/languages.html#coroutines

728x90
반응형
728x90
반응형


Hot vs Cold

리액터에는 두 개의 발행자 타입이 존재하는데 바로 Hot 발행자 Cold 발행자입니다. 먼저 Cold 발행자는 각각의 구독에 대해 새로운 데이터를 생성하는 구조입니다. 발행자에 대한 구독이 이뤄지지 않으면 데이터는 결코 새롭게 생성되지 않습니다. Cold 발행자에 대한 예를 들어보면 HTTP 요청이 Cold에 해당합니다. HTTP 요청은 클라이언트로부터 새로운 요청이 발생할 때마다 데이터를 생성하여 응답하는데 이때 여러 개의 클라이언트에서 요청이 발생하더라도 동일한 요청이라면 동일한 데이터 구조를 새로 응답할 것입니다. 또 다른 예로 넷플릭스에서 여러 사용자가 동일한 드라마 회차를 감상하는 경우 사용자들은 자신만의 타임라인을 가집니다. 즉 사용자별로 드라마를 처음 재생하면 처음부터 재생되고 재생을 시작한 시점부터 현재 보고 있는 시점이 각각 다른데 이러한 것들을 Cold 방식으로 볼 수 있습니다.

예제 1.36은 동일한 플럭스를 3개의 구독자가 구독하여도 동일한 데이터를 통지하는 Cold 발행자의 특징을 보여줍니다.

예제 1.36 Cold 발행자 예제 - reactor/reactor-basic/src/hotcold/Cold.kt

package hotcold
 
import reactor.core.publisher.Flux
 
fun main() {
    val list = listOf("one", "two", "three")
    val flux: Flux<String> = Flux.fromIterable(list)
 
    flux.subscribe { println("Subscriber-1 : $it") }
    flux.subscribe { println("Subscriber-2 : $it") }
    flux.subscribe { println("Subscriber-3 : $it") }
}
 
--------------------
출력 결과)
--------------------
Subscriber-1 : one
Subscriber-1 : two
Subscriber-1 : three
Subscriber-2 : one
Subscriber-2 : two
Subscriber-2 : three
Subscriber-3 : one
Subscriber-3 : two
Subscriber-3 : three


출력된 결과를 확인해보면 구독자가 각각 동일한 데이터를 통지받은 것을 확인할 수 있습니다. 즉 각각의 구독자가 새로운 구독을 만들었다는 것을 확인할 수 있습니다.

Hot 발행자는 구독자의 상태에 상관없이 데이터를 지속적으로 제공합니다. 예를 들어 주식 데이터를 제공하는 API의 경우 실시간으로 최신 데이터를 제공합니다. 국내 주식의 경우 정규장으로 오전 9시부터 시작됩니다. 만약 출근 후에 주식 정보를 보고 싶어서 네이버에 접속해 오전 10시에 주식 정보를 확인한다면 정규장이 시작된 오전 9시의 가격이 아닌 오전 10시의 주식가격을 확인하게 됩니다. 또 다른 예로 유투브의 실시간 스트리밍을 들 수 있습니다. 유저가 구독 중인 크리에이터의 실시간 스트리밍을 1시간이 지난 뒤 입장하면 당연히 처음부터 보는 게 아니라 실시간으로 크리에이터와 상호작용하게 됩니다. 이렇게 나중에 구독에 참가한 구독자는 처음부터 새로운 데이터를 얻는 게 아닌 새로이 구독한 시점부터 데이터를 제공받는 개념을 Hot 방식이라고 합니다.

예제 1.37은 Hot 발행자인 커넥터블 플럭스(ConnectableFlux)를 사용하여 각각의 구독자를 구독한 시점부터 데이터를 통지받도록 연결하여 출력하는 예제입니다.

예제 1.37 Hot 발행자 예제 - reactor/reactor-basic/src/hotcold/Hot.kt

package hotcold
 
import reactor.core.publisher.ConnectableFlux
import reactor.core.publisher.Flux
import java.time.Duration
 
 
fun main() {
    val flux: ConnectableFlux<Int> =
        Flux.range(1, 5)
            .delayElements(Duration.ofMillis(100))
            .publish()
 
    flux.connect()
 
    flux.subscribe { println("Subscriber-1 : $it") }
 
    Thread.sleep(Duration.ofMillis(200).toMillis())
    flux.subscribe { println("Subscriber-2 : $it") }
 
    Thread.sleep(Duration.ofMillis(200).toMillis())
    flux.subscribe { println("Subscriber-3 : $it") }
 
    Thread.sleep(Duration.ofSeconds(2).toMillis())
}
 
--------------------
출력 결과)
--------------------
Subscriber-1 : 1
Subscriber-1 : 2
Subscriber-2 : 2
Subscriber-1 : 3
Subscriber-2 : 3
Subscriber-1 : 4
Subscriber-2 : 4
Subscriber-3 : 4
Subscriber-1 : 5
Subscriber-2 : 5
Subscriber-3 : 5

 

  • range(1, 5)를 사용해 1부터 5까지의 플럭스를 생성하고 delayElements(Duration.ofMillis(100)) 함수를 사용해 각각의 데이터를 100밀리초 지연 후 publish로 데이터를 통지합니다.
  • 만약 flux.connect()가 없으면 발행자를 구독을 하더라도 아무 일도 일어나지 않습니다. 
  • 3개의 구독자인 flux.subscribe{}는 다음 구독자로 넘어가는 시점을 200밀리초 뒤로 미루고 현재 시점에서 통지 받은 데이터를 출력합니다.

출력 결과를 확인해보면 Subscriber-1은 1부터 5까지의 데이터를 모두 전달받았지만 Subscriber-2와 Subscriber-3은 지연된 만큼 데이터를 전달받지 못한 것을 확인할 수 있습니다.

앞서 커넥터블 플럭스를 사용해 Hot 발행자의 동작 방식에 대해 설명했습니다. 커넥터블 플럭스는 대표적인 Hot 발행자로 하나의 데이터를 생성하는 발행자에 여러 구독자가 동시에 구독하는 것을 허용합니다. 특징은 일반적인 Cold 방식의 발행자와 다른 점은 구독자가 구독을 위해 subscribe를 호출하더라도 데이터를 통지하지 않고 connect 연산자를 호출해야 비로소 데이터를 통지합니다. 

Cold 발행자인 플럭스를 커넥터블 플럭스로 변환하는 방법에는 2가지가 있습니다. 표 1.1은 커넥터블 플럭스로 변환하기 위한 연산자에 대해 설명합니다.

표 1.1 플럭스를 커넥터블 플럭스로 변환하는 연산자

함수 설명
publish 구독자가 연결된 시점 부터 새롭게 생성되는 데이터를 통지하는 커넥터블 플럭스를 제공하는 연산자입니다.
replay 인자로 지정한 만큼의 데이터를 캐시하고 있으면서 새롭게 구독자가 연결되어 구독을 시작하는 경우 시존에 캐시 된 데이터를 먼저 통지하고 캐시 된 데이터 데이터가 없다면 구독 이후로 생성된 데이터를 통지하는 연산자입니다.


생성된 커넥터블 플럭스는 connect와 같이 구독에 대한 추가적인 처리를 위한 방법도 제공합니다. 표 1.2는 커넥터블 플럭스가 제공하는 연산자에 대해 설명합니다. 

표 1.2 커넥터블 플럭스가 제공하는 연산자

함수 설명
connect 커넥터블 플럭스에 대한 구독을 활성화하기 위해 수동으로 호출하여 사용합니다. 
autoConnect 함수에 지정된 인자의 개수만큼 구독이 연결되면 자동으로 데이터를 통지합니다.
refCount 커넥터블 플럭스를 새로운 플럭스로 변환하고 함수에 지정된 인자의 구독자 개수만큼 연결된 구독자가 있을 경우에만 데이터를 처음부터 통지하고 새롭게 연결된 구독자에게는 구독 이후로 생성된 데이터를 통지합니다.

 

백프레셔

백프레셔(Back-Pressure)는 리액티브 스트림의 핵심 사양 중 하나입니다. 백프레셔는 데이터를 소비하는 측인 구독자가 자체적으로 처리 가능한 만큼의 데이터를 발행자에게 역으로 요청(request)합니다. 데이터를 소비하는 속도 즉 처리 속도보다 데이터를 생성하고 통지하는 속도가 빠르다면 구독자 입장에선 압박(pressure)이 될 수 있습니다. 백프레셔는 이런 상황에서 구독자가 처리 가능한 만큼 데이터를 통지할 수 있도록 조절해주는 기능입니다.

서비스를 예를 들어 동시 접속자가 최대 100명인 시스템에 1000명이 동시에 접속한다면 장애가 발생할 수 있습니다. 이런 경우 인프라를 증설하여 처리해야 하지만 예상치 못한 동시 접속자 증가는 대응하기가 쉽지 않습니다. 이럴 때 서버 앞단에서 요청에 대한 요청 제한(Rate Limiting)을 적용해 HTTP 상태 코드 429 Too Many Requests를 반환하여 처리하면 최악의 상황인 서비스 장애는 방지할 수 있습니다.

예제 1.38은 Flux.range를 사용하여 1부터 10까지 순차적으로 데이터를 통지하고 구독자(Subscriber)를 직접 구현하여 요청을 조절하는 간단한 백프레셔를 구현하는 예제입니다. 

예제 1.38 백프레셔 예제 - reactor/reactor-basic/src/backpressure/Backpressure.kt

package backpressure
 
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import reactor.core.publisher.Flux
 
fun main() {
 
    Flux.range(1, 10)
        .log()
        .subscribe(object : Subscriber<Int?> {
            private var s: Subscription? = null
            private var count = 0
 
            override fun onSubscribe(s: Subscription) {
                println("onSubscribe : 구독 시작")
                println("==================")
                this.s = s
                s.request(5)
            }
 
            override fun onNext(integer: Int?) {
                count++
                if (count % 5 == 0) {
                    println("==================")
                    s!!.request(5)
                }
            }
 
            override fun onError(t: Throwable) {}
            override fun onComplete() {
                 println("==================")
                 println("onComplete : 구독 완료")
                 println("통지된 numbers 개수 : $count")
            }
        })
}
 
--------------------
출력 결과)
--------------------
onSubscribe : 구독 시작
==================
[ INFO] (main) | request(5)
[ INFO] (main) | onNext(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (main) | onNext(4)
[ INFO] (main) | onNext(5)
==================
[ INFO] (main) | request(5)
[ INFO] (main) | onNext(6)
[ INFO] (main) | onNext(7)
[ INFO] (main) | onNext(8)
[ INFO] (main) | onNext(9)
[ INFO] (main) | onNext(10)
==================
[ INFO] (main) | request(5)
[ INFO] (main) | onComplete()
==================
onComplete : 구독 완료
최종 통지된 numbers 개수 : 10


subscribe 함수의 인자에는 리액티브 스트림의 구독자 인터페이스인 Subscriber를 직접 구현하였습니다. 우선 구독이 시작되면서 최초에 한번 실행되는 onSubscribe 함수에서는 Subscription을 초기화하고 s.request(5)를 호출해서 발행자에게 request에 전달받은 만큼의 데이터를 통지하도록 요청합니다. 그다음 호출되는 onNext는 발행자가 생성한 데이터를 담아 통지 신호가 발생할 때마다 호출됩니다. onNext 내부에는 데이터 통지 횟수를 count 변수에 기록하고 count가 5의 배수인 경우에 request를 호출해서 발행자에게 데이터 통지를 요청합니다. 마지막으로 onComplete는 구독에 대한 통지가 완료되었을 때 동작합니다.

출력 결과를 확인해보면 request(5)로 요청한 대로 onNext(1).. onNext(5)까지 순서대로 통지된 후 다음엔 onNext(6).. onNext(10)까지 차례로 통지된 것을 알 수 있습니다. 또한 더이상 통지할 데이터가 없는 경우에는 request(5)를 요청했지만 onNext가 호출되지 않고 onComplete 시그널이 호출되며 구독이 종료되었습니다.

 

리액터로 작성한 코드를 테스트하기

자동화된 테스트를 작성하는 것은 프로그램의 잠재된 버그의 발견을 쉽게 하고 코드의 품질을 높여줍니다. 그러나 비동기-논 블로킹 형태의 코드는 테스트가 번거롭고 예측하기 어렵습니다. 예를 들어 여러 스레드를 동시에 생성한 후 각각 다른 작업을 한 후 합친 결과에 대해 테스트를 하거나 시간차로 동작하는 코드는 자동화된 테스트를 어렵게 하는 요인으로 알려져 있지만 리액터는 자동화된 테스트를 위한 여러 가지 도구로써 reactor-test라는 라이브러리를 제공합니다. 표 1.3은 reactor-test에서 제공하는 핵심 3가지 패키지에 대한 설명입니다.

표 1.3 reactor-test가 제공하는 패키지

함수 설명
reactor.test 발행자에 대한 검증 및 테스트를 위한 메인 테스트 컴포넌트 패키지
reactor.test.publisher 테스트 가능한 발행자의 생성을 위한 컴포넌트 패키지
reactor.test.scheduler 테스트 가능한 스케쥴러의 생성을 위한 컴포넌트 패키지

 

StepVerifier

리액터 테스트 패키지에는 StepVerifier라는 리액티브 스트림의 각 단계를 테스트하기 위한 API를 제공합니다.  StepVerifier를 사용하면 데이터 통지와 구독에 대한 조건이 예상한 결과를 만족하는지를 판단하는 테스트 시나리오를 작성할 수 있으므로 편리합니다. 또한 다음에 예상되는 동작과 주어진 시간 동안 딜레이를 주는 등의 다양한 테스트를 가능하게 합니다. StepVerifier는 빌더 패턴 형태로 제공되는데 빌더 패턴을 적용한 함수의 반환값은 this 즉 자기 자신이기 때문에 함수를 연쇄적으로 호출할 수 있다는 장점이 있습니다. 이런 기법을 메소드 체이닝(Method Chaining)이라고 부릅니다.

 

expectNext

리액터가 제공하는 테스트 도구를 사용하기 위해 우선 코틀린 그레이들 프로젝트를 생성합니다. 정상적으로 생성되었다면 아래와 같은 구조로 프로젝트가 생성됩니다.

├── build.gradle.kts
├── gradlew
├── settings.gradle.kts
└── src
    ├── main
    │   ├── kotlin
    │   └── resources
    └── test
        ├── kotlin
        └── resources


프로젝트가 정상적으로 생성되었다면 build.gradle.kts를 열어 예제 1.39와 같이 작성합니다.

예제 1.39 build.gradle.kts - reactor/reactor-test/build.gradle.kts

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
 
plugins {
    kotlin("jvm") version "1.4.21"
}
 
group = "com.reactor.reactortest"
version = "1.0-SNAPSHOT"
 
repositories {
    mavenCentral()
}
 
dependencies {
    implementation("org.jetbrains.kotlin:kotlin-stdlib")
    implementation("io.projectreactor:reactor-core:3.4.0")
  
    testImplementation(kotlin("test-junit5"))
    testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.0")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.6.0")
    testImplementation("io.projectreactor:reactor-test:3.4.0")
}
 
tasks.test {
    useJUnitPlatform()
}
 
tasks.withType<KotlinCompile>() {
    kotlinOptions.jvmTarget = "11"
}

예제에서는 리액터 테스트를 위한 reactor-test 라이브러리와 유닛 테스트 프레임워크인 JUnit5를 의존성에 추가했습니다. 그다음 test/kotlin 디렉토리 하위에 ReactorTest1 클래스를 생성한뒤 예제 1.40과 같이 첫번째 테스트 코드를 작성합니다.

 

예제 1.40 첫번째 테스트 - reactor/reactor-test/src/test/kotlin/ReactorTest1.kt

import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
 
class ReactorTest1 {
 
    @Test
    fun `짝수로 된 데이터만 통지해야 한다`() {
        val flux = Flux.just(1, 2, 3, 4)
            // rem은 나머지를 구하는 %와 같다.
            .filter { num -> num.rem(2) == 0 }
 
        StepVerifier
            .create(flux)
            .expectNext(2)
            .expectNext(4)
            .expectComplete()
            .verify()
    }
}


예제 1.40는 Flux.just의 인자에 1부터 4까지 지정하고 filter 연산자에서 통지된 정수 값을 2로 나눈 나머지가 0인 데이터만 필터링하므로 짝수인 2, 4만 통지됩니다. 그다음 테스트의 핵심인 StepVerifier는 첫 단계로 create(publisher) 연산자를 사용해 초기화합니다. StepVerifier가 초기화되면 expect로 시작하는 연산자를 사용해 단계별로 검증이 가능합니다. 가장 먼저 사용된 expectNext는 onNext로 통지받은 데이터가 인자로 지정된 값과 일치하는지를 검증합니다. 더 이상 검증할 값이 없다면 expectComplete 연산자로 OnComplete 신호를 통지합니다. 그다음 최종적으로 verify 함수를 호출하면 테스트 시나리오에 대한 검증이 완료됩니다. 검증이 정상이라면 테스트를 통과할 것이고 검증이 잘못됬다면 테스트가 실패할 것 입니다.

 

recordWith, expectNextCount, expectRecordedMatches

예제 1.41은 여러개의 과일을 가진 플럭스를 생성하고 과일의 개수와 특정한 과일이 존재하는지 검증하는 예제입니다. 

예제 1.41 reocrdWith 관련 테스트 - reactor/reactor-test/src/test/kotlin/ReactorTest2.kt

import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
import java.util.ArrayList
 
class ReactorTest2 {
 
    @Test
    fun `사과가 들어있는지 확인`() {
        val flux: Flux<String> = Flux.just("사과", "바나나", "딸기")
 
        StepVerifier
            .create(flux)
            .recordWith { ArrayList() }
            .expectNextCount(3)
            .expectRecordedMatches { fruits ->
                fruits.contains("사과")
            }
            .expectComplete()
            .verify()
    }
}


먼저 StepVerifier를 create로 초기화 후 recordWith{ ArrayList() }를 사용했습니다. recordWith는 create로 초기화된 발행자가 통지하는 onNext 값을 인자로 제공받은 컬렉션에 기록합니다. recordWith로 기록된 데이터는 내부에 컬렉터로 불리는 내부 큐에 보관되며 expectRecordedMatches와 consumeRecordedWith 같은 테스트 연산자를 사용하기 위해 필수로 호출돼야 합니다.

다음 스텝으로 expectNextCount는 통지된 데이터의 개수가 인자로 지정된 개수와 일치하는지 검증합니다. 복수의 데이터가 통지되는 경우 유용하게 사용할 수 있는 연산자입니다. 그다음 expectRecordedMatches는 인자로  함수형 인터페이스인 Predicate를 전달받습니다. Predicate 내부의 test 함수는 Boolean을 리턴하기 때문에 전달받은 조건이 true인 경우에 테스트가 통과합니다. 예제에서는 fruits로 제공받은 리스트 안에 ‘사과’가 존재하므로 이 경우에는 true을 반환하므로 테스트를 통과하게 됩니다.

 

expectError, expectErrorMessage, expectErrorSatisfies

테스트 케이스를 작성할 때  특정 조건에서 의도한 에러가 발생하는지 검증하는 것도 중요한 테스트입니다. 애플리케이션이 운영 중인 상황에서 의도치 않은 에러가 발생하여 시스템에 장애가 발생하게 될 경우 간단한 코드 상의  에러라면 빠르게 처리하여 핫픽스 배포를 진행할 수 있지만 돈에 관련된 문제라든지 데이터의 일관성에 부정확을 초래하는 에러인 경우 잠깐 사이에도 큰 장애가 될 수 있습니다. 이런 경우를 방지하기 위해 특정한 조건에서 의도한 에러를 발생시키면 개발자가 에러를 핸들링할 수 있기 때문에 에러 검증을 위한 테스트 케이스는 매우 중요합니다.

예제 1.42는 기존에 존재하는 플럭스에 임의의 에러 플럭스를 합친 다음 정상적인 데이터가 통지된 이후 onError 신호가 발생하는지 검증하는 테스트입니다.

예제 1.42 expectError 테스트 - reactor/reactor-test/src/test/kotlin/ReactorTest3.kt

import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
 
class ReactorTest3 {
 
    @Test
    fun `IllegalArgumentException 에러가 발생해야 한다`() {
        val error = IllegalArgumentException("error")
        val errorFlux = Flux.error<String>(error)
 
        val source = Flux.just("success-1", "success-2")
            .concatWith(errorFlux)
 
        // 종류에 상관없이 에러가 발생하면 통과
        StepVerifier.create(source)
            .expectNext("success-1")
            .expectNext("success-2")
            .expectError()
            .verify()
 
        // 인자로 지정한 에러가 발생할 때만 통과
        StepVerifier.create(source)
            .expectNext("success-1")
            .expectNext("success-2")
            .expectError(IllegalArgumentException::class.java)
            .verify()
    }
}

 


expectError는 현재 구독하는 발행자가 에러를 통지하는지 검증합니다. 리액티브 스트림에는 서브스크라이버의 onError에 대한 사양이 있기 때문에 내부적으로 onError 신호가 발생하는지를 판단하여 통과여부를 판단하게됩니다. 예제에서 먼저 사용한 인자 없는 expectError 연산자는 에러의 종류에 상관없이 onError 신호가 발생하면 테스트를 통과하게되고 상세한 에러를 인자로 받는 expectError(Throwable)는 지정한 에러가 발생하는 경우에만 테스트를 통과합니다.

모든 에러의 최상위 부모인 Throwable은 기본 생성자 외에도 사용자 정의 에러 메시지를 인자로 받는 Throwable(message)을 제공합니다. 이러한 사용자 정의 에러 메시지를 검증하고 싶은 경우 expectErrorMessage 연산자를 사용해 테스트할 수 있습니다. 예제 1.43에서는 expectErrorMessage 연산자를 사용해 사용자 정의 에러 메시지에 대한 테스트 방법을 확인할 수 있습니다.

예제 1.43 expectErrorMessage 테스트 - reactor/reactor-test/src/test/kotlin/ReactorTest4.kt

import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
 
class ReactorTest4 {
 
    @Test
    fun `An Error Occurred 에러 메시지가 발생해야 한다`() {
        val error = IllegalArgumentException("An error occurred")
        val errorFlux = Flux.error<String>(error)
 
        val source = Flux.just("success-1", "success-2")
            .concatWith(errorFlux)
 
        StepVerifier.create(source)
            .expectNext("success-1")
            .expectNext("success-2")
            .expectErrorMessage("An error occurred")
            .verify()
    }
}

IllegalArgumentException의 인자로 “An error occurred” 에러 메시지를 지정하고 에러를 통지하면 expectErrorMessage는 내부적으로 통지받은 에러 플럭스를 Exception의 최상위 타입인 Throwable로 변환한 뒤 메시지를 비교하고 기대하는 결과가 동일하다면 테스트를 통과시킵니다.

마지막으로 expectErrorSatisfies 함수는 인자로 Consumer<Throwable>을 받아서 개발자가 에러에 대한 검증을 직접 구현할 수 있습니다. 인자로 받는 Consumer 역시 함수형 인터페이스의 하나로 반환 값 없이 인자로 전달된 함수를 단순히 실행하는 함수형 인터페이스입니다.

@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}

예제 1.44는 expectErrorSatisfies에 전달되는 람다 함수로 JUnit5의 Assertions API를 사용해 통지된 에러의 타입을 assertTrue로 우선 체크 한 후 에러 메시지가 기대한 결과와 일치하는지를 assertEquals로 검증하는 코드입니다.

 

예제 1.44 expectErrorMessage 테스트 - reactor/reactor-test/src/test/kotlin/ReactorTest5.kt

import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
 
class ReactorTest5 {
 
    @Test
    fun `An Error Occurred 에러 메시지가 발생해야 한다`() {
        val error = IllegalArgumentException("An error occurred")
        val errorFlux = Flux.error<String>(error)
 
        val source = Flux.just("success-1", "success-2")
            .concatWith(errorFlux)
 
        StepVerifier.create(source)
            .expectNext("success-1")
            .expectNext("success-2")
            .expectErrorSatisfies {
                assertTrue(it is IllegalArgumentException)
                assertEquals("An error occurred", it.message)
            }
            .verify()
    }
}

 


발생할 에러의 타입이 다양하여 분기 처리가 필요하거나 상세한 검증 로직이 필요한 경우 expectErrorSatisfies를 사용하면 쉽게 구현이 가능합니다.

 

verify로 시작하는 편의 함수

앞서 알아본 예제의 경우  코드의 마지막에 최종적으로 verify를 호출하여 테스트를 실행하였는데 편의 함수(Convenience Function)를 사용하면 조금 더 간결하게 코드를 작성할 수 있습니다. StepVerifier는 verify로 시작하는 이름의 편의 함수를 제공합니다. 

먼저 예제 1.41에서 작성한 테스트 코드의 마지막 부분인 expectComplete.verify()입니다. 예제 1.45와 같이 편의 함수인 verifyComplete를 사용해서 한 번의 함수 호출로 코드를 재작성 할 수 있습니다.

예제 1.45 verifyComplete 예제 - reactor/reactor-test/src/test/kotlin/ReactorTest6.kt

import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
import java.util.*
 
class ReactorTest6 {
 
    @Test
    fun `사과가 들어있는지 확인`() {
        val flux: Flux<String> = Flux.just("사과", "바나나", "딸기")
 
        StepVerifier
            .create(flux)
            .recordWith { ArrayList() }
            .expectNextCount(3)
            .expectRecordedMatches { fruits ->
                fruits.contains("사과")
            }
//          .expectComplete()
//          .verify()
            .verifyComplete()
    }
}

 

예제 1.42의 expectError.verify()와 expectError(Throwable).verify()는 예제 1.46와 같이 verifyError와 verifyError(Throwable)을 사용해 코드를 재작성 할 수 있습니다.

예제 1.46 verifyError 예제 - reactor/reactor-test/src/test/kotlin/ReactorTest7.kt

import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
 
class ReactorTest7 {
 
    @Test
    fun `IllegalArgumentException 에러가 발생해야 한다`() {
        val error = IllegalArgumentException("error")
        val errorFlux = Flux.error<String>(error)
 
        val source = Flux.just("success-1", "success-2")
            .concatWith(errorFlux)
 
        // 종류에 상관없이 에러가 발생하면 통과
        StepVerifier.create(source)
            .expectNext("success-1")
            .expectNext("success-2")
//          .expectError()
//          .verify()
            .verifyError()
 
        // 인자로 지정한 에러가 발생할 때만 통과
        StepVerifier.create(source)
            .expectNext("success-1")
            .expectNext("success-2")
//          .expectError(IllegalArgumentException::class.java)
//          .verify()
            .verifyError(IllegalArgumentException::class.java)
    }
}

 

예제 1.43의 expectErrorMessage.verify()는 예제 1.47과 같이 편의 함수인 verifyErrorMessage(message)를 사용해서 한 번의 함수 호출로 코드를 재작성 할 수 있습니다.

 

예제 1.47 verifyErrorMessage 예제 - reactor/reactor-test/src/test/kotlin/ReactorTest8.kt

import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
 
class ReactorTest8 {
 
    @Test
    fun `An Error Occurred 에러 메시지가 발생해야 한다`() {
        val error = IllegalArgumentException("An error occurred")
        val errorFlux = Flux.error<String>(error)
 
        val source = Flux.just("success-1", "success-2")
            .concatWith(errorFlux)
 
        StepVerifier.create(source)
            .expectNext("success-1")
            .expectNext("success-2")
//          .expectErrorMessage("An error occurred")
//          .verify()
            .verifyErrorMessage("An error occurred")
    }
}

 

마지막으로 예제 1.44의 expectErrorSatisfies.verify()입니다. 예제 1.48과 같이 편의 함수인 verifyErrorMessage를 사용해서 한 번의 함수 호출로 코드를 재작성 할 수 있습니다.

 

예제 1.48 verifyErrorSatisfies 예제 - reactor/reactor-test/src/test/kotlin/ReactorTest9.kt

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
 
class ReactorTest9 {
 
    @Test
    fun `An Error Occurred 에러 메시지가 발생해야 한다`() {
        val error = IllegalArgumentException("An error occurred")
        val errorFlux = Flux.error<String>(error)
 
        val source = Flux.just("success-1", "success-2")
            .concatWith(errorFlux)
 
        StepVerifier.create(source)
            .expectNext("success-1")
            .expectNext("success-2")
//          .expectErrorSatisfies {
//              assertTrue(it is IllegalArgumentException)
//              assertEquals("An error occurred", it.message)
//          }
//          .verify()
            .verifyErrorSatisfies {
                assertTrue(it is IllegalArgumentException)
                assertEquals("An error occurred", it.message)
            }
    }
}


이렇게 verify로 시작하는 편의 함수를 사용하면 마지막에 호출되는 expect 테스트 함수를 줄일 수 있으므로 조금 더 간결하고 편리하게 코드를 작성할 수 있습니다. 

 

부수 효과 적용

부수 효과(Side Effect)는 핵심 로직이 아닌 부가적으로 동작하는 별개의 기능을 추가하는 것을 말합니다. 리액터는 기존에 작성한 연산자를 수정하지 않고 부수 효과를 지원하는 연산자를 통해 로직을 특정한 신호와 함께 동작하도록 추가하거나 통지되는 데이터를 들여다보기 위한 방법을 제공합니다. 이러한 연산자는 대부분 ‘do’라는 접두어를 사용합니다. 주로 사용되는 연산자는 ‘doOn’으로 시작하는 연산자로 특정 신호(onSubscribe, onNext 등)가 동작하면 콜백 형태로 동작하므로 각각의 신호가 발생할 때마다 쉽게 부수 효과를 적용할 수 있습니다. 예를 들어 데이터를 변환하거나 필터링 후 데이터가 어떻게 통지되는지 로그로 출력하는 등 핵심 로직과는 별개의 작업이 필요한 경우 유용하게 사용될 수 있을 것입니다. 표 1.4는 주로 사용되는 doOn으로 시작하는 연산자들을 소개합니다.


표 1.4 주로 사용되는 doOn으로 시작하는 연산자

함수 설명
doOnSubscribe 구독을 시작하고 onSubscribe 신호가 발생하면 호출 
doOnNext onNext로 데이터를 통지한뒤 호출 
doOnComplete onComplete 신호가 발생하기 직전에 호출. 플럭스 전용 연산자
doOnError onError 신호가 발생하면 호출


이외에도 더 많은 부수 효과를 위한 연산자들은 리액터 공식 문서에서 확인할 수 있습니다. 

 

doOnSubscribe

doOnSubscribe는 구독자의 onSubscribe가 호출되면 이어서 동작하는 연산자입니다. onSubscribe는 구독이 일어나면 최초 1회만 호출되는 것이 특징입니다. doOnSubscribe은 함수형 인터페이스인 Consumer를 제공받아서 반환 값 없이 함수 내부의 코드만 실행하는 것이 특징입니다. 개발자는 Consumer를 작성하면 인자로 서브스크립션(Subscription)을 제공받으므로 현재 구독에 대한 부수 효과도 같이 처리할 수 있습니다. 

예제 1.49는 “red”와 “blue”라는 문자열을 가진 플럭스를 생성하고 log 연산자를 사용해 내부적으로 어떤 동작이 일어나는지 로그로 출력하고 전달받은 문자열을 대문자로 변환한 뒤 각각 출력합니다.

예제 1.49 doOnSubscribe 예제 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnSub1.kt

package sideeffect
 
import reactor.core.publisher.Flux
 
fun main() {
    Flux.just("red", "blue")
        .log()
        .map {
            it.toUpperCase()
        }
        .subscribe(::println)
}
 
--------------------
출력 결과)
--------------------
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(red)
RED
[ INFO] (main) | onNext(blue)
BLUE
[ INFO] (main) | onComplete()


출력 결과를 보면 [ INFO]로 시작하는 로그를 확인할 수 있는데 이것들은 모두 log() 연산자를 사용하였기 때문에 내부 동작이 출력되는 것입니다. 실무에서는 디버깅 용도를 제외하면 성능을 하락시킬 수 있기 때문에 유의하여 사용해야 합니다.  로그 내용을 보면 최초 onSubscribe가 호출된 후 request(unbounded)가 호출된 것을 알 수 있습니다. unbounded는 즉 Long.MAX_VALUE 값이기 때문에 무한대의 값을 요청한 것과 마찬가지입니다. request 이후에는 onNext(red)로 데이터가 통지되고 subscribe(::println)에서 ‘RED’를 출력합니다. 그다음 onNext(blue)로 두 번째 데이터가 다시 통지되고 ‘BLUE’가 다시 subscribe(::println)에 의해 콘솔에 출력됩니다. 최종적으로 구독이 완료되어 onComplete 신호가 호출된 것을 확인할 수 있습니다.

예제 1.50은 doOnSubscribe를 적용하면 어떻게 동작하는지 알아보기 위해 예제 1.49에 doOnSubscribe를 추가하고 몇 가지 부수 효과를 적용한 예제입니다. 먼저 부수 효과를 주기위해 적용한 doOnSubscribe 내부에선 인자로 전달받은 서브스크립션을 조작합니다. 우선 request(1)을 호출하여 onNext로 통지 될 데이터를 1개씩으로 제한합니다. 그런다음 cancel을 호출하여 구독을 취소한뒤 부수 효과가 제대로 적용됬는지 확인하기 위해 “doOnSubscribe is called”를 콘솔에 출력합니다. 

예제 1.50 doOnSubscribe 예제2 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnSub2.kt

package sideeffect
 
import reactor.core.publisher.Flux
 
fun main() {
    Flux.just("red", "blue")
        .log()
        .doOnSubscribe { subscription ->
            subscription.request(1)
            subscription.cancel()
            println("doOnSubscribe is called")
        }
        .map {
            it.toUpperCase()
        }
        .subscribe(::println)
}
 
--------------------
출력 결과)
--------------------
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(red)
RED
[ INFO] (main) | cancel()
doOnSubscribe is called
[ INFO] (main) | request(unbounded)


출력 결과를 확인해보면 최초 onSubscribe가 호출되고 이후 request(1)이 출력되었습니다. 예제에서 subscription.request(1)을 호출했기 때문에 데이터 통지가 1로 제한된 것입니다. 그다음엔 onNext(red)로 데이터가 통지되고 ‘RED’가 출력됩니다. 그다음 이전 예제에서는 ‘BLUE’가 출력됐지만 이번 예제에서는 cancel()이 호출되고 저희가 작성한 ‘doOnSubscribe is called’가 출력되었습니다. 즉 subscription.cancel()을 호출하여 구독을 취소했기 때문에 처음에 request(1)로 요청한 데이터는 정상 통지되고 두 번째로 데이터를 통지하려는 시점에서는 이미 구독이 취소되어 데이터가 출력되지 않은 것입니다.

 

doOnNext

doOnNext는 데이터가 통지되는 시점인 onNext가 호출된 후 동작하는 연산자입니다. doOnNext는 onNext로 통지된 값을 인자로 전달받습니다. 그러므로  통지된 값을 확인하거나 통지된 값을 가지고 별도의 부수 효과를 적용할 수 있으므로 유용하게 사용될 수 있습니다. 

예제 1.51은 1부터 5까지 데이터를 통지하는 플럭스를 생성하고 doOnNext에서 통지된 값을 확인할 수 있도록 출력하는 예제입니다. 

예제 1.51 doOnNext 예제1 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnNext1.kt

package sideeffect
 
import reactor.core.publisher.Flux
 
fun main() {
    Flux.range(1, 5)
        .log()
        .doOnNext {
            println("통지된 값 : $it")
        }
        .subscribe()
}
 
--------------------
출력 결과)
--------------------
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
통지된 값 : 1
[ INFO] (main) | onNext(2)
통지된 값 : 2
[ INFO] (main) | onNext(3)
통지된 값 : 3
[ INFO] (main) | onNext(4)
통지된 값 : 4
[ INFO] (main) | onNext(5)
통지된 값 : 5
[ INFO] (main) | onComplete()


출력 결과를 보면 Flux.range에 지정된 값의 순서대로 onNext로 데이터 통지 신호가 발생하고 각각의 값이 doOnNext로 전달되는 것을 확인할 수 있습니다.

예제 1.52에서는 AtomicInteger 클래스를 사용해 onNext 신호가 발생할 때마다 데이터 통지를 기록할 카운터를 생성하고 카운트한 뒤 출력하는 예제입니다.

예제 1.52 doOnNext 예제2 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnNext2.kt

package sideeffect
 
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicInteger
 
fun main() {
   val counter = AtomicInteger()
 
   Flux.range(1, 5)
       .log()
       .doOnNext {
           counter.incrementAndGet()
           println("카운터 증가")
       }
       .subscribe()
 
   println("총 개수 : ${counter.get()}")
}
 
--------------------
출력 결과)
--------------------
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
카운터 증가
[ INFO] (main) | onNext(2)
카운터 증가
[ INFO] (main) | onNext(3)
카운터 증가
[ INFO] (main) | onNext(4)
카운터 증가
[ INFO] (main) | onNext(5)
카운터 증가
[ INFO] (main) | onComplete()
총 개수 : 5

출력 결과를 보면 onNext 신호가 발생할때마다 “카운터 증가" 메시지가 콘솔에 출력되는 것을 확인할 수 있습니다. 총 5번의 카운터가 증가되고 최종적으로 “총 개수 :5”가 출력됩니다. doOnNext를 사용하면 각각 데이터 통지 마다 이러한 부수 효과를 적용할 수 있습니다.

 

doOnComplete

doOnComplete는 통지가 성공적으로 완료되는 시점에 동작하는 연산자로 인자로 함수형 인터페이스인 Runnable을 전달 받아서 동작하는 것이 특징입니다. 또한 doOnComplete는 플럭스만 제공하는 연산자로 모노의 경우 비슷한 동작을 하는 doOnSuccess가 있습니다.

예제 1.53은 A부터 C를 통지하는 플럭스를 생성하고 데이터 통지가 완료되면 doOnComplete 내부에서 AtomicBoolean을 사용해 성공 여부를 true로 변경한 후 데이터 통지 성공 여부를 출력합니다.

예제 1.53 doOnComplete 예제1 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnComp1.kt

package sideeffect
 
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicBoolean
 
fun main() {
    val completed = AtomicBoolean(false)
    Flux.just("A", "B", "C")
        .log()
        .doOnComplete {
            completed.set(true)
        }
        .subscribe()
 
    if (completed.get()) {
        println("데이터 통지 완료")
    } else {
        println("데이터 통지 실패")
    }
}
 
--------------------
출력 결과)
--------------------
[ INFO] (main) | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(A)
[ INFO] (main) | onNext(B)
[ INFO] (main) | onNext(C)
doOnComplete 동작
[ INFO] (main) | onComplete()
데이터 통지 성공


출력 결과를 보면 우선 onNext로 A, B, C가 순차적으로 통지됩니다. 그다음 doOnComplete에 작성한 “doOnComplete 동작”이 출력되고 onComplete 신호가 발생하고 최종적으로 “데이터 통지 완료" 메시지가 콘솔에 출력됩니다. 그러므로 doOnComplete는 onComplete 신호 직전에 호출된다는 것을 확인할 수 있습니다. 


이번엔 에러가 통지될 경우 doOnComplete가 제대로 동작하는지 알아보기 위해 예제 1.54에서는 기존의 플럭스에 concatWith로 Flux.error(IllegalArgumentException())을 추가한 뒤 실행해보겠습니다.

예제 1.54 doOnComplete 예제2 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnComp2.kt

package sideeffect
 
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicBoolean
 
fun main() {
    val completed = AtomicBoolean(false)
    Flux.just("A", "B", "C")
        .concatWith(Flux.error(IllegalArgumentException()))
        .log()
        .doOnComplete {
            println("doOnComplete 동작")
            completed.set(true)
        }
        .subscribe()
 
    if (completed.get()) {
        println("데이터 통지 완료")
    } else {
        println("데이터 통지 실패")
    }
}
 
--------------------
출력 결과)
--------------------
[ INFO] (main) onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(A)
[ INFO] (main) onNext(B)
[ INFO] (main) onNext(C)
데이터 통지 실패
[ERROR] (main) onError(java.lang.IllegalArgumentException)
[ERROR] (main)  - java.lang.IllegalArgumentException


출력 결과를 보면 doOnComplete와 onComplete가 호출되지 않고 “데이터 통지 실패"가 출력된 후 onError 신호가 발생하는 것을 확인할 수 있습니다. 또한 에러에 대한 별도 처리가 없으므로 java.lang.IllegalArgumentException이 발생하게 됩니다. onComplete는 에러 없이 정상적으로 데이터 통지가 완료될 경우에만 발생하는데 doOnComplete는 onComplete 직전에 동작하므로 에러 통지가 없어야 동작하는 것을 확인할 수 있습니다.

 

doOnError

doOnError는 에러가 통지되는 시점에  지정한 부수 효과를 적용할 수 있습니다. 에러가 통지되는 시점에는 onError 신호가 발생하는데 doOnError 연산자는 인자로 함수형 인터페이스인 Consumer를 제공받아서  onError 신호가 발생하기 직전에 실행합니다. 인자로 제공된 Consumer에는 최상위 에러 클래스인 Throwable가 전달됩니다. 그렇기 때문에 내부에서 어떤 에러가 발생했는지 확인하여 처리를 다르게 할 수도 있습니다.

예제 1.55는 에러를 통지하는 플럭스를 생성하고 에러가 발생할 경우 doOnError가 동작하는 예제입니다.

예제 1.55 doOnError 예제 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnError.kt

package sideeffect
 
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicBoolean
 
fun main() {
    val errorOccurred = AtomicBoolean(false)
    Flux.range(1,3)
        .concatWith(Flux.error(IllegalArgumentException()))
        .log()
        .doOnError { throwable: Throwable ->
            println("doOnError 동작")
            println("발생한 에러 : ${throwable.javaClass}")
            errorOccurred.set(true)
        }
        .subscribe()
 
    if (errorOccurred.get()) {
        println("에러 발생")
    }
}
 
--------------------
출력 결과)
--------------------
[ INFO] (main) onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(1)
[ INFO] (main) onNext(2)
[ INFO] (main) onNext(3)
doOnError 동작
발생한 에러 : class java.lang.IllegalArgumentException
에러 발생
[ERROR] (main) onError(java.lang.IllegalArgumentException)


출력 결과를 보면 onNext로 데이터를 정상적으로 통지하다가 에러가 통지되면 doOnError가 먼저 발생한 후 최종적으로 onError가 발생합니다. doOnError 내부에서는 람다 함수로 제공되는 Consumer는 인자로 Throwable을 받기 때문에 에러의 종류와 어떤 메시지를 출력하는지 확인할 수 있습니다.

 

리액터 코틀린 지원

스프링 프로젝트는 코틀린에 대한 지원을 점점 늘리고 있는데 최신 스프링 프로젝트 공식 문서를 보면 자바 예제외에도 코틀린 예제를 쉽게 확인할 수 있습니다. 또한 코틀린이 제공하는 여러 라이브러리도 스프링 프로젝트에 기본으로 탑재되고 있는데 리액터 또한 리액터 코틀린 확장(reactor-kotlin-extensions)을 지원합니다 . 리액터 코틀린 확장 기능을 사용하면 코틀린의 우아한 문법을 사용해 리액터 코드를 자바를 사용했을 때보다 더 간결하게 작성이 가능합니다. 


먼저 리액터 코틀린 확장 기능을 사용하기 위해 예제 1.56과 같이 build.gradle.kts에 reactor-kotlin-extensions 모듈을 추가 합니다.

예제 1.56 프로젝트에 reactor-kotlin-extensions 의존성 추가 - reactor/reactor-basic/build.gradle.kts

plugins {
   id("org.jetbrains.kotlin.jvm") version ("1.4.10")
}
 
group = "com.reactor.reactorbasic"
version = "1.0-SNAPSHOT"
 
repositories {
    mavenCentral()
}
 
dependencies {
   implementation("org.jetbrains.kotlin:kotlin-stdlib")
   implementation("io.projectreactor:reactor-core:3.4.0")
 implementation("io.projectreactor.kotlin:reactor-kotlin-extensions:1.1.2")
}

 

toMono

toMono는 객체를 모노로 변환하기 위한 코틀린 확장 함수입니다. toMono 함수를 사용하면 Mono.justOrEmpty, Mono.fromCallable, Mono.fromFuture와 같은 팩토리 함수를 쓰지 않아도 객체를 모노로 쉽게 변환할 수 있습니다.

다음은 리액터 코틀린 확장 모듈의 모노 확장 함수 구현 코드입니다.

fun <T> Publisher<T>.toMono(): Mono<T> = Mono.from(this)
 
fun <T> (() -> T?).toMono(): Mono<T> = Mono.fromSupplier(this)
 
fun <T : Any> T?.toMono(): Mono<T> = Mono.justOrEmpty(this)
 
fun <T> Callable<T?>.toMono(): Mono<T> = Mono.fromCallable(this::call)


코드를 보면 toMono는 코틀린의 최상위 객체인 Any에 확장한 함수부터 Callable, CompletableFuture, 리액터의 Publisher를 지원하는 확장 함수가 구현돼있습니다.

예제 1.57은 앞서 살펴본 toMono를 사용해 다양한 객체를 모노로 변환하는 예제입니다. 

예제 1.57 toMono 예제 - reactor/reactor-basic/src/main/kotlin/extensions/toMono.kt

package extensions
 
import reactor.kotlin.core.publisher.toMono
import java.util.concurrent.Callable
import java.util.concurrent.CompletableFuture
 
fun main() {
    "String.toMono"
        .toMono()
        .subscribe(::println)
 
    listOf("Collection.toMono")
        .toMono()
        .subscribe(::println)
 
    Callable { "Callable.toMono" }
        .toMono()
        .subscribe(::println)
 
    CompletableFuture.supplyAsync { "supplyAsync.toMono" }
        .toMono()
        .subscribe(::println)
}


--------------------
출력 결과)
--------------------
String.toMono
[Collection.toMono]
Callable.toMono
CompletableFuture.supplyAsync.toMono


출력 결과를 보면 toMono로 변환한 객체들이 정상적으로 모노로 변환되어 데이터 통지가 이뤄진 것을 확인할 수 있습니다. toMono를 사용하면 다양한 객체를 팩토리 함수 없이 모노로 쉽게 변환할 수 있습니다. 

 

toFlux

toFlux는 객체를 플럭스로 변환하기 위한 코틀린 확장 함수입니다. toFlux는 toMono와 다르게 컬렉션이나 배열 같은 복수의 데이터를 저장하는 자료구조를 toFlux로 변환하는 것이 특징입니다. toFlux 함수를 사용하면 Flux.just, Flux.fromIterable, Flux.fromArray 같은 팩토리 함수를 쓰지 않아도 객체를 플럭스로 쉽게 변환할 수 있습니다. 

다음은 리액터 코틀린 확장 모듈의 플럭스 확장 함수 구현 코드입니다.

fun <T : Any> Publisher<T>.toFlux(): Flux<T> = Flux.from(this)
 
fun <T : Any> Iterator<T>.toFlux(): Flux<T> = toIterable().toFlux()
 
fun <T : Any> Iterable<T>.toFlux(): Flux<T> = Flux.fromIterable(this)
 
fun <T : Any> Stream<T>.toFlux(): Flux<T> = Flux.fromStream(this)
 
fun BooleanArray.toFlux(): Flux<Boolean> = this.toList().toFlux()
 
fun ByteArray.toFlux(): Flux<Byte> = this.toList().toFlux()
 
fun ShortArray.toFlux(): Flux<Short> = this.toList().toFlux()
 
fun IntArray.toFlux(): Flux<Int> = this.toList().toFlux()
 
fun LongArray.toFlux(): Flux<Long> = this.toList().toFlux()
 
fun FloatArray.toFlux(): Flux<Float> = this.toList().toFlux()
 
fun DoubleArray.toFlux(): Flux<Double> = this.toList().toFlux()
 
fun <T> Array<out T>.toFlux(): Flux<T> = Flux.fromArray(this)


코드를 보면 다양한 배열과 컬렉션 구조에 대한 toFlux 확장 함수가 구현돼 있습니다.

예제 1.58은 toFlux를 사용해 자료구조를 플럭스로 변환하는 예제입니다. 

예제 1.58 toFlux 예제 - reactor/reactor-basic/src/main/kotlin/extensions/toFlux.kt

package extensions
 
import reactor.kotlin.core.publisher.toFlux
import reactor.kotlin.core.publisher.toMono
 
fun main() {
    listOf("A", "B", "C")
        .toFlux()
        .subscribe(::println)
 
    intArrayOf(1, 2, 3)
        .toFlux()
        .subscribe(::println)
 
    setOf("D", "E", "F")
        .toFlux()
        .subscribe(::println)
 
    "MonoToFlux".toMono()
       .toFlux()
       .subscribe(::println)
}
 
--------------------
출력 결과)
--------------------
A
B
C
1
2
3
D
E
F
MonoToFlux

출력 결과를 보면 컬렉션과 배열을 플럭스로 변환해서 데이터 통지가 정상적으로 이뤄진 것을 확인할 수 있습니다. toFlux의 한가지 특징은 컬렉션이나 배열의 경우 플럭스로 바로 변환이 가능하지만 “MonoToFlux” 문자열과 같은 일반 객체의 경우 toMono로 우선 변환한 뒤 toFlux 함수를 사용해 플럭스로 변환해야 합니다. toFlux의 경우 자료구조 외에는 발행자인 Publisher에만 확장할 수 있도록 구현돼 있기 때문입니다.

 

728x90
반응형
728x90
반응형


플럭스의 연산자

플럭스는 0..N개로 이뤄진 복수개의 요소를 통지하는 발행자입니다. 모노는 1개의 요소를 통지하지만 플럭스는 제한 없는 무한대의 요소를 통지합니다. 플럭스의 연산자는 모노의 연산자와 이름이 같고 역할이 같은 연산자가 많습니다. 대표적으로 앞선 장에서 소개한 map, flatMap, switchIfEmpty 등의 연산자는 플럭스를 사용할시에도 동일하게 사용할 수 있습니다.

just

Flux.just는 플럭스를 생성하는 기본 팩토리 함수입니다. Flux.just의 인자는 가변 인자를 지원하기 때문에 데이터의 개수 제한 없이 추가하여 플럭스를 생성할 수 있습니다. 예제 1.20는 두 개의 CellPhone 객체를 생성하고 Flux.just의 인자로 전달하고 다른 작업 없이 출력합니다.

package flux

import reactor.core.publisher.Flux

data class Cellphone(
    val name: String,
    val price: Int,
    val currency: Currency,
)

enum class Currency {
    KRW, USD
}

fun main() {
    val iphone =
        Cellphone(name = "Iphone", price = 100, currency = Currency.KRW)
    val galaxy =
        Cellphone(name = "Galaxy", price = 90, currency = Currency.KRW)

    val flux: Flux<Cellphone> =
        Flux.just(iphone, galaxy)

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
Cellphone(name=Iphone, price=100, currency=KRW)
Cellphone(name=Galaxy, price=90, currency=KRW)

출력 결과와 같이 Flux.just에 들어가는 인자의 순서대로 차례로 데이터가 통지됩니다. just 함수에 만약 null인 객체가 전달되면 예제 1.21과 같이 NullPointerException이 발생하기 때문에 주의해야합니다.

예제 1.21 Flux.just에 null이 들어간 경우 - reactor/reactor-basic/src/flux/Flux1.kt

package flux

import reactor.core.publisher.Flux

data class Cellphone(
    val name: String,
    val price: Int,
    val currency: Currency,
)

enum class Currency {
    KRW, USD
}

fun main() {
    val iphone =
        Cellphone(name = "Iphone", price = 100, currency = Currency.KRW)
    val galaxy =
        Cellphone(name = "Galaxy", price = 90, currency = Currency.KRW)

    val flux: Flux<Cellphone> =
        Flux.just(iphone, galaxy, null)

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
java.lang.NullPointerException: The 2th array element was null

fromArray, fromIterable

Flux.fromArray는 함수는 인자에 배열을 받아 플럭스를 생성하는 팩토리 함수입니다. 예제 1.22는 배열에 객체를 담고 Flux.fromArray를 사용해 플럭스를 생성한 뒤 순서대로 출력합니다.

예제 1.22 Flux.fromArray - reactor/reactor-basic/src/flux/Flux2.kt

package flux

import reactor.core.publisher.Flux

data class Hero(
   val nickname: String,
   val name: String,
)

fun main() {
   val avengers = arrayOf(
       Hero("Black Widow", "Natasha Romanoff"),
       Hero("Iron Man", "Tony Stark"),
       Hero("Captain America", "Steve Rogers"),
       Hero("Thor", "Thor Odinson"),
       Hero("Hulk", "Bruce Banner"),
       Hero("Hawkeye", "Clint Barton"),
       Hero("Black Panther", "T'Challa")
   )

   val flux: Flux<Hero> =
       Flux.fromArray(avengers)

   flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
Hero(nickname=Iron Man, name=Tony Stark)
Hero(nickname=Black Widow, name=Natasha Romanoff)
Hero(nickname=Captain America, name=Steve Rogers)
Hero(nickname=Thor, name=Thor Odinson)
Hero(nickname=Hulk, name=Bruce Banner)
Hero(nickname=Hawkeye, name=Clint Barton)
Hero(nickname=Black Panther, name=T'Challa)

예제 1.22의 출력 결과를 보면 예상한대로 배열에 지정한 순서대로 출력되는 것을 알 수 있습니다.

배열이 아닌 컬렉션을 이용해 플럭스를 생성할 경우 Flux.fromIterable 함수를 사용할 수 있습니다. fromArray와 마찬가지로 리스트에 지정한 순서대로 데이터를 통지합니다. 예제 1.23은 Flux.fromIterable을 사용해 플럭스를 생성하고 순서대로 출력합니다.

예제 1.23 Flux.fromIterable - reactor/reactor-basic/src/flux/Flux3.kt

package flux

import reactor.core.publisher.Flux

fun main() {
   val avengers = listOf(
       Hero("Iron Man", "Tony Stark"),
       Hero("Black Widow", "Natasha Romanoff"),
       Hero("Captain America", "Steve Rogers"),
       Hero("Thor", "Thor Odinson"),
       Hero("Hulk", "Bruce Banner"),
       Hero("Hawkeye", "Clint Barton"),
       Hero("Black Panther", "T'Challa"),
   )

   val flux: Flux<Hero> =
       Flux.fromIterable(avengers)

   flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
Hero(nickname=Iron Man, name=Tony Stark)
Hero(nickname=Black Widow, name=Natasha Romanoff)
Hero(nickname=Captain America, name=Steve Rogers)
Hero(nickname=Thor, name=Thor Odinson)
Hero(nickname=Hulk, name=Bruce Banner)
Hero(nickname=Hawkeye, name=Clint Barton)
Hero(nickname=Black Panther, name=T'Challa)

출력 결과에서 확인할 수 있듯이 fromIterable도 리스트에 지정한 순서 즉 인덱스의 순서대로 데이터를 통지하는 것을 확인할 수 있습니다.

range

Flux.range 함수는 첫 번째 인자인 시작 값부터 두 번째 인자인 카운트 값까지 값을 하나씩 증가하는 데이터를 통지하는 팩토리 함수입니다. 예제 1.24는 Flux.range를 사용해 1부터 100까지 더한 값을 출력합니다.

예제 1.24 Flux.range로 sum한 값 구하기 - reactor/reactor-basic/src/flux/Flux4.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val sum = Flux.range(1, 100)
    .reduce { t, u ->
        t + u
    }
    sum.subscribe(::println)
}

--------------------
출력 결과)
--------------------
5050

출력 결과를 보면 1부터 100까지 더한 값인 5050이 출력되었습니다. 즉 1부터 100까지 순서대로 데이터를 통지하고 reduce 함수를 사용해 통지받은 데이터를 더해졌다는 것을 알 수 있습니다.

다음 예제 1.25는 take를 사용해 인자로 지정한 10개 데이터를 통지하고 buffer는 통지받은 인자로 지정한 값만큼 데이터를 버퍼에 담아서 통지합니다.

예제 1.25 take와 buffer를 사용한 예제 - reactor/reactor-basic/src/flux/Flux5.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val buffer: Flux<List<Int>> =
        Flux.range(1, 100)
            .take(10)
            .buffer(4)

    buffer.subscribe(::println)
}

--------------------
출력 결과)
--------------------
[1, 2, 3, 4]
[5, 6, 7, 8]
[9, 10]

출력 결과를 보면 버퍼에 할당된 4건씩 데이터가 나뉘어 통지된 것을 확인할 수 있습니다.

interval

Flux.Interval 함수를 사용하면 인자로 지정해준 주기대로 신호를 발생 시킵니다. 발생된 신호는 Long 타입으로 ‘0, 1, 2,3’ 순서대로 전달 받을 수 있습니다. 예제 1.26은 0.5초 간격으로 랜덤한 식별자(id)를 가지는 쿠폰을 생성하는 예제 입니다.

예제 1.26 Flux.interval로 쿠폰 생성 예제 - reactor/reactor-basic/src/flux/Flux6.kt

package flux

import reactor.core.publisher.Flux
import java.time.Duration
import java.util.UUID

data class Coupon(
    val id: String,
    val description: String,
)

fun main() {
    val flux: Flux<Coupon> =
        Flux.interval(Duration.ofMillis(500))
            .map { count ->
                val id = UUID.randomUUID().toString()
                Coupon(id = id, description = "${count.plus(1)}번째 쿠폰")
            }
            .take(15)

    flux.subscribe(::println)

    Thread.sleep(Duration.ofSeconds(10).toMillis())
}


--------------------
출력 결과)
--------------------
Coupon(id=b755285e-476e-405a-8a9f-e0b49b3520a6, description=1번째 쿠폰)
Coupon(id=9c4eea7a-1f12-4123-9fae-678e6161f7de, description=2번째 쿠폰)
Coupon(id=6478a919-ae2b-47c9-b238-d41fe5d73010, description=3번째 쿠폰)
Coupon(id=faf7b65f-299e-4467-9308-964b901e978a, description=4번째 쿠폰)
Coupon(id=77db0f58-27c8-4abc-a47f-8966259cea78, description=5번째 쿠폰)
Coupon(id=ccfab126-a5b1-4e9e-bbf3-c55f16a0e475, description=6번째 쿠폰)
Coupon(id=c59c6f88-cf6f-4d05-80f9-261851e49359, description=7번째 쿠폰)
Coupon(id=a2ec4a36-c9f8-4087-ac93-6b963954f029, description=8번째 쿠폰)
Coupon(id=8663e6ee-293c-492f-98b7-8246e19aeb3e, description=9번째 쿠폰)
Coupon(id=c9cf3146-d1ad-4c2d-85f6-c66f3635d53e, description=10번째 쿠폰)
Coupon(id=1c66e15e-0794-4bc5-bc33-d0088e0384ca, description=11번째 쿠폰)
Coupon(id=c2e2c8c3-d5b2-47c9-be99-06aa3a679018, description=12번째 쿠폰)
Coupon(id=cf53e12c-8a54-4875-b6e3-11f5494c57c5, description=13번째 쿠폰)
Coupon(id=c37c3981-e34b-4147-8d8a-66eb3bc626ed, description=14번째 쿠폰)
Coupon(id=32942297-f457-4778-b559-c0f45023bbd6, description=15번째 쿠폰)

먼저 Flux.interval(Duration.ofMillis(500))는 0.5초 간격으로 신호를 발생시키고 순번을 데이터로 통지합니다. map 연산자 내부에선 UUID.randomUUID().toString()을 사용해 유니크한 식별자를 생성하고 쿠폰 객체의 id 프로퍼티에 할당합니다. description에는 몇 번째로 생성된 쿠폰인지를 확인하기 위해 count라는 람다 변수를 선언했습니다. 이때 count의 값은 최초에 0부터 시작하기 때문에 count.plus(1)을 사용해 기존 값에 1을 더해줍니다. 그 다음은 take로 15개의 데이터만 통지하도록 조절한 후 출력합니다.

이때 마지막 라인에서 예시와 같이 스레드를 일시정지하는 코드가 있어야 정상적으로 출력됩니다.

 Thread.sleep(Duration.ofSeconds(10).toMillis())

스레드를 일시정지하는 이유는 Flux.interval은 내부적으로 별도의 스레드에서 동작하도록 만들어져있는데 코드를 실행하는 스레드는 메인 함수의 스레드이므로 일시정지하지 않으면 subscribe에서 출력하기 전에 메인 스레드가 종료되기 때문입니다.

예제 1.27에서 Thread.sleep을 주석 처리하여 어떤 결과가 출력되는지 확인해 보겠습니다.

package flux

import reactor.core.publisher.Flux
import java.time.Duration
import java.util.UUID

data class Coupon(
    val id: String,
    val description: String,
)

fun main() {
    val flux: Flux<Coupon> =
        Flux.interval(Duration.ofMillis(500))
            .map { count ->
                val id = UUID.randomUUID().toString()
                Coupon(id = id, description = "${count.plus(1)}번째 쿠폰")
            }
            .take(15)

    flux.subscribe(::println)

    //Thread.sleep(Duration.ofSeconds(10).toMillis())
}


--------------------
출력 결과)
--------------------

출력 결과를 보면 Flux.interval에서 사용하는 스레드가 완료되기 전에 메인 스레드가 종료되어 아무것도 출력되지 않은 것을 확인할 수 있습니다.

single, singleOrEmpty

single 함수를 사용하면 최초에 전달받은 1개의 데이터만 통지받도록 제한할 수 있습니다. 그러므로 single을 연산자로 적용한 경우 통지받은 데이터가 정확히 1개 임을 보장해야 합니다. 데이터가 없거나 1개 이상인 경우는 예외가 발생합니다.

예제 1.29에서는 데이터가 1개가 아닌 경우 어떤 예외가 발생하는지 확인하기 위해 Flux.just로 6개의 데이터를 인자로 받고 take(2)로 데이터를 2번 통지하여 예외를 발생시킵니다.

예제 1.29 single 예제 - reactor/reactor-basic/src/flux/Flux7.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val flux = Flux.just(1, 2, 3, 4, 5, 6)
        .take(2)
        .single()

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
java.lang.IndexOutOfBoundsException: Source emitted more than one item

출력 결과를 보면 IndexOutOfBoundsException가 발생한 것을 확인할 수 있습니다. 예제 1.30는 take(1)을 사용해 하나의 데이터만 통지하는 예제입니다.

예제 1.30 single 예제2 - reactor/reactor-basic/src/flux/Flux8.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val flux = Flux.just("one", "two", "three")
        .take(1)
        .single()

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
one

출력 결과를 확인해보면 정상적으로 Flux.just의 인자로 처음 전달받은 문자열 ‘one’이 출력된 것을 확인할 수 있습니다.

만약 Flux로 통지받은 데이터가 비어있을(empty) 가능성이 있는 경우 singleOrEmpty를 사용할 수 있습니다. singleOrEmpty 함수는 1개의 데이터 또는 빈 값을 허용합니다. 예제 1.31은 Flux.empty를 사용해 빈 값을 통지하고 singleOrEmpty를 사용해 통지합니다.

예제 1.31 singleOrEmpty 예제 - reactor/reactor-basic/src/flux/Flux9.kt

package flux

import reactor.core.publisher.Flux

fun main() {
   val flux = Flux.empty<Any>()
       .singleOrEmpty()

   flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------

Flux.empty로 플럭스를 생성하고 singleOrEmpty를 사용했기 때문에 빈 결과가 출력되었습니다. singleOrEmpty 함수도 1개 또는 빈 값이 아닌 경우는 동일하게 IndexOutOfBoundsException가 발생합니다.

예제 1.32는 Flux.just에 첫 번째 인자는 ‘1’ 두 번째 인자는 Flux.empty()를 지정한 후 singleOrEmpty 함수를 사용하여 예외를 발생시킵니다.

예제 1.32 singleOrEmpty 예제2 - reactor/reactor-basic/src/flux/Flux9.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val flux = Flux.just(1, Flux.empty<Int>())
        .singleOrEmpty()

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
java.lang.IndexOutOfBoundsException: Source emitted more than one item

merge, concat, mergeSequential

Flux.merge는 플럭스를 제공받은 순서대로 결합하는 팩토리 함수입니다. zip 함수와의 차이점은 merge 함수는 튜플 객체로 감싸지지 않습니다. 예제 1.33은 Flux.merge를 사용해 인자로 지정된 플럭스를 순서대로 통지하는 예제입니다.

예제 1.32 Flux.merge 예제 - reactor/reactor-basic/src/flux/Flux10.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val three = Flux.just("three")
    val two = Flux.just("two")
    val one = Flux.just("one")

    val flux: Flux<String> = Flux.merge(three, two, one)
    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
three
two
one

출력 결과를 확인해보면 인자로 지정한 순서대로 결과가 출력되었음을 확인할 수 있습니다. 예제 1.33은 예제 1.32를 조금 수정하여 Flux.merge 함수에 인자로 전달되는 플럭스에 delayElements 함수를 사용하여 정해진 지연 시간 이후에 각각 병렬로 처리되는데 이 경우에도 순서가 유지되는지 알아보겠습니다.

예제 1.33 Flux.merge 예제2 - reactor/reactor-basic/src/flux/Flux11.kt

package flux

import reactor.core.publisher.Flux
import java.time.Duration

fun main() {
    val one = Flux.just("one").delayElements(Duration.ofMillis(300))
    val two = Flux.just("two").delayElements(Duration.ofMillis(400))
    val three = Flux.just("three").delayElements(Duration.ofMillis(500))

    val flux: Flux<String> = Flux.merge(three, two, one)
    flux.subscribe(::println)

    Thread.sleep(Duration.ofSeconds(2).toMillis())
}

--------------------
출력 결과)
--------------------
one
two
three

출력 결과를 확인해보면 인자로 지정된 순서대로 통지되지 않고 지연 시간에 따라서 순서가 정해진 것을 확인할 수 있습니다. 만약 병렬 처리 시에도 인자로 지정된 순서대로 통지해야 하는 경우 예제 1.34와 같이 concat 함수를 사용해 순서를 보장할 수 있습니다.

예제 1.34 Flux.concat 예제 - reactor/reactor-basic/src/flux/Flux12.kt

package flux

import reactor.core.publisher.Flux
import java.time.Duration

fun main() {
    val one = Flux.just("one").delayElements(Duration.ofMillis(300))
    val two = Flux.just("two").delayElements(Duration.ofMillis(400))
    val three = Flux.just("three").delayElements(Duration.ofMillis(500))

    val flux: Flux<String> = Flux.concat(three, two, one)
    flux.subscribe(::println)

    Thread.sleep(Duration.ofSeconds(2).toMillis())
}

--------------------
출력 결과)
--------------------
three
two
one

출력 결과를 확인해보면 플럭스가 concat 함수에 병렬로 전달되었지만 인자로 지정된 순서대로 통지된 것을 확인할 수 있습니다. 순서가 보장되는 이유는 병렬 처리 시 인자로 지정된 플럭스를 결합한 다음 플럭스를 순서대로 구독하여 데이터를 통지하고 이전 플럭스에 대한 통지가 완료되면 다음 플럭스가 구독 처리될 때까지 기다리는 순차처리 방식의 연산자이기 때문입니다.

예제 1.35에서 사용한 mergeSequential도 concat과 같이 순서가 보장됩니다.

예제 1.35 Flux.concat 예제 - reactor/reactor-basic/src/flux/Flux12.kt

package flux

import reactor.core.publisher.Flux
import java.time.Duration

fun main() {
    val one = Flux.just("one").delayElements(Duration.ofMillis(300))
    val two = Flux.just("two").delayElements(Duration.ofMillis(400))
    val three = Flux.just("three").delayElements(Duration.ofMillis(500))

    val flux: Flux<String> = Flux.mergeSequential(three, two, one)
    flux.subscribe(::println)

    Thread.sleep(Duration.ofSeconds(2).toMillis())
}

--------------------
출력 결과)
--------------------
three
two
one

concat 과의 차이점이라면 concat은 여러 개의 플럭스를 인자로 전달받아 하나로 결합한 후에 순서대로 구독 처리하는 연산자라면 mergeSequential은 여러 개의 플럭스를 병렬로 구독 처리하고 최종 변환 시 인자로 지정한 구독 순서에 따라서 새로운 플럭스로 결합한다는 차이점이 있습니다.

728x90
반응형

+ Recent posts