데브코스 TIL/빅데이터, 스트리밍 데이터 처리

Spark Streaming 소개 및 실습

예니ㅣ 2024. 1. 29. 13:02

강의

Spark Streaming

"Spark Streaming"는 실시간 데이터 스트림 처리를 위한 Spark API 입니다.

  • 데이터 소스 : Kafk, Kinesis, Flume, TCP 소켓
  • 고급 함수 사용 : Join, Map, Reduce, Window
  • 웹 UI : localhost:4040

 

동작 방식

  1. 마이크로 배치로 데이터 처리
  2. 루프
  3. 새로운 데이터를 이전 데이터에 Merge 혹은 Replace
  4. 데이터 위치 관리
  5. 실패시 Fault Tolerance 및 데이터 재처리 관리
  6. 최종 결과 스트림 일괄 생성 : Spark Engine
    • DStream
      • RDD 기반
      • Spark SQL 엔진의 최적화 기능 사용 불가
      • 이벤트 발생 시간 기반 처리 불가
    • Structured Streaming
      • DataFrame 기반
      • Catalyst 기반 최적화 혜택 
      • 이벤트 발생 시간 기반 처리 가능

내부 동작

 

구성 요소

  • Source
    • 스트리밍 데이터를 수집하는 외부 시스템
    • ReadStream 사용
lines_df = spark.readStream\
    .format("socket")\
    .option("host", "localhost")\
    .option("port", "9999")\
    .load()
  • Sink
    • 수집한 데이터를 처리하여 출력하는 외부 시스템
    • OutputMode : Append, Update, Complete
word_count_query = counts_df.writeStream\
    .format("console")\
    .outputMode("complete")\
    .option("checkpointLocation", "chk-point-dir")\
    .start()

 


실습

Streaming WordCount 예제

  • 터미널 1 : Netcat을 데이터 Producer로 사용
  • 터미널 2 : Spark Structured Streaming으로 생성한 Consumer 실행
# wordcount_streaming.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import *


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Streaming Word Count") \
        .master("local[3]") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config("spark.sql.shuffle.partitions", 3) \
        .getOrCreate()

    # READ
    lines_df = spark.readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", "9999") \
        .load()

    # TRANSFORM
    words_df = lines_df.select(expr("explode(split(value,' ')) as word"))
    counts_df = words_df.groupBy("word").count()

    # SINK
    word_count_query = counts_df.writeStream \
        .format("console") \
        .outputMode("complete") \
        .option("checkpointLocation", "chk-point-dir") \
        .start()

    print("Listening to localhost:9999")
    word_count_query.awaitTermination()
# 터미널 1

nc -lk 9999
# 터미널 2

spark-submit wordcount_streaming.py

 

'데브코스 TIL > 빅데이터, 스트리밍 데이터 처리' 카테고리의 다른 글

Kafka & Spark Streaming 연동 실습  (0) 2024.01.29
Kafka ksqlDB  (0) 2024.01.25
Kafka 프로그래밍 실습  (0) 2024.01.25
Kafka 프로그래밍  (0) 2024.01.24
Kafka 설치  (0) 2024.01.24