Search
📚

[SLASH 24] SSE 이벤트 푸쉬로 불필요한 Polling 제거하기

Tags
Architcture
Meet Up / Conference
Study
Last edited time
2024/11/12 10:19
2 more properties

1. 배경

1.1. 요청량 증가 → 부하 증가

준실시간성으로 데이터를 표현하기 위해 폴링 사용
사용자 급증으로 인해 CPU 사용량 증가

1.2. 현재 아키텍처

클라이언트에서 3초마다 Serving Layer에 폴링
Resource Layer에 부하 발생

1.3. 해결 방안 고민

재조회 Event가 발행될때만 계좌 API 조회
Push Layer를 구성하여 Client가 데이터를 조회하게 재구성

2. Push(SSE) Server 구성

2.1. WebSocket과 SSE(ServerSentEvent)

SSE
WebSocket
Protocol
HTTP
WebSocket
Direction
서버사이드 단방향
양방향
Auto Reconnect
O
X
Cookie
O
X
Response Type
Text
Text or Byte
토스 활용 사례
1) 양방향 통신이 필요할때 사용 2) 송 / 수신량 데이터량이 많을 때 사용 3) 시세 위주 사용 예) 화면 이동할때마다 시세를 구독할지 해지할지 표현하는 기능
1) 불필요한 폴링을 제거할 때 2) 개인화된 데이터를 이용한 이벤트 푸쉬 할때

2.2. SSE란?

클라이언트에서 HTTP 프로토콜을 통해 요청
서버는 text/event-stream 이라는 content-type으로 응답 반환
이후 client / server 간의 데이터 파이프라인이 형성되어 연결됨
서버에서 데이터 발행될때마다 주기적으로 데이터를 내려줌

2.3. Push Server 구성

클라이언트는 SSE 서버와 연결을 맺음
SSE 서버는 메세지 브로커를 통해 메세지를 전달 받음
메세지 제공 서버는 배치, API, Consumer 등을 통해 메세지를 브로커로 발행
클라이언트 사이드와 서버 사이드에 따라 메세지 전달 전략이 다름

3. Client side message 전달 전략

3.1. Broadcasting

3.1.1. 구성

메세지 브로커로부터 메세지를 받으면 모든 클라이언트에게 동일한 메세지 전달 (Fanout Broadcasting)
예시 코드
1.
클라이언트와 연결될 브로드캐스팅 채널 생성
2.
서버가 bootup된 이후 메세지 브로커로부터 메세지 수신
3.
수신받은 메세지는 SSE 메세지로 변환된 이후 클라이언트로 전달
// 1. 클라이언트와 연결될 브로드캐스팅 채널 생성 private val channel: Sinks.Many<ServerSentEvent<T>> = Sinks.many().multicast().directAllOrNothing() override val doProcess: Consumer<ReactiveSubscription.Message<String, String>> get() = Consumer { msg -> // 3. 수신받은 메세지는 SSE 메세지로 변환된 이후 클라이언트로 전달 val sseMsg = JsonUtil.fromJson(msg.message, object: TypeReference<SseMessage<T>>() {}) channelMeterRegistry.recordMessageSub(sseMsg.header.sentTime) super.send( channel, ServerSentEvent.builder<T>() .data(sseMsg.msg) .id(msg.channel) .build() ) } override fun afterPropertiesSet() { // 2. 서버가 bootup된 이후 메세지 브로커로부터 메세지 수신 super.onMessage(CHANNEL_NAME) .subscribe() } fun connect(): Flux<ServerSentEvent<T>> { return Flux.merge( super.connect(channel) {}, ) }
Kotlin
복사
@RestController class BroadcastEventController( @Qualifier(ChannelConfiguration.BROADCAST_CHANNEL) private val broadcastChannelHandler: BroadcastChannelHandler<String> ) { @GetMapping(value = ["/api/v1/live-chat"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE]) fun eventConnect(): Flux<ServerSentEvent<String>> { return broadcastChannelHandler.connect() } }
Kotlin
복사

3.1.2. 예시 - 이벤트성 단일 채팅방

이벤트성 단일 채팅방. 빠른 개발을 위해 Broadcasting 사용
아키텍처 구성
하지만 유저별은 어떻게? 개인화된 데이터를 이용한 이벤트 푸쉬는?
UniCasting 사용

3.2. UniCasting

3.2.1. 구성

SSE 서버는 각 유저별 단독 채널 생성
클라이언트는 본인의 채널만 구독
예시 코드
1.
단일로 연결할 채널 생성
2.
개인화된 data stream을 주입받고, SSE 메세지로 컨버팅 이후 클라이언트로 발송
class UnicastChannelHandler<T : Any>( private val channelMeterRegistry: ChannelMeterRegistry, private val inboundStream: Flux<String>, ) : ChannelOutBoundHandler<ServerSentEvent<T>>() { private val CHANNEL = Sinks.many().unicast().onBackpressureBuffer<ServerSentEvent<T>>() private val TYPE_REFERENCE = object : TypeReference<SseMessage<T>>() {} fun connect(): Flux<ServerSentEvent<T>> { return Flux.merge( // 1. 단일로 연결할 채널 생성 super.connect(CHANNEL) {}, // 2. 개인화된 data stream을 주입받고, SSE 메세지로 컨버팅 이후 클라이언트로 발송 inboundStream .map { JsonUtil.fromJson(it, TYPE_REFERENCE) } .map { sseMsg -> channelMeterRegistry.recordMessageSub(sseMsg.header.sentTime) ServerSentEvent.builder<T>() .data(sseMsg.msg) .id(sseMsg.header.key) .build() } ) } }
Kotlin
복사
개인용 data stream은 어떻게 만들지?
1.
스트림 생성
2.
본인의 메세지를 필터링할 수 있는 리스너 발급
3.
클라이언트와 연결 이후, 메세지를 수신하겠다고 메세지 브로커에 요청
4.
클라이언트와 SSE간의 연결이 끊어지면 메세지 브로커로부터 구독 취소 요청
fun createInboundStreamFromRedisPubSub( connection: StatefulRedisClusterPubSubConnection<String, String>, channel: String ): Flux<String> { // 1. 스트림 생성 return Flux.create { sink: FluxSink<String> -> // 2. 본인의 메세지를 필터링할 수 있는 리스너 발급 val pubSubListener = object : RedisPubSubAdapter<String, String>() { override fun message(messageChannel: String, message: String?) { if (channel == messageChannel && message != null) { sink.next(message) } } } val (_, subscriber) = shardConnection(connection.async().upstream(), channel) ?: return@create sink.error(UnreachableCodeException("redis connection error")) // 3. 클라이언트와 연결 이후, 메세지를 수신하겠다고 메세지 브로커에 요청 subscriber.statefulConnection.addListener(pubSubListener) subscriber.subscribe(channel) // 4. 클라이언트와 SSE간의 연결이 끊어지면 메세지 브로커로부터 구독 취소 요청 sink.onDispose { subscriber.unsubscribe(channel) subscriber.statefulConnection.removeListener(pubSubListener) } } }
Kotlin
복사

3.2.2. 예시 (1) 보유자산 polling 제거

기존에 주기적으로 폴링 진행
3초마다 보유종목 API 폴링
S 전자 1주 구매 → 재조회 이벤트 발행 → 조회 DB 업데이트
SSE 서버를 중간에 두고, 재조회 이벤트가 발행될 때만 보유종목 API를 호출하여 성능 개선

3.2.3. 예시 (2) 안드로이드 푸쉬알람 대체

푸시 알림
FCM Message Push 을 사용하나, FCM 요청 메세지 Capacity 초과 이슈 발생
FCM Push Server와 SSE 서버를 접속/미접속 유저에 따라 다르게 사용
접속 유저 - SSE Sever를 통해 클라이언트에게 푸시 전달
미접속 유저 - FCM 서버를 통해 클라이언트에게 푸시 전달
세션은 어떻게? (접속/미접속 유저 구별은 어떻게?)
Redis PubSub을 사용
SSE 서버에 연결하게 되면 SSE 서버가 Redis Pubsub을 통해 특정 유저를 지칭하는 토픽을 구독
예) client1 토픽을 subscribe
FCM 서버는 특정 유저를 지칭하는 토픽에 메세지 발행
예) client1 msg를 publish
Redis PubSub에 연결된 사용자(예. subscribe cnt)가 있는 경우, 사용자가 있다고 응답을 FCM 서버에 반환
예) client1 1 이 있다고 응답을 FCM 서버에 반환
이를 통해 세션 테이블을 따로 구현하지 않고, 접속 유저 / 미접속 유저를 구별할 수 있음

4. Server side message 전달 전략

Message broker 선택 전략

4.1. Kafka

SSE 서버를 구성할때, 서버마다 groupId를 발급해서 모든 메세지를 수신하여 연결된 사용자에게만 데이터를 내려주면 카프카로 구현 가능
하지만, At Least Once를 지원하는 카프카는 SSE 서버와 어울리지 않다고 판단
접속/미접속 유저 모두에게 SSE 서버로 메세지가 발행됨. 불필요한 I/O, 메세지 필터링 기능 구현 필요
At Most Once 도구 탐색 중 Redis PubSub을 찾음

4.2. Redis PubSub

SSE 서버는 Redis PubSub에 연결된 사용자에게만 subscribe 요청
Publisher는 Redis PubSub끼리만 요청
SSE 서버는 Redis PubSub에게 subscribe 하는 경우에만 데이터가 오게되고, 카프카 처럼 모든 데이터를 받지 않음
성능 테스트 (broker throughput)
60K

4.3. NATS

broker throughput 더 좋은 도구 탐색 과정에서 발견
고랭 기반의 pub/sub 기반의 모델로 만들어진 도구
단일 모드
클러스터 모드
슈퍼 클러스터 모드

4.3.1. 클러스터 모드

기본 매커니즘
클라이언트 A가 클라이언트 D에게 메세지 전송
동일한 Subject(Redis PubSub의 channel의 개념)를 구독하고 있는 구독자에게만 데이터 전송

4.3.2. 클러스터 모드 예시

NATS 2번 노드에게 msg 라는 subject를 발행 요청
client port를 통해 데이터 전송
cluster port를 통해 나츠 1번으로 메세지 전송
NATS 1번 노드는 연결된 사용자를 알고 있어서, 그 데이터를 전송해주게 됨

4.3.3. NATS Server Node 장부

NATS 2번 노드는 1번 노드에게 subscriber가 있다는것을 어떻게 알았을까?
NATS 서버 노드 내부에는 본인이 라우팅할 장부, 타서버로 라우팅할 장부 2가지가 존재
타서버 라우팅 정보는 타서버로 요청을 돌림
구독과 해지 시점에서 아래 명령어를 통해 FANOUT으로 라우팅 장부에 추가/제거
RS+ (추가 명령어)
RS- (제거 명령어)

4.3.4. 성능

레디스 펍섭에 비해 5배 좋은 성능 → 최종적으로 메세지 브로커로 채택

5. Heartbeat 이슈 트러블 슈팅

5.1. Heartbeat

웹소켓 /SSE 등은 일반적으로 heartbeat를 통해 좀비 커넥션 확인
import reactor.core.publisher.Flux import reactor.util.function.Tuple2 import java.time.Duration import org.springframework.http.codec.ServerSentEvent private fun <T> heartbeat(): Flux<ServerSentEvent<T>> { return Flux.interval(Duration.ofSeconds(2)) // 2초마다 이벤트 발생 .map { ServerSentEvent.builder<T>() .comment("heartbeat") // "heartbeat" 코멘트 추가 .retry(Duration.ofHours(1)) // 연결 실패 시 1시간 후 재시도 .build() } }
Kotlin
복사
HTTP API를 보니 2*n 초 레이턴시 발견
GET /api/v1/sse → 200 took 10000ms
2초, 4초, 6초 등등…

5.2. AS IS 구조

L7 http proxy & k8s cluster 로 구성
k8s 클러스터와 L7 http proxy간의 TCP dump 확인
HTTP heartbeat를 날리고 나서 L7 http proxy에서 이미 끊어진 connection에 대해 ACK를 응답하다보니 RST가 날라오는 것을 확인
RST 자체를 주는것은 상용에서 비일비재할 수 있지만 heartbeat를 주고나서 RST가 계속 발생하는 것은 이상하다고 추측

5.3. TO BE 구조

L7 장비 이슈인 것이 확인
L7을 TCP Proxy 모드로 변경후 SSE Connection이 정상적으로 close 되는 것이 확인

6. Reference