강의
MySQL 테이블 복사
MySQL은 OLTP 및 Production Database 입니다.
Redshift 는 OLAP 및 Data Warehouse 입니다.
복사 과정
MySQL의 테이블을 Redshift로 복사합니다.
OLTP → OLAP
CREATE TABLE prod.nps (
id INT NOT NULL AUTO_INCREMENT primary key,
created_at timestamp,
score smallint
);
CREATE TABLE yen.nps (
id INT NOT NULL primary key,
created_at timestamp,
score smallint
);
1. AWS 관련 권한 설정 : DAG의 S3 쓰기 권한 및 Redshift의 S3 읽기 권한
- IAM으로 별도 사용자 생성 및 권한 제공 : Create Policy 및 Custom Policy 사용
- 주기적으로 키 변경 필요 → 해킹 방지
- 입력 정보
- S3 버킷 : grepp-data-engineering
2. S3 Connection 설정
- Access Key ID 및 Secret Access Key 사용
- 입력 정보
- Conn Id : aws_conn_id
- Conn Type : S3 혹은 Amazon Web Service 혹은 Generic
- Extra : {"region_name": "ap-northeast-2"}
2. MySQL Connections 설정 : Airflow Scheduler Docker Container에 root 유저로 로그인 및 실행
- 입력 정보
- Conn Id : mysql_conn_id
- Conn Type : MySQL
- Host : ec2-43-201-106.ap-northeast-2.compute.amazoneaws.com
- Schema : prod
- Login : guest
- Password : Guestt1!
- Port : 3306
docker exec --user root -it 0017662673c3 sh
(airflow) sudo apt-get update
sudo apt-get install -y default-libmysqlclient-dev
sudo apt-get install -y gcc
sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"
MySQL_to_Redshift 예제
DAG Task 구성
- SqlToS3Operator
- MySQL → S3
- (s3://grepp-data-engineering/{yen}-nps)
- s3://s3-bucket/s3_key
- S3ToRedshiftOperator
- S3 → Redshift
- (s3://grepp-data-engineering/{yen}-nps) → Redshift (yen.nps)
- 벌크 업데이트 : COPY 사용
Full Refresh 구현
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift',
start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 9 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
schema = "yen"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = "SELECT * FROM prod.nps",
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
method = 'REPLACE',
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
Incremental Update 구현
- created (timestamp) : Optional
- modified (timestamp)
- deleted (boolean) : 레코드 삭제하지 않고 deleted = True 설정
- ROW_NUMBER 구현
- S3ToRedshiftOperator 구현
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift_v2',
start_date = datetime(2023,1,1), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 9 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
schema = "keeyong"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table # s3_key = schema + "/" + table
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
print(sql)
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = sql,
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
method = "UPSERT",
upsert_keys = ["id"],
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
'데브코스 TIL > 데이터 파이프라인, Airflow' 카테고리의 다른 글
API 모니터링 (0) | 2024.01.03 |
---|---|
구글 시트 연동하기 (0) | 2024.01.03 |
Airflow Backfill 이해하기 (0) | 2023.12.14 |
Primary Key Uniqueness 보장하기 (0) | 2023.12.14 |
Airflow DAG 작성 예제 4, 5 (0) | 2023.12.14 |