Hot vs Cold
리액터에는 두 개의 발행자 타입이 존재하는데 바로 Hot 발행자와 Cold 발행자입니다. 먼저 Cold 발행자는 각각의 구독에 대해 새로운 데이터를 생성하는 구조입니다. 발행자에 대한 구독이 이뤄지지 않으면 데이터는 결코 새롭게 생성되지 않습니다. Cold 발행자에 대한 예를 들어보면 HTTP 요청이 Cold에 해당합니다. HTTP 요청은 클라이언트로부터 새로운 요청이 발생할 때마다 데이터를 생성하여 응답하는데 이때 여러 개의 클라이언트에서 요청이 발생하더라도 동일한 요청이라면 동일한 데이터 구조를 새로 응답할 것입니다. 또 다른 예로 넷플릭스에서 여러 사용자가 동일한 드라마 회차를 감상하는 경우 사용자들은 자신만의 타임라인을 가집니다. 즉 사용자별로 드라마를 처음 재생하면 처음부터 재생되고 재생을 시작한 시점부터 현재 보고 있는 시점이 각각 다른데 이러한 것들을 Cold 방식으로 볼 수 있습니다.
예제 1.36은 동일한 플럭스를 3개의 구독자가 구독하여도 동일한 데이터를 통지하는 Cold 발행자의 특징을 보여줍니다.
예제 1.36 Cold 발행자 예제 - reactor/reactor-basic/src/hotcold/Cold.kt
package hotcold
import reactor.core.publisher.Flux
fun main() {
val list = listOf("one", "two", "three")
val flux: Flux<String> = Flux.fromIterable(list)
flux.subscribe { println("Subscriber-1 : $it") }
flux.subscribe { println("Subscriber-2 : $it") }
flux.subscribe { println("Subscriber-3 : $it") }
}
--------------------
출력 결과)
--------------------
Subscriber-1 : one
Subscriber-1 : two
Subscriber-1 : three
Subscriber-2 : one
Subscriber-2 : two
Subscriber-2 : three
Subscriber-3 : one
Subscriber-3 : two
Subscriber-3 : three
출력된 결과를 확인해보면 구독자가 각각 동일한 데이터를 통지받은 것을 확인할 수 있습니다. 즉 각각의 구독자가 새로운 구독을 만들었다는 것을 확인할 수 있습니다.
Hot 발행자는 구독자의 상태에 상관없이 데이터를 지속적으로 제공합니다. 예를 들어 주식 데이터를 제공하는 API의 경우 실시간으로 최신 데이터를 제공합니다. 국내 주식의 경우 정규장으로 오전 9시부터 시작됩니다. 만약 출근 후에 주식 정보를 보고 싶어서 네이버에 접속해 오전 10시에 주식 정보를 확인한다면 정규장이 시작된 오전 9시의 가격이 아닌 오전 10시의 주식가격을 확인하게 됩니다. 또 다른 예로 유투브의 실시간 스트리밍을 들 수 있습니다. 유저가 구독 중인 크리에이터의 실시간 스트리밍을 1시간이 지난 뒤 입장하면 당연히 처음부터 보는 게 아니라 실시간으로 크리에이터와 상호작용하게 됩니다. 이렇게 나중에 구독에 참가한 구독자는 처음부터 새로운 데이터를 얻는 게 아닌 새로이 구독한 시점부터 데이터를 제공받는 개념을 Hot 방식이라고 합니다.
예제 1.37은 Hot 발행자인 커넥터블 플럭스(ConnectableFlux)를 사용하여 각각의 구독자를 구독한 시점부터 데이터를 통지받도록 연결하여 출력하는 예제입니다.
예제 1.37 Hot 발행자 예제 - reactor/reactor-basic/src/hotcold/Hot.kt
package hotcold
import reactor.core.publisher.ConnectableFlux
import reactor.core.publisher.Flux
import java.time.Duration
fun main() {
val flux: ConnectableFlux<Int> =
Flux.range(1, 5)
.delayElements(Duration.ofMillis(100))
.publish()
flux.connect()
flux.subscribe { println("Subscriber-1 : $it") }
Thread.sleep(Duration.ofMillis(200).toMillis())
flux.subscribe { println("Subscriber-2 : $it") }
Thread.sleep(Duration.ofMillis(200).toMillis())
flux.subscribe { println("Subscriber-3 : $it") }
Thread.sleep(Duration.ofSeconds(2).toMillis())
}
--------------------
출력 결과)
--------------------
Subscriber-1 : 1
Subscriber-1 : 2
Subscriber-2 : 2
Subscriber-1 : 3
Subscriber-2 : 3
Subscriber-1 : 4
Subscriber-2 : 4
Subscriber-3 : 4
Subscriber-1 : 5
Subscriber-2 : 5
Subscriber-3 : 5
- range(1, 5)를 사용해 1부터 5까지의 플럭스를 생성하고 delayElements(Duration.ofMillis(100)) 함수를 사용해 각각의 데이터를 100밀리초 지연 후 publish로 데이터를 통지합니다.
- 만약 flux.connect()가 없으면 발행자를 구독을 하더라도 아무 일도 일어나지 않습니다.
- 3개의 구독자인 flux.subscribe{}는 다음 구독자로 넘어가는 시점을 200밀리초 뒤로 미루고 현재 시점에서 통지 받은 데이터를 출력합니다.
출력 결과를 확인해보면 Subscriber-1은 1부터 5까지의 데이터를 모두 전달받았지만 Subscriber-2와 Subscriber-3은 지연된 만큼 데이터를 전달받지 못한 것을 확인할 수 있습니다.
앞서 커넥터블 플럭스를 사용해 Hot 발행자의 동작 방식에 대해 설명했습니다. 커넥터블 플럭스는 대표적인 Hot 발행자로 하나의 데이터를 생성하는 발행자에 여러 구독자가 동시에 구독하는 것을 허용합니다. 특징은 일반적인 Cold 방식의 발행자와 다른 점은 구독자가 구독을 위해 subscribe를 호출하더라도 데이터를 통지하지 않고 connect 연산자를 호출해야 비로소 데이터를 통지합니다.
Cold 발행자인 플럭스를 커넥터블 플럭스로 변환하는 방법에는 2가지가 있습니다. 표 1.1은 커넥터블 플럭스로 변환하기 위한 연산자에 대해 설명합니다.
표 1.1 플럭스를 커넥터블 플럭스로 변환하는 연산자
함수 |
설명 |
publish |
구독자가 연결된 시점 부터 새롭게 생성되는 데이터를 통지하는 커넥터블 플럭스를 제공하는 연산자입니다. |
replay |
인자로 지정한 만큼의 데이터를 캐시하고 있으면서 새롭게 구독자가 연결되어 구독을 시작하는 경우 시존에 캐시 된 데이터를 먼저 통지하고 캐시 된 데이터 데이터가 없다면 구독 이후로 생성된 데이터를 통지하는 연산자입니다. |
생성된 커넥터블 플럭스는 connect와 같이 구독에 대한 추가적인 처리를 위한 방법도 제공합니다. 표 1.2는 커넥터블 플럭스가 제공하는 연산자에 대해 설명합니다.
표 1.2 커넥터블 플럭스가 제공하는 연산자
함수 |
설명 |
connect |
커넥터블 플럭스에 대한 구독을 활성화하기 위해 수동으로 호출하여 사용합니다. |
autoConnect |
함수에 지정된 인자의 개수만큼 구독이 연결되면 자동으로 데이터를 통지합니다. |
refCount |
커넥터블 플럭스를 새로운 플럭스로 변환하고 함수에 지정된 인자의 구독자 개수만큼 연결된 구독자가 있을 경우에만 데이터를 처음부터 통지하고 새롭게 연결된 구독자에게는 구독 이후로 생성된 데이터를 통지합니다. |
백프레셔
백프레셔(Back-Pressure)는 리액티브 스트림의 핵심 사양 중 하나입니다. 백프레셔는 데이터를 소비하는 측인 구독자가 자체적으로 처리 가능한 만큼의 데이터를 발행자에게 역으로 요청(request)합니다. 데이터를 소비하는 속도 즉 처리 속도보다 데이터를 생성하고 통지하는 속도가 빠르다면 구독자 입장에선 압박(pressure)이 될 수 있습니다. 백프레셔는 이런 상황에서 구독자가 처리 가능한 만큼 데이터를 통지할 수 있도록 조절해주는 기능입니다.
서비스를 예를 들어 동시 접속자가 최대 100명인 시스템에 1000명이 동시에 접속한다면 장애가 발생할 수 있습니다. 이런 경우 인프라를 증설하여 처리해야 하지만 예상치 못한 동시 접속자 증가는 대응하기가 쉽지 않습니다. 이럴 때 서버 앞단에서 요청에 대한 요청 제한(Rate Limiting)을 적용해 HTTP 상태 코드 429 Too Many Requests를 반환하여 처리하면 최악의 상황인 서비스 장애는 방지할 수 있습니다.
예제 1.38은 Flux.range를 사용하여 1부터 10까지 순차적으로 데이터를 통지하고 구독자(Subscriber)를 직접 구현하여 요청을 조절하는 간단한 백프레셔를 구현하는 예제입니다.
예제 1.38 백프레셔 예제 - reactor/reactor-basic/src/backpressure/Backpressure.kt
package backpressure
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import reactor.core.publisher.Flux
fun main() {
Flux.range(1, 10)
.log()
.subscribe(object : Subscriber<Int?> {
private var s: Subscription? = null
private var count = 0
override fun onSubscribe(s: Subscription) {
println("onSubscribe : 구독 시작")
println("==================")
this.s = s
s.request(5)
}
override fun onNext(integer: Int?) {
count++
if (count % 5 == 0) {
println("==================")
s!!.request(5)
}
}
override fun onError(t: Throwable) {}
override fun onComplete() {
println("==================")
println("onComplete : 구독 완료")
println("통지된 numbers 개수 : $count")
}
})
}
--------------------
출력 결과)
--------------------
onSubscribe : 구독 시작
==================
[ INFO] (main) | request(5)
[ INFO] (main) | onNext(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (main) | onNext(4)
[ INFO] (main) | onNext(5)
==================
[ INFO] (main) | request(5)
[ INFO] (main) | onNext(6)
[ INFO] (main) | onNext(7)
[ INFO] (main) | onNext(8)
[ INFO] (main) | onNext(9)
[ INFO] (main) | onNext(10)
==================
[ INFO] (main) | request(5)
[ INFO] (main) | onComplete()
==================
onComplete : 구독 완료
최종 통지된 numbers 개수 : 10
subscribe 함수의 인자에는 리액티브 스트림의 구독자 인터페이스인 Subscriber를 직접 구현하였습니다. 우선 구독이 시작되면서 최초에 한번 실행되는 onSubscribe 함수에서는 Subscription을 초기화하고 s.request(5)를 호출해서 발행자에게 request에 전달받은 만큼의 데이터를 통지하도록 요청합니다. 그다음 호출되는 onNext는 발행자가 생성한 데이터를 담아 통지 신호가 발생할 때마다 호출됩니다. onNext 내부에는 데이터 통지 횟수를 count 변수에 기록하고 count가 5의 배수인 경우에 request를 호출해서 발행자에게 데이터 통지를 요청합니다. 마지막으로 onComplete는 구독에 대한 통지가 완료되었을 때 동작합니다.
출력 결과를 확인해보면 request(5)로 요청한 대로 onNext(1).. onNext(5)까지 순서대로 통지된 후 다음엔 onNext(6).. onNext(10)까지 차례로 통지된 것을 알 수 있습니다. 또한 더이상 통지할 데이터가 없는 경우에는 request(5)를 요청했지만 onNext가 호출되지 않고 onComplete 시그널이 호출되며 구독이 종료되었습니다.
리액터로 작성한 코드를 테스트하기
자동화된 테스트를 작성하는 것은 프로그램의 잠재된 버그의 발견을 쉽게 하고 코드의 품질을 높여줍니다. 그러나 비동기-논 블로킹 형태의 코드는 테스트가 번거롭고 예측하기 어렵습니다. 예를 들어 여러 스레드를 동시에 생성한 후 각각 다른 작업을 한 후 합친 결과에 대해 테스트를 하거나 시간차로 동작하는 코드는 자동화된 테스트를 어렵게 하는 요인으로 알려져 있지만 리액터는 자동화된 테스트를 위한 여러 가지 도구로써 reactor-test라는 라이브러리를 제공합니다. 표 1.3은 reactor-test에서 제공하는 핵심 3가지 패키지에 대한 설명입니다.
표 1.3 reactor-test가 제공하는 패키지
함수 |
설명 |
reactor.test |
발행자에 대한 검증 및 테스트를 위한 메인 테스트 컴포넌트 패키지 |
reactor.test.publisher |
테스트 가능한 발행자의 생성을 위한 컴포넌트 패키지 |
reactor.test.scheduler |
테스트 가능한 스케쥴러의 생성을 위한 컴포넌트 패키지 |
StepVerifier
리액터 테스트 패키지에는 StepVerifier라는 리액티브 스트림의 각 단계를 테스트하기 위한 API를 제공합니다. StepVerifier를 사용하면 데이터 통지와 구독에 대한 조건이 예상한 결과를 만족하는지를 판단하는 테스트 시나리오를 작성할 수 있으므로 편리합니다. 또한 다음에 예상되는 동작과 주어진 시간 동안 딜레이를 주는 등의 다양한 테스트를 가능하게 합니다. StepVerifier는 빌더 패턴 형태로 제공되는데 빌더 패턴을 적용한 함수의 반환값은 this 즉 자기 자신이기 때문에 함수를 연쇄적으로 호출할 수 있다는 장점이 있습니다. 이런 기법을 메소드 체이닝(Method Chaining)이라고 부릅니다.
expectNext
리액터가 제공하는 테스트 도구를 사용하기 위해 우선 코틀린 그레이들 프로젝트를 생성합니다. 정상적으로 생성되었다면 아래와 같은 구조로 프로젝트가 생성됩니다.
├── build.gradle.kts
├── gradlew
├── settings.gradle.kts
└── src
├── main
│ ├── kotlin
│ └── resources
└── test
├── kotlin
└── resources
프로젝트가 정상적으로 생성되었다면 build.gradle.kts를 열어 예제 1.39와 같이 작성합니다.
예제 1.39 build.gradle.kts - reactor/reactor-test/build.gradle.kts
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
kotlin("jvm") version "1.4.21"
}
group = "com.reactor.reactortest"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
implementation("org.jetbrains.kotlin:kotlin-stdlib")
implementation("io.projectreactor:reactor-core:3.4.0")
testImplementation(kotlin("test-junit5"))
testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.0")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.6.0")
testImplementation("io.projectreactor:reactor-test:3.4.0")
}
tasks.test {
useJUnitPlatform()
}
tasks.withType<KotlinCompile>() {
kotlinOptions.jvmTarget = "11"
}
예제에서는 리액터 테스트를 위한 reactor-test 라이브러리와 유닛 테스트 프레임워크인 JUnit5를 의존성에 추가했습니다. 그다음 test/kotlin 디렉토리 하위에 ReactorTest1 클래스를 생성한뒤 예제 1.40과 같이 첫번째 테스트 코드를 작성합니다.
예제 1.40 첫번째 테스트 - reactor/reactor-test/src/test/kotlin/ReactorTest1.kt
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
class ReactorTest1 {
@Test
fun `짝수로 된 데이터만 통지해야 한다`() {
val flux = Flux.just(1, 2, 3, 4)
// rem은 나머지를 구하는 %와 같다.
.filter { num -> num.rem(2) == 0 }
StepVerifier
.create(flux)
.expectNext(2)
.expectNext(4)
.expectComplete()
.verify()
}
}
예제 1.40는 Flux.just의 인자에 1부터 4까지 지정하고 filter 연산자에서 통지된 정수 값을 2로 나눈 나머지가 0인 데이터만 필터링하므로 짝수인 2, 4만 통지됩니다. 그다음 테스트의 핵심인 StepVerifier는 첫 단계로 create(publisher) 연산자를 사용해 초기화합니다. StepVerifier가 초기화되면 expect로 시작하는 연산자를 사용해 단계별로 검증이 가능합니다. 가장 먼저 사용된 expectNext는 onNext로 통지받은 데이터가 인자로 지정된 값과 일치하는지를 검증합니다. 더 이상 검증할 값이 없다면 expectComplete 연산자로 OnComplete 신호를 통지합니다. 그다음 최종적으로 verify 함수를 호출하면 테스트 시나리오에 대한 검증이 완료됩니다. 검증이 정상이라면 테스트를 통과할 것이고 검증이 잘못됬다면 테스트가 실패할 것 입니다.
recordWith, expectNextCount, expectRecordedMatches
예제 1.41은 여러개의 과일을 가진 플럭스를 생성하고 과일의 개수와 특정한 과일이 존재하는지 검증하는 예제입니다.
예제 1.41 reocrdWith 관련 테스트 - reactor/reactor-test/src/test/kotlin/ReactorTest2.kt
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
import java.util.ArrayList
class ReactorTest2 {
@Test
fun `사과가 들어있는지 확인`() {
val flux: Flux<String> = Flux.just("사과", "바나나", "딸기")
StepVerifier
.create(flux)
.recordWith { ArrayList() }
.expectNextCount(3)
.expectRecordedMatches { fruits ->
fruits.contains("사과")
}
.expectComplete()
.verify()
}
}
먼저 StepVerifier를 create로 초기화 후 recordWith{ ArrayList() }를 사용했습니다. recordWith는 create로 초기화된 발행자가 통지하는 onNext 값을 인자로 제공받은 컬렉션에 기록합니다. recordWith로 기록된 데이터는 내부에 컬렉터로 불리는 내부 큐에 보관되며 expectRecordedMatches와 consumeRecordedWith 같은 테스트 연산자를 사용하기 위해 필수로 호출돼야 합니다.
다음 스텝으로 expectNextCount는 통지된 데이터의 개수가 인자로 지정된 개수와 일치하는지 검증합니다. 복수의 데이터가 통지되는 경우 유용하게 사용할 수 있는 연산자입니다. 그다음 expectRecordedMatches는 인자로 함수형 인터페이스인 Predicate를 전달받습니다. Predicate 내부의 test 함수는 Boolean을 리턴하기 때문에 전달받은 조건이 true인 경우에 테스트가 통과합니다. 예제에서는 fruits로 제공받은 리스트 안에 ‘사과’가 존재하므로 이 경우에는 true을 반환하므로 테스트를 통과하게 됩니다.
expectError, expectErrorMessage, expectErrorSatisfies
테스트 케이스를 작성할 때 특정 조건에서 의도한 에러가 발생하는지 검증하는 것도 중요한 테스트입니다. 애플리케이션이 운영 중인 상황에서 의도치 않은 에러가 발생하여 시스템에 장애가 발생하게 될 경우 간단한 코드 상의 에러라면 빠르게 처리하여 핫픽스 배포를 진행할 수 있지만 돈에 관련된 문제라든지 데이터의 일관성에 부정확을 초래하는 에러인 경우 잠깐 사이에도 큰 장애가 될 수 있습니다. 이런 경우를 방지하기 위해 특정한 조건에서 의도한 에러를 발생시키면 개발자가 에러를 핸들링할 수 있기 때문에 에러 검증을 위한 테스트 케이스는 매우 중요합니다.
예제 1.42는 기존에 존재하는 플럭스에 임의의 에러 플럭스를 합친 다음 정상적인 데이터가 통지된 이후 onError 신호가 발생하는지 검증하는 테스트입니다.
예제 1.42 expectError 테스트 - reactor/reactor-test/src/test/kotlin/ReactorTest3.kt
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
class ReactorTest3 {
@Test
fun `IllegalArgumentException 에러가 발생해야 한다`() {
val error = IllegalArgumentException("error")
val errorFlux = Flux.error<String>(error)
val source = Flux.just("success-1", "success-2")
.concatWith(errorFlux)
// 종류에 상관없이 에러가 발생하면 통과
StepVerifier.create(source)
.expectNext("success-1")
.expectNext("success-2")
.expectError()
.verify()
// 인자로 지정한 에러가 발생할 때만 통과
StepVerifier.create(source)
.expectNext("success-1")
.expectNext("success-2")
.expectError(IllegalArgumentException::class.java)
.verify()
}
}
expectError는 현재 구독하는 발행자가 에러를 통지하는지 검증합니다. 리액티브 스트림에는 서브스크라이버의 onError에 대한 사양이 있기 때문에 내부적으로 onError 신호가 발생하는지를 판단하여 통과여부를 판단하게됩니다. 예제에서 먼저 사용한 인자 없는 expectError 연산자는 에러의 종류에 상관없이 onError 신호가 발생하면 테스트를 통과하게되고 상세한 에러를 인자로 받는 expectError(Throwable)는 지정한 에러가 발생하는 경우에만 테스트를 통과합니다.
모든 에러의 최상위 부모인 Throwable은 기본 생성자 외에도 사용자 정의 에러 메시지를 인자로 받는 Throwable(message)을 제공합니다. 이러한 사용자 정의 에러 메시지를 검증하고 싶은 경우 expectErrorMessage 연산자를 사용해 테스트할 수 있습니다. 예제 1.43에서는 expectErrorMessage 연산자를 사용해 사용자 정의 에러 메시지에 대한 테스트 방법을 확인할 수 있습니다.
예제 1.43 expectErrorMessage 테스트 - reactor/reactor-test/src/test/kotlin/ReactorTest4.kt
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
class ReactorTest4 {
@Test
fun `An Error Occurred 에러 메시지가 발생해야 한다`() {
val error = IllegalArgumentException("An error occurred")
val errorFlux = Flux.error<String>(error)
val source = Flux.just("success-1", "success-2")
.concatWith(errorFlux)
StepVerifier.create(source)
.expectNext("success-1")
.expectNext("success-2")
.expectErrorMessage("An error occurred")
.verify()
}
}
IllegalArgumentException의 인자로 “An error occurred” 에러 메시지를 지정하고 에러를 통지하면 expectErrorMessage는 내부적으로 통지받은 에러 플럭스를 Exception의 최상위 타입인 Throwable로 변환한 뒤 메시지를 비교하고 기대하는 결과가 동일하다면 테스트를 통과시킵니다.
마지막으로 expectErrorSatisfies 함수는 인자로 Consumer<Throwable>을 받아서 개발자가 에러에 대한 검증을 직접 구현할 수 있습니다. 인자로 받는 Consumer 역시 함수형 인터페이스의 하나로 반환 값 없이 인자로 전달된 함수를 단순히 실행하는 함수형 인터페이스입니다.
@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
}
예제 1.44는 expectErrorSatisfies에 전달되는 람다 함수로 JUnit5의 Assertions API를 사용해 통지된 에러의 타입을 assertTrue로 우선 체크 한 후 에러 메시지가 기대한 결과와 일치하는지를 assertEquals로 검증하는 코드입니다.
예제 1.44 expectErrorMessage 테스트 - reactor/reactor-test/src/test/kotlin/ReactorTest5.kt
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
class ReactorTest5 {
@Test
fun `An Error Occurred 에러 메시지가 발생해야 한다`() {
val error = IllegalArgumentException("An error occurred")
val errorFlux = Flux.error<String>(error)
val source = Flux.just("success-1", "success-2")
.concatWith(errorFlux)
StepVerifier.create(source)
.expectNext("success-1")
.expectNext("success-2")
.expectErrorSatisfies {
assertTrue(it is IllegalArgumentException)
assertEquals("An error occurred", it.message)
}
.verify()
}
}
발생할 에러의 타입이 다양하여 분기 처리가 필요하거나 상세한 검증 로직이 필요한 경우 expectErrorSatisfies를 사용하면 쉽게 구현이 가능합니다.
verify로 시작하는 편의 함수
앞서 알아본 예제의 경우 코드의 마지막에 최종적으로 verify를 호출하여 테스트를 실행하였는데 편의 함수(Convenience Function)를 사용하면 조금 더 간결하게 코드를 작성할 수 있습니다. StepVerifier는 verify로 시작하는 이름의 편의 함수를 제공합니다.
먼저 예제 1.41에서 작성한 테스트 코드의 마지막 부분인 expectComplete.verify()입니다. 예제 1.45와 같이 편의 함수인 verifyComplete를 사용해서 한 번의 함수 호출로 코드를 재작성 할 수 있습니다.
예제 1.45 verifyComplete 예제 - reactor/reactor-test/src/test/kotlin/ReactorTest6.kt
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
import java.util.*
class ReactorTest6 {
@Test
fun `사과가 들어있는지 확인`() {
val flux: Flux<String> = Flux.just("사과", "바나나", "딸기")
StepVerifier
.create(flux)
.recordWith { ArrayList() }
.expectNextCount(3)
.expectRecordedMatches { fruits ->
fruits.contains("사과")
}
// .expectComplete()
// .verify()
.verifyComplete()
}
}
예제 1.42의 expectError.verify()와 expectError(Throwable).verify()는 예제 1.46와 같이 verifyError와 verifyError(Throwable)을 사용해 코드를 재작성 할 수 있습니다.
예제 1.46 verifyError 예제 - reactor/reactor-test/src/test/kotlin/ReactorTest7.kt
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
class ReactorTest7 {
@Test
fun `IllegalArgumentException 에러가 발생해야 한다`() {
val error = IllegalArgumentException("error")
val errorFlux = Flux.error<String>(error)
val source = Flux.just("success-1", "success-2")
.concatWith(errorFlux)
// 종류에 상관없이 에러가 발생하면 통과
StepVerifier.create(source)
.expectNext("success-1")
.expectNext("success-2")
// .expectError()
// .verify()
.verifyError()
// 인자로 지정한 에러가 발생할 때만 통과
StepVerifier.create(source)
.expectNext("success-1")
.expectNext("success-2")
// .expectError(IllegalArgumentException::class.java)
// .verify()
.verifyError(IllegalArgumentException::class.java)
}
}
예제 1.43의 expectErrorMessage.verify()는 예제 1.47과 같이 편의 함수인 verifyErrorMessage(message)를 사용해서 한 번의 함수 호출로 코드를 재작성 할 수 있습니다.
예제 1.47 verifyErrorMessage 예제 - reactor/reactor-test/src/test/kotlin/ReactorTest8.kt
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
class ReactorTest8 {
@Test
fun `An Error Occurred 에러 메시지가 발생해야 한다`() {
val error = IllegalArgumentException("An error occurred")
val errorFlux = Flux.error<String>(error)
val source = Flux.just("success-1", "success-2")
.concatWith(errorFlux)
StepVerifier.create(source)
.expectNext("success-1")
.expectNext("success-2")
// .expectErrorMessage("An error occurred")
// .verify()
.verifyErrorMessage("An error occurred")
}
}
마지막으로 예제 1.44의 expectErrorSatisfies.verify()입니다. 예제 1.48과 같이 편의 함수인 verifyErrorMessage를 사용해서 한 번의 함수 호출로 코드를 재작성 할 수 있습니다.
예제 1.48 verifyErrorSatisfies 예제 - reactor/reactor-test/src/test/kotlin/ReactorTest9.kt
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
class ReactorTest9 {
@Test
fun `An Error Occurred 에러 메시지가 발생해야 한다`() {
val error = IllegalArgumentException("An error occurred")
val errorFlux = Flux.error<String>(error)
val source = Flux.just("success-1", "success-2")
.concatWith(errorFlux)
StepVerifier.create(source)
.expectNext("success-1")
.expectNext("success-2")
// .expectErrorSatisfies {
// assertTrue(it is IllegalArgumentException)
// assertEquals("An error occurred", it.message)
// }
// .verify()
.verifyErrorSatisfies {
assertTrue(it is IllegalArgumentException)
assertEquals("An error occurred", it.message)
}
}
}
이렇게 verify로 시작하는 편의 함수를 사용하면 마지막에 호출되는 expect 테스트 함수를 줄일 수 있으므로 조금 더 간결하고 편리하게 코드를 작성할 수 있습니다.
부수 효과 적용
부수 효과(Side Effect)는 핵심 로직이 아닌 부가적으로 동작하는 별개의 기능을 추가하는 것을 말합니다. 리액터는 기존에 작성한 연산자를 수정하지 않고 부수 효과를 지원하는 연산자를 통해 로직을 특정한 신호와 함께 동작하도록 추가하거나 통지되는 데이터를 들여다보기 위한 방법을 제공합니다. 이러한 연산자는 대부분 ‘do’라는 접두어를 사용합니다. 주로 사용되는 연산자는 ‘doOn’으로 시작하는 연산자로 특정 신호(onSubscribe, onNext 등)가 동작하면 콜백 형태로 동작하므로 각각의 신호가 발생할 때마다 쉽게 부수 효과를 적용할 수 있습니다. 예를 들어 데이터를 변환하거나 필터링 후 데이터가 어떻게 통지되는지 로그로 출력하는 등 핵심 로직과는 별개의 작업이 필요한 경우 유용하게 사용될 수 있을 것입니다. 표 1.4는 주로 사용되는 doOn으로 시작하는 연산자들을 소개합니다.
표 1.4 주로 사용되는 doOn으로 시작하는 연산자
함수 |
설명 |
doOnSubscribe |
구독을 시작하고 onSubscribe 신호가 발생하면 호출 |
doOnNext |
onNext로 데이터를 통지한뒤 호출 |
doOnComplete |
onComplete 신호가 발생하기 직전에 호출. 플럭스 전용 연산자 |
doOnError |
onError 신호가 발생하면 호출 |
이외에도 더 많은 부수 효과를 위한 연산자들은 리액터 공식 문서에서 확인할 수 있습니다.
doOnSubscribe
doOnSubscribe는 구독자의 onSubscribe가 호출되면 이어서 동작하는 연산자입니다. onSubscribe는 구독이 일어나면 최초 1회만 호출되는 것이 특징입니다. doOnSubscribe은 함수형 인터페이스인 Consumer를 제공받아서 반환 값 없이 함수 내부의 코드만 실행하는 것이 특징입니다. 개발자는 Consumer를 작성하면 인자로 서브스크립션(Subscription)을 제공받으므로 현재 구독에 대한 부수 효과도 같이 처리할 수 있습니다.
예제 1.49는 “red”와 “blue”라는 문자열을 가진 플럭스를 생성하고 log 연산자를 사용해 내부적으로 어떤 동작이 일어나는지 로그로 출력하고 전달받은 문자열을 대문자로 변환한 뒤 각각 출력합니다.
예제 1.49 doOnSubscribe 예제 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnSub1.kt
package sideeffect
import reactor.core.publisher.Flux
fun main() {
Flux.just("red", "blue")
.log()
.map {
it.toUpperCase()
}
.subscribe(::println)
}
--------------------
출력 결과)
--------------------
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(red)
RED
[ INFO] (main) | onNext(blue)
BLUE
[ INFO] (main) | onComplete()
출력 결과를 보면 [ INFO]로 시작하는 로그를 확인할 수 있는데 이것들은 모두 log() 연산자를 사용하였기 때문에 내부 동작이 출력되는 것입니다. 실무에서는 디버깅 용도를 제외하면 성능을 하락시킬 수 있기 때문에 유의하여 사용해야 합니다. 로그 내용을 보면 최초 onSubscribe가 호출된 후 request(unbounded)가 호출된 것을 알 수 있습니다. unbounded는 즉 Long.MAX_VALUE 값이기 때문에 무한대의 값을 요청한 것과 마찬가지입니다. request 이후에는 onNext(red)로 데이터가 통지되고 subscribe(::println)에서 ‘RED’를 출력합니다. 그다음 onNext(blue)로 두 번째 데이터가 다시 통지되고 ‘BLUE’가 다시 subscribe(::println)에 의해 콘솔에 출력됩니다. 최종적으로 구독이 완료되어 onComplete 신호가 호출된 것을 확인할 수 있습니다.
예제 1.50은 doOnSubscribe를 적용하면 어떻게 동작하는지 알아보기 위해 예제 1.49에 doOnSubscribe를 추가하고 몇 가지 부수 효과를 적용한 예제입니다. 먼저 부수 효과를 주기위해 적용한 doOnSubscribe 내부에선 인자로 전달받은 서브스크립션을 조작합니다. 우선 request(1)을 호출하여 onNext로 통지 될 데이터를 1개씩으로 제한합니다. 그런다음 cancel을 호출하여 구독을 취소한뒤 부수 효과가 제대로 적용됬는지 확인하기 위해 “doOnSubscribe is called”를 콘솔에 출력합니다.
예제 1.50 doOnSubscribe 예제2 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnSub2.kt
package sideeffect
import reactor.core.publisher.Flux
fun main() {
Flux.just("red", "blue")
.log()
.doOnSubscribe { subscription ->
subscription.request(1)
subscription.cancel()
println("doOnSubscribe is called")
}
.map {
it.toUpperCase()
}
.subscribe(::println)
}
--------------------
출력 결과)
--------------------
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(red)
RED
[ INFO] (main) | cancel()
doOnSubscribe is called
[ INFO] (main) | request(unbounded)
출력 결과를 확인해보면 최초 onSubscribe가 호출되고 이후 request(1)이 출력되었습니다. 예제에서 subscription.request(1)을 호출했기 때문에 데이터 통지가 1로 제한된 것입니다. 그다음엔 onNext(red)로 데이터가 통지되고 ‘RED’가 출력됩니다. 그다음 이전 예제에서는 ‘BLUE’가 출력됐지만 이번 예제에서는 cancel()이 호출되고 저희가 작성한 ‘doOnSubscribe is called’가 출력되었습니다. 즉 subscription.cancel()을 호출하여 구독을 취소했기 때문에 처음에 request(1)로 요청한 데이터는 정상 통지되고 두 번째로 데이터를 통지하려는 시점에서는 이미 구독이 취소되어 데이터가 출력되지 않은 것입니다.
doOnNext
doOnNext는 데이터가 통지되는 시점인 onNext가 호출된 후 동작하는 연산자입니다. doOnNext는 onNext로 통지된 값을 인자로 전달받습니다. 그러므로 통지된 값을 확인하거나 통지된 값을 가지고 별도의 부수 효과를 적용할 수 있으므로 유용하게 사용될 수 있습니다.
예제 1.51은 1부터 5까지 데이터를 통지하는 플럭스를 생성하고 doOnNext에서 통지된 값을 확인할 수 있도록 출력하는 예제입니다.
예제 1.51 doOnNext 예제1 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnNext1.kt
package sideeffect
import reactor.core.publisher.Flux
fun main() {
Flux.range(1, 5)
.log()
.doOnNext {
println("통지된 값 : $it")
}
.subscribe()
}
--------------------
출력 결과)
--------------------
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
통지된 값 : 1
[ INFO] (main) | onNext(2)
통지된 값 : 2
[ INFO] (main) | onNext(3)
통지된 값 : 3
[ INFO] (main) | onNext(4)
통지된 값 : 4
[ INFO] (main) | onNext(5)
통지된 값 : 5
[ INFO] (main) | onComplete()
출력 결과를 보면 Flux.range에 지정된 값의 순서대로 onNext로 데이터 통지 신호가 발생하고 각각의 값이 doOnNext로 전달되는 것을 확인할 수 있습니다.
예제 1.52에서는 AtomicInteger 클래스를 사용해 onNext 신호가 발생할 때마다 데이터 통지를 기록할 카운터를 생성하고 카운트한 뒤 출력하는 예제입니다.
예제 1.52 doOnNext 예제2 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnNext2.kt
package sideeffect
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicInteger
fun main() {
val counter = AtomicInteger()
Flux.range(1, 5)
.log()
.doOnNext {
counter.incrementAndGet()
println("카운터 증가")
}
.subscribe()
println("총 개수 : ${counter.get()}")
}
--------------------
출력 결과)
--------------------
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
카운터 증가
[ INFO] (main) | onNext(2)
카운터 증가
[ INFO] (main) | onNext(3)
카운터 증가
[ INFO] (main) | onNext(4)
카운터 증가
[ INFO] (main) | onNext(5)
카운터 증가
[ INFO] (main) | onComplete()
총 개수 : 5
출력 결과를 보면 onNext 신호가 발생할때마다 “카운터 증가" 메시지가 콘솔에 출력되는 것을 확인할 수 있습니다. 총 5번의 카운터가 증가되고 최종적으로 “총 개수 :5”가 출력됩니다. doOnNext를 사용하면 각각 데이터 통지 마다 이러한 부수 효과를 적용할 수 있습니다.
doOnComplete
doOnComplete는 통지가 성공적으로 완료되는 시점에 동작하는 연산자로 인자로 함수형 인터페이스인 Runnable을 전달 받아서 동작하는 것이 특징입니다. 또한 doOnComplete는 플럭스만 제공하는 연산자로 모노의 경우 비슷한 동작을 하는 doOnSuccess가 있습니다.
예제 1.53은 A부터 C를 통지하는 플럭스를 생성하고 데이터 통지가 완료되면 doOnComplete 내부에서 AtomicBoolean을 사용해 성공 여부를 true로 변경한 후 데이터 통지 성공 여부를 출력합니다.
예제 1.53 doOnComplete 예제1 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnComp1.kt
package sideeffect
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicBoolean
fun main() {
val completed = AtomicBoolean(false)
Flux.just("A", "B", "C")
.log()
.doOnComplete {
completed.set(true)
}
.subscribe()
if (completed.get()) {
println("데이터 통지 완료")
} else {
println("데이터 통지 실패")
}
}
--------------------
출력 결과)
--------------------
[ INFO] (main) | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(A)
[ INFO] (main) | onNext(B)
[ INFO] (main) | onNext(C)
doOnComplete 동작
[ INFO] (main) | onComplete()
데이터 통지 성공
출력 결과를 보면 우선 onNext로 A, B, C가 순차적으로 통지됩니다. 그다음 doOnComplete에 작성한 “doOnComplete 동작”이 출력되고 onComplete 신호가 발생하고 최종적으로 “데이터 통지 완료" 메시지가 콘솔에 출력됩니다. 그러므로 doOnComplete는 onComplete 신호 직전에 호출된다는 것을 확인할 수 있습니다.
이번엔 에러가 통지될 경우 doOnComplete가 제대로 동작하는지 알아보기 위해 예제 1.54에서는 기존의 플럭스에 concatWith로 Flux.error(IllegalArgumentException())을 추가한 뒤 실행해보겠습니다.
예제 1.54 doOnComplete 예제2 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnComp2.kt
package sideeffect
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicBoolean
fun main() {
val completed = AtomicBoolean(false)
Flux.just("A", "B", "C")
.concatWith(Flux.error(IllegalArgumentException()))
.log()
.doOnComplete {
println("doOnComplete 동작")
completed.set(true)
}
.subscribe()
if (completed.get()) {
println("데이터 통지 완료")
} else {
println("데이터 통지 실패")
}
}
--------------------
출력 결과)
--------------------
[ INFO] (main) onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(A)
[ INFO] (main) onNext(B)
[ INFO] (main) onNext(C)
데이터 통지 실패
[ERROR] (main) onError(java.lang.IllegalArgumentException)
[ERROR] (main) - java.lang.IllegalArgumentException
출력 결과를 보면 doOnComplete와 onComplete가 호출되지 않고 “데이터 통지 실패"가 출력된 후 onError 신호가 발생하는 것을 확인할 수 있습니다. 또한 에러에 대한 별도 처리가 없으므로 java.lang.IllegalArgumentException이 발생하게 됩니다. onComplete는 에러 없이 정상적으로 데이터 통지가 완료될 경우에만 발생하는데 doOnComplete는 onComplete 직전에 동작하므로 에러 통지가 없어야 동작하는 것을 확인할 수 있습니다.
doOnError
doOnError는 에러가 통지되는 시점에 지정한 부수 효과를 적용할 수 있습니다. 에러가 통지되는 시점에는 onError 신호가 발생하는데 doOnError 연산자는 인자로 함수형 인터페이스인 Consumer를 제공받아서 onError 신호가 발생하기 직전에 실행합니다. 인자로 제공된 Consumer에는 최상위 에러 클래스인 Throwable가 전달됩니다. 그렇기 때문에 내부에서 어떤 에러가 발생했는지 확인하여 처리를 다르게 할 수도 있습니다.
예제 1.55는 에러를 통지하는 플럭스를 생성하고 에러가 발생할 경우 doOnError가 동작하는 예제입니다.
예제 1.55 doOnError 예제 - reactor/reactor-basic/src/main/kotlin/sideeffect/DoOnError.kt
package sideeffect
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicBoolean
fun main() {
val errorOccurred = AtomicBoolean(false)
Flux.range(1,3)
.concatWith(Flux.error(IllegalArgumentException()))
.log()
.doOnError { throwable: Throwable ->
println("doOnError 동작")
println("발생한 에러 : ${throwable.javaClass}")
errorOccurred.set(true)
}
.subscribe()
if (errorOccurred.get()) {
println("에러 발생")
}
}
--------------------
출력 결과)
--------------------
[ INFO] (main) onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(1)
[ INFO] (main) onNext(2)
[ INFO] (main) onNext(3)
doOnError 동작
발생한 에러 : class java.lang.IllegalArgumentException
에러 발생
[ERROR] (main) onError(java.lang.IllegalArgumentException)
출력 결과를 보면 onNext로 데이터를 정상적으로 통지하다가 에러가 통지되면 doOnError가 먼저 발생한 후 최종적으로 onError가 발생합니다. doOnError 내부에서는 람다 함수로 제공되는 Consumer는 인자로 Throwable을 받기 때문에 에러의 종류와 어떤 메시지를 출력하는지 확인할 수 있습니다.
리액터 코틀린 지원
스프링 프로젝트는 코틀린에 대한 지원을 점점 늘리고 있는데 최신 스프링 프로젝트 공식 문서를 보면 자바 예제외에도 코틀린 예제를 쉽게 확인할 수 있습니다. 또한 코틀린이 제공하는 여러 라이브러리도 스프링 프로젝트에 기본으로 탑재되고 있는데 리액터 또한 리액터 코틀린 확장(reactor-kotlin-extensions)을 지원합니다 . 리액터 코틀린 확장 기능을 사용하면 코틀린의 우아한 문법을 사용해 리액터 코드를 자바를 사용했을 때보다 더 간결하게 작성이 가능합니다.
먼저 리액터 코틀린 확장 기능을 사용하기 위해 예제 1.56과 같이 build.gradle.kts에 reactor-kotlin-extensions 모듈을 추가 합니다.
예제 1.56 프로젝트에 reactor-kotlin-extensions 의존성 추가 - reactor/reactor-basic/build.gradle.kts
plugins {
id("org.jetbrains.kotlin.jvm") version ("1.4.10")
}
group = "com.reactor.reactorbasic"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
implementation("org.jetbrains.kotlin:kotlin-stdlib")
implementation("io.projectreactor:reactor-core:3.4.0")
implementation("io.projectreactor.kotlin:reactor-kotlin-extensions:1.1.2")
}
toMono
toMono는 객체를 모노로 변환하기 위한 코틀린 확장 함수입니다. toMono 함수를 사용하면 Mono.justOrEmpty, Mono.fromCallable, Mono.fromFuture와 같은 팩토리 함수를 쓰지 않아도 객체를 모노로 쉽게 변환할 수 있습니다.
다음은 리액터 코틀린 확장 모듈의 모노 확장 함수 구현 코드입니다.
fun <T> Publisher<T>.toMono(): Mono<T> = Mono.from(this)
fun <T> (() -> T?).toMono(): Mono<T> = Mono.fromSupplier(this)
fun <T : Any> T?.toMono(): Mono<T> = Mono.justOrEmpty(this)
fun <T> Callable<T?>.toMono(): Mono<T> = Mono.fromCallable(this::call)
코드를 보면 toMono는 코틀린의 최상위 객체인 Any에 확장한 함수부터 Callable, CompletableFuture, 리액터의 Publisher를 지원하는 확장 함수가 구현돼있습니다.
예제 1.57은 앞서 살펴본 toMono를 사용해 다양한 객체를 모노로 변환하는 예제입니다.
예제 1.57 toMono 예제 - reactor/reactor-basic/src/main/kotlin/extensions/toMono.kt
package extensions
import reactor.kotlin.core.publisher.toMono
import java.util.concurrent.Callable
import java.util.concurrent.CompletableFuture
fun main() {
"String.toMono"
.toMono()
.subscribe(::println)
listOf("Collection.toMono")
.toMono()
.subscribe(::println)
Callable { "Callable.toMono" }
.toMono()
.subscribe(::println)
CompletableFuture.supplyAsync { "supplyAsync.toMono" }
.toMono()
.subscribe(::println)
}
--------------------
출력 결과)
--------------------
String.toMono
[Collection.toMono]
Callable.toMono
CompletableFuture.supplyAsync.toMono
출력 결과를 보면 toMono로 변환한 객체들이 정상적으로 모노로 변환되어 데이터 통지가 이뤄진 것을 확인할 수 있습니다. toMono를 사용하면 다양한 객체를 팩토리 함수 없이 모노로 쉽게 변환할 수 있습니다.
toFlux
toFlux는 객체를 플럭스로 변환하기 위한 코틀린 확장 함수입니다. toFlux는 toMono와 다르게 컬렉션이나 배열 같은 복수의 데이터를 저장하는 자료구조를 toFlux로 변환하는 것이 특징입니다. toFlux 함수를 사용하면 Flux.just, Flux.fromIterable, Flux.fromArray 같은 팩토리 함수를 쓰지 않아도 객체를 플럭스로 쉽게 변환할 수 있습니다.
다음은 리액터 코틀린 확장 모듈의 플럭스 확장 함수 구현 코드입니다.
fun <T : Any> Publisher<T>.toFlux(): Flux<T> = Flux.from(this)
fun <T : Any> Iterator<T>.toFlux(): Flux<T> = toIterable().toFlux()
fun <T : Any> Iterable<T>.toFlux(): Flux<T> = Flux.fromIterable(this)
fun <T : Any> Stream<T>.toFlux(): Flux<T> = Flux.fromStream(this)
fun BooleanArray.toFlux(): Flux<Boolean> = this.toList().toFlux()
fun ByteArray.toFlux(): Flux<Byte> = this.toList().toFlux()
fun ShortArray.toFlux(): Flux<Short> = this.toList().toFlux()
fun IntArray.toFlux(): Flux<Int> = this.toList().toFlux()
fun LongArray.toFlux(): Flux<Long> = this.toList().toFlux()
fun FloatArray.toFlux(): Flux<Float> = this.toList().toFlux()
fun DoubleArray.toFlux(): Flux<Double> = this.toList().toFlux()
fun <T> Array<out T>.toFlux(): Flux<T> = Flux.fromArray(this)
코드를 보면 다양한 배열과 컬렉션 구조에 대한 toFlux 확장 함수가 구현돼 있습니다.
예제 1.58은 toFlux를 사용해 자료구조를 플럭스로 변환하는 예제입니다.
예제 1.58 toFlux 예제 - reactor/reactor-basic/src/main/kotlin/extensions/toFlux.kt
package extensions
import reactor.kotlin.core.publisher.toFlux
import reactor.kotlin.core.publisher.toMono
fun main() {
listOf("A", "B", "C")
.toFlux()
.subscribe(::println)
intArrayOf(1, 2, 3)
.toFlux()
.subscribe(::println)
setOf("D", "E", "F")
.toFlux()
.subscribe(::println)
"MonoToFlux".toMono()
.toFlux()
.subscribe(::println)
}
--------------------
출력 결과)
--------------------
A
B
C
1
2
3
D
E
F
MonoToFlux
출력 결과를 보면 컬렉션과 배열을 플럭스로 변환해서 데이터 통지가 정상적으로 이뤄진 것을 확인할 수 있습니다. toFlux의 한가지 특징은 컬렉션이나 배열의 경우 플럭스로 바로 변환이 가능하지만 “MonoToFlux” 문자열과 같은 일반 객체의 경우 toMono로 우선 변환한 뒤 toFlux 함수를 사용해 플럭스로 변환해야 합니다. toFlux의 경우 자료구조 외에는 발행자인 Publisher에만 확장할 수 있도록 구현돼 있기 때문입니다.