728x90
반응형


플럭스의 연산자

플럭스는 0..N개로 이뤄진 복수개의 요소를 통지하는 발행자입니다. 모노는 1개의 요소를 통지하지만 플럭스는 제한 없는 무한대의 요소를 통지합니다. 플럭스의 연산자는 모노의 연산자와 이름이 같고 역할이 같은 연산자가 많습니다. 대표적으로 앞선 장에서 소개한 map, flatMap, switchIfEmpty 등의 연산자는 플럭스를 사용할시에도 동일하게 사용할 수 있습니다.

just

Flux.just는 플럭스를 생성하는 기본 팩토리 함수입니다. Flux.just의 인자는 가변 인자를 지원하기 때문에 데이터의 개수 제한 없이 추가하여 플럭스를 생성할 수 있습니다. 예제 1.20는 두 개의 CellPhone 객체를 생성하고 Flux.just의 인자로 전달하고 다른 작업 없이 출력합니다.

package flux

import reactor.core.publisher.Flux

data class Cellphone(
    val name: String,
    val price: Int,
    val currency: Currency,
)

enum class Currency {
    KRW, USD
}

fun main() {
    val iphone =
        Cellphone(name = "Iphone", price = 100, currency = Currency.KRW)
    val galaxy =
        Cellphone(name = "Galaxy", price = 90, currency = Currency.KRW)

    val flux: Flux<Cellphone> =
        Flux.just(iphone, galaxy)

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
Cellphone(name=Iphone, price=100, currency=KRW)
Cellphone(name=Galaxy, price=90, currency=KRW)

출력 결과와 같이 Flux.just에 들어가는 인자의 순서대로 차례로 데이터가 통지됩니다. just 함수에 만약 null인 객체가 전달되면 예제 1.21과 같이 NullPointerException이 발생하기 때문에 주의해야합니다.

예제 1.21 Flux.just에 null이 들어간 경우 - reactor/reactor-basic/src/flux/Flux1.kt

package flux

import reactor.core.publisher.Flux

data class Cellphone(
    val name: String,
    val price: Int,
    val currency: Currency,
)

enum class Currency {
    KRW, USD
}

fun main() {
    val iphone =
        Cellphone(name = "Iphone", price = 100, currency = Currency.KRW)
    val galaxy =
        Cellphone(name = "Galaxy", price = 90, currency = Currency.KRW)

    val flux: Flux<Cellphone> =
        Flux.just(iphone, galaxy, null)

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
java.lang.NullPointerException: The 2th array element was null

fromArray, fromIterable

Flux.fromArray는 함수는 인자에 배열을 받아 플럭스를 생성하는 팩토리 함수입니다. 예제 1.22는 배열에 객체를 담고 Flux.fromArray를 사용해 플럭스를 생성한 뒤 순서대로 출력합니다.

예제 1.22 Flux.fromArray - reactor/reactor-basic/src/flux/Flux2.kt

package flux

import reactor.core.publisher.Flux

data class Hero(
   val nickname: String,
   val name: String,
)

fun main() {
   val avengers = arrayOf(
       Hero("Black Widow", "Natasha Romanoff"),
       Hero("Iron Man", "Tony Stark"),
       Hero("Captain America", "Steve Rogers"),
       Hero("Thor", "Thor Odinson"),
       Hero("Hulk", "Bruce Banner"),
       Hero("Hawkeye", "Clint Barton"),
       Hero("Black Panther", "T'Challa")
   )

   val flux: Flux<Hero> =
       Flux.fromArray(avengers)

   flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
Hero(nickname=Iron Man, name=Tony Stark)
Hero(nickname=Black Widow, name=Natasha Romanoff)
Hero(nickname=Captain America, name=Steve Rogers)
Hero(nickname=Thor, name=Thor Odinson)
Hero(nickname=Hulk, name=Bruce Banner)
Hero(nickname=Hawkeye, name=Clint Barton)
Hero(nickname=Black Panther, name=T'Challa)

예제 1.22의 출력 결과를 보면 예상한대로 배열에 지정한 순서대로 출력되는 것을 알 수 있습니다.

배열이 아닌 컬렉션을 이용해 플럭스를 생성할 경우 Flux.fromIterable 함수를 사용할 수 있습니다. fromArray와 마찬가지로 리스트에 지정한 순서대로 데이터를 통지합니다. 예제 1.23은 Flux.fromIterable을 사용해 플럭스를 생성하고 순서대로 출력합니다.

예제 1.23 Flux.fromIterable - reactor/reactor-basic/src/flux/Flux3.kt

package flux

import reactor.core.publisher.Flux

fun main() {
   val avengers = listOf(
       Hero("Iron Man", "Tony Stark"),
       Hero("Black Widow", "Natasha Romanoff"),
       Hero("Captain America", "Steve Rogers"),
       Hero("Thor", "Thor Odinson"),
       Hero("Hulk", "Bruce Banner"),
       Hero("Hawkeye", "Clint Barton"),
       Hero("Black Panther", "T'Challa"),
   )

   val flux: Flux<Hero> =
       Flux.fromIterable(avengers)

   flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
Hero(nickname=Iron Man, name=Tony Stark)
Hero(nickname=Black Widow, name=Natasha Romanoff)
Hero(nickname=Captain America, name=Steve Rogers)
Hero(nickname=Thor, name=Thor Odinson)
Hero(nickname=Hulk, name=Bruce Banner)
Hero(nickname=Hawkeye, name=Clint Barton)
Hero(nickname=Black Panther, name=T'Challa)

출력 결과에서 확인할 수 있듯이 fromIterable도 리스트에 지정한 순서 즉 인덱스의 순서대로 데이터를 통지하는 것을 확인할 수 있습니다.

range

Flux.range 함수는 첫 번째 인자인 시작 값부터 두 번째 인자인 카운트 값까지 값을 하나씩 증가하는 데이터를 통지하는 팩토리 함수입니다. 예제 1.24는 Flux.range를 사용해 1부터 100까지 더한 값을 출력합니다.

예제 1.24 Flux.range로 sum한 값 구하기 - reactor/reactor-basic/src/flux/Flux4.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val sum = Flux.range(1, 100)
    .reduce { t, u ->
        t + u
    }
    sum.subscribe(::println)
}

--------------------
출력 결과)
--------------------
5050

출력 결과를 보면 1부터 100까지 더한 값인 5050이 출력되었습니다. 즉 1부터 100까지 순서대로 데이터를 통지하고 reduce 함수를 사용해 통지받은 데이터를 더해졌다는 것을 알 수 있습니다.

다음 예제 1.25는 take를 사용해 인자로 지정한 10개 데이터를 통지하고 buffer는 통지받은 인자로 지정한 값만큼 데이터를 버퍼에 담아서 통지합니다.

예제 1.25 take와 buffer를 사용한 예제 - reactor/reactor-basic/src/flux/Flux5.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val buffer: Flux<List<Int>> =
        Flux.range(1, 100)
            .take(10)
            .buffer(4)

    buffer.subscribe(::println)
}

--------------------
출력 결과)
--------------------
[1, 2, 3, 4]
[5, 6, 7, 8]
[9, 10]

출력 결과를 보면 버퍼에 할당된 4건씩 데이터가 나뉘어 통지된 것을 확인할 수 있습니다.

interval

Flux.Interval 함수를 사용하면 인자로 지정해준 주기대로 신호를 발생 시킵니다. 발생된 신호는 Long 타입으로 ‘0, 1, 2,3’ 순서대로 전달 받을 수 있습니다. 예제 1.26은 0.5초 간격으로 랜덤한 식별자(id)를 가지는 쿠폰을 생성하는 예제 입니다.

예제 1.26 Flux.interval로 쿠폰 생성 예제 - reactor/reactor-basic/src/flux/Flux6.kt

package flux

import reactor.core.publisher.Flux
import java.time.Duration
import java.util.UUID

data class Coupon(
    val id: String,
    val description: String,
)

fun main() {
    val flux: Flux<Coupon> =
        Flux.interval(Duration.ofMillis(500))
            .map { count ->
                val id = UUID.randomUUID().toString()
                Coupon(id = id, description = "${count.plus(1)}번째 쿠폰")
            }
            .take(15)

    flux.subscribe(::println)

    Thread.sleep(Duration.ofSeconds(10).toMillis())
}


--------------------
출력 결과)
--------------------
Coupon(id=b755285e-476e-405a-8a9f-e0b49b3520a6, description=1번째 쿠폰)
Coupon(id=9c4eea7a-1f12-4123-9fae-678e6161f7de, description=2번째 쿠폰)
Coupon(id=6478a919-ae2b-47c9-b238-d41fe5d73010, description=3번째 쿠폰)
Coupon(id=faf7b65f-299e-4467-9308-964b901e978a, description=4번째 쿠폰)
Coupon(id=77db0f58-27c8-4abc-a47f-8966259cea78, description=5번째 쿠폰)
Coupon(id=ccfab126-a5b1-4e9e-bbf3-c55f16a0e475, description=6번째 쿠폰)
Coupon(id=c59c6f88-cf6f-4d05-80f9-261851e49359, description=7번째 쿠폰)
Coupon(id=a2ec4a36-c9f8-4087-ac93-6b963954f029, description=8번째 쿠폰)
Coupon(id=8663e6ee-293c-492f-98b7-8246e19aeb3e, description=9번째 쿠폰)
Coupon(id=c9cf3146-d1ad-4c2d-85f6-c66f3635d53e, description=10번째 쿠폰)
Coupon(id=1c66e15e-0794-4bc5-bc33-d0088e0384ca, description=11번째 쿠폰)
Coupon(id=c2e2c8c3-d5b2-47c9-be99-06aa3a679018, description=12번째 쿠폰)
Coupon(id=cf53e12c-8a54-4875-b6e3-11f5494c57c5, description=13번째 쿠폰)
Coupon(id=c37c3981-e34b-4147-8d8a-66eb3bc626ed, description=14번째 쿠폰)
Coupon(id=32942297-f457-4778-b559-c0f45023bbd6, description=15번째 쿠폰)

먼저 Flux.interval(Duration.ofMillis(500))는 0.5초 간격으로 신호를 발생시키고 순번을 데이터로 통지합니다. map 연산자 내부에선 UUID.randomUUID().toString()을 사용해 유니크한 식별자를 생성하고 쿠폰 객체의 id 프로퍼티에 할당합니다. description에는 몇 번째로 생성된 쿠폰인지를 확인하기 위해 count라는 람다 변수를 선언했습니다. 이때 count의 값은 최초에 0부터 시작하기 때문에 count.plus(1)을 사용해 기존 값에 1을 더해줍니다. 그 다음은 take로 15개의 데이터만 통지하도록 조절한 후 출력합니다.

이때 마지막 라인에서 예시와 같이 스레드를 일시정지하는 코드가 있어야 정상적으로 출력됩니다.

 Thread.sleep(Duration.ofSeconds(10).toMillis())

스레드를 일시정지하는 이유는 Flux.interval은 내부적으로 별도의 스레드에서 동작하도록 만들어져있는데 코드를 실행하는 스레드는 메인 함수의 스레드이므로 일시정지하지 않으면 subscribe에서 출력하기 전에 메인 스레드가 종료되기 때문입니다.

예제 1.27에서 Thread.sleep을 주석 처리하여 어떤 결과가 출력되는지 확인해 보겠습니다.

package flux

import reactor.core.publisher.Flux
import java.time.Duration
import java.util.UUID

data class Coupon(
    val id: String,
    val description: String,
)

fun main() {
    val flux: Flux<Coupon> =
        Flux.interval(Duration.ofMillis(500))
            .map { count ->
                val id = UUID.randomUUID().toString()
                Coupon(id = id, description = "${count.plus(1)}번째 쿠폰")
            }
            .take(15)

    flux.subscribe(::println)

    //Thread.sleep(Duration.ofSeconds(10).toMillis())
}


--------------------
출력 결과)
--------------------

출력 결과를 보면 Flux.interval에서 사용하는 스레드가 완료되기 전에 메인 스레드가 종료되어 아무것도 출력되지 않은 것을 확인할 수 있습니다.

single, singleOrEmpty

single 함수를 사용하면 최초에 전달받은 1개의 데이터만 통지받도록 제한할 수 있습니다. 그러므로 single을 연산자로 적용한 경우 통지받은 데이터가 정확히 1개 임을 보장해야 합니다. 데이터가 없거나 1개 이상인 경우는 예외가 발생합니다.

예제 1.29에서는 데이터가 1개가 아닌 경우 어떤 예외가 발생하는지 확인하기 위해 Flux.just로 6개의 데이터를 인자로 받고 take(2)로 데이터를 2번 통지하여 예외를 발생시킵니다.

예제 1.29 single 예제 - reactor/reactor-basic/src/flux/Flux7.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val flux = Flux.just(1, 2, 3, 4, 5, 6)
        .take(2)
        .single()

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
java.lang.IndexOutOfBoundsException: Source emitted more than one item

출력 결과를 보면 IndexOutOfBoundsException가 발생한 것을 확인할 수 있습니다. 예제 1.30는 take(1)을 사용해 하나의 데이터만 통지하는 예제입니다.

예제 1.30 single 예제2 - reactor/reactor-basic/src/flux/Flux8.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val flux = Flux.just("one", "two", "three")
        .take(1)
        .single()

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
one

출력 결과를 확인해보면 정상적으로 Flux.just의 인자로 처음 전달받은 문자열 ‘one’이 출력된 것을 확인할 수 있습니다.

만약 Flux로 통지받은 데이터가 비어있을(empty) 가능성이 있는 경우 singleOrEmpty를 사용할 수 있습니다. singleOrEmpty 함수는 1개의 데이터 또는 빈 값을 허용합니다. 예제 1.31은 Flux.empty를 사용해 빈 값을 통지하고 singleOrEmpty를 사용해 통지합니다.

예제 1.31 singleOrEmpty 예제 - reactor/reactor-basic/src/flux/Flux9.kt

package flux

import reactor.core.publisher.Flux

fun main() {
   val flux = Flux.empty<Any>()
       .singleOrEmpty()

   flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------

Flux.empty로 플럭스를 생성하고 singleOrEmpty를 사용했기 때문에 빈 결과가 출력되었습니다. singleOrEmpty 함수도 1개 또는 빈 값이 아닌 경우는 동일하게 IndexOutOfBoundsException가 발생합니다.

예제 1.32는 Flux.just에 첫 번째 인자는 ‘1’ 두 번째 인자는 Flux.empty()를 지정한 후 singleOrEmpty 함수를 사용하여 예외를 발생시킵니다.

예제 1.32 singleOrEmpty 예제2 - reactor/reactor-basic/src/flux/Flux9.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val flux = Flux.just(1, Flux.empty<Int>())
        .singleOrEmpty()

    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
java.lang.IndexOutOfBoundsException: Source emitted more than one item

merge, concat, mergeSequential

Flux.merge는 플럭스를 제공받은 순서대로 결합하는 팩토리 함수입니다. zip 함수와의 차이점은 merge 함수는 튜플 객체로 감싸지지 않습니다. 예제 1.33은 Flux.merge를 사용해 인자로 지정된 플럭스를 순서대로 통지하는 예제입니다.

예제 1.32 Flux.merge 예제 - reactor/reactor-basic/src/flux/Flux10.kt

package flux

import reactor.core.publisher.Flux

fun main() {
    val three = Flux.just("three")
    val two = Flux.just("two")
    val one = Flux.just("one")

    val flux: Flux<String> = Flux.merge(three, two, one)
    flux.subscribe(::println)
}

--------------------
출력 결과)
--------------------
three
two
one

출력 결과를 확인해보면 인자로 지정한 순서대로 결과가 출력되었음을 확인할 수 있습니다. 예제 1.33은 예제 1.32를 조금 수정하여 Flux.merge 함수에 인자로 전달되는 플럭스에 delayElements 함수를 사용하여 정해진 지연 시간 이후에 각각 병렬로 처리되는데 이 경우에도 순서가 유지되는지 알아보겠습니다.

예제 1.33 Flux.merge 예제2 - reactor/reactor-basic/src/flux/Flux11.kt

package flux

import reactor.core.publisher.Flux
import java.time.Duration

fun main() {
    val one = Flux.just("one").delayElements(Duration.ofMillis(300))
    val two = Flux.just("two").delayElements(Duration.ofMillis(400))
    val three = Flux.just("three").delayElements(Duration.ofMillis(500))

    val flux: Flux<String> = Flux.merge(three, two, one)
    flux.subscribe(::println)

    Thread.sleep(Duration.ofSeconds(2).toMillis())
}

--------------------
출력 결과)
--------------------
one
two
three

출력 결과를 확인해보면 인자로 지정된 순서대로 통지되지 않고 지연 시간에 따라서 순서가 정해진 것을 확인할 수 있습니다. 만약 병렬 처리 시에도 인자로 지정된 순서대로 통지해야 하는 경우 예제 1.34와 같이 concat 함수를 사용해 순서를 보장할 수 있습니다.

예제 1.34 Flux.concat 예제 - reactor/reactor-basic/src/flux/Flux12.kt

package flux

import reactor.core.publisher.Flux
import java.time.Duration

fun main() {
    val one = Flux.just("one").delayElements(Duration.ofMillis(300))
    val two = Flux.just("two").delayElements(Duration.ofMillis(400))
    val three = Flux.just("three").delayElements(Duration.ofMillis(500))

    val flux: Flux<String> = Flux.concat(three, two, one)
    flux.subscribe(::println)

    Thread.sleep(Duration.ofSeconds(2).toMillis())
}

--------------------
출력 결과)
--------------------
three
two
one

출력 결과를 확인해보면 플럭스가 concat 함수에 병렬로 전달되었지만 인자로 지정된 순서대로 통지된 것을 확인할 수 있습니다. 순서가 보장되는 이유는 병렬 처리 시 인자로 지정된 플럭스를 결합한 다음 플럭스를 순서대로 구독하여 데이터를 통지하고 이전 플럭스에 대한 통지가 완료되면 다음 플럭스가 구독 처리될 때까지 기다리는 순차처리 방식의 연산자이기 때문입니다.

예제 1.35에서 사용한 mergeSequential도 concat과 같이 순서가 보장됩니다.

예제 1.35 Flux.concat 예제 - reactor/reactor-basic/src/flux/Flux12.kt

package flux

import reactor.core.publisher.Flux
import java.time.Duration

fun main() {
    val one = Flux.just("one").delayElements(Duration.ofMillis(300))
    val two = Flux.just("two").delayElements(Duration.ofMillis(400))
    val three = Flux.just("three").delayElements(Duration.ofMillis(500))

    val flux: Flux<String> = Flux.mergeSequential(three, two, one)
    flux.subscribe(::println)

    Thread.sleep(Duration.ofSeconds(2).toMillis())
}

--------------------
출력 결과)
--------------------
three
two
one

concat 과의 차이점이라면 concat은 여러 개의 플럭스를 인자로 전달받아 하나로 결합한 후에 순서대로 구독 처리하는 연산자라면 mergeSequential은 여러 개의 플럭스를 병렬로 구독 처리하고 최종 변환 시 인자로 지정한 구독 순서에 따라서 새로운 플럭스로 결합한다는 차이점이 있습니다.

728x90
반응형
728x90
반응형


프로젝트 리액터

프로젝트 리액터(Project Reactor)는 JVM기반의 환경에서 리액티브 애플리케이션 개발을 위한 오픈 소스 프레임워크입니다. 리액터는 스프링 에코시스템에서 리액티브 스택의 기반이 되는 프로젝트로 리액티브 스트림 사양을 구현하므로 리액티브 스트림에서 사용하는 용어와 규칙을 동일하게 사용하며 리액티브 스트림 사양에 포함되지 않은 리액터만의 다양한 기능들도 제공합니다.

리액터를 사용하면 손쉽게 애플리케이션에 리액티브 프로그래밍을 적용할 수 있고 애플리케이션의 모든 구간에 비동기-논블로킹을 적용할 수 있습니다. 또한 리액터에서 제공하는 백프레셔 기법을 사용해 시스템의 부하를 효율적으로 조절할 수 있습니다.

 

모노와 플럭스

리액터는 모노(Mono)플럭스(Flux)라는 두 가지 핵심 타입을 제공합니다. 먼저 모노는 0..1개의 단일 요소 스트림을 통지합니다. 이에 반해 플럭스는 0..N개로 이뤄진 복수개의 요소를 통지하는 발행자입니다. 플럭스와 모노는 리액티브 스트림 사양의 발행자(Publisher)를 구현하여 데이터를 통지하기 때문에 onComplete 또는 onError 시그널이 발생할 때까지 onNext를 사용해 구독자에게 데이터를 통지합니다.

모노와 플럭스는 리액터의 핵심이 되는 요소로써 리액티브 스트림 사양을 구현하는 하는 것 외에도 데이터를 가공하거나 필터링하는 기능과 에러가 발생하면 에러를 핸들링 할 수 있는 다양한 연산자(Operator)를 제공하며 두 타입이 공통으로 사용하는 연산자도 있고 자신만의 연산자도 있습니다. 이번장에서는 모노와 플럭스를 사용할때 자주 사용하는 연산자들에 대해 알아보도록 하겠습니다.

 

모노의 연산자

모노는 단일 요소에 대한 통지의 경우 사용됩니다. 모노의 사용 방법을 알아보기 위해 코틀린 그레이들 프로젝트를 생성합니다. 정상적으로 생성되었다면 예제와 같은 구조로 프로젝트가 생성됩니다.

 

├── build.gradle.kts
├── gradlew
├── settings.gradle.kts
└── src
    ├── main
    │   ├── kotlin
    │   └── resources
    └── test
        ├── kotlin
        └── resources

 

그다음 build.gradle.kts를 열어 예제 1.1과 같이 reactor-core 의존성을 추가합니다.

 

예제 1.1 프로젝트에 reactor-core 의존성 추가 - reactor/reactor-basic/build.gradle.kts

plugins {
    id("org.jetbrains.kotlin.jvm") version ("1.4.10")
}


group = "com.reactor.reactorbasic"
version = "1.0-SNAPSHOT"


repositories {
    mavenCentral()
}


dependencies {

    implementation("org.jetbrains.kotlin:kotlin-stdlib")

    implementation("io.projectreactor:reactor-core:3.4.0")

}

 

 

just

모노를 생성하는 가장 간단한 방법은 Mono.just를 사용하는 것입니다. Mono.just(T data)는 객체를 인자로 받은 뒤 모노로 래핑하는 팩토리 함수입니다.

예제 1.2 Mono.just 사용 - reactor/reactor-basic/src/mono/Mono1.kt

package mono

import reactor.core.publisher.Mono

fun main() {
    val mono: Mono<String> = Mono.just("Hello Reactive World")
    mono.subscribe(::println)
}

--------------------
출력 결과)
--------------------
Hello Reactive World


인자로 “Hello Reactive World”라는 String을 인자로 받았기 때문에 String이 감싸진 Mono<String>을 반환하며 그다음 라인에 작성된 mono.subscribe(::println)는 리액티브 스트림 사양의 Publisher.subscribe 함수를 구현하여 데이터를 구독합니다. 

중요한 점은 플럭스와 모노는 게으르게(lazy) 동작하여 subscribe를 호출하지 않으면 연산자에 작성한 로직이 동작하지 않는다는 것입니다. 이런 특징은 Java8 스트림 API의 특성과 동일합니다. 스트림 API 역시 연산자를 트리거 하는 최종 연산자를 호출하지 않으면 스트림 연산이 동작하지 않습니다.

 

justOrEmpty

Mono.justOrEmpty는 값이 null이 거나 null이 될 수 있는 데이터를 받아서 처리할 수 있습니다. 

 

예제 1.3  Mono.justOrEmpty 사용 - reactor/reactor-basic/src/mono/Mono2.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val mono: Mono<String> = Mono.justOrEmpty("Hello Reactive World")
    mono.subscribe(::println)
}
--------------------
출력 결과)
--------------------
Hello Reactive World


우선 greeting 변수에 null을 할당하고 Mono.justOrEmpty의 인자로 넣어줍니다. Mono.just의 경우 null인 데이터가 인자로 들어오면 NullPointerException이 발생하므로 주의해야 합니다.  

 

switchOrEmpty/defer

null인 데이터를 제공받을 경우 switchIfEmpty를 사용해 처리할 수 있습니다. switchIfEmpty는 전달받은 값이 null인 경우 새로운 데이터로 변환하는 연산자입니다. 

 

예제 1.4  통지할 데이터가 null인 경우 처리 방법 - reactor/reactor-basic/src/mono/Mono3.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val greeting: String? = null
    Mono.justOrEmpty(greeting)
        .switchIfEmpty(Mono.defer {
            Mono.just("Hello Reactive World")
        })
        .subscribe(::println)
}
--------------------
출력 결과)
--------------------
Hello Reactive World

greeting이 null로 전달되었으므로 switchIfEmpty에서 제공하는 Mono.just(“Hello Reactive World”)가 연산자의 결과로 전달됩니다. 이때 switchIfEmpty 내부에서 Mono.defer라는 연산자를 사용하면 내부 코드의 호출이 지연되어 실제 사용되는 시점에 호출됩니다.

만약 예제 1.5 같이 Mono.defer를 사용하지 않고 switchIfEmpty를 사용하게 되면 greeting의 값의 존재 유무에 상관없이 switchIfEmpty 내부의 코드가 동작하게 되는데 이는 불필요한 동작이므로 Mono.defer를 사용하여 실제 사용되는 시점까지 동작을 미루는 것이 유리합니다.

예제 1.5  Mono.defer를 사용하지 않은 경우 - reactor/reactor-basic/src/mono/Mono4.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val greeting = "Hello"
    Mono.justOrEmpty(greeting)
        .switchIfEmpty(greetIfEmpty()) // null이 아니지만 호출됨
        .subscribe(::println)
}
 
fun greetIfEmpty() : Mono<String> {
    val greeting = "Hello Reactive World"
    println(greeting)
    return Mono.just(greeting)
}
--------------------
출력 결과)
--------------------
Hello Reactive World
Hello

 

예제 1.6은 Mono.defer를 사용하여 greeting이 null인 경우에만 동작하므로 효율적입니다.

 

예제 1.6  Mono.defer를 사용한 경우 - reactor/reactor-basic/src/mono/Mono5.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val greeting = "Hello"
    Mono.justOrEmpty(greeting)
        .switchIfEmpty(Mono.defer {
            greetIfEmptyInDefer() // greeting이 null인 경우에만 동작
        })
        .subscribe(::println)
}
 
fun greetIfEmptyInDefer(): Mono<String> {
    val greeting = "Hello Reactive World"
    println(greeting)
    return Mono.just(greeting)
}
--------------------
출력 결과)
--------------------
Hello

 

 

defaultIfEmpty

전달받은 데이터가 null인 경우 예제 1.7의 defaultIfEmpty를 사용하여 기본 값을 제공할 수 있습니다. 

예제 1.7 defaultIfEmpty를 사용해 기본 값 제공 - reactor/reactor-basic/src/mono/Mono6.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val greeting: String? = null
    Mono.justOrEmpty(greeting)
        .defaultIfEmpty("Hello Reactive World")
        .subscribe(::println)
}
--------------------
출력 결과)
--------------------
Hello Reactive World

 
defaultIfEmtpy와 switchIfEmpty 연산자는 모두 전달받은 데이터가 null인 경우 사용할 수 있다는 것이 공통점입니다. 차이점은 switchIfEmpty는 내부에서 새로운 로직을 구현해 새로운 Mono를 생성할 수 있지만 defaultIfEmpty는 null을 대체하는 값을 지정한다는 점이 다릅니다.

 

fromSupplier

내부에서 로직에 대한 결과로 모노 객체를 생성할 경우 예제 1.8과 같이 Mono.fromSupplier를 사용할 수 있습니다.

예제 1.7 fromSupplier 예제 - reactor/reactor-basic/src/mono/Mono7.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    Mono.fromSupplier {
        val greeting = "Reactive World"
        var welcome = "Welcome"
        welcome += " ${greeting.split(" ")[1]}"
        welcome += " ${greeting.split(" ")[2]}"
        welcome
    }.subscribe(::println)
}
--------------------
출력 결과)
--------------------
Welcome Reactive World


Mono.fromSupplier의 인자로 제공되는 Supplier는 Java8에 추가된 함수형 인터페이스입니다. Supplier는 별도의 인자는 없이 내부의 값을 반환하는 T get() 함수를 구현하도록 설계되었습니다.

@FunctionalInterface
public interface Supplier<T> {
    T get();
}


즉 Mono.fromSupplier{} 내부에 작성하는 코드는 T get() 함수의 구현이 되어 Mono.fromSupplier로 제공되고 내부 로직의 반환되는 구조입니다. 모노를 생성하는데 특정한 구현 로직이 필요한 경우나 늦은 초기화가 필요한 경우 유용하게 사용할 수 있습니다.

 

error

Mono.error는 로직을 처리하는 중 특정한 에러를 발생시키고 싶은 경우 유용하게 사용할 수 있습니다. 예제 1.8은 일부러 NullPointerException을 발생시키는 코드를 생성하고 예외가 발생했을때 사용자 정의 예외를 발생시킵니다.

예제 1.8 error를 사용자 사용자 정의 예외 발생 예제 - reactor/reactor-basic/src/mono/Mono8.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    Mono.fromSupplier {
        val greeting: String? = null
        Mono.just(greeting)
    }.onErrorResume {
        Mono.error(GreetingEmptyException("Greeting is Empty"))
    }.subscribe(::println)
}
 
class GreetingEmptyException : Throwable {
    constructor() : super()
    constructor(message: String?) : super(message)
}
--------------------
출력 결과)
--------------------
Welcome Reactive World


가장 먼저 Mono.from Supplier 내부에서 null을 반환하는 greeting을 Mono.just로 생성하였습니다. Mono.just는 null인 객체를 전달받으면 NullPointerException을 발생시킵니다. 에러가 발생한 경우 onErrorResume를 사용하면 예외가 발생할 경우 별도의 핸들링이 가능합니다. 예제 1.8에서는 NullPointerException이 발생한 경우 직접 생성한 GreetingEmptyException을 대신 발생시키기 위해 Mono.error를 사용했습니다. Mono.error는 최상위 Exception인 Throwable을 인자로 전달받기 때문에 모든 사용자 정의 예외 객체에 사용이 가능합니다.

 

map

예제 1.9에서 소개하는 map 연산자는 전달받은 요소를 새로운 모노로 변환할 때 사용하는 연산자입니다. 

예제 1.9 map 사용 예제 - reactor/reactor-basic/src/mono/Mono9.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
   val mono: Mono<String> =
       Mono.just("Hello Reactive World")
           .map {
               it.toLowerCase()
           }
   mono.subscribe(::println)
}
--------------------
출력 결과)
--------------------
Hello Reactive World


map 내부에서는 Mono.just에 담긴 문자열(Hello Reactive World)을 전달받아서 소문자로 변환하는 toLowerCase()를 호출합니다. 이렇게 되면 map 내부에서 변환된 데이터를 모노로 감싼 뒤 데이터를 통지하게 됩니다. map은 한 개의 데이터만 제공할 수 있고 다수의 데이터를 제공할 수 없습니다. 

 

flatMap

예제 1.10은 “Hello Reactive World”라는 String을 받아서 flatMap을 통해 각 단어의 처음 단어를 뽑아내 약자로 변환하여 출력하는 예제입니다. 

예제 1.10 flatMap 사용 예제 - reactor/reactor-basic/src/mono/Mono10.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    Mono.just("Hello Reactive World")
        .flatMap {
            val words = it.split(" ")
            val acronym = words.map { word -> word[0] }.toCharArray()
            Mono.just(acronym)
        }.subscribe(::println)
}
--------------------
출력 결과)
--------------------
HRW


flatMap은 주로 단일 요소를 복수개의 요소로 변환할 때 사용합니다. 유의할 점은 flatMap은 map과 다르게 마지막에서 반환할 요소를 앞서 소개한 Mono.just와 같은 팩토리 함수로 감싸서 반환해야 합니다.

예제 1.11은 전달 받은 문자열에 “Hello”가 포함 되었는지를 판단하여 true면 문자열로 새롭게 생성한 모노를 반환하고 거짓이라면 Mono.empty를 반환합니다.

예제 1.11 flatMap 사용 예제2 - reactor/reactor-basic/src/mono/Mono11.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    Mono.just("Reactive World")
        .flatMap {
            if (it.contains("Hello")) {
                Mono.just(it)
            } else {
                Mono.empty()
            }
        }.subscribe(::println)
}
--------------------
출력 결과)
--------------------
없음


flatMap을 활용하면 특정 조건에 따라서 에러를 발생 시키거나 비어 있는 값을 리턴할 수 있습니다.


filter

filter 연산자는 특정 조건이 true인지 판단하여 true인 경우에만 데이터를 통지하는 연산자입니다. 예제 1.12는 전달받은 문자열 데이터가 존재하는 경우 정해진 문자열을 출력합니다.

예제 1.12 filter를 사용해 데이터 정제 - reactor/reactor-basic/src/mono/Mono12.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val mono: Mono<String> =
        Mono.just("Hello Reactive World")
            .filter {
                it.startsWith("Hello")
            }
            .map {
                "this operator is working"
            }
    mono.subscribe(::println)
}
--------------------
출력 결과)
--------------------
this operator is working


예제 1.12의 filter 연산자에서는 전달받은 문자열이 Hello로 시작하는지 판단하여 true이라면 다음 연산자인 map 연산자로 이어지고 정해진 문자열 “this operator is working”을 출력합니다.

filter 연산자는 Predicate라는 함수형 인터페이스를 전달받습니다. Predicate도 마찬가지로 Java8에 추가되었으며 test라는 함수를 구현해야 합니다. 

@FunctionalInterface
public interface Predicate<T> {
    boolean test(T t);
}

test 함수는 boolean을 반환하여 test 함수를 구현하는 구현체에서는 조건식의 결과를 boolean으로 반환해야 합니다. 

예제 1.13은 filter 연산자 내부의 조건이 거짓인 경우 switchIfEmpty 내부의 로직이 동작하는 것을 확인 하는 예제입니다.

예제 1.13 filter를 사용해 데이터 정제2 - reactor/reactor-basic/src/mono/Mono13.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val mono: Mono<String> =
        Mono.just("Hello Reactive World")
            .filter {
                it.startsWith("Bye")
            }
            .map {
                "this operator is working"
            }
            .switchIfEmpty(Mono.defer {
                Mono.just("switchIfEmpty is working")
            })
    mono.subscribe(::println)
}
--------------------
출력 결과)
--------------------
switchIfEmpty is working


filter 연산자내에서 전달받은 데이가 Bye로 시작하면 조건은 true지만 전달 받은 문자열이 Hello Reactive World 이므로 이 조건 식은 거짓이 됩니다. 이 경우 바로 이어진 map 연산자가 동작하지 않고 switchIfEmpty 연산자가 동작하여 출력 결과는 switchIfEmpty is working이 됩니다. 이런 형태로 filter의 조건에 따른 분기가 이뤄지는 조합은 매우 빈번하게 사용됩니다.


zip

만약 여러 개의 모노 객체를 하나의 모노로 결합할 경우 zip을 사용해 처리할 수 있습니다. 예제 1.14는 zip을 사용해 3개의 모노를 합친 결과를 출력합니다.

예제 1.14 zip을 사용한 결합 예제1 - reactor/reactor-basic/src/mono/Mono14.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val mono1 = Mono.just("Hello")
    val mono2 = Mono.just("Reactive")
    val mono3 = Mono.just("World")
    Mono.zip(mono1, mono2, mono3)
        .map { tuple: Tuple3<String, String, String> ->
            "${tuple.t1} ${tuple.t2} ${tuple.t3}"
        }.subscribe(::println)
}
--------------------
출력 결과)
--------------------
Hello Reactive World


우선 3개의 모노를 생성한 후 Mono.zip 함수의 인자로 추가합니다.  zip 함수의 처리가 완료되어 새로운 모노를 생성한 뒤 다음 연산자에 통지를 보내는 시점은 인자로 제공된 모노 중 가장 오래 수행된 모노의 시간을 기준으로 결합되는 것이 특징입니다. 이런 특징 때문에 각 모노의 로직을 비동기 처리한 뒤 결과에 대한 결합을 위해 zip을 사용할 수 있습니다.

zip으로 합쳐진 모노 객체는 튜플(Tuple) 객체의 프로퍼티로 저장됩니다. 튜플에 들어가는 순서는 함수에 인자로 추가한 순서대로 t1, t2, t3와 같은 방식으로 사용할 수 있습니다. 튜플은 리액터에서 제공하는 데이터 저장을 위한 구조체의 종류로 비슷한 예로 코틀린의 페어(Pair)와 트리플(Triple)이 있습니다. 튜플은 현재 8개까지 제공되고 있으며 reactor.util.function 패키지를 보면 Tuple2, Tuple3, Tuple4… 과 같이 미리 정의되어 제공됩니다.

public Tuple2<T1, T2> implements Iterable<Object>, Serializable {
    final T1 t1;
    final T2 t2;
    // ...
}
 
public Tuple3<T1, T2, T3> extends Tuple2<T1, T2> {
    final T3, t3;
    // ...
}


기본적으로 Mono.zip 함수에는 최대 8개의 모노를 인자로 받아서 튜플로 사용할 수 있는 오버로딩 함수가 있지만 그 이상의 모노를 결합할 경우 컴비네이터 함수(Combinator Function)을 사용해 합칠 수 있습니다.

예제 1.15는 리스트로 전달받은 모노를 컴비네이터 함수로 결합하는 예제 입니다.

예제 1.15 zip을 사용한 결합 예제2 - reactor/reactor-basic/src/mono/Mono15.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val monoList = arrayListOf(
        Mono.just("Seoul"),
        Mono.just("Tokyo"),
        Mono.just("Washington, D.C"),
        Mono.just("Wellington"),
        Mono.just("Canberra"),
        Mono.just("Paris"),
        Mono.just("Prague"),
        Mono.just("Manila"),
        Mono.just("Doha"),
        Mono.just("Singapore")
    )
 
    Mono.zip(monoList) { array: Array<Any> -> // 컴비네이터 함수
        array.joinToString {
            (it as String).toUpperCase()
        }
    }.subscribe(::println)
}
 
--------------------
출력 결과)
--------------------
SEOUL, TOKYO, WASHINGTON, D.C, WELLINGTON, CANBERRA, PARIS, PRAGUE, MANILA, DOHA, SINGAPORE


우선 Mono.zip 함수의 인자에 리스트에 담긴 모노 객체들을 추가합니다. 그런 다음 컴비네이터 함수에서 데이터를 변환하게 되는데 이때 앞서 제공된 모노가 담긴 리스트는 튜플이 아닌 배열에 담겨서 전달됩니다. 그다음 joinToString 함수 내부에서 배열의 개수만큼 반복하면서 문자열을 대문자로 변환하고 콤마(,)를 구분자로 해서 배열을 문자열로 변환한 후 다음 연산자로 제공합니다.

 

zipWith, zipWhen

zipWith 연산자는 기존 모노 객체에 새로운 모노를 결합합니다. 얼핏 보면 zip 함수와 비슷하지만 zip은 여러 개의 모노를 결합할 수 있는 반면에 zipWith는 한 번에 1개의 모노 객체를 기존 모노에 결합합니다.  예제 1.16은 zipWith를 사용해 모노를 결합하는 예제입니다.

예제 1.16 zipWith을 사용한 결합 예제 - reactor/reactor-basic/src/mono/Mono16.kt

package mono
 
import reactor.core.publisher.Mono
import reactor.util.function.Tuple2
 
fun main() {
    val mono: Mono<Tuple2<String, String>> =
        Mono.just("Republic of Korea")
            .zipWith(Mono.just("Seoul"))
 
    mono.map {
        val country = it.t1
        val capital = it.t2
        "$capital is capital of $country"
    }.subscribe(::println)
}
 
--------------------
출력 결과)
--------------------
Seoul is capital of Republic of Korea

zipWith는 한 번에 1개의 모노를 결합하는 연산자이기 때문에 2개의 모노를 담는 튜플인 Tuple2<T1,T2>를 생성합니다. 그러므로 예제 1.16의 zipWith 연산자는 Mono<Tuple2<String, String>>과 같은 형태를 반환하게 되고 map 연산자 내부에서 전달받은 튜플을 이용해 문자열을 생성합니다. 이때 튜플의 순서는 전달받은 모노의 순서와 동일합니다.

zipWhen 연산자는 zipWith와 마찬가지로 기존 모노 객체에 새로운 모노를 결합하지만 함수의 인자로 모노를 직접 전달 받는 것이 아니라 함수형 인터페이스인 Function을 전달 받습니다.

@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
}


예제 1.17은 앞선 1.16 예제에서 zipWith를 사용하는 코드를  zipWhen으로 변경하였습니다.

예제 1.17 zipWith을 사용한 결합 예제 - reactor/reactor-basic/src/mono/Mono17.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val mono =
       Mono.just("Republic of Korea")
             .zipWhen {
                Mono.just("Seoul")
            }
 
    mono.map {
        val country = it.t1
        val capital = it.t2
        "$capital is capital of $country"
    }.subscribe(::println)
}
 
--------------------
출력 결과)
--------------------
Seoul is capital of Republic of Korea


zipWhen도 zipWith와 마찬가지로 한 번에 1개의 모노를 결합하는 연산자이기 때문에 2개의 모노를 담는 튜플인 Tuple2<T1,T2>를 생성합니다. 하지만 인자를 함수 형태로 전달받았기 때문에 내부에서 특정 로직을 구현할 결과를 모노로 변환하는 경우 유용하게 사용할 수 있습니다.

 

block

리액터의 연산자는 게으르게 동작하여 subscribe가 호출되기 전까지 연산자가 동작하지 않는 것이 일반적입니다. 하지만 block을 사용하면 그 즉시 구독을 차단(block) 하고 이전 연산자로부터 수신된 데이터를 가져옵니다. 예제 1.18은 block을 사용해 데이터를 출력합니다.

예제 1.18 block을 사용한 데이터 출력 예제 - reactor/reactor-basic/src/mono/Mono18.kt

package mono
 
import reactor.core.publisher.Mono
 
fun main() {
    val mono: Mono<String> =
        Mono.just("Welcome Back")
            .zipWhen {
                Mono.just("Synchronous World")
            }.map {
                "${it.t1} to the ${it.t2}"
            }
 
    val text: String? = mono.block()
    println(text)
}
 
--------------------
출력 결과)
--------------------
Welcome Back to the Synchronous World


subscribe를 호출하지 않았지만 block 함수를 호출함으로써 구독이 완료되고 Mono<String>가 아닌 String이 반환되는 것을 확인할 수 있습니다.

block 함수는 논블로킹 스레드 내부에서 사용되면 IllegalStateException이 발생합니다. 예제 1.19는 리액터에서 지원하는 스케줄러를 사용해 단일 논블로킹 스레드를 생성하고 내부에서 block 함수를 호출하여 에러를 발생시키는 예제입니다. 

예제 1.19 논블로킹 스레드 내부에서 block을 사용 예제 - reactor/reactor-basic/src/mono/Mono19.kt

package mono
 
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
 
fun main() {
    Schedulers.newSingle("NonBlockingSingleThread").schedule {
        val mono: Mono<String> =
            Mono.just("Welcome Back")
                .zipWhen {
                    Mono.just("Synchronous World")
                }.map {
                    "${it.t1} to the ${it.t2}"
                }
        val text: String? = mono.block()
        println(text)
    }
}
 
--------------------
출력 결과)
--------------------
java.lang.IllegalStateException: 
block()/blockFirst()/blockLast() are blocking, which is not supported in thread NonBlockingSingleThread-1

 

예제 1.19와 같이 논블로킹 스레드 내부에서 사용되면 경고와 함께 에러가 발생합니다. 일반적으로 block 함수를 사용하는 상황은 리액터의 비동기, 논블로킹 처리의 이점을 사용하지 않는다는 의미이기 때문에 한정적인 상황에서 사용됩니다.

 

728x90
반응형
728x90
반응형


리액티브 스트림(Reactive Streams)은 JVM 환경에서 리액티브 프로그래밍의 표준 API 사양으로 비동기 데이터 스트림과 논블로킹 백프레셔(Back-Pressure)에 대한 사양을 제공합니다. 리액티브 스트림 이전의 비동기식 애플리케이션에서는 멀티 CPU 코어를 제대로 활용하기 위해 복잡한 병렬 처리 코드가 필요했습니다. 또한 처리할 데이터가 무한정 많아져서 시스템의 한계를 넘어서는 경우 애플리케이션은 병목 현상(bottleneck)이 발생하거나 심각한 경우 애플리케이션이 정지되는 경우도 발생하게 됩니다. 리액티브 스트림은 이러한 문제를 해결하기 위해 탄생했습니다. 

 

리액티브 스트림 사양이 만들어지기 전에는 표준화된 사양이 없었으므로 라이브러리마다 각자의 방식으로 구현을 해야 했습니다. 이런 이유로 2013년부터 리액티브 스트림이라는 이름으로 Netflix, Pivotal, Lightbend, Red Hat과 같은 유명 회사들이 공동으로 표준화 작업에 참여하고 있습니다. 2015년 리액티브 스트림 사양이 최초 릴리즈 된 후 현재 1.0.3까지 릴리즈 되었습니다.  몇년 사이 리액티브 스트림 표준 사양을 지원하는 라이브러리들이 많이 발표됬고 JDK9에는 Flow라는 이름으로 리액티브 스트림 구현이 추가되었습니다.

리액티브 스트림의 목표는 개발자들로 하여금 용어(Glossary)를 통일하고 라이브러리들이 리액티브 스트림 사양을 준수하여 일관성 있는 구현을 따르도록 만드는 것입니다.


리액티브 스트림의 다양한 구현체들

리액티브 스트림은 TCK(Technology Compatibility Kit)를 지원합니다. 기술 호환성 키트라고 불리는 TCK는 라이브러리가 정해진 사양에 맞게 구현되었는지 보장하기 위해 만들어진 테스트 도구입니다. 자바 진영에서 Java SE 표준을 따른 JDK(Java Development Kit)인지 검증하기 위해 TCK를 사용합니다. 예를 들어 오픈 소스 JDK인 AdoptOpenJDK, Amazon Corretto 등이 TCK를 사용해 사양을 검증했습니다. TCK 테스트를 통과했다면 라이브러리의 구현이 사양에 맞게 구현되었다는 의미이므로 안정성이 중요한 사용자 입장에선 믿고 사용할 수 있게 됩니다.

 

리액티브 스트림도 이와 마찬가지로 리액티브 스트림 사양을 구현한 라이브러리는 TCK에 정의된 규칙을 통과해야만 검증된 라이브러리임을 확인할 수 있도록 하였습니다. 또한 리액티브 스트림은 라이브러리가 표준 사양을 준수하는 TCK 테스트를 통과하기만 한다면 사양에 포함되지 않은 라이브러리만의 추가 기능도 자유롭게 구현할 수 있도록 하였습니다.

 리액티브 스트림을 표준 사양을 채택한 대표적인 라이브러리들

  • Project Reactor
  • RxJava
  • JDK9 Flow
  • Akka Streams
  • Vert.x

리액티브 스트림 사양

리액티브 스트림 사양(specification)은 핵심 인터페이스와 프로토콜로 구성됩니다. 먼저 리액티브 스트림에서 제공하는 핵심 인터페이스를 확인해보겠습니다.

그림 1.7 리액티브 스트림 인터페이스

 

표 1.5 리액티브 스트림 인터페이스

인터페이스 명 설명
Publisher 데이터를 생성하고 구독자에게 통지
Subscriber 데이터를 구독하고 통지 받은 데이터를 처리
Subscription Publisher, Subscriber간의 데이터를 교환하도록 연결하는 역할을 하며 전달받을 데이터의 개수와 구독을 해지할 수 있다
Processor Publisher, Subscriber을 모두 상속받은 인터페이스


핵심 인터페이스에 대해 요약하면 발행자(Publisher)는 데이터를 생성하고 구독자(Subscriber)에게 데이터를 통지하고 구독자는 자신이 처리할 수 있는 만큼의 데이터를 요청하고 처리합니다. 이때 발행자가 제공할 수 있는 데이터의 양은 무한(unbounded) 하고 순차적(sequential) 처리를 보장합니다. 

 

서브스크립션(Subscription)은 발행자와 구독자를 연결 하는 매개체이며 구독자가 데이터를 요청하거나 구독을 해지하는 등 데이터 조절에 관련된 역할을 담당합니다. 프로세서(Processor)는 발행자와 구독자의 기능을 모두 포함하는 인터페이스이며 데이터를 가공하는 중간 단계에서 사용합니다. 

 

간단히 리액티브 스트림의 핵심 인터페이스가 하는 일에 대해 알아봤으니 실제 자바로 작성된 코드를 보면서 리액티브 스트림에서 필수적인 사양도 같이 알아보겠습니다.

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}


발행자는 subscribe에서 구독자인 Subscriber를 등록하고 구독자에게 데이터를 순차적으로 통지합니다. 이때 발행자는 구독자가 요청한 만큼의 데이터를 통지합니다. 또한 발행자는 구독자가 요청한 데이터를 처리하는 중에 에러가 발생하거나 또는 완료 처리 신호가 발생하게 되면 더 이상 데이터를 통지하지 않습니다. 그리고 subscribe에서 전달받은 구독자가 null이라면 즉시 java.lang.NullPointException을 발생시킵니다.

 

구독자 인터페이스에는 4가지의 추상 메서드가 정의되있습니다.

public interface Subscriber<T> {

    public void onSubscribe(Subscription s);

    public void onNext(T t);

    public void onError(Throwable t);

    public void onComplete();

}


구독자의 메서드들은 리액티브 스트림에서 구독자와 발행자간의 데이터 전달에 사용하는  규약(Protocol)이라고 설명하고 있습니다.

 

표 1.6 서브스크립션에 정의된 추상 메서드

메서드 명 설명
onSubscribe 구독시 최초에 한번만 호출
onNext 구독자가 요구하는 데이터의 수 만큼 호출 (최대 java.lang.Long.MAX_VALUE)
onError 에러 또는 더이상 처리할 수 없는 경우
onComplete 모든 처리가 정상적으로 완료된 경우


이때 각 메서드의 호출을 시그널(Signal)이라고 부릅니다. 각 시그널은 호출되는 순서가 다릅니다. onSubscribe는 최초 구독에 대한 초기화를 담당합니다. 구독 시 최초 한 번만 호출되기 때문에 onSubscribe 내부에서 초기화 로직을 구현할 수 있습니다. 

 

onNext는 발행자로부터 통지받을 데이터가 있는 경우 구독자가 요청하는 만큼 계속 호출됩니다. 이때 발행자가 통지하는 데이터의 수는 구독자가 요구하는 수와 같거나 적어야 합니다. 이런 사양이 있는 이유는 발행자가 너무 많은 데이터를 통지해서 구독자가 처리할 수 있는 양보다 많아지면 시스템에 문제가 발생할 수 있기 때문에 적절하게 처리량을 조절하기 위함입니다. 

 

반대로 구독자가 onNext에서 데이터를 처리하는 데 오래 걸린다면 발행자도 영향을 받을 수 있습니다. 이때는 발행자와 구독자를 분리하여 비동기적으로 처리해야 합니다.

 

발행자 측에서 처리 중 에러가 발생하면 onError를 구독자에게 통지합니다. onError 시그널이 발생하면 더 이상 데이터를 통지하지 않습니다. 구독자는 onError 시그널을 받으면 이에 대한 에러 처리를 할 수 있습니다. onComplete는 모든 데이터를 통지한 시점에 마지막에 호출되어 데이터 통지가 성공적으로 완료되었음을 통지합니다. onError와 onComplete는 반드시 둘중 하나만 호출되야하며 이후에는 어떠한 시그널도 발생해선 안됩니다. 만약 onError가 발생하고 onComplete가 발생한다면 에러가 발생한 것인지 정상적으로 완료되었는지 판단할 수 없기 때문입니다.

 

서브스크립션 인터페이스에는 request와 cancel이라는 추상 메서드가 정의돼있습니다.

public interface Subscription {

    public void request(long n);

    public void cancel();

}


구독자가 발행자에게 통지받을 데이터의 수를 요청할 때 서브스크립션의 request(long n)를 사용하는데 onSubscribe에서 최초 통지받을 개수를 요청하기 위해 사용하고 그다음엔 onNext에서 데이터 처리 후 다음에 통지받을 데이터 개수를 재요청합니다. 만약 request(long n)에 0보다 작거나 같은 수가 전달된 경우 IllegalArgumentException가 발생합니다.

cancel은 구독을 취소하기 위해 사용합니다. 구독을 취소하면 발행자와 구독자의 연결이 끊어지므로 발행자는 더 이상 데이터를 통지하지 않습니다. cancel이 호출되면 비동기적으로 시그널이 발생하여 구독 취소 여부가 지연되어 처리될 수 있습니다.

그림 1.8 리액티브 스트림에서 발행자 구독자 간의 데이터 처리 흐름

마지막으로 소개할 프로세서는 발행자와 구독자를 모두 상속받는 인터페이스 이며 자신만의 추상 메서드는 존재하지 않습니다.

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {

}

프로세서는 일반적으로 데이터의 가공 단계에서 사용되어 발행자와 구독자의 모든 사양을 따릅니다.

 

728x90
반응형
728x90
반응형


동기(Synchronous)방식의 프로그램에서 작업의 실행 흐름은 순차적으로 동작합니다. 순차적으로 동작하는 프로그램은 코드를 파악하기 쉽고 결과를 예측하기 쉽다는 장점이 있지만 특정 작업을 실행하는 동안에는 다른 작업을 할 수 없다는 단점이 있습니다. 이에 반해 비동기 처리 방식은 현재 실행 중인 작업이 끝나는 것을 기다리지 않고 다른 작업을 할 수 있습니다. 이러한 특징을 가진 비동기 프로그래밍은 서버, 모바일, 데스크톱 등 어떤 환경의  애플리케이션을 개발하더라도 유용하게 사용됩니다.

예를 들어 트위터나 페이스북 같은 SNS 서비스를 개발한다면 외부의 다양한 데이터 소스 또는 원격 서비스로부터 데이터를 가져온 뒤 적절하게 가공하고 하나로 합쳐서 클라이언트에게 응답하게 되는데 이런 경우 비동기 프로그래밍이 유용할 수 있습니다. 또 UI 애플리케이션의 경우 특정 이벤트에 반응하는 동작을 구현해야 하는데  이럴 때 필수적으로 비동기 프로그래밍을 사용하게 됩니다.

대부분의 프로그래밍 언어들은 각 언어의 철학에 맞는 다양한 비동기 처리 방법들을 지원합니다. 비동기 처리 방법에는 대표적으로 스레드(Thread), 콜백(Callback), 프로미스(Promise), 퓨처(Future), 코루틴(Coroutine) 등이 있고 각각의 방법들은 장점과 단점이 존재하고 언어와 라이브러리에 따라 지원되지 않는 경우도 있습니다.

리액티브 프로그래밍에 대한 학습에 앞서 기본적인 비동기 프로그래밍 지식은 필수입니다. 리액티브 프로그래밍에서 비동기 처리를 적용하면 콜백 헬에서 벗어날 수 있으며 퓨처를 사용했을때 성능 하락을 유발하는 블로킹 문제를 논 블로킹 형태로 쉽게 변경할 수 있습니다. 이번 절에서는 비동기 프로그래밍에 대한 기본적인 지식과 문제점에 대해 알아보겠습니다.

 

스레드

스레드는 서버 프로그래밍에서 가장 기본이 되는 비동기 처리 방식입니다. 하나의 프로세스(Process)에는 최소한 하나 이상의 스레드가 존재하고 프로세스 내의 스레드들은 동일한 메모리를 공유합니다. 일반적으로 하나의 프로세스에서 스레드가 1개인 경우 싱글 스레드(Single Thread)라고 부르고 하나 이상 존재하는 경우 멀티 스레드(Multi Thread)라고 부릅니다. 멀티 스레드를 사용하면 애플리케이션에서 여러 개의 작업을 동시에 할 수 있게 됩니다.

이번에는 스레드 자신의 이름을 출력하는 기능을 가진 5개의 스레드를 사용하는 간단한 멀티 스레드 예제를 만들어보겠습니다.

 

예제 1.3 현재 스레드의 이름을 반환하는 예제

package thread
 
class PrintRunnable : Runnable {
    override fun run() {
         println("current-thread-name : ${Thread.currentThread().name}")
    }
}


예제 1.3의 PrintRunnable은 java.lang 패키지의 Runnable 인터페이스를 구현하는 클래스입니다. 러너블(Runnable) 인터페이스를 사용하려면 run 함수를 필수로 구현해야합니다. run 내부에는 스레드 자신의 이름을 출력하도록 하였습니다. 그다음 메인 함수에서 5번 반복문을 수행하면서 PrintRunnable을 감싼 스레드를 생성하고 스레드를 동작시킵니다. 마지막엔 마찬가지로 현재 동작 중인 스레드 이름을 출력하는 print 함수를 호출합니다.

 

package thread
 
fun main(args: Array<String>) {
    for (i in 0..5) {
         val thread = Thread(PrintRunnable())
         thread.start()
    }
    println("current-thread-name : ${Thread.currentThread().name}")
}
 
--------------------
출력 결과)
--------------------
current-thread-name : Thread-0
current-thread-name : Thread-3
current-thread-name : Thread-2
current-thread-name : Thread-1
current-thread-name : main
current-thread-name : Thread-4
current-thread-name : Thread-5


예제 1.4에서 출력된 결과는 예제를 실행할 때마다 달라지는 걸 확인할 수 있습니다. 만약 멀티 스레드를 사용하지 않고 예제를 출력하면 current-thread-name은 모두 main으로 출력됩니다. main은 메인 스레드로 불리며 JVM 기반의 언어에서 가장 기본이 되는 스레드를 말합니다. 

앞서 멀티 스레드 스레드는 생성한 순서대로 동작하지 않고 무작위로 동작하는걸 확인했습니다. 다수의 스레드를 사용하면 스레드가 전환되면서 컨텍스트 스위칭(Context Switching)이 발생합니다.

그림 1.3 멀티 스레드의 동작 방식

 

멀티 스레드를 사용하면 스케쥴링 알고리즘에 의해 컨텍스트 스위칭이 일어나면서 특정 스레드가 작업을 실행하다가 다른 스레드로 전환되면서 새로운 작업을 하고 다시 원래 스레드로 돌아와서 일시 중지했던 작업을 이어서 완료합니다. 멀티 스레드를 사용하면 스레드마다 작업을 나눠서 처리할 수 있기 때문에 여러 개의 작업을 효율적으로 처리할 수 있습니다. 또한 스레드는 프로세스보다 가볍고 하나의 프로세스 내에서 여러 개의 스레드를 생성하면 메모리 자원을 공유할 수 있다는 장점을 가지고 있습니다.

 

하지만 스레드가 무한정 많아지면 생성된 스레드로 인해 메모리 사용량이 높아지게 되어 OutOfMemoryError가 발생할 수 있고 동시 처리량이 높아야 하는 시스템의 경우 스레드를 생성하면서 발생하는 대기 시간 때문에 클라이언트에게 빠른 응답을 줄 수 없게 됩니다. 이러한 문제를 극복하려면 스레드 풀(Thread Pool)을 사용해야 합니다. 스레드 풀을 사용하면 애플리케이션 내에서 사용할 총 스레드 수를 제한할 수 있고 이미 생성된 스레드를 재사용하므로 클라이언트는 빠른 응답을 받을 수 있습니다. 

 

스레드 풀은 직접 만드는 것보다 검증된 라이브러리를 사용하는 것이 좋습니다. 그러므로 java.util.concurrent 패키지가 제공하는 ExecutorService를 사용하면 쉽고 안정적으로 스레드 풀을 사용할 수 있습니다. 

 

예제 1.5 ExecutorService를 사용해 스레드 실행하기

package thread
 
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
 
fun main(args: Array<String>) {
    val pool: ExecutorService = Executors.newFixedThreadPool(5)
    try {
         for (i in 0..5) {
              pool.execute(PrintRunnable())
         }
    } catch (e: Exception) {
         pool.shutdown()
    }
    println("current-thread-name : ${Thread.currentThread().name}")
}
 
--------------------
출력 결과)
--------------------
current-thread-name : pool-1-thread-1
current-thread-name : pool-1-thread-2
current-thread-name : pool-1-thread-3
current-thread-name : pool-1-thread-4
current-thread-name : main
current-thread-name : pool-1-thread-5
current-thread-name : pool-1-thread-5

 

  • Executors.newFixedThreadPool을 사용하면 주어진 수만큼의 스레드를 생성하고 유지합니다. 즉 5로 값을 설정하면 5개의 스레드는 항상 유지되는 걸 보장합니다.
  • pool.execute는 러너블 인터페이스를 받아서 단순히 실행합니다. execute는 별도의 반환형이 없기 때문에 작업의 결과나 상태를 알 수 없습니다.


예제 1.5에서 출력된 결과를 보면 스레드 이름에 pool-1로 시작하는 것을 확인할 수 있습니다. 이름에서 유추할 수 있듯이 pool-1로 시작되는 스레드는 스레드 풀에서 관리되는 스레드라는 것 을 알 수 있습니다.  또 한 가지 스레드 풀을 사용하면서 다른 점은 마지막 두 개 작업은 pool-1-thread-5라는 동일한 스레드에서 처리되었습니다. 그러므로 스레드를 새로 생성한 게 아니라 스레드 풀에 이미 생성된 스레드를 재사용했다는 것을 알 수 있습니다.


퓨처

퓨처(Future)는 비동기 작업에 대한 결과를 얻고 싶은 경우 사용되는 인터페이스입니다. 퓨처를 사용하면 스레드를 직접 사용하는 것보다 직관적이기 때문에 이해하기 쉽고 모든 작업이 끝나면 작업에 대한 결과를 얻어서 별도의 처리를 할 수 있게 됩니다. 예를 들어 수행 시간이 오래 걸리는 작업이나 작업에 대한 결과를 기다리면서 다른 작업을 병행해서 수행하고 싶은 경우에 유용합니다.퓨처를 사용해 처리 결과를 얻기 위해선 콜러블(Callable)을 사용해야 합니다. 앞선 예제에서 사용한 러너블은 결과 반환을 지원하지 않으므로 콜러블을 사용해 완료된 결과를 퓨처에 넘길 수 있습니다. 다음 예제는 퓨처를 사용해 비동기적으로 계산하고 결과를 리턴합니다.

예제 1.6  정해진 시간만큼 대기한 후 계산하는 계산기

package future
 
object Calculator {
    fun sum(a: Int, b: Int, delay: Long = 0): Int {
         Thread.sleep(delay)
         return a + b
    }
}

 

예제 1.7 퓨처를 사용해 작업을 실행

package future
 
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future
 
fun main(args: Array<String>) {
    val pool: ExecutorService = Executors.newSingleThreadExecutor()
    val future: Future<*> = pool.submit(Callable {
        Calculator.sum(100, 200, delay = 1000)
    })
    println("계산 시작")
    val futureResult = future.get() // 비동기 작업의 결과를 기다린다.
    println(futureResult)
    val result = Calculator.sum(1, 5)
    println(result)
    println("계산 종료")
}
 
--------------------
출력 결과)
--------------------
계산 시작
300
6
계산 종료


예제 1.7의 ExecutorService의 submit은 콜러블을 인자로 받아 실행합니다. 콜러블 내부에 선언된 Calculator는 sum 함수의 인자로 전달받은 delay=1000를 이용해 스레드를 1초간 일시 정지한 후 두 개 정수의 합을 계산하여 퓨처로 반환합니다. 콜러블 내부의 코드는 스레드 풀에서 전달받은 스레드를 사용해 비동기적으로 작업을 수행하게 됩니다. 비동기 작업을 수행하는 동안에 Calculator.sum(1, 5, delay = 0)의 결과는 즉시 출력되고 future.get()에서 비동기 작업에 대한 결과를 가져오게 됩니다. 이때 비동기 작업이 완료되었다면 결과를 get 함수에서 즉시 가져올 수 있지만 작업이 완료되지 않았다면 작업이 완료될 때까지 스레드는 블로킹 상태에서 결과를 기다립니다.

퓨처를 사용하면 비동기 작업을 쉽게 구현할 수 있지만 몇 가지 단점을 가지고 있습니다. 우선 퓨처를 사용하면 동시에 실행되는 한 개 이상의 비동기 작업에 대한 결과를 하나로 조합하여 처리하거나 수동으로 완료 처리(completion)  할 수 있는 방법을 지원하지 않습니다. 또한 퓨처의 get 함수는 비동기 작업의 처리가 완료될 때까지 다음 코드로 넘어가지 않고 무한정 대기하거나 지정해둔 타임아웃 시간까지 블로킹됩니다. 

그림 1.4 블로킹 흐름

 

퓨처의 이러한 단점을 극복하기 극복하기 위해 JDK8부터 제공되는 컴플리터블 퓨처(CompletableFuture)를 사용하여 비동기 작업의 결과를 논블로킹으로 가져올 수 있는 API를 사용할 수 있습니다. 다음 예제에서는 이전에 작성했던 예제를 컴플리터블 퓨처로 변경하여 실행해보겠습니다.

 

예제 1.8 컴플리터블 퓨처를 사용해 작업을 실행

package future
 
import java.util.concurrent.*
 
fun main(args: Array<String>) {
    val completableFuture = CompletableFuture.supplyAsync { // (1)
         Calculator.sum(100, 200, delay = 2000)
    }
 
    println("계산 시작")
    val futureResult = completableFuture.get() // (2)
    println(futureResult)
 
    while (!completableFuture.isDone) { // (3)
         Thread.sleep(500)
         println("계산 결과를 집계 중입니다.")
    }
    println("계산 종료")
}
 
--------------------
출력 결과)
--------------------
계산 시작
300
계산 종료
  1. 컴플리터블 퓨처는 퓨처와 달리 ExecutorService를 사용할 필요가 없기 때문에 컴플리터블 퓨처의 팩토리 함수인 supplyAsync를 사용해 비동기 작업을 수행했습니다. 컴플리터블 퓨처가 제공하는 supplyAsync와 같은 팩토리 함수는 명시적으로 Executor 스레드 풀을 제공하지 않으면 기본 스레드 풀로 포크-조인풀을 사용합니다. 
  2. 퓨처를 사용할때와 마찬가지로 비동기 작업이 완료될때까지 블록됩니다.
  3. isDone 속성은 말그대로 컴플리터블 퓨처가 수행 중인 비동기 작업이 완료된 상태인지를 체크합니다. 컴플리터블 퓨처는 isDone외에도  취소상태를 나타내는 isCancelled 그리고 비동기 작업 도중에 에러가 발생한 상태를 나타내는 isCompletedExceptionally도 제공합니다.

예제 1.8을 실행해보면 계산 결과를 집계 중입니다.라는 메시지는 출력되지 않고 계산 결과인 300이 출력된 것을 확인할 수 있습니다. 컴플리터블 퓨처를 사용해도 get 함수를 사용하면 퓨처를 사용하는 것과 동일하게 해당 라인에서 결과를 리턴할 때까지 블로킹됩니다. 컴플리터블 퓨처를 사용하는 가장 큰 이유 중 하나는 비동기 작업을 블로킹하지 않고 사용하기 위함입니다. 그러므로 thenApply 함수를 사용하면 작업이 완료될 때까지 기다리지 않고 결과를 전달받을 수 있습니다.

예제 1.9 thenApply를 사용해 논블로킹 코드로 변경

package future
 
import java.util.concurrent.*
 
fun main(args: Array<String>) {
   val completableFuture = CompletableFuture.supplyAsync {
       Calculator.sum(100, 200, delay = 2000)
   }
 
   println("계산 시작")
   completableFuture.thenApply(::println) // 논블로킹으로 동작
 
   while (!completableFuture.isDone) {
       Thread.sleep(500)
       println("계산 결과를 집계 중입니다.")
   }
   println("계산 종료")
}
 
--------------------
출력 결과)
--------------------
계산 시작
계산 결과를 집계 중입니다.
계산 결과를 집계 중입니다.
계산 결과를 집계 중입니다.
300
계산 결과를 집계 중입니다.
계산 종료


예제 1.9를 실행해보면 get을 thenApply로 변경한 후 결과가 완료되는 것을 기다리지 않고 다음 라인의 “계산 결과를 집계 중입니다.”를 출력하는 것을 확인할 수 있습니다. 이후 계산 작업이 완료 된 후 계산 결과를 출력하고 “계산 종료"를 출력한 뒤 애플리케이션이 종료됩니다. 


그림 1.5 논블로킹 흐름

 

thenApply는 코드가 블록 되지 않기 때문에 예제 1.10과 같이 while (!completableFuture.isDone)을 주석 처리해버리면 애플리케이션은 결과를 반환하지 않고 종료됩니다. 

 

예제 1.10 계산 결과 완료 코드 주석 처리 

package future
 
import java.util.concurrent.*
 
fun main(args: Array<String>) {
   val completableFuture = CompletableFuture.supplyAsync {
       Calculator.sum(100, 200, delay = 2000)
   }
 
   println("계산 시작")
   completableFuture.thenApply(::println) // 논블로킹으로 동작
 
//    while (!completableFuture.isDone) {
//        Thread.sleep(500)
//        println("계산 결과를 집계 중입니다.")
//    }
   println("계산 종료")
}
 
--------------------
출력 결과)
--------------------
계산 시작
계산 종료


예상대로 계산 결과는 출력되지 않고 바로 종료된것을 확인 할 수 있습니다. 논블로킹으로 동작하는 코드를 작성할때는 이러한 특징에 대해 이해하고 개발하지 않으면 의도치 않은 버그를 만들 수 있다는 단점이 있습니다. 

이번엔 컴플리터블 퓨처를 활용해 다수의 작업을 병렬로 처리하는 예제에 대해 알아보겠습니다. 예제 1.11에선 의도대로 병렬 처리가 되는지 또한 얼마나 시간이 소요되는지 확인하기 위해 소요 시간을 측정하는 코드를 추가하였습니다.

 

 예제 1.11 계산 소요 시간 측정 

val startTime = System.nanoTime()
println("계산 시작")
println("계산 종료")
val elapsedTime = (System.nanoTime() - startTime) / 1_000_000
println("수행시간 : $elapsedTime msecs")


다음 예제는 비동기 작업을 5번 반복하면서 결과를 출력하는 코드입니다.

 

 예제 1.12 병렬 처리 구현1

package future
 
import java.util.concurrent.*
 
fun main(args: Array<String>) {
    val startTime = System.nanoTime()
    println("계산 시작")
    (0 until 5).forEach {
         val supplier = {
             println("current-thread-name:${Thread.currentThread().name}")
             Calculator.sum(100, 200, delay = 2000)
         }
         val result = CompletableFuture.supplyAsync(supplier).join()
         println(result)
    }
    println("계산 종료")
    val elapsedTime = (System.nanoTime() - startTime) / 1_000_000
    println("수행시간 : $elapsedTime msecs")
}
 
--------------------
출력 결과)
--------------------
계산 시작
current-thread-name : ForkJoinPool.commonPool-worker-9
300
current-thread-name : ForkJoinPool.commonPool-worker-9
300
current-thread-name : ForkJoinPool.commonPool-worker-9
300
current-thread-name : ForkJoinPool.commonPool-worker-9
300
current-thread-name : ForkJoinPool.commonPool-worker-9
300
계산 종료
수행시간 : 10021 msecs


우선 (0 until 5).forEach 내부에서 이전 예제와 동일하게 비동기 계산 로직을 추가하고 join을 사용해 결과를 받아온 뒤 출력합니다. 이번에 사용한 join은 get과 동일하게 비동기 작업의 결과를 받아옵니다. 하지만 get은 익셉션이 발생하면 throw를 사용해 익셉션을 외부로 전파해서 try-catch 구문으로 예외 처리를 해야 하는데 반해 join은 익셉션을 외부로 전파시키지 않는다는 차이점이 있습니다.

예제 1.12의 실행 결과는 예상과 다르게 10초가 소요되었고 current-thread-name에 5번 모두 동일한 스레드가 찍힌 것을 확인할 수 있습니다. 그러므로 작성한 예제가 병렬로 처리되지 않고 순차 처리되었단 걸 알 수 있습니다. 이런 결과가 발생한 이유는 join 역시 get과 마찬가지로 작업의 결과를 가져올때 블로킹되는데 join을 반복문 내부에서 수행했기 때문에 의도와는 다르게 순차적으로 처리되었습니다. 이제 이유를 알았으니 예제를 개선해보겠습니다. 

 

 예제 1.13 병렬 처리 구현2

package future
 
import java.util.concurrent.*
 
fun main(args: Array<String>) {
    val startTime = System.nanoTime()
    println("계산 시작")
    (0 until 5).map {
         val supplier = {              
             println("current-thread-name:${Thread.currentThread().name}")
             Calculator.sum(100, 200, delay = 2000)
         }
         CompletableFuture.supplyAsync(supplier)
    }.map(CompletableFuture<Int>::join).forEach(::println)
    println("계산 종료")
    val elapsedTime = (System.nanoTime() - startTime) / 1_000_000
    println("수행시간 : $elapsedTime msecs")
}
 
--------------------
출력 결과)
--------------------
계산 시작
current-thread-name : ForkJoinPool.commonPool-worker-9
current-thread-name : ForkJoinPool.commonPool-worker-2
current-thread-name : ForkJoinPool.commonPool-worker-11
current-thread-name : ForkJoinPool.commonPool-worker-4
current-thread-name : ForkJoinPool.commonPool-worker-6
300
300
300
300
300
계산 종료
수행시간 : 2034 msecs


마지막으로 개선한 예제 1.13에선 map 함수를 사용해서 5개의 CompletableFuture<Int>를 가지는 리스트로 변환하고 그다음 map에선 join으로 가져온 결과를 모아서 List<Int>로 변환하였습니다. 그리고 마지막 forEach에선 전달받은 정수형 리스트를 가지고 println을 출력하도록 수정했습니다.

예제의 실행 결과를 보면 의도대로 각기 다른 스레드가 계산 작업을 병렬로 나눠 처리한 것을 확인할 수 있습니다. 또한 기존 코드 보다 약 5배 빠른 2초 만에 처리가 완료되었습니다. 이처럼 컴플리터블 퓨처를 사용하면 논블로킹으로 동작하거나 수행 결과를 조합하는 코드도 어렵지 않게 구현할 수 있습니다. 

이번 장에선 다양한 방법을 사용해 비동기 프로그래밍을 구현해봤습니다. 특히 컴플리터블 퓨처는   기존의 비동기 처리 방법에 비해 우아하고 편리합니다. 컴플리터블 퓨처가 만능 해결사는 아니지만 대다수의 비동기 처리 시나리오에서 유용하게 사용할 수 있습니다. 예를 들면 우리가 개발한 서버에서 외부의 여러 API 서버를 호출하여 응답을 받아서 결과를 결합하고 처리해야 하는 시나리오라면 컴플리터블 퓨처는 매우 유용할 것입니다.

그림 1.6 컴플리터블 퓨처가 잘 사용된 시나리오

 

하나의 컴플리터블 퓨처는 단순히 한 번의 비동기 작업을 정의하고 처리합니다. 반면에 리액티브 프로그래밍은 데이터의 크기가 정해지지 않고 지속적으로 생성되는 데이터를 처리하는 스트리밍 형태로써 데이터 전달의 주체인 발행자는 데이터가 준비되면 데이터를 구독자에게 통지하고 구독자는 자신이 처리할 수 있는 만큼의 데이터를 처리합니다. 이런 처리 방법은 일반적인 비동기 프로그래밍으로는 처리하기 힘들고 구현도 쉽지 않습니다. 이때 리액티브 프로그래밍을 적용하면 최소한의 노력으로 복잡한 비동기 작업을 핸들링 할 수 있습니다.

 

728x90
반응형

+ Recent posts