데브코스 TIL/[프로젝트]

데이터 웨어하우스를 이용한 대시보드 구성

예니ㅣ 2023. 12. 6. 17:05

프로젝트 기획

주제

농산물 가격 데이터를 통한 물가 변동 조회

 

구조

  1. API 데이터 수집
  2. S3 버킷 생성 및 API 연동
  3. Snowflake 생성 및 S3 벌크 업데이트
  4. Snowflake Summary 생성
  5. Superset 대시보드 생성

 

프로젝트 코드

RAW_DATA 전처리

-- Normalize slash-delimited dates: every row whose TS contains a '/' is parsed
-- with the YYYY/MM/DD pattern and the result is stored back into TS.
UPDATE dev.raw_data.test_data
SET
    ts = TO_DATE(ts, 'YYYY/MM/DD')
WHERE
    ts LIKE '%/%';

-- Inspect the raw table after preprocessing, newest TS first.
SELECT
    *
FROM
    dev.raw_data.test_data
ORDER BY
    ts DESC;

 

사용할 데이터로 테이블 생성

-- Snapshot of the rows that carry the most recent TS in the raw table.
-- The source is fully qualified with the dev database so the statement does
-- not depend on whichever database the session currently has selected.
CREATE OR REPLACE TABLE dev.raw_data.today_data AS
SELECT src.*
FROM dev.raw_data.test_data AS src
WHERE src.ts = (
    SELECT MAX(ts)
    FROM dev.raw_data.test_data
);

-- Snapshot of the rows dated exactly one day before the most recent TS.
-- The source is fully qualified with the dev database so the statement does
-- not depend on whichever database the session currently has selected.
-- NOTE(review): there is no fallback here; if no rows exist for that exact
-- day, the resulting table is empty.
CREATE OR REPLACE TABLE dev.raw_data.yesterday_data AS
SELECT src.*
FROM dev.raw_data.test_data AS src
WHERE src.ts = DATEADD(
    DAY,
    -1,
    (SELECT MAX(ts) FROM dev.raw_data.test_data)
);

-- Rows from one month before the most recent TS.
-- When the exact month-ago date has no rows (e.g. it was a holiday), fall back
-- to the day before it, and then to two days before it. Only the closest
-- candidate date that actually has rows is kept.
CREATE OR REPLACE TABLE dev.raw_data.month_ago_data AS
WITH
-- The date exactly one month before the most recent TS.
target AS (
    SELECT DATEADD(MONTH, -1, MAX(ts)) AS month_ago_ts
    FROM dev.raw_data.test_data
),
-- Candidate dates in order of preference (0 = the exact month-ago date).
candidates AS (
    SELECT
        0 AS days_back,
        month_ago_ts AS candidate_ts
    FROM target
    UNION ALL
    SELECT
        1 AS days_back,
        DATEADD(DAY, -1, month_ago_ts) AS candidate_ts
    FROM target
    UNION ALL
    SELECT
        2 AS days_back,
        DATEADD(DAY, -2, month_ago_ts) AS candidate_ts
    FROM target
)
SELECT src.*
FROM dev.raw_data.test_data AS src
INNER JOIN candidates AS c
    ON src.ts = c.candidate_ts
-- Among the candidate dates that matched at least one row, keep only the rows
-- of the most preferred one (smallest days_back).
QUALIFY c.days_back = MIN(c.days_back) OVER ();

 

YESTERDAY_SUMMARY 생성

-- Per (region, item, variety) comparison of today's price with yesterday's.
--   price_fluctuation      = today - yesterday (positive when the price rose),
--                            the same direction daily_summary uses.
--   price_fluctuation_rate = change as a percentage of yesterday's price.
-- NULLIF guards the division: a zero yesterday price yields NULL instead of a
-- division-by-zero error. Rows with no yesterday match keep NULLs via LEFT JOIN.
CREATE OR REPLACE TABLE dev.analytics.yesterday_summary AS
SELECT
    td.region,
    td.item,
    td.variety,
    td.price AS today_price,
    yd.price AS yesterday_price,
    td.price - yd.price AS price_fluctuation,
    ROUND((td.price - yd.price) / NULLIF(yd.price, 0) * 100, 2) AS price_fluctuation_rate
FROM dev.raw_data.today_data AS td
LEFT JOIN dev.raw_data.yesterday_data AS yd
    ON td.item = yd.item
    AND td.region = yd.region
    AND td.variety = yd.variety;

 

MONTH_AGO_SUMMARY 생성

USE DATABASE dev;

-- Per (region, item, variety) comparison of today's price with the price from
-- about one month ago.
--   price_fluctuation      = today - month ago (positive when the price rose),
--                            the same direction daily_summary uses.
--   price_fluctuation_rate = change as a percentage of the month-ago price.
-- NULLIF guards the division: a zero month-ago price yields NULL instead of a
-- division-by-zero error. Rows with no month-ago match keep NULLs via LEFT JOIN.
CREATE OR REPLACE TABLE dev.analytics.month_ago_summary AS
SELECT
    td.region,
    td.item,
    td.variety,
    td.price AS today_price,
    md.price AS month_ago_price,
    td.price - md.price AS price_fluctuation,
    ROUND((td.price - md.price) / NULLIF(md.price, 0) * 100, 2) AS price_fluctuation_rate
FROM dev.raw_data.today_data AS td
LEFT JOIN dev.raw_data.month_ago_data AS md
    ON td.item = md.item
    AND td.region = md.region
    AND td.variety = md.variety;

 

DAILY_SUMMARY 생성

-- Day-over-day price summary per (item, variety), averaged across all rows
-- that share the same TS.
-- Each day is compared with the previous day; when that day has no rows the
-- comparison falls back to two, then three days earlier.
--   price_fluctuation      = current - previous
--   price_fluctuation_rate = change as a percentage of the previous price
--                            (NULL when the previous price is 0 or missing).
CREATE OR REPLACE TABLE dev.analytics.daily_summary AS
WITH
-- Average price per day/item/variety, excluding the earliest day in the table.
current_day_price AS (
    SELECT
        ts AS current_ts,
        item,
        variety,
        ROUND(AVG(price)) AS current_price
    FROM dev.raw_data.test_data
    WHERE ts != (SELECT MIN(ts) FROM dev.raw_data.test_data)
    GROUP BY ts, item, variety
),
-- Average price per day/item/variety, excluding the latest day in the table.
previous_day_price AS (
    SELECT
        ts AS previous_ts,
        item,
        variety,
        ROUND(AVG(price)) AS previous_price
    FROM dev.raw_data.test_data
    WHERE ts != (SELECT MAX(ts) FROM dev.raw_data.test_data)
    GROUP BY ts, item, variety
),
-- Pair every current day with the nearest earlier day (1, 2 or 3 days back).
paired AS (
    SELECT
        COALESCE(p1.previous_ts, p2.previous_ts, p3.previous_ts) AS previous_ts,
        cp.current_ts,
        cp.item,
        cp.variety,
        cp.current_price,
        COALESCE(p1.previous_price, p2.previous_price, p3.previous_price) AS previous_price
    FROM current_day_price AS cp
    LEFT JOIN previous_day_price AS p1
        ON cp.item = p1.item
        AND cp.variety = p1.variety
        AND p1.previous_ts = DATEADD(DAY, -1, cp.current_ts)
    LEFT JOIN previous_day_price AS p2
        ON cp.item = p2.item
        AND cp.variety = p2.variety
        AND p2.previous_ts = DATEADD(DAY, -2, cp.current_ts)
    LEFT JOIN previous_day_price AS p3
        ON cp.item = p3.item
        AND cp.variety = p3.variety
        AND p3.previous_ts = DATEADD(DAY, -3, cp.current_ts)
)
SELECT
    previous_ts,
    current_ts,
    item,
    variety,
    current_price,
    previous_price,
    ROUND(current_price - previous_price) AS price_fluctuation,
    ROUND((current_price - previous_price) / NULLIF(previous_price, 0) * 100, 2) AS price_fluctuation_rate
FROM paired
ORDER BY current_ts;

-- Remove summary rows that are missing either the current or the previous date.
DELETE FROM dev.analytics.daily_summary
WHERE current_ts IS NULL
   OR previous_ts IS NULL;