본문 바로가기
Kafka

[Kafka] Kafka Streams 예제 4 - user-event-enricher (join)

'Stéphane Maarek - Kafka Streams'를 보고 작성한 글입니다. 😀

1. 예제 설명

  • user-purchases 스트림(KStream)과 user-table(GlobalKTable)을 inner join 과 left join 으로 조인하여, 구매 이벤트에 사용자 정보를 덧붙이기

2. build.gradle 설정

plugins {
    id 'java'
}

group 'hardenkim.github.io'
version '1.0'

repositories {
    mavenCentral()
}

dependencies {
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.6.0'
    // 'compile' was deprecated and removed in Gradle 7; 'implementation' is the replacement
    implementation group: 'org.apache.kafka', name: 'kafka-streams', version: '2.7.0'
    // The SLF4J binding is not referenced from code; it is only needed on the runtime classpath
    runtimeOnly group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
}

test {
    useJUnitPlatform()
}

3-1. producer 코드 및 실행

/**
 * Demo producer that replays five scripted scenarios against the {@code user-table} and
 * {@code user-purchases} topics, so the join results can be observed one scenario at a time.
 */
public class UserDataProducer {
    static final String userPurchaseTopic = "user-purchases";
    static final String userTableTopic = "user-table";
    static final String bootstrapServers = "localhost:9092";

    /** Pause between scenarios so each scenario's output can be inspected separately. */
    private static final long SCENARIO_PAUSE_MS = 10_000L;

    /**
     * Sends the demo records, waiting for each send to complete before the next one.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while pausing between scenarios
     * @throws ExecutionException if a send does not complete successfully
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties properties = new Properties();

        // Kafka bootstrap server; both keys and values are plain strings
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Producer acknowledgement and retry settings
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "3");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        // Idempotent producer: retried sends do not write duplicate records
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        // try-with-resources so the producer is closed even when a send fails and .get() throws
        try (Producer<String, String> producer = new KafkaProducer<String, String>(properties)) {
            // Demo only: block on .get() after every send so the scenarios happen strictly in order
            // 1 - we create a new user, then we send some data to Kafka
            System.out.println("\nExample 1 - new user\n");
            producer.send(userRecord("john", "First=John,Last=Doe,Email=john.doe@gmail.com")).get();
            producer.send(purchaseRecord("john", "Apples and Bananas (1)")).get();

            Thread.sleep(SCENARIO_PAUSE_MS);

            // 2 - we receive user purchase, but it doesn't exist in Kafka
            System.out.println("\nExample 2 - non existing user\n");
            producer.send(purchaseRecord("bob", "Kafka Udemy Course (2)")).get();

            Thread.sleep(SCENARIO_PAUSE_MS);

            // 3 - we update user "john", and send a new transaction
            System.out.println("\nExample 3 - update to user\n");
            producer.send(userRecord("john", "First=Johnny,Last=Doe,Email=johnny.doe@gmail.com")).get();
            producer.send(purchaseRecord("john", "Oranges (3)")).get();

            Thread.sleep(SCENARIO_PAUSE_MS);

            // 4 - we send a user purchase for stephane, but it exists in Kafka later
            System.out.println("\nExample 4 - non existing user then user\n");
            producer.send(purchaseRecord("stephane", "Computer (4)")).get();
            producer.send(userRecord("stephane", "First=Stephane,Last=Maarek,GitHub=simplesteph")).get();
            producer.send(purchaseRecord("stephane", "Books (4)")).get();
            producer.send(userRecord("stephane", null)).get(); // delete for cleanup

            Thread.sleep(SCENARIO_PAUSE_MS);

            // 5 - we create a user, but it gets deleted before any purchase comes through
            System.out.println("\nExample 5 - user then delete then data\n");
            producer.send(userRecord("alice", "First=Alice")).get();
            producer.send(userRecord("alice", null)).get(); // that's the delete record
            producer.send(purchaseRecord("alice", "Apache Kafka Series (5)")).get();

            Thread.sleep(SCENARIO_PAUSE_MS);

            System.out.println("End of demo");
        }
    }

    /**
     * Builds a record for the user-table topic.
     *
     * @param key user id
     * @param value user info string, or {@code null} to send a delete (tombstone) record
     */
    private static ProducerRecord<String, String> userRecord(String key, String value) {
        return new ProducerRecord<>(userTableTopic, key, value);
    }

    /**
     * Builds a record for the user-purchases topic.
     *
     * @param key user id the purchase belongs to
     * @param value purchase description
     */
    private static ProducerRecord<String, String> purchaseRecord(String key, String value) {
        return new ProducerRecord<>(userPurchaseTopic, key, value);
    }
}

3-2. Stream 코드 및 실행

/**
 * Kafka Streams application that enriches purchase events with user information.
 *
 * <p>The {@code user-purchases} stream is joined by key against a GlobalKTable built from
 * {@code user-table}. The inner-join result goes to {@code innerJoinTopic}; the left-join result,
 * which also keeps purchases that have no matching user, goes to {@code leftJoinTopic}.
 */
public class UserEventEnricherApp {
    static final String userPurchaseTopic = "user-purchases";
    static final String userTableTopic = "user-table";
    static final String leftJoinTopic = "user-purchases-enriched-left-join";
    static final String innerJoinTopic = "user-purchases-enriched-inner-join";
    static final String bootstrapServers = "localhost:9092";

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-event-enricher-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder topologyBuilder = new StreamsBuilder();

        // A GlobalKTable is replicated to every instance of the Kafka Streams application
        final GlobalKTable<String, String> users = topologyBuilder.globalTable(userTableTopic);
        final KStream<String, String> purchases = topologyBuilder.stream(userPurchaseTopic);

        // Inner join: the purchase key is used directly as the lookup key into the user table
        purchases
                .join(users,
                        (purchaseKey, purchase) -> purchaseKey,
                        (purchase, userInfo) -> enrichedValue(purchase, userInfo))
                .to(innerJoinTopic);

        // Left join: userInfo may be null here, and that case is rendered as "UserInfo=null"
        purchases
                .leftJoin(users,
                        (purchaseKey, purchase) -> purchaseKey,
                        (purchase, userInfo) -> userInfo == null
                                ? "Purchase=" + purchase + ",UserInfo=null"
                                : enrichedValue(purchase, userInfo))
                .to(leftJoinTopic);

        final KafkaStreams app = new KafkaStreams(topologyBuilder.build(), props);
        app.cleanUp(); // demo only: start from a clean local state
        app.start();

        // Print the metadata of the local stream threads (this is not the topology description)
        app.localThreadsMetadata().forEach(System.out::println);

        // Close the Streams application when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(app::close));
    }

    /** Formats a purchase together with its matched user information. */
    private static String enrichedValue(String purchase, String userInfo) {
        return "Purchase=" + purchase + ",UserInfo=[" + userInfo + "]";
    }
}

4. Kafka 토픽 생성

# NOTE: kafka-topics.sh --zookeeper is deprecated since Kafka 2.2 and removed in Kafka 3.0;
# --bootstrap-server points at the same broker the producer/streams apps use.

# create input topic for user purchases
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic user-purchases

# create table of user information - log compacted so only the latest value per user key is kept
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic user-table --config cleanup.policy=compact

# create output topic for user purchases enriched with user data (left join)
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic user-purchases-enriched-left-join

# create output topic for user purchases enriched with user data (inner join)
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic user-purchases-enriched-inner-join

5. consumer 실행

# start a consumer on the output topic (left join)
# Prints key and value of every record from the beginning of the topic, both decoded as strings.
# NOTE: the console consumer presumably keeps running until interrupted (Ctrl+C),
# so run each of these two commands in its own terminal.
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic user-purchases-enriched-left-join \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# start a consumer on the output topic (inner join)
# Same options as above, reading the inner-join output topic instead.
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic user-purchases-enriched-inner-join \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

 


References


🏋🏻 개인적으로 공부한 내용을 기록하고 있습니다.
잘못된 부분이 있다면 과감하게 지적해주세요!! 🏋

댓글