강의
DAG 파라미터
"DAG parameters"는 Task parameters와 다르게 DAG 객체를 만들 때 지정합니다.
- max_active_runs
- max_active_tasks
- catchup
DAG 폴더 주의 사항
Airflow의 dags 폴더 주기적으로 스캔
→ DAG 모듈의 모든 파일 메인 함수 실행
→ 개발 중인 테스트 코드 실행 가능
Connetions와 Variables
Connetions
- 일부 연결 관련 정보 저장 : hostname, port number, access credential
- Postgres 연결 혹은 Redshift 연결 정보 저장
Variables
- API 키 혹은 일부 구성 정보 저장
- 값 암호화 가능 : access 혹은 secret 사용
Xcom
"Xcom"은 태스크(Operator)간에 데이터를 주고 받기 위한 방식 입니다.
앞에 실행한 Operator의 리턴값을 뒤에 실행할 Operator에서 읽어가는 형태 입니다.
Aiflow 메타 데이터 DB에 저장이 되지 않는 큰 데이터에는 사용할 수 없습니다.
HellowWorld 예제
PythonOperator
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
dag_id = 'HelloWorld',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *')
def print_hello():
print("hello!")
return "hello!"
def print_goodbye():
print("goodbye!")
return "goodbye!"
print_hello = PythonOperator(
task_id = 'print_hello',
python_callable = print_hello, # 작동 함수 지정
dag = dag)
print_goodbye = PythonOperator(
task_id = 'print_goodbye',
python_callable = print_goodbye, # 작동 함수 지정
dag = dag)
# 순서 지정
print_hello >> print_goodbye
Task Decorators
"Task Decorators"는 프로그래밍을 단순하게 만듭니다.
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
@task
def print_hello():
print("hello!")
return "hello!"
@task
def print_goodbye():
print("goodbye!")
return "goodbye!"
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *'
) as dag:
# 순서 지정
print_hello() >> print_goodbye()
NameGenderCSV 예제
1. 기본 형태
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import requests
import logging
import psycopg2
def get_Redshift_connection():
host = "learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com"
user = "yen" # 본인 ID 사용
password = "..." # 본인 Password 사용
port = 5439
dbname = "dev"
conn = psycopg2.connect(f"dbname={dbname} user={user} host={host} password={password} port={port}")
conn.set_session(autocommit=True)
return conn.cursor()
def extract(url):
logging.info("Extract started")
f = requests.get(url)
logging.info("Extract done")
return (f.text)
def transform(text):
logging.info("Transform started")
lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Yen,F" -> [ 'Yen', 'F' ]
records.append([name, gender])
logging.info("Transform ended")
return records
def load(records):
logging.info("load started")
"""
records = [
[ "Yen", "F" ],
[ "Claire", "F" ],
...
]
"""
schema = "Yen"
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
logging.info("load done")
def etl():
link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
data = extract(link)
lines = transform(data)
load(lines)
dag_second_assignment = DAG(
dag_id = 'name_gender',
catchup = False,
start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *') # 적당히 조절
task = PythonOperator(
task_id = 'perform_etl',
python_callable = etl,
dag = dag_second_assignment)
2. Params 사용 및 execution_data 추출
def etl(**context):
link = context["params"]["url"]
# task 자체에 대한 정보 (일부는 DAG의 정보가 되기도 함)를 읽고 싶다면 context['task_instance'] 혹은 context['ti']를 통해 가능
# https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance
task_instance = context['task_instance']
execution_date = context['execution_date']
logging.info(execution_date)
data = extract(link)
lines = transform(data)
load(lines)
3. Varialbes 이용 및 Xcom 이용하여 태스크 분할
Variables의 장점은 코드 푸시가 필요하지 않다는 것입니다.
Variables의 단점은 관리나 테스트가 되지 않아 사고로 이어질 가능성이 있다는 것입니다.
Task의 수가 너무 많으면 전체 DAG 실행 시간이 길어지고 스케줄러에 부하가 발생합니다.
Task의 수가 너무 적으면 모듈화가 되지 않아 재실행 시간이 오래 걸립니다.
def extract(**context):
link = context["params"]["url"]
task_instance = context['task_instance']
execution_date = context['execution_date']
logging.info(execution_date)
f = requests.get(link)
return (f.text)
def transform(**context):
logging.info("Transform started")
text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Yen,F" -> [ 'Yen', 'F' ]
records.append([name, gender])
logging.info("Transform ended")
return records
def load(**context):
logging.info("load started")
schema = context["params"]["schema"]
table = context["params"]["table"]
lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
"""
records = [
[ "Yen", "F" ],
[ "Claire", "F" ],
...
]
"""
extract = PythonOperator(
task_id = 'extract',
python_callable = extract,
params = {
'url': Variable.get("csv_url")
},
dag = dag)
transform = PythonOperator(
task_id = 'transform',
python_callable = transform,
params = {
},
dag = dag)
load = PythonOperator(
task_id = 'load',
python_callable = load,
params = {
'schema': 'Yen',
'table': 'name_gender'
},
dag = dag)
extract >> transform >> load
4. Redshift Connection 사용
PostgresHook의 Default 값은 False입니다.
이 경우에 BIGIN은 영향을 주지 못합니다. (No-Operation)
from datetime import datetime
from datetime import timedelta
dag = DAG(
dag_id = 'name_gender_v4',
start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
# 'on_failure_callback': slack.on_failure_callback,
}
)
5. Task Decorators 사용
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
from datetime import datetime
from datetime import timedelta
import requests
import logging
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
@task
def extract(url):
logging.info(datetime.utcnow())
f = requests.get(url)
return f.text
@task
def transform(text):
lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Yen,F" -> [ 'Yen', 'F' ]
records.append([name, gender])
logging.info("Transform ended")
return records
@task
def load(schema, table, records):
logging.info("load started")
cur = get_Redshift_connection()
"""
records = [
[ "Yen", "F" ],
[ "Claire", "F" ],
...
]
"""
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
logging.info("load done")
with DAG(
dag_id='namegender_v5',
start_date=datetime(2022, 10, 6), # 날짜가 미래인 경우 실행이 안됨
schedule='0 2 * * *', # 적당히 조절
max_active_runs=1,
catchup=False,
default_args={
'retries': 1,
'retry_delay': timedelta(minutes=3),
# 'on_failure_callback': slack.on_failure_callback,
}
) as dag:
url = Variable.get("csv_url")
schema = 'Yen' ## 자신의 스키마로 변경
table = 'name_gender'
lines = transform(extract(url))
load(schema, table, lines)
'데브코스 TIL > 데이터 파이프라인, Airflow' 카테고리의 다른 글
Airflow DAG 작성 예제 4, 5 (0) | 2023.12.14 |
---|---|
Airflow DAG 작성 예제 3 (0) | 2023.12.13 |
Airflow 기본 프로그램 실행 (0) | 2023.12.12 |
Airflow 설치 (0) | 2023.12.12 |
Airflow 소개 (0) | 2023.12.11 |