본문 바로가기
Kafka

[Kafka] Kafka Streams 예제 3 - bank-balance-exactly-once

'Stéphane Maarek - Kafka Streams'를 보고 작성한 글입니다. 😀

1. 예제 설명

  • 은행 입출금 예제
    • Producer
      • exactly once 를 위해 idempotent producer 활용
      • Ex : { "name":"John", "amount":100, "time":"2021-04-14T12:00:00Z" }
    • Stream
      • exactly once processing 활용
      • 사용자별 거래를 집계하여 거래 횟수(count), 총 잔액(balance), 최신 거래 시간(time)을 계산

2. build.gradle 설정

plugins {
    id 'java'
}

group 'hardenkim.github.io'
version '1.0'

repositories {
    mavenCentral()
}

dependencies {
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
    // 'compile' is deprecated and was removed in Gradle 7; 'implementation' replaces it.
    implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.7.0'
    implementation group: 'org.apache.kafka', name: 'kafka-streams', version: '2.7.0'
    // Provides JsonSerializer / JsonDeserializer (Serializer<JsonNode> / Deserializer<JsonNode>)
    // used by the Streams app, and transitively Jackson (JsonNode, ObjectNode, JsonNodeFactory)
    // used by both the producer and the Streams app.
    implementation group: 'org.apache.kafka', name: 'connect-json', version: '2.7.0'
    // SLF4J backend is only needed when the application runs.
    runtimeOnly group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
}

test {
    useJUnitPlatform()
}

3-1. producer 코드 및 실행

public class BankTransactionsProducer {
    static final String inputTopic = "bank-transactions";
    static final String bootstrapServers = "localhost:9092";

    /** Customers for which one transaction is produced per batch, in send order. */
    private static final String[] CUSTOMERS = {"naver", "kakao", "line"};

    /** Pause after each send, in milliseconds. */
    private static final long SEND_INTERVAL_MS = 100L;

    /** Upper bound on how long the shutdown hook waits for main to close the producer. */
    private static final long SHUTDOWN_TIMEOUT_MS = 10_000L;

    /**
     * Produces random bank transactions to {@code bank-transactions} in an endless loop until the
     * JVM is asked to shut down (e.g. Ctrl+C). Then the producer is closed, flushing buffered records.
     */
    public static void main(String[] args) {
        Properties properties = new Properties();

        // Kafka bootstrap server and String key/value serialization.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=all: strongest producing guarantee.
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "3");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        // Idempotent producer: prevents duplicates caused by retries (exactly-once on the produce side).
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        // Ctrl+C only runs shutdown hooks, so without this the loop below never ends and the
        // producer is never closed. Interrupt main so it leaves the loop and closes the producer,
        // then wait (bounded) for it to finish.
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            mainThread.interrupt();
            try {
                mainThread.join(SHUTDOWN_TIMEOUT_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        boolean interrupted = false;
        try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
            // Insert sample transaction data, one record per customer per batch.
            int batch = 0;
            while (!interrupted) {
                System.out.println("Producing batch: " + batch);
                try {
                    for (String customer : CUSTOMERS) {
                        // Report asynchronous send failures instead of silently dropping them.
                        producer.send(newRandomTransaction(customer), (metadata, exception) -> {
                            if (exception != null) {
                                System.err.println("Failed to send transaction: " + exception);
                            }
                        });
                        Thread.sleep(SEND_INTERVAL_MS);
                    }
                    batch += 1;
                } catch (InterruptedException e) {
                    // sleep() cleared the interrupt flag. Keep it cleared until close() has run,
                    // since closing an interrupted producer may abort the final flush.
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            // Restore the interrupt status now that the producer is closed.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Builds one random transaction record for the given customer.
     *
     * @param name customer name; used both as the record key and as the JSON "name" field
     * @return record for {@code bank-transactions} whose value is a JSON object
     *         {"name": name, "amount": 0..99, "time": current instant in ISO-8601}
     */
    public static ProducerRecord<String, String> newRandomTransaction(String name){
        // Empty JSON object {}.
        ObjectNode transaction = JsonNodeFactory.instance.objectNode();
        // nextInt's upper bound is exclusive, so amount is in [0, 99].
        int amount = ThreadLocalRandom.current().nextInt(0, 100);
        // Current time; Instant.toString() is ISO-8601 in UTC.
        Instant now = Instant.now();

        transaction.put("name", name);
        transaction.put("amount", amount);
        transaction.put("time", now.toString());
        return new ProducerRecord<>(inputTopic, name, transaction.toString());
    }
}

3-2. Stream 코드 및 실행

public class BankBalanceExactlyOnceApp {
    static final String inputTopic = "bank-transactions";
    static final String outputTopic = "bank-balance-exactly-once";
    static final String bootstrapServers = "localhost:9092";

    /**
     * Builds the topology. It reads JSON transactions keyed by customer name from
     * {@code bank-transactions} and aggregates them per key into a running balance. The balance is
     * materialized in the "bank-balance-agg" store, and every update is written to
     * {@code bank-balance-exactly-once}.
     *
     * @return the topology to run
     */
    public Topology createTopology(){
        // JSON Serde.
        final Serializer<JsonNode> jsonNodeSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonNodeDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonNodeSerde = Serdes.serdeFrom(jsonNodeSerializer, jsonNodeDeserializer);

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, JsonNode> bankTransactions =
                builder.stream(inputTopic, Consumed.with(Serdes.String(), jsonNodeSerde));

        KTable<String, JsonNode> bankBalance = bankTransactions
                .groupByKey(Grouped.with(Serdes.String(), jsonNodeSerde))
                .aggregate(
                        // A fresh node per key, so no mutable ObjectNode is shared between keys.
                        BankBalanceExactlyOnceApp::initialBalance,
                        (key, transaction, balance) -> newBalance(transaction, balance),
                        Materialized.<String, JsonNode, KeyValueStore<Bytes, byte[]>>as("bank-balance-agg")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(jsonNodeSerde)
                );
        bankBalance.toStream().to(outputTopic, Produced.with(Serdes.String(), jsonNodeSerde));

        return builder.build();
    }

    /** Configures and starts the Streams application with exactly-once processing. */
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "bank-balance-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Demo only: disable the record cache so every aggregate update is forwarded downstream.
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
        // Exactly-once processing guarantee.
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        Topology topology = new BankBalanceExactlyOnceApp().createTopology();
        // Print the topology (sources, processors, state stores, sinks).
        System.out.println(topology.describe());

        KafkaStreams streams = new KafkaStreams(topology, config);
        // Register before start() so the application is closed cleanly on JVM shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.cleanUp(); // demo only: wipes this application's local state
        streams.start();

        // Print metadata of the local stream threads.
        streams.localThreadsMetadata().forEach(System.out::println);
    }

    /** Returns a new, empty balance: count 0, balance 0, time at the Unix epoch. */
    private static JsonNode initialBalance() {
        ObjectNode initial = JsonNodeFactory.instance.objectNode();
        initial.put("count", 0);
        initial.put("balance", 0);
        initial.put("time", Instant.ofEpochMilli(0L).toString());
        return initial;
    }

    /**
     * Applies one transaction to a running balance without modifying either input.
     *
     * @param transaction JSON with a numeric "amount" and an ISO-8601 instant "time"
     * @param balance current aggregate with "count", "balance" and "time"
     * @return a new node with count + 1, balance + amount, and the later of the two times
     *         (at millisecond precision)
     */
    private static JsonNode newBalance(JsonNode transaction, JsonNode balance){
        ObjectNode newBalance = JsonNodeFactory.instance.objectNode();
        // long arithmetic: an int total/count silently overflows on a long-running stream.
        newBalance.put("count", balance.get("count").asLong() + 1);
        newBalance.put("balance", balance.get("balance").asLong() + transaction.get("amount").asLong());

        long balanceEpoch = Instant.parse(balance.get("time").asText()).toEpochMilli();
        long transactionEpoch = Instant.parse(transaction.get("time").asText()).toEpochMilli();
        // Keep the latest time seen, so an out-of-order transaction never moves it backwards.
        Instant newBalanceInstant = Instant.ofEpochMilli(Math.max(balanceEpoch, transactionEpoch));
        newBalance.put("time", newBalanceInstant.toString());
        return newBalance;
    }
}

4. Kafka 토픽 생성

# Create each topic with a single partition so all records stay in one ordered log (demo setup).
# --bootstrap-server replaces --zookeeper, which is deprecated since Kafka 2.2 and removed in 3.0.
# input topic
kafka-topics.sh --create --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic bank-transactions

# output topic: log-compacted, so Kafka eventually keeps only the latest balance per key
kafka-topics.sh --create --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic bank-balance-exactly-once \
    --config cleanup.policy=compact

5. consumer 실행

# consumer - bank-balance-exactly-once
# Reads the output topic from the beginning and prints each record's key (customer name)
# and value (balance JSON), both decoded as plain strings.
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic bank-balance-exactly-once \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

 

# consumer - bank-transactions
# Prints the raw transaction JSON values from the input topic. Without --from-beginning,
# only records produced after the consumer starts are shown.
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic bank-transactions

 

 


References


🏋🏻 개인적으로 공부한 내용을 기록하고 있습니다.
잘못된 부분이 있다면 과감하게 지적해주세요!! 🏋

댓글