코틀린 코루틴 한판 정리

2025-01-03 22:37:21
#kotlin#coroutine#async#concurrency

코루틴의 등장 배경

프로세스는 별도 설정없이는 단일 스레드로 실행됩니다(Single Thread Process). 실행 중에 File System IO작업, Network작업과 같이 Blocking 작업이 실행되면 대기하게 됩니다. 이를 해결하기 위해 멀티 스레드 프로그래밍이 등장했는데, 작업 간 독립성이 있는 경우 Parallel Processing이 가능하기 때문에 단일 스레드보다 더 높은 작업량을 처리할 수 있습니다.

멀티 스레드 프로그래밍도 만능은 아니고, 여러 단점이 존재합니다. Thread 생성 비용과 Thread간 context switching 비용이 있고, JVM에선 1개의 Thread 당 약 1MB의 Stack 메모리 영역을 차지합니다. 또한 I/O 작업 시 thread가 blocking 되는 일도 발생합니다.

Thread Blocking: thread가 아무것도 하지 않고 대기하는 상태입니다. Mutex, Semaphore로 인해 공유 자원에 접근할 수 있는 스레드가 제한되는 경우에도 발생하죠. 아래 코드에서도 blocking이 발생합니다.

fun main(args: Array<String>) {
    val executorService = Executors.newFixedThreadPool(2)
    val future = executorService.submit<String> {
        Thread.sleep(1000L)
        return@submit "작업완료!"
    }

    val result = future.get() // 여기서 thread blocking!
    println(result)
    executorService.shutdown()
}

thread blocking 문제를 코루틴이라는 경량 쓰레드(lightweight thread)로 해결할 수 있습니다. 코루틴은 작업이 일시 중단되면 더 이상 쓰레드 사용이 필요없으므로 쓰레드를 다른 코루틴에게 양보합니다. 마치 코루틴을 쓰레드에 뗐다가 붙이면서 쓰레드가 쉬는 시간을 갖지 않도록 열심히 채찍질하는 거죠.

코루틴이란

일시 중단이 가능한 작업 단위입니다. thread에 뗐다가 붙일 수가 있기 때문에 경량 쓰레드라고도 불립니다. 코루틴 10만 개를 만들어도 메모리 문제가 발생하지 않지만, 스레드 10만 개를 만들면 OutOfMemoryError가 발생합니다.

// 코루틴 10만 개 생성 - 정상 동작
runBlocking {
    repeat(100_000) {
        launch {
            delay(1000L)
            print(".")
        }
    }
}

// 스레드 10만 개 생성 - OutOfMemoryError!
repeat(100_000) {
    thread {
        Thread.sleep(1000L)
        print(".")
    }
}

suspend 함수

suspend 키워드가 붙은 함수는 일시 중단 가능한 함수입니다. 코루틴 내부 또는 다른 suspend 함수에서만 호출할 수 있습니다.

suspend fun fetchUserData(): User {
    delay(1000L) // delay도 suspend 함수
    return User("홍길동")
}

// 일반 함수에서는 호출 불가
fun main() {
    // fetchUserData() // 컴파일 에러!
}

// 코루틴 내에서는 호출 가능
fun main() = runBlocking {
    val user = fetchUserData() // OK
    println(user)
}

suspend 함수는 컴파일 시점에 Continuation이라는 콜백 파라미터가 추가됩니다. 이를 통해 일시 중단된 지점에서 다시 재개할 수 있죠.

// 컴파일 전
suspend fun fetchUserData(): User

// 컴파일 후 (개념적으로)
fun fetchUserData(continuation: Continuation<User>): Any?

Continuation은 "나머지 실행할 코드"를 담고 있는 콜백입니다. 함수형 프로그래밍에서 말하는 CPS(Continuation Passing Style)와 같은 개념인데요, 코루틴이 일시 중단되면 현재 상태를 Continuation에 저장하고, 나중에 재개할 때 이 Continuation을 호출해서 이어서 실행합니다.

CoroutineDispatcher

CoroutineDispatcher는 코루틴을 어떤 thread에서 실행할지 결정합니다. 코루틴의 실행을 관리하는 주체죠.

동작 방식

  1. 1번 Thread에서 1번 코루틴 실행 중
  2. 2번 코루틴이 들어오면, CoroutineDispatcher가 2번 코루틴을 2번 Thread에 할당
  3. 1, 2번 Thread 모두 점유 중인 상태에서 3번 코루틴이 들어오면 작업 대기열에 대기
  4. 1번 코루틴이 완료되면 1번 Thread에 대기 중인 3번 코루틴 실행

Dispatcher 종류

Dispatcher 용도 스레드 풀 크기
Dispatchers.Default CPU 집약적 작업 (정렬, JSON 파싱 등) CPU 코어 수
Dispatchers.IO I/O 작업 (네트워크, 파일, DB) 최대 64개 (또는 코어 수 중 큰 값)
Dispatchers.Main UI 작업 (Android) 메인 스레드 1개
Dispatchers.Unconfined 특정 스레드에 국한되지 않음 제한 없음
launch(Dispatchers.IO) {
    // 네트워크 요청
    val response = httpClient.get("https://api.example.com/users")
}

launch(Dispatchers.Default) {
    // CPU 집약적 작업
    val sorted = bigList.sortedBy { it.score }
}

코루틴 라이브러리는 application 레벨에서 스레드풀을 공유하는데요, 덕분에 스레드 생성/관리 비용을 아낄 수 있습니다. Dispatcher를 직접 생성하면 메모리를 낭비하게 될 수 있으니, 가급적 미리 정의된 Dispatcher를 사용하세요.

참고로 Dispatchers.DefaultDispatchers.IO는 내부적으로 스레드를 공유합니다. 하지만 IO는 blocking 작업이 많아서 스레드가 더 많이 필요하고, Default는 CPU 작업이라 코어 수만큼만 있어도 충분하거든요. 그래서 용도에 맞게 분리해둔 겁니다.

코루틴 빌더 함수

코루틴을 생성하는 함수들입니다. 모든 코루틴 빌더 함수는 코루틴을 만들고, 코루틴을 추상화한 Job 객체를 생성합니다.

launch

결과값이 필요 없을 때 쓰는 빌더입니다. "실행하고 잊어버리기(fire-and-forget)" 스타일이죠.

val job: Job = launch(Dispatchers.Default) {
    println("코루틴 실행")
}

async

결과값이 필요할 때 쓰는 빌더입니다. Deferred<T> 객체를 반환하고, await()로 결과를 받아요.

val deferred: Deferred<String> = async(Dispatchers.IO) {
    delay(1000L)
    "결과값"
}
val result = deferred.await() // 결과 대기

runBlocking

현재 스레드를 blocking하면서 코루틴을 실행합니다. 테스트나 main 함수에서 사용합니다. 프로덕션 코드에서는 사용을 피하세요.

fun main() = runBlocking {
    launch { println("코루틴 1") }
    launch { println("코루틴 2") }
}

Job의 주요 함수

runBlocking {
    val job = launch {
        repeat(10) { i ->
            delay(500L)
            println("작업 $i")
        }
    }

    delay(1300L)
    job.cancel() // 취소 요청
    job.join()   // 완료 대기
    // 또는 job.cancelAndJoin()
}

주의: cancel()을 호출해도 코루틴이 즉시 취소되지 않습니다. 취소 플래그만 설정하고, 코루틴이 일시 중단 지점(delay, yield 등)에서 플래그를 확인할 때 취소됩니다.

// 취소되지 않는 코루틴
val job = launch(Dispatchers.Default) {
    while (true) {
        // 일시 중단 지점이 없어서 취소 확인을 못함!
        println("작업중")
    }
}
job.cancelAndJoin() // 취소 안됨

// 해결: isActive 체크 또는 yield() 호출
val job = launch(Dispatchers.Default) {
    while (isActive) { // 취소 확인
        println("작업중")
    }
}

지연 시작

val lazyJob = launch(start = CoroutineStart.LAZY) {
    println("지연 실행")
}
delay(1000L)
lazyJob.start() // 명시적으로 시작

코루틴 상태

코루틴 상태 다이어그램 코루틴 Job의 상태 전이 [5]

상태 isActive isCompleted isCancelled
New false false false
Active true false false
Completing true false false
Cancelling false false true
Cancelled false true true
Completed false true false

왜 Completing 상태가 따로 있을까요? 부모 코루틴이 자신의 코드를 다 실행했더라도, 자식 코루틴이 아직 실행 중이면 기다려야 하거든요. 이 "자식 기다리는 중" 상태가 바로 Completing입니다. 구조화된 동시성의 핵심이죠.

withContext

Dispatcher를 전환하면서 코드 블록을 실행합니다. async { }.await()와 비슷하지만 더 효율적입니다.

suspend fun fetchAndProcess(): Result {
    // IO 스레드에서 네트워크 요청
    val data = withContext(Dispatchers.IO) {
        api.fetchData()
    }

    // Default 스레드에서 CPU 작업
    val processed = withContext(Dispatchers.Default) {
        processData(data)
    }

    return processed
}

CoroutineContext

코루틴을 실행하는 환경을 설정하고 관리하는 인터페이스입니다. 여러 요소를 + 연산자로 조합할 수 있습니다.

val context = CoroutineName("MyCoroutine") +
              Dispatchers.IO +
              CoroutineExceptionHandler { _, e -> println("에러: $e") }

launch(context) {
    println(coroutineContext[CoroutineName]) // CoroutineName(MyCoroutine)
}

주요 구성 요소

  • CoroutineName: 코루틴 이름 (디버깅용)
  • CoroutineDispatcher: 실행 스레드 결정
  • Job: 코루틴 생명주기 관리
  • CoroutineExceptionHandler: 예외 처리

구조화된 동시성 (Structured Concurrency)

부모 코루틴과 자식 코루틴 간의 관계를 정의하는 개념입니다.

Job 계층 구조 Job의 계층적 관계 [4]

핵심 규칙

  1. 부모 코루틴의 CoroutineContext가 자식에게 상속됨 (Job 제외)
  2. 부모가 취소되면 모든 자식도 취소됨
  3. 부모는 모든 자식이 완료될 때까지 대기함
  4. 자식의 예외가 부모로 전파됨

취소 전파 취소가 자식 코루틴으로 전파되는 모습 [4]

val parentJob = launch {
    val childJob = launch {
        delay(1000L)
        println("자식 완료")
    }
    println("부모 코드 완료")
    // 자식이 완료될 때까지 부모는 Completing 상태
}
parentJob.join()
// 자식까지 완료된 후에야 여기 도달
launch(CoroutineName("Parent") + Dispatchers.Default) {
    // 부모 컨텍스트 상속
    launch {
        println(coroutineContext[CoroutineName]) // CoroutineName(Parent)
    }

    // 자식이 CoroutineName 오버라이드
    launch(CoroutineName("Child")) {
        println(coroutineContext[CoroutineName]) // CoroutineName(Child)
    }
}

예외 처리

CoroutineExceptionHandler

launch로 시작한 코루틴의 잡히지 않은 예외를 처리합니다.

val handler = CoroutineExceptionHandler { _, exception ->
    println("예외 발생: ${exception.message}")
}

val scope = CoroutineScope(Dispatchers.Default + handler)

scope.launch {
    throw RuntimeException("에러!")
}

주의: async에서 발생한 예외는 await() 호출 시 throw됩니다. CoroutineExceptionHandlerasync에는 동작하지 않기 때문에, try-catch로 직접 잡아야 합니다.

val deferred = async {
    throw RuntimeException("에러!")
}
try {
    deferred.await() // 여기서 예외 발생
} catch (e: Exception) {
    println("잡힘: ${e.message}")
}

SupervisorJob

일반 Job은 자식 중 하나가 실패하면 모든 자식이 취소됩니다. SupervisorJob은 자식의 실패가 다른 자식에게 영향을 주지 않습니다.

SupervisorJob 에러 전파 SupervisorJob의 에러 전파 방식 [4]

// 일반 Job: 한 자식 실패 → 모든 자식 취소
val scope = CoroutineScope(Job())

// SupervisorJob: 자식 실패가 독립적
val supervisorScope = CoroutineScope(SupervisorJob())

supervisorScope.launch {
    throw RuntimeException("실패") // 이 자식만 실패
}

supervisorScope.launch {
    delay(1000L)
    println("나는 정상 실행됨") // 영향 없음
}

supervisorScope 빌더를 사용할 수도 있습니다.

supervisorScope {
    launch { throw RuntimeException("실패") }
    launch {
        delay(100L)
        println("계속 실행")
    }
}

CancellationException 처리

CancellationException은 특별합니다. 코루틴 취소 시 발생하는데, 이건 "정상적인 취소"라서 부모에게 전파되지 않습니다. 그래서 예외 처리할 때 주의가 필요해요.

try {
    someSuspendFunction()
} catch (e: Exception) {
    // CancellationException도 여기서 잡히면 취소가 무시됨!
}

// 올바른 방법
try {
    someSuspendFunction()
} catch (e: CancellationException) {
    throw e // 다시 던져서 취소 전파
} catch (e: Exception) {
    // 다른 예외 처리
}

Flow

코루틴 기반의 비동기 데이터 스트림입니다. 여러 값을 순차적으로 방출할 수 있습니다.

fun numbersFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100L)
        emit(i) // 값 방출
    }
}

// 수집
runBlocking {
    numbersFlow().collect { value ->
        println(value)
    }
}

Flow vs Sequence

  • Sequence: 동기, blocking
  • Flow: 비동기, non-blocking, 코루틴 컨텍스트 활용

주요 연산자

flowOf(1, 2, 3, 4, 5)
    .filter { it % 2 == 0 }           // 짝수만
    .map { it * 10 }                   // 10 곱하기
    .onEach { println("처리: $it") }   // 부수 효과
    .flowOn(Dispatchers.Default)       // 업스트림 Dispatcher 지정
    .collect { println("결과: $it") }

StateFlow와 SharedFlow

// StateFlow: 상태 홀더 (항상 최신값 보유)
private val _uiState = MutableStateFlow(UiState())
val uiState: StateFlow<UiState> = _uiState.asStateFlow()

// SharedFlow: 이벤트 방출 (값을 보유하지 않음)
private val _events = MutableSharedFlow<Event>()
val events: SharedFlow<Event> = _events.asSharedFlow()

Backpressure 처리

생산자가 소비자보다 빠르면 어떻게 될까요? Flow는 기본적으로 순차 처리라서 소비자가 처리할 때까지 생산자가 기다립니다. 하지만 buffer(), conflate(), collectLatest() 같은 연산자로 전략을 바꿀 수 있어요.

flow.buffer()          // 버퍼에 쌓아두기
flow.conflate()        // 최신 값만 유지, 중간 값 버리기
flow.collectLatest {}  // 새 값 오면 이전 처리 취소

4. 재시도 로직

suspend fun <T> retry(
    times: Int = 3,
    initialDelay: Long = 100,
    maxDelay: Long = 1000,
    factor: Double = 2.0,
    block: suspend () -> T
): T {
    var currentDelay = initialDelay
    repeat(times - 1) {
        try {
            return block()
        } catch (e: Exception) {
            delay(currentDelay)
            currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
        }
    }
    return block() // 마지막 시도
}

마무리

코루틴은 스레드보다 가볍고, 콜백 지옥 없이 비동기 코드를 동기 코드처럼 작성할 수 있게 해줍니다. 핵심은:

  • suspend 함수로 일시 중단 가능한 함수 정의
  • Dispatcher로 실행 스레드 제어
  • 구조화된 동시성으로 안전한 코루틴 관리
  • Flow로 비동기 데이터 스트림 처리

다만 코루틴도 은탄환이 아닙니다. CPU 바운드 작업에서 동시성을 높인다고 성능이 좋아지는 건 아니고, 디버깅이 스레드보다 어려울 수 있습니다. 적재적소에 활용하는 게 중요합니다.

참고로 Java 21의 Virtual Thread와 비교하면, 코루틴은 언어 레벨에서 suspend/resume을 지원하고 구조화된 동시성이 잘 갖춰져 있습니다. Virtual Thread는 기존 blocking 코드를 그대로 쓸 수 있다는 장점이 있고요. Kotlin 서버라면 코루틴, Java 서버라면 Virtual Thread를 고려해보세요.

참고문헌

프로필 이미지
@chani
바둑, 스타크래프트 등 고전 게임을 좋아하는 내향인 개발자입니다

댓글