강의
Dag 실행 방법
- 주기적 실행 : Schedule로 지정
- Dag에 의한 트리거
- Explicit Trigger : TriggerDagOperator
- Reactive Trigger : ExternalTaskSensor
- 조건에 따른 분기 : BranchPythonOperator
- 불필요한 태스크 처리 : LatestOnlyOperator
Trigger Rules
"Trigger Rules"는 Upstream 태스크의 성공 혹은 실패 상황에 따라 이어질 태스크의 실행 여부를 결정하는 파라미터 입니다.
- all_success
- all_failed
- all_done
- one_success
- none_failed
- none_failed_min_one_success
TriggerDagOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_dag = TriggerDagRunOperator(
task_id="trigger_dag",
trigger_dag_id="dag_name"
)
Sensor
"Sensor"는 특정 조건이 충족될 때까지 대기하는 Operator 입니다.
외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용합니다.
mode 파라미터를 reschedule 혹은 poke로 지정하여 이용합니다.
- FileSensor : 지정된 위치에 파일 생성
- HttpSensor : HTTP 요청 수행 및 응답
- SqlSensor : 데이터베이스의 조건 충족
- TimeSensor : 특정 시간 도달
- ExternalTaskSensor : 다른 Airflow DAG의 작업 완료
from airflow.sensors.external_task import ExternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor(
task_id='waiting_for_end_of_dag_a',
external_dag_id='DAG이름',
external_task_id='end',
timeout=5*60,
mode='reschedule',
execution_delta=timedelta(minutes=5) # schedule interval 존재할 때
)
BranchPythonOperator
"BranchPythonOperator"는 상황에 따라 뒤에 실행되어야 할 태스크를 동적으로 결정해주는 Operator 입니다.
from airflow.operators.python import BranchPythonOperator
def skip_or_cont_trigger():
if Variable.get("mode", "dev") == "dev":
return []
else:
return ["trigger_b"]
branching = BranchPythonOperator(
task_id='branching',
python_callable=skip_or_cont_trigger,
)
LatestOnlyOperator
"LatestOnlyOperator"는 Time-sensitive한 태스크가 과거 데이터 Backfill 시에 실행되는 것을 막는 Operator 입니다.
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id='latest_only_example',
schedule=timedelta(hours=48), # 매 48시간마다 실행
start_date=datetime(2023, 6, 14),
catchup=True) as dag:
t1 = EmptyOperator(task_id='task1')
t2 = LatestOnlyOperator(task_id = 'latest_only')
t3 = EmptyOperator(task_id='task3')
t4 = EmptyOperator(task_id='task4')
t1 >> t2 >> [t3, t4]
'데브코스 TIL > 데이터 파이프라인, Airflow' 카테고리의 다른 글
Dynamic Dags (1) | 2024.01.05 |
---|---|
Task Groups (0) | 2024.01.05 |
API 모니터링 (0) | 2024.01.03 |
구글 시트 연동하기 (0) | 2024.01.03 |
MySQL 테이블 복사하기 (0) | 2023.12.15 |