본문 바로가기
자바웹프로그래밍/HTTP

주기적인 client 데이터 전송 Server Sent Event(SSE) 구축하기 with spring boot

by 디찌s 2024. 4. 3.
728x90
반응형

 

 

Server Sent Event(SSE) 가 무엇인가?

Server-sent events (SSE) 는 Real time으로 데이터를 클라이언트에 전송할수 있도록 하는 기술이다.

SSE는 http 통신이며, long-lived 커넥션 기반으로 서버에서 클라이언트로 데이터를 전송한다.

 

 

 

 

 

Sse 커넥션 관리 객체 생성

SseEmitters.java

@Component  
@Slf4j  
public class SseEmitters {  
  
    private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();  
  
    public SseEmitter add(SseEmitter emitter) {  
        this.emitters.add(emitter);  
        log.info("new emitter added: {}", emitter);  
        log.info("emitter list size: {}", emitters.size());  
        emitter.onCompletion(() -> {  
            log.info("onCompletion callback");  
            this.emitters.remove(emitter);    // 만료되면 리스트에서 삭제
        });  
        emitter.onTimeout(() -> {  
            log.info("onTimeout callback");  
            emitter.complete();  
        });  
  
        return emitter;  
    }  

      public void sendMessage(String message) {  
        emitters.forEach(emitter -> {  
            try {  
                emitter.send(SseEmitter.event()  
                        .name("topic")  
                        .data(message));  
            } catch (IOException e) {  
                throw new RuntimeException(e);  
            }  
        });  
    }  
}

 

커넥션연결이 올때마다 해당 커넥션에 정보를 저장하는 add 메소드를 저장한다.

또한 현재 프로젝트는 카프카를 토대로 진행중이여서 카프카에서 받는 데이터를 그대로 클라이언트로 데이터 전달을 위해

sendMessage 메소드를 만들었다.

 

또한 sse는 http통신이며 , 그러므로 커넥션이 주기적으로 끊킨다. 그래서 저장된 sseemitter중에 현재 연결이 종료된 객체를 제거해주는 작업을 해주며, 해당작업은 thread-safe여야 하므로 CopyOnWriteArray를 사용한다.

 

SSe 커넥션 연결 컨트롤러 생성

SseController.java

 

@RestController
@AllArgsConstructor
public class SseController {
    private final SseEmitters sseEmitters; 
  
  
    @CrossOrigin("*")
    @GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)  
    public ResponseEntity<SseEmitter> connect() {  
        SseEmitter emitter = new SseEmitter();  
        sseEmitters.add(emitter);
        try {  
            emitter.send(SseEmitter.event()  
                    .name("connect")  
                    .data("connected!"));  
        } catch (IOException e) {  
            throw new RuntimeException(e);  
        }  
        return ResponseEntity.ok(emitter);  
    }  
}

 

현재 http 통신중 connect url를 통해 들어오는것에 대해 sse 연결을 진행해준다.

 

카프카 Listener에서  데이터 전송

DataWebHandler

@Component
@AllArgsConstructor
public class DataWebHandler {


    private final SseEmitters sseEmitters;
    
    @KafkaListener(topics = "data-topic",groupId = "data-group")
    public void handleData(String message){
        sseEmitters.sendMessage(message);
    }
    
}

 

 

 

Client에서 데이터 받기 with React.js

import React, { useState, useEffect } from 'react';

const StockTicker2 = () => {
    const [eventData, setEventData] = useState(null);

    useEffect(() => {
      const eventSource = new EventSource('http://localhost:8080/connect');
  
      eventSource.onopen = () => {
        console.log('SSE connection established.');
      };
  
      eventSource.onerror = (error) => {
        console.error('SSE error:', error);
      };

      eventSource.addEventListener('topic', e => {  
        const { data: price } = e; 
        console.log("count event data",price);  
        setEventData(price);
    });
  
  
      return () => {
        eventSource.close();
      };
    }, []);

    return (
        <div>
            <h1>Real-time Stock Ticker</h1>
            {eventData && (
                <div>
                    <p>Price: {eventData}</p>
                </div>
            )}
        </div>
    );
};

export default StockTicker2;

 

 

 

값이 계속 변해가는것을 확인할수있따.

728x90
반응형

댓글