데브코스 TIL/데이터 파이프라인, Airflow

Dynamic Dags

예니ㅣ 2024. 1. 5. 11:51

강의

Dynamic Dags

"Dynamic Dags"는 템플릿과 YAML을 기반으로 만들 수 있습니다.

비슷한 DAG를 매뉴얼하게 개발하는 것을 방지할 수 있습니다.

# config_appl.yml

dag_id: 'APPL'
schedule: '@daily'
catchup: False
symbol: 'APPL'
# config_googl.yml

dag_id: 'GOOG'
schedule: '@weekly'
symbol: 'GOOG'
# generator.py

from jinja2 import Environment, FileSystemLoader
import yaml
import os

file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template('templated_dag.jinja2')

for f in os.listdir(file_dir):
    if f.endswith(".yml"):
        with open(f"{file_dir}/{f}", "r") as cf:
            config = yaml.safe_load(cf)
            with open(f"dags/get_price_{config['dag_id']}.py", "w") as f:
                f.write(template.render(config))
# templated_dag.jinja2

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG(dag_id="get_price_{{ dag_id }}",
    start_date=datetime(2023, 6, 15),
    schedule='{{ schedule }}',
    catchup={{ catchup or True }}) as dag:

    @task
    def extract(symbol):
        return symbol

    @task
    def process(symbol):
        return symbol

    @task
    def store(symbol):
        return symbol

    store(process(extract("{{ symbol }}")))

'데브코스 TIL > 데이터 파이프라인, Airflow' 카테고리의 다른 글

Airflow 운영  (0) 2024.01.05
Task Groups  (0) 2024.01.05
Dag Dependencies  (0) 2024.01.05
API 모니터링  (0) 2024.01.03
구글 시트 연동하기  (0) 2024.01.03