Data Engineering/Airflow

[Airflow] PostgresOperator

keyhong-DE 2023. 7. 1. 21:21

PostgresOperator 시작 환경

airflow에서 PostgresOperator를 사용하여 postgresql에 일정 주기로 쿼리를 수행하도록 스케줄링을 걸어 보았다. 이 글은 그 과정을 정리하였고, 그 과정을 풀어내고자 한다.

 

먼저 내 머신의 로컬 폴더 구조는 다음과 같다. 에어플로우를 한다면 처음에 $AIRFLOW_HOME 환경변수를 정의하고 그 안에 dags 폴더를 만들게 되는 데, 내 dags 폴더 안에는 총 4개의 dag 모듈이 있고 sql이라는 스크립트를 모아둔 폴더를 만들었다.

dags 폴더
dags 내부의 sql 폴더

PostgresOpeator 연동하기

PostgresOperator를 사용하려면 먼저 postgresSQL DBMS와 airlfow provider가 깔려 있어야 한다. postgresSQL 설치는 이 글에서는 소개하지 않는다. 기존에 airflow를 설치했을텐데 오퍼레이터를 추가 확장하려면 다음과 같은 pip 설치를 통해 가능하다. pip은 항상 latest 버전으로 패키지 설치가 되니, airlfow 버전이 낮다면 명시적으로 지정하여 맞춰주는 것을 권장한다.

pip install apache-airflow[postgres]==<airflow version>

 

설치가 되면 airflow dag에 PostgresOperator를 작성할 수 있다. 내가 작성한 dag 파일인데, 아래와 같이 PostgresOperator를 사용할 수 있고, 내가 사용한 주요 파라미터는 다음과 같다.

 task_id   태스크 ID
 postgres_conn_id   웹 UI에서 설정한 postgres 접속 정보 id
 sql   단일 문자열, 실행할 템플릿 파일을 가리키는 문자열 또는 문자열 목록
 (쿼리문 또는 쿼리 스크립트의 위치)

위에서 넣어준 여러 인자는 PostgresOperator가 SQLExecuteQueryOperator를 상속받고 있기에 사용 가능하다. 다음 도큐먼트에서 SQLExecuteQueryOperator의 인자를 PostgresOperator에서 사용하기 때문에 두 오퍼레이터의 도큐먼트를 모두 봐야 한다. (task_id는 BaseOperator를 상속받기에 사용할 수 있다.)

 

from datetime import timedelta, datetime
from pendulum import timezone

from airflow.models import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

# DAG 정의 부분
with DAG(
    dag_id="soss_postgres_shell",
    start_date=datetime(2023, 6, 26, tzinfo=timezone("Asia/Seoul")),
    schedule_interval="0 8 * * *",
    catchup=True,
    max_active_runs=1,
    default_args={
        "retries": 1,
        "retry_delay": timedelta(minutes=5)
    },
    user_defined_filters={"convert_utc_to_kst": convert_utc_to_kst}
) as dag:

    # PostgresOperator 정의 부분
    dm_gu_cctv_mntr_rate_shell = PostgresOperator(
        task_id="dm_gu_cctv_mntr_rate_shell",
        postgres_conn_id="postgres",
        sql="sql/insert_dm_gu_cctv_mntr_rate.sql",
        params={"today": "{{ data_interval_end | convert_utc_to_kst }}"}
    )

 

그럼 postgres_conn_id는 어디서 정의를 할까? [airflow web UI] - [Admin] - [Connections]에서 추가를 해야한다.

 

airflow web UI Connections 추가

Connection Type는 postgres로 설정하고 본인이 명명할 Connection Id, 그 밖에 주황 부분의 부가 정보를 필수적으로 기입해야 한다. 기입하면 및 쪽에 test 버튼이 있는 데 테스트가

정상이면 웹 상단에 다음 문구가 보인다.

test success

이로써 dag 파일의 postgres_conn_id에 인자로 넣을 수 있는 Connection Id를 만들어 주었다. 그럼 이제 좀 더 개발자다운 세부적인 관리를 구성해보자.

PostgresOpertor에서 sql 스크립트 분리시켜서 관리하기

많은 회사들이 query 스크립트를 (.sql, .hql) 등의 파일로 따로 관리하는 것으로 안다. 프로그램을 작성하면 알겠지만, 구조가 잘 짜여진 모듈은 분리가 될 수록 더욱 관리하기 편하고 trouble-shooting에 장점이 있다. 이는 문제가 생겨도 쿼리만 따로 별도의 환경에서 날려보고 체킹하여 dag 또는 데이터가 문제가 있음을 빠르게 파악 할 수 있기 때문이다.

 

PostgresOperator에는 실행할 sql을 지정하는 두 가지 방법이 있다.

  •  dag 내 sql 파라미터에 직접 쿼리 입력하기
  • query 파일(.sql)을 작성하고 파일을 불러들이기
# (1) dag내 sql 파라미터에 직접 쿼리 작성하기
example_1 = PostgresOperator(
    task_id="example_1",
    postgres_conn_id="postgres",
    sql="select * from my_table"
)

# (2) dag내 sql 파라미터에 직접 쿼리 작성하기
example_2 = PostgresOperator(
    task_id="example_2",
    postgres_conn_id="postgres",
    sql="select_my_table.sql"
)

 

나는 query  파일을 별도로 분리할 수 있는 (2)번을 사용했다. sql 파라미터에 query가 있는 파일의 경로를 지정한다. (dag 모듈이 있는 위치를 기준으로 상대경로를 사용가능하고, "/" 로 시작하는 절대 경로로 가능하다.)

PostgresOperator로 sql 스크립트 실행할 때 인자(argument) 전달하기

마지막 과정에서 가장 헤맸다.. 그 이유는 sql 스크립트를 읽어온 후, jinja2 템플릿을 사용해서 argument를 전달하고 싶었는 데, 이상하게 script에서 변수를 인식하지 못해서 그렇다. 결론적으로 말하자면, sql 파라미터에 스크립트 경로를 넣는 것도, 쿼리를 문자열로 직접 작성해서 넣는 것이랑 다르지 않다. 즉, 위 (2)번은 파일을 읽어서 (1)과 같이 문자열을 넣을 뿐이기에 기존 postgres sql에서 쓰는 ':variable' 을 사용하지 못한다.. 

 

이에 어떻게 하면 더 편하게 관리할 수 있을 까 고민했고, 대신 params라는 파라미터를 활용하기로 하였다. 스크립트에는 {{ params.key }} 를 사용하면 실제 실행할 때 파싱이 되어 내가 value로 넣은 입력값으로 바꿔준다. 

success_example = PostgresOperator(
    task_id="success_example",
    postgres_conn_id="postgres",
    sql="select_my_table.sql"
    params={"today": today}
)
-- select_my_table.sql
select {{ params.today }} from my_table

-- today가 20230701이라면, 실행 결과는
select 20230701 from my_table

 

중요한 게 하나 있다. params로 넘기는 value에 jinja template을 넣으면, jinja template 문자열이 들어가서 실행될 뿐 파싱이 되지 않는다. 예를 들어 오퍼레이터를 작성하면서 아래처럼 value로 jinja 템플릿을 넣으면 문자열 그대로 들어간다는 의미이다. 그렇다고 문자열의 " " 을 뺄 수도 없다. 왜냐하면 airflow는 파이썬으로 작성된 플랫폼이기 때문에 기본적인 파이썬 syntax는 준수해야 하기 때문이다.

fail_example = PostgresOperator(
    task_id="fail_example",
    postgres_conn_id="postgres",
    sql="select_my_table.sql"
    params={"today": "{{ data_interval_end }}"}
)

 

내가 작성한 script 에서는 다음과 같은 결과를 받았다.

psycopg2.erros.SyntaxError

글은 여기서 마무리 하지만, 해결 방법은 있다. 파이썬 함수를 통해 jinja 템플릿의 결과를 받고 그것을 변수에 담아 value 값으로 전달하는 방법이다. 프로그래밍에 정답은 없다. 어떻게 구현하느 냐에 따른 차이가 있을 뿐. 다만 어떻게 설계를 하느냐에 따라 유지보수와 추가 개발의 용이성이 달라진다.

 

나는 query가 문제가 없음을 보장하고 에러시 query를 수정하지 않는 방안으로 script와 dag를 분리시키고, 최소한의 수정으로 문제를 해결 가능한 구조를 만들고 싶었다. 더 좋은 방법이 있다면 그 방법을 찾아나갈 것이다.

Rerference

https://airflow.apache.org/docs/apache-airflow-providers-common-sql/stable/operators.html