데브코스 TIL/데이터 파이프라인, Airflow

Airflow DAG 작성 예제 1, 2

예니ㅣ 2023. 12. 13. 12:30

강의

DAG 파라미터

"DAG parameters"는 Task parameters와 다르게 DAG 객체를 만들 때 지정합니다.

  • max_active_runs
  • max_active_tasks
  • catchup

 


DAG 폴더 주의 사항

Airflow의 dags 폴더 주기적으로 스캔

→ DAG 모듈의 모든 파일 메인 함수 실행

→ 개발 중인 테스트 코드 실행 가능

 


Connetions와 Variables

Connetions

  • 일부 연결 관련 정보 저장 : hostname, port number, access credential
  • Postgres 연결 혹은 Redshift 연결 정보 저장

 

Variables

  • API 키 혹은 일부 구성 정보 저장
  • 값 암호화 가능 : access 혹은 secret 사용

 


Xcom

"Xcom"은 태스크(Operator)간에 데이터를 주고 받기 위한 방식 입니다.

앞에 실행한 Operator의 리턴값을 뒤에 실행할 Operator에서 읽어가는 형태 입니다.

Aiflow 메타 데이터 DB에 저장이 되지 않는 큰 데이터에는 사용할 수 없습니다.

 

 


HellowWorld 예제

PythonOperator

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

dag = DAG(
    dag_id = 'HelloWorld',
    start_date = datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *')

def print_hello():
    print("hello!")
    return "hello!"

def print_goodbye():
    print("goodbye!")
    return "goodbye!"

print_hello = PythonOperator(
    task_id = 'print_hello',
    python_callable = print_hello,		# 작동 함수 지정
    dag = dag)

print_goodbye = PythonOperator(
    task_id = 'print_goodbye',
    python_callable = print_goodbye,	# 작동 함수 지정
    dag = dag)

# 순서 지정
print_hello >> print_goodbye

 

Task Decorators

"Task Decorators"는 프로그래밍을 단순하게 만듭니다.

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

@task
def print_hello():
    print("hello!")
    return "hello!"

@task
def print_goodbye():
    print("goodbye!")
    return "goodbye!"

with DAG(
    dag_id = 'HelloWorld_v2',
    start_date = datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *'
) as dag:

    # 순서 지정
    print_hello() >> print_goodbye()

 


NameGenderCSV 예제

1. 기본 형태

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import requests
import logging
import psycopg2

def get_Redshift_connection():
    host = "learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com"
    user = "yen"  # 본인 ID 사용
    password = "..."  # 본인 Password 사용
    port = 5439
    dbname = "dev"
    conn = psycopg2.connect(f"dbname={dbname} user={user} host={host} password={password} port={port}")
    conn.set_session(autocommit=True)
    return conn.cursor()


def extract(url):
    logging.info("Extract started")
    f = requests.get(url)
    logging.info("Extract done")
    return (f.text)


def transform(text):
    logging.info("Transform started")	
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "Yen,F" -> [ 'Yen', 'F' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records


def load(records):
    logging.info("load started")
    """
    records = [
      [ "Yen", "F" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    schema = "Yen"
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")   
    logging.info("load done")


def etl():
    link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
    data = extract(link)
    lines = transform(data)
    load(lines)
    
dag_second_assignment = DAG(
	dag_id = 'name_gender',
	catchup = False,
	start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
	schedule = '0 2 * * *')  # 적당히 조절

task = PythonOperator(
	task_id = 'perform_etl',
	python_callable = etl,
	dag = dag_second_assignment)

 

2. Params 사용 및 execution_data 추출

def etl(**context):
    link = context["params"]["url"]
    # task 자체에 대한 정보 (일부는 DAG의 정보가 되기도 함)를 읽고 싶다면 context['task_instance'] 혹은 context['ti']를 통해 가능
    # https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance
    task_instance = context['task_instance']
    execution_date = context['execution_date']

    logging.info(execution_date)

    data = extract(link)
    lines = transform(data)
    load(lines)

 

3. Varialbes 이용 및 Xcom 이용하여 태스크 분할

Variables의 장점은 코드 푸시가 필요하지 않다는 것입니다.

Variables의 단점은 관리나 테스트가 되지 않아 사고로 이어질 가능성이 있다는 것입니다.

 

Task의 수가 너무 많으면 전체 DAG 실행 시간이 길어지고 스케줄러에 부하가 발생합니다.

Task의 수가 너무 적으면 모듈화가 되지 않아 재실행 시간이 오래 걸립니다.

def extract(**context):
    link = context["params"]["url"]
    task_instance = context['task_instance']
    execution_date = context['execution_date']

    logging.info(execution_date)
    f = requests.get(link)
    return (f.text)
def transform(**context):
    logging.info("Transform started")    
    text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "Yen,F" -> [ 'Yen', 'F' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records
def load(**context):
    logging.info("load started")    
    schema = context["params"]["schema"]
    table = context["params"]["table"]

    lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
    """
    records = [
      [ "Yen", "F" ],
      [ "Claire", "F" ],
      ...
    ]
    """
extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        'url':  Variable.get("csv_url")
    },
    dag = dag)

transform = PythonOperator(
    task_id = 'transform',
    python_callable = transform,
    params = { 
    },  
    dag = dag)

load = PythonOperator(
    task_id = 'load',
    python_callable = load,
    params = {
        'schema': 'Yen',
        'table': 'name_gender'
    },
    dag = dag)

extract >> transform >> load

 

4. Redshift Connection 사용

PostgresHook의 Default 값은 False입니다.

이 경우에 BIGIN은 영향을 주지 못합니다. (No-Operation)

from datetime import datetime
from datetime import timedelta

dag = DAG(
    dag_id = 'name_gender_v4',
    start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 2 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        # 'on_failure_callback': slack.on_failure_callback,
    }
)

 

5. Task Decorators 사용

from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

from datetime import datetime
from datetime import timedelta

import requests
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def extract(url):
    logging.info(datetime.utcnow())
    f = requests.get(url)
    return f.text


@task
def transform(text):
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "Yen,F" -> [ 'Yen', 'F' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records


@task
def load(schema, table, records):
    logging.info("load started")    
    cur = get_Redshift_connection()   
    """
    records = [
      [ "Yen", "F" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")   
    logging.info("load done")


with DAG(
    dag_id='namegender_v5',
    start_date=datetime(2022, 10, 6),  # 날짜가 미래인 경우 실행이 안됨
    schedule='0 2 * * *',  # 적당히 조절
    max_active_runs=1,
    catchup=False,
    default_args={
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        # 'on_failure_callback': slack.on_failure_callback,
    }
) as dag:

    url = Variable.get("csv_url")
    schema = 'Yen'   ## 자신의 스키마로 변경
    table = 'name_gender'

    lines = transform(extract(url))
    load(schema, table, lines)

'데브코스 TIL > 데이터 파이프라인, Airflow' 카테고리의 다른 글

Airflow DAG 작성 예제 4, 5  (0) 2023.12.14
Airflow DAG 작성 예제 3  (0) 2023.12.13
Airflow 기본 프로그램 실행  (0) 2023.12.12
Airflow 설치  (0) 2023.12.12
Airflow 소개  (0) 2023.12.11