강의
Spark Streaming
"Spark Streaming"는 실시간 데이터 스트림 처리를 위한 Spark API 입니다.
- 데이터 소스 : Kafk, Kinesis, Flume, TCP 소켓
- 고급 함수 사용 : Join, Map, Reduce, Window
- 웹 UI : localhost:4040
동작 방식
- 마이크로 배치로 데이터 처리
- 루프
- 새로운 데이터를 이전 데이터에 Merge 혹은 Replace
- 데이터 위치 관리
- 실패시 Fault Tolerance 및 데이터 재처리 관리
- 최종 결과 스트림 일괄 생성 : Spark Engine
- DStream
- RDD 기반
- Spark SQL 엔진의 최적화 기능 사용 불가
- 이벤트 발생 시간 기반 처리 불가
- Structured Streaming
- DataFrame 기반
- Catalyst 기반 최적화 혜택
- 이벤트 발생 시간 기반 처리 가능
- DStream
구성 요소
- Source
- 스트리밍 데이터를 수집하는 외부 시스템
- ReadStream 사용
lines_df = spark.readStream\
.format("socket")\
.option("host", "localhost")\
.option("port", "9999")\
.load()
- Sink
- 수집한 데이터를 처리하여 출력하는 외부 시스템
- OutputMode : Append, Update, Complete
word_count_query = counts_df.writeStream\
.format("console")\
.outputMode("complete")\
.option("checkpointLocation", "chk-point-dir")\
.start()
실습
Streaming WordCount 예제
- 터미널 1 : Netcat을 데이터 Producer로 사용
- 터미널 2 : Spark Structured Streaming으로 생성한 Consumer 실행
# wordcount_streaming.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Streaming Word Count") \
.master("local[3]") \
.config("spark.streaming.stopGracefullyOnShutdown", "true") \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
# READ
lines_df = spark.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", "9999") \
.load()
# TRANSFORM
words_df = lines_df.select(expr("explode(split(value,' ')) as word"))
counts_df = words_df.groupBy("word").count()
# SINK
word_count_query = counts_df.writeStream \
.format("console") \
.outputMode("complete") \
.option("checkpointLocation", "chk-point-dir") \
.start()
print("Listening to localhost:9999")
word_count_query.awaitTermination()
# 터미널 1
nc -lk 9999
# 터미널 2
spark-submit wordcount_streaming.py
'데브코스 TIL > 빅데이터, 스트리밍 데이터 처리' 카테고리의 다른 글
Kafka & Spark Streaming 연동 실습 (0) | 2024.01.29 |
---|---|
Kafka ksqlDB (0) | 2024.01.25 |
Kafka 프로그래밍 실습 (0) | 2024.01.25 |
Kafka 프로그래밍 (0) | 2024.01.24 |
Kafka 설치 (0) | 2024.01.24 |