용로그
article thumbnail

들어가며


서비스를 개발하다 보면 클라이언트의 요청이 없어도 서버에서 자체적으로 데이터를 전달해줘야 할 때가 있습니다. 대표적으로 알림과 같은 기능이 있는데요, 사이드 프로젝트를 진행하면서 비슷한 요구사항을 만났습니다. 

 

PR 수동 동기화 기능을 개발하였는데, 이때 동기화된 PR을 다른 유저들도 실시간으로 볼 수 있어야 하기 때문에 화면을 보고 있는 모든 클라이언트에게 변경 사항을 전달해줘야 합니다.

 

여기서 Server Sent Event(SSE)라는 것을 사용하여 이 문제를 해결할 수 있습니다. 이번 글에서는 Spring에서 SSE를 활용하여 클라이언트에게 직접 데이터를 전송하는 방법을 알아보겠습니다.

 

SSE란?


SSE(Server-Sent Events)는 웹 애플리케이션에서 서버로부터 비동기적으로 전송받을 수 있는 기술 중 하나입니다. 통신 방향이 단방향이라는 단점이 있지만, 실시간 업데이트가 필요할 때는 효율적입니다.

 

SSE 방식 외에 아래와 같은 통신 여러가지 방법들이 존재합니다.

  • Short Polling : 클라이언트가 주기적으로 서버로 요청을 보내서 응답을 받는다.
  • Long Polling : 서버의 변경이 일어날 때까지 대기하다가 변경이 일어나면 클라이언트로 응답을 보낸다.
  • Web Socket : 핸드셰이크를 통해 클라이언트와 서버가 HTTP를 통해 연결하고, 양방향으로 통신합니다.

웹 소켓과 SSE를 고려해볼만 한데, 웹 소켓은 상당히 복잡한 구현과 많은 시간 투자가 필요합니다. 반면에 SSE는 웹 소켓에 비해 굉장히 간편한 구현이 가능하고 적절한 기능을 제공합니다. 

 

해당 프로젝트에서는 PR 수정/추가 상태가 실시간으로 반영되어야 하고 빈번하게 발생될 수 있는 상황이기 때문에 SSE를 선택하였습니다.  SSE는 서버와 한 번 연결하고 나면 HTTP keep alive와 비슷하게 서버에서 변경이 일어날 때마다 데이터를 전송하는 방법입니다.

SSE 통신 과정

SSE의 기본적인 흐름은 클라이언트가 SSE 요청을 보내면 서버에서는 해당 클라이언트의 SSE 객체(Spring API)를 만듦니다. 그리고 데이터 변경 사항이 생기면 만들어둔 SSE 객체를 통해 클라이언트에게 데이터를 전송합니다. 여기서 SSE 객체는 클라이언트당 하나입니다.

 

SSE 동작 과정

SSE Emitter 생성


GET /connect HTTP/1.1
Accept: text/event-stream
Cache-Control: no-cache

 

우선 클라이언트에서 서버의 이벤트를 구독하기 위해 요청을 보냈다고 가정하겠습니다. 참고로 이벤트의 Media-Type은 text/event-stream이 표준으로 정해져 있습니다. 위에서 이야기했지만 이벤트는 캐싱을 사용하지 않으며, 지속적으로 연결하고 있는 상태여야 합니다.

 

HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked

 

응답의 Media-Type은 text/event-stream입니다. 서버는 동적으로 생성된 컨텐츠를 스트리밍 하기 때문에 body의 크기를 미리 알 수 없습니다. 그렇기 때문에 Transfer-Encoding의 헤더 값을 chunked로 설정합니다.

 

SseEmitter API가 지원되는 버전은 Spring Framework 4.2 버전 이상이므로 코드를 작성하기 전에 꼭 확인해 주세요.

 

@RestController
@RequiredArgsConstructor
public class SseController {

    // 3분
    private static final Long DEFAULT_TIMEOUT = 180L * 1000;
    private final SseEmittersInMemoryRepository sseEmittersInMemoryRepository;

    @GetMapping(value = "/connect", produces = TEXT_EVENT_STREAM_VALUE)
    public ResponseEntity<SseEmitter> connect(
            @Auth final AuthProperties authProperties
    ) {
        final SseEmitter sseEmitter = new SseEmitter(DEFAULT_TIMEOUT);
        sseEmittersInMemoryRepository.save(authProperties.memberId(), sseEmitter);
        try {
            sseEmitter.send(SseEmitter.event()
                    .name("connect")
                    .data("connected"));
        } catch (IOException e) {
            throw new SseConnectionRefusedException();
        }
        return ResponseEntity.ok(sseEmitter);
    }

}

 

 

SseEmitter의 만료 시간을 설정할 수 있는데, 기본 값은 30초입니다. 만료 시간이 되면 브라우저에서 자동으로 서버에게 재연결 요청을 보냅니다. 따라서 애플리케이션에 적절한 만료시간을 설정해 주면 됩니다.

 

sseEmitter.send(SseEmitter.event()
                    .name("connect")
                    .data("connected"));

 

 

여기서 이벤트의 이름과 데이터를 설정하게 되는데, 클라이언트에서 받을 이벤트의 이름을 지정하는 과정입니다. 특별히 주의해야 하는 점은 클라이언트가 처음 Emitter를 생성하고 나서 만료 시간까지 아무런 데이터도 보내지 않으면 재연결 요청 시 503 Service unavailable 에러가 발생합니다. 따라서 data를 꼭 지정해서 전달하는 것이 좋습니다.

 

@Component
public class SseEmittersInMemoryRepository {

    private final Map<Long, SseEmitter> sseEmitters = new ConcurrentHashMap<>();

    public SseEmitter save(final Long key, final SseEmitter sseEmitter) {
        sseEmitters.put(key, sseEmitter);
        sseEmitter.onCompletion(() -> sseEmitters.remove(key));
        sseEmitter.onTimeout(() -> sseEmitters.remove(key));
        sseEmitter.onError((e) -> sseEmitters.remove(key));
        return sseEmitter;
    }

    public void sendAllEmitters(final List<PullRequestResponse> updatedPrs) {
        sseEmitters.keySet().forEach(key -> {
            try {
                sseEmitters.get(key).send(SseEmitter.event()
                        .name("count")
                        .data(updatedPrs));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

}

 

 

클라이언트는 SseEmitter를 생성할 때는 비동기 요청이 완료되거나 타임아웃이 발생할 때 실행할 콜백을 등록할 수 있습니다. 따라서 브라우저에서 재연결 요청을 보낼 때 새로운 Emitter 객체를 다시 생성하기 때문에 항상 기존의 Emitter 객체를 제거해야 합니다.

 

또 주의해야 할 점이 있습니다. 코드를 보면 ConcurrentHashMap을 사용하였는데, 이는 SseEmitter를 관리하는 스레드들이 콜백 할 때 스레드가 다를 수 있기 때문에 thread-safe 한 형태를 유지해야기 때문입니다.

 

 

SseRepository를 통해 SseEmitter까지 저장했다면 위의 과정까지 마무리되었습니다.

 

Event 발생 시 클라이언트로 전달


이제 서버에서 데이터에 변경 사항이 생겼을 때 클라이언트로 응답을 보낼 차례입니다. 아래 코드는 깃허브에 있는 PR 목록과 현재 서비스의 PR 목록을 동기화하는 코드입니다.

 

SSE 호출
sseEmittersInMemoryRepository#sendAllEmitters

 

클라이언트에서는 updatedPrs라는 이름의 이벤트가 발생할 때 화면이 변경되도록 동작할 것입니다. 만약 UserA가 PR을 동기화했다면 실시간으로 UserB도 동기화된 PR을 볼 수 있는 것이죠.

 

주의할 점


토큰을 전달할 때

SSE 연결 요청을 할 때 헤더에 JWT를 담아서 보내줘야 할 경우가 있을 수 있습니다. 하지만 자바스크립트의 EventSource 인터페이스는 기본적으로 헤더를 지원하지 않는 문제가 있는데, event-source-polyfill을 사용하면 헤더를 사용할 수 있습니다.

DBCP 고갈 문제

SSE 통신을 하는 동안은 HTTP Connection이 계속 열려 있는 상태입니다. 만약 JPA를 사용하는데, osiv(open session in view)를 true로 설정했다면 DBCP의 생명주기가 HTTP Connection과 같아지기 때문에 순식간에 고갈됩니다. 따라서 반드시 false로 변경해야 합니다.

 

Nginx 사용 시 주의점

Nginx의 Upstream 요청은 HTTP/1.0 프로토콜을 사용합니다. HTTP/1.1은 지속 연결이 기본 값이기 때문에 헤더를 따로 설정해 줄 필요가 없지만 HTTP/1.0은 Connection: close 헤더가 기본이기 때문에 아래와 같이 설정을 변경해 줘야 제대로 동작합니다.

 

proxy_set_header Connection '';
proxy_http_version 1.1;

 

마무리하며


현재 SseEmitter는 인메모리에 저장되어 있는 상태이기 때문에 서버가 여러 대로 확장(Scale out)된다면 기능이 제대로 동작하지 않습니다. 따라서 글로벌 캐시 전략 또는 분산 캐시를 사용하는 방법 등이 있을 것 같습니다.

 

Ref

profile

용로그

@용로그

벨덩보단 용덩 github.com/wonyongChoi05