본문 바로가기
Kafka

[Kafka] 카프카 커맨드 라인 툴 (command-line tool)

최원영님의 ‘아파치 카프카 애플리케이션 프로그래밍 with 자바’를 보고 작성한 글입니다. 😀

 

1. kafka-topics.sh

토픽이란 카프카에서 데이터를 구분하는 가장 기본적인 개념(RDBMS의 테이블과 유사)으로 최소 1개 부터 많게는 브로커 당 4,000개 및 클러스터 당 200,000개 이하의 파티션으로 구성된다. (출처)

토픽 생성 (옵션 x)

$ kafka-topics.sh \
    --create \
    --bootstrap-server ip-kafka:9092 \
    --topic example.kafka.1

create: 토픽을 생성하는 명령어이다.

bootstrap-server: 토픽을 생성할 카프카 클러스터의 브로커 ip와 port 작성한다. (브로커가 여러개라면 여러개의 브로커 ip와 port를 작성)

topic: 토픽의 이름은 내부 데이터가 무엇이 있는지 유추가 가능할 정도로 자세히 적는 것이 좋다. (규모가 커지면 유지보수가 어려움)

토픽 생성 (옵션 o)

$ kafka-topics.sh \
    --create \
    --bootstrap-server ip-kafka:9092 \
    --partitions 3 \
    --replication-factor 1 \
    --config retention.ms=172800000 \
    --topic example.kafka.2

partitions: 파티션의 개수로 만약 옵션으로 파티션의 개수를 지정하지 않으면 카프카 브로커 설정 파일(config/server.properties)에 있는 num.partitions 옵션값에 따라 생성된다.

replication-factor: 토픽의 파티션을 복제할 복제 개수를 의미한다. replication-factor는 leader 파티션과 follower 파티션 모두를 포함하므로 replication-factor가 1이면 복제를 하지 않는다는 의미이다. 따라서 replication-factor가 2이면 1개의 브로커에 장애가 발생하더라도 나머지 한개의 브로커에 있는 follwer 파티션에 복제된 데이터를 사용하여 안전하게 데이터 처리를 지속적으로 할 수 있다. 즉, replication-factor의 최대 개수는 카프카 클러스터를 구성하는 브로커의 개수이다.

config: 추가적인 설정을 할 수 있다. 위와 같이 retention.ms는 토픽의 데이터를 유지하는 시간을 뜻한다. 172800000ms는 2일을 ms(밀리세컨드) 단위로 나타낸 것으로, 2일이 지나면 토픽의 데이터는 삭제된다.

토픽 리스트 조회

$ kafka-topics.sh --bootstrap-server ip-kafka:9092 --list 
example.kafka.1
example.kafka.2

list 옵션을 사용하여 카프카 클러스터에 생성된 토픽들을 확인할 수 있다.

토픽 상세 조회

$ kafka-topics.sh --bootstrap-server ip-kafka:9092 --describe --topic example.kafka.2
Topic: example.kafka.2  PartitionCount: 3 ReplicationFactor: 1  Configs: segment.bytes=1073741824,retention.ms=172800000
  Topic: example.kafka.2  Partition: 0  Leader: 0 Replicas: 0 Isr: 0
  Topic: example.kafka.2  Partition: 1  Leader: 0 Replicas: 0 Isr: 0
  Topic: example.kafka.2  Partition: 2  Leader: 0 Replicas: 0 Isr: 0

describe 옵션을 사용하여 토픽의 파티션 개수, 리더 파티션의 브로커 번호, 복제된 파티션의 브로커 번호, ISR

만약 모든 파티션의 leader가 일부 브로커에 몰려있는 경우 카프카 클러스터 부하가 특정 브로커들로 몰려 네트워크 대역의 이슈가 생길 수 있다.

토픽 삭제

kafka-topics.sh --bootstrap-server ip-kafka:9092 --delete --topic example.kafka.2 

토픽 옵션 변경

카프카 2.5까지는 kafka-topics.hs와 —alter 옵션을 통해 retention 기간을 변경할 수 있지만 추후 삭제 예정이므로 kafak-configs.sh를 사용한다.

파티션 개수 변경

$ kafka-topics.sh --bootstrap-server ip-kafka:9092 \
    --topic example.kafka.1 \
    --alter \
    --partitions 3

retention 기간 변경

$ kafka-configs.sh --bootstrap-server ip-kafka:9092 \
    --entity-type topics \
    --entity-name example.kafka.1 \
    --alter --add-config retention.ms=86400000

변경 확인 (kafka-configs.sh —describe 옵션을 통해서도 확인 가능)

$ kafka-configs.sh --bootstrap-server ip-kafka:9092 \
    --entity-type topics \
    --entity-name example.kafka.1 \
    --describe
Dynamic configs for topic example.kafka.1 are:
  retention.ms=86400000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=86400000}

$ kafka-topics.sh --bootstrap-server ip-kafka:9092 --describe --topic example.kafka.1
Topic: example.kafka.1  PartitionCount: 3 ReplicationFactor: 1  Configs: segment.bytes=1073741824,retention.ms=86400000
  Topic: example.kafka.1  Partition: 0  Leader: 0 Replicas: 0 Isr: 0
  Topic: example.kafka.1  Partition: 1  Leader: 0 Replicas: 0 Isr: 0
  Topic: example.kafka.1  Partition: 2  Leader: 0 Replicas: 0 Isr: 0

 

2. kafka-console-producer.sh

생성된 토픽에 데이터를 넣을 수 있는 kafka-console-producer.sh 명령어를 실행해 보자. 토픽에 넣는 데이터는 ‘레코드(record)’라고 부르며 메시지 키(key)와 메시지 값(value)으로 이루어져 있다. 메시지 키를 설정하지 않으면 자바에서는 null로 기본 설정되어 브로커로 전송된다.

메시지 키가 없는 경우

$ kafka-console-producer.sh --bootstrap-server ip-kafka:9092 \
    --topic example.kafka.1
>1
>2

여기서 주의할 점은 kafka-console-producer.sh로 전송되는 레코드 값은 UTF-8을 기반으로 ByteArraySerializer로만 직렬화 된다. 즉, String이 아닌 타입으로는 직렬화하여 전송할 수 없다. 그러므로 다른 타입을 직렬화하여 데이터를 브로커로 전송하고 싶다면 카프카 프로듀서 애플리케이션을 직접 개발해야 한다.
메시지 키가 null인 경우는 프로듀서가 파티션으로 전송할 때 레코드 배치 단위(레코드 전송 묶음)로 라운드 로빈(round robin)으로 전송한다.

메시지 키가 있는 경우

$ kafka-console-producer.sh --bootstrap-server ip-kafka:9092 \
    --topic example.kafka.1 \
    --property "parse.key=true" \
    --property "key.separator=:"
>key1:no1
>key2:no2

property "parse.key=true" : parse.key를 true로 두면 레코드를 전송할 때 메시지 키를 추가할 수 있다.

property "key.separator=:" : 메시지 키와 메시지 값을 구분하는 구분자를 선언한다. (default ‘\t’)

레코드의 메시지 키가 동일한 경우에는 동일한 파티션으로 전송된다. 하지만 파티션이 추가되면 기존의 파티션과 메시지 키의 일관성이 보장되지 않는다. 즉, 이전에 메시지 키가 파티션 0번에 들어갔다면 파티션을 늘린 뒤에는 파티션 0번으로 간다는 보장이 없다.

 

3. kafka-console-consumer.sh

토픽으로 전송된 데이터는 kafka-console-consumer.sh 명령어로 확인할 수 있다.

property 옵션이 없는 경우

$ kafka-console-consumer.sh --bootstrap-server ip-kafka:9092 \
    --topic example.kafka.1 \
    --from-beginning
1
2
no1
no2

from-beginning 옵션을 통해 토픽에 저장된 데이터를 처음부터 출력한다.

property 옵션이 있는 경우

$ kafka-console-consumer.sh --bootstrap-server ip-kafka:9092 \
    --topic example.kafka.1 \
    --property print.key=true \
    --property key.separator="-" \
    --group example-group \
    --from-beginning
null-2
null-1
key1-no1
key2-no2

property print.key=true : 메시지 키를 확인할 수 있다.

property key.separator="-" : 메시지 키와 값을 구분하기 위해 구분자 설정 (default ‘\t’)

group example-group : 컨슈머 그룹은 1개 이상의 컨슈머로 구성된다. 컨슈머 그룹을 통해 가져간 토픽의 메시지는 커밋(commit)을 한다. 커밋이란 컨슈머가 특정 레코드까지 처리를 완료했다고 레코드의 오프셋 번호를 카프카 브로커에 저장하는 것이다. 커밋 정보는 __consumer_offsets 토픽에 저장된다.

위의 결과를 보면 kafka-console-producer.sh 명령어를 통해 전송했던 데이터의 순서가 현재 출력되는 순서와 다르다. 그 이유는 kafka-console-consumer.sh 명령어를 통해 토픽의 데이터를 가져갈 때 토픽의 모든 파티션으로부터 동일한 중요도로 데이터를 가져가기 때문에 프로듀서가 토픽에 넣은 데이터의 순서와 컨슈머가 토픽에서 가져간 데이터의 순서가 달라지게 된다. 하지만 각 파티션의 데이터 순서는 보장된다.

 

4. kafka-consumer-groups.sh

컨슈머 그룹은 따로 명령어로 생성하지 않고 컨슈머가 동작할 때 컨슈머 그룹 이름을 지정하면 새로 생성된다. 생성된 컨슈머 그룹의 리스트는 kafka-consumer-groups.sh 명령어로 확인할 수 있다.

컨슈머 그룹 리스트 조회

$ kafka-consumer-groups.sh --bootstrap-server ip-kafka:9092 --list
example-group

컨슈머 그룹 리스트 상세 조회

$ kafka-consumer-groups.sh --bootstrap-server ip-kafka:9092 \
    --group example-group \
    --describe

PARTITION: 마지막으로 커밋된 파티션의 번호이다.

CURRENT-OFFSET: 컨슈머 그룹에 의해 커밋된 이 토픽 파티션의 마지막 오프셋. 즉, 해당 필드가 파티션 내부의 컨슈머의 위치이다.

LOG-END-OFFSET: 해당 토픽 파티션에 저장된 데이터의 끝을 나타내며(브로커가 관리함), 파티션에 쓰고 클러스터에 커밋된 마지막 메시지의 오프셋이다.

LAG: 랙(lag)은 컨슈머 그룹이 토픽의 파티션에 있는 데이터를 가져가는 데 얼마나 지연이 발생하는지 나타내는 지표다. (LAG = LOG-END-OFFSET - CURRENT-OFFSET)

CONSUMER-ID: 컨슈머의 토픽 할당을 카프카 내부적으로 구분하기 위해 사용하는 id이다. 이 값은 자동 할당되어 유니크한 값으로 설정된다.

HOST: 컨슈머가 동작하는 host명을 출력한다. 컨슈머의 호스트명 또는 ip를 알 수 있다.

CLIENT-ID: 컨슈머에 할당된 id로 사용자가 지정하지 않으면 자동 생성된다.

 

5. kafka-verifiable-producer, consumer.sh

kafka-verifiable로 시작하는 2개의 스크립트를 사용하면 String 타입 메시지 값을 코드 없이 주고받을 수 있다. 카프카 클러스터를 설치하고 토픽에 데이터를 전송하여 간단한 네트워크 통신 테스트를 할 때 유용하다.

kafka-verifiable-producer.sh

$ kafka-verifiable-producer.sh --bootstrap-server ip-kafka:9092 \
    --max-messages 10 \
    --topic example.kafka.1 

kafka-verifiable-producer.sh로 보내는 데이터 개수를 지정한다. -1은 kafka-verifiable-producer.sh가 종료될 때까지 계속 데이터를 보낸다.

최초 실행 시점이 startup_complete와 함께 출력된다.

메시지별로 보낸 시간과 메시지 키/값, 토픽, 저장된 파티션, 저장된 오프셋 번호가 출력된다.

10개의 데이터가 모두 전송된 후 통계값이 출력된다. 평균 처리량을 알 수 있다.

kafka-verifiable-consumer.sh

$ kafka-verifiable-consumer.sh --bootstrap-server ip-kafka:9092 \
    --topic example.kafka.1 \
    --group-id test-group 

컨슈머가 실행된면 startup_complete 문자열과 시작 시간이 timestamp와 함께 출력된다. 여기서는 0번의 파티션에서 데이터를 가져오는 것을 확인할 수 있다.

컨슈머는 한 번에 다수의 메시지를 가져와서 처리하므로 한 번에 10개의 메시지를 받는다.

 

6. kafka-delete-records.sh

이미 적재된 토픽의 데이터를 지우는 방법으로 토픽의 특정 레코드 하나만 삭제할 수 는 없지만 파티션에 존재하는 가장 오래된 데이터(가장 낮은 숫자의 오프셋)부터 특정 시점의 오프셋까지 삭제할 수 있다.

$ nano delete-topic.json
{"partitions": [{"topic": "test", "partition": 0, "offset": 10}], "version": 1}
$ kafka-delete-records.sh --bootstrap-server ip-kafka:9092 \
    --offset-json-file delete-topic.json
Executing records delete operation
Records delete operation completed:
partition: test-0 low_watermark: 10 

삭제하고자 하는 데이터에 대한 정보(토픽, 파티션, 오프셋)를 파일로 저장해서 사용한다.

삭제하고자 하는 데이터가 있는 delete-topic.json을 —offset-json-file 옵션값으로 입력하여 레코드를 삭제한다.

 


References


🏋🏻 개인적으로 공부한 내용을 기록하고 있습니다.
잘못된 부분이 있다면 과감하게 지적해주세요!! 🏋

댓글