강의
Kafka 프로그래밍 언어
- Java
- Apache Kafka Java Client
- Spring Kafka
- Python
- Confluent Kafka Python
- Kafka-Python
pip install kafka-python
- .NET
- Confluent Kafka .NET
- Go
- Sarama
- Node.js
- node-rdkafka
- kafka-node
Kafka 기본 프로그래밍
Producer 생성
from time import sleep
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
for j in range(999):
print("Iteration", j)
data = {'counter': j}
producer.send('topic_test', value=data)
sleep(0.5)
Consumer 객체 생성
from json import loads
from time import sleep
consumer = KafkaConsumer(
'topic_test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group-id',
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
for event in consumer:
event_data = event.value
# Do whatever you want
print(event_data)
sleep(2)
Topic 생성
client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic = NewTopic(
name=name,
num_partitions=partitions,
replication_factor=replica)
client.create_topics([topic])
Kafka Client Tool
Producer와 Consumer를 각각 터미널에서 실행하면 Side-by-side로 메세지가 발생하는 것을 확인할 수 있습니다.
docker exec -it Broker_Container_ID sh
kafka-topics
kafka-topics --bootstrap-server kafka1:9092 --list
kafka-topics --bootstrap-server kafka1:9092 --delete --topic topic_test
kafka-configs
kafka-console-consumer
kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console --from-beginning
kafka-console-producer
kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console
Hellow World
Bye
Topic 파라미터
KafkaProducer
- bootstrap_servers : 브로커 리스트
- client_id : Producer 이름
- key_serializer, value_serializer : serialize 방법 지정
- enable_idempotence : 중복 메세지 전송 결정
- acks: 0, 1, 'all'
- retries : 재시도 횟수
- delivery.timeout.ms : 메세지 전송 최대 시간
- linger_ms, batch_size : 배치 전송
KafkaConsumer
- bootstrap_servers : 브로커 리스트
- client_id : Consumer 이름
- group_id : Consumer Group 이름
- key_deserializer, value_deserializer : serialize 방법 지정
- auto_offset_reset
- enable_auto_commit
Message Processing Guarantee 방식
- Exactly Once
- At Least Once
- At Most Once
'데브코스 TIL > 빅데이터, 스트리밍 데이터 처리' 카테고리의 다른 글
Kafka ksqlDB (0) | 2024.01.25 |
---|---|
Kafka 프로그래밍 실습 (0) | 2024.01.25 |
Kafka 설치 (0) | 2024.01.24 |
Kafka 기타기능 (0) | 2024.01.24 |
Serialization & Deserialization (0) | 2024.01.24 |