728x90
반응형
Server Sent Event(SSE) 가 무엇인가?
Server-sent events (SSE) 는 Real time으로 데이터를 클라이언트에 전송할수 있도록 하는 기술이다.
SSE는 http 통신이며, long-lived 커넥션 기반으로 서버에서 클라이언트로 데이터를 전송한다.
Sse 커넥션 관리 객체 생성
SseEmitters.java
@Component
@Slf4j
public class SseEmitters {
private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
public SseEmitter add(SseEmitter emitter) {
this.emitters.add(emitter);
log.info("new emitter added: {}", emitter);
log.info("emitter list size: {}", emitters.size());
emitter.onCompletion(() -> {
log.info("onCompletion callback");
this.emitters.remove(emitter); // 만료되면 리스트에서 삭제
});
emitter.onTimeout(() -> {
log.info("onTimeout callback");
emitter.complete();
});
return emitter;
}
public void sendMessage(String message) {
emitters.forEach(emitter -> {
try {
emitter.send(SseEmitter.event()
.name("topic")
.data(message));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
}
커넥션연결이 올때마다 해당 커넥션에 정보를 저장하는 add 메소드를 저장한다.
또한 현재 프로젝트는 카프카를 토대로 진행중이여서 카프카에서 받는 데이터를 그대로 클라이언트로 데이터 전달을 위해
sendMessage 메소드를 만들었다.
또한 sse는 http통신이며 , 그러므로 커넥션이 주기적으로 끊킨다. 그래서 저장된 sseemitter중에 현재 연결이 종료된 객체를 제거해주는 작업을 해주며, 해당작업은 thread-safe여야 하므로 CopyOnWriteArray를 사용한다.
SSe 커넥션 연결 컨트롤러 생성
SseController.java
@RestController
@AllArgsConstructor
public class SseController {
private final SseEmitters sseEmitters;
@CrossOrigin("*")
@GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<SseEmitter> connect() {
SseEmitter emitter = new SseEmitter();
sseEmitters.add(emitter);
try {
emitter.send(SseEmitter.event()
.name("connect")
.data("connected!"));
} catch (IOException e) {
throw new RuntimeException(e);
}
return ResponseEntity.ok(emitter);
}
}
현재 http 통신중 connect url를 통해 들어오는것에 대해 sse 연결을 진행해준다.
카프카 Listener에서 데이터 전송
DataWebHandler
@Component
@AllArgsConstructor
public class DataWebHandler {
private final SseEmitters sseEmitters;
@KafkaListener(topics = "data-topic",groupId = "data-group")
public void handleData(String message){
sseEmitters.sendMessage(message);
}
}
Client에서 데이터 받기 with React.js
import React, { useState, useEffect } from 'react';
const StockTicker2 = () => {
const [eventData, setEventData] = useState(null);
useEffect(() => {
const eventSource = new EventSource('http://localhost:8080/connect');
eventSource.onopen = () => {
console.log('SSE connection established.');
};
eventSource.onerror = (error) => {
console.error('SSE error:', error);
};
eventSource.addEventListener('topic', e => {
const { data: price } = e;
console.log("count event data",price);
setEventData(price);
});
return () => {
eventSource.close();
};
}, []);
return (
<div>
<h1>Real-time Stock Ticker</h1>
{eventData && (
<div>
<p>Price: {eventData}</p>
</div>
)}
</div>
);
};
export default StockTicker2;
값이 계속 변해가는것을 확인할수있따.
728x90
반응형
댓글