
Kafka & Spark Streaming Integration Hands-on

예니ㅣ 2024. 1. 29. 13:10

Hands-on

Computing the top 10 titles in the fake_people topic

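  • Set spark.jars.packages so the Kafka connector is on the classpath (any one of the following)
    • Edit the spark-defaults.conf file
    • Specify it with config() when creating the SparkSession
    • Use the --packages option when running spark-submit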
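As a sketch of the three options above, the hypothetical helpers `kafka_package` and `package_settings` below build the connector's Maven coordinate and show how that one value would appear in each place. The default versions (Spark 3.3.1, Scala 2.12) are taken from the spark-submit command used to run the script; the connector version generally has to match the Spark and Scala versions of your installation.

```python
# Hypothetical helpers, for illustration only: they just build strings,
# they do not touch Spark. Default versions are assumptions taken from
# the spark-submit command in this post.


def kafka_package(spark_version: str = "3.3.1", scala_version: str = "2.12") -> str:
    """Maven coordinate of the Structured Streaming Kafka connector."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def package_settings(coord: str) -> dict:
    """Show the three equivalent places to supply spark.jars.packages."""
    return {
        # 1) A line in $SPARK_HOME/conf/spark-defaults.conf: key, whitespace, value
        "spark-defaults.conf": f"spark.jars.packages  {coord}",
        # 2) A builder call: SparkSession.builder.config("spark.jars.packages", coord)
        "SparkSession.config": ("spark.jars.packages", coord),
        # 3) A command-line option to spark-submit
        "spark-submit": ["spark-submit", "--packages", coord, "kafka_source_streaming.py"],
    }


if __name__ == "__main__":
    coord = kafka_package()
    for where, setting in package_settings(coord).items():
        print(f"{where}: {setting}")
```

Option 2 typically only takes effect when the session is created in a fresh Python process, before any SparkContext exists, because the packages are resolved when the JVM starts.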
# kafka_source_streaming.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Kafka Streaming Demo") \
        .master("local[3]") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .getOrCreate()

    # Schema of the JSON document stored in each message's value
    schema = StructType([
        StructField("id", StringType()),
        StructField("name", StringType()),
        StructField("title", StringType())
    ])

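    # Read the fake_people topic as an unbounded streaming DataFrame.
    # startingOffsets only applies on the first run; once a checkpoint exists,
    # the query resumes from the offsets recorded there.
    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "fake_people") \
        .option("startingOffsets", "earliest") \
        .load()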
    kafka_df.printSchema()
    # |-- key: binary (nullable = true)
    # |-- value: binary (nullable = true)
    # |-- topic: string (nullable = true)
    # |-- partition: integer (nullable = true)
    # |-- offset: long (nullable = true)
    # |-- timestamp: timestamp (nullable = true)
    # |-- timestampType: integer (nullable = true)
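    # value is binary: cast it to a string, then parse it with the schema above
    value_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("value"))
    value_df.createOrReplaceTempView("fake_people")
    value_df.printSchema()
    # Top 10 titles by message count over everything received so far
    count_df = spark.sql("SELECT value.title, COUNT(1) count FROM fake_people GROUP BY 1 ORDER BY 2 DESC LIMIT 10")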

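    # Sorting a streaming aggregation is only supported in complete output mode,
    # so each trigger prints the whole re-sorted top-10 table to the console.
    count_writer_query = count_df.writeStream \
        .format("console") \
        .outputMode("complete") \
        .option("checkpointLocation", "chk-point-dir-json") \
        .start()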

    print("Listening to Kafka")
    count_writer_query.awaitTermination()
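Run the script with spark-submit, passing the Kafka connector through --packages (the connector version should match your Spark and Scala versions):
    spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1 kafka_source_streaming.py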
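To make the semantics of the streaming query concrete, here is a plain-Python simulation (no Spark or Kafka needed) of what it computes: each Kafka value arrives as bytes, is decoded and parsed as JSON, and in complete output mode every trigger re-emits the top 10 over all records seen so far, not just the latest micro-batch. The function name `top_titles`, the list-of-batches input, the sample messages, and counting unparseable messages under a null title are assumptions for illustration, not Spark's implementation.

```python
import json
from collections import Counter
from typing import Iterable, List, Optional, Tuple


def top_titles(batches: Iterable[List[bytes]], n: int = 10) -> List[List[Tuple[Optional[str], int]]]:
    """Hypothetical simulation of the query in complete output mode.

    After each micro-batch, return the top-n titles over ALL records seen so far,
    mirroring GROUP BY title ... ORDER BY count DESC LIMIT n.
    """
    counts: Counter = Counter()  # running aggregation state kept across batches
    results = []
    for batch in batches:
        for raw in batch:
            # Kafka's value column is binary: decode it, then parse the JSON
            # (the script does this with cast("string") and from_json).
            try:
                record = json.loads(raw.decode("utf-8"))
            except (UnicodeDecodeError, json.JSONDecodeError):
                record = None  # assumption: unparseable messages count as a null title
            title = record.get("title") if isinstance(record, dict) else None
            counts[title] += 1
        # Re-emit the whole sorted, limited result table, as complete mode does
        results.append(counts.most_common(n))
    return results


if __name__ == "__main__":
    # Made-up sample messages shaped like the schema (id, name, title)
    batch1 = [
        b'{"id": "1", "name": "Kim", "title": "Engineer"}',
        b'{"id": "2", "name": "Lee", "title": "Designer"}',
        b'{"id": "3", "name": "Park", "title": "Engineer"}',
    ]
    batch2 = [
        b'{"id": "4", "name": "Choi", "title": "Designer"}',
        b'{"id": "5", "name": "Jung", "title": "Designer"}',
    ]
    for i, table in enumerate(top_titles([batch1, batch2]), start=1):
        print(f"Batch {i}: {table}")
```

The second printed table is `[('Designer', 3), ('Engineer', 2)]`, because it counts messages from both batches. For titles with equal counts, `most_common` keeps first-seen order, whereas Spark gives no ordering guarantee for ties.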
