데브코스 TIL/빅데이터, 스트리밍 데이터 처리

Spark 프로그래밍 실습 1/2

예니ㅣ 2024. 1. 17. 15:17

실습

실습 1. 헤더가 없는 CSV 파일 처리하기

  • 입력 데이터 : 헤더 없는 CSV 파일
  • 데이터에 스키마 지정
  • SparkConf 사용
  • measure_type값이 TMIN인 레코드의 stationId별 최소 온도
# 파일 다운로드
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/1800.csv

 

판다스

import pandas as pd 

pd_df = pd.read_csv(
    "1800.csv",
    names=["stationID", "date", "measure_type", "temperature"],
    usecols=[0, 1, 2, 3]
)

pd_minTemps = pd_df[pd_df['measure_type'] == "TMIN"]

pd_stationTemps = pd_minTemps[["stationID", "temperature"]]

pd_minTempsByStation = pd_stationTemps.groupby(["stationID"]).min("temperature")

 

Spark

# 예상하여 설정
from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #1")
conf.set("spark.master", "local[*]")

spark = SparkSession.builder\
        .config(conf=conf)\
        .getOrCreate()
 
df = spark.read.format("csv")\
    .option("inferSchema", "true")\
    .load("1800.csv")\
    .toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")
    
# type 명시적 지정
from pyspark.sql.types import StringType, IntegerType, FloatType
from pyspark.sql.types import StructType, StructField

schema = StructType([ \
                     StructField("stationID", StringType(), True), \
                     StructField("date", IntegerType(), True), \
                     StructField("measure_type", StringType(), True), \
                     StructField("temperature", FloatType(), True)])
                     
df = spark.read.schema(schema).csv("1800.csv")


minTemps = df.filter(df.measure_type == "TMIN")
minTemps = df.where(df.measure_type == "TMIN")
minTemps = df.where("measure_type = 'TMIN'")

minTempsByStation = minTemps.groupBy("stationID").min("temperature")

stationTemps = minTemps[["stationID", "temperature"]]
stationTemps = minTemps.select("stationID", "temperature")

results = minTempsByStation.collect()
for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))

 

SparkSQL

df.createOrReplaceTempView("station1800")

results = spark.sql("""
    SELECT stationID, MIN(temperature)
    FROM station1800
    WHERE measure_type = 'TMIN'
    GROUP BY 1
    """).collect()

for r in results:
    print(r)

 


실습 2. 헤더가 없는 CSV 파일 처리하기

  • 입력 데이터 : 헤더 없는 CSV 파일
  • 데이터에 스키마 지정
  • cust_id, item_id amount_spent 컬럼 추가
  • cust_id 기분 amount_spent 합 계산
# 파일 다운로드
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/1800.csv

 

Spark

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark DataFrame #2')\
        .getOrCreate()
        
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, FloatType

schema = StructType([ \
                     StructField("cust_id", StringType(), True), \
                     StructField("item_id", StringType(), True), \
                     StructField("amount_spent", FloatType(), True)])
                     
df = spark.read.schema(schema).csv("customer-orders.csv")
 
df.groupBy("cust_id") \
   .agg(
       f.sum('amount_spent').alias('sum'),
       f.max('amount_spent').alias('max'),
       f.avg('amount_spent').alias('avg')).collect()

 

Spark SQL

df.createOrReplaceTempView("customer_orders")

spark.sql("""
    SELECT cust_id, SUM(amount_spent) sum, MAX(amount_spent) max, AVG(amount_spent) avg
    FROM customer_orders
    GROUP BY 1
    """).head(5)
            
spark.catalog.listTables()

'데브코스 TIL > 빅데이터, 스트리밍 데이터 처리' 카테고리의 다른 글

Spark 프로그래밍 실습 4  (0) 2024.01.17
Spark 프로그래밍 실습 3  (0) 2024.01.17
Spark 설치 및 테스트  (0) 2024.01.17
Spark 데이터 처리  (0) 2024.01.17
Spark 소개  (0) 2024.01.15