실습
실습 3. 텍스트를 파싱해서 구조화된 데이터로 변환하기
- 입력 데이터 : 텍스트 데이터
- 출력 데이터 : 구조화된 데이터
- Regex 이용
# 파일 다운로드
!wget https://s3-geospatial.s3.us-west-2.amazonaws.com/transfer_cost.txt
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #3")
conf.set("spark.master", "local[*]")
spark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()
# 텍스트 확인
import pyspark.sql.functions as F
from pyspark.sql.types import *
schema = StructType([ StructField("text", StringType(), True)])
transfer_cost_df = spark.read.schema(schema).text("transfer_cost.txt")
transfer_cost_df.show(truncate=False)
# csv 변환
from pyspark.sql.functions import *
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'
df_with_new_columns = transfer_cost_df\
.withColumn('week', regexp_extract('text', regex_str, 1))\
.withColumn('departure_zipcode', regexp_extract(column('text'), regex_str, 2))\
.withColumn('arrival_zipcode', regexp_extract(transfer_cost_df.text, regex_str, 3))\
.withColumn('cost', regexp_extract(col('text'), regex_str, 4))\
.withColumn('vendor', regexp_extract(col('text'), regex_str, 5))
df_with_new_columns.printSchema()
final_df = df_with_new_columns.drop("text")
final_df.write.csv("extracted.csv")
final_df.write.format("json").save("extracted.json")