강의
ETL 개요
1. schema 밑에 테이블 생성
# Colab 환경
!pip install ipython-sql==0.4.1
!pip install SQLAlchemy==1.4.49
%load_ext sql
postgresql://ID:PW@learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev
DROP TABLE IF EXISTS keeyong.name_gender;
CREATE TABLE yen.name_gender (
name varchar(32) primary key,
gender varchar(8)
);
import psycopg2
# Redshift connection 함수
# 본인 ID/PW 사용!
def get_Redshift_connection():
host = "learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com"
redshift_user = "ID"
redshift_pass = "Password"
port = 5439
dbname = "dev"
conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
dbname=dbname,
user=redshift_user,
password=redshift_pass,
host=host,
port=port
))
conn.set_session(autocommit=True)
return conn.cursor()
2. Colab 코드 작성
- extract
- transform
- load
import requests
def extract(url):
f = requests.get(url)
return (f.text)
def transform(text):
lines = text.strip().split("\n")
records = []
for l in lines:
(name, gender) = l.split(",") # l = "yen, F" -> [ 'yen', 'F' ]
records.append([name, gender])
return records
def load(records):
"""
records = [
[ "yen", "F" ],
[ "Claire", "F" ],
...
]
"""
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = "INSERT INTO yen.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
cur.execute(sql)
3. 데이터 소스 : https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv
link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
data = extract(link)
ETL 개선
def load(records):
"""
records = [
[ "yen", "F" ],
[ "Claire", "F" ],
...
]
"""
schema = "keeyong"
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
raise
'데브코스 TIL > 데이터 파이프라인, Airflow' 카테고리의 다른 글
Airflow DAG 작성 예제 1, 2 (0) | 2023.12.13 |
---|---|
Airflow 기본 프로그램 실행 (0) | 2023.12.12 |
Airflow 설치 (0) | 2023.12.12 |
Airflow 소개 (0) | 2023.12.11 |
데이터 파이프라인 소개 (0) | 2023.12.11 |