728x90
반응형

1. 비동기-논블로킹 프로그래밍

전통적 웹 방식 (스프링 MVC)

  • 전통적 웹 방식은 1개 요청당 1개의 스레드를 사용하는 Thread per Request 모델
  • 요청 당 스레드를 사용하는 방식은 DB, Network IO 등이 발생할 경우 결과를 받기전까지 해당 스레드는 블로킹 상태가 됨


  • 요청을 처리하기 위해 스레드를 생성하고 전환하는데 따른 Context Switching 비용이 발생
  • 요청에 따라 무한정 스레드를 생성할 수 없기 때문에 스레드풀을 사용해 스레드를 재사용(톰캣 기본 200개)

 

 

비동기-논블로킹 방식

  • IO(DB, File, Network) 처리 시 스레드가 대기하지 않고 처리가 완료되면 다양한 방식으로 이벤트를 통지받는 방식 (콜백, 프로미스, async-await 등)
  • 더 적은 스레드로 같은 일을 할 수 있기 때문에 Context Switching에 따른 비용이 훨씬 적다. 즉 하드웨어 자원을 더 적게 사용한다
  • 순차적으로 동작하지 않으므로 코드의 흐름을 파악하기 어렵고 콜백헬(Callback Hell)로 인해 가독성과 유지보수성이 현저히 떨어짐


 

C10K 문제

  • 컴퓨터 과학에서 C10K 문제란 1만명의 사용자가 동시에 접속했을때(1만개의 소켓이 열림) 하드웨어가 충분함에도 기존의 I/O 통지 방식(e.g. select)으로는 제대로 처리하지 못하는 것을 말함
  • Event-Driven 모델을 사용해 해결한다.
    • 커널 레벨에선 OS에서 제공하는 epoll, kqueue와 같은 통지 모델을 사용해 해결할 수 있다.
    • 프로그래머는 로우레벨을 직접 다루지 않고 Netty, Node.js, Nginx, Vert.x 등을 사용한다.


 

2. 리액티브 프로그래밍

  • 비동기-논블로킹을 처리하는 새로운 패러다임
  • 2010년 에릭마이어에 의해 마이크로소프트 .NET 에코 시스템으로 정의됨
  • 리액티브 프로그래밍은 기본적으로 옵저버 패턴 처럼 데이터 제공자가 소비하는 측에 데이터를 통지하는 푸시 기반(Push-Based)
  • 구현체에 따라서 풀과 푸시를 모두 제공하는 경우도 있음 (Pull-Push Hybrid)
  • 데이터의 통지, 완료, 에러 처리를 옵저버 패턴에 영감을 받아 설계되었고 데이터의 손쉬운 비동기 처리를 위해 함수형 언어의 접근 방식을 사용

 

옵저버 패턴

  • 관찰 대상이 되는 객체가 변경되면 대상 객체를 관찰하고 있는 옵저버(Observer)에게 변경사항을 통지하는 디자인 패턴

직접 구현한 옵저버 패턴의 예시

import java.util.*

class Coffee(val name: String)


// 데이터 제공자
class Barista : Observable() {

    private fun makeCoffee(name: String) = Coffee(name)

    fun serve(name: String) {
         setChanged()
         notifyObservers(makeCoffee(name))
    }
}

// 데이터 구독자
class Customer : Observer {

    fun orderCoffee() = "Iced Americano"

    override fun update(o: Observable?, arg: Any?) {
         val coffee = arg as Coffee
         println("I got a cup of ${coffee.name}")
    }
}


fun main(args: Array<String>) {
    val barista = Barista()
    val customer = Customer()

    barista.addObserver(customer)
    barista.serve(customer.orderCoffee())
}

--------------------
출력 결과)
--------------------
I got a cup of Iced Americano

 

리액티브 스트림

  • JVM 환경에서 리액티브 프로그래밍의 표준 API 사양으로 비동기 데이터 스트림논블로킹-백프레셔(Back-Pressure)에 대한 사양을 제공
  • 리액티브 스트림 이전의 비동기식 애플리케이션에서는 멀티 코어를 제대로 활용하기 위해 복잡한 병렬 처리 코드가 필요
  • 처리할 데이터가 무한정 많아져서 시스템의 한계를 넘어서는 경우 애플리케이션은 병목 현상(bottleneck)이 발생하거나 심각한 경우 애플리케이션이 정지되는 경우도 발생할 수 있음(논블로킹-백프레셔로 해결)
  • Netflix, Vmware(Pivotal), Lightbend, Red Hat과 같은 유명 회사들이 표준화에 참여 중
  • 리액티브 스트림 인터페이스

  • Publisher : 데이터를 생성하고 구독자에게 통지
  • Subscriber : 데이터를 구독하고 통지 받은 데이터를 처리
  • Subscription : Publisher, Subscriber간의 데이터를 교환하도록 연결하는 역할을 하며 전달받을 데이터의 개수와 구독을 해지할 수 있다
  • Processor : Publisher, Subscriber을 모두 상속받은 인터페이스
  • 리액티브 스트림에서 Publisher와 Subscriber 간의 데이터 처리 흐름

리액티브 스트림을 표준 사양을 채택한 대표적인 구현체들

  • Project Reactor
  • RxJava
  • JDK9 Flow
  • Akka Streams
  • Vert.x
  • Hibernate Reactive(Vert.x 사용)

 

3. 프로젝트 리액터

  • 프로젝트 리액터(Project Reactor)는 리액티브 스트림의 구현체 중 하나로 스프링의 에코시스템 범주에 포함된 프레임워크이다
  • 리액티브 스트림 사양을 구현하고 있으므로 리액티브 스트림에서 사용하는 용어와 규칙을 그대로 사용한다
  • 리액터를 사용하면 애플리케이션에 리액티브 프로그래밍을 적용할 수 있고 비동기-논블로킹을 적용할 수 있다
  • 함수형 프로그래밍의 접근 방식을 사용해서 비동기-논블로킹 코드의 난해함을 해결한다
  • 백프레셔(Backpressure) 를 사용해 시스템의 부하를 효율적으로 조절할 수 있다
  • 리액터는 리액티브 스트림의 Publisher 인터페이스를 구현하는 모노(Mono)플럭스(Flux)라는 두 가지 핵심 타입을 제공한다
  • 두 타입 모두 리액티브 스트림 데이터 처리 프로토콜대로 onComplete 또는 onError 시그널이 발생할 때 까지 onNext를 사용해 구독자에게 데이터를 통지한다

모노

  • 0..1개의 단일 요소 스트림을 통지하는 발행자(Publisher)

플럭스

  • 0..N개로 이뤄진 무한대 요소를 통지하는 발행자

  • 연산자를 사용하기 위해서는 터미널 오퍼레이터인 구독(subcribe)이 필수이다. 구독하지 않으면 연산자는 실행되지 않는다. (이 개념은 Java8 스트림과 유사하며 WebFlux에선 컨트롤러에서 Mono,Flux 타입 반환시 자동으로 해줌)
  • CompletableFuture, DefferedResult, ListenableFuture 등 기존의 비동기 처리 방식은 구독 형태가 아니며, 단순한 기능만 가지고 있다 (async, join, callback 등)

 

4. 스프링 웹플럭스

  • 기본적으로 Project Reactor 기반 (RxJava와 같은 다른 구현체도 사용 가능함)
  • 서블릿 기반인 스프링 MVC와 대비되는 리액티브 기반의 웹 스택
  • 스프링 MVC와 베타적으로 동작하지만 부분적으로 상호 운용이 가능함 예) WebClient, 애노테이션 등

  • 내부적으로 블로킹 API가 있으면 성능이 현저하게 떨어짐
  • 예시로 Spring Data JPA는 내부에서 JDBC API를 사용하는데 JDBC 드라이버는 블로킹 API (Thread per Connection)이므로 JPA를 쓰는 경우 Spring MVC 쓰는게 더 나은 옵션이다
  • 꼭 블로킹 API 써야한다면 아래와 같이 별도의 스케쥴러로 동작시켜야한다
val blockingWrapper = Mono.fromCallable {

        /* make a remote synchronous call */ 

}.subscribeOn(Schedulers.boundedElastic())

  • 그 외에도 깊게 공부하면 연산자 융합(operator-fusion), Hot-Cold 퍼블리셔 등등 공부할게 정말 많다.

 

스프링 웹플럭스의 코루틴 지원

  • 리액터의 연산자 스타일에 비해 직관적인 스타일(진리의 사바사)
  • 리액터에 비해 러닝커브가 적은 편
  • 공식 레퍼런스 문서의 코틀린 예제들을 보면 코루틴 기반으로 소개하고 있다


코루틴이란?

  • 코루틴(Coroutine)은 코틀린에서 비동기-논블로킹 프로그래밍을 명령형 스타일로 작성할 수 있도록 도와주는 라이브러리이다
  • 코루틴은 멀티 플랫폼을 지원하여 코틀린을 사용하는 안드로이드, 서버 등 여러 환경에서 사용할 수 있다
  • 코루틴은 일시 중단 가능한 함수(suspend function) 를 통해 스레드가 실행을 잠시 중단했다가 중단한 지점부터 다시 재개(resume) 할 수 있다
  • 다른 언어와는 다르게 코틀린의 코루틴은 특별하다

 

코루틴을 사용한 구조적 동시성 예시

suspend fun combineApi() = coroutineScope {
	val response1 = async { getApi1() }
	val response2 = async { getApi2() }
	
	return ApiResult (
		response1.await()
		response2.await()
	)
}

 

 

리액티브가 코루틴으로 변환되는 방식

//Mono → suspend 
fun handler(): Mono<Void> -> suspend fun handler()

//Flux → Flow
fun handler(): Flux<T> -> fun handler(): Flow<T>
  • 모노는 서스펜드 함수로 플럭스는 플로우로 변환되는 걸 알 수 있다. 반대도 성립된다.

 

코루틴을 적용한 컨트롤러 코드

@RestController
class UserController(
	private val userService : UserService,
	private val userDetailService: UserDetailService
) {
	
	@GetMapping("/{id}")
	suspend fun get(@PathVariable id: Long) : User {
		return userService.getById(id)
	}
	
	@GetMapping("/users")
	suspend fun gets() = withContext(Dispatchers.IO) {
		val usersDeffered = async { userService.gets() }
		val userDetailsDeffered = async { userDetailService.gets() }
				
		return UserList(usersDeffered.await(), userDetailsDeffered.await())	
	}

}

 

  • Spring Data R2DBC는 아래와 같이 CoroutineCrudRepository를 지원한다

기존 리액티브 스택의 Spring Data R2DBC, Spring Data Mongo Reactive 등에서 ReactiveRepository 로 개발된 코드에서 코루틴으로 변환한 예제

interface ContentReactiveRepository : ReactiveCrudRepository<Content, Long> {

    fun findByUserId(userId: Long) : Mono<Content>

    fun findAllByUserId(userId: Long): Flux<Content>
}


class ContentService (
	val repository : ContentReactiveRepository
) {

	
	fun findByUserIdMono(userId: Long) : Mono<Content> {
		return repository.findByUserId(userId)
	}	

	suspend findByUserId (userId: Long) : Content {
		return repository.findByUserId(userId).awaitSingle()
	}

}

더 좋은 방법 : CoroutineCrudRepository 를 사용하면 awaitXXX 확장 함수 없이 사용 가능

interface ContentCouroutineRepository : CoroutineCrudRepository<Content, Long> {

    suspend fun findByUserId(userId:Long) : Content?

    fun findAllByUserId(userId: Long): Flow<Content>
}


class ContentService (
	val repository : ContentCouroutineRepository
) {


	suspend findByUserId (userId: Long) : Content {
		return repository.findByUserId(userId)
	}

}

 

 

 

Flow

  • Flow 는 코루틴에서 리액티브 프로그래밍 스타일로 작성할 수 있도록 만들어진 API이다
  • 코루틴의 suspend 함수는 단일 값을 비동기로 반환하지만 Flow를 사용하면 무한대 값을 반환할 수 있다
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    val flow = simple()
    flow.collect { value -> println(value) }
}

fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

// Flow started
// 1
// 2
// 3

 

  • 리액티브 스트림과 같이 Terminal Operator(최종 연산자)collect 를 호출하지 않으면 아무런 일도 일어나지 않는다
  • 코루틴의 플로우는 리액티브 스트림에서 영감을 받아 제작
    • 리액터의 플럭스(Flux)는 Pull-Push Hybrid
    • 코틀린의 플로우(Flow)는 Push Only
    • 아직은 리액터 플럭스에 비해 기능이 적다

 

스프링 MVC에서도 리액티브와 코루틴을 쓸수 있다

  • 코루틴, 리액티브를 쓰고 싶은 경우 의존성 추가
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
  • WebClient와 같은 비동기-논블로킹 라이브러리가 필요하다면 의존성 추가
implementation("org.springframework.boot:spring-boot-starter-webflux")
  • 웹플럭스를 바로 도입하기 어렵거나 MVC에서 비동기-논블로킹 구현을 우아하게 작성하고 싶은 경우 이 방법을 추천

 

 

DEMO

예제 코드

https://github.com/digimon1740/backend-techtalk

부하 테스트 시나리오

  1. Mock 데이터를 응답하는 유저 API 서버가 존재함
  2. 유저 API 서버는 API 서버로 부터 요청을 받으면 응답하는데까지 2초가 소요된다
  3. 유저 API 서버가 최초 실행되면 균형있는 테스트를 위해 warm-up을 수행한다
  4. 10초 동안 MVC, WebFlux 서버에 아래 조건으로 요청을 보내고 Vegeta 리포트와, VisualVM 모니터를 확인한다
  5. 100/s 요청 전송
  6. 200/s 요청 전송
  7. 300/s 요청 전송
  8. 1000/s 요청 전송

 

# MVC test
echo 'GET http://localhost:8080/mvc/users/1?delay=2000' | vegeta attack -duration=10s -rate=300/s | vegeta report

 

 

# WebFlux test
echo 'GET http://localhost:8081/webflux/users/1?delay=2000' | vegeta attack -duration=10s -rate=300/s | vegeta report

 

6. 정리

  • 전통적 블로킹 방식에 비해 비동기-논블로킹 모델은 더 적은 리소스로 같거나 더 많은 일을 할 수 있다
  • 필연적으로 비동기-논블로킹 모델은 코드의 가독성이 떨어지고 디버깅이 힘들 수 있다
  • 리액티브 프로그래밍은 비동기-논블로킹 개발을 더 쉽게 해주는 패러다임이다
  • 스프링 MVC는 느리고 구리다? 스프링 웹플럭스는 빠르고 멋지다?
  • 일반적으론 스프링 MVC도 충분하지만 한정된 자원에서 대량의 트래픽이 요구되는 서비스거나 비동기-논블로킹 IO가 필요한 경우 추천
  • 기존 스프링 MVC 프로젝트에 스프링 웹플럭스 의존성을 추가해서 써보는 것도 좋은 방법이다
  • 블로킹 API(JDBC, RestTemplate 등)을 쓸거면 그냥 마음 편히 스프링 MVC로 개발하자
  • 코틀린 + 웹플럭스 조합의 경우 메인은 코루틴을 사용하고 필요에 따라 리액티브를 같이 적용하는 것을 추천
  • 코루틴의 지원 범위를 넘어선 기능을 찾거나 연산자에 대한 학습이 충분하다면 리액티브는 여전히 매력적인 기술이다
  • 새로운 패러다임을 공부하는 것은 항상 어렵지만 도전했을때 얻는 것도 많다
  • 은탄환은 없다.

레퍼런스

https://devsh.tistory.com/category/Reactive Programming

https://tech.lezhin.com/2020/07/15/kotlin-webflux

https://github.com/digimon1740/webflux-demo

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/index.html

https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html

https://docs.spring.io/spring-framework/docs/current/reference/html/languages.html#coroutines

728x90
반응형
728x90
반응형


Hot vs Cold

리액터에는 두 개의 발행자 타입이 존재하는데 바로 Hot 발행자 Cold 발행자입니다. 먼저 Cold 발행자는 각각의 구독에 대해 새로운 데이터를 생성하는 구조입니다. 발행자에 대한 구독이 이뤄지지 않으면 데이터는 결코 새롭게 생성되지 않습니다. Cold 발행자에 대한 예를 들어보면 HTTP 요청이 Cold에 해당합니다. HTTP 요청은 클라이언트로부터 새로운 요청이 발생할 때마다 데이터를 생성하여 응답하는데 이때 여러 개의 클라이언트에서 요청이 발생하더라도 동일한 요청이라면 동일한 데이터 구조를 새로 응답할 것입니다. 또 다른 예로 넷플릭스에서 여러 사용자가 동일한 드라마 회차를 감상하는 경우 사용자들은 자신만의 타임라인을 가집니다. 즉 사용자별로 드라마를 처음 재생하면 처음부터 재생되고 재생을 시작한 시점부터 현재 보고 있는 시점이 각각 다른데 이러한 것들을 Cold 방식으로 볼 수 있습니다.

예제 1.36은 동일한 플럭스를 3개의 구독자가 구독하여도 동일한 데이터를 통지하는 Cold 발행자의 특징을 보여줍니다.

예제 1.36 Cold 발행자 예제 - reactor/reactor-basic/src/hotcold/Cold.kt

package hotcold
 
import reactor.core.publisher.Flux
 
fun main() {
    val list = listOf("one", "two", "three")
    val flux: Flux<String> = Flux.fromIterable(list)
 
    flux.subscribe { println("Subscriber-1 : $it") }
    flux.subscribe { println("Subscriber-2 : $it") }
    flux.subscribe { println("Subscriber-3 : $it") }
}
 
--------------------
출력 결과)
--------------------
Subscriber-1 : one
Subscriber-1 : two
Subscriber-1 : three
Subscriber-2 : one
Subscriber-2 : two
Subscriber-2 : three
Subscriber-3 : one
Subscriber-3 : two
Subscriber-3 : three


출력된 결과를 확인해보면 구독자가 각각 동일한 데이터를 통지받은 것을 확인할 수 있습니다. 즉 각각의 구독자가 새로운 구독을 만들었다는 것을 확인할 수 있습니다.

Hot 발행자는 구독자의 상태에 상관없이 데이터를 지속적으로 제공합니다. 예를 들어 주식 데이터를 제공하는 API의 경우 실시간으로 최신 데이터를 제공합니다. 국내 주식의 경우 정규장으로 오전 9시부터 시작됩니다. 만약 출근 후에 주식 정보를 보고 싶어서 네이버에 접속해 오전 10시에 주식 정보를 확인한다면 정규장이 시작된 오전 9시의 가격이 아닌 오전 10시의 주식가격을 확인하게 됩니다. 또 다른 예로 유투브의 실시간 스트리밍을 들 수 있습니다. 유저가 구독 중인 크리에이터의 실시간 스트리밍을 1시간이 지난 뒤 입장하면 당연히 처음부터 보는 게 아니라 실시간으로 크리에이터와 상호작용하게 됩니다. 이렇게 나중에 구독에 참가한 구독자는 처음부터 새로운 데이터를 얻는 게 아닌 새로이 구독한 시점부터 데이터를 제공받는 개념을 Hot 방식이라고 합니다.

예제 1.37은 Hot 발행자인 커넥터블 플럭스(ConnectableFlux)를 사용하여 각각의 구독자를 구독한 시점부터 데이터를 통지받도록 연결하여 출력하는 예제입니다.

예제 1.37 Hot 발행자 예제 - reactor/reactor-basic/src/hotcold/Hot.kt

package hotcold
 
import reactor.core.publisher.ConnectableFlux
import reactor.core.publisher.Flux
import java.time.Duration
 
 
fun main() {
    val flux: ConnectableFlux<Int> =
        Flux.range(1, 5)
            .delayElements(Duration.ofMillis(100))
            .publish()
 
    flux.connect()
 
    flux.subscribe { println("Subscriber-1 : $it") }
 
    Thread.sleep(Duration.ofMillis(200).toMillis())
    flux.subscribe { println("Subscriber-2 : $it") }
 
    Thread.sleep(Duration.ofMillis(200).toMillis())
    flux.subscribe { println("Subscriber-3 : $it") }
 
    Thread.sleep(Duration.ofSeconds(2).toMillis())
}
 
--------------------
출력 결과)
--------------------
Subscriber-1 : 1
Subscriber-1 : 2
Subscriber-2 : 2
Subscriber-1 : 3
Subscriber-2 : 3
Subscriber-1 : 4
Subscriber-2 : 4
Subscriber-3 : 4
Subscriber-1 : 5
Subscriber-2 : 5
Subscriber-3 : 5

 

  • range(1, 5)를 사용해 1부터 5까지의 플럭스를 생성하고 delayElements(Duration.ofMillis(100)) 함수를 사용해 각각의 데이터를 100밀리초 지연 후 publish로 데이터를 통지합니다.
  • 만약 flux.connect()가 없으면 발행자를 구독을 하더라도 아무 일도 일어나지 않습니다. 
  • 3개의 구독자인 flux.subscribe{}는 다음 구독자로 넘어가는 시점을 200밀리초 뒤로 미루고 현재 시점에서 통지 받은 데이터를 출력합니다.

출력 결과를 확인해보면 Subscriber-1은 1부터 5까지의 데이터를 모두 전달받았지만 Subscriber-2와 Subscriber-3은 지연된 만큼 데이터를 전달받지 못한 것을 확인할 수 있습니다.

앞서 커넥터블 플럭스를 사용해 Hot 발행자의 동작 방식에 대해 설명했습니다. 커넥터블 플럭스는 대표적인 Hot 발행자로 하나의 데이터를 생성하는 발행자에 여러 구독자가 동시에 구독하는 것을 허용합니다. 특징은 일반적인 Cold 방식의 발행자와 다른 점은 구독자가 구독을 위해 subscribe를 호출하더라도 데이터를 통지하지 않고 connect 연산자를 호출해야 비로소 데이터를 통지합니다. 

Cold 발행자인 플럭스를 커넥터블 플럭스로 변환하는 방법에는 2가지가 있습니다. 표 1.1은 커넥터블 플럭스로 변환하기 위한 연산자에 대해 설명합니다.

표 1.1 플럭스를 커넥터블 플럭스로 변환하는 연산자

함수 설명
publish 구독자가 연결된 시점 부터 새롭게 생성되는 데이터를 통지하는 커넥터블 플럭스를 제공하는 연산자입니다.
replay 인자로 지정한 만큼의 데이터를 캐시하고 있으면서 새롭게 구독자가 연결되어 구독을 시작하는 경우 시존에 캐시 된 데이터를 먼저 통지하고 캐시 된 데이터 데이터가 없다면 구독 이후로 생성된 데이터를 통지하는 연산자입니다.


생성된 커넥터블 플럭스는 connect와 같이 구독에 대한 추가적인 처리를 위한 방법도 제공합니다. 표 1.2는 커넥터블 플럭스가 제공하는 연산자에 대해 설명합니다. 

표 1.2 커넥터블 플럭스가 제공하는 연산자

함수 설명
connect 커넥터블 플럭스에 대한 구독을 활성화하기 위해 수동으로 호출하여 사용합니다. 
autoConnect 함수에 지정된 인자의 개수만큼 구독이 연결되면 자동으로 데이터를 통지합니다.
refCount 커넥터블 플럭스를 새로운 플럭스로 변환하고 함수에 지정된 인자의 구독자 개수만큼 연결된 구독자가 있을 경우에만 데이터를 처음부터 통지하고 새롭게 연결된 구독자에게는 구독 이후로 생성된 데이터를 통지합니다.

 

백프레셔

백프레셔(Back-Pressure)는 리액티브 스트림의 핵심 사양 중 하나입니다. 백프레셔는 데이터를 소비하는 측인 구독자가 자체적으로 처리 가능한 만큼의 데이터를 발행자에게 역으로 요청(request)합니다. 데이터를 소비하는 속도 즉 처리 속도보다 데이터를 생성하고 통지하는 속도가 빠르다면 구독자 입장에선 압박(pressure)이 될 수 있습니다. 백프레셔는 이런 상황에서 구독자가 처리 가능한 만큼 데이터를 통지할 수 있도록 조절해주는 기능입니다.

서비스를 예를 들어 동시 접속자가 최대 100명인 시스템에 1000명이 동시에 접속한다면 장애가 발생할 수 있습니다. 이런 경우 인프라를 증설하여 처리해야 하지만 예상치 못한 동시 접속자 증가는 대응하기가 쉽지 않습니다. 이럴 때 서버 앞단에서 요청에 대한 요청 제한(Rate Limiting)을 적용해 HTTP 상태 코드 429 Too Many Requests를 반환하여 처리하면 최악의 상황인 서비스 장애는 방지할 수 있습니다.

예제 1.38은 Flux.range를 사용하여 1부터 10까지 순차적으로 데이터를 통지하고 구독자(Subscriber)를 직접 구현하여 요청을 조절하는 간단한 백프레셔를 구현하는 예제입니다. 

예제 1.38 백프레셔 예제 - reactor/reactor-basic/src/backpressure/Backpressure.kt

package backpressure
 
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import reactor.core.publisher.Flux
 
fun main() {
 
    Flux.range(1, 10)
        .log()
        .subscribe(object : Subscriber<Int?> {
            private var s: Subscription? = null
            private var count = 0
 
            override fun onSubscribe(s: Subscription) {
                println("onSubscribe : 구독 시작")
                println("==================")
                this.s = s
                s.request(5)
            }
 
            override fun onNext(integer: Int?) {
                count++
                if (count % 5 == 0) {
                    println("==================")
                    s!!.request(5)
                }
            }
 
            override fun onError(t: Throwable) {}
            override fun onComplete() {
                 println("==================")
                 println("onComplete : 구독 완료")
                 println("통지된 numbers 개수 : $count")
            }
        })
}
 
--------------------
출력 결과)
--------------------
onSubscribe : 구독 시작
==================
[ INFO] (main) | request(5)
[ INFO] (main) | onNext(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (main) | onNext(4)
[ INFO] (main) | onNext(5)
==================
[ INFO] (main) | request(5)
[ INFO] (main) | onNext(6)
[ INFO] (main) | onNext(7)
[ INFO] (main) | onNext(8)
[ INFO] (main) | onNext(9)
[ INFO] (main) | onNext(10)
==================
[ INFO] (main) | request(5)
[ INFO] (main) | onComplete()
==================
onComplete : 구독 완료
최종 통지된 numbers 개수 : 10


subscribe 함수의 인자에는 리액티브 스트림의 구독자 인터페이스인 Subscriber를 직접 구현하였습니다. 우선 구독이 시작되면서 최초에 한번 실행되는 onSubscribe 함수에서는 Subscription을 초기화하고 s.request(5)를 호출해서 발행자에게 request에 전달받은 만큼의 데이터를 통지하도록 요청합니다. 그다음 호출되는 onNext는 발행자가 생성한 데이터를 담아 통지 신호가 발생할 때마다 호출됩니다. onNext 내부에는 데이터 통지 횟수를 count 변수에 기록하고 count가 5의 배수인 경우에 request를 호출해서 발행자에게 데이터 통지를 요청합니다. 마지막으로 onComplete는 구독에 대한 통지가 완료되었을 때 동작합니다.

출력 결과를 확인해보면 request(5)로 요청한 대로 onNext(1).. onNext(5)까지 순서대로 통지된 후 다음엔 onNext(6).. onNext(10)까지 차례로 통지된 것을 알 수 있습니다. 또한 더이상 통지할 데이터가 없는 경우에는 request(5)를 요청했지만 onNext가 호출되지 않고 onComplete 시그널이 호출되며 구독이 종료되었습니다.

 

리액터로 작성한 코드를 테스트하기

자동화된 테스트를 작성하는 것은 프로그램의 잠재된 버그의 발견을 쉽게 하고 코드의 품질을 높여줍니다. 그러나 비동기-논 블로킹 형태의 코드는 테스트가 번거롭고 예측하기 어렵습니다. 예를 들어 여러 스레드를 동시에 생성한 후 각각 다른 작업을 한 후 합친 결과에 대해 테스트를 하거나 시간차로 동작하는 코드는 자동화된 테스트를 어렵게 하는 요인으로 알려져 있지만 리액터는 자동화된 테스트를 위한 여러 가지 도구로써 reactor-test라는 라이브러리를 제공합니다. 표 1.3은 reactor-test에서 제공하는 핵심 3가지 패키지에 대한 설명입니다.

표 1.3 reactor-test가 제공하는 패키지

함수 설명
reactor.test 발행자에 대한 검증 및 테스트를 위한 메인 테스트 컴포넌트 패키지
reactor.test.publisher 테스트 가능한 발행자의 생성을 위한 컴포넌트 패키지
reactor.test.scheduler 테스트 가능한 스케쥴러의 생성을 위한 컴포넌트 패키지

 

StepVerifier

리액터 테스트 패키지에는 StepVerifier라는 리액티브 스트림의 각 단계를 테스트하기 위한 API를 제공합니다.  StepVerifier를 사용하면 데이터 통지와 구독에 대한 조건이 예상한 결과를 만족하는지를 판단하는 테스트 시나리오를 작성할 수 있으므로 편리합니다. 또한 다음에 예상되는 동작과 주어진 시간 동안 딜레이를 주는 등의 다양한 테스트를 가능하게 합니다. StepVerifier는 빌더 패턴 형태로 제공되는데 빌더 패턴을 적용한 함수의 반환값은 this 즉 자기 자신이기 때문에 함수를 연쇄적으로 호출할 수 있다는 장점이 있습니다. 이런 기법을 메소드 체이닝(Method Chaining)이라고 부릅니다.

 

expectNext

리액터가 제공하는 테스트 도구를 사용하기 위해 우선 코틀린 그레이들 프로젝트를 생성합니다. 정상적으로 생성되었다면 아래와 같은 구조로 프로젝트가 생성됩니다.

├── build.gradle.kts
├── gradlew
├── settings.gradle.kts
└── src
    ├── main
    │   ├── kotlin
    │   └── resources
    └── test
        ├── kotlin
        └── resources


프로젝트가 정상적으로 생성되었다면 build.gradle.kts를 열어 예제 1.39와 같이 작성합니다.

예제 1.39 build.gradle.kts - reactor/reactor-test/build.gradle.kts

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
 
plugins {
    kotlin("jvm") version "1.4.21"
}
 
group = "com.reactor.reactortest"
version = "1.0-SNAPSHOT"
 
repositories {
    mavenCentral()
}
 
dependencies {
    implementation("org.jetbrains.kotlin:kotlin-stdlib")
    implementation("io.projectreactor:reactor-core:3.4.0")
  
    testImplementation(kotlin("test-junit5"))
    testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.0")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.6.0")
    testImplementation("io.projectreactor:reactor-test:3.4.0")
}
 
tasks.test {
    useJUnitPlatform()
}
 
tasks.withType<KotlinCompile>() {
    kotlinOptions.jvmTarget = "11"
}

예제에서는 리액터 테스트를 위한 reactor-test 라이브러리와 유닛 테스트 프레임워크인 JUnit5를 의존성에 추가했습니다. 그다음 test/kotlin 디렉토리 하위에 ReactorTest1 클래스를 생성한뒤 예제 1.40과 같이 첫번째 테스트 코드를 작성합니다.

 

예제 1.40 첫번째 테스트 - reactor/reactor-test/src/test/kotlin/ReactorTest1.kt

import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
 
class ReactorTest1 {
 
    @Test
    fun `짝수로 된 데이터만 통지해야 한다`() {
        val flux = Flux.just(1, 2, 3, 4)
            // rem은 나머지를 구하는 %와 같다.
            .filter { num -> num.rem(2) == 0 }
 
        StepVerifier
            .create(flux)
            .expectNext(2)
            .expectNext(4)
            .expectComplete()
            .verify()
    }
}


예제 1.40는 Flux.just의 인자에 1부터 4까지 지정하고 filter 연산자에서 통지된 정수 값을 2로 나눈 나머지가 0인 데이터만 필터링하므로 짝수인 2, 4만 통지됩니다. 그다음 테스트의 핵심인 StepVerifier는 첫 단계로 create(publisher) 연산자를 사용해 초기화합니다. StepVerifier가 초기화되면 expect로 시작하는 연산자를 사용해 단계별로 검증이 가능합니다. 가장 먼저 사용된 expectNext는 onNext로 통지받은 데이터가 인자로 지정된 값과 일치하는지를 검증합니다. 더 이상 검증할 값이 없다면 expectComplete 연산자로 OnComplete 신호를 통지합니다. 그다음 최종적으로 verify 함수를 호출하면 테스트 시나리오에 대한 검증이 완료됩니다. 검증이 정상이라면 테스트를 통과할 것이고 검증이 잘못됬다면 테스트가 실패할 것 입니다.

 

recordWith, expectNextCount, expectRecordedMatches

예제 1.41은 여러개의 과일을 가진 플럭스를 생성하고 과일의 개수와 특정한 과일이 존재하는지 검증하는 예제입니다. 

예제 1.41 reocrdWith 관련 테스트 - reactor/reactor-test/src/test/kotlin/ReactorTest2.kt

import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
import java.util.ArrayList
 
class ReactorTest2 {
 
    @Test
    fun `사과가 들어있는지 확인`() {
        val flux: Flux<String> = Flux.just("사과", "바나나", "딸기")
 
        StepVerifier
            .create(flux)
            .recordWith { ArrayList() }
            .expectNextCount(3)
            .expectRecordedMatches { fruits ->
                fruits.contains("사과")
            }
            .expectComplete()
            .verify()
    }
}


먼저 StepVerifier를 create로 초기화 후 recordWith{ ArrayList() }를 사용했습니다. recordWith는 create로 초기화된 발행자가 통지하는 onNext 값을 인자로 제공받은 컬렉션에 기록합니다. recordWith로 기록된 데이터는 내부에 컬렉터로 불리는 내부 큐에 보관되며 expectRecordedMatches와 consumeRecordedWith 같은 테스트 연산자를 사용하기 위해 필수로 호출돼야 합니다.

다음 스텝으로 expectNextCount는 통지된 데이터의 개수가 인자로 지정된 개수와 일치하는지 검증합니다. 복수의 데이터가 통지되는 경우 유용하게 사용할 수 있는 연산자입니다. 그다음 expectRecordedMatches는 인자로  함수형 인터페이스인 Predicate를 전달받습니다. Predicate 내부의 test 함수는 Boolean을 리턴하기 때문에 전달받은 조건이 true인 경우에 테스트가 통과합니다. 예제에서는 fruits로 제공받은 리스트 안에 ‘사과’가 존재하므로 이 경우에는 true을 반환하므로 테스트를 통과하게 됩니다.

 

expectError, expectErrorMessage, expectErrorSatisfies

테스트 케이스를 작성할 때  특정 조건에서 의도한 에러가 발생하는지 검증하는 것도 중요한 테스트입니다. 애플리케이션이 운영 중인 상황에서 의도치 않은 에러가 발생하여 시스템에 장애가 발생하게 될 경우 간단한 코드 상의  에러라면 빠르게 처리하여 핫픽스 배포를 진행할 수 있지만 돈에 관련된 문제라든지 데이터의 일관성에 부정확을 초래하는 에러인 경우 잠깐 사이에도 큰 장애가 될 수 있습니다. 이런 경우를 방지하기 위해 특정한 조건에서 의도한 에러를 발생시키면 개발자가 에러를 핸들링할 수 있기 때문에 에러 검증을 위한 테스트 케이스는 매우 중요합니다.

예제 1.42는 기존에 존재하는 플럭스에 임의의 에러 플럭스를 합친 다음 정상적인 데이터가 통지된 이후 onError 신호가 발생하는지 검증하는 테스트입니다.

예제 1.42 expectError 테스트 - reactor/reactor-test/src/test/kotlin/ReactorTest3.kt

import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
 
class ReactorTest3 {
 
    @Test
    fun `IllegalArgumentException 에러가 발생해야 한다`() {
        val error = IllegalArgumentException("error")
        val errorFlux = Flux.error<String>(error)
 
        val source = Flux.just("success-1", "success-2")
            .concatWith(errorFlux)
 
        // 종류에 상관없이 에러가 발생하면 통과
        StepVerifier.create(source)
            .expectNext("success-1")
            .expectNext("success-2")
            .expectError()
            .verify()
 
        // 인자로 지정한 에러가 발생할 때만 통과
        StepVerifier.create(source)
            .expectNext("success-1")
            .expectNext("success-2")
            .expectError(IllegalArgumentException::class.java)
            .verify()
    }
}

 


expectError는 현재 구독하는 발행자가 에러를 통지하는지 검증합니다. 리액티브 스트림에는 서브스크라이버의 onError에 대한 사양이 있기 때문에 내부적으로 onError 신호가 발생하는지를 판단하여 통과여부를 판단하게됩니다. 예제에서 먼저 사용한 인자 없는 expectError 연산자는 에러의 종류에 상관없이 onError 신호가 발생하면 테스트를 통과하게되고 상세한 에러를 인자로 받는 expectError(Throwable)는 지정한 에러가 발생하는 경우에만 테스트를 통과합니다.

모든 에러의 최상위 부모인 Throwable은 기본 생성자 외에도 사용자 정의 에러 메시지를 인자로 받는 Throwable(message)을 제공합니다. 이러한 사용자 정의 에러 메시지를 검증하고 싶은 경우 expectErrorMessage 연산자를 사용해 테스트할 수 있습니다. 예제 1.43에서는 expectErrorMessage 연산자를 사용해 사용자 정의 에러 메시지에 대한 테스트 방법을 확인할 수 있습니다.

예제 1.43 expectErrorMessage 테스트 - reactor/reactor-test/src/test/kotlin/ReactorTest4.kt

import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
 
class ReactorTest4 {
 
    @Test
    fun `An Error Occurred 에러 메시지가 발생해야 한다`() {
        val error = IllegalArgumentException("An error occurred")
        val errorFlux = Flux.error<String>(error)
 
        val source = Flux.just("success-1", "success-2")
            .concatWith(errorFlux)
 
        StepVerifier.create(source)
            .expectNext("success-1")
            .expectNext("success-2")
            .expectErrorMessage("An error occurred")
            .verify()
    }
}

IllegalArgumentException의 인자로 “An error occurred” 에러 메시지를 지정하고 에러를 통지하면 expectErrorMessage는 내부적으로 통지받은 에러 플럭스를 Exception의 최상위 타입인 Throwable로 변환한 뒤 메시지를 비교하고 기대하는 결과가 동일하다면 테스트를 통과시킵니다.

마지막으로 expectErrorSatisfies 함수는 인자로 Consumer<Throwable>을 받아서 개발자가 에러에 대한 검증을 직접 구현할 수 있습니다. 인자로 받는 Consumer 역시 함수형 인터페이스의 하나로 반환 값 없이 인자로 전달된 함수를 단순히 실행하는 함수형 인터페이스입니다.

@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}

예제 1.44는 expectErrorSatisfies에 전달되는 람다 함수로 JUnit5의 Assertions API를 사용해 통지된 에러의 타입을 assertTrue로 우선 체크 한 후 에러 메시지가 기대한 결과와 일치하는지를 assertEquals로 검증하는 코드입니다.

 

예제 1.44 expectErrorMessage 테스트 - reactor/reactor-test/src/test/kotlin/ReactorTest5.kt

import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
 
class ReactorTest5 {
 
    @Test
    fun `An Error Occurred 에러 메시지가 발생해야 한다`() {
        val error = IllegalArgumentException("An error occurred")
        val errorFlux = Flux.error<String>(error)
 
        val source = Flux.just("success-1", "success-2")
            .concatWith(errorFlux)
 
        StepVerifier.create(source)
            .expectNext("success-1")
            .expectNext("success-2")
            .expectErrorSatisfies {
                assertTrue(it is IllegalArgumentException)
                assertEquals("An error occurred", it.message)
            }
            .verify()
    }
}

 


발생할 에러의 타입이 다양하여 분기 처리가 필요하거나 상세한 검증 로직이 필요한 경우 expectErrorSatisfies를 사용하면 쉽게 구현이 가능합니다.

 

verify로 시작하는 편의 함수

앞서 알아본 예제의 경우  코드의 마지막에 최종적으로 verify를 호출하여 테스트를 실행하였는데 편의 함수(Convenience Function)를 사용하면 조금 더 간결하게 코드를 작성할 수 있습니다. StepVerifier는 verify로 시작하는 이름의 편의 함수를 제공합니다. 

먼저 예제 1.41에서 작성한 테스트 코드의 마지막 부분인 expectComplete.verify()입니다. 예제 1.45와 같이 편의 함수인 verifyComplete를 사용해서 한 번의 함수 호출로 코드를 재작성 할 수 있습니다.

예제 1.45 verifyComplete 예제 - reactor/reactor-test/src/test/kotlin/ReactorTest6.kt

import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
import java.util.*
 
class ReactorTest6 {
 
    @Test
    fun `사과가 들어있는지 확인`() {
        val flux: Flux<String> = Flux.just("사과", "바나나", "딸기")
 
        StepVerifier
            .create(flux)
            .recordWith { ArrayList() }
            .expectNextCount(3)
            .expectRecordedMatches { fruits ->
                fruits.contains("사과")
            }
//          .expectComplete()
//          .verify()
            .verifyComplete()
    }
}

 

예제 1.42의 expectError.verify()와 expectError(Throwable).verify()는 예제 1.46와 같이 verifyError와 verifyError(Throwable)을 사용해 코드를 재작성 할 수 있습니다.

예제 1.46 verifyError 예제 - reactor/reactor-test/src/test/kotlin/ReactorTest7.kt

import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
 
class ReactorTest7 {
 
    @Test
    fun `IllegalArgumentException 에러가 발생해야 한다`() {
        val error = IllegalArgumentException("error")
        val errorFlux = Flux.error<String>(error)
 
        val source = Flux.just("success-1", "success-2")
            .concatWith(errorFlux)
 
        // 종류에 상관없이 에러가 발생하면 통과
        StepVerifier.create(source)
            .expectNext("success-1")
            .expectNext("success-2")
//          .expectError()
//          .verify()
            .verifyError()
 
        // 인자로 지정한 에러가 발생할 때만 통과
        StepVerifier.create(source)
            .expectNext("success-1")
            .expectNext("success-2")
//          .expectError(IllegalArgumentException::class.java)
//          .verify()
            .verifyError(IllegalArgumentException::class.java)
    }
}

 

예제 1.43의 expectErrorMessage.verify()는 예제 1.47과 같이 편의 함수인 verifyErrorMessage(message)를 사용해서 한 번의 함수 호출로 코드를 재작성 할 수 있습니다.

 

예제 1.47 verifyErrorMessage 예제 - reactor/reactor-test/src/test/kotlin/ReactorTest8.kt

import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
 
class ReactorTest8 {
 
    @Test
    fun `An Error Occurred 에러 메시지가 발생해야 한다`() {
        val error = IllegalArgumentException("An error occurred")
        val errorFlux = Flux.error<String>(error)
 
        val source = Flux.just("success-1", "success-2")
            .concatWith(errorFlux)
 
        StepVerifier.create(source)
            .expectNext("success-1")
            .expectNext("success-2")
//          .expectErrorMessage("An error occurred")
//          .verify()
            .verifyErrorMessage("An error occurred")
    }
}

 

마지막으로 예제 1.44의 expectErrorSatisfies.verify()입니다. 예제 1.48과 같이 편의 함수인 verifyErrorMessage를 사용해서 한 번의 함수 호출로 코드를 재작성 할 수 있습니다.

 

예제 1.48 verifyErrorSatisfies 예제 - reactor/reactor-test/src/test/kotlin/ReactorTest9.kt

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
 
class ReactorTest9 {
 
    @Test
    fun `An Error Occurred 에러 메시지가 발생해야 한다`() {
        val error = IllegalArgumentException("An error occurred")
        val errorFlux = Flux.error<String>(error)
 
        val source = Flux.just("success-1", "success-2")
            .concatWith(errorFlux)
 
        StepVerifier.create(source)
            .expectNext("success-1")
            .expectNext("success-2")
//          .expectErrorSatisfies {
//              assertTrue(it is IllegalArgumentException)
//              assertEquals("An error occurred", it.message)
//          }
//          .verify()
            .verifyErrorSatisfies {
                assertTrue(it is IllegalArgumentException)
                assertEquals("An error occurred", it.message)
            }
    }
}


이렇게 verify로 시작하는 편의 함수를 사용하면 마지막에 호출되는 expect 테스트 함수를 줄일 수 있으므로 조금 더 간결하고 편리하게 코드를 작성할 수 있습니다. 

 

부수 효과 적용

부수 효과(Side Effect)는 핵심 로직이 아닌 부가적으로 동작하는 별개의 기능을 추가하는 것을 말합니다. 리액터는 기존에 작성한 연산자를 수정하지 않고 부수 효과를 지원하는 연산자를 통해 로직을 특정한 신호와 함께 동작하도록 추가하거나 통지되는 데이터를 들여다보기 위한 방법을 제공합니다. 이러한 연산자는 대부분 ‘do’라는 접두어를 사용합니다. 주로 사용되는 연산자는 ‘doOn’으로 시작하는 연산자로 특정 신호(onSubscribe, onNext 등)가 동작하면 콜백 형태로 동작하므로 각각의 신호가 발생할 때마다 쉽게 부수 효과를 적용할 수 있습니다. 예를 들어 데이터를 변환하거나 필터링 후 데이터가 어떻게 통지되는지 로그로 출력하는 등 핵심 로직과는 별개의 작업이 필요한 경우 유용하게 사용될 수 있을 것입니다. 표 1.4는 주로 사용되는 doOn으로 시작하는 연산자들을 소개합니다.


표 1.4 주로 사용되는 doOn으로 시작하는 연산자

함수 설명
doOnSubscribe 구독을 시작하고 onSubscribe 신호가 발생하면 호출 
doOnNext onNext로 데이터를 통지한뒤 호출 
doOnComplete onComplete 신호가 발생하기 직전에 호출. 플럭스 전용 연산자
doOnError onError 신호가 발생하면 호출


이외에도 더 많은 부수 효과를 위한 연산자들은 리액터 공식 문서에서 확인할 수 있습니다. 

 

doOnSubscribe

doOnSubscribe는 구독자의 onSubscribe가 호출되면 이어서 동작하는 연산자입니다. onSubscribe는 구독이 일어나면 최초 1회만 호출되는 것이 특징입니다. doOnSubscribe은 함수형 인터페이스인 Consumer를 제공받아서 반환 값 없이 함수 내부의 코드만 실행하는 것이 특징입니다. 개발자는 Consumer를 작성하면 인자로 서브스크립션(Subscription)을 제공받으므로 현재 구독에 대한 부수 효과도 같이 처리할 수 있습니다. 

예제 1.49는 “red”와 “blue”라는 문자열을 가진 플럭스를 생성하고 log 연산자를 사용해 내부적으로 어떤 동작이 일어나는지 로그로 출력하고 전달받은 문자열을 대문자로 변환한 뒤 각각 출력합니다.

예제 1.49 doOnSubscribe 예제 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnSub1.kt

package sideeffect
 
import reactor.core.publisher.Flux
 
fun main() {
    Flux.just("red", "blue")
        .log()
        .map {
            it.toUpperCase()
        }
        .subscribe(::println)
}
 
--------------------
출력 결과)
--------------------
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(red)
RED
[ INFO] (main) | onNext(blue)
BLUE
[ INFO] (main) | onComplete()


출력 결과를 보면 [ INFO]로 시작하는 로그를 확인할 수 있는데 이것들은 모두 log() 연산자를 사용하였기 때문에 내부 동작이 출력되는 것입니다. 실무에서는 디버깅 용도를 제외하면 성능을 하락시킬 수 있기 때문에 유의하여 사용해야 합니다.  로그 내용을 보면 최초 onSubscribe가 호출된 후 request(unbounded)가 호출된 것을 알 수 있습니다. unbounded는 즉 Long.MAX_VALUE 값이기 때문에 무한대의 값을 요청한 것과 마찬가지입니다. request 이후에는 onNext(red)로 데이터가 통지되고 subscribe(::println)에서 ‘RED’를 출력합니다. 그다음 onNext(blue)로 두 번째 데이터가 다시 통지되고 ‘BLUE’가 다시 subscribe(::println)에 의해 콘솔에 출력됩니다. 최종적으로 구독이 완료되어 onComplete 신호가 호출된 것을 확인할 수 있습니다.

예제 1.50은 doOnSubscribe를 적용하면 어떻게 동작하는지 알아보기 위해 예제 1.49에 doOnSubscribe를 추가하고 몇 가지 부수 효과를 적용한 예제입니다. 먼저 부수 효과를 주기위해 적용한 doOnSubscribe 내부에선 인자로 전달받은 서브스크립션을 조작합니다. 우선 request(1)을 호출하여 onNext로 통지 될 데이터를 1개씩으로 제한합니다. 그런다음 cancel을 호출하여 구독을 취소한뒤 부수 효과가 제대로 적용됬는지 확인하기 위해 “doOnSubscribe is called”를 콘솔에 출력합니다. 

예제 1.50 doOnSubscribe 예제2 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnSub2.kt

package sideeffect
 
import reactor.core.publisher.Flux
 
fun main() {
    Flux.just("red", "blue")
        .log()
        .doOnSubscribe { subscription ->
            subscription.request(1)
            subscription.cancel()
            println("doOnSubscribe is called")
        }
        .map {
            it.toUpperCase()
        }
        .subscribe(::println)
}
 
--------------------
출력 결과)
--------------------
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(red)
RED
[ INFO] (main) | cancel()
doOnSubscribe is called
[ INFO] (main) | request(unbounded)


출력 결과를 확인해보면 최초 onSubscribe가 호출되고 이후 request(1)이 출력되었습니다. 예제에서 subscription.request(1)을 호출했기 때문에 데이터 통지가 1로 제한된 것입니다. 그다음엔 onNext(red)로 데이터가 통지되고 ‘RED’가 출력됩니다. 그다음 이전 예제에서는 ‘BLUE’가 출력됐지만 이번 예제에서는 cancel()이 호출되고 저희가 작성한 ‘doOnSubscribe is called’가 출력되었습니다. 즉 subscription.cancel()을 호출하여 구독을 취소했기 때문에 처음에 request(1)로 요청한 데이터는 정상 통지되고 두 번째로 데이터를 통지하려는 시점에서는 이미 구독이 취소되어 데이터가 출력되지 않은 것입니다.

 

doOnNext

doOnNext는 데이터가 통지되는 시점인 onNext가 호출된 후 동작하는 연산자입니다. doOnNext는 onNext로 통지된 값을 인자로 전달받습니다. 그러므로  통지된 값을 확인하거나 통지된 값을 가지고 별도의 부수 효과를 적용할 수 있으므로 유용하게 사용될 수 있습니다. 

예제 1.51은 1부터 5까지 데이터를 통지하는 플럭스를 생성하고 doOnNext에서 통지된 값을 확인할 수 있도록 출력하는 예제입니다. 

예제 1.51 doOnNext 예제1 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnNext1.kt

package sideeffect
 
import reactor.core.publisher.Flux
 
fun main() {
    Flux.range(1, 5)
        .log()
        .doOnNext {
            println("통지된 값 : $it")
        }
        .subscribe()
}
 
--------------------
출력 결과)
--------------------
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
통지된 값 : 1
[ INFO] (main) | onNext(2)
통지된 값 : 2
[ INFO] (main) | onNext(3)
통지된 값 : 3
[ INFO] (main) | onNext(4)
통지된 값 : 4
[ INFO] (main) | onNext(5)
통지된 값 : 5
[ INFO] (main) | onComplete()


출력 결과를 보면 Flux.range에 지정된 값의 순서대로 onNext로 데이터 통지 신호가 발생하고 각각의 값이 doOnNext로 전달되는 것을 확인할 수 있습니다.

예제 1.52에서는 AtomicInteger 클래스를 사용해 onNext 신호가 발생할 때마다 데이터 통지를 기록할 카운터를 생성하고 카운트한 뒤 출력하는 예제입니다.

예제 1.52 doOnNext 예제2 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnNext2.kt

package sideeffect
 
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicInteger
 
fun main() {
   val counter = AtomicInteger()
 
   Flux.range(1, 5)
       .log()
       .doOnNext {
           counter.incrementAndGet()
           println("카운터 증가")
       }
       .subscribe()
 
   println("총 개수 : ${counter.get()}")
}
 
--------------------
출력 결과)
--------------------
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
카운터 증가
[ INFO] (main) | onNext(2)
카운터 증가
[ INFO] (main) | onNext(3)
카운터 증가
[ INFO] (main) | onNext(4)
카운터 증가
[ INFO] (main) | onNext(5)
카운터 증가
[ INFO] (main) | onComplete()
총 개수 : 5

출력 결과를 보면 onNext 신호가 발생할때마다 “카운터 증가" 메시지가 콘솔에 출력되는 것을 확인할 수 있습니다. 총 5번의 카운터가 증가되고 최종적으로 “총 개수 :5”가 출력됩니다. doOnNext를 사용하면 각각 데이터 통지 마다 이러한 부수 효과를 적용할 수 있습니다.

 

doOnComplete

doOnComplete는 통지가 성공적으로 완료되는 시점에 동작하는 연산자로 인자로 함수형 인터페이스인 Runnable을 전달 받아서 동작하는 것이 특징입니다. 또한 doOnComplete는 플럭스만 제공하는 연산자로 모노의 경우 비슷한 동작을 하는 doOnSuccess가 있습니다.

예제 1.53은 A부터 C를 통지하는 플럭스를 생성하고 데이터 통지가 완료되면 doOnComplete 내부에서 AtomicBoolean을 사용해 성공 여부를 true로 변경한 후 데이터 통지 성공 여부를 출력합니다.

예제 1.53 doOnComplete 예제1 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnComp1.kt

package sideeffect
 
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicBoolean
 
fun main() {
    val completed = AtomicBoolean(false)
    Flux.just("A", "B", "C")
        .log()
        .doOnComplete {
            completed.set(true)
        }
        .subscribe()
 
    if (completed.get()) {
        println("데이터 통지 완료")
    } else {
        println("데이터 통지 실패")
    }
}
 
--------------------
출력 결과)
--------------------
[ INFO] (main) | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(A)
[ INFO] (main) | onNext(B)
[ INFO] (main) | onNext(C)
doOnComplete 동작
[ INFO] (main) | onComplete()
데이터 통지 성공


출력 결과를 보면 우선 onNext로 A, B, C가 순차적으로 통지됩니다. 그다음 doOnComplete에 작성한 “doOnComplete 동작”이 출력되고 onComplete 신호가 발생하고 최종적으로 “데이터 통지 완료" 메시지가 콘솔에 출력됩니다. 그러므로 doOnComplete는 onComplete 신호 직전에 호출된다는 것을 확인할 수 있습니다. 


이번엔 에러가 통지될 경우 doOnComplete가 제대로 동작하는지 알아보기 위해 예제 1.54에서는 기존의 플럭스에 concatWith로 Flux.error(IllegalArgumentException())을 추가한 뒤 실행해보겠습니다.

예제 1.54 doOnComplete 예제2 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnComp2.kt

package sideeffect
 
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicBoolean
 
fun main() {
    val completed = AtomicBoolean(false)
    Flux.just("A", "B", "C")
        .concatWith(Flux.error(IllegalArgumentException()))
        .log()
        .doOnComplete {
            println("doOnComplete 동작")
            completed.set(true)
        }
        .subscribe()
 
    if (completed.get()) {
        println("데이터 통지 완료")
    } else {
        println("데이터 통지 실패")
    }
}
 
--------------------
출력 결과)
--------------------
[ INFO] (main) onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(A)
[ INFO] (main) onNext(B)
[ INFO] (main) onNext(C)
데이터 통지 실패
[ERROR] (main) onError(java.lang.IllegalArgumentException)
[ERROR] (main)  - java.lang.IllegalArgumentException


출력 결과를 보면 doOnComplete와 onComplete가 호출되지 않고 “데이터 통지 실패"가 출력된 후 onError 신호가 발생하는 것을 확인할 수 있습니다. 또한 에러에 대한 별도 처리가 없으므로 java.lang.IllegalArgumentException이 발생하게 됩니다. onComplete는 에러 없이 정상적으로 데이터 통지가 완료될 경우에만 발생하는데 doOnComplete는 onComplete 직전에 동작하므로 에러 통지가 없어야 동작하는 것을 확인할 수 있습니다.

 

doOnError

doOnError는 에러가 통지되는 시점에  지정한 부수 효과를 적용할 수 있습니다. 에러가 통지되는 시점에는 onError 신호가 발생하는데 doOnError 연산자는 인자로 함수형 인터페이스인 Consumer를 제공받아서  onError 신호가 발생하기 직전에 실행합니다. 인자로 제공된 Consumer에는 최상위 에러 클래스인 Throwable가 전달됩니다. 그렇기 때문에 내부에서 어떤 에러가 발생했는지 확인하여 처리를 다르게 할 수도 있습니다.

예제 1.55는 에러를 통지하는 플럭스를 생성하고 에러가 발생할 경우 doOnError가 동작하는 예제입니다.

예제 1.55 doOnError 예제 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnError.kt

package sideeffect
 
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicBoolean
 
fun main() {
    val errorOccurred = AtomicBoolean(false)
    Flux.range(1,3)
        .concatWith(Flux.error(IllegalArgumentException()))
        .log()
        .doOnError { throwable: Throwable ->
            println("doOnError 동작")
            println("발생한 에러 : ${throwable.javaClass}")
            errorOccurred.set(true)
        }
        .subscribe()
 
    if (errorOccurred.get()) {
        println("에러 발생")
    }
}
 
--------------------
출력 결과)
--------------------
[ INFO] (main) onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(1)
[ INFO] (main) onNext(2)
[ INFO] (main) onNext(3)
doOnError 동작
발생한 에러 : class java.lang.IllegalArgumentException
에러 발생
[ERROR] (main) onError(java.lang.IllegalArgumentException)


출력 결과를 보면 onNext로 데이터를 정상적으로 통지하다가 에러가 통지되면 doOnError가 먼저 발생한 후 최종적으로 onError가 발생합니다. doOnError 내부에서는 람다 함수로 제공되는 Consumer는 인자로 Throwable을 받기 때문에 에러의 종류와 어떤 메시지를 출력하는지 확인할 수 있습니다.

 

리액터 코틀린 지원

스프링 프로젝트는 코틀린에 대한 지원을 점점 늘리고 있는데 최신 스프링 프로젝트 공식 문서를 보면 자바 예제외에도 코틀린 예제를 쉽게 확인할 수 있습니다. 또한 코틀린이 제공하는 여러 라이브러리도 스프링 프로젝트에 기본으로 탑재되고 있는데 리액터 또한 리액터 코틀린 확장(reactor-kotlin-extensions)을 지원합니다 . 리액터 코틀린 확장 기능을 사용하면 코틀린의 우아한 문법을 사용해 리액터 코드를 자바를 사용했을 때보다 더 간결하게 작성이 가능합니다. 


먼저 리액터 코틀린 확장 기능을 사용하기 위해 예제 1.56과 같이 build.gradle.kts에 reactor-kotlin-extensions 모듈을 추가 합니다.

예제 1.56 프로젝트에 reactor-kotlin-extensions 의존성 추가 - reactor/reactor-basic/build.gradle.kts

plugins {
   id("org.jetbrains.kotlin.jvm") version ("1.4.10")
}
 
group = "com.reactor.reactorbasic"
version = "1.0-SNAPSHOT"
 
repositories {
    mavenCentral()
}
 
dependencies {
   implementation("org.jetbrains.kotlin:kotlin-stdlib")
   implementation("io.projectreactor:reactor-core:3.4.0")
 implementation("io.projectreactor.kotlin:reactor-kotlin-extensions:1.1.2")
}

 

toMono

toMono는 객체를 모노로 변환하기 위한 코틀린 확장 함수입니다. toMono 함수를 사용하면 Mono.justOrEmpty, Mono.fromCallable, Mono.fromFuture와 같은 팩토리 함수를 쓰지 않아도 객체를 모노로 쉽게 변환할 수 있습니다.

다음은 리액터 코틀린 확장 모듈의 모노 확장 함수 구현 코드입니다.

fun <T> Publisher<T>.toMono(): Mono<T> = Mono.from(this)
 
fun <T> (() -> T?).toMono(): Mono<T> = Mono.fromSupplier(this)
 
fun <T : Any> T?.toMono(): Mono<T> = Mono.justOrEmpty(this)
 
fun <T> Callable<T?>.toMono(): Mono<T> = Mono.fromCallable(this::call)


코드를 보면 toMono는 코틀린의 최상위 객체인 Any에 확장한 함수부터 Callable, CompletableFuture, 리액터의 Publisher를 지원하는 확장 함수가 구현돼있습니다.

예제 1.57은 앞서 살펴본 toMono를 사용해 다양한 객체를 모노로 변환하는 예제입니다. 

예제 1.57 toMono 예제 - reactor/reactor-basic/src/main/kotlin/extensions/toMono.kt

package extensions
 
import reactor.kotlin.core.publisher.toMono
import java.util.concurrent.Callable
import java.util.concurrent.CompletableFuture
 
fun main() {
    "String.toMono"
        .toMono()
        .subscribe(::println)
 
    listOf("Collection.toMono")
        .toMono()
        .subscribe(::println)
 
    Callable { "Callable.toMono" }
        .toMono()
        .subscribe(::println)
 
    CompletableFuture.supplyAsync { "supplyAsync.toMono" }
        .toMono()
        .subscribe(::println)
}


--------------------
출력 결과)
--------------------
String.toMono
[Collection.toMono]
Callable.toMono
CompletableFuture.supplyAsync.toMono


출력 결과를 보면 toMono로 변환한 객체들이 정상적으로 모노로 변환되어 데이터 통지가 이뤄진 것을 확인할 수 있습니다. toMono를 사용하면 다양한 객체를 팩토리 함수 없이 모노로 쉽게 변환할 수 있습니다. 

 

toFlux

toFlux는 객체를 플럭스로 변환하기 위한 코틀린 확장 함수입니다. toFlux는 toMono와 다르게 컬렉션이나 배열 같은 복수의 데이터를 저장하는 자료구조를 toFlux로 변환하는 것이 특징입니다. toFlux 함수를 사용하면 Flux.just, Flux.fromIterable, Flux.fromArray 같은 팩토리 함수를 쓰지 않아도 객체를 플럭스로 쉽게 변환할 수 있습니다. 

다음은 리액터 코틀린 확장 모듈의 플럭스 확장 함수 구현 코드입니다.

fun <T : Any> Publisher<T>.toFlux(): Flux<T> = Flux.from(this)
 
fun <T : Any> Iterator<T>.toFlux(): Flux<T> = toIterable().toFlux()
 
fun <T : Any> Iterable<T>.toFlux(): Flux<T> = Flux.fromIterable(this)
 
fun <T : Any> Stream<T>.toFlux(): Flux<T> = Flux.fromStream(this)
 
fun BooleanArray.toFlux(): Flux<Boolean> = this.toList().toFlux()
 
fun ByteArray.toFlux(): Flux<Byte> = this.toList().toFlux()
 
fun ShortArray.toFlux(): Flux<Short> = this.toList().toFlux()
 
fun IntArray.toFlux(): Flux<Int> = this.toList().toFlux()
 
fun LongArray.toFlux(): Flux<Long> = this.toList().toFlux()
 
fun FloatArray.toFlux(): Flux<Float> = this.toList().toFlux()
 
fun DoubleArray.toFlux(): Flux<Double> = this.toList().toFlux()
 
fun <T> Array<out T>.toFlux(): Flux<T> = Flux.fromArray(this)


코드를 보면 다양한 배열과 컬렉션 구조에 대한 toFlux 확장 함수가 구현돼 있습니다.

예제 1.58은 toFlux를 사용해 자료구조를 플럭스로 변환하는 예제입니다. 

예제 1.58 toFlux 예제 - reactor/reactor-basic/src/main/kotlin/extensions/toFlux.kt

package extensions
 
import reactor.kotlin.core.publisher.toFlux
import reactor.kotlin.core.publisher.toMono
 
fun main() {
    listOf("A", "B", "C")
        .toFlux()
        .subscribe(::println)
 
    intArrayOf(1, 2, 3)
        .toFlux()
        .subscribe(::println)
 
    setOf("D", "E", "F")
        .toFlux()
        .subscribe(::println)
 
    "MonoToFlux".toMono()
       .toFlux()
       .subscribe(::println)
}
 
--------------------
출력 결과)
--------------------
A
B
C
1
2
3
D
E
F
MonoToFlux

출력 결과를 보면 컬렉션과 배열을 플럭스로 변환해서 데이터 통지가 정상적으로 이뤄진 것을 확인할 수 있습니다. toFlux의 한가지 특징은 컬렉션이나 배열의 경우 플럭스로 바로 변환이 가능하지만 “MonoToFlux” 문자열과 같은 일반 객체의 경우 toMono로 우선 변환한 뒤 toFlux 함수를 사용해 플럭스로 변환해야 합니다. toFlux의 경우 자료구조 외에는 발행자인 Publisher에만 확장할 수 있도록 구현돼 있기 때문입니다.

 

728x90
반응형
728x90
반응형


플럭스의 연산자

플럭스는 0..N개로 이뤄진 복수개의 요소를 통지하는 발행자입니다. 모노는 1개의 요소를 통지하지만 플럭스는 제한 없는 무한대의 요소를 통지합니다. 플럭스의 연산자는 모노의 연산자와 이름이 같고 역할이 같은 연산자가 많습니다. 대표적으로 앞선 장에서 소개한 map, flatMap, switchIfEmpty 등의 연산자는 플럭스를 사용할시에도 동일하게 사용할 수 있습니다.

just

Flux.just는 플럭스를 생성하는 기본 팩토리 함수입니다. Flux.just의 인자는 가변 인자를 지원하기 때문에 데이터의 개수 제한 없이 추가하여 플럭스를 생성할 수 있습니다. 예제 1.20는 두 개의 CellPhone 객체를 생성하고 Flux.just의 인자로 전달하고 다른 작업 없이 출력합니다.

package flux

import reactor.core.publisher.Flux

data class Cellphone(
    val name: String,
    val price: Int,
    val currency: Currency,
)

enum class Currency {
    KRW, USD
}

fun main() {
    val iphone =
        Cellphone(name = "Iphone", price = 100, currency = Currency.KRW)
    val galaxy =
        Cellphone(name = "Galaxy", price = 90, currency = Currency.KRW)

    val flux: Flux<Cellphone> =
        Flux.just(iphone, galaxy)

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
Cellphone(name=Iphone, price=100, currency=KRW)
Cellphone(name=Galaxy, price=90, currency=KRW)

출력 결과와 같이 Flux.just에 들어가는 인자의 순서대로 차례로 데이터가 통지됩니다. just 함수에 만약 null인 객체가 전달되면 예제 1.21과 같이 NullPointerException이 발생하기 때문에 주의해야합니다.

예제 1.21 Flux.just에 null이 들어간 경우 - reactor/reactor-basic/src/flux/Flux1.kt

package flux

import reactor.core.publisher.Flux

data class Cellphone(
    val name: String,
    val price: Int,
    val currency: Currency,
)

enum class Currency {
    KRW, USD
}

fun main() {
    val iphone =
        Cellphone(name = "Iphone", price = 100, currency = Currency.KRW)
    val galaxy =
        Cellphone(name = "Galaxy", price = 90, currency = Currency.KRW)

    val flux: Flux<Cellphone> =
        Flux.just(iphone, galaxy, null)

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
java.lang.NullPointerException: The 2th array element was null

fromArray, fromIterable

Flux.fromArray는 함수는 인자에 배열을 받아 플럭스를 생성하는 팩토리 함수입니다. 예제 1.22는 배열에 객체를 담고 Flux.fromArray를 사용해 플럭스를 생성한 뒤 순서대로 출력합니다.

예제 1.22 Flux.fromArray - reactor/reactor-basic/src/flux/Flux2.kt

package flux

import reactor.core.publisher.Flux

data class Hero(
   val nickname: String,
   val name: String,
)

fun main() {
   val avengers = arrayOf(
       Hero("Black Widow", "Natasha Romanoff"),
       Hero("Iron Man", "Tony Stark"),
       Hero("Captain America", "Steve Rogers"),
       Hero("Thor", "Thor Odinson"),
       Hero("Hulk", "Bruce Banner"),
       Hero("Hawkeye", "Clint Barton"),
       Hero("Black Panther", "T'Challa")
   )

   val flux: Flux<Hero> =
       Flux.fromArray(avengers)

   flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
Hero(nickname=Iron Man, name=Tony Stark)
Hero(nickname=Black Widow, name=Natasha Romanoff)
Hero(nickname=Captain America, name=Steve Rogers)
Hero(nickname=Thor, name=Thor Odinson)
Hero(nickname=Hulk, name=Bruce Banner)
Hero(nickname=Hawkeye, name=Clint Barton)
Hero(nickname=Black Panther, name=T'Challa)

예제 1.22의 출력 결과를 보면 예상한대로 배열에 지정한 순서대로 출력되는 것을 알 수 있습니다.

배열이 아닌 컬렉션을 이용해 플럭스를 생성할 경우 Flux.fromIterable 함수를 사용할 수 있습니다. fromArray와 마찬가지로 리스트에 지정한 순서대로 데이터를 통지합니다. 예제 1.23은 Flux.fromIterable을 사용해 플럭스를 생성하고 순서대로 출력합니다.

예제 1.23 Flux.fromIterable - reactor/reactor-basic/src/flux/Flux3.kt

package flux

import reactor.core.publisher.Flux

fun main() {
   val avengers = listOf(
       Hero("Iron Man", "Tony Stark"),
       Hero("Black Widow", "Natasha Romanoff"),
       Hero("Captain America", "Steve Rogers"),
       Hero("Thor", "Thor Odinson"),
       Hero("Hulk", "Bruce Banner"),
       Hero("Hawkeye", "Clint Barton"),
       Hero("Black Panther", "T'Challa"),
   )

   val flux: Flux<Hero> =
       Flux.fromIterable(avengers)

   flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
Hero(nickname=Iron Man, name=Tony Stark)
Hero(nickname=Black Widow, name=Natasha Romanoff)
Hero(nickname=Captain America, name=Steve Rogers)
Hero(nickname=Thor, name=Thor Odinson)
Hero(nickname=Hulk, name=Bruce Banner)
Hero(nickname=Hawkeye, name=Clint Barton)
Hero(nickname=Black Panther, name=T'Challa)

출력 결과에서 확인할 수 있듯이 fromIterable도 리스트에 지정한 순서 즉 인덱스의 순서대로 데이터를 통지하는 것을 확인할 수 있습니다.

range

Flux.range 함수는 첫 번째 인자인 시작 값부터 두 번째 인자인 카운트 값까지 값을 하나씩 증가하는 데이터를 통지하는 팩토리 함수입니다. 예제 1.24는 Flux.range를 사용해 1부터 100까지 더한 값을 출력합니다.

예제 1.24 Flux.range로 sum한 값 구하기 - reactor/reactor-basic/src/flux/Flux4.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val sum = Flux.range(1, 100)
    .reduce { t, u ->
        t + u
    }
    sum.subscribe(::println)
}

--------------------
출력 결과)
--------------------
5050

출력 결과를 보면 1부터 100까지 더한 값인 5050이 출력되었습니다. 즉 1부터 100까지 순서대로 데이터를 통지하고 reduce 함수를 사용해 통지받은 데이터를 더해졌다는 것을 알 수 있습니다.

다음 예제 1.25는 take를 사용해 인자로 지정한 10개 데이터를 통지하고 buffer는 통지받은 인자로 지정한 값만큼 데이터를 버퍼에 담아서 통지합니다.

예제 1.25 take와 buffer를 사용한 예제 - reactor/reactor-basic/src/flux/Flux5.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val buffer: Flux<List<Int>> =
        Flux.range(1, 100)
            .take(10)
            .buffer(4)

    buffer.subscribe(::println)
}

--------------------
출력 결과)
--------------------
[1, 2, 3, 4]
[5, 6, 7, 8]
[9, 10]

출력 결과를 보면 버퍼에 할당된 4건씩 데이터가 나뉘어 통지된 것을 확인할 수 있습니다.

interval

Flux.Interval 함수를 사용하면 인자로 지정해준 주기대로 신호를 발생 시킵니다. 발생된 신호는 Long 타입으로 ‘0, 1, 2,3’ 순서대로 전달 받을 수 있습니다. 예제 1.26은 0.5초 간격으로 랜덤한 식별자(id)를 가지는 쿠폰을 생성하는 예제 입니다.

예제 1.26 Flux.interval로 쿠폰 생성 예제 - reactor/reactor-basic/src/flux/Flux6.kt

package flux

import reactor.core.publisher.Flux
import java.time.Duration
import java.util.UUID

data class Coupon(
    val id: String,
    val description: String,
)

fun main() {
    val flux: Flux<Coupon> =
        Flux.interval(Duration.ofMillis(500))
            .map { count ->
                val id = UUID.randomUUID().toString()
                Coupon(id = id, description = "${count.plus(1)}번째 쿠폰")
            }
            .take(15)

    flux.subscribe(::println)

    Thread.sleep(Duration.ofSeconds(10).toMillis())
}


--------------------
출력 결과)
--------------------
Coupon(id=b755285e-476e-405a-8a9f-e0b49b3520a6, description=1번째 쿠폰)
Coupon(id=9c4eea7a-1f12-4123-9fae-678e6161f7de, description=2번째 쿠폰)
Coupon(id=6478a919-ae2b-47c9-b238-d41fe5d73010, description=3번째 쿠폰)
Coupon(id=faf7b65f-299e-4467-9308-964b901e978a, description=4번째 쿠폰)
Coupon(id=77db0f58-27c8-4abc-a47f-8966259cea78, description=5번째 쿠폰)
Coupon(id=ccfab126-a5b1-4e9e-bbf3-c55f16a0e475, description=6번째 쿠폰)
Coupon(id=c59c6f88-cf6f-4d05-80f9-261851e49359, description=7번째 쿠폰)
Coupon(id=a2ec4a36-c9f8-4087-ac93-6b963954f029, description=8번째 쿠폰)
Coupon(id=8663e6ee-293c-492f-98b7-8246e19aeb3e, description=9번째 쿠폰)
Coupon(id=c9cf3146-d1ad-4c2d-85f6-c66f3635d53e, description=10번째 쿠폰)
Coupon(id=1c66e15e-0794-4bc5-bc33-d0088e0384ca, description=11번째 쿠폰)
Coupon(id=c2e2c8c3-d5b2-47c9-be99-06aa3a679018, description=12번째 쿠폰)
Coupon(id=cf53e12c-8a54-4875-b6e3-11f5494c57c5, description=13번째 쿠폰)
Coupon(id=c37c3981-e34b-4147-8d8a-66eb3bc626ed, description=14번째 쿠폰)
Coupon(id=32942297-f457-4778-b559-c0f45023bbd6, description=15번째 쿠폰)

먼저 Flux.interval(Duration.ofMillis(500))는 0.5초 간격으로 신호를 발생시키고 순번을 데이터로 통지합니다. map 연산자 내부에선 UUID.randomUUID().toString()을 사용해 유니크한 식별자를 생성하고 쿠폰 객체의 id 프로퍼티에 할당합니다. description에는 몇 번째로 생성된 쿠폰인지를 확인하기 위해 count라는 람다 변수를 선언했습니다. 이때 count의 값은 최초에 0부터 시작하기 때문에 count.plus(1)을 사용해 기존 값에 1을 더해줍니다. 그 다음은 take로 15개의 데이터만 통지하도록 조절한 후 출력합니다.

이때 마지막 라인에서 예시와 같이 스레드를 일시정지하는 코드가 있어야 정상적으로 출력됩니다.

 Thread.sleep(Duration.ofSeconds(10).toMillis())

스레드를 일시정지하는 이유는 Flux.interval은 내부적으로 별도의 스레드에서 동작하도록 만들어져있는데 코드를 실행하는 스레드는 메인 함수의 스레드이므로 일시정지하지 않으면 subscribe에서 출력하기 전에 메인 스레드가 종료되기 때문입니다.

예제 1.27에서 Thread.sleep을 주석 처리하여 어떤 결과가 출력되는지 확인해 보겠습니다.

package flux

import reactor.core.publisher.Flux
import java.time.Duration
import java.util.UUID

data class Coupon(
    val id: String,
    val description: String,
)

fun main() {
    val flux: Flux<Coupon> =
        Flux.interval(Duration.ofMillis(500))
            .map { count ->
                val id = UUID.randomUUID().toString()
                Coupon(id = id, description = "${count.plus(1)}번째 쿠폰")
            }
            .take(15)

    flux.subscribe(::println)

    //Thread.sleep(Duration.ofSeconds(10).toMillis())
}


--------------------
출력 결과)
--------------------

출력 결과를 보면 Flux.interval에서 사용하는 스레드가 완료되기 전에 메인 스레드가 종료되어 아무것도 출력되지 않은 것을 확인할 수 있습니다.

single, singleOrEmpty

single 함수를 사용하면 최초에 전달받은 1개의 데이터만 통지받도록 제한할 수 있습니다. 그러므로 single을 연산자로 적용한 경우 통지받은 데이터가 정확히 1개 임을 보장해야 합니다. 데이터가 없거나 1개 이상인 경우는 예외가 발생합니다.

예제 1.29에서는 데이터가 1개가 아닌 경우 어떤 예외가 발생하는지 확인하기 위해 Flux.just로 6개의 데이터를 인자로 받고 take(2)로 데이터를 2번 통지하여 예외를 발생시킵니다.

예제 1.29 single 예제 - reactor/reactor-basic/src/flux/Flux7.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val flux = Flux.just(1, 2, 3, 4, 5, 6)
        .take(2)
        .single()

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
java.lang.IndexOutOfBoundsException: Source emitted more than one item

출력 결과를 보면 IndexOutOfBoundsException가 발생한 것을 확인할 수 있습니다. 예제 1.30는 take(1)을 사용해 하나의 데이터만 통지하는 예제입니다.

예제 1.30 single 예제2 - reactor/reactor-basic/src/flux/Flux8.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val flux = Flux.just("one", "two", "three")
        .take(1)
        .single()

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
one

출력 결과를 확인해보면 정상적으로 Flux.just의 인자로 처음 전달받은 문자열 ‘one’이 출력된 것을 확인할 수 있습니다.

만약 Flux로 통지받은 데이터가 비어있을(empty) 가능성이 있는 경우 singleOrEmpty를 사용할 수 있습니다. singleOrEmpty 함수는 1개의 데이터 또는 빈 값을 허용합니다. 예제 1.31은 Flux.empty를 사용해 빈 값을 통지하고 singleOrEmpty를 사용해 통지합니다.

예제 1.31 singleOrEmpty 예제 - reactor/reactor-basic/src/flux/Flux9.kt

package flux

import reactor.core.publisher.Flux

fun main() {
   val flux = Flux.empty<Any>()
       .singleOrEmpty()

   flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------

Flux.empty로 플럭스를 생성하고 singleOrEmpty를 사용했기 때문에 빈 결과가 출력되었습니다. singleOrEmpty 함수도 1개 또는 빈 값이 아닌 경우는 동일하게 IndexOutOfBoundsException가 발생합니다.

예제 1.32는 Flux.just에 첫 번째 인자는 ‘1’ 두 번째 인자는 Flux.empty()를 지정한 후 singleOrEmpty 함수를 사용하여 예외를 발생시킵니다.

예제 1.32 singleOrEmpty 예제2 - reactor/reactor-basic/src/flux/Flux9.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val flux = Flux.just(1, Flux.empty<Int>())
        .singleOrEmpty()

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
java.lang.IndexOutOfBoundsException: Source emitted more than one item

merge, concat, mergeSequential

Flux.merge는 플럭스를 제공받은 순서대로 결합하는 팩토리 함수입니다. zip 함수와의 차이점은 merge 함수는 튜플 객체로 감싸지지 않습니다. 예제 1.33은 Flux.merge를 사용해 인자로 지정된 플럭스를 순서대로 통지하는 예제입니다.

예제 1.32 Flux.merge 예제 - reactor/reactor-basic/src/flux/Flux10.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val three = Flux.just("three")
    val two = Flux.just("two")
    val one = Flux.just("one")

    val flux: Flux<String> = Flux.merge(three, two, one)
    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
three
two
one

출력 결과를 확인해보면 인자로 지정한 순서대로 결과가 출력되었음을 확인할 수 있습니다. 예제 1.33은 예제 1.32를 조금 수정하여 Flux.merge 함수에 인자로 전달되는 플럭스에 delayElements 함수를 사용하여 정해진 지연 시간 이후에 각각 병렬로 처리되는데 이 경우에도 순서가 유지되는지 알아보겠습니다.

예제 1.33 Flux.merge 예제2 - reactor/reactor-basic/src/flux/Flux11.kt

package flux

import reactor.core.publisher.Flux
import java.time.Duration

fun main() {
    val one = Flux.just("one").delayElements(Duration.ofMillis(300))
    val two = Flux.just("two").delayElements(Duration.ofMillis(400))
    val three = Flux.just("three").delayElements(Duration.ofMillis(500))

    val flux: Flux<String> = Flux.merge(three, two, one)
    flux.subscribe(::println)

    Thread.sleep(Duration.ofSeconds(2).toMillis())
}

--------------------
출력 결과)
--------------------
one
two
three

출력 결과를 확인해보면 인자로 지정된 순서대로 통지되지 않고 지연 시간에 따라서 순서가 정해진 것을 확인할 수 있습니다. 만약 병렬 처리 시에도 인자로 지정된 순서대로 통지해야 하는 경우 예제 1.34와 같이 concat 함수를 사용해 순서를 보장할 수 있습니다.

예제 1.34 Flux.concat 예제 - reactor/reactor-basic/src/flux/Flux12.kt

package flux

import reactor.core.publisher.Flux
import java.time.Duration

fun main() {
    val one = Flux.just("one").delayElements(Duration.ofMillis(300))
    val two = Flux.just("two").delayElements(Duration.ofMillis(400))
    val three = Flux.just("three").delayElements(Duration.ofMillis(500))

    val flux: Flux<String> = Flux.concat(three, two, one)
    flux.subscribe(::println)

    Thread.sleep(Duration.ofSeconds(2).toMillis())
}

--------------------
출력 결과)
--------------------
three
two
one

출력 결과를 확인해보면 플럭스가 concat 함수에 병렬로 전달되었지만 인자로 지정된 순서대로 통지된 것을 확인할 수 있습니다. 순서가 보장되는 이유는 병렬 처리 시 인자로 지정된 플럭스를 결합한 다음 플럭스를 순서대로 구독하여 데이터를 통지하고 이전 플럭스에 대한 통지가 완료되면 다음 플럭스가 구독 처리될 때까지 기다리는 순차처리 방식의 연산자이기 때문입니다.

예제 1.35에서 사용한 mergeSequential도 concat과 같이 순서가 보장됩니다.

예제 1.35 Flux.concat 예제 - reactor/reactor-basic/src/flux/Flux12.kt

package flux

import reactor.core.publisher.Flux
import java.time.Duration

fun main() {
    val one = Flux.just("one").delayElements(Duration.ofMillis(300))
    val two = Flux.just("two").delayElements(Duration.ofMillis(400))
    val three = Flux.just("three").delayElements(Duration.ofMillis(500))

    val flux: Flux<String> = Flux.mergeSequential(three, two, one)
    flux.subscribe(::println)

    Thread.sleep(Duration.ofSeconds(2).toMillis())
}

--------------------
출력 결과)
--------------------
three
two
one

concat 과의 차이점이라면 concat은 여러 개의 플럭스를 인자로 전달받아 하나로 결합한 후에 순서대로 구독 처리하는 연산자라면 mergeSequential은 여러 개의 플럭스를 병렬로 구독 처리하고 최종 변환 시 인자로 지정한 구독 순서에 따라서 새로운 플럭스로 결합한다는 차이점이 있습니다.

728x90
반응형
728x90
반응형


프로젝트 리액터

프로젝트 리액터(Project Reactor)는 JVM기반의 환경에서 리액티브 애플리케이션 개발을 위한 오픈 소스 프레임워크입니다. 리액터는 스프링 에코시스템에서 리액티브 스택의 기반이 되는 프로젝트로 리액티브 스트림 사양을 구현하므로 리액티브 스트림에서 사용하는 용어와 규칙을 동일하게 사용하며 리액티브 스트림 사양에 포함되지 않은 리액터만의 다양한 기능들도 제공합니다.

리액터를 사용하면 손쉽게 애플리케이션에 리액티브 프로그래밍을 적용할 수 있고 애플리케이션의 모든 구간에 비동기-논블로킹을 적용할 수 있습니다. 또한 리액터에서 제공하는 백프레셔 기법을 사용해 시스템의 부하를 효율적으로 조절할 수 있습니다.

 

모노와 플럭스

리액터는 모노(Mono)플럭스(Flux)라는 두 가지 핵심 타입을 제공합니다. 먼저 모노는 0..1개의 단일 요소 스트림을 통지합니다. 이에 반해 플럭스는 0..N개로 이뤄진 복수개의 요소를 통지하는 발행자입니다. 플럭스와 모노는 리액티브 스트림 사양의 발행자(Publisher)를 구현하여 데이터를 통지하기 때문에 onComplete 또는 onError 시그널이 발생할 때까지 onNext를 사용해 구독자에게 데이터를 통지합니다.

모노와 플럭스는 리액터의 핵심이 되는 요소로써 리액티브 스트림 사양을 구현하는 하는 것 외에도 데이터를 가공하거나 필터링하는 기능과 에러가 발생하면 에러를 핸들링 할 수 있는 다양한 연산자(Operator)를 제공하며 두 타입이 공통으로 사용하는 연산자도 있고 자신만의 연산자도 있습니다. 이번장에서는 모노와 플럭스를 사용할때 자주 사용하는 연산자들에 대해 알아보도록 하겠습니다.

 

모노의 연산자

모노는 단일 요소에 대한 통지의 경우 사용됩니다. 모노의 사용 방법을 알아보기 위해 코틀린 그레이들 프로젝트를 생성합니다. 정상적으로 생성되었다면 예제와 같은 구조로 프로젝트가 생성됩니다.

 

├── build.gradle.kts
├── gradlew
├── settings.gradle.kts
└── src
    ├── main
    │   ├── kotlin
    │   └── resources
    └── test
        ├── kotlin
        └── resources

 

그다음 build.gradle.kts를 열어 예제 1.1과 같이 reactor-core 의존성을 추가합니다.

 

예제 1.1 프로젝트에 reactor-core 의존성 추가 - reactor/reactor-basic/build.gradle.kts

plugins {
    id("org.jetbrains.kotlin.jvm") version ("1.4.10")
}


group = "com.reactor.reactorbasic"
version = "1.0-SNAPSHOT"


repositories {
    mavenCentral()
}


dependencies {

    implementation("org.jetbrains.kotlin:kotlin-stdlib")

    implementation("io.projectreactor:reactor-core:3.4.0")

}

 

 

just

모노를 생성하는 가장 간단한 방법은 Mono.just를 사용하는 것입니다. Mono.just(T data)는 객체를 인자로 받은 뒤 모노로 래핑하는 팩토리 함수입니다.

예제 1.2 Mono.just 사용 - reactor/reactor-basic/src/mono/Mono1.kt

package mono

import reactor.core.publisher.Mono

fun main() {
    val mono: Mono<String> = Mono.just("Hello Reactive World")
    mono.subscribe(::println)
}

--------------------
출력 결과)
--------------------
Hello Reactive World


인자로 “Hello Reactive World”라는 String을 인자로 받았기 때문에 String이 감싸진 Mono<String>을 반환하며 그다음 라인에 작성된 mono.subscribe(::println)는 리액티브 스트림 사양의 Publisher.subscribe 함수를 구현하여 데이터를 구독합니다. 

중요한 점은 플럭스와 모노는 게으르게(lazy) 동작하여 subscribe를 호출하지 않으면 연산자에 작성한 로직이 동작하지 않는다는 것입니다. 이런 특징은 Java8 스트림 API의 특성과 동일합니다. 스트림 API 역시 연산자를 트리거 하는 최종 연산자를 호출하지 않으면 스트림 연산이 동작하지 않습니다.

 

justOrEmpty

Mono.justOrEmpty는 값이 null이 거나 null이 될 수 있는 데이터를 받아서 처리할 수 있습니다. 

 

예제 1.3  Mono.justOrEmpty 사용 - reactor/reactor-basic/src/mono/Mono2.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val mono: Mono<String> = Mono.justOrEmpty("Hello Reactive World")
    mono.subscribe(::println)
}
--------------------
출력 결과)
--------------------
Hello Reactive World


우선 greeting 변수에 null을 할당하고 Mono.justOrEmpty의 인자로 넣어줍니다. Mono.just의 경우 null인 데이터가 인자로 들어오면 NullPointerException이 발생하므로 주의해야 합니다.  

 

switchOrEmpty/defer

null인 데이터를 제공받을 경우 switchIfEmpty를 사용해 처리할 수 있습니다. switchIfEmpty는 전달받은 값이 null인 경우 새로운 데이터로 변환하는 연산자입니다. 

 

예제 1.4  통지할 데이터가 null인 경우 처리 방법 - reactor/reactor-basic/src/mono/Mono3.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val greeting: String? = null
    Mono.justOrEmpty(greeting)
        .switchIfEmpty(Mono.defer {
            Mono.just("Hello Reactive World")
        })
        .subscribe(::println)
}
--------------------
출력 결과)
--------------------
Hello Reactive World

greeting이 null로 전달되었으므로 switchIfEmpty에서 제공하는 Mono.just(“Hello Reactive World”)가 연산자의 결과로 전달됩니다. 이때 switchIfEmpty 내부에서 Mono.defer라는 연산자를 사용하면 내부 코드의 호출이 지연되어 실제 사용되는 시점에 호출됩니다.

만약 예제 1.5 같이 Mono.defer를 사용하지 않고 switchIfEmpty를 사용하게 되면 greeting의 값의 존재 유무에 상관없이 switchIfEmpty 내부의 코드가 동작하게 되는데 이는 불필요한 동작이므로 Mono.defer를 사용하여 실제 사용되는 시점까지 동작을 미루는 것이 유리합니다.

예제 1.5  Mono.defer를 사용하지 않은 경우 - reactor/reactor-basic/src/mono/Mono4.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val greeting = "Hello"
    Mono.justOrEmpty(greeting)
        .switchIfEmpty(greetIfEmpty()) // null이 아니지만 호출됨
        .subscribe(::println)
}
 
fun greetIfEmpty() : Mono<String> {
    val greeting = "Hello Reactive World"
    println(greeting)
    return Mono.just(greeting)
}
--------------------
출력 결과)
--------------------
Hello Reactive World
Hello

 

예제 1.6은 Mono.defer를 사용하여 greeting이 null인 경우에만 동작하므로 효율적입니다.

 

예제 1.6  Mono.defer를 사용한 경우 - reactor/reactor-basic/src/mono/Mono5.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val greeting = "Hello"
    Mono.justOrEmpty(greeting)
        .switchIfEmpty(Mono.defer {
            greetIfEmptyInDefer() // greeting이 null인 경우에만 동작
        })
        .subscribe(::println)
}
 
fun greetIfEmptyInDefer(): Mono<String> {
    val greeting = "Hello Reactive World"
    println(greeting)
    return Mono.just(greeting)
}
--------------------
출력 결과)
--------------------
Hello

 

 

defaultIfEmpty

전달받은 데이터가 null인 경우 예제 1.7의 defaultIfEmpty를 사용하여 기본 값을 제공할 수 있습니다. 

예제 1.7 defaultIfEmpty를 사용해 기본 값 제공 - reactor/reactor-basic/src/mono/Mono6.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val greeting: String? = null
    Mono.justOrEmpty(greeting)
        .defaultIfEmpty("Hello Reactive World")
        .subscribe(::println)
}
--------------------
출력 결과)
--------------------
Hello Reactive World

 
defaultIfEmtpy와 switchIfEmpty 연산자는 모두 전달받은 데이터가 null인 경우 사용할 수 있다는 것이 공통점입니다. 차이점은 switchIfEmpty는 내부에서 새로운 로직을 구현해 새로운 Mono를 생성할 수 있지만 defaultIfEmpty는 null을 대체하는 값을 지정한다는 점이 다릅니다.

 

fromSupplier

내부에서 로직에 대한 결과로 모노 객체를 생성할 경우 예제 1.8과 같이 Mono.fromSupplier를 사용할 수 있습니다.

예제 1.7 fromSupplier 예제 - reactor/reactor-basic/src/mono/Mono7.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    Mono.fromSupplier {
        val greeting = "Reactive World"
        var welcome = "Welcome"
        welcome += " ${greeting.split(" ")[1]}"
        welcome += " ${greeting.split(" ")[2]}"
        welcome
    }.subscribe(::println)
}
--------------------
출력 결과)
--------------------
Welcome Reactive World


Mono.fromSupplier의 인자로 제공되는 Supplier는 Java8에 추가된 함수형 인터페이스입니다. Supplier는 별도의 인자는 없이 내부의 값을 반환하는 T get() 함수를 구현하도록 설계되었습니다.

@FunctionalInterface
public interface Supplier<T> {
    T get();
}


즉 Mono.fromSupplier{} 내부에 작성하는 코드는 T get() 함수의 구현이 되어 Mono.fromSupplier로 제공되고 내부 로직의 반환되는 구조입니다. 모노를 생성하는데 특정한 구현 로직이 필요한 경우나 늦은 초기화가 필요한 경우 유용하게 사용할 수 있습니다.

 

error

Mono.error는 로직을 처리하는 중 특정한 에러를 발생시키고 싶은 경우 유용하게 사용할 수 있습니다. 예제 1.8은 일부러 NullPointerException을 발생시키는 코드를 생성하고 예외가 발생했을때 사용자 정의 예외를 발생시킵니다.

예제 1.8 error를 사용자 사용자 정의 예외 발생 예제 - reactor/reactor-basic/src/mono/Mono8.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    Mono.fromSupplier {
        val greeting: String? = null
        Mono.just(greeting)
    }.onErrorResume {
        Mono.error(GreetingEmptyException("Greeting is Empty"))
    }.subscribe(::println)
}
 
class GreetingEmptyException : Throwable {
    constructor() : super()
    constructor(message: String?) : super(message)
}
--------------------
출력 결과)
--------------------
Welcome Reactive World


가장 먼저 Mono.from Supplier 내부에서 null을 반환하는 greeting을 Mono.just로 생성하였습니다. Mono.just는 null인 객체를 전달받으면 NullPointerException을 발생시킵니다. 에러가 발생한 경우 onErrorResume를 사용하면 예외가 발생할 경우 별도의 핸들링이 가능합니다. 예제 1.8에서는 NullPointerException이 발생한 경우 직접 생성한 GreetingEmptyException을 대신 발생시키기 위해 Mono.error를 사용했습니다. Mono.error는 최상위 Exception인 Throwable을 인자로 전달받기 때문에 모든 사용자 정의 예외 객체에 사용이 가능합니다.

 

map

예제 1.9에서 소개하는 map 연산자는 전달받은 요소를 새로운 모노로 변환할 때 사용하는 연산자입니다. 

예제 1.9 map 사용 예제 - reactor/reactor-basic/src/mono/Mono9.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
   val mono: Mono<String> =
       Mono.just("Hello Reactive World")
           .map {
               it.toLowerCase()
           }
   mono.subscribe(::println)
}
--------------------
출력 결과)
--------------------
Hello Reactive World


map 내부에서는 Mono.just에 담긴 문자열(Hello Reactive World)을 전달받아서 소문자로 변환하는 toLowerCase()를 호출합니다. 이렇게 되면 map 내부에서 변환된 데이터를 모노로 감싼 뒤 데이터를 통지하게 됩니다. map은 한 개의 데이터만 제공할 수 있고 다수의 데이터를 제공할 수 없습니다. 

 

flatMap

예제 1.10은 “Hello Reactive World”라는 String을 받아서 flatMap을 통해 각 단어의 처음 단어를 뽑아내 약자로 변환하여 출력하는 예제입니다. 

예제 1.10 flatMap 사용 예제 - reactor/reactor-basic/src/mono/Mono10.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    Mono.just("Hello Reactive World")
        .flatMap {
            val words = it.split(" ")
            val acronym = words.map { word -> word[0] }.toCharArray()
            Mono.just(acronym)
        }.subscribe(::println)
}
--------------------
출력 결과)
--------------------
HRW


flatMap은 주로 단일 요소를 복수개의 요소로 변환할 때 사용합니다. 유의할 점은 flatMap은 map과 다르게 마지막에서 반환할 요소를 앞서 소개한 Mono.just와 같은 팩토리 함수로 감싸서 반환해야 합니다.

예제 1.11은 전달 받은 문자열에 “Hello”가 포함 되었는지를 판단하여 true면 문자열로 새롭게 생성한 모노를 반환하고 거짓이라면 Mono.empty를 반환합니다.

예제 1.11 flatMap 사용 예제2 - reactor/reactor-basic/src/mono/Mono11.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    Mono.just("Reactive World")
        .flatMap {
            if (it.contains("Hello")) {
                Mono.just(it)
            } else {
                Mono.empty()
            }
        }.subscribe(::println)
}
--------------------
출력 결과)
--------------------
없음


flatMap을 활용하면 특정 조건에 따라서 에러를 발생 시키거나 비어 있는 값을 리턴할 수 있습니다.


filter

filter 연산자는 특정 조건이 true인지 판단하여 true인 경우에만 데이터를 통지하는 연산자입니다. 예제 1.12는 전달받은 문자열 데이터가 존재하는 경우 정해진 문자열을 출력합니다.

예제 1.12 filter를 사용해 데이터 정제 - reactor/reactor-basic/src/mono/Mono12.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val mono: Mono<String> =
        Mono.just("Hello Reactive World")
            .filter {
                it.startsWith("Hello")
            }
            .map {
                "this operator is working"
            }
    mono.subscribe(::println)
}
--------------------
출력 결과)
--------------------
this operator is working


예제 1.12의 filter 연산자에서는 전달받은 문자열이 Hello로 시작하는지 판단하여 true이라면 다음 연산자인 map 연산자로 이어지고 정해진 문자열 “this operator is working”을 출력합니다.

filter 연산자는 Predicate라는 함수형 인터페이스를 전달받습니다. Predicate도 마찬가지로 Java8에 추가되었으며 test라는 함수를 구현해야 합니다. 

@FunctionalInterface
public interface Predicate<T> {
    boolean test(T t);
}

test 함수는 boolean을 반환하여 test 함수를 구현하는 구현체에서는 조건식의 결과를 boolean으로 반환해야 합니다. 

예제 1.13은 filter 연산자 내부의 조건이 거짓인 경우 switchIfEmpty 내부의 로직이 동작하는 것을 확인 하는 예제입니다.

예제 1.13 filter를 사용해 데이터 정제2 - reactor/reactor-basic/src/mono/Mono13.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val mono: Mono<String> =
        Mono.just("Hello Reactive World")
            .filter {
                it.startsWith("Bye")
            }
            .map {
                "this operator is working"
            }
            .switchIfEmpty(Mono.defer {
                Mono.just("switchIfEmpty is working")
            })
    mono.subscribe(::println)
}
--------------------
출력 결과)
--------------------
switchIfEmpty is working


filter 연산자내에서 전달받은 데이가 Bye로 시작하면 조건은 true지만 전달 받은 문자열이 Hello Reactive World 이므로 이 조건 식은 거짓이 됩니다. 이 경우 바로 이어진 map 연산자가 동작하지 않고 switchIfEmpty 연산자가 동작하여 출력 결과는 switchIfEmpty is working이 됩니다. 이런 형태로 filter의 조건에 따른 분기가 이뤄지는 조합은 매우 빈번하게 사용됩니다.


zip

만약 여러 개의 모노 객체를 하나의 모노로 결합할 경우 zip을 사용해 처리할 수 있습니다. 예제 1.14는 zip을 사용해 3개의 모노를 합친 결과를 출력합니다.

예제 1.14 zip을 사용한 결합 예제1 - reactor/reactor-basic/src/mono/Mono14.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val mono1 = Mono.just("Hello")
    val mono2 = Mono.just("Reactive")
    val mono3 = Mono.just("World")
    Mono.zip(mono1, mono2, mono3)
        .map { tuple: Tuple3<String, String, String> ->
            "${tuple.t1} ${tuple.t2} ${tuple.t3}"
        }.subscribe(::println)
}
--------------------
출력 결과)
--------------------
Hello Reactive World


우선 3개의 모노를 생성한 후 Mono.zip 함수의 인자로 추가합니다.  zip 함수의 처리가 완료되어 새로운 모노를 생성한 뒤 다음 연산자에 통지를 보내는 시점은 인자로 제공된 모노 중 가장 오래 수행된 모노의 시간을 기준으로 결합되는 것이 특징입니다. 이런 특징 때문에 각 모노의 로직을 비동기 처리한 뒤 결과에 대한 결합을 위해 zip을 사용할 수 있습니다.

zip으로 합쳐진 모노 객체는 튜플(Tuple) 객체의 프로퍼티로 저장됩니다. 튜플에 들어가는 순서는 함수에 인자로 추가한 순서대로 t1, t2, t3와 같은 방식으로 사용할 수 있습니다. 튜플은 리액터에서 제공하는 데이터 저장을 위한 구조체의 종류로 비슷한 예로 코틀린의 페어(Pair)와 트리플(Triple)이 있습니다. 튜플은 현재 8개까지 제공되고 있으며 reactor.util.function 패키지를 보면 Tuple2, Tuple3, Tuple4… 과 같이 미리 정의되어 제공됩니다.

public Tuple2<T1, T2> implements Iterable<Object>, Serializable {
    final T1 t1;
    final T2 t2;
    // ...
}
 
public Tuple3<T1, T2, T3> extends Tuple2<T1, T2> {
    final T3, t3;
    // ...
}


기본적으로 Mono.zip 함수에는 최대 8개의 모노를 인자로 받아서 튜플로 사용할 수 있는 오버로딩 함수가 있지만 그 이상의 모노를 결합할 경우 컴비네이터 함수(Combinator Function)을 사용해 합칠 수 있습니다.

예제 1.15는 리스트로 전달받은 모노를 컴비네이터 함수로 결합하는 예제 입니다.

예제 1.15 zip을 사용한 결합 예제2 - reactor/reactor-basic/src/mono/Mono15.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val monoList = arrayListOf(
        Mono.just("Seoul"),
        Mono.just("Tokyo"),
        Mono.just("Washington, D.C"),
        Mono.just("Wellington"),
        Mono.just("Canberra"),
        Mono.just("Paris"),
        Mono.just("Prague"),
        Mono.just("Manila"),
        Mono.just("Doha"),
        Mono.just("Singapore")
    )
 
    Mono.zip(monoList) { array: Array<Any> -> // 컴비네이터 함수
        array.joinToString {
            (it as String).toUpperCase()
        }
    }.subscribe(::println)
}
 
--------------------
출력 결과)
--------------------
SEOUL, TOKYO, WASHINGTON, D.C, WELLINGTON, CANBERRA, PARIS, PRAGUE, MANILA, DOHA, SINGAPORE


우선 Mono.zip 함수의 인자에 리스트에 담긴 모노 객체들을 추가합니다. 그런 다음 컴비네이터 함수에서 데이터를 변환하게 되는데 이때 앞서 제공된 모노가 담긴 리스트는 튜플이 아닌 배열에 담겨서 전달됩니다. 그다음 joinToString 함수 내부에서 배열의 개수만큼 반복하면서 문자열을 대문자로 변환하고 콤마(,)를 구분자로 해서 배열을 문자열로 변환한 후 다음 연산자로 제공합니다.

 

zipWith, zipWhen

zipWith 연산자는 기존 모노 객체에 새로운 모노를 결합합니다. 얼핏 보면 zip 함수와 비슷하지만 zip은 여러 개의 모노를 결합할 수 있는 반면에 zipWith는 한 번에 1개의 모노 객체를 기존 모노에 결합합니다.  예제 1.16은 zipWith를 사용해 모노를 결합하는 예제입니다.

예제 1.16 zipWith을 사용한 결합 예제 - reactor/reactor-basic/src/mono/Mono16.kt

package mono
 
import reactor.core.publisher.Mono
import reactor.util.function.Tuple2
 
fun main() {
    val mono: Mono<Tuple2<String, String>> =
        Mono.just("Republic of Korea")
            .zipWith(Mono.just("Seoul"))
 
    mono.map {
        val country = it.t1
        val capital = it.t2
        "$capital is capital of $country"
    }.subscribe(::println)
}
 
--------------------
출력 결과)
--------------------
Seoul is capital of Republic of Korea

zipWith는 한 번에 1개의 모노를 결합하는 연산자이기 때문에 2개의 모노를 담는 튜플인 Tuple2<T1,T2>를 생성합니다. 그러므로 예제 1.16의 zipWith 연산자는 Mono<Tuple2<String, String>>과 같은 형태를 반환하게 되고 map 연산자 내부에서 전달받은 튜플을 이용해 문자열을 생성합니다. 이때 튜플의 순서는 전달받은 모노의 순서와 동일합니다.

zipWhen 연산자는 zipWith와 마찬가지로 기존 모노 객체에 새로운 모노를 결합하지만 함수의 인자로 모노를 직접 전달 받는 것이 아니라 함수형 인터페이스인 Function을 전달 받습니다.

@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
}


예제 1.17은 앞선 1.16 예제에서 zipWith를 사용하는 코드를  zipWhen으로 변경하였습니다.

예제 1.17 zipWith을 사용한 결합 예제 - reactor/reactor-basic/src/mono/Mono17.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val mono =
       Mono.just("Republic of Korea")
             .zipWhen {
                Mono.just("Seoul")
            }
 
    mono.map {
        val country = it.t1
        val capital = it.t2
        "$capital is capital of $country"
    }.subscribe(::println)
}
 
--------------------
출력 결과)
--------------------
Seoul is capital of Republic of Korea


zipWhen도 zipWith와 마찬가지로 한 번에 1개의 모노를 결합하는 연산자이기 때문에 2개의 모노를 담는 튜플인 Tuple2<T1,T2>를 생성합니다. 하지만 인자를 함수 형태로 전달받았기 때문에 내부에서 특정 로직을 구현할 결과를 모노로 변환하는 경우 유용하게 사용할 수 있습니다.

 

block

리액터의 연산자는 게으르게 동작하여 subscribe가 호출되기 전까지 연산자가 동작하지 않는 것이 일반적입니다. 하지만 block을 사용하면 그 즉시 구독을 차단(block) 하고 이전 연산자로부터 수신된 데이터를 가져옵니다. 예제 1.18은 block을 사용해 데이터를 출력합니다.

예제 1.18 block을 사용한 데이터 출력 예제 - reactor/reactor-basic/src/mono/Mono18.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val mono: Mono<String> =
        Mono.just("Welcome Back")
            .zipWhen {
                Mono.just("Synchronous World")
            }.map {
                "${it.t1} to the ${it.t2}"
            }
 
    val text: String? = mono.block()
    println(text)
}
 
--------------------
출력 결과)
--------------------
Welcome Back to the Synchronous World


subscribe를 호출하지 않았지만 block 함수를 호출함으로써 구독이 완료되고 Mono<String>가 아닌 String이 반환되는 것을 확인할 수 있습니다.

block 함수는 논블로킹 스레드 내부에서 사용되면 IllegalStateException이 발생합니다. 예제 1.19는 리액터에서 지원하는 스케줄러를 사용해 단일 논블로킹 스레드를 생성하고 내부에서 block 함수를 호출하여 에러를 발생시키는 예제입니다. 

예제 1.19 논블로킹 스레드 내부에서 block을 사용 예제 - reactor/reactor-basic/src/mono/Mono19.kt

package mono
 
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
 
fun main() {
    Schedulers.newSingle("NonBlockingSingleThread").schedule {
        val mono: Mono<String> =
            Mono.just("Welcome Back")
                .zipWhen {
                    Mono.just("Synchronous World")
                }.map {
                    "${it.t1} to the ${it.t2}"
                }
        val text: String? = mono.block()
        println(text)
    }
}
 
--------------------
출력 결과)
--------------------
java.lang.IllegalStateException: 
block()/blockFirst()/blockLast() are blocking, which is not supported in thread NonBlockingSingleThread-1

 

예제 1.19와 같이 논블로킹 스레드 내부에서 사용되면 경고와 함께 에러가 발생합니다. 일반적으로 block 함수를 사용하는 상황은 리액터의 비동기, 논블로킹 처리의 이점을 사용하지 않는다는 의미이기 때문에 한정적인 상황에서 사용됩니다.

 

728x90
반응형

+ Recent posts