Hands-on Practice
Implementing a KafkaProducer that generates random person records and writes them to Kafka
- Uses the Faker module
- Uses pydantic's BaseModel
- Creates the topic first, then produces the records into it
pip install faker
pip install pydantic
pip install kafka-python
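Before running the producer, it can help to confirm that the local broker is reachable. The snippet below is a minimal sanity-check sketch (it assumes kafka-python and a broker listening on localhost:9092, the same assumptions as the rest of this lab) that just prints the topics the broker currently knows about.
# check_broker.py (hypothetical helper, not part of the lab code)
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"])
print(consumer.topics())   # set of topic names currently on the broker
consumer.close()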
# person.py
from pydantic import BaseModel


class Person(BaseModel):
    id: str
    name: str
    title: str
# fake_person_producer.py
import re
import uuid
import json
from typing import List

from person import Person
from faker import Faker
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
from kafka import KafkaAdminClient
from kafka.producer import KafkaProducer


def create_topic(bootstrap_servers, name, partitions, replica=1):
    client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        topic = NewTopic(
            name=name,
            num_partitions=partitions,
            replication_factor=replica)
        client.create_topics([topic])
    except TopicAlreadyExistsError as e:
        # the topic is already there from a previous run; safe to ignore
        print(e)
    finally:
        client.close()


def main():
    topic_name = "fake_people"
    bootstrap_servers = ["localhost:9092"]

    # create a topic first
    create_topic(bootstrap_servers, topic_name, 4)

    # ingest some random people events
    people: List[Person] = []
    faker = Faker()
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        client_id="Fake_Person_Producer",
    )

    for _ in range(100):
        person = Person(id=str(uuid.uuid4()), name=faker.name(), title=faker.job().title())
        people.append(person)
        producer.send(
            topic=topic_name,
            # slugify the job title (e.g. "Sales Manager" -> "sales-manager") and use it as the key
            key=re.sub(r'\s+', '-', person.title.lower()).encode('utf-8'),
            # person.json() is the pydantic v1 API; on pydantic v2 use person.model_dump_json()
            value=person.json().encode('utf-8'))

    producer.flush()


if __name__ == '__main__':
    main()
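To check that the 100 events actually landed, a throwaway consumer can count records per partition. This is only a sketch under the same localhost:9092 assumptions; because the producer keys each message by job title, all records with the same key hash to the same partition, but the per-partition counts will vary from run to run.
# count_per_partition.py (hypothetical verification helper)
from collections import Counter
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "fake_people",
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000)   # stop iterating after 5s with no new records

print(Counter(record.partition for record in consumer))   # e.g. Counter({0: 31, 2: 27, ...})
consumer.close()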
Implementing a KafkaConsumer that reads back the stored person records
Offset Auto Commit: True
import json

from kafka.consumer import KafkaConsumer


def key_deserializer(key):
    return key.decode('utf-8')


def value_deserializer(value):
    return json.loads(value.decode('utf-8'))


def main():
    topic_name = "fake_people"
    bootstrap_servers = ["localhost:9092"]
    consumer_group_id = "fake_people_group"

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=consumer_group_id,
        key_deserializer=key_deserializer,
        value_deserializer=value_deserializer,
        auto_offset_reset='earliest',
        enable_auto_commit=True)

    consumer.subscribe([topic_name])

    for record in consumer:
        print(f"""
        Consumed person {record.value} with key '{record.key}'
        from partition {record.partition} at offset {record.offset}
        """)


if __name__ == '__main__':
    main()
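With enable_auto_commit=True, kafka-python commits the group's position in the background (every auto_commit_interval_ms, 5 seconds by default), so nothing in the loop above touches offsets explicitly. One way to see what has actually been committed is to ask for the committed offset of each partition, as in the sketch below (same topic and group as above; it does not subscribe, so it will not disturb the running consumer).
# committed_offsets.py (illustrative sketch)
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=["localhost:9092"],
    group_id="fake_people_group",
    enable_auto_commit=False)   # read-only look at the group's committed offsets

for partition in consumer.partitions_for_topic("fake_people"):
    tp = TopicPartition("fake_people", partition)
    print(tp, consumer.committed(tp))   # None if nothing has been committed yet
consumer.close()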
Offset Auto Commit: False
import json

from kafka import TopicPartition, OffsetAndMetadata
from kafka.consumer import KafkaConsumer


def key_deserializer(key):
    return key.decode('utf-8')


def value_deserializer(value):
    return json.loads(value.decode('utf-8'))


def main():
    topic_name = "fake_people"
    bootstrap_servers = ["localhost:9092"]
    consumer_group_id = "manual_fake_people_group"

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=consumer_group_id,
        key_deserializer=key_deserializer,
        value_deserializer=value_deserializer,
        auto_offset_reset='earliest',
        enable_auto_commit=False)

    consumer.subscribe([topic_name])

    for record in consumer:
        print(f"""
        Consumed person {record.value} with key '{record.key}'
        from partition {record.partition} at offset {record.offset}
        """)
        # manually commit the next offset to read (record.offset + 1) for this partition
        topic_partition = TopicPartition(record.topic, record.partition)
        offset = OffsetAndMetadata(record.offset + 1, record.timestamp)
        consumer.commit({
            topic_partition: offset
        })


if __name__ == '__main__':
    main()
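Committing after every single record is easy to reason about, but it sends one commit request per message. A common variation, sketched below under the same assumptions, is to fetch records in batches with poll() and commit once per batch; calling consumer.commit() with no arguments commits the current fetch positions of all assigned partitions.
# batch_commit_consumer.py (sketch of a batch-commit variant, not part of the original lab)
import json
from kafka.consumer import KafkaConsumer


def main():
    consumer = KafkaConsumer(
        bootstrap_servers=["localhost:9092"],
        group_id="manual_fake_people_group",
        key_deserializer=lambda k: k.decode('utf-8'),
        value_deserializer=lambda v: json.loads(v.decode('utf-8')),
        auto_offset_reset='earliest',
        enable_auto_commit=False)
    consumer.subscribe(["fake_people"])

    while True:
        batch = consumer.poll(timeout_ms=1000)   # {TopicPartition: [ConsumerRecord, ...]}
        for tp, records in batch.items():
            for record in records:
                print(f"partition {tp.partition}, offset {record.offset}: {record.value}")
        if batch:
            consumer.commit()   # one commit per batch, at the current fetch positions


if __name__ == '__main__':
    main()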