ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Flux와 SSe를 통한 chat data stream으로 받기 (문제/해결방안)
    카테고리 없음 2025. 12. 5. 16:57
    728x90
    반응형

    1. SSE가 뭐냐? (정의/목적)

    SSE(Server-Sent Events)는 서버 → 클라이언트 한 방향으로만 실시간(또는 준실시간) 업데이트를 푸시하는 웹 표준이야.
    브라우저는 EventSource라는 내장 API로 하나의 HTTP 연결을 오래 유지(long-lived) 하면서 서버가 보내는 이벤트를 계속 받는다.

     

    1-1. 왜 생겼나? (배경 / 역사)

    웹은 원래 클라이언트가 요청해야 서버가 응답하는 pull 모델이었지.
    실시간 UI를 만들려면:

     

    Polling

    • 주기적으로 요청 → 응답
    • 데이터 없으면 낭비, 지연 큼

     

    Long Polling / Comet

    • 요청을 오래 열어두고 서버가 이벤트 생길 때 응답 후 종료
    • 다시 요청 반복 필요, 구현/스케일링 난이도 있음

    WebSocket

    • 양방향, 낮은 지연
    • 하지만 프록시/방화벽에서 막히는 경우도 있고, 서버도 별도 프로토콜 처리 필요

    SSE는 “서버→클라 실시간만 필요할 때 Polling/Long Polling의 단점을 HTTP 레벨에서 깔끔히 해결”하려고 HTML5 표준으로 들어온 거야.

     

    1-2. 프로토콜/포맷 원리 (가장 중요)

    클라이언트가:

    const es = new EventSource("/sse");

    하면 브라우저가 서버에 HTTP GET을 보내고, 서버는:

    SSE는 줄 기반(line-based) 텍스트 프로토콜이고, 이벤트 하나는 이렇게 생김: HTML Living Standard+1

     

     

    event: tick        (옵션: 이벤트 타입)
    id: 123            (옵션: 이벤트 ID)
    data: hello
    data: world        (data 여러 줄 가능)
    
    <빈 줄로 이벤트 종료>
    • data: 줄들이 모여서 하나의 이벤트 payload가 된다.
    • **빈 줄(\n\n)이 “이벤트 경계”**야.
      → 그래서 “이벤트 단위로 끊어서 보내라”는 말이 나오는 거고!
    • 기본 이벤트 이름은 "message".

     

     

    문제 사항 정의

    약 1000명의 동시접속자와 300개의 요청 스레드 환경을 가정한다.

    전통적인 Spring MVC SSE 구현(SseEmitter/ResponseBodyEmitter/Servlet flush 등)을 사용할 경우,
    요청당 스레드가 스트리밍 응답이 끝날 때까지 장시간 점유될 수 있다.
    따라서 스트리밍이 10초 이상 지속되면 최대 300개의 요청만 처리되고,
    나머지 요청은 스레드 풀 대기열에서 스트리밍 종료까지 지연되어 응답 지연/타임아웃이 발생할 위험이 크다.


    즉, “스트리밍 자체가 스레드 풀을 잠식하는 구조적 한계”가 존재한다.

     

    MVC 컨트롤러가 Flux를 반환하는 방식은,
    Flux 반환 이후 스트리밍 구간은 비동기 어댑터로 처리되어 스트리밍 자체가 요청 스레드를 장기간 점유하지는 않는다.
    그러나 Flux를 반환하기 전 단계에서 JPA 기반 DB 조회/동기 크레딧 차감 등 블로킹 로직이 수행되므로,
    동시 요청이 급증할 경우 이 블로킹 구간에서 요청 스레드 및 DB 커넥션 풀이 포화되어 대기 지연이 발생할 수 있다.
    즉, “스트리밍 전 블로킹 구간이 동시성 병목이 되는 한계”가 남아 있다.

    따라서 스트리밍 API의 안정적 동시 처리를 위해

    • 전통 SSE 방식은 Flux/WebFlux 기반으로 전환하여 스트리밍 중 스레드 점유 문제를 제거하고,
    • 동시에 스트리밍 시작 전 블로킹(DB/JPA/크레딧 차감 등) 로직은 boundedElastic 격리 또는 논블로킹 저장소(R2DBC 등)로 전환하여 병목을 제거할 필요가 있다.

     

     

    WEB FLUX

    1) Flux는 “값”이 아니라 “흐름(스트림)”이다

     

    MVC의 일반 응답은 보통 이렇게 생각해:

    • 함수 호출 → 결과 하나 만들어짐 → 리턴

    Flux는 반대야:

    • 함수 호출 → “앞으로 나올 값들의 파이프라인”을 리턴
    • 실제 값은 구독(subscribe)이 일어난 뒤에, 하나씩 나와

    그래서 Flux는 “여러 개의 값이 시간에 따라 나오는 시퀀스”를 표현해.

     

    2) JVM 메모리/객체 관점: Flux.just가 만드는 건 뭔가?

    Flux<Integer> f = Flux.just(1,2,3);

     

    이 코드가 실행될 때 JVM에서 실제로 일어나는 일:

    1. 스택 프레임에서 Flux.just(...) 호출
    2. Reactor가 Flux 구현체 객체 하나를 힙에 생성
      (예: FluxArray, FluxJust 같은 내부 타입)
    3. 그 객체는 “1,2,3 값을 앞으로 어떻게 방출할지”만 알고 있음
    4. 지역변수 f는 그 객체를 가리키는 참조(reference) 를 들고 있음

    중요:

    • f가 들고 있는 건 1,2,3 그 자체가 아니라
    • "1,2,3을 내보낼 수 있는 Publisher 객체의 참조" 야.

    자바에서 객체는 값이 아니라 행위를 담는 인스턴스일 수 있고,
    Flux는 특히 “행위(미래 실행)”를 담는 객체다.

     

    3) “지금 당장 1,2,3이 메모리에 다 있지 않다”를 더 정확히 말하면?

    엄밀하게:

    • Flux.just(1,2,3)는 이미 존재하는 값 세 개를 캡처해 둔다.
      그래서 내부 배열/필드에 1,2,3이 저장되어 있을 수는 있어.
    • 하지만 그게 핵심이 아니야.

    핵심은:

    그 값들은 외부로 방출되거나 소비되지 않았고,
    언제/어떻게/얼마나 빨리 내보낼지는 구독 시점에 결정된다는 점.
    즉 “메모리에 없음”이라기보다
    “값을 실행 흐름으로서 드러내지 않은 상태(잠재된 상태)” 라고 보는 게 맞아.

     

    Reactor 내부 구현: FluxArray, ScalarSubscription 등의 실제 코드 흐름

    1-1. Flux.just(1,2,3)는 어떤 타입을 만들까?

    Flux.just(T... data)는 입력 개수에 따라 구현체가 달라져.

    • 0개 → FluxEmpty
    • 1개 → FluxJust (+ 내부적으로 Scalar 최적화)
    • 2개 이상 → FluxArray (혹은 FluxIterable류)

    그래서

    Flux<Integer> f = Flux.just(1,2,3);

    는 대개 FluxArray 인스턴스를 만든다고 보면 돼.

    이 객체는 힙에 올라가고, 내부에 Integer[] array = {1,2,3} 같은 식으로 저장해둬.
    하지만 “저장”은 단지 나중에 방출할 재료를 들고 있는 것일 뿐, 방출은 아직 없어.

     

    2) Netty/WebFlux에서 subscribe가 서버 I/O 루프랑 어떻게 붙는지

    이건 **“WebFlux 컨트롤러가 Flux를 리턴하면 누가 subscribe하냐?”**의 답이기도 해.

    2-1. 컨트롤러는 “Publisher를 리턴만” 한다

     

    @GetMapping("/nums")
    public Flux<Integer> nums() {
       return Flux.just(1,2,3).delayElements(Duration.ofMillis(100));
    }

    컨트롤러는 구독하지 않아.
    “파이프라인(Flux)”을 프레임워크에 넘기는 것뿐.

     

    2-2. Spring WebFlux의 처리 파이프라인

    요약하면:

    1. Netty가 HTTP 요청을 받음 (event loop 스레드)
    2. Spring이 라우팅해서 컨트롤러 실행
    3. 컨트롤러가 Publisher 반환
    4. Spring의 HttpHandler / ResponseWriter가 그 Publisher를 subscribe
    5. subscribe 과정에서 나온 onNext 데이터가 Netty response로 write됨

    즉, subscribe 주체는 프레임워크야.


    2-3. 더 구체적으로 “붙는 지점”

    WebFlux 내부에서 응답을 쓰는 컴포넌트가 대략 이렇게 동작해:

    • ServerResponse → writeTo(exchange, context)
    • 내부에서 BodyInserter가 Publisher<DataBuffer>를 얻음
    • HttpServerResponse(Reactor Netty)가 그걸 subscribe
    • 그리고 event loop에서 socket write

    큰 흐름

    Controller returns Flux<T>
       ↓
    Spring adapts Flux<T> → Publisher<DataBuffer>
       ↓
    Reactor Netty HttpServerResponse.send(Publisher<DataBuffer>)
       ↓
    send() internally subscribes
       ↓
    onNext(DataBuffer) → channel.write()
    728x90
    반응형
Designed by Tistory.