Search
📚

[SLASH 24] SSE 이벤트 푸쉬로 불필요한 Polling 제거하기

Tags
Architcture
Meet Up / Conference
Study
Last edited time
2024/11/13 06:49
2 more properties

1. 배경

1.1. 요청량 증가 → 부하 증가

준실시간성으로 데이터를 표현하기 위해 폴링 사용
사용자 급증으로 인해 CPU 사용량 증가

1.2. 현재 아키텍처

클라이언트에서 3초마다 Serving Layer에 폴링
Resource Layer에 부하 발생

1.3. 해결 방안 고민

재조회 Event가 발행될때만 계좌 API 조회
Push Layer를 구성하여 Client가 데이터를 조회하게 재구성

2. Push(SSE) Server 구성

2.1. WebSocket과 SSE(ServerSentEvent)

SSE
WebSocket
Protocol
HTTP
WebSocket
Direction
서버사이드 단방향
양방향
Auto Reconnect
O
X
Cookie
O
X
Response Type
Text
Text or Byte
토스 활용 사례
1) 양방향 통신이 필요할때 사용 2) 송 / 수신량 데이터량이 많을 때 사용 3) 시세 위주 사용 예) 화면 이동할때마다 시세를 구독할지 해지할지 표현하는 기능
1) 불필요한 폴링을 제거할 때 2) 개인화된 데이터를 이용한 이벤트 푸쉬 할때

2.2. SSE란?

클라이언트에서 HTTP 프로토콜을 통해 요청
서버는 text/event-stream 이라는 content-type으로 응답 반환
이후 client / server 간의 데이터 파이프라인이 형성되어 연결됨
서버에서 데이터 발행될때마다 주기적으로 데이터를 내려줌

2.3. Push Server 구성

클라이언트는 SSE 서버와 연결을 맺음
SSE 서버는 메세지 브로커를 통해 메세지를 전달 받음
메세지 제공 서버는 배치, API, Consumer 등을 통해 메세지를 브로커로 발행
클라이언트 사이드와 서버 사이드에 따라 메세지 전달 전략이 다름

3. Client side message 전달 전략

3.1. Broadcasting

3.1.1. 구성

메세지 브로커로부터 메세지를 받으면 모든 클라이언트에게 동일한 메세지 전달 (Fanout Broadcasting)
예시 코드
1.
클라이언트와 연결될 브로드캐스팅 채널 생성
2.
서버가 bootup된 이후 메세지 브로커로부터 메세지 수신
3.
수신받은 메세지는 SSE 메세지로 변환된 이후 클라이언트로 전달
// 1. 클라이언트와 연결될 브로드캐스팅 채널 생성 private val channel: Sinks.Many<ServerSentEvent<T>> = Sinks.many().multicast().directAllOrNothing() override val doProcess: Consumer<ReactiveSubscription.Message<String, String>> get() = Consumer { msg -> // 3. 수신받은 메세지는 SSE 메세지로 변환된 이후 클라이언트로 전달 val sseMsg = JsonUtil.fromJson(msg.message, object: TypeReference<SseMessage<T>>() {}) channelMeterRegistry.recordMessageSub(sseMsg.header.sentTime) super.send( channel, ServerSentEvent.builder<T>() .data(sseMsg.msg) .id(msg.channel) .build() ) } override fun afterPropertiesSet() { // 2. 서버가 bootup된 이후 메세지 브로커로부터 메세지 수신 super.onMessage(CHANNEL_NAME) .subscribe() } fun connect(): Flux<ServerSentEvent<T>> { return Flux.merge( super.connect(channel) {}, ) }
Kotlin
복사
@RestController class BroadcastEventController( @Qualifier(ChannelConfiguration.BROADCAST_CHANNEL) private val broadcastChannelHandler: BroadcastChannelHandler<String> ) { @GetMapping(value = ["/api/v1/live-chat"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE]) fun eventConnect(): Flux<ServerSentEvent<String>> { return broadcastChannelHandler.connect() } }
Kotlin
복사

3.1.2. 예시 - 이벤트성 단일 채팅방

이벤트성 단일 채팅방. 빠른 개발을 위해 Broadcasting 사용
아키텍처 구성
하지만 유저별은 어떻게? 개인화된 데이터를 이용한 이벤트 푸쉬는?
UniCasting 사용

3.2. UniCasting

3.2.1. 구성

SSE 서버는 각 유저별 단독 채널 생성
클라이언트는 본인의 채널만 구독
예시 코드
1.
단일로 연결할 채널 생성
2.
개인화된 data stream을 주입받고, SSE 메세지로 컨버팅 이후 클라이언트로 발송
class UnicastChannelHandler<T : Any>( private val channelMeterRegistry: ChannelMeterRegistry, private val inboundStream: Flux<String>, ) : ChannelOutBoundHandler<ServerSentEvent<T>>() { private val CHANNEL = Sinks.many().unicast().onBackpressureBuffer<ServerSentEvent<T>>() private val TYPE_REFERENCE = object : TypeReference<SseMessage<T>>() {} fun connect(): Flux<ServerSentEvent<T>> { return Flux.merge( // 1. 단일로 연결할 채널 생성 super.connect(CHANNEL) {}, // 2. 개인화된 data stream을 주입받고, SSE 메세지로 컨버팅 이후 클라이언트로 발송 inboundStream .map { JsonUtil.fromJson(it, TYPE_REFERENCE) } .map { sseMsg -> channelMeterRegistry.recordMessageSub(sseMsg.header.sentTime) ServerSentEvent.builder<T>() .data(sseMsg.msg) .id(sseMsg.header.key) .build() } ) } }
Kotlin
복사
개인용 data stream은 어떻게 만들지?
1.
스트림 생성
2.
본인의 메세지를 필터링할 수 있는 리스너 발급
3.
클라이언트와 연결 이후, 메세지를 수신하겠다고 메세지 브로커에 요청
4.
클라이언트와 SSE간의 연결이 끊어지면 메세지 브로커로부터 구독 취소 요청
fun createInboundStreamFromRedisPubSub( connection: StatefulRedisClusterPubSubConnection<String, String>, channel: String ): Flux<String> { // 1. 스트림 생성 return Flux.create { sink: FluxSink<String> -> // 2. 본인의 메세지를 필터링할 수 있는 리스너 발급 val pubSubListener = object : RedisPubSubAdapter<String, String>() { override fun message(messageChannel: String, message: String?) { if (channel == messageChannel && message != null) { sink.next(message) } } } val (_, subscriber) = shardConnection(connection.async().upstream(), channel) ?: return@create sink.error(UnreachableCodeException("redis connection error")) // 3. 클라이언트와 연결 이후, 메세지를 수신하겠다고 메세지 브로커에 요청 subscriber.statefulConnection.addListener(pubSubListener) subscriber.subscribe(channel) // 4. 클라이언트와 SSE간의 연결이 끊어지면 메세지 브로커로부터 구독 취소 요청 sink.onDispose { subscriber.unsubscribe(channel) subscriber.statefulConnection.removeListener(pubSubListener) } } }
Kotlin
복사

3.2.2. 예시 (1) 보유자산 polling 제거

기존에 주기적으로 폴링 진행
3초마다 보유종목 API 폴링
S 전자 1주 구매 → 재조회 이벤트 발행 → 조회 DB 업데이트
SSE 서버를 중간에 두고, 재조회 이벤트가 발행될 때만 보유종목 API를 호출하여 성능 개선

3.2.3. 예시 (2) 안드로이드 푸쉬알람 대체

푸시 알림
FCM Message Push 을 사용하나, FCM 요청 메세지 Capacity 초과 이슈 발생
FCM Push Server와 SSE 서버를 접속/미접속 유저에 따라 다르게 사용
접속 유저 - SSE Sever를 통해 클라이언트에게 푸시 전달
미접속 유저 - FCM 서버를 통해 클라이언트에게 푸시 전달
세션은 어떻게? (접속/미접속 유저 구별은 어떻게?)
Redis PubSub을 사용
SSE 서버에 연결하게 되면 SSE 서버가 Redis Pubsub을 통해 특정 유저를 지칭하는 토픽을 구독
예) client1 토픽을 subscribe
FCM 서버는 특정 유저를 지칭하는 토픽에 메세지 발행
예) client1 msg를 publish
Redis PubSub에 연결된 사용자(예. subscribe cnt)가 있는 경우, 사용자가 있다고 응답을 FCM 서버에 반환
예) client1 1 이 있다고 응답을 FCM 서버에 반환
이를 통해 세션 테이블을 따로 구현하지 않고, 접속 유저 / 미접속 유저를 구별할 수 있음
접속 유저 식별에 대한 추가 부연 설명
SSE 서버에서의 구독(Subscribe): 사용자가 SSE 서버에 연결하면, 해당 SSE 서버는 Redis PubSub을 통해 특정 유저를 지칭하는 토픽(e.g., client1)을 구독합니다. Redis PubSub의 구독 정보는 Redis 서버에 저장되므로, 현재 구독 중인 유저가 접속 중이라는 정보를 파악할 수 있습니다.
미접속 유저 식별: 구독을 해제하거나, SSE 서버에서 연결이 끊어지면 Redis PubSub은 자동으로 해당 유저의 토픽 구독 상태를 업데이트합니다. 즉, 특정 유저의 토픽 구독 수(subscribe count)가 0이면 미접속 유저로 간주할 수 있습니다.

4. Server side message 전달 전략

Message broker 선택 전략

4.1. Kafka

SSE 서버를 구성할때, 서버마다 groupId를 발급해서 모든 메세지를 수신하여 연결된 사용자에게만 데이터를 내려주면 카프카로 구현 가능
하지만, At Least Once를 지원하는 카프카는 SSE 서버와 어울리지 않다고 판단
접속/미접속 유저 모두에게 SSE 서버로 메세지가 발행됨. 불필요한 I/O, 메세지 필터링 기능 구현 필요
At Most Once 도구 탐색 중 Redis PubSub을 찾음

4.2. Redis PubSub

SSE 서버는 Redis PubSub에 연결된 사용자에게만 subscribe 요청
Publisher는 Redis PubSub끼리만 요청
SSE 서버는 Redis PubSub에게 subscribe 하는 경우에만 데이터가 오게되고, 카프카 처럼 모든 데이터를 받지 않음
성능 테스트 (broker throughput)
60K

4.3. NATS

broker throughput 더 좋은 도구 탐색 과정에서 발견
고랭 기반의 pub/sub 기반의 모델로 만들어진 도구
단일 모드
클러스터 모드
슈퍼 클러스터 모드

4.3.1. 클러스터 모드

기본 매커니즘
클라이언트 A가 클라이언트 D에게 메세지 전송
동일한 Subject(Redis PubSub의 channel의 개념)를 구독하고 있는 구독자에게만 데이터 전송

4.3.2. 클러스터 모드 예시

NATS 2번 노드에게 msg 라는 subject를 발행 요청
client port를 통해 데이터 전송
cluster port를 통해 나츠 1번으로 메세지 전송
NATS 1번 노드는 연결된 사용자를 알고 있어서, 그 데이터를 전송해주게 됨

4.3.3. NATS Server Node 장부

NATS 2번 노드는 1번 노드에게 subscriber가 있다는것을 어떻게 알았을까?
NATS 서버 노드 내부에는 본인이 라우팅할 장부, 타서버로 라우팅할 장부 2가지가 존재
타서버 라우팅 정보는 타서버로 요청을 돌림
구독과 해지 시점에서 아래 명령어를 통해 FANOUT으로 라우팅 장부에 추가/제거
RS+ (추가 명령어)
RS- (제거 명령어)

4.3.4. 성능

레디스 펍섭에 비해 5배 좋은 성능 → 최종적으로 메세지 브로커로 채택

5. Heartbeat 이슈 트러블 슈팅

5.1. Heartbeat

웹소켓 /SSE 등은 일반적으로 heartbeat를 통해 좀비 커넥션 확인
import reactor.core.publisher.Flux import reactor.util.function.Tuple2 import java.time.Duration import org.springframework.http.codec.ServerSentEvent private fun <T> heartbeat(): Flux<ServerSentEvent<T>> { return Flux.interval(Duration.ofSeconds(2)) // 2초마다 이벤트 발생 .map { ServerSentEvent.builder<T>() .comment("heartbeat") // "heartbeat" 코멘트 추가 .retry(Duration.ofHours(1)) // 연결 실패 시 1시간 후 재시도 .build() } }
Kotlin
복사
HTTP API를 보니 2*n 초 레이턴시 발견
GET /api/v1/sse → 200 took 10000ms
2초, 4초, 6초 등등…

5.2. AS IS 구조

L7 http proxy & k8s cluster 로 구성
k8s 클러스터와 L7 http proxy간의 TCP dump 확인
HTTP heartbeat를 날리고 나서 L7 http proxy에서 이미 끊어진 connection에 대해 ACK를 응답하다보니 RST가 날라오는 것을 확인
RST 자체를 주는것은 상용에서 비일비재할 수 있지만 heartbeat를 주고나서 RST가 계속 발생하는 것은 이상하다고 추측

5.3. TO BE 구조

L7 장비 이슈인 것이 확인
L7을 TCP Proxy 모드로 변경후 SSE Connection이 정상적으로 close 되는 것이 확인

5.4. Heartbeat 이슈 추가 부연 설명

끊어진 연결에서 Heartbeat 패킷이 전달되는 상황이 조금 이해가 되지 않아 자료를 조금 더 살펴보았다. 살펴본 내용을 요약하자면 다음과 같다.

5.4.1. 상황

1.
Heartbeat 전송 시도
클라이언트는 연결이 끊어진 상태를 모른 채 Heartbeat 패킷을 프록시에 전송
프록시는 여전히 이 연결이 유효하다고 생각하고, 클라이언트로부터 받은 Heartbeat 패킷을 서버로 전달
2.
비정상적인 응답 처리 (RST 패킷)
서버는 이미 이 연결이 끊어진 상태이기 때문에, 프록시로부터 온 Heartbeat 패킷에 응답할 수 없음
서버는 TCP 프로토콜의 규칙에 따라 "이 연결이 존재하지 않는다"는 의미로 RST(Reset) 패킷을 프록시에 전송
3.
RST 패킷의 반복 발생
프록시는 Heartbeat 패킷을 다시 서버에 보내지만, 서버는 여전히 연결이 끊어졌다고 판단하여 RST 패킷을 다시 보내게 됨
이 과정이 반복되면서 프록시는 끊어진 연결을 제대로 정리하지 않고 계속 Heartbeat 패킷을 서버로 보내려고 하기 때문에, 서버에서 RST 패킷이 계속 발생함
4.
레벨 증가 및 리소스 소모
이러한 RST 패킷이 계속 발생하면서 서버와 프록시 간의 연결은 끊어진 상태임에도 불구하고 이를 계속 시도하면서 연결을 유지하려는 불필요한 오버헤드가 발생함
프록시는 응답이 없으면 타임아웃까지 기다리게 되므로, 실제로 Heartbeat 주기마다 레이턴시가 증가하게 됨

5.4.2. 결과적으로 발생하는 문제

이 과정을 통해 끊어진 연결임에도 불구하고 L7 Proxy는 계속해서 비정상 연결을 유지하려고 시도함. 이는 다음과 같은 문제로 이어짐
1.
레이터시 증가: 끊어진 연결에 대해 계속 응답을 기다리기 때문에 요청/응답에 대한 레이턴시가 누적됨
2.
RST 패킷의 과도한 발생: 끊어진 연결에 대해 프록시가 계속해서 Heartbeat 패킷을 전달하려 하면서 서버에서 RST 패킷이 발생함
3.
리소스 낭비: 프록시와 서버 간 불필요한 패킷 전송이 반복되어 시스템 리소스가 낭비되고, 서버는 끊어진 연결을 처리하는데 오버헤드가 발생하게 됨
이 문제를 방지하려면 L7 Proxy가 유휴 상태의 연결을 주기적으로 정리하거나, TCP Proxy와 같이 낮은 계층에서 비정상 연결을 효율적으로 감지하고 정리하는 방식으로 해결할 수 있음
TCP Proxy로 변경하면 HTTP 세션 추적 없이 TCP 연결만 중개하므로, SSE 연결이 끊어진 경우에도 추가적인 세션 관리나 RST 패킷 발생이 없음. 연결 유지에 필요한 부가 처리가 없어지기 때문에 Heartbeat에서 발생하는 레이턴시가 줄어들며, 서버는 TCP 연결 상태를 통해 효율적으로 접속을 관리하게 됨

5.5. L7 Proxy와 TCP Proxy의 비교

5.5.1. L7 Proxy (Layer 7 HTTP Proxy)

L7 프록시는 OSI 7계층의 애플리케이션 계층에서 동작하는 프록시로, HTTP와 같은 애플리케이션 프로토콜을 이해하고 처리함
요청 데이터를 검사하고, URL 경로나 헤더에 따라 요청을 분배하며, 이를 통해 고급 라우팅이나 부하 분산을 지원할 수 있음
HTTP 세션을 이해하고 처리하기 때문에, 세션 유지나 특정 헤더 기반 라우팅, 쿠키 조작 등 복잡한 처리가 가능함

5.5.2. TCP Proxy (Layer 4 Proxy)

TCP 프록시는 OSI 4계층의 전송 계층에서 동작하며, 애플리케이션 계층 정보를 이해하지 않고 TCP 연결을 단순히 전달하는 역할을 함
패킷 수준에서 데이터 흐름을 처리하기 때문에 L4 프록시는 HTTP와 같은 특정 애플리케이션 프로토콜에 종속되지 않음
애플리케이션 계층의 세부 사항을 신경 쓰지 않으므로, 트래픽을 빠르게 전달할 수 있으며, 주로 단순한 부하 분산을 위해 사용됨

5.5.3. L7 HTTP Proxy와 TCP Proxy의 차이점

세션 유지 방식
L7 HTTP Proxy는 HTTP 세션을 유지해야 하므로 세션 관리나 상태를 계속 추적함. 반면 TCP Proxy는 TCP 연결만 중개하고, 세션 관리가 필요 없으므로 가볍게 작동함
트래픽 처리 방식
HTTP Proxy는 요청을 처리하고 분석하는 시간이 추가로 소요됩니다. L7 프록시가 연결을 끊는 경우 TCP 프로토콜 수준에서 재확인 없이 연결이 종료될 수 있음
TCP Proxy는 애플리케이션 계층 프로토콜을 신경 쓰지 않고 TCP 수준에서만 연결을 유지하므로, L7 Proxy보다 빠르고 단순하게 트래픽을 전달함

6. Reference