프로젝트 기획
주제
농산물 가격 데이터를 통한 물가 변동 조회
구조
- API 데이터 수집
- S3 버킷 생성 및 API 연동
- Snowflake 생성 및 S3 벌크 업데이트
- Snowflake Summary 생성
- Superset 대시보드 생성
프로젝트 코드
RAW_DATA 전처리
UPDATE dev.raw_data.test_data
SET TS = TO_DATE(TS, 'YYYY/MM/DD')
WHERE TS LIKE '%/%'
;
SELECT *
FROM dev.raw_data.test_data
ORDER BY TS DESC
;
사용할 데이터로 테이블 생성
CREATE or replace TABLE dev.raw_data.today_data AS
SELECT *
FROM raw_data.test_data TD
WHERE TS = (SELECT MAX(TS) FROM raw_data.test_data)
;
CREATE or replace TABLE dev.raw_data.yesterday_data AS
SELECT *
FROM raw_data.test_data TD
WHERE TS = DATEADD(DAY, -1, (SELECT MAX(TS) FROM raw_data.test_data))
;
-- 한달전이 휴일인 경우 마지막 평일 기준으로 금액 책정
CREATE or replace TABLE dev.raw_data.month_ago_data AS
SELECT *
FROM raw_data.test_data
WHERE
TS = DATEADD(MONTH, -1, (SELECT MAX(TS) FROM raw_data.test_data))
OR (
NOT EXISTS (
SELECT 1
FROM raw_data.test_data
WHERE TS = DATEADD(MONTH, -1, (SELECT MAX(TS) FROM raw_data.test_data))
)
AND TS = DATEADD(DAY, -1, DATEADD(MONTH, -1, (SELECT MAX(TS) FROM raw_data.test_data)))
)
OR (
NOT EXISTS (
SELECT 1
FROM raw_data.test_data
WHERE TS IN (DATEADD(MONTH, -1, (SELECT MAX(TS) FROM raw_data.test_data)), DATEADD(DAY, -1, DATEADD(MONTH, -1, (SELECT MAX(TS) FROM raw_data.test_data))))
)
AND TS = DATEADD(DAY, -2, DATEADD(MONTH, -1, (SELECT MAX(TS) FROM raw_data.test_data)))
)
;
YESTERDAY_SUMMARY 생성
CREATE or replace TABLE dev.analytics.yesterday_summary AS
SELECT
TD.REGION,
TD.ITEM,
TD.VARIETY,
TD.PRICE TODAY_PRICE,
YD.PRICE YESTERDAY_PRICE,
YD.PRICE-TD.PRICE PRICE_FLUCTUATION,
ROUND((YD.PRICE-TD.PRICE)/YD.PRICE*100, 2) PRICE_FLUCTUATION_RATE
FROM raw_data.today_data TD
LEFT JOIN raw_data.yesterday_data YD
ON TD.ITEM = YD.ITEM and TD.REGION = YD.REGION and TD.VARIETY = YD.VARIETY
;
MONTH_AGO_SUMMARY 생성
USE DATABASE dev;
CREATE or replace TABLE dev.analytics.month_ago_summary AS
SELECT
TD.REGION,
TD.ITEM,
TD.VARIETY,
TD.PRICE TODAY_PRICE,
MD.PRICE MONTH_AGO_PRICE,
MD.PRICE-TD.PRICE PRICE_FLUCTUATION,
ROUND((MD.PRICE-TD.PRICE)/MD.PRICE*100, 2) PRICE_FLUCTUATION_RATE
FROM raw_data.today_data TD
LEFT JOIN raw_data.month_ago_data MD
ON TD.ITEM = MD.ITEM and TD.REGION = MD.REGION and TD.VARIETY = MD.VARIETY
;
DAILY_SUMMARY 생성
CREATE or replace TABLE dev.analytics.daily_summary AS
WITH
current_day_price AS (
SELECT
TS current_ts,
ITEM,
VARIETY,
ROUND(AVG(PRICE)) current_price
FROM dev.raw_data.test_data
WHERE TS != (SELECT MIN(TS) FROM dev.raw_data.test_data)
GROUP BY 1, 2, 3
ORDER BY 1 DESC
),
previous_day_price AS (
SELECT
TS previous_ts,
ITEM,
VARIETY,
ROUND(AVG(PRICE)) previous_price
FROM dev.raw_data.test_data
WHERE TS != (SELECT MAX(TS) FROM dev.raw_data.test_data)
GROUP BY 1, 2, 3
ORDER BY 1 DESC
)
SELECT
COALESCE(P1.previous_ts, P2.previous_ts, P3.previous_ts) AS previous_ts,
CP.current_ts,
CP.ITEM,
CP.VARIETY,
CP.current_price,
COALESCE(P1.previous_price, P2.previous_price, P3.previous_price) AS previous_price,
ROUND(CP.current_price - COALESCE(P1.previous_price, P2.previous_price, P3.previous_price)) AS price_fluctuation,
ROUND((CP.current_price - COALESCE(P1.previous_price, P2.previous_price, P3.previous_price))/CP.current_price*100, 2) AS price_fluctuation_rate
FROM
current_day_price CP
LEFT JOIN previous_day_price P1
ON CP.ITEM = P1.ITEM
AND CP.VARIETY = P1.VARIETY
AND P1.previous_ts = DATEADD(DAY, -1, CP.current_ts)
LEFT JOIN previous_day_price P2
ON CP.ITEM = P2.ITEM
AND CP.VARIETY = P2.VARIETY
AND P2.previous_ts = DATEADD(DAY, -2, CP.current_ts)
LEFT JOIN previous_day_price P3
ON CP.ITEM = P3.ITEM
AND CP.VARIETY = P3.VARIETY
AND P3.previous_ts = DATEADD(DAY, -3, CP.current_ts)
ORDER BY CP.current_ts
;
DELETE FROM dev.analytics.daily_summary
WHERE
previous_ts IS NULL
OR current_ts IS NULL
;
'데브코스 TIL > [프로젝트]' 카테고리의 다른 글
End-to-end 데이터 파이프라인 구성하기 (1) | 2024.01.09 |
---|---|
크롤한 웹데이터로 만들어보는 웹사이트 (0) | 2023.11.07 |