728x90
반응형


플럭스의 연산자

플럭스는 0..N개로 이뤄진 복수개의 요소를 통지하는 발행자입니다. 모노는 1개의 요소를 통지하지만 플럭스는 제한 없는 무한대의 요소를 통지합니다. 플럭스의 연산자는 모노의 연산자와 이름이 같고 역할이 같은 연산자가 많습니다. 대표적으로 앞선 장에서 소개한 map, flatMap, switchIfEmpty 등의 연산자는 플럭스를 사용할시에도 동일하게 사용할 수 있습니다.

just

Flux.just는 플럭스를 생성하는 기본 팩토리 함수입니다. Flux.just의 인자는 가변 인자를 지원하기 때문에 데이터의 개수 제한 없이 추가하여 플럭스를 생성할 수 있습니다. 예제 1.20는 두 개의 CellPhone 객체를 생성하고 Flux.just의 인자로 전달하고 다른 작업 없이 출력합니다.

package flux

import reactor.core.publisher.Flux

data class Cellphone(
    val name: String,
    val price: Int,
    val currency: Currency,
)

enum class Currency {
    KRW, USD
}

fun main() {
    val iphone =
        Cellphone(name = "Iphone", price = 100, currency = Currency.KRW)
    val galaxy =
        Cellphone(name = "Galaxy", price = 90, currency = Currency.KRW)

    val flux: Flux<Cellphone> =
        Flux.just(iphone, galaxy)

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
Cellphone(name=Iphone, price=100, currency=KRW)
Cellphone(name=Galaxy, price=90, currency=KRW)

출력 결과와 같이 Flux.just에 들어가는 인자의 순서대로 차례로 데이터가 통지됩니다. just 함수에 만약 null인 객체가 전달되면 예제 1.21과 같이 NullPointerException이 발생하기 때문에 주의해야합니다.

예제 1.21 Flux.just에 null이 들어간 경우 - reactor/reactor-basic/src/flux/Flux1.kt

package flux

import reactor.core.publisher.Flux

data class Cellphone(
    val name: String,
    val price: Int,
    val currency: Currency,
)

enum class Currency {
    KRW, USD
}

fun main() {
    val iphone =
        Cellphone(name = "Iphone", price = 100, currency = Currency.KRW)
    val galaxy =
        Cellphone(name = "Galaxy", price = 90, currency = Currency.KRW)

    val flux: Flux<Cellphone> =
        Flux.just(iphone, galaxy, null)

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
java.lang.NullPointerException: The 2th array element was null

fromArray, fromIterable

Flux.fromArray는 함수는 인자에 배열을 받아 플럭스를 생성하는 팩토리 함수입니다. 예제 1.22는 배열에 객체를 담고 Flux.fromArray를 사용해 플럭스를 생성한 뒤 순서대로 출력합니다.

예제 1.22 Flux.fromArray - reactor/reactor-basic/src/flux/Flux2.kt

package flux

import reactor.core.publisher.Flux

data class Hero(
   val nickname: String,
   val name: String,
)

fun main() {
   val avengers = arrayOf(
       Hero("Black Widow", "Natasha Romanoff"),
       Hero("Iron Man", "Tony Stark"),
       Hero("Captain America", "Steve Rogers"),
       Hero("Thor", "Thor Odinson"),
       Hero("Hulk", "Bruce Banner"),
       Hero("Hawkeye", "Clint Barton"),
       Hero("Black Panther", "T'Challa")
   )

   val flux: Flux<Hero> =
       Flux.fromArray(avengers)

   flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
Hero(nickname=Iron Man, name=Tony Stark)
Hero(nickname=Black Widow, name=Natasha Romanoff)
Hero(nickname=Captain America, name=Steve Rogers)
Hero(nickname=Thor, name=Thor Odinson)
Hero(nickname=Hulk, name=Bruce Banner)
Hero(nickname=Hawkeye, name=Clint Barton)
Hero(nickname=Black Panther, name=T'Challa)

예제 1.22의 출력 결과를 보면 예상한대로 배열에 지정한 순서대로 출력되는 것을 알 수 있습니다.

배열이 아닌 컬렉션을 이용해 플럭스를 생성할 경우 Flux.fromIterable 함수를 사용할 수 있습니다. fromArray와 마찬가지로 리스트에 지정한 순서대로 데이터를 통지합니다. 예제 1.23은 Flux.fromIterable을 사용해 플럭스를 생성하고 순서대로 출력합니다.

예제 1.23 Flux.fromIterable - reactor/reactor-basic/src/flux/Flux3.kt

package flux

import reactor.core.publisher.Flux

fun main() {
   val avengers = listOf(
       Hero("Iron Man", "Tony Stark"),
       Hero("Black Widow", "Natasha Romanoff"),
       Hero("Captain America", "Steve Rogers"),
       Hero("Thor", "Thor Odinson"),
       Hero("Hulk", "Bruce Banner"),
       Hero("Hawkeye", "Clint Barton"),
       Hero("Black Panther", "T'Challa"),
   )

   val flux: Flux<Hero> =
       Flux.fromIterable(avengers)

   flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
Hero(nickname=Iron Man, name=Tony Stark)
Hero(nickname=Black Widow, name=Natasha Romanoff)
Hero(nickname=Captain America, name=Steve Rogers)
Hero(nickname=Thor, name=Thor Odinson)
Hero(nickname=Hulk, name=Bruce Banner)
Hero(nickname=Hawkeye, name=Clint Barton)
Hero(nickname=Black Panther, name=T'Challa)

출력 결과에서 확인할 수 있듯이 fromIterable도 리스트에 지정한 순서 즉 인덱스의 순서대로 데이터를 통지하는 것을 확인할 수 있습니다.

range

Flux.range 함수는 첫 번째 인자인 시작 값부터 두 번째 인자인 카운트 값까지 값을 하나씩 증가하는 데이터를 통지하는 팩토리 함수입니다. 예제 1.24는 Flux.range를 사용해 1부터 100까지 더한 값을 출력합니다.

예제 1.24 Flux.range로 sum한 값 구하기 - reactor/reactor-basic/src/flux/Flux4.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val sum = Flux.range(1, 100)
    .reduce { t, u ->
        t + u
    }
    sum.subscribe(::println)
}

--------------------
출력 결과)
--------------------
5050

출력 결과를 보면 1부터 100까지 더한 값인 5050이 출력되었습니다. 즉 1부터 100까지 순서대로 데이터를 통지하고 reduce 함수를 사용해 통지받은 데이터를 더해졌다는 것을 알 수 있습니다.

다음 예제 1.25는 take를 사용해 인자로 지정한 10개 데이터를 통지하고 buffer는 통지받은 인자로 지정한 값만큼 데이터를 버퍼에 담아서 통지합니다.

예제 1.25 take와 buffer를 사용한 예제 - reactor/reactor-basic/src/flux/Flux5.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val buffer: Flux<List<Int>> =
        Flux.range(1, 100)
            .take(10)
            .buffer(4)

    buffer.subscribe(::println)
}

--------------------
출력 결과)
--------------------
[1, 2, 3, 4]
[5, 6, 7, 8]
[9, 10]

출력 결과를 보면 버퍼에 할당된 4건씩 데이터가 나뉘어 통지된 것을 확인할 수 있습니다.

interval

Flux.Interval 함수를 사용하면 인자로 지정해준 주기대로 신호를 발생 시킵니다. 발생된 신호는 Long 타입으로 ‘0, 1, 2,3’ 순서대로 전달 받을 수 있습니다. 예제 1.26은 0.5초 간격으로 랜덤한 식별자(id)를 가지는 쿠폰을 생성하는 예제 입니다.

예제 1.26 Flux.interval로 쿠폰 생성 예제 - reactor/reactor-basic/src/flux/Flux6.kt

package flux

import reactor.core.publisher.Flux
import java.time.Duration
import java.util.UUID

data class Coupon(
    val id: String,
    val description: String,
)

fun main() {
    val flux: Flux<Coupon> =
        Flux.interval(Duration.ofMillis(500))
            .map { count ->
                val id = UUID.randomUUID().toString()
                Coupon(id = id, description = "${count.plus(1)}번째 쿠폰")
            }
            .take(15)

    flux.subscribe(::println)

    Thread.sleep(Duration.ofSeconds(10).toMillis())
}


--------------------
출력 결과)
--------------------
Coupon(id=b755285e-476e-405a-8a9f-e0b49b3520a6, description=1번째 쿠폰)
Coupon(id=9c4eea7a-1f12-4123-9fae-678e6161f7de, description=2번째 쿠폰)
Coupon(id=6478a919-ae2b-47c9-b238-d41fe5d73010, description=3번째 쿠폰)
Coupon(id=faf7b65f-299e-4467-9308-964b901e978a, description=4번째 쿠폰)
Coupon(id=77db0f58-27c8-4abc-a47f-8966259cea78, description=5번째 쿠폰)
Coupon(id=ccfab126-a5b1-4e9e-bbf3-c55f16a0e475, description=6번째 쿠폰)
Coupon(id=c59c6f88-cf6f-4d05-80f9-261851e49359, description=7번째 쿠폰)
Coupon(id=a2ec4a36-c9f8-4087-ac93-6b963954f029, description=8번째 쿠폰)
Coupon(id=8663e6ee-293c-492f-98b7-8246e19aeb3e, description=9번째 쿠폰)
Coupon(id=c9cf3146-d1ad-4c2d-85f6-c66f3635d53e, description=10번째 쿠폰)
Coupon(id=1c66e15e-0794-4bc5-bc33-d0088e0384ca, description=11번째 쿠폰)
Coupon(id=c2e2c8c3-d5b2-47c9-be99-06aa3a679018, description=12번째 쿠폰)
Coupon(id=cf53e12c-8a54-4875-b6e3-11f5494c57c5, description=13번째 쿠폰)
Coupon(id=c37c3981-e34b-4147-8d8a-66eb3bc626ed, description=14번째 쿠폰)
Coupon(id=32942297-f457-4778-b559-c0f45023bbd6, description=15번째 쿠폰)

먼저 Flux.interval(Duration.ofMillis(500))는 0.5초 간격으로 신호를 발생시키고 순번을 데이터로 통지합니다. map 연산자 내부에선 UUID.randomUUID().toString()을 사용해 유니크한 식별자를 생성하고 쿠폰 객체의 id 프로퍼티에 할당합니다. description에는 몇 번째로 생성된 쿠폰인지를 확인하기 위해 count라는 람다 변수를 선언했습니다. 이때 count의 값은 최초에 0부터 시작하기 때문에 count.plus(1)을 사용해 기존 값에 1을 더해줍니다. 그 다음은 take로 15개의 데이터만 통지하도록 조절한 후 출력합니다.

이때 마지막 라인에서 예시와 같이 스레드를 일시정지하는 코드가 있어야 정상적으로 출력됩니다.

 Thread.sleep(Duration.ofSeconds(10).toMillis())

스레드를 일시정지하는 이유는 Flux.interval은 내부적으로 별도의 스레드에서 동작하도록 만들어져있는데 코드를 실행하는 스레드는 메인 함수의 스레드이므로 일시정지하지 않으면 subscribe에서 출력하기 전에 메인 스레드가 종료되기 때문입니다.

예제 1.27에서 Thread.sleep을 주석 처리하여 어떤 결과가 출력되는지 확인해 보겠습니다.

package flux

import reactor.core.publisher.Flux
import java.time.Duration
import java.util.UUID

data class Coupon(
    val id: String,
    val description: String,
)

fun main() {
    val flux: Flux<Coupon> =
        Flux.interval(Duration.ofMillis(500))
            .map { count ->
                val id = UUID.randomUUID().toString()
                Coupon(id = id, description = "${count.plus(1)}번째 쿠폰")
            }
            .take(15)

    flux.subscribe(::println)

    //Thread.sleep(Duration.ofSeconds(10).toMillis())
}


--------------------
출력 결과)
--------------------

출력 결과를 보면 Flux.interval에서 사용하는 스레드가 완료되기 전에 메인 스레드가 종료되어 아무것도 출력되지 않은 것을 확인할 수 있습니다.

single, singleOrEmpty

single 함수를 사용하면 최초에 전달받은 1개의 데이터만 통지받도록 제한할 수 있습니다. 그러므로 single을 연산자로 적용한 경우 통지받은 데이터가 정확히 1개 임을 보장해야 합니다. 데이터가 없거나 1개 이상인 경우는 예외가 발생합니다.

예제 1.29에서는 데이터가 1개가 아닌 경우 어떤 예외가 발생하는지 확인하기 위해 Flux.just로 6개의 데이터를 인자로 받고 take(2)로 데이터를 2번 통지하여 예외를 발생시킵니다.

예제 1.29 single 예제 - reactor/reactor-basic/src/flux/Flux7.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val flux = Flux.just(1, 2, 3, 4, 5, 6)
        .take(2)
        .single()

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
java.lang.IndexOutOfBoundsException: Source emitted more than one item

출력 결과를 보면 IndexOutOfBoundsException가 발생한 것을 확인할 수 있습니다. 예제 1.30는 take(1)을 사용해 하나의 데이터만 통지하는 예제입니다.

예제 1.30 single 예제2 - reactor/reactor-basic/src/flux/Flux8.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val flux = Flux.just("one", "two", "three")
        .take(1)
        .single()

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
one

출력 결과를 확인해보면 정상적으로 Flux.just의 인자로 처음 전달받은 문자열 ‘one’이 출력된 것을 확인할 수 있습니다.

만약 Flux로 통지받은 데이터가 비어있을(empty) 가능성이 있는 경우 singleOrEmpty를 사용할 수 있습니다. singleOrEmpty 함수는 1개의 데이터 또는 빈 값을 허용합니다. 예제 1.31은 Flux.empty를 사용해 빈 값을 통지하고 singleOrEmpty를 사용해 통지합니다.

예제 1.31 singleOrEmpty 예제 - reactor/reactor-basic/src/flux/Flux9.kt

package flux

import reactor.core.publisher.Flux

fun main() {
   val flux = Flux.empty<Any>()
       .singleOrEmpty()

   flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------

Flux.empty로 플럭스를 생성하고 singleOrEmpty를 사용했기 때문에 빈 결과가 출력되었습니다. singleOrEmpty 함수도 1개 또는 빈 값이 아닌 경우는 동일하게 IndexOutOfBoundsException가 발생합니다.

예제 1.32는 Flux.just에 첫 번째 인자는 ‘1’ 두 번째 인자는 Flux.empty()를 지정한 후 singleOrEmpty 함수를 사용하여 예외를 발생시킵니다.

예제 1.32 singleOrEmpty 예제2 - reactor/reactor-basic/src/flux/Flux9.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val flux = Flux.just(1, Flux.empty<Int>())
        .singleOrEmpty()

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
java.lang.IndexOutOfBoundsException: Source emitted more than one item

merge, concat, mergeSequential

Flux.merge는 플럭스를 제공받은 순서대로 결합하는 팩토리 함수입니다. zip 함수와의 차이점은 merge 함수는 튜플 객체로 감싸지지 않습니다. 예제 1.33은 Flux.merge를 사용해 인자로 지정된 플럭스를 순서대로 통지하는 예제입니다.

예제 1.32 Flux.merge 예제 - reactor/reactor-basic/src/flux/Flux10.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val three = Flux.just("three")
    val two = Flux.just("two")
    val one = Flux.just("one")

    val flux: Flux<String> = Flux.merge(three, two, one)
    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
three
two
one

출력 결과를 확인해보면 인자로 지정한 순서대로 결과가 출력되었음을 확인할 수 있습니다. 예제 1.33은 예제 1.32를 조금 수정하여 Flux.merge 함수에 인자로 전달되는 플럭스에 delayElements 함수를 사용하여 정해진 지연 시간 이후에 각각 병렬로 처리되는데 이 경우에도 순서가 유지되는지 알아보겠습니다.

예제 1.33 Flux.merge 예제2 - reactor/reactor-basic/src/flux/Flux11.kt

package flux

import reactor.core.publisher.Flux
import java.time.Duration

fun main() {
    val one = Flux.just("one").delayElements(Duration.ofMillis(300))
    val two = Flux.just("two").delayElements(Duration.ofMillis(400))
    val three = Flux.just("three").delayElements(Duration.ofMillis(500))

    val flux: Flux<String> = Flux.merge(three, two, one)
    flux.subscribe(::println)

    Thread.sleep(Duration.ofSeconds(2).toMillis())
}

--------------------
출력 결과)
--------------------
one
two
three

출력 결과를 확인해보면 인자로 지정된 순서대로 통지되지 않고 지연 시간에 따라서 순서가 정해진 것을 확인할 수 있습니다. 만약 병렬 처리 시에도 인자로 지정된 순서대로 통지해야 하는 경우 예제 1.34와 같이 concat 함수를 사용해 순서를 보장할 수 있습니다.

예제 1.34 Flux.concat 예제 - reactor/reactor-basic/src/flux/Flux12.kt

package flux

import reactor.core.publisher.Flux
import java.time.Duration

fun main() {
    val one = Flux.just("one").delayElements(Duration.ofMillis(300))
    val two = Flux.just("two").delayElements(Duration.ofMillis(400))
    val three = Flux.just("three").delayElements(Duration.ofMillis(500))

    val flux: Flux<String> = Flux.concat(three, two, one)
    flux.subscribe(::println)

    Thread.sleep(Duration.ofSeconds(2).toMillis())
}

--------------------
출력 결과)
--------------------
three
two
one

출력 결과를 확인해보면 플럭스가 concat 함수에 병렬로 전달되었지만 인자로 지정된 순서대로 통지된 것을 확인할 수 있습니다. 순서가 보장되는 이유는 병렬 처리 시 인자로 지정된 플럭스를 결합한 다음 플럭스를 순서대로 구독하여 데이터를 통지하고 이전 플럭스에 대한 통지가 완료되면 다음 플럭스가 구독 처리될 때까지 기다리는 순차처리 방식의 연산자이기 때문입니다.

예제 1.35에서 사용한 mergeSequential도 concat과 같이 순서가 보장됩니다.

예제 1.35 Flux.concat 예제 - reactor/reactor-basic/src/flux/Flux12.kt

package flux

import reactor.core.publisher.Flux
import java.time.Duration

fun main() {
    val one = Flux.just("one").delayElements(Duration.ofMillis(300))
    val two = Flux.just("two").delayElements(Duration.ofMillis(400))
    val three = Flux.just("three").delayElements(Duration.ofMillis(500))

    val flux: Flux<String> = Flux.mergeSequential(three, two, one)
    flux.subscribe(::println)

    Thread.sleep(Duration.ofSeconds(2).toMillis())
}

--------------------
출력 결과)
--------------------
three
two
one

concat 과의 차이점이라면 concat은 여러 개의 플럭스를 인자로 전달받아 하나로 결합한 후에 순서대로 구독 처리하는 연산자라면 mergeSequential은 여러 개의 플럭스를 병렬로 구독 처리하고 최종 변환 시 인자로 지정한 구독 순서에 따라서 새로운 플럭스로 결합한다는 차이점이 있습니다.

728x90
반응형

+ Recent posts