Reactive Programming

스프링 웹플럭스와 코루틴 톺아보기

devsh 2022. 6. 2. 19:33
728x90
반응형

1. 비동기-논블로킹 프로그래밍

전통적 웹 방식 (스프링 MVC)

  • 전통적 웹 방식은 1개 요청당 1개의 스레드를 사용하는 Thread per Request 모델
  • 요청 당 스레드를 사용하는 방식은 DB, Network IO 등이 발생할 경우 결과를 받기전까지 해당 스레드는 블로킹 상태가 됨


  • 요청을 처리하기 위해 스레드를 생성하고 전환하는데 따른 Context Switching 비용이 발생
  • 요청에 따라 무한정 스레드를 생성할 수 없기 때문에 스레드풀을 사용해 스레드를 재사용(톰캣 기본 200개)

 

 

비동기-논블로킹 방식

  • IO(DB, File, Network) 처리 시 스레드가 대기하지 않고 처리가 완료되면 다양한 방식으로 이벤트를 통지받는 방식 (콜백, 프로미스, async-await 등)
  • 더 적은 스레드로 같은 일을 할 수 있기 때문에 Context Switching에 따른 비용이 훨씬 적다. 즉 하드웨어 자원을 더 적게 사용한다
  • 순차적으로 동작하지 않으므로 코드의 흐름을 파악하기 어렵고 콜백헬(Callback Hell)로 인해 가독성과 유지보수성이 현저히 떨어짐


 

C10K 문제

  • 컴퓨터 과학에서 C10K 문제란 1만명의 사용자가 동시에 접속했을때(1만개의 소켓이 열림) 하드웨어가 충분함에도 기존의 I/O 통지 방식(e.g. select)으로는 제대로 처리하지 못하는 것을 말함
  • Event-Driven 모델을 사용해 해결한다.
    • 커널 레벨에선 OS에서 제공하는 epoll, kqueue와 같은 통지 모델을 사용해 해결할 수 있다.
    • 프로그래머는 로우레벨을 직접 다루지 않고 Netty, Node.js, Nginx, Vert.x 등을 사용한다.


 

2. 리액티브 프로그래밍

  • 비동기-논블로킹을 처리하는 새로운 패러다임
  • 2010년 에릭마이어에 의해 마이크로소프트 .NET 에코 시스템으로 정의됨
  • 리액티브 프로그래밍은 기본적으로 옵저버 패턴 처럼 데이터 제공자가 소비하는 측에 데이터를 통지하는 푸시 기반(Push-Based)
  • 구현체에 따라서 풀과 푸시를 모두 제공하는 경우도 있음 (Pull-Push Hybrid)
  • 데이터의 통지, 완료, 에러 처리를 옵저버 패턴에 영감을 받아 설계되었고 데이터의 손쉬운 비동기 처리를 위해 함수형 언어의 접근 방식을 사용

 

옵저버 패턴

  • 관찰 대상이 되는 객체가 변경되면 대상 객체를 관찰하고 있는 옵저버(Observer)에게 변경사항을 통지하는 디자인 패턴

직접 구현한 옵저버 패턴의 예시

import java.util.*

class Coffee(val name: String)


// 데이터 제공자
class Barista : Observable() {

    private fun makeCoffee(name: String) = Coffee(name)

    fun serve(name: String) {
         setChanged()
         notifyObservers(makeCoffee(name))
    }
}

// 데이터 구독자
class Customer : Observer {

    fun orderCoffee() = "Iced Americano"

    override fun update(o: Observable?, arg: Any?) {
         val coffee = arg as Coffee
         println("I got a cup of ${coffee.name}")
    }
}


fun main(args: Array<String>) {
    val barista = Barista()
    val customer = Customer()

    barista.addObserver(customer)
    barista.serve(customer.orderCoffee())
}

--------------------
출력 결과)
--------------------
I got a cup of Iced Americano

 

리액티브 스트림

  • JVM 환경에서 리액티브 프로그래밍의 표준 API 사양으로 비동기 데이터 스트림논블로킹-백프레셔(Back-Pressure)에 대한 사양을 제공
  • 리액티브 스트림 이전의 비동기식 애플리케이션에서는 멀티 코어를 제대로 활용하기 위해 복잡한 병렬 처리 코드가 필요
  • 처리할 데이터가 무한정 많아져서 시스템의 한계를 넘어서는 경우 애플리케이션은 병목 현상(bottleneck)이 발생하거나 심각한 경우 애플리케이션이 정지되는 경우도 발생할 수 있음(논블로킹-백프레셔로 해결)
  • Netflix, Vmware(Pivotal), Lightbend, Red Hat과 같은 유명 회사들이 표준화에 참여 중
  • 리액티브 스트림 인터페이스

  • Publisher : 데이터를 생성하고 구독자에게 통지
  • Subscriber : 데이터를 구독하고 통지 받은 데이터를 처리
  • Subscription : Publisher, Subscriber간의 데이터를 교환하도록 연결하는 역할을 하며 전달받을 데이터의 개수와 구독을 해지할 수 있다
  • Processor : Publisher, Subscriber을 모두 상속받은 인터페이스
  • 리액티브 스트림에서 Publisher와 Subscriber 간의 데이터 처리 흐름

리액티브 스트림을 표준 사양을 채택한 대표적인 구현체들

  • Project Reactor
  • RxJava
  • JDK9 Flow
  • Akka Streams
  • Vert.x
  • Hibernate Reactive(Vert.x 사용)

 

3. 프로젝트 리액터

  • 프로젝트 리액터(Project Reactor)는 리액티브 스트림의 구현체 중 하나로 스프링의 에코시스템 범주에 포함된 프레임워크이다
  • 리액티브 스트림 사양을 구현하고 있으므로 리액티브 스트림에서 사용하는 용어와 규칙을 그대로 사용한다
  • 리액터를 사용하면 애플리케이션에 리액티브 프로그래밍을 적용할 수 있고 비동기-논블로킹을 적용할 수 있다
  • 함수형 프로그래밍의 접근 방식을 사용해서 비동기-논블로킹 코드의 난해함을 해결한다
  • 백프레셔(Backpressure) 를 사용해 시스템의 부하를 효율적으로 조절할 수 있다
  • 리액터는 리액티브 스트림의 Publisher 인터페이스를 구현하는 모노(Mono)플럭스(Flux)라는 두 가지 핵심 타입을 제공한다
  • 두 타입 모두 리액티브 스트림 데이터 처리 프로토콜대로 onComplete 또는 onError 시그널이 발생할 때 까지 onNext를 사용해 구독자에게 데이터를 통지한다

모노

  • 0..1개의 단일 요소 스트림을 통지하는 발행자(Publisher)

플럭스

  • 0..N개로 이뤄진 무한대 요소를 통지하는 발행자

  • 연산자를 사용하기 위해서는 터미널 오퍼레이터인 구독(subcribe)이 필수이다. 구독하지 않으면 연산자는 실행되지 않는다. (이 개념은 Java8 스트림과 유사하며 WebFlux에선 컨트롤러에서 Mono,Flux 타입 반환시 자동으로 해줌)
  • CompletableFuture, DefferedResult, ListenableFuture 등 기존의 비동기 처리 방식은 구독 형태가 아니며, 단순한 기능만 가지고 있다 (async, join, callback 등)

 

4. 스프링 웹플럭스

  • 기본적으로 Project Reactor 기반 (RxJava와 같은 다른 구현체도 사용 가능함)
  • 서블릿 기반인 스프링 MVC와 대비되는 리액티브 기반의 웹 스택
  • 스프링 MVC와 베타적으로 동작하지만 부분적으로 상호 운용이 가능함 예) WebClient, 애노테이션 등

  • 내부적으로 블로킹 API가 있으면 성능이 현저하게 떨어짐
  • 예시로 Spring Data JPA는 내부에서 JDBC API를 사용하는데 JDBC 드라이버는 블로킹 API (Thread per Connection)이므로 JPA를 쓰는 경우 Spring MVC 쓰는게 더 나은 옵션이다
  • 꼭 블로킹 API 써야한다면 아래와 같이 별도의 스케쥴러로 동작시켜야한다
val blockingWrapper = Mono.fromCallable {

        /* make a remote synchronous call */ 

}.subscribeOn(Schedulers.boundedElastic())

  • 그 외에도 깊게 공부하면 연산자 융합(operator-fusion), Hot-Cold 퍼블리셔 등등 공부할게 정말 많다.

 

스프링 웹플럭스의 코루틴 지원

  • 리액터의 연산자 스타일에 비해 직관적인 스타일(진리의 사바사)
  • 리액터에 비해 러닝커브가 적은 편
  • 공식 레퍼런스 문서의 코틀린 예제들을 보면 코루틴 기반으로 소개하고 있다


코루틴이란?

  • 코루틴(Coroutine)은 코틀린에서 비동기-논블로킹 프로그래밍을 명령형 스타일로 작성할 수 있도록 도와주는 라이브러리이다
  • 코루틴은 멀티 플랫폼을 지원하여 코틀린을 사용하는 안드로이드, 서버 등 여러 환경에서 사용할 수 있다
  • 코루틴은 일시 중단 가능한 함수(suspend function) 를 통해 스레드가 실행을 잠시 중단했다가 중단한 지점부터 다시 재개(resume) 할 수 있다
  • 다른 언어와는 다르게 코틀린의 코루틴은 특별하다

 

코루틴을 사용한 구조적 동시성 예시

suspend fun combineApi() = coroutineScope {
	val response1 = async { getApi1() }
	val response2 = async { getApi2() }
	
	return ApiResult (
		response1.await()
		response2.await()
	)
}

 

 

리액티브가 코루틴으로 변환되는 방식

//Mono → suspend 
fun handler(): Mono<Void> -> suspend fun handler()

//Flux → Flow
fun handler(): Flux<T> -> fun handler(): Flow<T>
  • 모노는 서스펜드 함수로 플럭스는 플로우로 변환되는 걸 알 수 있다. 반대도 성립된다.

 

코루틴을 적용한 컨트롤러 코드

@RestController
class UserController(
	private val userService : UserService,
	private val userDetailService: UserDetailService
) {
	
	@GetMapping("/{id}")
	suspend fun get(@PathVariable id: Long) : User {
		return userService.getById(id)
	}
	
	@GetMapping("/users")
	suspend fun gets() = withContext(Dispatchers.IO) {
		val usersDeffered = async { userService.gets() }
		val userDetailsDeffered = async { userDetailService.gets() }
				
		return UserList(usersDeffered.await(), userDetailsDeffered.await())	
	}

}

 

  • Spring Data R2DBC는 아래와 같이 CoroutineCrudRepository를 지원한다

기존 리액티브 스택의 Spring Data R2DBC, Spring Data Mongo Reactive 등에서 ReactiveRepository 로 개발된 코드에서 코루틴으로 변환한 예제

interface ContentReactiveRepository : ReactiveCrudRepository<Content, Long> {

    fun findByUserId(userId: Long) : Mono<Content>

    fun findAllByUserId(userId: Long): Flux<Content>
}


class ContentService (
	val repository : ContentReactiveRepository
) {

	
	fun findByUserIdMono(userId: Long) : Mono<Content> {
		return repository.findByUserId(userId)
	}	

	suspend findByUserId (userId: Long) : Content {
		return repository.findByUserId(userId).awaitSingle()
	}

}

더 좋은 방법 : CoroutineCrudRepository 를 사용하면 awaitXXX 확장 함수 없이 사용 가능

interface ContentCouroutineRepository : CoroutineCrudRepository<Content, Long> {

    suspend fun findByUserId(userId:Long) : Content?

    fun findAllByUserId(userId: Long): Flow<Content>
}


class ContentService (
	val repository : ContentCouroutineRepository
) {


	suspend findByUserId (userId: Long) : Content {
		return repository.findByUserId(userId)
	}

}

 

 

 

Flow

  • Flow 는 코루틴에서 리액티브 프로그래밍 스타일로 작성할 수 있도록 만들어진 API이다
  • 코루틴의 suspend 함수는 단일 값을 비동기로 반환하지만 Flow를 사용하면 무한대 값을 반환할 수 있다
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    val flow = simple()
    flow.collect { value -> println(value) }
}

fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

// Flow started
// 1
// 2
// 3

 

  • 리액티브 스트림과 같이 Terminal Operator(최종 연산자)collect 를 호출하지 않으면 아무런 일도 일어나지 않는다
  • 코루틴의 플로우는 리액티브 스트림에서 영감을 받아 제작
    • 리액터의 플럭스(Flux)는 Pull-Push Hybrid
    • 코틀린의 플로우(Flow)는 Push Only
    • 아직은 리액터 플럭스에 비해 기능이 적다

 

스프링 MVC에서도 리액티브와 코루틴을 쓸수 있다

  • 코루틴, 리액티브를 쓰고 싶은 경우 의존성 추가
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
  • WebClient와 같은 비동기-논블로킹 라이브러리가 필요하다면 의존성 추가
implementation("org.springframework.boot:spring-boot-starter-webflux")
  • 웹플럭스를 바로 도입하기 어렵거나 MVC에서 비동기-논블로킹 구현을 우아하게 작성하고 싶은 경우 이 방법을 추천

 

 

DEMO

예제 코드

https://github.com/digimon1740/backend-techtalk

부하 테스트 시나리오

  1. Mock 데이터를 응답하는 유저 API 서버가 존재함
  2. 유저 API 서버는 API 서버로 부터 요청을 받으면 응답하는데까지 2초가 소요된다
  3. 유저 API 서버가 최초 실행되면 균형있는 테스트를 위해 warm-up을 수행한다
  4. 10초 동안 MVC, WebFlux 서버에 아래 조건으로 요청을 보내고 Vegeta 리포트와, VisualVM 모니터를 확인한다
  5. 100/s 요청 전송
  6. 200/s 요청 전송
  7. 300/s 요청 전송
  8. 1000/s 요청 전송

 

# MVC test
echo 'GET http://localhost:8080/mvc/users/1?delay=2000' | vegeta attack -duration=10s -rate=300/s | vegeta report

 

 

# WebFlux test
echo 'GET http://localhost:8081/webflux/users/1?delay=2000' | vegeta attack -duration=10s -rate=300/s | vegeta report

 

6. 정리

  • 전통적 블로킹 방식에 비해 비동기-논블로킹 모델은 더 적은 리소스로 같거나 더 많은 일을 할 수 있다
  • 필연적으로 비동기-논블로킹 모델은 코드의 가독성이 떨어지고 디버깅이 힘들 수 있다
  • 리액티브 프로그래밍은 비동기-논블로킹 개발을 더 쉽게 해주는 패러다임이다
  • 스프링 MVC는 느리고 구리다? 스프링 웹플럭스는 빠르고 멋지다?
  • 일반적으론 스프링 MVC도 충분하지만 한정된 자원에서 대량의 트래픽이 요구되는 서비스거나 비동기-논블로킹 IO가 필요한 경우 추천
  • 기존 스프링 MVC 프로젝트에 스프링 웹플럭스 의존성을 추가해서 써보는 것도 좋은 방법이다
  • 블로킹 API(JDBC, RestTemplate 등)을 쓸거면 그냥 마음 편히 스프링 MVC로 개발하자
  • 코틀린 + 웹플럭스 조합의 경우 메인은 코루틴을 사용하고 필요에 따라 리액티브를 같이 적용하는 것을 추천
  • 코루틴의 지원 범위를 넘어선 기능을 찾거나 연산자에 대한 학습이 충분하다면 리액티브는 여전히 매력적인 기술이다
  • 새로운 패러다임을 공부하는 것은 항상 어렵지만 도전했을때 얻는 것도 많다
  • 은탄환은 없다.

레퍼런스

https://devsh.tistory.com/category/Reactive Programming

https://tech.lezhin.com/2020/07/15/kotlin-webflux

https://github.com/digimon1740/webflux-demo

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/index.html

https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html

https://docs.spring.io/spring-framework/docs/current/reference/html/languages.html#coroutines

728x90
반응형