데브코스 TIL/데이터 파이프라인, Airflow 17

MySQL 테이블 복사하기

강의 MySQL 테이블 복사 MySQL은 OLTP 및 Production Database 입니다. Redshift 는 OLAP 및 Data Warehouse 입니다. 복사 과정 MySQL의 테이블을 Redshift로 복사합니다. OLTP → OLAP CREATE TABLE prod.nps ( id INT NOT NULL AUTO_INCREMENT primary key, created_at timestamp, score smallint ); CREATE TABLE yen.nps ( id INT NOT NULL primary key, created_at timestamp, score smallint ); 1. AWS 관련 권한 설정 : DAG의 S3 쓰기 권한 및 Redshift의 S3 읽기 권한 IAM으로..

Airflow Backfill 이해하기

강의 Backfill "Backfill"은 실패한 데이터 파이프라인 혹은 이미 추출한 데이터의 오류로 인한 데이터 파이프라인 재실행을 말합니다. Full Refresh : 단순 재실행 Incremental Update : 복잡 → Airflow Backfill 용이 날짜별 Backfill 결과 및 성공 여부 기록하여 ETL 인자로 제공합니다. 시스템이 지정해준 날짜 사용하면 Backfill을 구현할 수 있습니다. Airflow Backfill 구현 방식 ETL별 실행 날짜 및 결과 메타데이터 데이터베이스에 기록 모든 DAG 실행에 execution_date 지정 데이터 갱신하는 코드 작성 Backfill 관련 Airflow 변수 start_date : DAG가 처음 추출해야하는 데이터의 날짜 및 시간 e..

Primary Key Uniqueness 보장하기

강의 Primary Key Uniqueness "Primary Key"는 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드 입니다. 관계형 데이터베이스 시스템이 Primary Key의 값이 중복 존재하는 것을 막아줍니다. 빅데이터 기반 데이터 웨어하우스는 Primary Key Uniqueness를 보장하지 않습니다. Primary Key 유지 방법 ROW_NUMBER : 최근 정보 우선 선택 스테이징 테이블 이용 : 최신 레코드 우선 선택 -- weather_forecast 예시 이용 INSERT INTO Yen.weather_forecast SELECT date, temp, min_temp, max_temp, created_date FROM ( SELECT *, ROW_NUMBER() OVER ..

Airflow DAG 작성 예제 4, 5

강의 CountryInfo 예제 Full Refresh UTC 기준 매주 토요일 오전 6시 30분 실행 import requests @task def extract_transform(): response = requests.get('https://restcountries.com/v3/all') countries = response.json() records = [] for country in countries: name = country['name']['common'] population = country['population'] area = country['area'] records.append([name, population, area]) return records with DAG( dag_id = ..

Airflow DAG 작성 예제 3

강의 Yahoo Finance API 예제 Full Refresh 구현 Yahoo Finance API 호출 애플 주식 정보 수집 (지난 30일) Redshift에 테이블 및 레코드 적재 트랜잭션 형태 구성 from airflow import DAG from airflow.decorators import task from airflow.providers.postgres.hooks.postgres import PostgresHook from datetime import datetime from pandas import Timestamp import yfinance as yf import pandas as pd import logging def get_Redshift_connection(autocommit=..

Airflow DAG 작성 예제 1, 2

강의 DAG 파라미터 "DAG parameters"는 Task parameters와 다르게 DAG 객체를 만들 때 지정합니다. max_active_runs max_active_tasks catchup DAG 폴더 주의 사항 Airflow의 dags 폴더 주기적으로 스캔 → DAG 모듈의 모든 파일 메인 함수 실행 → 개발 중인 테스트 코드 실행 가능 Connetions와 Variables Connetions 일부 연결 관련 정보 저장 : hostname, port number, access credential Postgres 연결 혹은 Redshift 연결 정보 저장 Variables API 키 혹은 일부 구성 정보 저장 값 암호화 가능 : access 혹은 secret 사용 Xcom "Xcom"은 태스..