리액티브 프로그래밍이란?
왜 리액티브 프로그래밍인가?
전통적인 스레드 기반 동기 방식은 요청마다 스레드를 할당합니다. 동시 요청이 1000개면 스레드도 1000개가 필요합니다. 스레드는 메모리를 많이 사용하고, 컨텍스트 스위칭 비용도 있어서 대규모 트래픽에서 한계가 있습니다.
리액티브 프로그래밍은 이벤트 기반의 비동기 처리로 적은 스레드로도 많은 요청을 처리할 수 있습니다. I/O 대기 시간에 다른 작업을 수행하기 때문입니다.
리액티브 설계 원칙 (리액티브 선언문)
리액티브 시스템을 설계할 때 따라야 하는 4가지 핵심 원칙이 있습니다.
1. Message Driven (메시지 기반)
비동기 메시지 기반의 통신을 통해 구성요소들간의 느슨한 결합, 격리성을 보장합니다. 컴포넌트 간 직접 호출 대신 메시지를 주고받기 때문에 장애가 전파되지 않습니다.
2. Elastic (탄력성)
시스템의 작업량이 변화하더라도 일정한 응답을 유지합니다. 트래픽이 많든 적든 시스템에서 요구하는 응답성을 일정하게 유지하는 것이 핵심입니다. 필요에 따라 리소스를 동적으로 할당하거나 해제할 수 있어야 합니다.
3. Resilient (회복성)
시스템에 장애가 발생하더라도 응답성을 유지합니다. 시스템의 구성요소들이 독립적으로 분리되기 때문에 장애가 발생하더라도 전체 시스템은 여전히 응답 가능하고, 장애가 난 부분만 복구하면 됩니다.
4. Responsive (응답성)
비동기 메시지 기반 통신을 바탕으로 한 회복성과 탄력성을 기반으로 즉각적으로 응답 가능한 시스템을 의미합니다. 사용자에게 일관된 응답 시간을 보장하는 것이 목표입니다.
리액티브 프로그래밍(Reactive Programming)이란?
리액티브 프로그래밍은 데이터 흐름과 변화의 전파에 초점을 맞춘 프로그래밍 패러다임입니다.
- 비동기 메시지 기반으로 통신하는 높은 응답성을 가지는 Reactive System을 구축하는 데 필요한 프로그래밍 모델입니다.
- 데이터 소스의 변경이 있을 때마다 데이터를 전파하는 이벤트 기반 아키텍처를 사용합니다.
- 선언형 프로그래밍 방식을 사용해 실행할 동작을 구체적으로 명시하지 않고, 목표만 선언합니다.
명령형 방식과 선언형 방식의 차이를 코드로 비교해보면 다음과 같습니다.
// 명령형 방식 - "어떻게" 처리할지 명시
List<String> results = new ArrayList<>();
for (String name : names) {
if (name.startsWith("A")) {
results.add(name.toUpperCase());
}
}
// 선언형 방식 - "무엇을" 원하는지 선언
Flux.fromIterable(names)
.filter(name -> name.startsWith("A"))
.map(String::toUpperCase)
.subscribe(results::add);
리액티브 프로그래밍 코드 구성
리액티브 프로그래밍의 핵심 구성요소는 다음 4가지입니다.
1. Publisher
입력으로 들어오는 데이터를 제공하는 역할을 수행합니다. 데이터를 생성하고 Subscriber에게 전달하는 주체입니다.
2. Subscriber
Publisher가 제공한 데이터를 전달받아서 사용하는 주체(구독자)입니다. 데이터를 받아서 실제로 처리하는 역할을 합니다.
3. Data Source (Data Stream)
Publisher의 입력으로 전달되는 데이터입니다. 데이터베이스 쿼리 결과, API 응답, 파일 내용 등이 될 수 있습니다.
4. Operator
Publisher와 Subscriber 사이에서 데이터를 가공하는 처리 역할을 담당하는 컴포넌트입니다. map, filter, flatMap 등의 연산자가 여기에 해당합니다.
리액티브 스트림즈(Reactive Streams)
리액티브 라이브러리를 어떻게 구현할지 정의해놓은 표준 명세를 Reactive Streams라고 부릅니다.
RxJava, Reactor, Java 9의 Flow API와 같은 여러 구현체가 존재합니다. Spring에서는 Reactor 3.x가 Spring Framework 5부터 포함되어 핵심 역할을 담당하고 있습니다.
리액티브 스트림즈 구성요소
리액티브 스트림즈는 Publisher, Subscriber, Subscription, Processor 라는 4개의 컴포넌트로 구성되어 있습니다. 리액티브 스트림즈 구현체는 이 4개의 인터페이스를 표준 명세에 맞게 구현해야 합니다.
1. Publisher
데이터를 생성하고 발행하는 역할을 합니다.
public interface Publisher<T> {
// 파라미터로 전달받은 Subscriber를 등록하는 역할 수행
public void subscribe(Subscriber<? super T> s);
}
2. Subscriber
구독한 Publisher로부터 데이터를 전달받아 처리하는 역할을 합니다.
public interface Subscriber<T> {
// 구독 시점에 호출. Subscription을 통해 데이터 요청량 조절 가능
public void onSubscribe(Subscription s);
// Publisher가 통지한 데이터를 처리
public void onNext(T t);
// 에러 발생시 호출
public void onError(Throwable t);
// 데이터 통지 완료시 호출
public void onComplete();
}
3. Subscription
Publisher에 요청할 데이터 개수를 지정하고, 데이터의 구독을 취소하는 역할을 합니다. Backpressure를 구현하는 핵심 인터페이스입니다.
public interface Subscription {
// Publisher에게 "n"개만큼의 데이터 요청
public void request(long n);
// 구독을 해지
public void cancel();
}
4. Processor
Publisher와 Subscriber의 역할을 모두 수행할 수 있는 컴포넌트입니다. 데이터를 받아서 가공한 후 다시 발행할 수 있습니다.
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Backpressure (배압)
Backpressure는 리액티브 프로그래밍에서 핵심적인 개념입니다. Publisher가 데이터를 빠르게 발행하는데 Subscriber가 처리 속도를 따라가지 못하면 어떻게 될까요?
처리되지 못한 데이터가 메모리에 쌓이다가 결국 OutOfMemoryError가 발생합니다. Backpressure는 이 문제를 해결하기 위해 Subscriber가 처리할 수 있는 만큼만 데이터를 요청하는 메커니즘입니다.
// Backpressure 예시: Subscriber가 처리량을 조절
Flux.range(1, 1000)
.onBackpressureBuffer(100) // 버퍼 크기 제한
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(10); // 처음에 10개만 요청
}
@Override
protected void hookOnNext(Integer value) {
processValue(value);
request(1); // 하나 처리하면 하나 더 요청
}
});
Reactor에서 제공하는 Backpressure 전략은 다음과 같습니다.
| 전략 | 설명 |
|---|---|
onBackpressureBuffer() |
버퍼에 저장 (가장 많이 사용) |
onBackpressureDrop() |
처리 못하면 버림 |
onBackpressureLatest() |
최신 값만 유지 |
onBackpressureError() |
에러 발생 |
Project Reactor
Spring Framework 5버전부터 Reactive Stack에 포함되어 Spring WebFlux 기반의 리액티브 애플리케이션을 개발할 때 핵심 역할을 담당하고 있습니다.
Reactor는 Reactive Streams의 구현체이며, Non-Blocking I/O 방식으로 개발하고 Java의 함수형 프로그래밍 API(스트림/람다)를 사용합니다.

Mono[0|1], Flux[N]
Reactor의 Publisher 타입은 두 가지가 존재합니다. 바로 Mono와 Flux입니다.
| 타입 | 설명 | 사용 예 |
|---|---|---|
Mono<T> |
0~1개의 데이터를 emit | 단일 객체 조회, API 응답 |
Flux<T> |
0~N개의 데이터를 emit | 목록 조회, 스트리밍 |
// Mono 예시: 단일 사용자 조회
Mono<User> findById(Long id) {
return userRepository.findById(id);
}
// Flux 예시: 모든 사용자 조회
Flux<User> findAll() {
return userRepository.findAll();
}
아래의 예시코드와 같이 Publisher는 Data Source로부터 Data를 발행하는 역할을 수행합니다. 이후에는 Operator를 거쳐 Subscriber가 데이터를 받아 처리합니다.
Flux<String> sequence = Flux.just("Hello", "Reactor"); // Flux == Publisher, "Hello","Reactor" == Data Source
sequence.map(String::toLowerCase) // map == Operator
.subscribe(System.out::println); // System.out::println == Subscriber
자주 사용하는 Operator
실무에서 자주 쓰는 Operator 위주로 정리했습니다. 이것만 알아도 대부분의 상황은 커버됩니다.
// 1. map: 데이터 변환
Flux.just(1, 2, 3)
.map(i -> i * 2)
.subscribe(System.out::println); // 2, 4, 6
// 2. flatMap: 비동기 변환 (1:N)
Flux.just(1, 2, 3)
.flatMap(id -> findUserById(id)) // 각 id로 비동기 조회
.subscribe(System.out::println);
// 3. filter: 조건에 맞는 데이터만 통과
Flux.just(1, 2, 3, 4, 5)
.filter(i -> i % 2 == 0)
.subscribe(System.out::println); // 2, 4
// 4. zip: 여러 Publisher 결합
Mono.zip(getUserMono(), getOrderMono())
.map(tuple -> new UserOrderInfo(tuple.getT1(), tuple.getT2()))
.subscribe(System.out::println);
// 5. onErrorResume: 에러 발생시 대체 Publisher로 전환
findUserById(id)
.onErrorResume(e -> Mono.just(User.anonymous()))
.subscribe(System.out::println);
// 6. retry: 실패시 재시도
callExternalApi()
.retry(3)
.subscribe(System.out::println);
map vs flatMap, 언제 뭘 써야 하나?
처음 Reactor를 접하면 map과 flatMap이 헷갈립니다. 핵심 차이는 반환 타입입니다.
// map: 동기 변환. 값 -> 값
Flux.just(1, 2, 3)
.map(i -> i * 2) // Integer -> Integer
.subscribe();
// flatMap: 비동기 변환. 값 -> Publisher
Flux.just(1, 2, 3)
.flatMap(id -> userRepository.findById(id)) // Integer -> Mono<User>
.subscribe();
만약 비동기 작업에 map을 쓰면 Mono<Mono<User>> 같은 중첩 구조가 됩니다. flatMap은 이를 평탄화(flatten)해서 Mono<User>로 만들어줍니다.
| 상황 | 사용할 Operator |
|---|---|
| 단순 값 변환 (동기) | map |
| 비동기 작업 (DB 조회, API 호출) | flatMap |
| 순서 보장이 필요한 비동기 | concatMap |
| 최신 값만 필요한 비동기 | switchMap |
어떤 상황에서 Spring WebFlux 도입을 고려해야 하는가?
WebFlux가 적합한 상황과 그렇지 않은 상황을 명확히 구분하는 것이 중요합니다.
WebFlux가 적합한 경우
-
대량의 동시 요청 처리가 필요한 시스템
- Blocking I/O 방식으로 처리하기 어려운 대량의 요청 트래픽이 발생하는 경우
- 적은 스레드로 많은 연결을 처리해야 하는 경우
-
스트리밍 또는 실시간 시스템
- 채팅, 알림, 실시간 데이터 피드 등
- Server-Sent Events(SSE), WebSocket 활용 시스템
-
마이크로서비스 기반 시스템
- 여러 서비스를 호출하고 결과를 조합해야 하는 경우
- I/O 대기 시간이 긴 외부 API 호출이 많은 경우
WebFlux가 적합하지 않은 경우
-
Blocking 라이브러리 의존
- JDBC, JPA 등 전통적인 Blocking DB 드라이버 사용시
- R2DBC 같은 리액티브 드라이버가 없는 경우
-
CPU 집약적인 작업
- 이미지/영상 처리, 복잡한 계산 등
- I/O 대기가 아닌 CPU 연산이 병목인 경우
-
팀의 학습 곡선
- 리액티브 프로그래밍 경험이 부족한 팀
- 디버깅과 에러 추적이 어려움을 감수하기 어려운 경우
Virtual Thread vs WebFlux
Java 21에서 정식 도입된 Virtual Thread와 WebFlux는 모두 동시성 처리를 개선하는 방법입니다. 어떤 차이가 있을까요?
| 구분 | Virtual Thread | WebFlux |
|---|---|---|
| 프로그래밍 모델 | 명령형 (기존 코드 유지) | 선언형 (Mono/Flux) |
| 학습 곡선 | 낮음 | 높음 |
| 기존 라이브러리 호환 | 좋음 (JDBC 등 사용 가능) | 제한적 (R2DBC 등 필요) |
| Backpressure | 기본 지원 없음 | 내장 지원 |
| 스트리밍 처리 | 제한적 | 강력한 지원 |
Virtual Thread는 기존 코드를 거의 수정하지 않고도 동시성을 개선할 수 있어 도입 장벽이 낮습니다. 반면 WebFlux는 스트리밍, Backpressure 등 리액티브 패러다임이 필요한 경우에 더 적합합니다.
// Virtual Thread - 기존 방식 그대로 사용
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
executor.submit(() -> {
User user = userRepository.findById(id); // Blocking OK
return processUser(user);
});
// WebFlux - 리액티브 방식
userRepository.findById(id) // Non-blocking
.map(this::processUser)
.subscribe();
흔한 실수와 주의사항
1. subscribe()를 호출하지 않으면 아무 일도 일어나지 않는다
// 잘못된 예: subscribe 없음 - 실행되지 않음
Mono.just("data").map(s -> s.toUpperCase());
// 올바른 예
Mono.just("data").map(s -> s.toUpperCase()).subscribe();
// WebFlux Controller에서는 return하면 프레임워크가 subscribe 처리
@GetMapping("/user/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userService.findById(id); // subscribe 불필요
}
2. Blocking 코드를 섞지 않는다
// 잘못된 예: 리액티브 파이프라인 안에서 Blocking 호출
Flux.range(1, 10)
.map(id -> userRepository.findById(id)) // Blocking!
.subscribe();
// 올바른 예: Blocking 작업은 별도 스케줄러에서 실행
Flux.range(1, 10)
.flatMap(id -> Mono.fromCallable(() -> userRepository.findById(id))
.subscribeOn(Schedulers.boundedElastic()))
.subscribe();
3. 에러 처리를 잊지 않는다
// 에러 처리 없으면 예외가 삼켜질 수 있음
userService.findById(id)
.doOnError(e -> log.error("Error: ", e))
.onErrorResume(e -> Mono.just(User.defaultUser()))
.subscribe();
4. 디버깅이 어려울 때는 log()와 checkpoint()를 활용한다
리액티브 코드는 스택트레이스가 복잡해서 디버깅이 까다롭습니다. 어디서 문제가 생겼는지 추적하려면 다음 방법을 사용합니다.
// log(): 각 신호(onNext, onError 등)를 로깅
userService.findById(id)
.log("UserService.findById") // 이름을 붙여서 구분
.map(this::processUser)
.subscribe();
// checkpoint(): 에러 발생 시 위치 표시
Flux.range(1, 10)
.map(i -> i / (i - 5)) // i=5일 때 ArithmeticException
.checkpoint("after division") // 에러 위치 힌트
.subscribe();
5. 스케줄러를 이해하고 적절히 사용한다
| 스케줄러 | 용도 |
|---|---|
Schedulers.immediate() |
현재 스레드에서 실행 |
Schedulers.single() |
단일 재사용 스레드 |
Schedulers.parallel() |
CPU 코어 수만큼의 스레드 풀 (CPU 작업용) |
Schedulers.boundedElastic() |
탄력적 스레드 풀 (Blocking I/O 작업용) |
// Blocking 작업은 boundedElastic에서 실행
Mono.fromCallable(() -> blockingOperation())
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
