주제
구글 트렌드와 네이버 데이터랩을 이용하여 네이버 실시간 검색어 복원하기
구조
- 데이터 인프라 형성
- 구글 트렌드의 키워드 추출
- 네이버 데이터랩을 통해 검색어 순위 및 백분위 횟수 추출
이슈
- 구글 트렌드 공용 API 존재하지 않음
- 네이버 데이터랩은 당일(금일) 데이터를 제공하지 않으므로 전일 데이터를 조회함
- 네이버 데이터랩 API는 한 번의 요청에 키워드 그룹을 5개까지만 입력할 수 있음 (기준 키워드 1개 + 비교 키워드 4개씩 나누어 호출)
- 네이버 데이터랩 API의 client ID, client secret을 Airflow Variables로 설정
- 전체 및 성별별 데이터 적재
프로젝트 코드
API RAW_CODE.py
import os
import sys
import pandas as pd
from pandas import Timestamp
from datetime import datetime
import urllib.request
import json
# Smoke test for the Naver DataLab search-trend endpoint: send one hard-coded
# query and print the raw JSON response.
client_id = "id"
client_secret = "password"
url = "https://openapi.naver.com/v1/datalab/search"
request = urllib.request.Request(url)
request.add_header("X-Naver-Client-Id", client_id)
request.add_header("X-Naver-Client-Secret", client_secret)
request.add_header("Content-Type", "application/json")
# Request payload: three keyword groups compared over a fixed date range.
body = """{
"startDate":"2024-01-01",
"endDate":"2024-01-05",
"timeUnit":"month",
"keywordGroups":[
{"groupName":"한글","keywords":["한글","korean"]},
{"groupName":"영어","keywords":["영어","english"]},
{"groupName":"중국어","keywords":["중국어","chinese"]}
],
"device":"pc",
"ages":["1","2"],
"gender":"f"
}"""
# The context manager closes the HTTP response once it has been read.
with urllib.request.urlopen(request, data=body.encode("utf-8")) as response:
    rescode = response.getcode()
    # Error handling
    if rescode == 200:
        response_body = response.read()
        print(response_body.decode("utf-8"))
    else:
        # rescode is an int; pass it as a separate print argument instead of
        # concatenating it to a str, which would raise TypeError.
        print("Error Code:", rescode)
API RAW_CODE_v2.py
import os
import sys
import pandas as pd
from pandas import Timestamp
from datetime import datetime, timedelta
import urllib.request
import requests
import json
def get_google_api_keyword(google_url):
    """Fetch trending search queries from a Google Trends daily-trends URL.

    Args:
        google_url: URL of the daily-trends endpoint to request.

    Returns:
        List of query strings taken from the first entry of
        ``trendingSearchesDays`` in the response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    # A timeout keeps a stalled connection from blocking the caller forever.
    r = requests.get(google_url, timeout=10)
    r.raise_for_status()
    # The first 6 characters of the body are skipped before JSON parsing
    # (presumably a non-JSON guard prefix -- confirm against a live response).
    data = json.loads(r.text[6:])
    searches = data['default']['trendingSearchesDays'][0]['trendingSearches']
    result = [t['title']['query'] for t in searches]
    print(result)
    return result
def get_naver_api_result(request, date, keywords_group, ages, genders):
    """Query the Naver DataLab search API for a single day and return the raw JSON.

    Args:
        request: Prepared ``urllib.request.Request`` carrying the auth headers.
        date: Day to query, used as both ``startDate`` and ``endDate``.
        keywords_group: List of ``{"groupName": ..., "keywords": [...]}`` dicts.
        ages: List of age-bucket codes sent as the ``ages`` field.
        genders: Value sent as the ``gender`` field.

    Returns:
        The decoded response body as a string, or ``None`` when the response
        status is not 200.
    """
    # Serialise the whole payload with json.dumps so keywordGroups is sent as
    # a real JSON array rather than a quoted Python repr.
    body = json.dumps(
        {
            "startDate": date,
            "endDate": date,
            "timeUnit": "date",
            "keywordGroups": keywords_group,
            "device": "pc",
            "ages": ages,
            "gender": genders,
        },
        ensure_ascii=False,
    )
    with urllib.request.urlopen(request, data=body.encode("utf-8")) as response:
        rescode = response.getcode()
        # Error handling
        if rescode != 200:
            print("Error Code:", rescode)
            return None
        result = response.read().decode('utf-8')
    print(result)
    return result
def main():
    """Look up yesterday's Naver DataLab ratio for the top Google trend keyword."""
    client_id = "id"
    client_secret = "password"
    url = "https://openapi.naver.com/v1/datalab/search"
    request = urllib.request.Request(url)
    request.add_header("X-Naver-Client-Id", client_id)
    request.add_header("X-Naver-Client-Secret", client_secret)
    request.add_header("Content-Type", "application/json")
    # Query yesterday, formatted as yyyy-mm-dd (same-day data is not available
    # from Naver DataLab).
    date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    google_url = "https://trends.google.com/trends/api/dailytrends?hl=ko&tz=-540&geo=KR&hl=ko&ns=15"
    keywords = get_google_api_keyword(google_url)
    if not keywords:
        # Nothing to look up; avoid an IndexError on keywords[0].
        print("No trending keywords returned")
        return
    # "keywords" is sent as a list, matching the array form used by the
    # sample request body for this API.
    keywords_group = [{"groupName": keywords[0], "keywords": [keywords[0]]}]
    ages = ["1", "2"]
    genders = "f"
    get_naver_api_result(request, date, keywords_group, ages, genders)


if __name__ == "__main__":
    main()
DAG RAW_CODE.py
from airflow import DAG
from airflow.decorators import task
import os
import sys
import pandas as pd
from pandas import Timestamp
from datetime import datetime, timedelta
import urllib.request
import requests
import json
@task
def get_google_api_keyword(google_url):
    """Fetch trending search queries from a Google Trends daily-trends URL.

    Args:
        google_url: URL of the daily-trends endpoint to request.

    Returns:
        List of query strings taken from the first entry of
        ``trendingSearchesDays`` in the response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    # A timeout keeps a stalled connection from blocking the task forever.
    r = requests.get(google_url, timeout=10)
    r.raise_for_status()
    # The first 6 characters of the body are skipped before JSON parsing
    # (presumably a non-JSON guard prefix -- confirm against a live response).
    data = json.loads(r.text[6:])
    searches = data['default']['trendingSearchesDays'][0]['trendingSearches']
    result = [t['title']['query'] for t in searches]
    print(result)
    return result
@task
def get_naver_api_result(request, date, keywords, ages, genders):
    """Query Naver DataLab for all keywords and return ratios on one 0-100 scale.

    The first keyword is used as an anchor: every request contains the anchor
    plus up to four other keywords, and each response is rescaled so the
    anchor equals 100. That puts the ratios of separate requests on a common
    scale before the final max-normalisation.

    Args:
        request: Prepared ``urllib.request.Request`` carrying the auth headers.
        date: Day to query, used as both ``startDate`` and ``endDate``.
        keywords: List of keyword strings; ``keywords[0]`` is the anchor.
        ages: List of age-bucket codes sent as the ``ages`` field.
        genders: Value sent as the ``gender`` field.

    Returns:
        Dict mapping keyword to its ratio, scaled so the largest value is 100.
        Empty when there are no keywords or no usable response.
    """
    if not keywords:
        return {}
    anchor = keywords[0]
    others = keywords[1:]
    # Anchor + 4 others = 5 groups per request. With no other keywords the
    # anchor is still queried on its own.
    batches = [others[i:i + 4] for i in range(0, len(others), 4)] or [[]]
    results = {}
    for batch in batches:
        groups = [{"groupName": kw, "keywords": [kw]} for kw in [anchor, *batch]]
        body = json.dumps(
            {
                "startDate": date,
                "endDate": date,
                "timeUnit": "date",
                "keywordGroups": groups,
                "device": "pc",
                "ages": ages,
                "gender": genders,
            },
            ensure_ascii=False,
        )
        with urllib.request.urlopen(request, data=body.encode("utf-8")) as response:
            rescode = response.getcode()
            # Error handling
            if rescode != 200:
                print("Error Code:", rescode)
                continue
            parsed_json = json.loads(response.read().decode('utf-8'))
        # Build a keyword -> ratio dict for this response.
        result_dict = {}
        for result in parsed_json.get('results', []):
            data_points = result.get('data')
            # Guard against an empty or missing 'data' list.
            if not data_points:
                continue
            ratio = data_points[0].get('ratio')
            if ratio is None:
                continue
            for kw in result.get('keywords', []):
                result_dict[kw] = ratio
        # Ratio adjustment: rescale so the anchor is 100 in every batch.
        anchor_ratio = result_dict.get(anchor)
        if not anchor_ratio:
            # Without a non-zero anchor ratio the batch cannot be put on the
            # common scale, so it is skipped instead of raising.
            print("Skipping batch without anchor ratio:", batch)
            continue
        if anchor_ratio < 100:
            scale = 100 / anchor_ratio
            result_dict = {kw: value * scale for kw, value in result_dict.items()}
        results.update(result_dict)
    if not results:
        print(results)
        return results
    # Final adjustment: scale so the maximum ratio is 100.
    max_value = max(results.values())
    normalized_dict = {key: value / max_value * 100 for key, value in results.items()}
    print(normalized_dict)
    return normalized_dict
with DAG(
    dag_id='Naver_API',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['mydag'],
    # NOTE(review): this cron expression fires every minute -- confirm that
    # is intended for a query on daily data.
    schedule='* * * * *',
) as dag:
    # Naver Open API credentials and the request object handed to the task.
    client_id = "id"
    client_secret = "password"
    url = "https://openapi.naver.com/v1/datalab/search"
    request = urllib.request.Request(
        url,
        headers={
            "X-Naver-Client-Id": client_id,
            "X-Naver-Client-Secret": client_secret,
            "Content-Type": "application/json",
        },
    )
    # Most recent queryable day as yyyy-mm-dd; Naver has no same-day data.
    date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    google_url = "https://trends.google.com/trends/api/dailytrends?hl=ko&tz=-540&geo=KR&hl=ko&ns=15"
    # Task wiring: the trending keywords feed the Naver DataLab lookup.
    keywords = get_google_api_keyword(google_url)
    ages = ["1", "2"]
    genders = "f"
    get_naver_api_result(request, date, keywords, ages, genders)
NaverDatalabDags.py
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from plugins import slack
import os
import sys
import pandas as pd
from pandas import Timestamp
from datetime import datetime, timedelta
import urllib.request
import json
import logging
import psycopg2
def get_Redshift_connection(autocommit=True):
    """Return a cursor on the Airflow connection ``project_postgres``.

    Args:
        autocommit: Value assigned to the connection's ``autocommit`` attribute.

    Returns:
        A new cursor created from the hook's connection.
    """
    connection = PostgresHook(postgres_conn_id='project_postgres').get_conn()
    connection.autocommit = autocommit
    cursor = connection.cursor()
    return cursor
@task
def get_latest_keywords_from_db(result_date):
    """Read the keywords stored in ``raw_data.google_trends`` for one date.

    Args:
        result_date: Value compared against the ``search_date`` column.

    Returns:
        List holding the second column of every matching row.
    """
    # result_date is a query filter, not a connection option, so the
    # connection helper is called with its default autocommit setting.
    cur = get_Redshift_connection()
    # Bind the date as a query parameter instead of formatting it into the SQL.
    sql = """
    SELECT *
    FROM raw_data.google_trends
    WHERE search_date = %s
    """
    try:
        cur.execute(sql, (result_date,))
        records = cur.fetchall()
    finally:
        # Release the cursor even when the query fails.
        cur.close()
    # NOTE(review): index 1 is assumed to be the keyword column; the table
    # definition is not visible here -- confirm the column order.
    result = [row[1] for row in records]
    logging.info('Getting latest keywords from db done')
    logging.info(result)
    return result
def get_body(keywords_group, date, ages, genders):
    """Build the JSON request body for a single-day Naver DataLab query.

    Args:
        keywords_group: List of ``{"groupName": ..., "keywords": [...]}`` dicts.
        date: Day to query, used as both ``startDate`` and ``endDate``.
        ages: List of age-bucket codes sent as the ``ages`` field.
        genders: Value sent as the ``gender`` field.

    Returns:
        The request body as a JSON string with non-ASCII characters kept as-is.
    """
    # json.dumps escapes every value, so quotes or backslashes in date or
    # genders cannot produce malformed JSON.
    return json.dumps(
        {
            "startDate": date,
            "endDate": date,
            "timeUnit": "date",
            "keywordGroups": keywords_group,
            "device": "pc",
            "ages": ages,
            "gender": genders,
        },
        ensure_ascii=False,
    )
def get_result_dict_from(response):
    """Parse a DataLab response into a ``{keyword: ratio}`` dict.

    Args:
        response: Object whose ``read()`` returns the UTF-8 encoded JSON body.

    Returns:
        Dict mapping each keyword of every result to the ``ratio`` of that
        result's first data point. Results with empty or missing ``data`` are
        left out.
    """
    parsed_json = json.loads(response.read().decode('utf-8'))
    # Named ratios rather than dict so the builtin is not shadowed.
    ratios = {}
    for result in parsed_json.get('results', []):
        data_points = result.get('data')
        # Guard against an empty or missing 'data' list.
        if not data_points:
            continue
        ratio = data_points[0].get('ratio')
        for keyword in result.get('keywords', []):
            ratios[keyword] = ratio
    return ratios
def update_results(results, response, first):
    """Merge one API response into ``results`` after rescaling to the anchor.

    Every ratio in the response is multiplied by a single factor so that the
    anchor keyword ``first`` becomes 100; this puts ratios from separate
    requests on a common scale.

    Args:
        results: Dict updated in place with the rescaled ``{keyword: ratio}``.
        response: HTTP response object exposing ``getcode()`` and ``read()``.
        first: Anchor keyword present in every request.
    """
    rescode = response.getcode()
    # Error handling
    if rescode != 200:
        logging.error("Error Code: %s", rescode)
        return
    logging.info(rescode)
    result_dict = get_result_dict_from(response)
    anchor_ratio = result_dict.get(first)
    if not anchor_ratio:
        # A missing or zero anchor ratio leaves nothing to scale against.
        logging.warning("No usable ratio for anchor keyword %r; batch skipped", first)
        return
    # Ratio adjustment. The factor is computed once, before any value changes:
    # reading result_dict[first] inside the loop would see the already-rescaled
    # anchor (100) and leave every later key unscaled.
    if anchor_ratio != 100:
        factor = 100 / anchor_ratio
        result_dict = {key: value * factor for key, value in result_dict.items()}
    results.update(result_dict)
def normalize_ratio(results_dict):
    """Scale the values so the largest becomes 100, rounded to 4 decimals.

    Args:
        results_dict: Dict mapping keyword to a numeric ratio.

    Returns:
        New dict with the same keys. Empty input gives an empty dict; if the
        largest value is 0, every value is returned as 0.0.
    """
    if not results_dict:
        return {}
    max_value = max(results_dict.values())
    if not max_value:
        # Avoid dividing by zero when every ratio is 0.
        return {key: 0.0 for key in results_dict}
    return {
        key: round((value / max_value) * 100, 4)
        for key, value in results_dict.items()
    }
@task
def get_naver_api_results(request, date, keywords, ages, genders):
    """Query Naver DataLab for all keywords and return ranked, normalised ratios.

    The first keyword is used as an anchor included in every request, and
    each request carries the anchor plus up to four other keywords.

    Args:
        request: Prepared ``urllib.request.Request`` carrying the auth headers.
        date: Day to query.
        keywords: List of keyword strings; ``keywords[0]`` is the anchor.
        ages: List of age-bucket codes.
        genders: Gender filter value.

    Returns:
        List of ``[keyword, ratio]`` pairs sorted by ratio, highest first.
        Empty when there are no keywords or no usable results.
    """
    if not keywords:
        logging.warning("No keywords to query")
        return []
    anchor = keywords[0]
    others = keywords[1:]
    # Anchor + 4 others = 5 groups per request. With no other keywords the
    # anchor is still queried on its own.
    batches = [others[i:i + 4] for i in range(0, len(others), 4)] or [[]]
    results = {}
    for batch in batches:
        group = [{"groupName": kw, "keywords": [kw]} for kw in [anchor, *batch]]
        body = get_body(group, date, ages, genders)
        # The context manager closes the response after it has been consumed.
        with urllib.request.urlopen(request, data=body.encode("utf-8")) as response:
            update_results(results, response, anchor)
    if not results:
        logging.warning("No results returned for %s", date)
        return []
    # Max-ratio adjustment and sort. The records carry the normalised values,
    # not the pre-normalisation ones.
    normalized_results = normalize_ratio(results)
    records = [[key, ratio] for key, ratio in normalized_results.items()]
    records.sort(key=lambda x: x[1], reverse=True)
    logging.info("success")
    print(records)
    return records
def create_table_if_not_exists(cursor, schema, table_name, columns):
    """Run ``CREATE TABLE IF NOT EXISTS`` for ``schema.table_name``.

    Args:
        cursor: DB cursor used to execute the statement.
        schema: Schema name placed before the table name.
        table_name: Name of the table to create.
        columns: Column definitions inserted between the parentheses.
    """
    # The identifiers and column text are formatted straight into the DDL.
    ddl = f"""
CREATE TABLE IF NOT EXISTS {schema}.{table_name} (
{columns}
);
"""
    cursor.execute(ddl)
@task
def load(schema, result_date, ages, genders, records):
    """Store one DataLab result set: a header row plus one row per keyword.

    Both tables are created if missing, then all inserts run between an
    explicit BEGIN and COMMIT. Any error triggers ROLLBACK and is re-raised.

    Args:
        schema: Schema holding ``naver_datalab_result`` and ``result_detail``.
        result_date: Date of the result set.
        ages: Age filter used for the query; stored as its string form.
        genders: Gender filter used for the query.
        records: Iterable of ``[keyword, ratio]`` pairs.
    """
    logging.info("insert started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        # Create the naver_datalab_result table
        create_table_if_not_exists(
            cursor=cur,
            schema=schema,
            table_name='naver_datalab_result',
            columns="""
            id SERIAL PRIMARY KEY,
            result_date TIMESTAMP,
            ages VARCHAR(255),
            genders VARCHAR(255)
            """
        )
        # Create the result_detail table
        create_table_if_not_exists(
            cursor=cur,
            schema=schema,
            table_name='result_detail',
            columns="""
            result_id INT,
            keyword VARCHAR(255),
            ratio DOUBLE PRECISION
            """
        )
        # Insert the header row. Values are bound as parameters: keywords are
        # externally sourced text, and str(ages) itself contains quotes, so
        # formatting them into the SQL would break or alter the statement.
        # Only the schema identifier is still formatted in.
        cur.execute(
            f"""
            INSERT INTO {schema}.naver_datalab_result (result_date, ages, genders)
            VALUES (%s, %s, %s)
            RETURNING id;
            """,
            (result_date, str(ages), genders),
        )
        result_id = cur.fetchone()[0]
        # Insert the detail rows
        cur.executemany(
            f"""
            INSERT INTO {schema}.result_detail
            VALUES (%s, %s, %s);
            """,
            [(result_id, record[0], record[1]) for record in records],
        )
        cur.execute("COMMIT;")
    except Exception:
        logging.exception("load failed; rolling back")
        cur.execute("ROLLBACK;")
        raise
    finally:
        # Always release the cursor, whether or not the load succeeded.
        cur.close()
    logging.info("load done")
with DAG(
    dag_id='NaverDatalabSummary',
    start_date=datetime(2024, 1, 2),
    # The dates below come from datetime.now(), not from the run's logical
    # date, so a back-filled run would load the same day again. Disabling
    # catchup keeps missed intervals since start_date from being scheduled.
    catchup=False,
    tags=['ELT_for_Chart'],
    schedule_interval='0 0 * * *',
    default_args={
        'retries': 0,
        'retry_delay': timedelta(minutes=3),
        'on_failure_callback': slack.on_failure_callback,
    },
) as dag:
    # Read client_id and client_secret from Airflow Variables
    client_id = Variable.get("client_id")
    client_secret = Variable.get("client_secret")
    url = "https://openapi.naver.com/v1/datalab/search"
    request = urllib.request.Request(url)
    request.add_header("X-Naver-Client-Id", client_id)
    request.add_header("X-Naver-Client-Secret", client_secret)
    request.add_header("Content-Type", "application/json")
    # Yesterday, formatted as yyyy-mm-dd
    result_date = datetime.now() - timedelta(days=1)
    result_date_str = result_date.strftime("%Y-%m-%d")
    keywords = get_latest_keywords_from_db(result_date_str)
    ages = []  # e.g. ["1"]
    genders = ["", "f", "m"]  # "" is the unfiltered total; "f" / "m" filter by gender
    schema = "analytics"
    # One query-and-load pair per gender filter.
    for gender in genders:
        records = get_naver_api_results(request, result_date_str, keywords, ages, gender)
        load(schema, result_date_str, ages, gender, records)
TotalSummary.sql
-- Keyword ratios of the latest all-gender (genders = '') result set,
-- rescaled so the largest ratio is 100.
WITH LatestResult AS (
    -- MAX(id) keeps this a single row even if the same date was loaded
    -- more than once.
    SELECT MAX(id) AS id
    FROM naver_datalab_result
    WHERE result_date = (
        SELECT MAX(result_date)
        FROM naver_datalab_result
    )
    AND genders = ''
),
MaxRatio AS (
    SELECT MAX(rd.ratio) AS max_ratio
    FROM result_detail rd
    JOIN LatestResult lr ON rd.result_id = lr.id
)
-- NULLIF gives NULL instead of a division-by-zero error when every ratio is 0.
SELECT rd.keyword, rd.ratio / NULLIF(mr.max_ratio, 0) * 100 AS ratio
FROM result_detail rd
JOIN LatestResult lr ON rd.result_id = lr.id
CROSS JOIN MaxRatio mr;
Superset Chart
'데브코스 TIL > [프로젝트]' 카테고리의 다른 글
데이터 웨어하우스를 이용한 대시보드 구성 (1) | 2023.12.06 |
---|---|
크롤한 웹데이터로 만들어보는 웹사이트 (0) | 2023.11.07 |