강의
Dynamic Dags
"Dynamic Dags"는 템플릿과 YAML을 기반으로 만들 수 있습니다.
비슷한 DAG를 매뉴얼하게 개발하는 것을 방지할 수 있습니다.
# config_appl.yml
dag_id: 'APPL'
schedule: '@daily'
catchup: False
symbol: 'APPL'
# config_googl.yml
dag_id: 'GOOG'
schedule: '@weekly'
symbol: 'GOOG'
# generator.py
from jinja2 import Environment, FileSystemLoader
import yaml
import os
file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template('templated_dag.jinja2')
for f in os.listdir(file_dir):
if f.endswith(".yml"):
with open(f"{file_dir}/{f}", "r") as cf:
config = yaml.safe_load(cf)
with open(f"dags/get_price_{config['dag_id']}.py", "w") as f:
f.write(template.render(config))
# templated_dag.jinja2
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
with DAG(dag_id="get_price_{{ dag_id }}",
start_date=datetime(2023, 6, 15),
schedule='{{ schedule }}',
catchup={{ catchup or True }}) as dag:
@task
def extract(symbol):
return symbol
@task
def process(symbol):
return symbol
@task
def store(symbol):
return symbol
store(process(extract("{{ symbol }}")))
'데브코스 TIL > 데이터 파이프라인, Airflow' 카테고리의 다른 글
Airflow 운영 (0) | 2024.01.05 |
---|---|
Task Groups (0) | 2024.01.05 |
Dag Dependencies (0) | 2024.01.05 |
API 모니터링 (0) | 2024.01.03 |
구글 시트 연동하기 (0) | 2024.01.03 |