1. 배경
1.1. 요청량 증가 → 부하 증가
•
준실시간성으로 데이터를 표현하기 위해 폴링 사용
•
사용자 급증으로 인해 CPU 사용량 증가
1.2. 현재 아키텍처
•
클라이언트에서 3초마다 Serving Layer에 폴링
•
Resource Layer에 부하 발생
1.3. 해결 방안 고민
•
재조회 Event가 발행될때만 계좌 API 조회
•
Push Layer를 구성하여 Client가 데이터를 조회하게 재구성
2. Push(SSE) Server 구성
2.1. WebSocket과 SSE(ServerSentEvent)
SSE | WebSocket | |
Protocol | HTTP | WebSocket |
Direction | 서버사이드 단방향 | 양방향 |
Auto Reconnect | O | X |
Cookie | O | X |
Response Type | Text | Text or Byte |
토스 활용 사례 | 1) 양방향 통신이 필요할때 사용
2) 송 / 수신량 데이터량이 많을 때 사용
3) 시세 위주 사용
예) 화면 이동할때마다 시세를 구독할지 해지할지 표현하는 기능 | 1) 불필요한 폴링을 제거할 때
2) 개인화된 데이터를 이용한 이벤트 푸쉬 할때 |
2.2. SSE란?
•
클라이언트에서 HTTP 프로토콜을 통해 요청
•
서버는 text/event-stream 이라는 content-type으로 응답 반환
•
이후 client / server 간의 데이터 파이프라인이 형성되어 연결됨
•
서버에서 데이터 발행될때마다 주기적으로 데이터를 내려줌
2.3. Push Server 구성
•
클라이언트는 SSE 서버와 연결을 맺음
•
SSE 서버는 메세지 브로커를 통해 메세지를 전달 받음
•
메세지 제공 서버는 배치, API, Consumer 등을 통해 메세지를 브로커로 발행
•
클라이언트 사이드와 서버 사이드에 따라 메세지 전달 전략이 다름
3. Client side message 전달 전략
3.1. Broadcasting
3.1.1. 구성
•
메세지 브로커로부터 메세지를 받으면 모든 클라이언트에게 동일한 메세지 전달 (Fanout Broadcasting)
•
예시 코드
1.
클라이언트와 연결될 브로드캐스팅 채널 생성
2.
서버가 bootup된 이후 메세지 브로커로부터 메세지 수신
3.
수신받은 메세지는 SSE 메세지로 변환된 이후 클라이언트로 전달
// 1. 클라이언트와 연결될 브로드캐스팅 채널 생성
private val channel: Sinks.Many<ServerSentEvent<T>> =
Sinks.many().multicast().directAllOrNothing()
override val doProcess: Consumer<ReactiveSubscription.Message<String, String>>
get() = Consumer { msg ->
// 3. 수신받은 메세지는 SSE 메세지로 변환된 이후 클라이언트로 전달
val sseMsg = JsonUtil.fromJson(msg.message, object: TypeReference<SseMessage<T>>() {})
channelMeterRegistry.recordMessageSub(sseMsg.header.sentTime)
super.send(
channel,
ServerSentEvent.builder<T>()
.data(sseMsg.msg)
.id(msg.channel)
.build()
)
}
override fun afterPropertiesSet() {
// 2. 서버가 bootup된 이후 메세지 브로커로부터 메세지 수신
super.onMessage(CHANNEL_NAME)
.subscribe()
}
fun connect(): Flux<ServerSentEvent<T>> {
return Flux.merge(
super.connect(channel) {},
)
}
Kotlin
복사
@RestController
class BroadcastEventController(
@Qualifier(ChannelConfiguration.BROADCAST_CHANNEL)
private val broadcastChannelHandler: BroadcastChannelHandler<String>
) {
@GetMapping(value = ["/api/v1/live-chat"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun eventConnect(): Flux<ServerSentEvent<String>> {
return broadcastChannelHandler.connect()
}
}
Kotlin
복사
3.1.2. 예시 - 이벤트성 단일 채팅방
•
이벤트성 단일 채팅방. 빠른 개발을 위해 Broadcasting 사용
•
아키텍처 구성
•
하지만 유저별은 어떻게? 개인화된 데이터를 이용한 이벤트 푸쉬는?
◦
UniCasting 사용
3.2. UniCasting
3.2.1. 구성
•
SSE 서버는 각 유저별 단독 채널 생성
•
클라이언트는 본인의 채널만 구독
•
예시 코드
1.
단일로 연결할 채널 생성
2.
개인화된 data stream을 주입받고, SSE 메세지로 컨버팅 이후 클라이언트로 발송
class UnicastChannelHandler<T : Any>(
private val channelMeterRegistry: ChannelMeterRegistry,
private val inboundStream: Flux<String>,
) : ChannelOutBoundHandler<ServerSentEvent<T>>() {
private val CHANNEL = Sinks.many().unicast().onBackpressureBuffer<ServerSentEvent<T>>()
private val TYPE_REFERENCE = object : TypeReference<SseMessage<T>>() {}
fun connect(): Flux<ServerSentEvent<T>> {
return Flux.merge(
// 1. 단일로 연결할 채널 생성
super.connect(CHANNEL) {},
// 2. 개인화된 data stream을 주입받고, SSE 메세지로 컨버팅 이후 클라이언트로 발송
inboundStream
.map { JsonUtil.fromJson(it, TYPE_REFERENCE) }
.map { sseMsg ->
channelMeterRegistry.recordMessageSub(sseMsg.header.sentTime)
ServerSentEvent.builder<T>()
.data(sseMsg.msg)
.id(sseMsg.header.key)
.build()
}
)
}
}
Kotlin
복사
•
개인용 data stream은 어떻게 만들지?
1.
스트림 생성
2.
본인의 메세지를 필터링할 수 있는 리스너 발급
3.
클라이언트와 연결 이후, 메세지를 수신하겠다고 메세지 브로커에 요청
4.
클라이언트와 SSE간의 연결이 끊어지면 메세지 브로커로부터 구독 취소 요청
fun createInboundStreamFromRedisPubSub(
connection: StatefulRedisClusterPubSubConnection<String, String>,
channel: String
): Flux<String> {
// 1. 스트림 생성
return Flux.create { sink: FluxSink<String> ->
// 2. 본인의 메세지를 필터링할 수 있는 리스너 발급
val pubSubListener = object : RedisPubSubAdapter<String, String>() {
override fun message(messageChannel: String, message: String?) {
if (channel == messageChannel && message != null) {
sink.next(message)
}
}
}
val (_, subscriber) = shardConnection(connection.async().upstream(), channel)
?: return@create sink.error(UnreachableCodeException("redis connection error"))
// 3. 클라이언트와 연결 이후, 메세지를 수신하겠다고 메세지 브로커에 요청
subscriber.statefulConnection.addListener(pubSubListener)
subscriber.subscribe(channel)
// 4. 클라이언트와 SSE간의 연결이 끊어지면 메세지 브로커로부터 구독 취소 요청
sink.onDispose {
subscriber.unsubscribe(channel)
subscriber.statefulConnection.removeListener(pubSubListener)
}
}
}
Kotlin
복사
3.2.2. 예시 (1) 보유자산 polling 제거
•
기존에 주기적으로 폴링 진행
•
3초마다 보유종목 API 폴링
•
S 전자 1주 구매 → 재조회 이벤트 발행 → 조회 DB 업데이트
•
SSE 서버를 중간에 두고, 재조회 이벤트가 발행될 때만 보유종목 API를 호출하여 성능 개선
3.2.3. 예시 (2) 안드로이드 푸쉬알람 대체
•
푸시 알림
•
FCM Message Push 을 사용하나, FCM 요청 메세지 Capacity 초과 이슈 발생
•
FCM Push Server와 SSE 서버를 접속/미접속 유저에 따라 다르게 사용
◦
접속 유저 - SSE Sever를 통해 클라이언트에게 푸시 전달
◦
미접속 유저 - FCM 서버를 통해 클라이언트에게 푸시 전달
•
세션은 어떻게? (접속/미접속 유저 구별은 어떻게?)
◦
Redis PubSub을 사용
◦
SSE 서버에 연결하게 되면 SSE 서버가 Redis Pubsub을 통해 특정 유저를 지칭하는 토픽을 구독
▪
예) client1 토픽을 subscribe
◦
FCM 서버는 특정 유저를 지칭하는 토픽에 메세지 발행
▪
예) client1 msg를 publish
◦
Redis PubSub에 연결된 사용자(예. subscribe cnt)가 있는 경우, 사용자가 있다고 응답을 FCM 서버에 반환
▪
예) client1 1 이 있다고 응답을 FCM 서버에 반환
◦
이를 통해 세션 테이블을 따로 구현하지 않고, 접속 유저 / 미접속 유저를 구별할 수 있음
4. Server side message 전달 전략
•
Message broker 선택 전략
4.1. Kafka
•
SSE 서버를 구성할때, 서버마다 groupId를 발급해서 모든 메세지를 수신하여 연결된 사용자에게만 데이터를 내려주면 카프카로 구현 가능
•
하지만, At Least Once를 지원하는 카프카는 SSE 서버와 어울리지 않다고 판단
•
접속/미접속 유저 모두에게 SSE 서버로 메세지가 발행됨. 불필요한 I/O, 메세지 필터링 기능 구현 필요
•
At Most Once 도구 탐색 중 Redis PubSub을 찾음
4.2. Redis PubSub
•
SSE 서버는 Redis PubSub에 연결된 사용자에게만 subscribe 요청
•
Publisher는 Redis PubSub끼리만 요청
•
SSE 서버는 Redis PubSub에게 subscribe 하는 경우에만 데이터가 오게되고, 카프카 처럼 모든 데이터를 받지 않음
•
성능 테스트 (broker throughput)
◦
60K
4.3. NATS
•
broker throughput 더 좋은 도구 탐색 과정에서 발견
•
고랭 기반의 pub/sub 기반의 모델로 만들어진 도구
•
4.3.1. 클러스터 모드
•
기본 매커니즘
◦
클라이언트 A가 클라이언트 D에게 메세지 전송
◦
동일한 Subject(Redis PubSub의 channel의 개념)를 구독하고 있는 구독자에게만 데이터 전송
4.3.2. 클러스터 모드 예시
•
NATS 2번 노드에게 msg 라는 subject를 발행 요청
•
client port를 통해 데이터 전송
•
cluster port를 통해 나츠 1번으로 메세지 전송
•
NATS 1번 노드는 연결된 사용자를 알고 있어서, 그 데이터를 전송해주게 됨
4.3.3. NATS Server Node 장부
•
NATS 2번 노드는 1번 노드에게 subscriber가 있다는것을 어떻게 알았을까?
•
NATS 서버 노드 내부에는 본인이 라우팅할 장부, 타서버로 라우팅할 장부 2가지가 존재
•
타서버 라우팅 정보는 타서버로 요청을 돌림
•
구독과 해지 시점에서 아래 명령어를 통해 FANOUT으로 라우팅 장부에 추가/제거
◦
RS+ (추가 명령어)
◦
RS- (제거 명령어)
4.3.4. 성능
•
레디스 펍섭에 비해 5배 좋은 성능 → 최종적으로 메세지 브로커로 채택
5. Heartbeat 이슈 트러블 슈팅
5.1. Heartbeat
•
웹소켓 /SSE 등은 일반적으로 heartbeat를 통해 좀비 커넥션 확인
import reactor.core.publisher.Flux
import reactor.util.function.Tuple2
import java.time.Duration
import org.springframework.http.codec.ServerSentEvent
private fun <T> heartbeat(): Flux<ServerSentEvent<T>> {
return Flux.interval(Duration.ofSeconds(2)) // 2초마다 이벤트 발생
.map {
ServerSentEvent.builder<T>()
.comment("heartbeat") // "heartbeat" 코멘트 추가
.retry(Duration.ofHours(1)) // 연결 실패 시 1시간 후 재시도
.build()
}
}
Kotlin
복사
•
HTTP API를 보니 2*n 초 레이턴시 발견
◦
GET /api/v1/sse → 200 took 10000ms
◦
2초, 4초, 6초 등등…
5.2. AS IS 구조
•
L7 http proxy & k8s cluster 로 구성
•
k8s 클러스터와 L7 http proxy간의 TCP dump 확인
◦
HTTP heartbeat를 날리고 나서 L7 http proxy에서 이미 끊어진 connection에 대해 ACK를 응답하다보니 RST가 날라오는 것을 확인
◦
RST 자체를 주는것은 상용에서 비일비재할 수 있지만 heartbeat를 주고나서 RST가 계속 발생하는 것은 이상하다고 추측
5.3. TO BE 구조
•
L7 장비 이슈인 것이 확인
•
L7을 TCP Proxy 모드로 변경후 SSE Connection이 정상적으로 close 되는 것이 확인