데브코스 TIL/[프로젝트]

End-to-end 데이터 파이프라인 구성하기

예니 | 2024. 1. 9. 13:11

주제

구글 트렌드와 네이버 데이터랩을 이용하여 네이버 실시간 검색어 복원하기

 

구조

  1. 데이터 인프라 형성
  2. 구글 트렌드의 키워드 추출
  3. 네이버 데이터랩을 통해 검색어 순위 및 백분위 횟수 추출

 

이슈

  • 구글 트렌드 공용 API 존재하지 않음
  • 네이버 데이터랩 금일 데이터 존재하지 않음
  • 키워드 입력 개수 5개 제한
  • 네이버 데이터랩 ID, PASSWORD Variables 설정
  • 전체 및 성별별 데이터 적재

 

프로젝트 코드

API RAW_CODE.py

import os
import sys
import pandas as pd
from pandas import Timestamp
from datetime import datetime

import urllib.request
import json

# NOTE(review): hard-coded placeholder credentials — load real values from
# environment variables or a secrets store instead of committing them.
client_id = "id"
client_secret = "password"  # fixed: the closing quote was missing (SyntaxError)

url = "https://openapi.naver.com/v1/datalab/search"

request = urllib.request.Request(url)
request.add_header("X-Naver-Client-Id", client_id)
request.add_header("X-Naver-Client-Secret", client_secret)
request.add_header("Content-Type", "application/json")

# Build the request body with json.dumps instead of a hand-escaped string
# literal (the original mixed escaped and unescaped quotes inconsistently).
body = json.dumps({
    "startDate": "2024-01-01",
    "endDate": "2024-01-05",
    "timeUnit": "month",
    "keywordGroups": [
        {"groupName": "한글", "keywords": ["한글", "korean"]},
        {"groupName": "영어", "keywords": ["영어", "english"]},
        {"groupName": "중국어", "keywords": ["중국어", "chinese"]},
    ],
    "device": "pc",
    "ages": ["1", "2"],
    "gender": "f",
}, ensure_ascii=False)

response = urllib.request.urlopen(request, data=body.encode("utf-8"))
rescode = response.getcode()

# Error handling
if rescode == 200:
    response_body = response.read()
    print(response_body.decode('utf-8'))
else:
    # fixed: the original concatenated str + int, raising TypeError on the
    # error path instead of reporting the HTTP status.
    print("Error Code:", rescode)

 

 

API RAW_CODE_v2.py

import os
import sys
import pandas as pd
from pandas import Timestamp
from datetime import datetime, timedelta

import urllib.request
import requests
import json

def get_google_api_keyword(google_url):
    """Fetch the daily trending search queries from the Google Trends feed.

    The response body's first 6 characters are not part of the JSON
    payload, so they are skipped before parsing.

    Returns a list of trending query strings.
    """
    resp = requests.get(google_url)
    payload = json.loads(resp.text[6:])
    trending = payload['default']['trendingSearchesDays'][0]['trendingSearches']

    result = [entry['title']['query'] for entry in trending]
    print(result)

    return result

def get_naver_api_result(request, date, keywords_group, ages, genders):
    """POST one query to the Naver Datalab search API and return the raw
    JSON response text.

    request        -- pre-built urllib Request carrying the auth headers
    date           -- yyyy-mm-dd string used as both start and end date
    keywords_group -- list of {"groupName", "keywords"} dicts
    ages / genders -- Datalab age codes and gender filter

    Returns the decoded response body, or None when the call fails.
    """
    # fixed: keywords_group was interpolated inside quotes, producing a
    # JSON *string* containing a Python repr instead of a JSON array.
    body = f"""{{
        "startDate": "{date}",
        "endDate": "{date}",
        "timeUnit": "date",
        "keywordGroups": {json.dumps(keywords_group, ensure_ascii=False)},
        "device": "pc",
        "ages": {json.dumps(ages)},
        "gender": "{genders}"
    }}"""

    response = urllib.request.urlopen(request, data=body.encode("utf-8"))
    rescode = response.getcode()

    # fixed: `result` was unbound on the error path (UnboundLocalError)
    result = None
    if rescode == 200:
        result = response.read().decode('utf-8')
        print(result)
    else:
        print("Error Code:", rescode)

    return result

def main():
    """Fetch Google trending keywords and look up the first one on Naver Datalab."""
    # NOTE(review): placeholder credentials — supply real values via
    # environment variables or a secrets store.
    client_id = "id"
    client_secret = "password"

    url = "https://openapi.naver.com/v1/datalab/search"
    request = urllib.request.Request(url)
    request.add_header("X-Naver-Client-Id", client_id)
    request.add_header("X-Naver-Client-Secret", client_secret)
    request.add_header("Content-Type", "application/json")

    # Naver Datalab has no same-day data, so query yesterday (yyyy-mm-dd).
    date = datetime.now() - timedelta(days=1)
    date = date.strftime("%Y-%m-%d")

    google_url = "https://trends.google.com/trends/api/dailytrends?hl=ko&tz=-540&geo=KR&hl=ko&ns=15"

    keywords = get_google_api_keyword(google_url)

    # fixed: "keywords" must be a *list* of strings; the original passed
    # the bare string keywords[0].
    keywords_group = [{"groupName": keywords[0], "keywords": [keywords[0]]}]

    ages = ["1", "2"]

    genders = "f"

    get_naver_api_result(request, date, keywords_group, ages, genders)

if __name__ == "__main__":
    main()

 

 

DAG RAW_CODE.py

from airflow import DAG
from airflow.decorators import task

import os
import sys
import pandas as pd
from pandas import Timestamp
from datetime import datetime, timedelta

import urllib.request
import requests
import json

@task
def get_google_api_keyword(google_url):
    """Airflow task: return the Google daily trending search queries.

    The first 6 characters of the response are not part of the JSON
    payload and are stripped before parsing.
    """
    raw = requests.get(google_url).text
    payload = json.loads(raw[6:])
    trending = payload['default']['trendingSearchesDays'][0]['trendingSearches']

    queries = [entry['title']['query'] for entry in trending]
    print(queries)

    return queries

@task
def get_naver_api_result(request, date, keywords, ages, genders):
    """Airflow task: query Naver Datalab for every keyword and return the
    ratios normalized so the global maximum is 100.

    keywords[0] acts as an anchor keyword included in every request batch
    so that ratios from different batches can be put on a common scale.
    """
    # Datalab accepts at most 5 keyword groups per request, so batch the
    # remaining keywords in fours and prepend the anchor to each batch.
    keywords_groups = []
    cnt = 0
    group = []
    for i in range(1, len(keywords)):
        if cnt % 4 == 0:
            group.append({"groupName": keywords[0], "keywords": [keywords[0]]})

        group.append({"groupName": keywords[i], "keywords": [keywords[i]]})
        cnt += 1
        if cnt % 4 == 0 or i == len(keywords) - 1:
            keywords_groups.append(group)
            group = []

    results = {}
    for batch in keywords_groups:
        body = f"""{{
            "startDate": "{date}",
            "endDate": "{date}",
            "timeUnit": "date",
            "keywordGroups": {json.dumps(batch, ensure_ascii=False)},
            "device": "pc",
            "ages": {json.dumps(ages)},
            "gender": "{genders}"
        }}"""

        response = urllib.request.urlopen(request, data=body.encode("utf-8"))
        rescode = response.getcode()

        if rescode == 200:
            parsed_json = json.loads(response.read().decode('utf-8'))

            # Build {keyword: ratio} for this batch.
            result_dict = {}
            for result in parsed_json.get('results', []):
                kws = result.get('keywords', [])
                # 'data' can be empty for keywords with no traffic
                if result.get('data'):
                    ratio = result['data'][0].get('ratio')
                    for kw in kws:
                        result_dict[kw] = ratio

            # Rescale this batch so the anchor keyword maps to 100.
            # fixed: guard against the anchor being absent or zero
            # (previously a possible KeyError / ZeroDivisionError).
            anchor = result_dict.get(keywords[0])
            if anchor and anchor < 100:
                scale = 100 / anchor
                for key in result_dict:
                    result_dict[key] *= scale

            results.update(result_dict)

        # Error handling
        else:
            print("Error Code:", rescode)

    # fixed: normalization ran inside the request loop and crashed with
    # NameError (keywords_groups empty) or ValueError (no successes).
    if not results:
        return {}

    max_value = max(results.values())
    normalized_dict = {key: value / max_value * 100 for key, value in results.items()}
    print(normalized_dict)

    return normalized_dict

with DAG(
    dag_id = 'Naver_API',
    start_date = datetime(2024,1,1),
    catchup=False,
    tags=['mydag'],
    schedule = '* * * * *'  # NOTE(review): runs every minute — confirm this cadence is intended
) as dag:
    
    # Assign the order of the tasks in our DAG.
    # NOTE(review): placeholder credentials — prefer Airflow Variables/Connections.
    client_id = "id"
    client_secret = "password"

    url = "https://openapi.naver.com/v1/datalab/search"
    request = urllib.request.Request(url)
    request.add_header("X-Naver-Client-Id",client_id)
    request.add_header("X-Naver-Client-Secret",client_secret)
    request.add_header("Content-Type","application/json")

    # Format the most recent available date as yyyy-mm-dd.
    ## Naver Datalab has no same-day data, so use yesterday.
    date = datetime.now() - timedelta(days=1)
    date = date.strftime("%Y-%m-%d")
    
    google_url = "https://trends.google.com/trends/api/dailytrends?hl=ko&tz=-540&geo=KR&hl=ko&ns=15"
    keywords = get_google_api_keyword(google_url)
    
    ages = ["1","2"]
    
    genders = "f"

    get_naver_api_result(request, date, keywords, ages, genders)

 

NaverDatalabDags.py

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from plugins import slack

import os
import sys
import pandas as pd
from pandas import Timestamp
from datetime import datetime, timedelta

import urllib.request
import json
import logging
import psycopg2

def get_Redshift_connection(autocommit=True):
    """Return a cursor on the 'project_postgres' Airflow connection.

    autocommit -- forwarded to the underlying DB-API connection.
    """
    connection = PostgresHook(postgres_conn_id='project_postgres').get_conn()
    connection.autocommit = autocommit
    return connection.cursor()

@task
def get_latest_keywords_from_db(result_date):
    """Airflow task: return the keywords stored for `result_date` in
    raw_data.google_trends (second column of each row).
    """
    # fixed: result_date was being passed to get_Redshift_connection(),
    # where it landed in the `autocommit` parameter.
    cur = get_Redshift_connection()

    # Parameterized query instead of f-string interpolation (SQL injection).
    sql = """
            SELECT *
            FROM raw_data.google_trends
            WHERE search_date = %s
            """
    cur.execute(sql, (result_date,))
    records = cur.fetchall()

    # Column 1 holds the keyword — TODO confirm against the table schema.
    result = [row[1] for row in records]

    cur.close()
    logging.info('Getting latest keywords from db done')
    logging.info(result)

    return result


def get_body(keywords_group,date,ages,genders):
    """Build the JSON request body for the Naver Datalab search endpoint.

    keywords_group -- list of {"groupName", "keywords"} dicts
    date           -- yyyy-mm-dd string used for both startDate and endDate
    ages / genders -- Datalab age codes and gender filter
    """
    groups_json = json.dumps(keywords_group,ensure_ascii=False)
    ages_json = json.dumps(ages)
    return f"""{{
            "startDate": "{date}",
            "endDate": "{date}",
            "timeUnit": "date",
            "keywordGroups": {groups_json},
            "device": "pc",
            "ages": {ages_json},
            "gender": "{genders}"
        }}"""

def get_result_dict_from(response):
    """Parse a Naver Datalab HTTP response into a {keyword: ratio} dict.

    Takes the first data point's 'ratio' for each result and assigns it to
    every keyword in that result. Results with an empty 'data' list are
    skipped.
    """
    response_body = response.read().decode('utf-8')
    parsed_json = json.loads(response_body)

    # fixed: renamed `dict` (shadowed the builtin) and dropped the unused
    # `title` variable.
    result_dict = {}
    for result in parsed_json.get('results', []):
        keywords = result.get('keywords', [])

        # Guard against a missing or empty 'data' list.
        if result.get('data'):
            ratio = result['data'][0].get('ratio')

            for keyword in keywords:
                result_dict[keyword] = ratio
    return result_dict

def update_results(results, response, first):
    """Merge one Datalab API response into `results`, rescaling the batch
    so the anchor keyword `first` maps to 100.

    results  -- accumulator dict, updated in place
    response -- HTTP response object from urllib
    first    -- anchor keyword included in every request batch
    """
    rescode = response.getcode()

    if rescode == 200:
        logging.info(rescode)

        result_dict = get_result_dict_from(response)

        # Rescale this batch relative to the anchor keyword so ratios are
        # comparable across batches.
        # fixed: guard against the anchor missing or zero (previously a
        # possible KeyError / ZeroDivisionError).
        anchor = result_dict.get(first)
        if anchor and anchor != 100:
            scale = 100 / anchor
            for key in result_dict.keys():
                result_dict[key] *= scale

        results.update(result_dict)

    # Error handling
    else:
        # fixed: logging.error was called with a bare extra argument and no
        # format placeholder, so the status code was never rendered.
        logging.error("Error Code: %s", rescode)

def normalize_ratio(results_dict):
    """Return a new dict rescaled so the maximum value becomes 100.

    Values are rounded to 4 decimal places. An empty input returns {}.
    """
    # fixed: `.itemls()` typo (AttributeError) and `normalized_dict` being
    # used before assignment (NameError); also guard the empty case, where
    # max() would raise ValueError.
    if not results_dict:
        return {}

    max_value = max(results_dict.values())
    return {
        key: round((value / max_value) * 100, 4)
        for key, value in results_dict.items()
    }


@task
def get_naver_api_results(request, date, keywords, ages, genders):
    """Airflow task: fetch Datalab ratios for all keywords and return them
    as [keyword, ratio] pairs sorted by ratio, descending.

    keywords[0] is an anchor included in every batch so the per-batch
    ratios can be rescaled onto a common scale before normalization.
    """
    # Datalab accepts at most 5 keyword groups per request: batch the
    # remaining keywords in fours and prepend the anchor to each batch.
    keywords_groups = []
    cnt = 0
    group = []
    for i in range(1, len(keywords)):
        if cnt % 4 == 0:
            group.append({"groupName": keywords[0], "keywords": [keywords[0]]})

        group.append({"groupName": keywords[i], "keywords": [keywords[i]]})
        cnt += 1

        if cnt % 4 == 0 or i == len(keywords) - 1:
            keywords_groups.append(group)
            group = []

    results = {}
    for batch in keywords_groups:
        body = get_body(batch, date, ages, genders)

        response = urllib.request.urlopen(request, data=body.encode("utf-8"))
        update_results(results, response, keywords[0])

    # Normalize so the global maximum is 100, then sort descending.
    normalized_results = normalize_ratio(results)
    # fixed: records previously stored the raw (un-normalized) ratios even
    # though the keys were taken from the normalized dict.
    records = [[key, normalized_results[key]] for key in normalized_results]
    records.sort(key=lambda x: x[1], reverse=True)

    logging.info("success")
    print(records)

    return records


def create_table_if_not_exists(cursor, schema, table_name, columns):
    """Create schema.table_name if it does not already exist.

    columns is the raw column-definition SQL placed inside the parentheses
    of the CREATE TABLE statement.
    """
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {schema}.{table_name} (
            {columns}
        );
    """)

@task
def load(schema, result_date, ages, genders, records):
    """Airflow task: insert one summary row plus per-keyword detail rows.

    Creates the target tables when missing, inserts a row into
    {schema}.naver_datalab_result, then one row per (keyword, ratio)
    record into {schema}.result_detail — all in a single transaction.
    """
    logging.info("insert started")
    cur = get_Redshift_connection()

    try:
        cur.execute("BEGIN;")

        # Summary table.
        create_table_if_not_exists(
            cursor=cur,
            schema=schema,
            table_name='naver_datalab_result',
            columns="""
                    id SERIAL PRIMARY KEY,
                    result_date TIMESTAMP,
                    ages VARCHAR(255),
                    genders VARCHAR(255)
                    """
        )

        # Detail table.
        create_table_if_not_exists(
            cursor=cur,
            schema=schema,
            table_name='result_detail',
            columns="""
                    result_id INT,
                    keyword VARCHAR(255),
                    ratio DOUBLE PRECISION
                    """
        )

        # Insert the summary row. Values are parameterized to avoid SQL
        # injection (identifiers like the schema name cannot be bound).
        sql = f"""
                INSERT INTO {schema}.naver_datalab_result (result_date, ages, genders)
                VALUES (%s, %s, %s)
                RETURNING id;
                """
        # str(ages) preserves the original behavior of storing the list's repr.
        cur.execute(sql, (result_date, str(ages), str(genders)))
        result_id = cur.fetchone()[0]

        # Insert the detail rows.
        detail_sql = f"""
                INSERT INTO {schema}.result_detail
                VALUES (%s, %s, %s);
                """
        for record in records:
            cur.execute(detail_sql, (result_id, record[0], record[1]))

        cur.execute("COMMIT;")

    except (Exception, psycopg2.DatabaseError) as error:
        logging.error(error)
        cur.execute("ROLLBACK;")
        raise
    finally:
        # fixed: the cursor was never closed.
        cur.close()

    logging.info("load done")
    

with DAG(
    dag_id = 'NaverDatalabSummary',
    start_date = datetime(2024,1,2),
    tags=['ELT_for_Chart'],
    schedule_interval = '0 0 * * *',  # daily at midnight
    default_args = {
        'retries': 0,
        'retry_delay': timedelta(minutes=3),
        'on_failure_callback': slack.on_failure_callback,
    }
) as dag:
    
    # Pull client_id / client_secret from Airflow Variables.
    client_id = Variable.get("client_id")
    client_secret = Variable.get("client_secret")

    url = "https://openapi.naver.com/v1/datalab/search"
    request = urllib.request.Request(url)
    request.add_header("X-Naver-Client-Id",client_id)
    request.add_header("X-Naver-Client-Secret",client_secret)
    request.add_header("Content-Type","application/json")

    # Format yesterday's date as yyyy-mm-dd (Datalab has no same-day data).
    result_date = datetime.now() - timedelta(days=1)
    result_date_str = result_date.strftime("%Y-%m-%d")
    
    keywords = get_latest_keywords_from_db(result_date_str)
    
    ages = []   # e.g. ["1"]
    
    genders = ["", "f", "m"]    # e.g. "f" or "m"; "" presumably means all genders — verify
    
    # One fetch + load task pair per gender value.
    for gender in genders:
        records = get_naver_api_results(request, result_date_str, keywords, ages, gender)
    
        schema = "analytics"
        load(schema, result_date_str, ages, gender, records)

 

TotalSummary.sql

-- Rescale the most recent result set (genders = '' — presumably the
-- all-genders run; verify against the loader) so its largest ratio is 100.
WITH MaxRatio AS (
    SELECT MAX(ratio) AS max_ratio
    FROM result_detail
    WHERE result_id = (
        -- id of the summary row for the latest result_date
        SELECT id 
        FROM naver_datalab_result
        WHERE result_date = (
            SELECT MAX(result_date)
            FROM naver_datalab_result
        )
        AND genders = ''
    )
)
SELECT rd.keyword, rd.ratio/mr.max_ratio*100 AS ratio
FROM result_detail rd
CROSS JOIN MaxRatio mr
WHERE rd.result_id = (
    -- same latest all-genders summary row as in the CTE above
    SELECT id 
    FROM naver_datalab_result
    WHERE result_date = (
        SELECT MAX(result_date)
        FROM naver_datalab_result
    )
    AND genders = ''
);

 

Superset Chart