상세 컨텐츠

본문 제목

데이터 엔지니어링 스타터 키트 - 6주차

Data Platform/데이터 엔지니어링

by leediz 2022. 5. 30. 21:43

본문

데이터 엔지니어링 스타터 키트 - 6주차

이번주차는 지난 5주차에서 내주신 Weather_Forecast DAG 구현하기 과제를 수행한 내용과 세션 내에서 맥스님이 설명하신 내용에 대해 포스팅하겠다.

 

숙제 - Weather_Forecast DAG 구현하기

전 세계의 날씨 관련 정보를 관측소 자료를 통해 API로 제공하는 OpenWeathermp 사이트에서 무료 API Key를 발급받아서 매일 날씨정보를 가져와 Redshift에 저장하는 과제이다.

요구사항

  • 서울의 위도와 경도를 찾아 서울 정보에 대해 API를 Request할 것
  • 응답 결과에서 온도 정보(평균/최소/최대)만 앞으로 7일을 대상으로 출력해볼 것
    • 날짜, 낮 온도(day), 최소 온도(min), 최대 온도(max)
  • 읽어온 결과를 Redshift 각자 스키마 밑의 weather_forecast라는 테이블로 저장
  • Full-Refresh 형태로 구현해보고 그 다음 Incremetal Update로 구현해볼 것

 

Airflow DAG 구현

이전 과제의 포맷을 따라 DAG의 task는 크게 extract, transform, load로 나누어 구현을 했고 각각의 task는 PythonOperator를 사용하여 구현했다.

  • extract task
  extract = PythonOperator(
      task_id = 'extract',
      python_callable = extract,
      params = {
          'url': Variable.get('weather_url')
      },
      provide_context = True,
      dag = weather_forecast)

  def extract(**context):
      link = context["params"]["url"]
      return requests.get(link).json()
  • weather api는 미리 Airflow의 Variable로 저장을 해놨다.

  • Airflow에 Variable로 저장된 API를 변수로 가져와 extract 함수에 파라미터로 전달하는 태스크이다.
    extract 함수는 task가 넘겨준 API를 requests 라이브러리를 사용하여 응답값을 가져와 리턴한다.

 

  • transform task
 transform = PythonOperator(
      task_id = 'transform',
      python_callable = transform,
      params = {
      },
      provide_context = True,
      dag = weather_forecast)

  def transform(**context):
      weather = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
      daily_weather = []
      for daily in weather['daily']:
          date = datetime.fromtimestamp(daily['dt']).strftime('%Y-%m-%d')
          temp = daily['temp']['day']
          min_temp = daily['temp']['min']
          max_temp = daily['temp']['max']
          daily_weather.append([date, temp, min_temp, max_temp])

      return daily_weather
  • transform task에서는 별도로 파라미터로 무언가를 넘겨주진 않는다.
    대신 transform 함수에서 xcom_pull을 사용해 이전 extract 함수의 api 응답값을 받아왔다.
    transform이라는 단어에 걸맞게 받아온 응답값을 잘 파싱해주는 작업을 하며 일자별로 파싱해 리스트에 담아 리턴한다.

 

  • load task
  load = PythonOperator(
      task_id = 'load',
      python_callable = load,
      params = {
          'schema': '...',
          'table': 'weather_forecast'
      },
      provide_context=True,
      dag = weather_forecast)

  def load(**context):
      schema = context["params"]["schema"]
      table = context["params"]["table"]

      cur = get_Redshift_connection()
      lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
      lines = iter(lines)
      next(lines)
      sql = "BEGIN; DELETE FROM {schema}.{table};".format(schema=schema, table=table)
      for line in lines:
          date, temp, min_temp, max_temp = line
          print(date, temp, min_temp, max_temp)
          sql += f"""INSERT INTO {schema}.{table} VALUES ('{date}', '{temp}','{min_temp}','{max_temp}');"""
      sql += "END;"

      logging.info(sql)
      cur.execute(sql)
  • load task에서는 변환된 데이터를 Redshift에 저장하는 작업을 진행해야 하기 때문에 Redshift 관련 스키마와 테이블의 변수를 파라미터로 넘겨준다. Redshift와 연결하는 부분은 Airflow의 Connections를 활용하여 미리 관련 정보를 저장했다.
    Redshift는 Postgres의 API들과 호환이 되기 때문에 PostgresHook을 사용하여 Redshift와 연동하는 과정은 분리했다.
    이렇게 하면 보안이나 편리성 등에서 장점을 가져갈 수 있다.
    load 함수는 transform 함수에서 정제하여 리스트로 담은 값을 xcom_pull을 사용하여 가져온다.
    SQL을 만드는 부분은 지난 주 과제리뷰시간의 맥스님이 하신 방법을 참고하여 sql 변수에 INSERT INTO 문을 계속 붙여서 한번만 execute하게 만들었다. 이전에는 for문에서 한줄씩 execute 해줬는데, 이렇게 되면 데이터가 작아서 이번 예시에서는 크게 관계없을 수도 있지만, 총 7개의 execute가 실행된다. 이번에는 1번만 execute하기 위해 string 변수에 계속 INSERT 문을 추가해서 1번만 실행하게 했다.

 

맥스님 구현

2가지 큰 차이점이 있었다.

1. try/except문 사용
2. PostgresHook의 autocommit 파라미터를 사용하여 Transaction 관리

하나씩 살펴보면

  • try/except문 사용
cur = get_Redshift_connection()
insert_sql = """DELETE FROM {schema}.{table};INSERT INTO {schema}.{table} VALUES """.format(schema=schema, table=table) + ",".join(ret)
logging.info(insert_sql)
try:
    cur.execute(insert_sql)
    cur.execute("Commit;")
except Exception as e:
    cur.execute("Rollback;")
    raise

Delete문과 Insert문을 같이 실행하고 있는데 어떤 이유로 인해 에러가 발생하여 Delete는 실행되었는데 Insert는 안될 수도 있다. try/except문을 사용하여 성공할 경우 그대로 Commit을 하고, Insert를 하다 실패할 경우에는 Rollback을 시키며 raise문을 사용하여 예외를 발생시킨다.

try/except를 사용하여 예외처리를 하지만 Rollback과 같은 경우에는 오히려 raise문을 사용해 명확하게 에러를 발생시킴으로써 작업자나 개발자들이 확실하게 인지할 수 있는 것이 더 좋다고 하셨다.

  • PostgresHook의 autocommit 파라미터를 사용하여 Transaction 관리

Redshift와 연결하기 위해 사용한 PostgresHook은 autocommit이 Default 값으로 False라서 사실 내 코드에서 DELETE 하기 전 BEGINEND는 의미가 없었다. 그렇기 때문에 BEGIN과 END를 사용하는것 보다 위처럼 try/except문으로 관리를 했어야 했다.

 

6주차 소감

주차가 거듭될수록 난이도가 올라가며 점점 과제가 빌드업되는 느낌이다. 초반에는 SQL을 사용한 ETL 맛보기였다면 그 다음에는 SQL을 사용한 ETL을 Python으로 구현하고, 그 다음에는 Python으로 구현한 ETL을 Airflow를 사용하게 구현을 했다. 그 전 과제에서는 Airflow를 사용하는데에 의의를 가졌다면 이번 과제에서는 Transaction이라던지 예외처리라던지 그리고 외부 API를 사용하는 등의 조금 더 난이도가 있는 과제가 주어졌다. 과제할 당시에는 미처 신경쓰지 못했던 부분이었지만 세션에서 맥스님이 잘 설명해주셨고, 워낙 자료를 잘 만들어주셔서 몇번 따라하다보면 이해가 되었다. 벌써 다음주가 마지막주차인데 끝나는게 정말 아쉽다.

 

참고자료

관련글 더보기

댓글 영역