데브코스 TIL/빅데이터, 스트리밍 데이터 처리

Spark 데이터 처리

예니ㅣ 2024. 1. 17. 13:47

강의

데이터 시스템 구조

 


데이터 병렬처리

  • 데이터 분산 필요 : Partition (128MB)
  • 각각 따로 및 동시 처리 필요 : 파티션 단위로 메모리 로드 및 Executor 배정

 


데이터 처리 흐름

Spark에서 "데이터프레임"은 다수의 작은 파티션으로 구성되어 있으며 수정이 불가(Immutable)합니다.

 

"셔플링"은 파티션 간에 데이터 이동이 필요한 경우에 발생합니다.

  • 명시적 파티션을 새롭게 하는 경우 : 파티션 수 변경
  • 시스템에 의해 이뤄지는 셔플링 : aggregation, sorting

 

셔플링이 발생할 때 네트워크를 타고 데이터가 이동합니다.

  • 결과 파티션 수
    • spark.sql.shuffle.partitions이 결정
    • 오퍼레이션에 따라 결정 : random, hashing partitions, range partition

 

Data partitioning은 Data Skewness를 발생시키기도 합니다.

 


데이터 자료 구조

RDD (Resilient Distrubuted Dataset)

  • 소개 시기 : 1.0
  • 로우레벨 데이터
  • 로우레벨 함수형 변환 지원 : map, filter, flatMap
  • 클러스터 내의 서버에 변경 불가능하고 분산 저장된 데이터
  • 레코드별로 존재하지만 스키마 존재하지 않음
  • 구조화된 데이터 및 비구조화된 데이터 모두 지원
  • 일반 파이썬 데이터 사용 가능 : parellelize 및 collect

 

DataFrame

  • 소개 시기 : 1.3
  • 변경 불가능하고 분산 저장된 데이터
  • API 용이
  • SparkSQL 기반
  • Catayst Optimizer
  • 필드 정보 존재 : 관계형 데이터베이스 테이블과 유사
  • 다양한 데이터소스 지원 : HDFS, Hive, 외부 데이터베이스, RDD
  • 다양한 언어 지원 : 스칼라, JAVA, Python

 

Dataset

  • 소개 시기 : 1.6
  • 컴파일타임 타입 체크 존재
  • API 용이
  • SparkSQL 기반
  • Catayst Optimizer
  • 필드 정보 존재 (테이블)
  • 타입 정보 존재 및 컴파일 언어에서 사용 가능

 


Spark Session

 

"Spark Session"은 한 프로그램당 하나(Singleton)를 생성하여 Spark Cluster와 통신하는 Entry Point 입니다.

config 메소드를 통해 다양한 환경설정이 가능합니다.

  • DataFrame
  • SQL
  • Streaming
  • ML API
# Spark Session 생성

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark Tutorial')\
        .getOrCreate()
        
...

spark.stop()

 

환경 변수

  • spark.executor.memory : executor별 메모리 (기본값 1g)
  • spark.executor.cores : executor별 CPU 수 (YARN에서 기본값 1)
  • spark.driver.memory : driver 메모리 (기본값 1g)
  • spark.shuffle.partitions : Shuffle 후 Partition의 수 (기본값 최대 200)

 

환경 설정 방법

  • 환경 변수 설정
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        ...
        .config("spark.some.config.option1", "some-value")\
        .config("spark.some.config.option1", "some-value")\
        ...
  • $SPARK_HOME/conf/spark_defaults.conf
  • spark-submit 명령의 커맨드라인 파라미터
  • SparkSession 만들때 지정 : SparkConf
from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.app.name", "PySpark Tutorial")
conf.set("spark.master", "local[*]")

spark = SparkSession.builder\
        .config(config=conf)\
        ...

 

데이터 소스

  • DataFrame
    • spark.read(DataFrameReader) : 데이터프레임으로 로드
    • spark.write(DataFrameWriter) : 데이터프레임 저장
  • HDFS 파일 : CSV, JSON, Parquet, ORC, Text, Avro, Hive 테이블
  • JDBC 관계형 데이터베이스
  • 클라우드 기반 데이터 시스템
  • 스트리밍 시스템

 


프로그램 구조

  1. Spark Session 생성
  2. 입력 데이터 로딩
  3. 데이터 조작 작업 반복 : DataFrame API 혹은 Spark SQL 사용
  4. 최종 결과 저장

 

 

'데브코스 TIL > 빅데이터, 스트리밍 데이터 처리' 카테고리의 다른 글

Spark 프로그래밍 실습 1/2  (0) 2024.01.17
Spark 설치 및 테스트  (0) 2024.01.17
Spark 소개  (0) 2024.01.15
Map Reduce 프로그래밍  (0) 2024.01.15
Hadoop 소개 및 설치  (0) 2024.01.15