본문 바로가기
Kafka

[Kafka] Advanced Consumer Configurations

'Stéphane Maarek - Learn Apache Kafka for Beginners v2'를 보고 작성한 글입니다. 😀

Delivery Semantics for Consumers

At most once

메시지 배치가 수신되는 즉시 오프셋이 커밋된다

처리가 잘못되면 메시지가 손실된다 (다시 읽지 않음)

At least once

메시지가 처리된 후 오프셋이 커밋된다. 처리가 잘못되면 메시지를 다시 읽는다

이로 인해 메시지가 중복 처리 될 수 있다

처리가 멱등성(idempotent)인지 확인한다 (즉, 메시지를 다시 처리해도 시스템에 영향을주지 않음)

Exactly once

Kafka -> Kafka 는 Kafka Stream API 사용

Kafka -> Sink 는 Idempotent Consumer 사용

 

Consumer Poll Behavior

Kafka Consumer는 "Poll" 모델이다 ("push" 모델이 아님)

이를 통해 컨슈머는 로그에서 소비하려는 위치 및 속도를 제어하고 이벤트를 재생할 수 있다

Fecth.min.bytes (default 1)

한번의 fetch 에 대해 반환되는 최소 데이터

처리량 향상 및 요청 수 감소에 도움

지연 발생

Max.poll.records (default 500)

poll 요청 당 수신할 레코드 수 제어

메시지가 매우 작고 사용 가능한 RAM이 많은 경우 값을 올리면 좋다

요청당 poll 되는 레코드 수를 모니터링하는 데 좋다

Max.partitions.fecth.bytes (default 1MB)

브로커가 파티션 당 반환하는 최대 데이터

100개의 파티션에서 읽을 경우 많은 메모리(RAM)가 필요하다

Fetch.max.bytes (default 50MB)

한번의 fetch 에 대해 반환되는 최대 데이터 (여러 파티션 포함)

컨슈머는 여러번의 fetch 를 병렬로 수행한다

컨슈머가 이미 처리량을 최대로 한 경우에 위의 설정을 변경하면 된다

 

Consumer Offset Commits Strategies

enable.auto.commit = true & synchronous processing of batches (Easy)

  • auto.commit 을 사용하면 .poll() 을 호출할 때 마다 오프셋이 정기적인 간격으로 자동 커밋됩니다. (기본적으로 auto.commit.interval.ms = 5000)
  • synchronous processing (동기 처리)을 사용하지 않으면 데이터가 처리되기 전에 오프셋이 커밋되므로 "At most once" 로 동작한다
while(true){
    List<Records> batch = consumer.poll(Duration.ofMillis(100));
  doSomethingSynchronous(batch);
}

enable.auto.commit = false & manual commit of offsets (Medium)

  • 오프셋 커밋시기와 커밋 조건을 제어한다
  • 예를 들면, 레코드를 버퍼에 축적한 다음 버퍼를 데이터베이스로 플러시하고 오프셋을 커밋한다
while(true){
    batch += consumer.poll(Duration.ofMillis(100));
    if isReady(batch) {
        doSomethingSynchronous(batch)
        consumer.commitSync();
    }
}

 

Consumer Offset Reset Behavior

컨슈머 오프셋의 보존 기간은 7일이고 컨슈머가 7일 동안 새 데이터를 읽지 않으면 오프셋은 손실 경우, 오프셋은 유효하지 않다 (Kafka> = 2.0)

auto.offset.reset = latest : 로그의 끝을 읽음

auto.offset.reset = earliest : 로그의 시작을 읽음

auto.offset.reset = none : 오프셋이 없으면 예외 발생

적절한 데이터 보존 기간 및 오프셋 보존 기간을 설정하는 것이 중요하다

 

Replaying data for Consumers

컨슈머 그룹에서 예기치 않은 동작이 발생할 경우 데이터 재생이 가능하다

데이터 재생 방법 :

  • 그룹 내 모든 컨슈머 제거
  • 'Kafka-consumer-groups' 명령을 통해 원하는 오로셋 설정
  • 컨슈머 재시작

 

Consumer Internal Threads

Controlling Consumer Liveness

그룹의 컨슈머와 컨슈머 그룹 코디네이터는 통신한다

다운된 컨슈머를 감지하기 위해 "hearbeat" 메커니즘과 "poll" 메커니즘이 있다

문제를 방지하기 위해 컨슈머는 데이터를 빠르게 처리하고 자주 폴링하는 것이 좋다

Consumer Heartbeat Thread

Session.timeout.ms (default 10 seconds):

  • heartbeat 는 브로커에게 주기적으로 전송된다
  • 해당 기간 동안 heartbeat 가 전송되지 않으면 컨슈머가 죽은 것으로 간주한다
  • 빠른 컨슈머를 리밸런스하기 위해 낮추기도 한다

Heartbreat.interval.ms (default 3 seconds):

  • heartbeat 전송 빈도
  • 일반적으로 session.timeout.ms의 1/3로 설정한다

이 메커니즘은 컨슈머 애플리케이션이 다운되는 것을 감지하는 데 사용된다

Consumer Poll Thread

  • max.poll.interval.ms (default 5 minutes) :
    • 컨슈머가 죽었다고 선언하기 전에 두 .poll() 호출 사이의 최대 시간
    • Spark와 같은 빅데이터 프레임 워크와 같이 처리에 시간이 걸리는 경우 관련이 있다
  • 이 메커니즘은 컨슈머의 데이터 처리 문제를 감지하는 데 사용된다

 


References


🏋🏻 개인적으로 공부한 내용을 기록하고 있습니다.
잘못된 부분이 있다면 과감하게 지적해주세요!! 🏋

댓글