이번주차는 지난 5주차에서 내주신 Weather_Forecast DAG 구현하기 과제를 수행한 내용과 세션 내에서 맥스님이 설명하신 내용에 대해 포스팅하겠다.
전 세계의 날씨 관련 정보를 관측소 자료를 통해 API로 제공하는 OpenWeathermp 사이트에서 무료 API Key를 발급받아서 매일 날씨정보를 가져와 Redshift에 저장하는 과제이다.
이전 과제의 포맷을 따라 DAG의 task는 크게 extract, transform, load로 나누어 구현을 했고 각각의 task는 PythonOperator
를 사용하여 구현했다.
extract = PythonOperator(
task_id = 'extract',
python_callable = extract,
params = {
'url': Variable.get('weather_url')
},
provide_context = True,
dag = weather_forecast)
def extract(**context):
link = context["params"]["url"]
return requests.get(link).json()
transform = PythonOperator(
task_id = 'transform',
python_callable = transform,
params = {
},
provide_context = True,
dag = weather_forecast)
def transform(**context):
weather = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
daily_weather = []
for daily in weather['daily']:
date = datetime.fromtimestamp(daily['dt']).strftime('%Y-%m-%d')
temp = daily['temp']['day']
min_temp = daily['temp']['min']
max_temp = daily['temp']['max']
daily_weather.append([date, temp, min_temp, max_temp])
return daily_weather
load = PythonOperator(
task_id = 'load',
python_callable = load,
params = {
'schema': '...',
'table': 'weather_forecast'
},
provide_context=True,
dag = weather_forecast)
def load(**context):
schema = context["params"]["schema"]
table = context["params"]["table"]
cur = get_Redshift_connection()
lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
lines = iter(lines)
next(lines)
sql = "BEGIN; DELETE FROM {schema}.{table};".format(schema=schema, table=table)
for line in lines:
date, temp, min_temp, max_temp = line
print(date, temp, min_temp, max_temp)
sql += f"""INSERT INTO {schema}.{table} VALUES ('{date}', '{temp}','{min_temp}','{max_temp}');"""
sql += "END;"
logging.info(sql)
cur.execute(sql)
sql
변수에 INSERT INTO
문을 계속 붙여서 한번만 execute하게 만들었다. 이전에는 for문에서 한줄씩 execute 해줬는데, 이렇게 되면 데이터가 작아서 이번 예시에서는 크게 관계없을 수도 있지만, 총 7개의 execute가 실행된다. 이번에는 1번만 execute하기 위해 string 변수에 계속 INSERT 문을 추가해서 1번만 실행하게 했다.
2가지 큰 차이점이 있었다.
1. try/except문 사용
2. PostgresHook의 autocommit 파라미터를 사용하여 Transaction 관리
하나씩 살펴보면
cur = get_Redshift_connection()
insert_sql = """DELETE FROM {schema}.{table};INSERT INTO {schema}.{table} VALUES """.format(schema=schema, table=table) + ",".join(ret)
logging.info(insert_sql)
try:
cur.execute(insert_sql)
cur.execute("Commit;")
except Exception as e:
cur.execute("Rollback;")
raise
Delete문과 Insert문을 같이 실행하고 있는데 어떤 이유로 인해 에러가 발생하여 Delete는 실행되었는데 Insert는 안될 수도 있다. try/except문을 사용하여 성공할 경우 그대로 Commit을 하고, Insert를 하다 실패할 경우에는 Rollback을 시키며 raise문을 사용하여 예외를 발생시킨다.
try/except를 사용하여 예외처리를 하지만 Rollback과 같은 경우에는 오히려 raise문을 사용해 명확하게 에러를 발생시킴으로써 작업자나 개발자들이 확실하게 인지할 수 있는 것이 더 좋다고 하셨다.
Redshift와 연결하기 위해 사용한 PostgresHook은 autocommit이 Default 값으로 False라서 사실 내 코드에서 DELETE 하기 전 BEGIN
과 END
는 의미가 없었다. 그렇기 때문에 BEGIN과 END를 사용하는것 보다 위처럼 try/except문으로 관리를 했어야 했다.
주차가 거듭될수록 난이도가 올라가며 점점 과제가 빌드업되는 느낌이다. 초반에는 SQL을 사용한 ETL 맛보기였다면 그 다음에는 SQL을 사용한 ETL을 Python으로 구현하고, 그 다음에는 Python으로 구현한 ETL을 Airflow를 사용하게 구현을 했다. 그 전 과제에서는 Airflow를 사용하는데에 의의를 가졌다면 이번 과제에서는 Transaction이라던지 예외처리라던지 그리고 외부 API를 사용하는 등의 조금 더 난이도가 있는 과제가 주어졌다. 과제할 당시에는 미처 신경쓰지 못했던 부분이었지만 세션에서 맥스님이 잘 설명해주셨고, 워낙 자료를 잘 만들어주셔서 몇번 따라하다보면 이해가 되었다. 벌써 다음주가 마지막주차인데 끝나는게 정말 아쉽다.
데이터 엔지니어링 스타터 키트 - 7주차 (0) | 2022.06.11 |
---|---|
[AWS] Airflow 설치 - Rocky Linux (0) | 2022.06.07 |
데이터 엔지니어링 스타터 키트 - 5주차 (0) | 2022.05.22 |
데이터 엔지니어링 스타터 키트 - 4주차 (0) | 2022.05.17 |
데이터 엔지니어링 스타터 키트 - 3주차 (0) | 2022.05.08 |
댓글 영역