데브코스 TIL/데이터 파이프라인, Airflow

ETL 작성 실습

예니ㅣ 2023. 12. 11. 16:43

강의

ETL 개요

1. schema 밑에 테이블 생성

# Colab 환경
!pip install ipython-sql==0.4.1
!pip install SQLAlchemy==1.4.49

%load_ext sql
postgresql://ID:PW@learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev

DROP TABLE IF EXISTS keeyong.name_gender;
CREATE TABLE yen.name_gender (
   name varchar(32) primary key,
   gender varchar(8)
);
import psycopg2

# Redshift connection 함수
# 본인 ID/PW 사용!
def get_Redshift_connection():
    host = "learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com"
    redshift_user = "ID"
    redshift_pass = "Password"
    port = 5439
    dbname = "dev"
    conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
        dbname=dbname,
        user=redshift_user,
        password=redshift_pass,
        host=host,
        port=port
    ))
    conn.set_session(autocommit=True)
    return conn.cursor()

 

2. Colab 코드 작성

  • extract
  • transform
  • load
import requests

def extract(url):
    f = requests.get(url)
    return (f.text)
def transform(text):
    lines = text.strip().split("\n")
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "yen, F" -> [ 'yen', 'F' ]
      records.append([name, gender])
    return records
def load(records):
    """
    records = [
      [ "yen", "F" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    cur = get_Redshift_connection()
    # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
    for r in records:
        name = r[0]
        gender = r[1]
        print(name, "-", gender)
        sql = "INSERT INTO yen.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
        cur.execute(sql)

 

3. 데이터 소스 : https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv

link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"

data = extract(link)

 


ETL 개선

def load(records):
    """
    records = [
      [ "yen", "F" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    schema = "keeyong"
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;")
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;")
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise

'데브코스 TIL > 데이터 파이프라인, Airflow' 카테고리의 다른 글

Airflow DAG 작성 예제 1, 2  (0) 2023.12.13
Airflow 기본 프로그램 실행  (0) 2023.12.12
Airflow 설치  (0) 2023.12.12
Airflow 소개  (0) 2023.12.11
데이터 파이프라인 소개  (0) 2023.12.11