Spark Programming Practice 5

예니 | 2024. 1. 17. 15:58

Practice

Practice 5. Connecting to Redshift

  • Load tables from Redshift into Spark
  • Join them with the DataFrame API and with Spark SQL
  • Compute MAU (Monthly Active Users)
# Download the Redshift JDBC driver into PySpark's jars directory
!cd /usr/local/lib/python3.8/dist-packages/pyspark/jars && wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("PySpark DataFrame #5") \
    .getOrCreate()
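
The jar is picked up automatically because it sits in PySpark's jars directory. If that directory isn't writable, a sketch of an alternative, assuming the same download path: pass the jar via the spark.jars config when building the session.

# Alternative (sketch): register the driver jar explicitly via spark.jars.
# The path assumes the wget location above -- adjust to where the jar actually lives.
spark = SparkSession \
    .builder \
    .appName("PySpark DataFrame #5") \
    .config("spark.jars", "/usr/local/lib/python3.8/dist-packages/pyspark/jars/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar") \
    .getOrCreate()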
    

# Load the two tables from Redshift over JDBC
df_user_session_channel = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234") \
    .option("dbtable", "raw_data.user_session_channel") \
    .load()
    
df_session_timestamp = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234") \
    .option("dbtable", "raw_data.session_timestamp") \
    .load()
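
Before joining, it's worth a quick sanity check that both tables loaded as expected (standard DataFrame calls, nothing Redshift-specific):

# Verify schemas and peek at a few rows
df_user_session_channel.printSchema()
df_session_timestamp.printSchema()
df_user_session_channel.show(5)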

 

DataFrame

join_expr = df_user_session_channel.sessionid == df_session_timestamp.sessionid
session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner")

# Ambiguous: after the join, both sides still carry a sessionid column,
# so the bare "sessionid" reference below raises an AnalysisException.
session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner").select(
    "userid", "sessionid", "channel", "ts"
)

# Qualifying sessionid with the DataFrame it comes from resolves the ambiguity.
session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner").select(
    "userid", df_user_session_channel.sessionid, "channel", "ts"
)
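
Alternatively, joining on the column name rather than an expression makes Spark keep a single sessionid column, so the ambiguity never arises. A sketch:

# Joining on the column name (USING-style) deduplicates the join key,
# so a bare "sessionid" in select() is unambiguous.
session_df = df_user_session_channel.join(df_session_timestamp, "sessionid", "inner").select(
    "userid", "sessionid", "channel", "ts"
)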

# Number of sessions per channel, most active first
channel_count_df = session_df.groupby("channel").count().orderBy("count", ascending=False)

from pyspark.sql.functions import date_format, asc, countDistinct

# MAU: distinct users per month, with the month derived from the session timestamp
session_df.withColumn("month", date_format("ts", "yyyy-MM")) \
    .groupby("month") \
    .agg(countDistinct("userid").alias("mau")) \
    .sort(asc("month")) \
    .show()
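
As a sketch of an alternative, trunc cuts the timestamp down to the first day of its month, giving a proper date column instead of a 'yyyy-MM' string:

from pyspark.sql.functions import trunc

# Same MAU, with month as a date (first of the month) instead of a string
session_df.withColumn("month", trunc("ts", "MM")) \
    .groupby("month") \
    .agg(countDistinct("userid").alias("mau")) \
    .sort(asc("month")) \
    .show()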

 

Spark SQL

# Register the DataFrames as temp views so they can be queried with spark.sql
df_user_session_channel.createOrReplaceTempView("user_session_channel")
df_session_timestamp.createOrReplaceTempView("session_timestamp")

channel_count_df = spark.sql("""
    SELECT channel, count(distinct userId) uniqueUsers
    FROM session_timestamp st
    JOIN user_session_channel usc ON st.sessionID = usc.sessionID
    GROUP BY 1
    ORDER BY 1
    """)

mau_df = spark.sql("""
    SELECT
        LEFT(A.ts, 7) AS month,
        COUNT(DISTINCT B.userid) AS mau
    FROM session_timestamp A
    JOIN user_session_channel B ON A.sessionid = B.sessionid
    GROUP BY 1      
    ORDER BY 1 DESC
    """)

mau_df.collect()
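
collect() pulls every result row back to the driver as a list of Row objects, which is fine here (one row per month) but risky for large results. show() or toPandas() are the usual alternatives; a sketch:

# collect() returns a list of Row objects; safe here, one row per month
for row in mau_df.collect():
    print(row["month"], row["mau"])

# For larger results, print a bounded number of rows instead...
mau_df.show(12)

# ...or hand the result to pandas for local analysis (requires pandas)
mau_pdf = mau_df.toPandas()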
