실습 2. 월별 채널별 매출과 방문자 정보 계산하기
- 테이블을 데이터프레임으로 로딩
- 데이터프레임별 테이블 이름(temp view) 지정
# Download the Redshift JDBC driver jar into PySpark's jars directory so that
# spark.read.format("jdbc") can find "com.amazon.redshift.jdbc42.Driver".
# NOTE: the leading "!" is IPython shell magic — this line only works inside a
# notebook (Colab/Jupyter), not as plain Python. Path is pinned to Python 3.8.
!cd /usr/local/lib/python3.8/dist-packages/pyspark/jars && wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar
from pyspark.sql import SparkSession
# Create (or reuse) the SparkSession that drives every query below.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL #1")
    .getOrCreate()
)
# Connect to Redshift over JDBC and load each source table into a DataFrame.
# NOTE(security): user/password are embedded in the JDBC URL in plain text.
# For anything beyond a guest sandbox, move them to environment variables or
# a secrets manager instead of committing them to source.
url = "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234"

def _load_table(dbtable):
    """Load one schema-qualified Redshift table into a Spark DataFrame.

    All three tables share the same driver/URL settings, so the JDBC reader
    options live in this single helper instead of three copy-pasted blocks.
    """
    return spark.read \
        .format("jdbc") \
        .option("driver", "com.amazon.redshift.jdbc42.Driver") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .load()

df_user_session_channel = _load_table("raw_data.user_session_channel")
df_session_timestamp = _load_table("raw_data.session_timestamp")
df_session_transaction = _load_table("raw_data.session_transaction")

# Register each DataFrame as a temp view so spark.sql() can query it by name.
df_user_session_channel.createOrReplaceTempView("user_session_channel")
df_session_timestamp.createOrReplaceTempView("session_timestamp")
df_session_transaction.createOrReplaceTempView("session_transaction")
# Primary-key (sessionid) uniqueness check: for each view, show the sessionid
# with the highest row count — a count above 1 means the key is not unique.
# The three original queries were identical except for the table name, so
# they are folded into one loop (same order, same output as before).
for view in ("user_session_channel", "session_transaction", "session_timestamp"):
    spark.sql(f"""SELECT sessionid, COUNT(1) count
FROM {view}
GROUP BY 1
ORDER BY 2 DESC
LIMIT 1""").show()
mon_channel_rev_df = spark.sql("""
SELECT LEFT(sti.ts, 7) year_month,
usc.channel channel,
COUNT(DISTINCT userid) total_visitors
FROM user_session_channel usc
LEFT JOIN session_timestamp sti ON usc.sessionid = sti.sessionid
GROUP BY 1 ,2
ORDER BY 1, 2
""")
mon_channel_rev_df = spark.sql("""
SELECT LEFT(sti.ts, 7) year_month,
usc.channel channel,
COUNT(DISTINCT userid) total_visitors,
COUNT(DISTINCT CASE WHEN amount is not NULL THEN userid END) paid_visitors
FROM user_session_channel usc
LEFT JOIN session_timestamp sti ON usc.sessionid = sti.sessionid
LEFT JOIN session_transaction str ON usc.sessionid = str.sessionid
GROUP BY 1 ,2
ORDER BY 1, 2
""")
# Final query: per month and channel — unique users, paying users, gross and
# net revenue (net excludes refunded transactions), and conversion rate (%).
# FIX: removed the trailing ";" inside the SQL string — spark.sql() does not
# accept a statement terminator and raises a ParseException on it.
mon_channel_rev_df = spark.sql("""
SELECT LEFT(ts, 7) month,
       usc.channel,
       COUNT(DISTINCT userid) uniqueUsers,
       COUNT(DISTINCT (CASE WHEN amount >= 0 THEN userid END)) paidUsers,
       SUM(amount) grossRevenue,
       SUM(CASE WHEN refunded is not True THEN amount END) netRevenue,
       ROUND(COUNT(DISTINCT CASE WHEN amount >= 0 THEN userid END)*100 / COUNT(DISTINCT userid), 2) conversionRate
FROM user_session_channel usc
LEFT JOIN session_timestamp t ON t.sessionid = usc.sessionid
LEFT JOIN session_transaction st ON st.sessionid = usc.sessionid
GROUP BY 1, 2
ORDER BY 1, 2
""")
mon_channel_rev_df.show()