This post was written after watching 'Stéphane Maarek - Kafka Streams'. 😀
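1. Example overview
- Enrich a stream of user purchases (user-purchases topic) with user information held in a GlobalKTable (user-table topic), once with an inner join and once with a left join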
2. build.gradle setup
plugins {
    id 'java'
}

group 'hardenkim.github.io'
version '1.0'

repositories {
    mavenCentral()
}

dependencies {
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.6.0'
    // Kafka Streams (brings in kafka-clients transitively)
    implementation group: 'org.apache.kafka', name: 'kafka-streams', version: '2.7.0'
    // simple SLF4J binding so Kafka's log output is printed to the console
    implementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
}

test {
    useJUnitPlatform()
}
3-1. Producer code and running it
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class UserDataProducer {
    static final String userPurchaseTopic = "user-purchases";
    static final String userTableTopic = "user-table";
    static final String bootstrapServers = "localhost:9092";

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties properties = new Properties();
        // Kafka bootstrap server
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // producer acks: wait for all in-sync replicas
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "3");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        // idempotent producer: retries cannot write duplicate records
        // (no duplicates per partition within one producer session)
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        Producer<String, String> producer = new KafkaProducer<>(properties);

        // Demo only: .get() blocks until each send is acknowledged, so the scenarios are
        // written in order. Blocking on every send hurts throughput in a real producer.

        // 1 - we create a new user, then we send some data to Kafka
        System.out.println("\nExample 1 - new user\n");
        producer.send(userRecord("john", "First=John,Last=Doe,Email=john.doe@gmail.com")).get();
        producer.send(purchaseRecord("john", "Apples and Bananas (1)")).get();
        Thread.sleep(10000);

        // 2 - we receive a purchase from a user who doesn't exist in Kafka
        System.out.println("\nExample 2 - non existing user\n");
        producer.send(purchaseRecord("bob", "Kafka Udemy Course (2)")).get();
        Thread.sleep(10000);

        // 3 - we update user "john", and send a new transaction
        System.out.println("\nExample 3 - update to user\n");
        producer.send(userRecord("john", "First=Johnny,Last=Doe,Email=johnny.doe@gmail.com")).get();
        producer.send(purchaseRecord("john", "Oranges (3)")).get();
        Thread.sleep(10000);

        // 4 - we send a user purchase for stephane, but the user only exists in Kafka later
        System.out.println("\nExample 4 - non existing user then user\n");
        producer.send(purchaseRecord("stephane", "Computer (4)")).get();
        producer.send(userRecord("stephane", "First=Stephane,Last=Maarek,GitHub=simplesteph")).get();
        producer.send(purchaseRecord("stephane", "Books (4)")).get();
        producer.send(userRecord("stephane", null)).get(); // tombstone: delete for cleanup
        Thread.sleep(10000);

        // 5 - we create a user, but it gets deleted before any purchase comes through
        System.out.println("\nExample 5 - user then delete then data\n");
        producer.send(userRecord("alice", "First=Alice")).get();
        producer.send(userRecord("alice", null)).get(); // tombstone: that's the delete record
        producer.send(purchaseRecord("alice", "Apache Kafka Series (5)")).get();
        Thread.sleep(10000);

        System.out.println("End of demo");
        producer.close();
    }

    private static ProducerRecord<String, String> userRecord(String key, String value) {
        return new ProducerRecord<>(userTableTopic, key, value);
    }

    private static ProducerRecord<String, String> purchaseRecord(String key, String value) {
        return new ProducerRecord<>(userPurchaseTopic, key, value);
    }
}
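The producer sends every record with a key (the user name) and leaves the partitioner at its default. As far as I understand the Kafka 2.x clients, the default partitioner maps a non-null key to toPositive(murmur2(keyBytes)) % numPartitions. The plain-Java sketch below re-implements that formula from my reading of Utils.murmur2 in the client source; the class name PartitionSketch is made up, and the exact numbers should be checked against the real source before relying on them. Because the result depends on the partition count, the same user can land on different partition numbers in user-purchases (3 partitions) and user-table (2 partitions). A regular KStream-KTable join would require both topics to have the same partition count (co-partitioning); the GlobalKTable join used here does not.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: re-implementation of Kafka's murmur2-based key partitioning
// (assumption: mirrors org.apache.kafka.common.utils.Utils.murmur2 in the 2.x clients).
final class PartitionSketch {

    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // mix in the remaining 1-3 bytes (the cases fall through on purpose)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition for a String key serialized as UTF-8 (StringSerializer's default encoding).
    static int partitionFor(String key, int numPartitions) {
        int hash = murmur2(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions; // make positive, then take the modulo
    }

    public static void main(String[] args) {
        for (String user : new String[]{"john", "bob", "stephane", "alice"}) {
            System.out.println(user + ": user-purchases partition " + partitionFor(user, 3)
                    + ", user-table partition " + partitionFor(user, 2));
        }
    }
}
```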
3-2. Streams code and running it
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class UserEventEnricherApp {
    static final String userPurchaseTopic = "user-purchases";
    static final String userTableTopic = "user-table";
    static final String leftJoinTopic = "user-purchases-enriched-left-join";
    static final String innerJoinTopic = "user-purchases-enriched-inner-join";
    static final String bootstrapServers = "localhost:9092";

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-event-enricher-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // A GlobalKTable is replicated to every Kafka Streams instance: each instance reads
        // all partitions of user-table, so the join needs no co-partitioning
        GlobalKTable<String, String> userGlobalTable = builder.globalTable(userTableTopic);

        // stream of user purchases
        KStream<String, String> userPurchases = builder.stream(userPurchaseTopic);

        // inner join: purchases with no matching user are dropped
        KStream<String, String> userPurchasesEnrichedJoin =
                userPurchases.join(userGlobalTable,
                        (key, value) -> key, // KeyValueMapper: which GlobalKTable key to look up
                        (userPurchase, userInfo) -> "Purchase=" + userPurchase + ",UserInfo=[" + userInfo + "]"
                );
        userPurchasesEnrichedJoin.to(innerJoinTopic);

        // left join: every purchase is emitted
        KStream<String, String> userPurchasesEnrichedLeftJoin =
                userPurchases.leftJoin(userGlobalTable,
                        (key, value) -> key,
                        (userPurchase, userInfo) -> {
                            // in a left join, userInfo is null when there is no matching user
                            if (userInfo != null) {
                                return "Purchase=" + userPurchase + ",UserInfo=[" + userInfo + "]";
                            } else {
                                return "Purchase=" + userPurchase + ",UserInfo=null";
                            }
                        }
                );
        userPurchasesEnrichedLeftJoin.to(leftJoinTopic);

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.cleanUp(); // wipes this application's local state directory: demo only
        streams.start();

        // print metadata of the local stream threads (thread state, assigned tasks)
        streams.localThreadsMetadata().forEach(data -> System.out.println(data));

        // shutdown hook: close the application cleanly when the JVM shuts down (e.g. Ctrl+C)
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
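In both joins, the second argument (key, value) -> key is the KeyValueMapper that tells Kafka Streams which GlobalKTable key to look up for each purchase, and the third is the ValueJoiner that builds the output value. Here the lookup key is simply the record key, but since every instance holds the whole global table, the mapper may also derive the lookup key from the value. The plain-Java sketch below only imitates that call shape with java.util.function.BiFunction; the class name, the "user=...;item=..." value format, and the order-id keys are hypothetical and not part of the original example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java imitation of leftJoin(globalTable, keyValueMapper, valueJoiner); no Kafka involved.
// Hypothetical data: record keys are order ids, values look like "user=<id>;item=<name>".
final class KeySelectorSketch {

    // Joins one stream record against the table the way a left join would.
    static String leftJoinOne(Map<String, String> table, String key, String value,
                              BiFunction<String, String, String> keySelector,
                              BiFunction<String, String, String> joiner) {
        String lookupKey = keySelector.apply(key, value); // role of the KeyValueMapper
        String userInfo = table.get(lookupKey);          // null when there is no match
        return joiner.apply(value, userInfo);            // role of the ValueJoiner
    }

    // Takes the lookup key from the value instead of the record key.
    static String userIdFromValue(String key, String value) {
        for (String part : value.split(";")) {
            if (part.startsWith("user=")) {
                return part.substring("user=".length());
            }
        }
        return null; // no user field; in this sketch HashMap.get(null) then finds nothing
    }

    // Same output format as the left join in UserEventEnricherApp.
    static String enrich(String purchase, String userInfo) {
        return userInfo != null
                ? "Purchase=" + purchase + ",UserInfo=[" + userInfo + "]"
                : "Purchase=" + purchase + ",UserInfo=null";
    }

    public static void main(String[] args) {
        Map<String, String> users = new HashMap<>();
        users.put("john", "First=Johnny,Last=Doe");
        System.out.println(leftJoinOne(users, "order-1", "user=john;item=Oranges",
                KeySelectorSketch::userIdFromValue, KeySelectorSketch::enrich));
        System.out.println(leftJoinOne(users, "order-2", "user=bob;item=Books",
                KeySelectorSketch::userIdFromValue, KeySelectorSketch::enrich));
    }
}
```

In the real Kafka Streams join, the output record keeps the stream record's key, so in this hypothetical setup the enriched records would still be keyed by order id.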
4. Creating the Kafka topics
# --bootstrap-server requires Kafka 2.2+; older versions of the tool used --zookeeper localhost:2181
# create input topic for user purchases
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic user-purchases
# create table of user information: log-compacted, so old values of a key can be cleaned up
# and only the latest value per key is guaranteed to be kept.
# 2 partitions (vs 3 above) is fine, because a GlobalKTable join does not require co-partitioning
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic user-table --config cleanup.policy=compact
# create out topic for user purchases enriched with user data (left join)
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic user-purchases-enriched-left-join
# create out topic for user purchases enriched with user data (inner join)
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic user-purchases-enriched-inner-join
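user-table is created with cleanup.policy=compact. A plain-Java model of what compaction promises to keep (not of how the broker's log cleaner actually works) helps when reading the producer's delete records: for each key, only the record with the highest offset survives, and a tombstone (null value) survives until delete.retention.ms expires and is then removed as well. The offsets below assume, for simplicity, that all user-table writes from the producer landed in a single partition; that is an assumption, since the topic actually has 2 partitions. The real cleaner also never compacts the active segment, so older values can stay visible for a while. The class name CompactionSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Model of compaction's retention guarantee (hypothetical class, not Kafka's log cleaner).
final class CompactionSketch {

    static final class Rec {
        final long offset;
        final String key;
        final String value; // null = tombstone

        Rec(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return offset + ":" + key + "=" + value;
        }
    }

    // Keeps the latest record per key, in offset order. With dropTombstones=true it also
    // models the later state after delete.retention.ms, when tombstones are removed.
    static List<Rec> compact(List<Rec> log, boolean dropTombstones) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec rec : log) {          // the log is assumed to be in offset order
            latest.remove(rec.key);    // re-insert so iteration follows the latest offset
            latest.put(rec.key, rec);
        }
        List<Rec> kept = new ArrayList<>();
        for (Rec rec : latest.values()) {
            if (dropTombstones && rec.value == null) {
                continue;
            }
            kept.add(rec);
        }
        return kept;
    }

    // The producer's user-table writes, with hypothetical offsets 0..5 in one partition.
    static List<Rec> demoLog() {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec(0, "john", "First=John,Last=Doe,Email=john.doe@gmail.com"));
        log.add(new Rec(1, "john", "First=Johnny,Last=Doe,Email=johnny.doe@gmail.com"));
        log.add(new Rec(2, "stephane", "First=Stephane,Last=Maarek,GitHub=simplesteph"));
        log.add(new Rec(3, "stephane", null));
        log.add(new Rec(4, "alice", "First=Alice"));
        log.add(new Rec(5, "alice", null));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(compact(demoLog(), false)); // tombstones still present
        System.out.println(compact(demoLog(), true));  // after tombstones have expired
    }
}
```

The first line keeps john's second record (offset 1) plus the two tombstones; the second keeps only john's offset-1 record, which is also the only user a GlobalKTable restored from the fully compacted topic would contain.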
5. Running the consumers
# start a consumer on the output topic (left join)
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic user-purchases-enriched-left-join \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# start a consumer on the output topic (inner join)
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic user-purchases-enriched-inner-join \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
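With both consumers running, the plain-Java replay below sketches what each output topic should contain after the producer's five scenarios. It assumes the Streams app is already running before the producer starts, and that each user-table update has reached the GlobalKTable before the next purchase is joined. The .get() calls and sleeps make that ordering likely, but as far as I know a GlobalKTable is updated by a separate global thread and is not time-synchronized with the stream, so it is not guaranteed. If the app is started only after the producer has finished, the global table is first bootstrapped to its final state, and the results would differ (for example, both of john's purchases would see Johnny). The class name is hypothetical, and "key -> value" is only a readable stand-in for the console consumer's key/value output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// JDK-only replay of the demo scenarios (hypothetical class; no Kafka involved).
// Assumption: every user-table update is visible to the join before the next purchase.
final class EnricherReplaySketch {

    static String[] user(String key, String value) {
        return new String[]{"user-table", key, value};
    }

    static String[] purchase(String key, String value) {
        return new String[]{"user-purchases", key, value};
    }

    // Returns [innerJoinOutput, leftJoinOutput], each entry formatted as "key -> value".
    static List<List<String>> replay(List<String[]> events) {
        Map<String, String> users = new HashMap<>(); // stands in for the GlobalKTable
        List<String> inner = new ArrayList<>();
        List<String> left = new ArrayList<>();
        for (String[] event : events) {
            String key = event[1];
            String value = event[2];
            if (event[0].equals("user-table")) {
                if (value == null) {
                    users.remove(key);     // tombstone: the user is deleted
                } else {
                    users.put(key, value); // insert or update the user
                }
                continue;
            }
            String userInfo = users.get(key);
            if (userInfo != null) {
                String enriched = "Purchase=" + value + ",UserInfo=[" + userInfo + "]";
                inner.add(key + " -> " + enriched);
                left.add(key + " -> " + enriched);
            } else {
                // inner join drops the purchase; left join keeps it with UserInfo=null
                left.add(key + " -> Purchase=" + value + ",UserInfo=null");
            }
        }
        return List.of(inner, left);
    }

    // Same records, in the same order, as UserDataProducer sends them.
    static List<String[]> demoEvents() {
        List<String[]> events = new ArrayList<>();
        events.add(user("john", "First=John,Last=Doe,Email=john.doe@gmail.com"));
        events.add(purchase("john", "Apples and Bananas (1)"));
        events.add(purchase("bob", "Kafka Udemy Course (2)"));
        events.add(user("john", "First=Johnny,Last=Doe,Email=johnny.doe@gmail.com"));
        events.add(purchase("john", "Oranges (3)"));
        events.add(purchase("stephane", "Computer (4)"));
        events.add(user("stephane", "First=Stephane,Last=Maarek,GitHub=simplesteph"));
        events.add(purchase("stephane", "Books (4)"));
        events.add(user("stephane", null));
        events.add(user("alice", "First=Alice"));
        events.add(user("alice", null));
        events.add(purchase("alice", "Apache Kafka Series (5)"));
        return events;
    }

    public static void main(String[] args) {
        List<List<String>> out = replay(demoEvents());
        System.out.println("inner join:");
        out.get(0).forEach(System.out::println);
        System.out.println("left join:");
        out.get(1).forEach(System.out::println);
    }
}
```

Under these assumptions the inner-join topic gets 3 records (john twice, stephane once), while the left-join topic gets all 6 purchases, three of them with UserInfo=null (bob, stephane's Computer, and alice).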
🏋🏻 I'm keeping notes on what I've studied on my own.
If anything is wrong, please don't hesitate to point it out!! 🏋