데브코스 TIL/데이터 파이프라인, Airflow

MySQL 테이블 복사하기

예니ㅣ 2023. 12. 15. 13:17

강의

MySQL 테이블 복사

MySQL은 OLTP 및 Production Database 입니다.

Redshift 는 OLAP 및 Data Warehouse 입니다.

 

복사 과정

MySQL의 테이블을 Redshift로 복사합니다.

OLTP → OLAP

CREATE TABLE prod.nps (
	id INT NOT NULL AUTO_INCREMENT primary key,
    created_at timestamp,
    score smallint
);
CREATE TABLE yen.nps (
	id INT NOT NULL primary key,
    created_at timestamp,
    score smallint
);

 

1. AWS 관련 권한 설정 : DAG의 S3 쓰기 권한 및 Redshift의 S3 읽기 권한

  • IAM으로 별도 사용자 생성 및 권한 제공 : Create Policy 및 Custom Policy 사용
  • 주기적으로 키 변경 필요 → 해킹 방지
  • 입력 정보
    • S3 버킷 : grepp-data-engineering

2. S3 Connection 설정

  • Access Key ID 및 Secret Access Key 사용
  • 입력 정보
    • Conn Id : aws_conn_id
    • Conn Type : S3 혹은 Amazon Web Service 혹은 Generic
    • Extra : {"region_name": "ap-northeast-2"}

2. MySQL Connections 설정 : Airflow Scheduler Docker Container에 root 유저로 로그인 및 실행

  • 입력 정보
    • Conn Id : mysql_conn_id
    • Conn Type : MySQL
    • Host : ec2-43-201-106.ap-northeast-2.compute.amazoneaws.com
    • Schema : prod
    • Login : guest
    • Password : Guestt1!
    • Port : 3306
docker exec --user root -it 0017662673c3 sh
(airflow) sudo apt-get update
sudo apt-get install -y default-libmysqlclient-dev
sudo apt-get install -y gcc
sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"

 


MySQL_to_Redshift 예제

DAG Task 구성

  • SqlToS3Operator
    • MySQL → S3
    • (s3://grepp-data-engineering/{yen}-nps)
    • s3://s3-bucket/s3_key
  • S3ToRedshiftOperator
    • S3 → Redshift
    • (s3://grepp-data-engineering/{yen}-nps) → Redshift (yen.nps)
    • 벌크 업데이트 : COPY 사용

 

Full Refresh 구현

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable

from datetime import datetime
from datetime import timedelta

import requests
import logging
import psycopg2
import json


dag = DAG(
    dag_id = 'MySQL_to_Redshift',
    start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 9 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)

schema = "yen"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table

mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    query = "SELECT * FROM prod.nps",
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    sql_conn_id = "mysql_conn_id",
    aws_conn_id = "aws_conn_id",
    verify = False,
    replace = True,
    pd_kwargs={"index": False, "header": False},    
    dag = dag
)

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    method = 'REPLACE',
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = "aws_conn_id",
    dag = dag
)

mysql_to_s3_nps >> s3_to_redshift_nps

 

Incremental Update 구현

  • created (timestamp) : Optional
  • modified (timestamp)
  • deleted (boolean) : 레코드 삭제하지 않고 deleted = True 설정

 

  1. ROW_NUMBER 구현
  2. S3ToRedshiftOperator 구현
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable

from datetime import datetime
from datetime import timedelta

import requests
import logging
import psycopg2
import json

dag = DAG(
    dag_id = 'MySQL_to_Redshift_v2',
    start_date = datetime(2023,1,1), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 9 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)

schema = "keeyong"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table       # s3_key = schema + "/" + table

sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
print(sql)
mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    query = sql,
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    sql_conn_id = "mysql_conn_id",
    aws_conn_id = "aws_conn_id",
    verify = False,
    replace = True,
    pd_kwargs={"index": False, "header": False},    
    dag = dag
)

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = "aws_conn_id",    
    method = "UPSERT",
    upsert_keys = ["id"],
    dag = dag
)

mysql_to_s3_nps >> s3_to_redshift_nps

 

 

 

'데브코스 TIL > 데이터 파이프라인, Airflow' 카테고리의 다른 글

API 모니터링  (0) 2024.01.03
구글 시트 연동하기  (0) 2024.01.03
Airflow Backfill 이해하기  (0) 2023.12.14
Primary Key Uniqueness 보장하기  (0) 2023.12.14
Airflow DAG 작성 예제 4, 5  (0) 2023.12.14