Local Executor를 먼저 학습하는 이유
Sequential Executor는 병렬처리가 불가능하고 Celery Executor나 Kubernetes Executor는 여러 대의 서버가 필요하다. 프로덕션 환경에서는 확장성이 매우 중요한 요소이고, 당연히 멀티 노드 아키텍처에서 분산 실행 될 수 있는 Executor를 사용하는 것이 맞다. 다만, 기본이 되는 Local Executor를 통해 개념을 이해하고 단일 머신에서 간단하게 학습할 수 있다.
다중 프로세스를 병렬처리
Local Executor는 단일 서버에서 여러 프로세스를 돌릴 수 있다. airflow의 기반이 되는 언어는 Python으로, 내장 병렬처리 모듈인 multiprocessing을 활용한다. 여러 task를 동시 실행 할 수 있고, 각 task는 operator 종류에 따라 실행해야 할 프로세스를 spawning 한다. 동시에 task를 담고 있고 dag를 DagRun 객체로 실행시키 때문에 여러 dag가 동시에 실행되기도 한다. 컴퓨팅 성능이 좋고 프로세스와 관련된 config 옵션을 조절함으로써 단일 서버 내에서 많은 작업들을 병렬로 처리 할 수 있는 수직 스케일링이 가능하다.
SPOF (Single Point Of Failure, 단일 실패 지점)
Local Executor는 단일 장애점(Singe Point Of Failure)을 가지고 있다. 단일 서버에서 모든 컴포넌트가 돌아가기 때문에 서버가 다운 되면 스케줄링을 유지할 수 없다. 반면, 멀티 노드 아키텍처에서 분산 실행 될 수 있는 Executor들은 한 서버가 다운 되더라도 다른 서버에서 task를 실행 시킬 수 있다. (물론, 핵심적인 컴포넌트를 지닌 서버가 다운된다면 그것은 막을 수 없다.) 단순히 airflow를 구성하는 하나의 컴포넌트가 다운되는 경우라면 docker 등을 활용해 health check를 하고 자동 재기동하는 방법은 사용할 수 있을 것이다.
메타 데이터 저장소로 RDB가 필요
airflow를 설치하면 default excutor로 설정된 Sequential Executor은 SQLite를 사용한다. SQLite는 동시 쓰기가 지원되지 않는 DB이다. airflow는 task, dag의 스케줄링 및 실행 이력 등을 모두 DB에 쓰고 읽으면서 관리 하는 데, 동시 쓰기가 되지 않는다는 것은 다중 실행이 불가능하다는 것을 의미한다. 때문에 흔히 사용되는 동시 쓰기가 가능한 MySQL, MSSQL, PostgreSQL, Oracle 등의 client-sever RDB를 사용해야 한다.가능한 DB는 도큐먼트에서 확인 할 수 있다. 또한 DB 접근 권한 설정 등 절차는 까다롭지만 도큐먼트에서 설명하는 대로 따라하면 쉽게 할 수 있다.
https://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#
동시성 (Concurrency) & 병렬성 (Parallelism)
이름은 같아 보이지만 두 단어에는 차이가 있다. 동시성과 병렬성을 이해하려면 thread와 process의 개념을 이해해야 한다. 인터넷에서 본 글을 활용하여 간단한 비유를 통해 설명을 해보고자 한다.
동시성 : 여러 작업을 한 시점에 할 수 있는 성질
- single process에서 multi-thread를 이용해 여러 작업을 번갈아 가면서 할 수 있는 것
- (1) 같은 주방에서 요리사 한 명이, 파스타를 5초 볶다가 치킨 텐더를 10초간 뒤집으러 갔다가 다시 파스타를 볶으러 온다.
- (2) 같은 주방에서 두 명의 요리사가 동시에 각 각 파스타를 볶고, 치킨 텐더를 뒤집고 있다.
- 기존에는 multi-thread를 이용해 단일 코어로 여러 일을 번갈아 가면서 하는 것이었으나, 요즘은 단일 머신이 여러 개의 CPU 코어를 소유한다. 그래서 (1)에서는 context switching(문맥 교환) 비용이 발생하나, 멀티코어에서는 (2)번 처럼 동시 작업도 가능하는 데 큰 비용소모는 없다.
병렬성 : 여러 작업을 한 시점에 동시에 할 수 있는 성질
- multi process에서 multi-processing를 이용해서 여러 작업을 동시에 할 수 있는 것
- (1) 서로 다른 주방에서 요리사 두 명이, 각 각 파스타를 볶다고 치킨 텐더를 뒤집는다.
- 병렬처리는 말 그대로, 각 각의 프로세스에서 따로 일 처리를 하는 것이다. 때문에 최소 2개의 CPU 코어가 필요하고, 두 개의 프로세스는 서로 간의 개입이나 관여가 불가능 하기 때문에 요리 도구(변수)를 공유하는 등의 소통은 일반적으로는 불가능하다. 또한 새로운 주방이 필요한 것 처럼 초기 환경을 셋팅 하는 데 시간이 걸린다.
- 병렬성은 동시성과 배타적인 관계가 아니다. 동시성은 말 그대로 번갈아 하든 같이 하든 여러 일을 동시에 할 수 있는 것이고, 병렬성은 정말로 여러 일을 같은 순간에 계속 하는 것이다. 병렬성은 동시성을 이미 지니고 있다.
조금 더 깊게 들어가려면 thread, process 개념을 이해하고 multi-threading, multi-processing, IPC(Inter-Process Communication) 순으로 배우고, 더 나아가 syncronizaiton, asyncronizaiton를 배워보길 바란다. 많은 데이터 엔지니어 플랫폼에 분산 처리가 많은 만큼 중요한 요소라고 생각한다.
그럼 다시 airflow로 돌아오자. airflow.cfg 파일을 보면 dag, task에 대한 동시성, 병렬성에 관한 옵션이 있다. (초기에는 default_airflow.cfg 파일로 있는 데, 이름을 airflow.cfg로 바꾸고 내용을 수정한다.)
# 스케줄러당 동시에 실행할 수 있는 최대 작업 인스턴스 수
# 클러스터 구성이라면 클러스터 전체에서 최대 작업 가능 수 (전체 worker에서의 동시 작업 가능수)
parallelism = 32
# DAG당 최대 활성 DAG 실행 수
max_active_runs_per_dag = 16
# 각 DAG에서 동시에 실행할 수 있는 최대 작업 인스턴스 수
max_active_tasks_per_dag = 16
parallelism
parallelism은 두 가지 선택이 있다.
- parallelism = 0 (Unlimited parallelism)
- parallelism > 0 (Limited parallelism)
parallelism은 전체에서 동시 실행 될 수 있는 task의 수이기 때문에, 0이면 작업이 제출될 때마다 수행을 시킨다. 이는 리소스를 초과할 가능성이 높다. 따라서 최대 parallelism을 지정해줘야 하는 데, udemy 강의에서 권장하는 수치는 "number_of_cores - 1" 이다. 기본 노트북 코어도 8 코어 또는 16 코어가 많으므로, 나는 7을 설정해주었다. 대부분 airflow에서 실행하는 task는 I/O Bound 보다는 CPU Bound의 작업이 주를 이루기 때문에, 소유하는 CPU 코어에 맞춰준 것으로 보인다.
max_active_runs_per_dag
DAG별 최대로 동시 실행 가능항 DagRun의 수를 조정하는 옵션이다.
catchup을 true로 설정하면 start_date로 부터 현재까지 비워진 날짜를 채우기 위해 일자별로 DagRun이 생기는 데, max_active_runs_per_dag을 조절하면 비워진 일자로부터 생성되는 DagRun이 동시 실행되는 수를 제한할 수 있다.
max_active_tasks_per_dag
DAG별 최대로 동시 실행 가능한 task의 수를 조정하는 옵션이다.
DAG(Directed Acyclic Graph)에 따라 순차적으로 task을 실행하겠지만, 어떤 부분에서 여러 task를 동시 처리해야 하는 부분이 있을 것이다. 그 때 동시 처리를 할 수 있는 task의 수를 제한한다. 만약 동시처리 해야 할 task의 수가 max_active_tasks_per_dag 보다 많다면, max_active_tasks_per_dag 만큼 task를 수행하고 먼저 끝나는 task의 slot을 남은 task가 들어갈 것이다.
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] docker Compose로 Celery Executor 시작하기 (0) | 2023.11.11 |
---|---|
[Airflow] PostgresOperator (0) | 2023.07.01 |
[Airflow] SubDAG로 반복 패턴 최소화하기 (0) | 2023.06.26 |