_DAG 5

Dag Dependencies

강의 Dag 실행 방법 주기적 실행 : Schedule로 지정 Dag에 의한 트리거 Explicit Trigger : TriggerDagOperator Reactive Trigger : ExternalTaskSensor 조건에 따른 분기 : BranchPythonOperator 불필요한 태스크 처리 : LatestOnlyOperator Trigger Rules "Trigger Rules"는 Upstream 태스크의 성공 혹은 실패 상황에 따라 이어질 태스크의 실행 여부를 결정하는 파라미터 입니다. all_success all_failed all_done one_success none_failed none_failed_min_one_success TriggerDagOperator from airflow.o..

Airflow DAG 작성 예제 4, 5

강의 CountryInfo 예제 Full Refresh UTC 기준 매주 토요일 오전 6시 30분 실행 import requests @task def extract_transform(): response = requests.get('https://restcountries.com/v3/all') countries = response.json() records = [] for country in countries: name = country['name']['common'] population = country['population'] area = country['area'] records.append([name, population, area]) return records with DAG( dag_id = ..

Airflow DAG 작성 예제 3

강의 Yahoo Finance API 예제 Full Refresh 구현 Yahoo Finance API 호출 애플 주식 정보 수집 (지난 30일) Redshift에 테이블 및 레코드 적재 트랜잭션 형태 구성 from airflow import DAG from airflow.decorators import task from airflow.providers.postgres.hooks.postgres import PostgresHook from datetime import datetime from pandas import Timestamp import yfinance as yf import pandas as pd import logging def get_Redshift_connection(autocommit=..

Airflow DAG 작성 예제 1, 2

강의 DAG 파라미터 "DAG parameters"는 Task parameters와 다르게 DAG 객체를 만들 때 지정합니다. max_active_runs max_active_tasks catchup DAG 폴더 주의 사항 Airflow의 dags 폴더 주기적으로 스캔 → DAG 모듈의 모든 파일 메인 함수 실행 → 개발 중인 테스트 코드 실행 가능 Connetions와 Variables Connetions 일부 연결 관련 정보 저장 : hostname, port number, access credential Postgres 연결 혹은 Redshift 연결 정보 저장 Variables API 키 혹은 일부 구성 정보 저장 값 암호화 가능 : access 혹은 secret 사용 Xcom "Xcom"은 태스..