데브코스 TIL/데이터 파이프라인, Airflow

Dag Dependencies

예니ㅣ 2024. 1. 5. 11:40

강의

Dag 실행 방법

  • 주기적 실행 : Schedule로 지정
  • Dag에 의한 트리거
    • Explicit Trigger : TriggerDagOperator 
    • Reactive Trigger : ExternalTaskSensor
  • 조건에 따른 분기 : BranchPythonOperator
  • 불필요한 태스크 처리 : LatestOnlyOperator

 


Trigger Rules

"Trigger Rules"는 Upstream 태스크의 성공 혹은 실패 상황에 따라 이어질 태스크의 실행 여부를 결정하는 파라미터 입니다.

  • all_success
  • all_failed
  • all_done
  • one_success
  • none_failed
  • none_failed_min_one_success

 


TriggerDagOperator

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_dag = TriggerDagRunOperator(
	task_id="trigger_dag",
    trigger_dag_id="dag_name"
)

 


Sensor

"Sensor"는 특정 조건이 충족될 때까지 대기하는 Operator 입니다.

외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용합니다.

mode 파라미터를 reschedule 혹은 poke로 지정하여 이용합니다.

  • FileSensor : 지정된 위치에 파일 생성
  • HttpSensor : HTTP 요청 수행 및 응답
  • SqlSensor : 데이터베이스의 조건 충족
  • TimeSensor : 특정 시간 도달
  • ExternalTaskSensor : 다른 Airflow DAG의 작업 완료
from airflow.sensors.external_task import ExternalTaskSensor

waiting_for_end_of_dag_a = ExternalTaskSensor(
	task_id='waiting_for_end_of_dag_a',
	external_dag_id='DAG이름',
	external_task_id='end',
	timeout=5*60,
	mode='reschedule',
	execution_delta=timedelta(minutes=5)	# schedule interval 존재할 때
)

 


BranchPythonOperator

"BranchPythonOperator"는 상황에 따라 뒤에 실행되어야 할 태스크를 동적으로 결정해주는 Operator 입니다.

from airflow.operators.python import BranchPythonOperator

def skip_or_cont_trigger():
	if Variable.get("mode", "dev") == "dev":
		return []
	else:
		return ["trigger_b"]

branching = BranchPythonOperator(
	task_id='branching',
	python_callable=skip_or_cont_trigger,
)

 


LatestOnlyOperator

"LatestOnlyOperator"는 Time-sensitive한 태스크가 과거 데이터 Backfill 시에 실행되는 것을 막는 Operator 입니다.

from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.empty import EmptyOperator

with DAG(
	dag_id='latest_only_example',
	schedule=timedelta(hours=48), # 매 48시간마다 실행
	start_date=datetime(2023, 6, 14),
	catchup=True) as dag:
    
    t1 = EmptyOperator(task_id='task1')
    t2 = LatestOnlyOperator(task_id = 'latest_only')
    t3 = EmptyOperator(task_id='task3')
    t4 = EmptyOperator(task_id='task4')
    
t1 >> t2 >> [t3, t4]

 

 

 

'데브코스 TIL > 데이터 파이프라인, Airflow' 카테고리의 다른 글

Dynamic Dags  (1) 2024.01.05
Task Groups  (0) 2024.01.05
API 모니터링  (0) 2024.01.03
구글 시트 연동하기  (0) 2024.01.03
MySQL 테이블 복사하기  (0) 2023.12.15