강의
데이터 시스템 구조
데이터 병렬처리
- 데이터 분산 필요 : Partition (128MB)
- 각각 따로 및 동시 처리 필요 : 파티션 단위로 메모리 로드 및 Executor 배정
데이터 처리 흐름
Spark에서 "데이터프레임"은 다수의 작은 파티션으로 구성되어 있으며 수정이 불가(Immutable)합니다.
"셔플링"은 파티션 간에 데이터 이동이 필요한 경우에 발생합니다.
- 명시적 파티션을 새롭게 하는 경우 : 파티션 수 변경
- 시스템에 의해 이뤄지는 셔플링 : aggregation, sorting
셔플링이 발생할 때 네트워크를 타고 데이터가 이동합니다.
- 결과 파티션 수
- spark.sql.shuffle.partitions이 결정
- 오퍼레이션에 따라 결정 : random, hashing partitions, range partition
Data partitioning은 Data Skewness를 발생시키기도 합니다.
데이터 자료 구조
RDD (Resilient Distrubuted Dataset)
- 소개 시기 : 1.0
- 로우레벨 데이터
- 로우레벨 함수형 변환 지원 : map, filter, flatMap
- 클러스터 내의 서버에 변경 불가능하고 분산 저장된 데이터
- 레코드별로 존재하지만 스키마 존재하지 않음
- 구조화된 데이터 및 비구조화된 데이터 모두 지원
- 일반 파이썬 데이터 사용 가능 : parellelize 및 collect
DataFrame
- 소개 시기 : 1.3
- 변경 불가능하고 분산 저장된 데이터
- API 용이
- SparkSQL 기반
- Catayst Optimizer
- 필드 정보 존재 : 관계형 데이터베이스 테이블과 유사
- 다양한 데이터소스 지원 : HDFS, Hive, 외부 데이터베이스, RDD
- 다양한 언어 지원 : 스칼라, JAVA, Python
Dataset
- 소개 시기 : 1.6
- 컴파일타임 타입 체크 존재
- API 용이
- SparkSQL 기반
- Catayst Optimizer
- 필드 정보 존재 (테이블)
- 타입 정보 존재 및 컴파일 언어에서 사용 가능
Spark Session
"Spark Session"은 한 프로그램당 하나(Singleton)를 생성하여 Spark Cluster와 통신하는 Entry Point 입니다.
config 메소드를 통해 다양한 환경설정이 가능합니다.
- DataFrame
- SQL
- Streaming
- ML API
# Spark Session 생성
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark Tutorial')\
.getOrCreate()
...
spark.stop()
환경 변수
- spark.executor.memory : executor별 메모리 (기본값 1g)
- spark.executor.cores : executor별 CPU 수 (YARN에서 기본값 1)
- spark.driver.memory : driver 메모리 (기본값 1g)
- spark.shuffle.partitions : Shuffle 후 Partition의 수 (기본값 최대 200)
환경 설정 방법
- 환경 변수 설정
from pyspark.sql import SparkSession
spark = SparkSession.builder\
...
.config("spark.some.config.option1", "some-value")\
.config("spark.some.config.option1", "some-value")\
...
- $SPARK_HOME/conf/spark_defaults.conf
- spark-submit 명령의 커맨드라인 파라미터
- SparkSession 만들때 지정 : SparkConf
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.app.name", "PySpark Tutorial")
conf.set("spark.master", "local[*]")
spark = SparkSession.builder\
.config(config=conf)\
...
데이터 소스
- DataFrame
- spark.read(DataFrameReader) : 데이터프레임으로 로드
- spark.write(DataFrameWriter) : 데이터프레임 저장
- HDFS 파일 : CSV, JSON, Parquet, ORC, Text, Avro, Hive 테이블
- JDBC 관계형 데이터베이스
- 클라우드 기반 데이터 시스템
- 스트리밍 시스템
프로그램 구조
- Spark Session 생성
- 입력 데이터 로딩
- 데이터 조작 작업 반복 : DataFrame API 혹은 Spark SQL 사용
- 최종 결과 저장
'데브코스 TIL > 빅데이터, 스트리밍 데이터 처리' 카테고리의 다른 글
Spark 프로그래밍 실습 1/2 (0) | 2024.01.17 |
---|---|
Spark 설치 및 테스트 (0) | 2024.01.17 |
Spark 소개 (0) | 2024.01.15 |
Map Reduce 프로그래밍 (0) | 2024.01.15 |
Hadoop 소개 및 설치 (0) | 2024.01.15 |