Devcourse TIL / Big Data, Streaming Data Processing

Spark SQL Practice 3

예니 · 2024. 1. 17. 16:56

Practice

 

Practice 3. Finding each user's first and last channel

  • Load the tables into DataFrames
  • Assign a table name to each DataFrame

 

# Download the Redshift JDBC driver into Spark's jars directory
!cd /usr/local/lib/python3.8/dist-packages/pyspark/jars && wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL #1") \
    .getOrCreate()
# Connect to Redshift and load the tables as DataFrames
url = "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234"

df_user_session_channel = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", url) \
    .option("dbtable", "raw_data.user_session_channel") \
    .load()

df_session_timestamp = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", url) \
    .option("dbtable", "raw_data.session_timestamp") \
    .load()

df_session_transaction = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", url) \
    .option("dbtable", "raw_data.session_transaction") \
    .load()
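
The three JDBC reads above differ only in the `dbtable` option, so a small helper can remove the repetition. A sketch (the helper name `read_redshift_table` is hypothetical; it assumes the same SparkSession and `url` as above):

```python
def read_redshift_table(spark, url, table):
    """Load one Redshift table over JDBC into a DataFrame,
    using the same driver/url options as the reads above."""
    return (spark.read
            .format("jdbc")
            .option("driver", "com.amazon.redshift.jdbc42.Driver")
            .option("url", url)
            .option("dbtable", table)
            .load())

# e.g.
# df_user_session_channel = read_redshift_table(spark, url, "raw_data.user_session_channel")
```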
    
    
df_user_session_channel.createOrReplaceTempView("user_session_channel")
df_session_timestamp.createOrReplaceTempView("session_timestamp")
df_session_transaction.createOrReplaceTempView("session_transaction")
first_last_channel_df = spark.sql("""
    WITH RECORD AS (
        SELECT /* number each user's sessions in visit order */
            userid,
            channel,
            ROW_NUMBER() OVER (PARTITION BY userid ORDER BY ts ASC) AS seq_first,
            ROW_NUMBER() OVER (PARTITION BY userid ORDER BY ts DESC) AS seq_last
        FROM user_session_channel u
        LEFT JOIN session_timestamp t
            ON u.sessionid = t.sessionid
    )
    SELECT /* pick each user's first and last acquisition channel */
        f.userid,
        f.channel first_channel,
        l.channel last_channel
    FROM RECORD f
    INNER JOIN RECORD l ON f.userid = l.userid
    WHERE f.seq_first = 1 AND l.seq_last = 1
    ORDER BY userid
    """)
    
first_last_channel_df2 = spark.sql("""
    SELECT DISTINCT A.userid,
        FIRST_VALUE(A.channel) OVER (PARTITION BY A.userid ORDER BY B.ts
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS First_Channel,
        LAST_VALUE(A.channel) OVER (PARTITION BY A.userid ORDER BY B.ts
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS Last_Channel
    FROM user_session_channel A
    LEFT JOIN session_timestamp B
        ON A.sessionid = B.sessionid
    """)
