데브코스 TIL/빅데이터, 스트리밍 데이터 처리

Kafka 프로그래밍

예니ㅣ 2024. 1. 24. 13:31

강의

Kafka 프로그래밍 언어

  • Java
    • Apache Kafka Java Client
    • Spring Kafka
  • Python
    • Confluent Kafka Python
    • Kafka-Python
pip install kafka-python
  • .NET
    • Confluent Kafka .NET
  • Go
    • Sarama
  • Node.js
    • node-rdkafka
    • kafka-node

 


Kafka 기본 프로그래밍

Producer 생성

from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(
   bootstrap_servers=['localhost:9092'],
   value_serializer=lambda x: dumps(x).encode('utf-8')
)

for j in range(999):
   print("Iteration", j)
   data = {'counter': j}
   producer.send('topic_test', value=data)
   sleep(0.5)

 

Consumer 객체 생성

from json import loads
from time import sleep

consumer = KafkaConsumer(
   'topic_test',
   bootstrap_servers=['localhost:9092'],
   auto_offset_reset='earliest',
   enable_auto_commit=True,
   group_id='my-group-id',
   value_deserializer=lambda x: loads(x.decode('utf-8'))
)
for event in consumer:
   event_data = event.value
   # Do whatever you want
   print(event_data)
   sleep(2)

 

Topic 생성

client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic = NewTopic(
	name=name,
	num_partitions=partitions,
	replication_factor=replica)
client.create_topics([topic])

 


Kafka Client Tool

Producer와 Consumer를 각각 터미널에서 실행하면 Side-by-side로 메세지가 발생하는 것을 확인할 수 있습니다.

docker exec -it Broker_Container_ID sh

kafka-topics
kafka-topics --bootstrap-server kafka1:9092 --list
kafka-topics --bootstrap-server kafka1:9092 --delete --topic topic_test

kafka-configs

kafka-console-consumer
kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console --from-beginning

kafka-console-producer
kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console
	Hellow World
    Bye

 


Topic 파라미터

KafkaProducer

  • bootstrap_servers : 브로커 리스트
  • client_id : Producer 이름
  • key_serializer, value_serializer : serialize 방법 지정
  • enable_idempotence : 중복 메세지 전송 결정
  • acks: 0, 1, 'all' 
  • retries : 재시도 횟수
  • delivery.timeout.ms : 메세지 전송 최대 시간
  • linger_ms, batch_size : 배치 전송

 

KafkaConsumer

  • bootstrap_servers : 브로커 리스트
  • client_id : Consumer 이름
  • group_id : Consumer Group 이름
  • key_deserializer, value_deserializer : serialize 방법 지정
  • auto_offset_reset
  • enable_auto_commit

 


Message Processing Guarantee 방식

  • Exactly Once
  • At Least Once
  • At Most Once

'데브코스 TIL > 빅데이터, 스트리밍 데이터 처리' 카테고리의 다른 글

Kafka ksqlDB  (0) 2024.01.25
Kafka 프로그래밍 실습  (0) 2024.01.25
Kafka 설치  (0) 2024.01.24
Kafka 기타기능  (0) 2024.01.24
Serialization & Deserialization  (0) 2024.01.24