728x90
반응형


동기(Synchronous)방식의 프로그램에서 작업의 실행 흐름은 순차적으로 동작합니다. 순차적으로 동작하는 프로그램은 코드를 파악하기 쉽고 결과를 예측하기 쉽다는 장점이 있지만 특정 작업을 실행하는 동안에는 다른 작업을 할 수 없다는 단점이 있습니다. 이에 반해 비동기 처리 방식은 현재 실행 중인 작업이 끝나는 것을 기다리지 않고 다른 작업을 할 수 있습니다. 이러한 특징을 가진 비동기 프로그래밍은 서버, 모바일, 데스크톱 등 어떤 환경의  애플리케이션을 개발하더라도 유용하게 사용됩니다.

예를 들어 트위터나 페이스북 같은 SNS 서비스를 개발한다면 외부의 다양한 데이터 소스 또는 원격 서비스로부터 데이터를 가져온 뒤 적절하게 가공하고 하나로 합쳐서 클라이언트에게 응답하게 되는데 이런 경우 비동기 프로그래밍이 유용할 수 있습니다. 또 UI 애플리케이션의 경우 특정 이벤트에 반응하는 동작을 구현해야 하는데  이럴 때 필수적으로 비동기 프로그래밍을 사용하게 됩니다.

대부분의 프로그래밍 언어들은 각 언어의 철학에 맞는 다양한 비동기 처리 방법들을 지원합니다. 비동기 처리 방법에는 대표적으로 스레드(Thread), 콜백(Callback), 프로미스(Promise), 퓨처(Future), 코루틴(Coroutine) 등이 있고 각각의 방법들은 장점과 단점이 존재하고 언어와 라이브러리에 따라 지원되지 않는 경우도 있습니다.

리액티브 프로그래밍에 대한 학습에 앞서 기본적인 비동기 프로그래밍 지식은 필수입니다. 리액티브 프로그래밍에서 비동기 처리를 적용하면 콜백 헬에서 벗어날 수 있으며 퓨처를 사용했을때 성능 하락을 유발하는 블로킹 문제를 논 블로킹 형태로 쉽게 변경할 수 있습니다. 이번 절에서는 비동기 프로그래밍에 대한 기본적인 지식과 문제점에 대해 알아보겠습니다.

 

스레드

스레드는 서버 프로그래밍에서 가장 기본이 되는 비동기 처리 방식입니다. 하나의 프로세스(Process)에는 최소한 하나 이상의 스레드가 존재하고 프로세스 내의 스레드들은 동일한 메모리를 공유합니다. 일반적으로 하나의 프로세스에서 스레드가 1개인 경우 싱글 스레드(Single Thread)라고 부르고 하나 이상 존재하는 경우 멀티 스레드(Multi Thread)라고 부릅니다. 멀티 스레드를 사용하면 애플리케이션에서 여러 개의 작업을 동시에 할 수 있게 됩니다.

이번에는 스레드 자신의 이름을 출력하는 기능을 가진 5개의 스레드를 사용하는 간단한 멀티 스레드 예제를 만들어보겠습니다.

 

예제 1.3 현재 스레드의 이름을 반환하는 예제

package thread
 
class PrintRunnable : Runnable {
    override fun run() {
         println("current-thread-name : ${Thread.currentThread().name}")
    }
}


예제 1.3의 PrintRunnable은 java.lang 패키지의 Runnable 인터페이스를 구현하는 클래스입니다. 러너블(Runnable) 인터페이스를 사용하려면 run 함수를 필수로 구현해야합니다. run 내부에는 스레드 자신의 이름을 출력하도록 하였습니다. 그다음 메인 함수에서 5번 반복문을 수행하면서 PrintRunnable을 감싼 스레드를 생성하고 스레드를 동작시킵니다. 마지막엔 마찬가지로 현재 동작 중인 스레드 이름을 출력하는 print 함수를 호출합니다.

 

package thread
 
fun main(args: Array<String>) {
    for (i in 0..5) {
         val thread = Thread(PrintRunnable())
         thread.start()
    }
    println("current-thread-name : ${Thread.currentThread().name}")
}
 
--------------------
출력 결과)
--------------------
current-thread-name : Thread-0
current-thread-name : Thread-3
current-thread-name : Thread-2
current-thread-name : Thread-1
current-thread-name : main
current-thread-name : Thread-4
current-thread-name : Thread-5


예제 1.4에서 출력된 결과는 예제를 실행할 때마다 달라지는 걸 확인할 수 있습니다. 만약 멀티 스레드를 사용하지 않고 예제를 출력하면 current-thread-name은 모두 main으로 출력됩니다. main은 메인 스레드로 불리며 JVM 기반의 언어에서 가장 기본이 되는 스레드를 말합니다. 

앞서 멀티 스레드 스레드는 생성한 순서대로 동작하지 않고 무작위로 동작하는걸 확인했습니다. 다수의 스레드를 사용하면 스레드가 전환되면서 컨텍스트 스위칭(Context Switching)이 발생합니다.

그림 1.3 멀티 스레드의 동작 방식

 

멀티 스레드를 사용하면 스케쥴링 알고리즘에 의해 컨텍스트 스위칭이 일어나면서 특정 스레드가 작업을 실행하다가 다른 스레드로 전환되면서 새로운 작업을 하고 다시 원래 스레드로 돌아와서 일시 중지했던 작업을 이어서 완료합니다. 멀티 스레드를 사용하면 스레드마다 작업을 나눠서 처리할 수 있기 때문에 여러 개의 작업을 효율적으로 처리할 수 있습니다. 또한 스레드는 프로세스보다 가볍고 하나의 프로세스 내에서 여러 개의 스레드를 생성하면 메모리 자원을 공유할 수 있다는 장점을 가지고 있습니다.

 

하지만 스레드가 무한정 많아지면 생성된 스레드로 인해 메모리 사용량이 높아지게 되어 OutOfMemoryError가 발생할 수 있고 동시 처리량이 높아야 하는 시스템의 경우 스레드를 생성하면서 발생하는 대기 시간 때문에 클라이언트에게 빠른 응답을 줄 수 없게 됩니다. 이러한 문제를 극복하려면 스레드 풀(Thread Pool)을 사용해야 합니다. 스레드 풀을 사용하면 애플리케이션 내에서 사용할 총 스레드 수를 제한할 수 있고 이미 생성된 스레드를 재사용하므로 클라이언트는 빠른 응답을 받을 수 있습니다. 

 

스레드 풀은 직접 만드는 것보다 검증된 라이브러리를 사용하는 것이 좋습니다. 그러므로 java.util.concurrent 패키지가 제공하는 ExecutorService를 사용하면 쉽고 안정적으로 스레드 풀을 사용할 수 있습니다. 

 

예제 1.5 ExecutorService를 사용해 스레드 실행하기

package thread
 
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
 
fun main(args: Array<String>) {
    val pool: ExecutorService = Executors.newFixedThreadPool(5)
    try {
         for (i in 0..5) {
              pool.execute(PrintRunnable())
         }
    } catch (e: Exception) {
         pool.shutdown()
    }
    println("current-thread-name : ${Thread.currentThread().name}")
}
 
--------------------
출력 결과)
--------------------
current-thread-name : pool-1-thread-1
current-thread-name : pool-1-thread-2
current-thread-name : pool-1-thread-3
current-thread-name : pool-1-thread-4
current-thread-name : main
current-thread-name : pool-1-thread-5
current-thread-name : pool-1-thread-5

 

  • Executors.newFixedThreadPool을 사용하면 주어진 수만큼의 스레드를 생성하고 유지합니다. 즉 5로 값을 설정하면 5개의 스레드는 항상 유지되는 걸 보장합니다.
  • pool.execute는 러너블 인터페이스를 받아서 단순히 실행합니다. execute는 별도의 반환형이 없기 때문에 작업의 결과나 상태를 알 수 없습니다.


예제 1.5에서 출력된 결과를 보면 스레드 이름에 pool-1로 시작하는 것을 확인할 수 있습니다. 이름에서 유추할 수 있듯이 pool-1로 시작되는 스레드는 스레드 풀에서 관리되는 스레드라는 것 을 알 수 있습니다.  또 한 가지 스레드 풀을 사용하면서 다른 점은 마지막 두 개 작업은 pool-1-thread-5라는 동일한 스레드에서 처리되었습니다. 그러므로 스레드를 새로 생성한 게 아니라 스레드 풀에 이미 생성된 스레드를 재사용했다는 것을 알 수 있습니다.


퓨처

퓨처(Future)는 비동기 작업에 대한 결과를 얻고 싶은 경우 사용되는 인터페이스입니다. 퓨처를 사용하면 스레드를 직접 사용하는 것보다 직관적이기 때문에 이해하기 쉽고 모든 작업이 끝나면 작업에 대한 결과를 얻어서 별도의 처리를 할 수 있게 됩니다. 예를 들어 수행 시간이 오래 걸리는 작업이나 작업에 대한 결과를 기다리면서 다른 작업을 병행해서 수행하고 싶은 경우에 유용합니다.퓨처를 사용해 처리 결과를 얻기 위해선 콜러블(Callable)을 사용해야 합니다. 앞선 예제에서 사용한 러너블은 결과 반환을 지원하지 않으므로 콜러블을 사용해 완료된 결과를 퓨처에 넘길 수 있습니다. 다음 예제는 퓨처를 사용해 비동기적으로 계산하고 결과를 리턴합니다.

예제 1.6  정해진 시간만큼 대기한 후 계산하는 계산기

package future
 
object Calculator {
    fun sum(a: Int, b: Int, delay: Long = 0): Int {
         Thread.sleep(delay)
         return a + b
    }
}

 

예제 1.7 퓨처를 사용해 작업을 실행

package future
 
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future
 
fun main(args: Array<String>) {
    val pool: ExecutorService = Executors.newSingleThreadExecutor()
    val future: Future<*> = pool.submit(Callable {
        Calculator.sum(100, 200, delay = 1000)
    })
    println("계산 시작")
    val futureResult = future.get() // 비동기 작업의 결과를 기다린다.
    println(futureResult)
    val result = Calculator.sum(1, 5)
    println(result)
    println("계산 종료")
}
 
--------------------
출력 결과)
--------------------
계산 시작
300
6
계산 종료


예제 1.7의 ExecutorService의 submit은 콜러블을 인자로 받아 실행합니다. 콜러블 내부에 선언된 Calculator는 sum 함수의 인자로 전달받은 delay=1000를 이용해 스레드를 1초간 일시 정지한 후 두 개 정수의 합을 계산하여 퓨처로 반환합니다. 콜러블 내부의 코드는 스레드 풀에서 전달받은 스레드를 사용해 비동기적으로 작업을 수행하게 됩니다. 비동기 작업을 수행하는 동안에 Calculator.sum(1, 5, delay = 0)의 결과는 즉시 출력되고 future.get()에서 비동기 작업에 대한 결과를 가져오게 됩니다. 이때 비동기 작업이 완료되었다면 결과를 get 함수에서 즉시 가져올 수 있지만 작업이 완료되지 않았다면 작업이 완료될 때까지 스레드는 블로킹 상태에서 결과를 기다립니다.

퓨처를 사용하면 비동기 작업을 쉽게 구현할 수 있지만 몇 가지 단점을 가지고 있습니다. 우선 퓨처를 사용하면 동시에 실행되는 한 개 이상의 비동기 작업에 대한 결과를 하나로 조합하여 처리하거나 수동으로 완료 처리(completion)  할 수 있는 방법을 지원하지 않습니다. 또한 퓨처의 get 함수는 비동기 작업의 처리가 완료될 때까지 다음 코드로 넘어가지 않고 무한정 대기하거나 지정해둔 타임아웃 시간까지 블로킹됩니다. 

그림 1.4 블로킹 흐름

 

퓨처의 이러한 단점을 극복하기 극복하기 위해 JDK8부터 제공되는 컴플리터블 퓨처(CompletableFuture)를 사용하여 비동기 작업의 결과를 논블로킹으로 가져올 수 있는 API를 사용할 수 있습니다. 다음 예제에서는 이전에 작성했던 예제를 컴플리터블 퓨처로 변경하여 실행해보겠습니다.

 

예제 1.8 컴플리터블 퓨처를 사용해 작업을 실행

package future
 
import java.util.concurrent.*
 
fun main(args: Array<String>) {
    val completableFuture = CompletableFuture.supplyAsync { // (1)
         Calculator.sum(100, 200, delay = 2000)
    }
 
    println("계산 시작")
    val futureResult = completableFuture.get() // (2)
    println(futureResult)
 
    while (!completableFuture.isDone) { // (3)
         Thread.sleep(500)
         println("계산 결과를 집계 중입니다.")
    }
    println("계산 종료")
}
 
--------------------
출력 결과)
--------------------
계산 시작
300
계산 종료
  1. 컴플리터블 퓨처는 퓨처와 달리 ExecutorService를 사용할 필요가 없기 때문에 컴플리터블 퓨처의 팩토리 함수인 supplyAsync를 사용해 비동기 작업을 수행했습니다. 컴플리터블 퓨처가 제공하는 supplyAsync와 같은 팩토리 함수는 명시적으로 Executor 스레드 풀을 제공하지 않으면 기본 스레드 풀로 포크-조인풀을 사용합니다. 
  2. 퓨처를 사용할때와 마찬가지로 비동기 작업이 완료될때까지 블록됩니다.
  3. isDone 속성은 말그대로 컴플리터블 퓨처가 수행 중인 비동기 작업이 완료된 상태인지를 체크합니다. 컴플리터블 퓨처는 isDone외에도  취소상태를 나타내는 isCancelled 그리고 비동기 작업 도중에 에러가 발생한 상태를 나타내는 isCompletedExceptionally도 제공합니다.

예제 1.8을 실행해보면 계산 결과를 집계 중입니다.라는 메시지는 출력되지 않고 계산 결과인 300이 출력된 것을 확인할 수 있습니다. 컴플리터블 퓨처를 사용해도 get 함수를 사용하면 퓨처를 사용하는 것과 동일하게 해당 라인에서 결과를 리턴할 때까지 블로킹됩니다. 컴플리터블 퓨처를 사용하는 가장 큰 이유 중 하나는 비동기 작업을 블로킹하지 않고 사용하기 위함입니다. 그러므로 thenApply 함수를 사용하면 작업이 완료될 때까지 기다리지 않고 결과를 전달받을 수 있습니다.

예제 1.9 thenApply를 사용해 논블로킹 코드로 변경

package future
 
import java.util.concurrent.*
 
fun main(args: Array<String>) {
   val completableFuture = CompletableFuture.supplyAsync {
       Calculator.sum(100, 200, delay = 2000)
   }
 
   println("계산 시작")
   completableFuture.thenApply(::println) // 논블로킹으로 동작
 
   while (!completableFuture.isDone) {
       Thread.sleep(500)
       println("계산 결과를 집계 중입니다.")
   }
   println("계산 종료")
}
 
--------------------
출력 결과)
--------------------
계산 시작
계산 결과를 집계 중입니다.
계산 결과를 집계 중입니다.
계산 결과를 집계 중입니다.
300
계산 결과를 집계 중입니다.
계산 종료


예제 1.9를 실행해보면 get을 thenApply로 변경한 후 결과가 완료되는 것을 기다리지 않고 다음 라인의 “계산 결과를 집계 중입니다.”를 출력하는 것을 확인할 수 있습니다. 이후 계산 작업이 완료 된 후 계산 결과를 출력하고 “계산 종료"를 출력한 뒤 애플리케이션이 종료됩니다. 


그림 1.5 논블로킹 흐름

 

thenApply는 코드가 블록 되지 않기 때문에 예제 1.10과 같이 while (!completableFuture.isDone)을 주석 처리해버리면 애플리케이션은 결과를 반환하지 않고 종료됩니다. 

 

예제 1.10 계산 결과 완료 코드 주석 처리 

package future
 
import java.util.concurrent.*
 
fun main(args: Array<String>) {
   val completableFuture = CompletableFuture.supplyAsync {
       Calculator.sum(100, 200, delay = 2000)
   }
 
   println("계산 시작")
   completableFuture.thenApply(::println) // 논블로킹으로 동작
 
//    while (!completableFuture.isDone) {
//        Thread.sleep(500)
//        println("계산 결과를 집계 중입니다.")
//    }
   println("계산 종료")
}
 
--------------------
출력 결과)
--------------------
계산 시작
계산 종료


예상대로 계산 결과는 출력되지 않고 바로 종료된것을 확인 할 수 있습니다. 논블로킹으로 동작하는 코드를 작성할때는 이러한 특징에 대해 이해하고 개발하지 않으면 의도치 않은 버그를 만들 수 있다는 단점이 있습니다. 

이번엔 컴플리터블 퓨처를 활용해 다수의 작업을 병렬로 처리하는 예제에 대해 알아보겠습니다. 예제 1.11에선 의도대로 병렬 처리가 되는지 또한 얼마나 시간이 소요되는지 확인하기 위해 소요 시간을 측정하는 코드를 추가하였습니다.

 

 예제 1.11 계산 소요 시간 측정 

val startTime = System.nanoTime()
println("계산 시작")
println("계산 종료")
val elapsedTime = (System.nanoTime() - startTime) / 1_000_000
println("수행시간 : $elapsedTime msecs")


다음 예제는 비동기 작업을 5번 반복하면서 결과를 출력하는 코드입니다.

 

 예제 1.12 병렬 처리 구현1

package future
 
import java.util.concurrent.*
 
fun main(args: Array<String>) {
    val startTime = System.nanoTime()
    println("계산 시작")
    (0 until 5).forEach {
         val supplier = {
             println("current-thread-name:${Thread.currentThread().name}")
             Calculator.sum(100, 200, delay = 2000)
         }
         val result = CompletableFuture.supplyAsync(supplier).join()
         println(result)
    }
    println("계산 종료")
    val elapsedTime = (System.nanoTime() - startTime) / 1_000_000
    println("수행시간 : $elapsedTime msecs")
}
 
--------------------
출력 결과)
--------------------
계산 시작
current-thread-name : ForkJoinPool.commonPool-worker-9
300
current-thread-name : ForkJoinPool.commonPool-worker-9
300
current-thread-name : ForkJoinPool.commonPool-worker-9
300
current-thread-name : ForkJoinPool.commonPool-worker-9
300
current-thread-name : ForkJoinPool.commonPool-worker-9
300
계산 종료
수행시간 : 10021 msecs


우선 (0 until 5).forEach 내부에서 이전 예제와 동일하게 비동기 계산 로직을 추가하고 join을 사용해 결과를 받아온 뒤 출력합니다. 이번에 사용한 join은 get과 동일하게 비동기 작업의 결과를 받아옵니다. 하지만 get은 익셉션이 발생하면 throw를 사용해 익셉션을 외부로 전파해서 try-catch 구문으로 예외 처리를 해야 하는데 반해 join은 익셉션을 외부로 전파시키지 않는다는 차이점이 있습니다.

예제 1.12의 실행 결과는 예상과 다르게 10초가 소요되었고 current-thread-name에 5번 모두 동일한 스레드가 찍힌 것을 확인할 수 있습니다. 그러므로 작성한 예제가 병렬로 처리되지 않고 순차 처리되었단 걸 알 수 있습니다. 이런 결과가 발생한 이유는 join 역시 get과 마찬가지로 작업의 결과를 가져올때 블로킹되는데 join을 반복문 내부에서 수행했기 때문에 의도와는 다르게 순차적으로 처리되었습니다. 이제 이유를 알았으니 예제를 개선해보겠습니다. 

 

 예제 1.13 병렬 처리 구현2

package future
 
import java.util.concurrent.*
 
fun main(args: Array<String>) {
    val startTime = System.nanoTime()
    println("계산 시작")
    (0 until 5).map {
         val supplier = {              
             println("current-thread-name:${Thread.currentThread().name}")
             Calculator.sum(100, 200, delay = 2000)
         }
         CompletableFuture.supplyAsync(supplier)
    }.map(CompletableFuture<Int>::join).forEach(::println)
    println("계산 종료")
    val elapsedTime = (System.nanoTime() - startTime) / 1_000_000
    println("수행시간 : $elapsedTime msecs")
}
 
--------------------
출력 결과)
--------------------
계산 시작
current-thread-name : ForkJoinPool.commonPool-worker-9
current-thread-name : ForkJoinPool.commonPool-worker-2
current-thread-name : ForkJoinPool.commonPool-worker-11
current-thread-name : ForkJoinPool.commonPool-worker-4
current-thread-name : ForkJoinPool.commonPool-worker-6
300
300
300
300
300
계산 종료
수행시간 : 2034 msecs


마지막으로 개선한 예제 1.13에선 map 함수를 사용해서 5개의 CompletableFuture<Int>를 가지는 리스트로 변환하고 그다음 map에선 join으로 가져온 결과를 모아서 List<Int>로 변환하였습니다. 그리고 마지막 forEach에선 전달받은 정수형 리스트를 가지고 println을 출력하도록 수정했습니다.

예제의 실행 결과를 보면 의도대로 각기 다른 스레드가 계산 작업을 병렬로 나눠 처리한 것을 확인할 수 있습니다. 또한 기존 코드 보다 약 5배 빠른 2초 만에 처리가 완료되었습니다. 이처럼 컴플리터블 퓨처를 사용하면 논블로킹으로 동작하거나 수행 결과를 조합하는 코드도 어렵지 않게 구현할 수 있습니다. 

이번 장에선 다양한 방법을 사용해 비동기 프로그래밍을 구현해봤습니다. 특히 컴플리터블 퓨처는   기존의 비동기 처리 방법에 비해 우아하고 편리합니다. 컴플리터블 퓨처가 만능 해결사는 아니지만 대다수의 비동기 처리 시나리오에서 유용하게 사용할 수 있습니다. 예를 들면 우리가 개발한 서버에서 외부의 여러 API 서버를 호출하여 응답을 받아서 결과를 결합하고 처리해야 하는 시나리오라면 컴플리터블 퓨처는 매우 유용할 것입니다.

그림 1.6 컴플리터블 퓨처가 잘 사용된 시나리오

 

하나의 컴플리터블 퓨처는 단순히 한 번의 비동기 작업을 정의하고 처리합니다. 반면에 리액티브 프로그래밍은 데이터의 크기가 정해지지 않고 지속적으로 생성되는 데이터를 처리하는 스트리밍 형태로써 데이터 전달의 주체인 발행자는 데이터가 준비되면 데이터를 구독자에게 통지하고 구독자는 자신이 처리할 수 있는 만큼의 데이터를 처리합니다. 이런 처리 방법은 일반적인 비동기 프로그래밍으로는 처리하기 힘들고 구현도 쉽지 않습니다. 이때 리액티브 프로그래밍을 적용하면 최소한의 노력으로 복잡한 비동기 작업을 핸들링 할 수 있습니다.

 

728x90
반응형
728x90
반응형


리액티브 프로그래밍(Reactive Programming)이란 데이터 또는 이벤트의 변경이 발생하면 이에 반응해 처리하는 프로그래밍 기법을 말합니다. 리액티브 프로그래밍은 2010년 에릭 마이어라는 프로그래머에 의해  마이크로소프트의 .NET 에코 시스템으로 정의되었습니다. 리액티브 프로그래밍의 특징은 비동기 프로그래밍을 처리하는 새로운 접근 방식이라는 것입니다. 리액티브 프로그래밍은 데이터의 통지, 완료, 에러에 대한 처리를 옵저버 패턴에 영감을 받아 설계되었고 데이터의 손쉬운 비동기 처리를 위해 함수형 언어의 접근 방식을 사용합니다.

 

리액티브 프로그래밍이 나오기 전 비동기 프로그래밍은 대부분 콜백(Callback) 기반의 비동기 처리 방식을 사용했습니다. 간단한 비동기 처리는 콜백 기반의 비동기 프로그래밍으로도 이해하기 쉬운 코드를 작성할 수 있지만 콜백이 많아져서 발생하는 일명 콜백 헬(Callback Hell)로 인해 코드의 복잡도가 늘어나서 가독성과 유지 보수성이 떨어지게 됩니다. 그동안 콜백 헬을 해결하기 위해 다양한 해결 방법들이 제시되었고 리액티브 프로그래밍을 적용하면 콜백 헬 없이 읽기 쉬운 코드를 작성할 수 있기 때문에 서버 애플리케이션 관점에서 리액티브 프로그래밍은 비동기 기반의 논-블로킹(Non-Blocking)과 이벤트 드리븐(Event-Driven) 애플리케이션 구현에 유리하여 최근 리액티브 프로그래밍을 적용한 사례가 점점 늘어나고 있습니다.

리액티브 프로그래밍은 비동기 처리 외에도 백프레셔(Back-Pressure)이라는 특징을 포함합니다. 백프레셔는 데이터를 소비하는 측에서 처리 가능한 만큼의 양을 데이터 제공자 측에 역으로 요청하는 기능입니다. 데이터 제공자와 데이터 소비자가 다른 스레드에서 비동기로 처리될 경우 주로 사용되며 제공자 측에서 너무 빠르게 데이터를 제공하면 소비하는 측에선 빠르게 처리하지 못해서 데이터가 점점 쌓이면 시스템에 장애 요소가 될 수 있는데 백프레셔는 이런 문제를 해결할 수 있습니다.

 

리액티브 프로그래밍과 디자인 패턴

리액티브 프로그래밍을 설명할 때 빠지지 않고 등장하는 디자인 패턴이 옵저버 패턴(Observer Pattern)이터레이터 패턴(Iterator Pattern)입니다. 리액티브 프로그래밍은 옵저버 패턴 그리고 이터레이터 패턴과 자주 비교되며 또한 영감을 받은 걸로 알려져 있습니다. 이번 절에 선 두 개의 패턴과 리액티브 프로그래밍 간에 어떤 차이점이 있는지 알아보겠습니다.

 

옵저버 패턴

옵저버 패턴이란 GoF가 소개한 디자인 패턴 중 하나로 관찰 대상이 되는 객체가 변경되면 대상 객체를 관찰하고 있는 옵저버(Observer)에게 변경사항을 통지(notify) 하는 디자인 패턴을 말합니다. 옵저버 패턴을 사용하면 객체 간의 상호작용을 쉽게 하고 효과적으로 데이터를 전달할 수 있습니다.

옵저버 패턴은 관찰 대상인 서브젝트(Subject)와 서브젝트를 관찰하는 옵저버로 이뤄져 있습니다. 하나의 서브젝트에는 하나 또는 여러 개의 옵저버를 등록할 수 있고 상태가 변경되면 자신을 관찰하는 옵저버들에게 변경 사항을 통지합니다. 서브젝트로부터 통지를 받은 옵저버는 부가적인 처리를 합니다. 

그림 1.1 옵저버 패턴의 구조

일반적으로 옵저버 패턴은 서브젝트와 옵저버를 상속하는 구체화(concrete) 클래스가 있습니다. 구체화 클래스는 서브젝트와 옵저버의 시그니처에 대한 상세 구현을 작성합니다. 구체화 클래스를 작성하기 위해 알아야 할 서브젝트와 옵저버의 필수 함수를 살펴보겠습니다.

 

표 1.1 서브젝트의 함수

함수 설명
add 서브젝트의 상태를 관찰할 옵저버 등록한다.
remove 등록된 옵저버를 삭제한다.
notify 서브젝트 상태가 변경되면 등록된 옵저버에 통지한다.

 

표 1.2 옵저버의 함수

함수 설명
update 서브젝트의 notify 내부에서 호출되며 서브젝트의 변경에 따른 부가 기능을 처리

 

옵저버 패턴에 대해 간략히 알아봤으니 이번엔 JDK는 1.0 부터 포함된 Observable 클래스와 Observer 인터페이스를 사용해 간단한 커피 주문 예제를 만들어 보겠습니다. Observable은 옵저버 패턴의 서브젝트와 동일합니다.

 

예제 1.1 옵저버 패턴으로 구현한 커피 주문 예제

import java.util.*
 
class Customer : Observer {
 
    fun orderCoffee() = "Iced Americano"
 
    override fun update(o: Observable?, arg: Any?) {
         val coffee = arg as Coffee
         println("I got a cup of ${coffee.name}")
    }
}
 
class Coffee(val name: String)
 
class Barista : Observable() {
 
    private fun makeCoffee(name: String) = Coffee(name)
 
    fun serve(name: String) {
         setChanged()
         notifyObservers(makeCoffee(name))
    }
}
 
fun main(args: Array<String>) {
    val barista = Barista()
    val customer = Customer()
 
    barista.addObserver(customer)
    barista.serve(customer.orderCoffee())
}
 
--------------------
출력 결과)
--------------------
I got a cup of Iced Americano

 

  • Customer 클래스는 Observer 인터페이스를 구현하여 Barista 클래스가 커피를 완성하면 통지를 받아서 update 함수에서 처리한다.
  • Barista 클래스는 Observable 클래스를 상속하여 고객이 주문한 커피가 만들어지면 notifyObservers로 고객에게 만들어진 Coffee 객체를 전달한다. 이때 setChanged를 먼저 호출하여 변경 여부를 내부에 저장한다. 
  • Customer 클래스가 Barista 클래스를 관찰하기 위해 addObserver로 등록한다.

예제 1.1과 같이 옵저버 패턴을 사용하지 않았다면 고객은 일정 간격으로 커피가 완성됐는지 바리스타에게 확인하는 처리가 있어야 합니다. 간격이 너무 짧으면 변경된 상태를 빠르게 확인할 수 있지만 매번 불필요한 호출이 발생하므로 성능상 문제가 발생할 수 있습니다. 또 간격이 너무 길면 변경된 상태를 즉시 확인할 수 없으므로 실시간성이 떨어질 수 있습니다. 옵저버 패턴은 관찰자인 옵저버가 서브젝트의 변화를 신경 쓰지 않고 상태 변경의 주체인 서브젝트가 변경사항을 옵저버에게 알려줌으로써 앞서 언급한 문제를 해결할 수 있습니다.

옵저버 패턴에서 서브젝트와 옵저버는 관심사에 따라 역할과 책임이 분리되어 있습니다. 서브젝트는 옵저버가 어떤 작업을 하는지 옵저버의 상태가 어떤지에 대해 관심을 가질 필요가 없습니다. 오직 변경 사항을 통지하는 역할만 수행하고 하나 혹은 다수의 옵저버는 각각 맡은 작업을 스스로 하기 때문에 옵저버가 하는 일이 서브젝트에 영향을 끼치지 않고 옵저버는 단순한 데이터의 소비자로서 존재하게 됩니다.

리액티브 프로그래밍은 옵저버 패턴의 서브젝트와 옵저버의 개념과 유사한 발행자와 구독자를 사용해 데이터를 통지하고 처리합니다. 데이터를 제공하는 측에서 데이터를 소비하는 측에 통지하는 방식을 일반적으로 푸시 기반 (Push-Based)이라고 부릅니다. 리액티브 프로그래밍은 옵저버 패턴의 단순한 데이터 통지 기능에 더해서 백프레셔라는 특징을 통해 데이터를 처리하는 측에서 데이터를 전달받는 개수를 역으로 요청할 수 있습니다. 그러므로 데이터를 통지하는 주체의 통지 속도를 제어하고 처리하는 측에서는 자신이 처리 가능한 만큼의 데이터만 받아서 처리할 수 있게 됩니다.


이터레이터 패턴

이터레이터 패턴은 옵저버 패턴과 마찬가지로 GoF가 소개한 디자인 패턴 중 한 가지입니다. 이터레이터 패턴은 데이터의 집합에서 원소라고 불리는 데이터를 순차적으로 꺼내기 위해 만들어진 디자인 패턴입니다. 이터레이터 패턴을 사용하면 다른 컬렉션을 사용해도 동일한 인터페이스를 사용해 데이터를 꺼내올 수 있기 때문에 컬렉션이 변경되더라도 사용하는 쪽에서는 변경사항이 발생하지 않습니다. 이러한 장점 때문에 이터레이터 패턴을 사용하면 코드를 좀 더 유연하고 확장성 있게 만듭니다.

그림 1.2 이터레이터 패턴의 구조

그림 1.2의 에그리게잇(Aggregate)은 자료구조의 리스트, 맵, 세트와 같은 구조가 집합체에 해당합니다. 이터레이터(Iterator)는 집합체 내부에 구현된 iterator를 이용해 생성합니다. 클라이언트는 생성된 이터레이터를 사용해 hasNext 함수에서  내부에 데이터가 존재하는지를 확인하고 next 함수를 사용해 데이터를 꺼내옵니다.

 

표 1.3  에그리게잇의 함수

함수 설명
iterator 이터레이터를 생성한다.

 

표 1.4 이터레이터의 함수

함수 설명
hasNext 데이터가 존재하는지를 판단하여 true 또는 false를 반환한다.
next 데이터가 존재한다면 데이터를 꺼내온다.

 

이번에는 코틀린 컬렉션 라이브러리에 포함된 Iterable과 Iterator를 사용해 간단한 이터레이터 패턴을 구현해 보겠습니다.

 

예제 1.2 이터레이터 패턴 예제

package iterator
 
data class Car(val brand: String)
 
class Cars(val cars: List<Car> = listOf()) : Iterable<Car> {
    override fun iterator() = CarsIterator(cars)
}
 
class CarsIterator(val cars: List<Car> = listOf(), var index: Int = 0) : Iterator<Car> {
    override fun hasNext() = cars.size > index
    override fun next() = cars[index++]
}
 
fun main(args: Array<String>) {
    val cars = Cars(listOf(Car("Lamborghini"), Car("Ferrari")))
    val iterator = cars.iterator()
    while (iterator.hasNext()) {
         println("brand : ${iterator.next()}")
    }
}
 
--------------------
출력 결과)
--------------------
brand : Car(brand=Lamborghini)
brand : Car(brand=Ferrari)
  • Cars 클래스는 Iterable 인터페이스를 구현하여 CarsIterator를 리턴하는 iterator 함수를 오버라이드 한다.
  • CarsIterator 클래스는 Iterator 인터페이스를 구현하여 데이터가 존재하는지 확인하는 hasNext와 데이터가 존재하면 데이터를 가져오는 next 함수를 오버라이드 한다.
  • while문 내부에선 hasNext를 사용하여 데이터를 모두 가져올때까지 반복하고 데이터를 출력한다.

데이터를 제공한다는 관점에서 이터레이터 패턴과 리액티브 프로그래밍은 유사하지만 이터레이터 패턴은 에그리게잇이 내부에 데이터를 저장하고 이터레이터를 사용해 데이터를 순차적으로 당겨오는 방식이기 때문에 풀 기반(Pull-Based)입니다.  이에 반해 리액티브 프로그래밍은 기본적으로 옵저버 패턴 처럼 데이터 제공자가 소비하는 측에 데이터를 통지하는 푸시 기반이기 때문에 분명한 차이점이 있습니다. 다만 구현체의 사양에 따라서 풀과 푸시를 모두 제공하는 경우도 있습니다.  

728x90
반응형
728x90
반응형

문제 확인

r2dbc-mysql을 사용해 개발한 애플리케이션에서 netstat | grep 3306으로 조회시 ESTABLISHED로 조회되는 개수가 커넥션 풀에 설정한 값과 상이한것을 확인함 .

r2dbc 풀 설정시 initSize가 20, maxSize가 50인데 아래와 같이 설정과는 다르게 커넥션이 생성되 있는 것을 확인함.

  • A 인스턴스는 2개
  • B 인스턴스는 10개
  • C 인스턴스는 20개

테스트 환경

  • Kotlin:1.4.21
  • spring boot:2.3.7.RELEASE
  • dev.miku:r2dbc-mysql:0.8.2.RELEASE

원인 파악

as-is

import io.r2dbc.pool.ConnectionPool
import io.r2dbc.pool.ConnectionPoolConfiguration
import io.r2dbc.spi.ConnectionFactories
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.ConnectionFactoryOptions.*

@Configuration
@EnableR2dbcRepositories
class DatabaseConfiguration: AbstractR2dbcConfiguration() {

    override fun connectionFactory(): ConnectionFactory {

        val connectionFactory = ConnectionFactories.get(
            builder()
                .option(DRIVER, "mysql")
                .option(HOST, host)
                .option(USER, userName)
                .option(PORT, port)
                .option(PASSWORD, passWord)
                .option(DATABASE, db)
                .build())

        val configuration = ConnectionPoolConfiguration.builder(connectionFactory)
            .maxIdleTime(Duration.ofSeconds(maxIdleTime))
            .maxCreateConnectionTime(Duration.ofSeconds(maxCreateConnectionTime))
            .maxLifeTime(Duration.ofMinutes(maxLife))
            .initialSize(initialSize)
            .maxSize(maxSize)
            .build()

        return ConnectionPool(configuration)
    }

}    

분명 ConnectionPoolConfiguration에 initialSize와 maxSize를 넣었는데 비정상적으로 동작하는 것을 확인.

to-be

option(DRIVER, "pool")을 추가하였음. 이 설정은 추가하는 이유는 github.com/r2dbc/r2dbc-pool를 보면 Supported ConnectionFactory Discovery Options섹션에 아래와 같이 무조건 pool(Must be pool)로 설정하라고 되어 있음

driver Must be pool

 

import io.r2dbc.pool.PoolingConnectionFactoryProvider.*
import io.r2dbc.spi.ConnectionFactories
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.ConnectionFactoryOptions.*

@Configuration
@EnableR2dbcRepositories
class DatabaseConfiguration: AbstractR2dbcConfiguration() {

   override fun connectionFactory(): ConnectionFactory =
        ConnectionFactories.get(
            builder()
                .option(DRIVER, "pool")
                .option(PROTOCOL, "mysql")
                .option(HOST, readPoolProperties.host)
                .option(USER, readPoolProperties.username)
                .option(PORT, readPoolProperties.port)
                .option(PASSWORD, readPoolProperties.password)
                .option(DATABASE, readPoolProperties.db)
                .option(MAX_SIZE, r2dbcPoolProperties.maxSize)
                .option(INITIAL_SIZE, r2dbcPoolProperties.initialSize)
                .option(MAX_IDLE_TIME, Duration.ofSeconds(r2dbcPoolProperties.maxIdleTime))
                .option(MAX_CREATE_CONNECTION_TIME, Duration.ofSeconds(r2dbcPoolProperties.maxCreateConnectionTime))
                .option(MAX_LIFE_TIME, Duration.ofMinutes(r2dbcPoolProperties.maxLife))
                .build()
        )

}        

netstat으로 조회시 초기값에 맞춰서 ESTABLISHED된 것을 확인함.

r2dbc-pool 소스를 디버그해보니 option(DRIVER, "pool")이 있고 없고에 따라서 ConnectionFactoryProvider가 달라지는 것을 확인 함

option(DRIVER, "mysql")로 설정시 dev.miku.r2dbc.MySqlConnectionFactoryProvider가 동작하고
option(DRIVER, "pool")로 설정시 io.r2dbc.pool.PoolingConnectionFactoryProvider가 동작하는데
dev.miku.r2dbc.MySqlConnectionFactoryProvider에는 initSize, maxSize에 대한 설정이 없고(github.com/mirromutth/r2dbc-mysqlgithub.com/mirromutth/r2dbc-mysql) 풀링되는 커넥션 팩토리가 아니기 때문에 그때그때마다 필요한 커넥션을 생성하는 방식이었음.

결론

r2dbc-pool을 사용해 커넥션 풀링을 정상적으로 이용하고 싶다면 option(DRIVER, "pool") 옵션은 필수이다.

728x90
반응형
728x90
반응형

이 글은 제가 레진 기술블로그에 동명의 제목으로 공유한 글입니다.

 

제가 일하고 있는 서비스 개발팀은 레진코믹스의 백엔드 서비스를 책임지고 있는 팀입니다. 저희는 작년부터 KotlinSpring WebFlux를 메인 스택으로 선정하여 개발하고 있습니다. 이 글에선 WebFlux 기반의 컨텐츠 인증 서비스를 개발하면서 경험한 이슈들을 공유하려 합니다.

이 글은 WebFlux 또는 리액티브 프로그래밍에 대한 기초 지식을 다루진 않으므로 다소 불친절하게 느껴질 수 있는 점 양해 바랍니다. WebFlux에 대한 기초적인 내용은 제 블로그의 Spring WebFlux와 Kotlin으로 만드는 Todo 서비스 - 1편을 참고하시기 바랍니다.

이 글의 예제는 저희가 서비스하고 있는 코드를 공개용으로 변경했기 때문에 정상 동작을 보장하지 않습니다. 참고 부탁드립니다

이 글에서 다루고 있는 내용

컨텐츠 인증 서비스

컨텐츠 인증 서비스는 유저가 레진코믹스 컨텐츠의 구매 여부를 판별하는 서비스입니다. 기본적으로 이미지 서버와 통신하며 유료 작품이라면 유저가 해당 작품을 구매했는지 판별하거나 무료 작품이라면 회원인지 비회원인지 판별하여 컨텐츠 접근 여부를 응답하는 단순하지만 없어선 안되는 서비스입니다.

컨텐츠 인증 서비스 기술 스택

  • Kotlin 1.3.x
  • Spring Boot 2.3.x
  • Spring WebFlux
  • Spring Data R2DBC 1.1.x
  • Spring Data Redis Reactive
  • Spring Boot Actuator

시스템 구성도

이해를 돕기 위해 작성한 간략한 시스템 구성도입니다.

 

R2DBC(Reactive Relational Database Connectivity)

컨텐츠 인증 서버에 앞서 전에도 WebFlux 기반의 서비스를 출시했었지만 JPA를 사용해 데이터베이스를 엑세스하고 있었습니다. JPA는 내부적으로 JDBC를 사용하기 때문에 DB I/O가 블로킹으로 동작합니다. 기본적으로 리액티브 스택은 블로킹 되는 구간이 있다면 전통적인 MVC 방식에 비해 얻는 이점이 거의 없기 때문에 리액터 공식 문서에서 설명하는 것처럼 아래와 같은 패턴을 사용해 해결하고 있었습니다.

Mono blockingWrapper = Mono.fromCallable(() -> { 
    return /* make a remote synchronous call */ 
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic());

하지만 원칙적으로 리액티브 스택은 비동기-논블로킹 형태로 개발하는 것이 자연스럽고 최적의 성능을 보여줍니다. 그리고 때마침 리액티브 기반의 비동기-논블로킹을 지원하는 R2DBC의 GA 버전이 릴리즈 되면서 새로운 서비스에 적용하게 되었습니다.

Spring Data R2DBC 적용

R2DBC를 스프링 환경에서 쉽게 사용하는 방법은 Spring Data R2DBC를 사용하는 것입니다. Spring Data R2DBC 공식 깃헙을 보면 This is Not an ORM 이라는 문구가 가장 먼저 눈에 띕니다. 이것은 JPA가 ORM 프레임 워크임을 어필하는 것과 대조적인 부분입니다.
Spring Data R2DBC는 단순함을 지향하여 일반적인 ORM 프레임 워크가 지원하는 caching, lazy loading, write behind 등을 지원하지 않습니다.

ReactiveCrudRepository를 상속하는 리파지토리 인터페이스

Spring Data JPA나 Spring Data JDBC을 이미 사용하고 있어, Spring Data R2DBC를 적용할때 큰 어려움이 없었습니다.
Spring Data 프로젝트는 리액티브 패러다임을 지원하는 ReactiveCrudRepository를 제공하므로 ReactiveCrudRepository 인터페이스를 상속하는 인터페이스를 만들어 주면 쉽게 Spring Data R2DBC를 사용할 수 있습니다.

interface ContentRepository : ReactiveCrudRepository<Content, Long> {
    fun findFirstByUserIdAnContentIdOrderByIdDesc(userId: Long, contentId:Long) : Mono<Content>
}

멀티 데이터 소스 구현

실무에서 서비스를 개발하다 보면 하나의 서버에서 여러 데이터 소스에 대한 접근이 필요할 수 있습니다. Spring Data R2DBC는 이런 경우에 사용할 수 있도록 AbstractRoutingConnectionFactory라는 추상 클래스를 지원합니다.

MultiTenantRoutingConnectionFactory

MultiTenantRoutingConnectionFactory는 AbstractRoutingConnectionFactory를 상속받아서 determineCurrentLookupKey 함수를 오버라이드 하였습니다.

class MultiTenantRoutingConnectionFactory : AbstractRoutingConnectionFactory() {

    override fun determineCurrentLookupKey(): Mono<Any> {
        return TransactionSynchronizationManager.forCurrentTransaction().map {
            if (it.isActualTransactionActive) {
                if (it.currentTransactionName?.contains(SecondaryDataSourceProperties.BASE_PACKAGE)!!) {
                    SecondaryDataSourceProperties.KEY
                } else {
                    PrimaryDataSourceProperties.KEY
                }
            } else {
                PrimaryDataSourceProperties.KEY
            }
        }
    }
}

determineCurrentLookupKey에선 현재 트랜잭션의 이름을 읽어서 @Transactional을 선언한 서비스의 패키지 기준으로 분기 처리하였습니다. determineCurrentLookupKey의 상세 구현은 개발자가 다양한 방법을 사용할 수 있으므로 이것과 같은 방법을 사용하지 않아도 됩니다.예를 들면 @Transactional(readOnly=true)인 경우엔 리플리케이션으로 라우팅하는 경우도 있을 수 있습니다. 이런 경우엔 현재 트랜잭션이 readOnly인지 판단하는 로직을 내부에 구현해주면 됩니다.

application.yml

application.yml에는 두개의 데이터 소스 설정을 추가하였습니다.

datasource:
  primary:
    url:
    username: 
    password: 
  secondary:
    url:
    username:
    password:

DatasourceProperties

각 데이터 소스의 설정 정보는 DatasourceProperties라는 이름으로 추가하였습니다.@ConfigurationProperties는 application.yml에 지정한 설정 정보의 prefix를 기준으로 동일한 이름의 프로퍼티가 존재하면 자동으로 값을 세팅해 줍니다.@ConstructorBinding은 setter가 아닌 생성자를 사용해 값을 바인딩 해주는 애노테이션입니다. 코틀린의 Data class와 조합하면 설정 클래스를 Immutable 하게 사용할 수 있습니다.

@ConstructorBinding
@ConfigurationProperties(prefix = "datasource.primary")
data class PrimaryDataSourceProperties(val username: String, val password: String, val url: String) {

    companion object {
        const val KEY = "primary"
        const val BASE_PACKAGE = "com.lezhin.backend.service.primary"
        const val TRANSACTION_MANAGER = "primaryTransactionManager"
    }
}

@ConstructorBinding
@ConfigurationProperties(prefix = "datasource.secondary")
data class SecondaryDataSourceProperties(val username: String, val password: String, val url: String) {

    companion object {
        const val KEY = "secondary"
        const val BASE_PACKAGE = "com.lezhin.backend.service.secondary"
        const val TRANSACTION_MANAGER = "secondaryTransactionManager"
    }
}

R2dbcConfig

그 다음 R2dbc 설정 클래스를 만들어서 connectionFactory빈을 생성해 주면 됩니다.

@Configuration
@EnableR2dbcRepositories
class R2dbcConfig(val primaryDataSourceProperties: PrimaryDataSourceProperties,
                  val secondaryDataSourceProperties: SecondaryDataSourceProperties) : AbstractR2dbcConfiguration() {
    @Bean
    override fun connectionFactory(): ConnectionFactory {
        val multiTenantRoutingConnectionFactory = MultiTenantRoutingConnectionFactory()

        val factories = HashMap<String, ConnectionFactory>()
        factories[PrimaryDataSourceProperties.KEY] = primaryConnectionFactory()
        factories[SecondaryDataSourceProperties.KEY] = secondaryConnectionFactory()

        multiTenantRoutingConnectionFactory.setDefaultTargetConnectionFactory(primaryConnectionFactory())
        multiTenantRoutingConnectionFactory.setTargetConnectionFactories(factories)
        return multiTenantRoutingConnectionFactory
    }

    @Bean
    fun primaryConnectionFactory() =
        parseAndGet(Triple(primaryDataSourceProperties.url, primaryDataSourceProperties.username, primaryDataSourceProperties.password))

    @Bean
    fun secondaryConnectionFactory() =
        parseAndGet(Triple(secondaryDataSourceProperties.url, secondaryDataSourceProperties.username, secondaryDataSourceProperties.password))

    private fun parseAndGet(propertiesAsTriple: Triple<String, String, String>): ConnectionFactory {
        val (url,username,password) = propertiesAsTriple

        val properties = URLParser.parseOrDie(url)
        return JasyncConnectionFactory(MySQLConnectionFactory(
            com.github.jasync.sql.db.Configuration(
                username = username,
                password = password,
                host = properties.host,
                port = properties.port,
                database = properties.database,
                charset = properties.charset,
                ssl = properties.ssl
            )))
    }

    @Bean
    fun primaryTransactionManager(@Qualifier("primaryConnectionFactory") connectionFactory: ConnectionFactory?) =
        R2dbcTransactionManager(connectionFactory!!)

    @Bean
    fun secondaryTransactionManager(@Qualifier("secondaryConnectionFactory") connectionFactory: ConnectionFactory?) =
        R2dbcTransactionManager(connectionFactory!!)
}

Reactive Redis를 사용한 캐시 적용

저희는 인프라에 따라서 Redis와 Memcached를 모두 사용하고 있습니다. 이번 컨텐츠 인증 서비스에선 Redis를 사용해 DB에서 읽어온 데이터를 캐시하였습니다. WebFlux를 적용하면서 @Cacheable 사용이 불가하여 별도로 Reactor 호환 캐시 유틸을 만들어서 사용했었습니다만 이번에는 Spring Data Redis Reactive를 직접 사용하는 방법으로 구현하게 되었습니다.

RedisConfig

Redis 클라이언트로 사용될 Lettuce 설정과 ReactiveRedisTemplate에 대한 설정 입니다.

@Configuration
class RedisConfig(@Value("\${spring.redis.host}") val host: String,
                   @Value("\${spring.redis.port}") val port: Int,
                   @Value("\${spring.redis.database}") val database: Int) {

    @Primary
    @Bean("primaryRedisConnectionFactory")
    fun connectionFactory(): ReactiveRedisConnectionFactory? {
        val connectionFactory = LettuceConnectionFactory(host, port)
        connectionFactory.database = database
        return connectionFactory
    }

    @Bean
    fun contentReactiveRedisTemplate(factory: ReactiveRedisConnectionFactory?): ReactiveRedisTemplate<String, Content>? {
        val keySerializer = StringRedisSerializer()
        val redisSerializer = Jackson2JsonRedisSerializer(Content::class.java)
            .apply {
                setObjectMapper(
                    jacksonObjectMapper()
                        .registerModule(JavaTimeModule())
                )
            }
        val serializationContext = RedisSerializationContext
            .newSerializationContext<String, Content>()
            .key(keySerializer)
            .hashKey(keySerializer)
            .value(redisSerializer)
            .hashValue(redisSerializer)
            .build()
        return ReactiveRedisTemplate(factory!!, serializationContext)
    }
}

ContentService

구현은 일반적인 캐시처리 방식으로 cache hit 여부를 판단하여 fallback 처리 하는 구조입니다.

@Service
class ContentService(val contentRepository: ContentRepository,
                      val contentRedisOps: ReactiveRedisOperations<String, Content>) {

    companion object {
        const val DAYS_TO_LIVE = 1L
    }

    @Transactional("primaryTransactionManager")
    fun findById(id: Long): Mono<Content> {
        val key = "content:${id}"
        return contentRedisOps.opsForValue().get(key).switchIfEmpty {
            contentRepository.findById(id).doOnSuccess {
                    contentRedisOps.opsForValue()
                        .set(key, it, Duration.ofDays(DAYS_TO_LIVE))
                        .subscribe()
            }.onErrorResume {
                Mono.empty()
            }
        }
    }
}
  1. Redis에 캐시된 데이터가 있는지 확인하여 존재하지 않으면 switchIfEmpty 내부 코드 동작
    • 이때 사용하는 switchIfEmpty는 Mono.defer로 감싸진 Reactor 코틀린 확장 함수이므로 Mono.defer를 직접 사용하지 않아도 됩니다.
    • fun <T> Mono<T>.switchIfEmpty(s: () -> Mono<T>): Mono<T> = this.switchIfEmpty(Mono.defer { s() })
  2. switchIfEmpty 내부 코드에는 우선 DB에서 조회하여 데이터가 존재하면 에서 내부 코드 동작
  3. 데이터가 존재하면 Redis에 캐시위와 같은 방식으로 캐시 레이어를 리액티브하게 구현할 수 있었습니다.

RestTemplate 대신 WebClient

외부 서비스와 HTTP로 통신해야 하는 경우 가장 흔한 방법은 RestTemplate을 사용하는 것입니다. RestTemplate은 Spring 애플리케이션에서 가장 일반적인 웹 클라이언트지만 블로킹 API이므로 리액티브 기반의 애플리케이션에서 성능을 떨어트리는 원인이 될 수 있습니다.
이런 이유로 Spring5에서 추가된 WebClient를 사용해 리액티브 기반의 비동기-논블로킹 통신을 구현하였습니다.

UserTokenRepositoryImpl

UserTokenRepositoryImpl은 유저의 토큰을 가지고 WebClient를 사용해 유저 서비스를 호출해 받아온 결과를 유저 객체 매핑합니다. 이러한 동작은 모두 리액티브 하게 비동기-논블로킹 형태로 동작합니다.

@Repository
class UserTokenRepositoryImpl(val webClientBuilder: WebClient.Builder) : UserTokenRepository {

    private lateinit var webClient: WebClient

    @PostConstruct
    fun setWebClient() {
        webClient = webClientBuilder.build()
    }

    override fun fetchUser(uri: String, accessToken: String) =
        webClient.get()
            .uri(uri)
            .header("Authorization", "Bearer $accessToken")
            .retrieve()
            .onStatus(HttpStatus::isError) {
                Mono.error(ForbiddenException("Connection Failed"))
            }
            .bodyToMono(User::class.java)
}

@Scheduled 대신 Flux.interval

WebFlux로 서비스를 만들다 보면 기존에는 당연하게 사용하던 기능이 지원되지 않는 경우가 많습니다. @Scheduled도 그중 하나입니다. 만약 일정 간격으로 처리할 작업이 있는 경우 @Scheduled의 대안으로 Flux.interval을 사용할 수 있습니다.

Flux.interval 예시

아래 예제는 90초 간격으로 로그를 출력하도록 만든 예제입니다.

    @PostConstruct
    fun doProcess() = Flux.interval(Duration.ofSeconds(90))
            .map {
                // 구현
                logger.info("hello world")
            }
            .subscribe()

쿠버네티스 환경 모니터링 시스템 구축

저희 레진은 새로 개발하는 서비스는 모두 쿠버네티스 환경에서 운영 중 이고 prometheus를 쿠버네티스 환경에서 기본 모니터링 툴로 사용하고 있습니다.
prometheus는 방식 으로 데이터를 수집합니다. Spring Boot2 부터 메트릭 표준이 된 micrometer를 사용해 prometheus를 지원하는 메트릭을 추가하였습니다.

build.gradle.kts

우선 메트릭 생성에 필요한 의존성을 build.gradle.kts에 추가하였습니다.

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-actuator")
    implementation(group = "io.micrometer", name = "micrometer-registry-prometheus", version = "1.5.1")
}

application.yml

그다음 아래와 같은 형태로 메트릭 엔드포인트를 추가해줍니다. 기본적으로 Spring Boot Actuator는 include에 포함된 엔드포인트만 엑세스가 가능합니다.

management:
  endpoints:
    web:
      exposure:
        include: metrics, prometheus

더 많은 엔드포인트에 대한 정보는 엔드포인트 공식 문서에서 확인할 수 있습니다.

prometheus 엔드포인트

웹 브라우저에서 /actuator/prometheus로 접속해보면 이미지와 같이 prometheus 형식의 메트릭 정보가 출력되는 것을 확인 할 수 있습니다.

)

Grafana 대쉬보드

최종적으로 엔드포인트에서 출력된 정보를 prometheus에서 10초 간격으로 데이터를 풀링해가고 Grafana 대쉬보드에서 시각화된 정보를 관측할 수 있게되었습니다.

)

회고

이번 컨텐츠 인증 서버 개발은 리액티브 프로그래밍과 WebFlux에 대해 좀 더 익숙해지는 기회가 되었습니다. 또 Spring Data R2DBC의 GA 버전이 릴리즈되면서 모든 구간에서 리액티브 스택을 적용할 수 있었습니다.
이외에도 WebFlux와 R2DBC에 대해 공유하고 싶은 주제들이 더 있지만 한편의 블로그로 작성하기엔 너무 길어지는 내용이 될 것 같아서 생략하였습니다.

 

원문

 

Kotlin과 Spring WebFlux 기반의 컨텐츠 인증 서비스 개발 후기

프리미엄 웹툰 서비스 - “레진코믹스” 를 만들고 있는 레진엔터테인먼트가 운영하는 기술 블로그입니다. 글로벌 콘텐츠 플랫폼을 만들면서 익힌 실전 경험과 정보, 최신 기술, 팁들을 공유하�

tech.lezhin.com

 

 

728x90
반응형

+ Recent posts