카테고리 없음

Spark SQL 실습 2

예니 2024. 1. 17. 16:50

실습

 

실습 2. 월별 채널별 매출과 방문자 정보 계산하기

  • 테이블을 데이터프레임으로 로딩
  • 데이터프레임별 테이블 이름 지정

 

# Notebook shell command: download the Redshift JDBC driver jar into pyspark's
# jars directory so the "jdbc" data source used below can find the driver class.
!cd /usr/local/lib/python3.8/dist-packages/pyspark/jars && wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

from pyspark.sql import SparkSession

# Entry point for Spark SQL; getOrCreate() reuses an already-running session.
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL #1") \
    .getOrCreate()
# Connect to Redshift and load tables as DataFrames.
# NOTE(review): user/password are embedded in the JDBC URL in plain text —
# acceptable for this guest tutorial account, but real credentials belong in
# a secret store or environment variable, not in source.
url = "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234"

def _load_redshift_table(dbtable):
    """Load one Redshift table into a Spark DataFrame over JDBC.

    Extracted helper: the original repeated the identical five-option
    reader chain three times, differing only in the table name.

    Parameters
    ----------
    dbtable : str
        Fully qualified table name ("schema.table").

    Returns
    -------
    pyspark.sql.DataFrame
        Lazily evaluated DataFrame backed by the JDBC source; no data is
        pulled until an action (e.g. .show()) runs.
    """
    return spark.read \
        .format("jdbc") \
        .option("driver", "com.amazon.redshift.jdbc42.Driver") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .load()


# The three raw tables used throughout this exercise.
df_user_session_channel = _load_redshift_table("raw_data.user_session_channel")
df_session_timestamp = _load_redshift_table("raw_data.session_timestamp")
df_session_transaction = _load_redshift_table("raw_data.session_transaction")
    
    
# Expose each DataFrame to Spark SQL under a plain table name so the
# queries below can reference them directly.
for _frame, _view_name in [
    (df_user_session_channel, "user_session_channel"),
    (df_session_timestamp, "session_timestamp"),
    (df_session_transaction, "session_transaction"),
]:
    _frame.createOrReplaceTempView(_view_name)
# Primary-key uniqueness check: for each table, show the sessionid with the
# largest row count — if even the top count is 1, sessionid is unique there.
for _table in ("user_session_channel", "session_transaction", "session_timestamp"):
    spark.sql(f"""SELECT sessionid, COUNT(1) count
FROM {_table}
GROUP BY 1
ORDER BY 2 DESC
LIMIT 1""").show()
# Step 1: monthly unique visitors per channel.
# NOTE(review): the original assigned each successive query to the same
# variable (mon_channel_rev_df), silently discarding the earlier ones as dead
# stores; distinct names keep every step of the tutorial inspectable.
monthly_channel_visitors_df = spark.sql("""
    SELECT LEFT(sti.ts, 7) year_month,
    	usc.channel channel,
    	COUNT(DISTINCT userid) total_visitors
    FROM user_session_channel usc
    LEFT JOIN session_timestamp sti ON usc.sessionid = sti.sessionid
    GROUP BY 1 ,2
    ORDER BY 1, 2
    """)

# Step 2: same breakdown plus paid visitors — users with at least one
# session carrying a non-NULL transaction amount.
monthly_channel_paid_df = spark.sql("""
    SELECT LEFT(sti.ts, 7) year_month,
    	usc.channel channel,
    	COUNT(DISTINCT userid) total_visitors,
    	COUNT(DISTINCT CASE WHEN amount is not NULL THEN userid END) paid_visitors
    FROM user_session_channel usc
    LEFT JOIN session_timestamp sti ON usc.sessionid = sti.sessionid
    LEFT JOIN session_transaction str ON usc.sessionid = str.sessionid
    GROUP BY 1 ,2
    ORDER BY 1, 2
    """)
    
# Final monthly summary per (month, channel): unique visitors, paying users,
# gross/net revenue, and conversion rate (% of visitors who paid).
# Fix: the original SQL ended with a trailing ';', which spark.sql() rejects
# with a ParseException in many Spark versions — removed.
# netRevenue keeps only non-refunded amounts; "is not True" also keeps rows
# where refunded is NULL (no refund record).
mon_channel_rev_df = spark.sql("""
  SELECT LEFT(ts, 7) month,
       usc.channel,
       COUNT(DISTINCT userid) uniqueUsers,
       COUNT(DISTINCT (CASE WHEN amount >= 0 THEN userid END)) paidUsers,
       SUM(amount) grossRevenue,
       SUM(CASE WHEN refunded is not True THEN amount END) netRevenue,
       ROUND(COUNT(DISTINCT CASE WHEN amount >= 0 THEN userid END)*100 / COUNT(DISTINCT userid), 2) conversionRate
   FROM user_session_channel usc
   LEFT JOIN session_timestamp t ON t.sessionid = usc.sessionid
   LEFT JOIN session_transaction st ON st.sessionid = usc.sessionid
   GROUP BY 1, 2
   ORDER BY 1, 2
	""")

mon_channel_rev_df.show()