실습
실습 1. 헤더가 없는 CSV 파일 처리하기
- 입력 데이터 : 헤더 없는 CSV 파일
- 데이터에 스키마 지정
- SparkConf 사용
- measure_type값이 TMIN인 레코드의 stationId별 최소 온도
# 파일 다운로드
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/1800.csv
판다스
import pandas as pd
pd_df = pd.read_csv(
"1800.csv",
names=["stationID", "date", "measure_type", "temperature"],
usecols=[0, 1, 2, 3]
)
pd_minTemps = pd_df[pd_df['measure_type'] == "TMIN"]
pd_stationTemps = pd_minTemps[["stationID", "temperature"]]
pd_minTempsByStation = pd_stationTemps.groupby(["stationID"]).min("temperature")
Spark
# 예상하여 설정
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #1")
conf.set("spark.master", "local[*]")
spark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()
df = spark.read.format("csv")\
.option("inferSchema", "true")\
.load("1800.csv")\
.toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")
# type 명시적 지정
from pyspark.sql.types import StringType, IntegerType, FloatType
from pyspark.sql.types import StructType, StructField
schema = StructType([ \
StructField("stationID", StringType(), True), \
StructField("date", IntegerType(), True), \
StructField("measure_type", StringType(), True), \
StructField("temperature", FloatType(), True)])
df = spark.read.schema(schema).csv("1800.csv")
minTemps = df.filter(df.measure_type == "TMIN")
minTemps = df.where(df.measure_type == "TMIN")
minTemps = df.where("measure_type = 'TMIN'")
minTempsByStation = minTemps.groupBy("stationID").min("temperature")
stationTemps = minTemps[["stationID", "temperature"]]
stationTemps = minTemps.select("stationID", "temperature")
results = minTempsByStation.collect()
for result in results:
print(result[0] + "\t{:.2f}F".format(result[1]))
SparkSQL
df.createOrReplaceTempView("station1800")
results = spark.sql("""
SELECT stationID, MIN(temperature)
FROM station1800
WHERE measure_type = 'TMIN'
GROUP BY 1
""").collect()
for r in results:
print(r)
실습 2. 헤더가 없는 CSV 파일 처리하기
- 입력 데이터 : 헤더 없는 CSV 파일
- 데이터에 스키마 지정
- cust_id, item_id amount_spent 컬럼 추가
- cust_id 기분 amount_spent 합 계산
# 파일 다운로드
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/1800.csv
Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark DataFrame #2')\
.getOrCreate()
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, FloatType
schema = StructType([ \
StructField("cust_id", StringType(), True), \
StructField("item_id", StringType(), True), \
StructField("amount_spent", FloatType(), True)])
df = spark.read.schema(schema).csv("customer-orders.csv")
df.groupBy("cust_id") \
.agg(
f.sum('amount_spent').alias('sum'),
f.max('amount_spent').alias('max'),
f.avg('amount_spent').alias('avg')).collect()
Spark SQL
df.createOrReplaceTempView("customer_orders")
spark.sql("""
SELECT cust_id, SUM(amount_spent) sum, MAX(amount_spent) max, AVG(amount_spent) avg
FROM customer_orders
GROUP BY 1
""").head(5)
spark.catalog.listTables()
'데브코스 TIL > 빅데이터, 스트리밍 데이터 처리' 카테고리의 다른 글
Spark 프로그래밍 실습 4 (0) | 2024.01.17 |
---|---|
Spark 프로그래밍 실습 3 (0) | 2024.01.17 |
Spark 설치 및 테스트 (0) | 2024.01.17 |
Spark 데이터 처리 (0) | 2024.01.17 |
Spark 소개 (0) | 2024.01.15 |