용로그
article thumbnail

일반적으로 정렬된(aligned) 체크포인팅 시간은 체크포인팅 프로세스의 동기 및 비동기 부분에 의해 좌우된다. 그러나 Flink 작업이 높은 백프레셔 상태에 놓여있을 때, 체크포인팅 e2e 시간에 큰 영향을 미치는데, 가장 큰 요소는 체크포인팅 베리어를 모든 연산자(oeprator/task)에 전파하는 데 걸리는 시간일 것이다.

 

이러한 문제가 발생할 경우, 우리는 다음 세 가지 방법으로 문제를 해결할 수 있다.

  • Flink 작업을 최적화한다. 또는 Flink Managed Memory(Off Heap) 또는 JVM 구성을 조정한다.
  • Flink 작업에서 buffered in-flight 데이터를 줄인다.
  • 정렬되지 않은(unaligned) 체크포인트를 활성화한다.

이러한 옵션은 상호 배타적이지 않으며, 함께 사용할 수 있다. 이번 글에서는 마지막 두 옵션에 대해 알아본다.

Buffer debloating


Flink 1.14에서는 Flink 하위 오퍼레이터 간에 buffered in-flight 양을 자동으로 제어하는 기능이 도입되었다. 버퍼 디블로팅은 taskmanager.network.memory.buffer-debloat.enabled=true로 설정하여 활성화할 수 있다.

 

이 기능은 aligned checkpoint와 unaligned checkpoint 모두에서 작동하며, 두 체크포인팅 모드 모두 체크포인트 생성 시간을 단축할 수 있지만, 디블로팅 효과는 aligned checkpoint에서 가장 효과가 뛰어나다.

 

정렬되지 않는 체크포인트에 버퍼 디블로팅을 사용하면 체크포인트 크기가 작아지고 복구 시간이 단축되는 추가적인 이점이 있다. 버퍼 디블로팅 기능의 작동 방식과 구성 방법에 대한 자세한 내용은 아래에서 설명한다. 앞서 언급한 튜닝 가이드에 설명된 대로, 버퍼링된 전송 중인 데이터 양을 수동으로 줄일 수도 있다.

 

Buffer debloating Machanism


Flink에서는 Task 간에 데이터를 전달할 때 네트워크 버퍼를 사용한다. 기본적으로는 throughput 중심으로 설정되어 있어 많은 양의 데이터를 버퍼에 쌓아 전송하는게 일반적이었다. 하지만 이는 latency가 증가하는 원인이 되기도 했다.

 

결국 throughput과 latency 사이에서 적절한 buffer 값을 찾았어야 했는데, 플링크 파이프라인을 작업할 때마다 버퍼 크기가 다를 수 있기 때문에 이상적인 값을 선택하기 어려웠다. 

 

Flink 1.14에 추가된 버퍼 디블로팅(buffer debloating)은 해당 버퍼를 자동으로 조정하여 이 문제를 해결한다. 버퍼 디블로팅은 하위 작업에 대해 가능한 최대 처리량을 계산하고 전송 중인 데이터 양을 조정하여 하위 작업이 받아줄 수 있는 만큼의 데이터만을 담아 보낸다.

 

이 기능은 과거에 하위 작업이 얼만큼의 데이터를 처리했는지를 확인하고, 하위 작업이 받는데 필요한 시간을 예측한다. 이 때 예측이 정확하지 않으면 디블로팅 메커니즘이 두 가지 원인 중 하나로 실패할 수 있다.

  • 버퍼링된 데이터가 너무 적어 전체 처리량을 못따라간다.
  • buffered in-flight 데이터가 너무 많아 aligned checkpoint 배리어 전파 시간이나, unaligned checkpoint 크기에 부정적인 영향을 미친다.

작업에서 부하가 변동하는 경우(즉, 들어오는 레코드 수의 갑작스러운 급증 또는 주기적으로 윈도우 집계 또는 조인 실행) 다음 설정을 조정해야할 수도 있다.

  • taskmanager.network.memory.buffer-debloat.period
    • 버퍼 크기 재계산 사이의 최소 시간 간격이다. 이 시간의 짧을수록 디블로팅 메커니즘의 반응속도는 빨라지지만, 필요한 계산에 따라 필요한 CPU 오버헤드가 증가한다.
  • taskmanager.network.memory.buffer-debloat.samples
    • 처리량 측정의 평균을 구하는 샘플 수를 조정한다. 수집된 샘플의 빈도는 taskmanager.network.memory.buffer-debloat.period를 통해 조정할 수 있다. 샘플 수가 너무 적으면 디블로팅 메커니즘의 반응 시간은 빨라지지만(데이터 양이 작기 때문에), 모수가 적은 만큼 버퍼 디블로팅 메커니즘이 최적의 전송 데이터를 잘못 계산 수 있다.
  • taskmanager.netowkr.memory.buffer-debloat.threshold-percentages
    • 버퍼 크기가 자주 변경되는 것을 방지하기 위한 최적화 옵션이다. 새로운 버퍼 크기가 이전 버퍼 크기와 크게 다르지 않은 경우 적용하지 않는 등의 최적화를 한다.

혀녀재 버퍼 크기를 모니터링하는 데 사용할 수 있는 메트릭은 다음과 같다.

  • estimatedTimeToConsumeBuffersms
    • 모든 입력 채널에서 데이터를 소비하는데 걸리는 총 시간
  • debloateBufferSize
    • 현재 버퍼 크기

Limit of Debloat Buffer

현재 버퍼 디블로팅 메커니즘으로 자동 처리되지 않는 경우가 몇 가지 있다.

Multiple inputs and unions

현재 처리량 계산과 버퍼 디블로팅은 하위 작업 수준에서 발생한다. 하위 작업에 여러 개의 서로 다른 Source가 있는 경우, 버퍼 디블로팅으로 인해 처리량이 낮은 Source에는 Buffered in-flight 데이터가 너무 많이 포함되는 반면, 처리량이 높은 Source에는 해당 처리량을 감당하기에는 버퍼가 너무 작을 수 있다. 이는 특히 서로 다른 Source의 처리량이 크게 다를 경우 더 두드러진다.

버퍼 크기 및 버퍼 수

현재 버퍼 디블로팅은 최대 사용할 수 있는 버퍼의 크기만 조정할 수 있다. 실제 버퍼 크기와 버퍼 개수는 변경되지 않는다. 즉, 디블로팅 메커니즘은 작업의 메모리 사용량을 줄일 수는 없으며, 버퍼의 양이나 크기를 수동으로 줄여야 한다.

높은 병렬성

현재 버퍼 디블로팅 매커니즘은 기본 구성을 사용할 경우 높은 병렬 처리(약 200 이상)에서 제대로 동작하지 않을 수 있다. 처리량 감소 또는 예상보다 긴 체크포인팅 시간이 발생하는 경우, 플로팅 버퍼 수(taskmanager.network.memory.floating-buffers-per-gate)를 기본값에서 병렬 처리 수와 같은 수 이상으로 늘리는 것이 좋다.

 

문제가 발생하는 병렬 처리의 실제 값은 작업마다 다르지만 일반적으로는 수백 이상이어야 한다.

 

Unaligned Checkpoint


Flink 1.11부터 체크포인트의 정렬을 해제할 수 있는 옵션을 제공한다. 기존 Aligned Checkpoint 방식은 모든 연산자가 같은 타이밍에 체크포인트를 찍기 위해 데이터의 흐름을 일시적으로 멈추는(stop-the-world) 방식이다.

 

즉, 체크포인트 배리어(Barrier)를 기다리며 연산자 간 정렬(alignment)이 필요하고, 이 과정에서 백프레셔가 심하면 정렬 시간이 길어지며 체크포인트 지연까지 이어진다.

 

예를 들어, Operator A는 데이터를 200번까지 처리했고, Operator B는 데이터를 100번까지 진행했다면 Checkpoint Barrier는 A를 거쳐 B가 200번 데이터를 처리할 때 까지 기다리며, A로 들어오는 데이터를 stop-the-world 시킨다.

 

Unaligned 체크포인트는 이러한 정렬 과정을 건너뛰고, 배리어가 도착하는 즉시 현재의 네트워크 버퍼 상태(101~200번 데이터)와 Barrier의 위치를 그대로 저장하는 방식이다. 즉, 배리어 도착 시점에 연산자의 입력 버퍼도 같이 스냅샷에 포함한다.

 

백프레셔로 인해 체크포인팅 시간이 너무 오래걸린다면, Unaligned Checkpoint를 고려할 수 있다. 그러면 체크포인팅 시간은 e2e 지연 시간과 거의 무관해질 것이다. Unalgiend Checkpoint는 버퍼에 대기중인 데이터들을 모두 저장하기 때문에 StateBackend에 대한 I/O를 증가시키므로, 체크포인팅 중 상태 저장소에 대한 I/O가 병목인 경우에는 사용하지 말아야 한다.

 

Limit of Unaligned Checkpoint

체크포인트 동시 생성

Flink는 현재 동시에 Unaligned Checkpoint(UC)를 찍는 것을 지원하지 않는다. 여기서 말하는 동시 체크포인팅은 CP와 SP를 같이 생성하는 것을 뜻한다. UC는 CP와 SP를 동시에 생성할 수 없기 때문에 SP 생성에 시간이 약간 더 오래 걸릴 수 있다.

워터마크와 상호 작용

Flink는 체크포인트 시점에 워터마크를 연산자 상태로 저장하지 않고, 복구 시 워터마크를 다시 생성하는 방식으로 처리 흐름을 이어간다. 이로 인해 Unaligned Checkpoint에서는 복구 직후에 생성된 워터마크가 원래 실행 시점의 워터마크와 달라질 수 있으며, 이는 연산자의 처리 결과에 영향을 미칠 수 있다.

 

예를 들어, 연산자가 입력 레코드에 대해 "현재 워터마크 이전인지 여부"를 기준으로 필터링하거나 윈도우를 구성하는 경우, 실행 중 생성된 워터마크와 복구 시 생성된 워터마크가 다르면 동일한 입력이라도 처리 결과가 달라질 수 있다.

 

이는 재현 가능한 결과가 보장되지 않는다는 의미이며, 특히 워터마크에 민감한 연산 로직에서는 문제가 될 수 있다. 이러한 문제를 방지하려면 연산자 상태에 워터마크를 명시적으로 저장하는 방식으로 처리해야 한다. 특히 키 기반 연산자에서는 워터마크를 키 그룹별 유니온 상태에 저장함으로써 재조정(re-scaling) 시에도 워터마크 상태를 안전하게 복원할 수 있다.

 

따라서 Unaligned Checkpoint를 사용할 경우, 성능 이점을 얻는 대신 워터마크 일관성 보장을 위한 추가적인 조치가 필요하다. 워터마크를 활용하는 연산자가 있다면, 워터마크를 상태에 명시적으로 저장하고 복원 시 해당 값을 재사용하는 전략을 고려해야 한다.

 

단일 레코드 처리 시간이 너무 길어질 수 있는 경우

Unaligned Checkpoint는 정렬 과정 없이 입력 버퍼의 상태를 그대로 저장함으로써 백프레셔 상황에서도 빠르게 체크포인트를 진행할 수 있도록 돕는다. 그러나 연산자가 단일 입력 레코드를 처리하는 데 오랜 시간이 걸리는 경우에는, Unaligned Checkpoint의 장점이 제한되거나 오히려 지연을 유발할 수 있다.

 

예를 들어 윈도우 연산에서 다수의 타이머가 동시에 트리거되는 경우나, 특정 입력 레코드에 대해 연산자가 복잡한 계산을 수행하는 경우에는 체크포인트 배리어가 큐에 도달하더라도 해당 레코드 처리가 완료될 때까지 기다려야 한다.

 

또한, Flink는 하나의 레코드 처리를 중간에 중단할 수 없기 때문에, 해당 레코드가 처리되는 동안 필요한 네트워크 버퍼가 모두 확보되지 않으면 처리 자체가 지연될 수 있다. 예를 들어, 입력 레코드 하나가 단일 버퍼에 직렬화되지 않을 만큼 크거나, flatMap과 같은 연산자가 하나의 입력으로 다수의 출력을 생성하여 여러 버퍼가 필요한 경우, 모든 버퍼가 확보되기 전까지는 처리를 진행할 수 없다.

 

이러한 경우 Unaligned Checkpoint는 현재 레코드 처리가 완료될 때까지 차단되며, 그 결과로 체크포인트 전체 지연 시간이 늘어나게 된다.

 

결과적으로, Unaligned Checkpoint는 데이터 흐름이 비교적 빠르게 진행되는 환경에서는 큰 이점을 주지만, 단일 레코드 처리 시간이 긴 상황에서는 체크포인트 지연 문제를 피할 수 없다. 따라서 시스템 설계 시 연산자의 처리 시간, 버퍼 사용량, 백프레셔 상황 등을 종합적으로 고려해야 하며, 장기 실행 연산이 예상되는 경우에는 aligned 방식이나 다른 튜닝 방안을 검토할 필요가 있다.

셔플 또는 리밸런스 시 데이터 순서가 변경될 수 있다.

Pointwise Connection

Flink에서 점별 연결은 한 연산자의 서브태스크가 고정된 방식으로 다음 연산자의 서브태스크와 직접 연결되는 구조를 의미한다. 이 구조에서는 데이터 순서에 대한 명시적인 보장은 없지만, 많은 경우에 데이터가 기존 소스 또는 키별(keyed) 처리 방식과 동일한 방식으로 암묵적으로 정렬되기 때문에 이를 기반으로 연산을 구성하는 경우가 있다. 예를 들어, 연산을 더 작은 단위로 나누면서도 순서를 유지해야 할 경우 이러한 암묵적 정렬을 활용한다.

Pointwise Connection

그러나 Unaligned Checkpoint(UC)를 사용하는 경우 병렬 처리 수준이 변경되지 않는다면 이러한 순서 특성은 유지될 수 있다. 문제는 UC 기반의 리스케일링(rescaling) 기능이 도입되면서 이러한 암묵적 순서 보장이 깨질 수 있다는 점이다.

 

예를 들어 병렬 처리 수준을 2에서 3으로 변경하면, 기존 키 기반 채널 내의 레코드를 키 그룹 기준으로 새 채널에 재분배해야 한다. 키가 명시적으로 있는 경우엔 키 그룹을 기준으로 이 작업이 가능하지만, 순방향(forward) 채널에서는 키 컨텍스트가 존재하지 않기 때문에 레코드가 어떤 키 그룹에 속하는지 알 수 없어 순서를 보존할 수 없게 된다. 즉, 키 기반이 아닌 점별 연결의 경우 리스케일링 시 순서가 바뀔 수 있다는 점에 주의해야 한다.

 

Broadcast Connection

브로드캐스트 연결은 또 다른 문제를 발생시킨다. 브로드캐스트 채널은 하나의 스트림을 모든 연산자 서브태스크에 동일하게 전달하는 구조인데, 각 서브태스크가 데이터를 소비하는 속도가 다를 수 있어 동일한 브로드캐스트 이벤트가 모든 서브태스크에 동일한 시점에 도달하지 않을 수 있다. 그 결과, 일부 연산자는 특정 브로드캐스트 이벤트에 의해 상태(state)가 갱신된 이후의 데이터를 처리하는 반면, 다른 연산자는 아직 그 상태 갱신이 적용되지 않은 데이터를 처리하게 되는 불일치가 발생할 수 있다.

Broadcast Connection

예를 들어, 브로드캐스트 스트림으로 들어온 정책 변경 이벤트를 어떤 서브태스크는 이미 처리했고, 다른 서브태스크는 아직 처리하지 못한 상황에서 UC가 발생했다고 가정해보자. 이 상태로 복구하면 모든 서브태스크는 동일한 브로드캐스트 상태를 가지게 되지만, 각 서브태스크는 복원된 중간 데이터들을 서로 다른 시점의 상태로 처리해야 했던 것이다. 결국, 상태와 데이터의 타이밍이 어긋나게 되어 처리 결과의 불일치가 발생할 수 있다.

 

Flink는 이러한 브로드캐스트 상태를 처리하기 위해 연산자의 첫 번째 서브태스크(서브태스크 0)에만 브로드캐스트 상태의 단일 복사본을 체크포인트로 저장하고, 복구 시 이 복사본을 모든 서브태스크에 복제하여 전달한다. 이 과정에서도 마찬가지로, 복구된 이후 곧 사용할 레코드가 이전 상태에 기반해 처리될 가능성이 있기 때문에, 상태 일관성 측면에서 잠재적인 문제가 발생할 수 있다.

profile

용로그

@용로그

벨덩보단 용덩 github.com/wonyongChoi05