Devcourse TIL / Big Data, Streaming Data Processing

Kafka Programming Practice

예니 | 2024. 1. 25. 13:01

Practice

Implementing a KafkaProducer that randomly generates and stores person records

  • Use the Faker module to generate random person data (a quick sketch follows the install commands below)
  • Use pydantic's BaseModel to define the record schema
  • Create the topic first, then produce records to it
pip install faker
pip install pydantic
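
Before wiring these into Kafka, it helps to see what Faker actually returns. A minimal sketch (faker_check.py is a hypothetical helper, not part of the practice code; the printed values are illustrative):

# faker_check.py
from faker import Faker

faker = Faker()
print(faker.name())  # e.g. 'Jennifer Smith'
print(faker.job())   # e.g. 'Structural engineer'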
# person.py

from pydantic import BaseModel

class Person(BaseModel):
    id: str
    name: str
    title: str
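
The producer below serializes each Person with pydantic's .json() method, so it is worth seeing what that produces. A quick sketch (.json() is the pydantic v1 API; on v2 it still works but is deprecated in favor of model_dump_json(), which produces equivalent JSON):

from person import Person

person = Person(id="1", name="Jane Doe", title="Data Engineer")
print(person.json())  # {"id": "1", "name": "Jane Doe", "title": "Data Engineer"}
# pydantic v2 equivalent: person.model_dump_json()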
# fake_person_producer.py

import re
import uuid
from typing import List
from person import Person

from faker import Faker
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
from kafka import KafkaAdminClient
from kafka.producer import KafkaProducer


def create_topic(bootstrap_servers, name, partitions, replica=1):
    client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        topic = NewTopic(
            name=name,
            num_partitions=partitions,
            replication_factor=replica)
        client.create_topics([topic])
    except TopicAlreadyExistsError as e:
        # the topic survives across runs; creating it again is safe to ignore
        print(e)
    finally:
        client.close()


def main():
    topic_name = "fake_people"
    bootstrap_servers = ["localhost:9092"]

    # create a topic first
    create_topic(bootstrap_servers, topic_name, 4)

    # ingest some random people events
    people: List[Person] = []
    faker = Faker()
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        client_id="Fake_Person_Producer",
    )

    for _ in range(100):
        person = Person(id=str(uuid.uuid4()), name=faker.name(), title=faker.job().title())
        people.append(person)
        producer.send(
            topic=topic_name,
            # key by job title so identical titles land in the same partition;
            # str.replace does not take a regex, so collapse whitespace with re.sub
            key=re.sub(r'\s+', '-', person.title.lower()).encode('utf-8'),
            value=person.json().encode('utf-8'))

    producer.flush()

if __name__ == '__main__':
    main()
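
producer.send() is asynchronous: it returns a future that resolves to the record's metadata once the broker acknowledges the write. A minimal sketch for spot-checking the key-to-partition mapping (hypothetical; assumes the producer and topic_name from the script above are in scope):

# hypothetical check, placed inside main() before producer.flush()
future = producer.send(topic_name, key=b'data-engineer', value=b'{}')
metadata = future.get(timeout=10)  # block until the broker acknowledges
print(metadata.topic, metadata.partition, metadata.offset)
# with the default partitioner and a fixed partition count,
# the same key always maps to the same partition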

Implementing a KafkaConsumer that reads back the randomly stored person records

Offset Auto Commit: True
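
With enable_auto_commit=True, kafka-python commits the group's offsets in the background every auto_commit_interval_ms (5 seconds by default), so the read loop needs no commit code: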

import json

from kafka.consumer import KafkaConsumer


def key_deserializer(key):
    return key.decode('utf-8')


def value_deserializer(value):
    return json.loads(value.decode('utf-8'))


def main():
    topic_name = "fake_people"
    bootstrap_servers = ["localhost:9092"]
    consumer_group_id = "fake_people_group"

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=consumer_group_id,
        key_deserializer=key_deserializer,
        value_deserializer=value_deserializer,
        auto_offset_reset='earliest',
        enable_auto_commit=True)

    consumer.subscribe([topic_name])
    for record in consumer:
        print(f"""
            Consumed person {record.value} with key '{record.key}'
            from partition {record.partition} at offset {record.offset}
        """)


if __name__ == '__main__':
    main()


Offset Auto Commit: False
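
With enable_auto_commit=False, nothing is committed unless the code calls consumer.commit() itself; here a commit is issued once per record, right after it is printed: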

import json

from kafka import TopicPartition, OffsetAndMetadata
from kafka.consumer import KafkaConsumer


def key_deserializer(key):
    return key.decode('utf-8')


def value_deserializer(value):
    return json.loads(value.decode('utf-8'))


def main():
    topic_name = "fake_people"
    bootstrap_servers = ["localhost:9092"]
    consumer_group_id = "manual_fake_people_group"

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=consumer_group_id,
        key_deserializer=key_deserializer,
        value_deserializer=value_deserializer,
        auto_offset_reset='earliest',
        enable_auto_commit=False)

    consumer.subscribe([topic_name])
    for record in consumer:
        print(f"""
            Consumed person {record.value} with key '{record.key}'
            from partition {record.partition} at offset {record.offset}
        """)

        # commit the offset of the *next* record to consume (hence +1);
        # the second field is free-form metadata stored with the commit
        topic_partition = TopicPartition(record.topic, record.partition)
        offset = OffsetAndMetadata(record.offset + 1, record.timestamp)
        consumer.commit({
            topic_partition: offset
        })

if __name__ == '__main__':
    main()
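
Committing after every record keeps re-delivery after a crash to at most the one record in flight per partition, but it also costs a broker round trip per message; committing every N records or on a timer is a common compromise between commit overhead and duplicate processing.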
