데브코스 TIL/빅데이터, 스트리밍 데이터 처리

Spark 프로그래밍 실습 3

예니ㅣ 2024. 1. 17. 15:33

실습

실습 3. 텍스트를 파싱해서 구조화된 데이터로 변환하기

  • 입력 데이터 : 텍스트 데이터
  • 출력 데이터 : 구조화된 데이터
  • Regex 이용
# 파일 다운로드
!wget https://s3-geospatial.s3.us-west-2.amazonaws.com/transfer_cost.txt
from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #3")
conf.set("spark.master", "local[*]")

spark = SparkSession.builder\
        .config(conf=conf)\
        .getOrCreate()
    
    
# 텍스트 확인
import pyspark.sql.functions as F
from pyspark.sql.types import *

schema = StructType([ StructField("text", StringType(), True)])
transfer_cost_df = spark.read.schema(schema).text("transfer_cost.txt")
transfer_cost_df.show(truncate=False)
        
        
# csv 변환
from pyspark.sql.functions import *
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'

df_with_new_columns = transfer_cost_df\
    .withColumn('week', regexp_extract('text', regex_str, 1))\
    .withColumn('departure_zipcode', regexp_extract(column('text'), regex_str, 2))\
    .withColumn('arrival_zipcode', regexp_extract(transfer_cost_df.text, regex_str, 3))\
    .withColumn('cost', regexp_extract(col('text'), regex_str, 4))\
    .withColumn('vendor', regexp_extract(col('text'), regex_str, 5))
df_with_new_columns.printSchema()

final_df = df_with_new_columns.drop("text")
final_df.write.csv("extracted.csv")
final_df.write.format("json").save("extracted.json")