다양한 환경에서 실행가능한 스파크
스파크는 다양한 저장소에 붙여서 분산 처리가 가능한 프레임 워크이다. 하지만, Spark는 처음 개발된 환경은 Hadoop에서 사용하는 HDFS라는 분산 파일 시스템이며 해당 환경에서 In-Memory 기반으로 최적의 분산 처리를 수행하는 데 최적화가 되어있다. 때문에 오늘은 Spark를 분산 파일시스템에서 운영 할 때의 이점을 소개하고자 한다. 결론부터 말하자면 분산 파일시스템에서 Spark를 이용하면 데이터 지역성의 이점을 누릴 수 있다. 대표적인 분산 파일 시스템인 Hadoop에서의 데이터 지역성을 이해하기 전에 최적화에 대한 나의 기본 관점을 공유하고자 한다.
최적화의 기본
프로그램을 최적화 시킬 때, 두 가지 I/O를 줄이는 것이 정말 중요하다고 생각한다. 첫 번째는 Network I/O 이다. Network I/O를 개선하기 위해서는 서로 다른 서버 혹은 노드 간에 통신을 하기 전 이동할 데이터의 양을 줄이고, 꼭 필요한 통신만 할 수 있도록 처리 로직을 구현하는 것이 중요하다. 그리고 두 번째는 Disk I/O를 개선하는 것이다. Disk I/O를 줄이는 기본은 데이터 소스에서 꼭 필요 컬럼만 추출하고 컬럼을 나타낼 수 있는 데이터 타입을 명시적으로 지정하는 일이다. 예를 들어 Python에서 int32로 표현할 수 있는 컬럼을 명시적으로 지정하지 않아 자동화된 타입 추론에 의해 Int64로 읽게 된다면 row의 한 행 마다 32 bit를 불필요하게 더 쓴다. 이는 데이터의 행이 많아질 수록 (행의 개수 * 32) bit 씩 추가적인 메모리를 소모하므로 데이터 행이 많다면 더 큰 리소스 소모를 일으킨다. 다만, 타입을 명시적으로 지정할 때 주의 할 점은 너무 최소 비트로만 데이터를 읽어 들여 범위를 넘어서는 이상 값이 발견됬을 때, 에러가 발생하거나 자동으로 더 큰 타입이으로 casting 될 수 있으므로 충분히 컬럼 값의 범위를 고려해야 한다. 내 경험 상 pandas.DataFrame에서 NaN 값이 있다면 int가 float64로 자동 캐스팅 되는 것도 볼 수 있었다. Network I/O는 개선하는 것은 정말 어렵고 효율 개선도 한계가 있다. 때문에 Disk I/O를 개선하는 것이 필수적이며 단순하지만 더 큰 리소스 절감을 만들 수 있다.
HDFS : Block Storage
HDFS(Hadoop Distributed File System)는 데이터 파일을 블록(default 128 MB) 단위로 쪼개어 클러스터의 여러 Data Node에 나눠서 저장하고 서로 복제(replication)하여 고가용성(High Availability)을 만들어 낸다. 각 블록에 대한 위치 정보는 Hadoop Cluster의 Name Node의 메모리에 저장되어 있다. Hadoop도 JVM 위에서 구동되는 하나의 프로그램이기 때문에 당연히 리소스를 관리해야 한다. Hadoop을 공부하면 블록 추상화라는 재미있는 요소가 있는 데, 한번 읽어보면 얼마나 섬세하게 설계 했는 지 그 생각을 볼 수 있다. 하여튼 Hadoop이라는 프로그램은 클러스터를 여러 노드를 제어하기 위해 일반적으로 Yarn이라는 Cluster Resource Manager를 사용한다. Hadoop Cluster에서 앱을 구동하려면 Yarn을 통해 Data Node의 리소스를 할당받고 각 노드에서 Container라는 사용 가능한 리소스의 환경을 제공받는다. 이 위에서 Spark라는 분산 처리 가능한 프로세스가 실행된다. 그렇다. 이는 우리가 Spark에서 executor라고 부르는 것이며 이것은 하나의 프로세스다. 그럼 이 프로세스는 각 Data Node에 나눠져 있는 데 어떻게 task에 데이터를 나눠줄 수 있을까?
쉽게 생각해보자. 어떤 하나의 노드에서 데이터를 읽는다면 읽은 데이터 사이즈 만큼 메모리에 가지고 있다가 그 데이터를 파티션 개수만큼 나눠서 네트워크를 통해 다른 노드에 나눠줄 수 있지 않을 까? 가능하다. 하지만 비효율적이다. 왜냐하면 하나의 노드에서 큰 데이터를 읽는 데는 많은 시간이 걸리고, 또한 분할하는 시간과 네트워크를 통해 데이터를 전송하는 시간이 많이 걸리기 때문이다. 분명 읽어들이는 동안 다른 노드의 CPU들은 유휴 시간(idle time)을 갖게 될 것이다.
그럼 개선할 수 있는 방법은? 당연히 나눠서 일을 하는 것이다. 각 노드들이 동시 접근한 저장소를 만들고, 각 자 읽어야 할 데이터만 읽으면 따로 나눠줄 필요도 없을 것이다. 분산 파일 시스템에서의 Spark는 이 때 빛을 발한다. 아까 HDFS에 적재된 데이터가 블록 단위로 나눠져 여러 노드에 걸쳐 분산 저장된다고 했던 말 기억하는 가? 그럼 Spark의 executor가 각각 처리할 데이터를 자신의 Data Node에서 읽어 들이면 Disk I/O의 속도와 불필요한 Network I/O를 줄일 수 있지 않을까?
데이터 지역성
Spark는 분산 파일 시스템에서 기본적으로 가까운 노드에서 RDD로 데이터를 읽으려고 한다. 이는 변환 작업(transformation)의 최적화에 집중하기 위함이고 이를 위해 HDFS, S3 데이터와 같이 분산된 파티션의 데이터에 액세스 한다. 자신의 Data Node에 있는 소스 데이터의 블록을 모아 RDD 파티션을 구성하는 것이다. 물론 데이터를 읽자마자 Shuffle(rdd 파티션 간의 데이터 교환 작업)을 해야하는 경우라면, 이점이 많이 희석되겠지만 그럼에도 분산 파일 시스템에서의 여러 노드가 나눠서 Disk I/O를 발생시키기 때문에 하나의 노드에서 하는 것 보다 훨씬 나을 것이다.
분산 파일 시스템을 사용하지 않는 경우 (Local File System)
Spark를 Local Mode로 구동한다면 로컬 파일 시스템에서 데이터를 읽을 수 있겠지만, 다행히도 클러스터에서 실행되도록 구동한다면 특정 로컬 파일시스템에서 읽는 것은 불가능하게 만들어 놓았다. 만약 클러스터의 Spark 프로세스가 특정 로컬의 파일을 읽으려고 한다면 Java.io.FileNotFoundException 에러가 발생한다. 물론, 특정 노드에서 NFS(Network Files System)을 이용한다면 가능하겠지만 분산 파일 시스템에 비하면 당연히 성능이 느릴 수 밖에 없다.
최종 정리
짧게 정리한다고 했는 데, 많은 생각과 경험을 우려내고 싶어서 글이 읽어졌다. (그럼에도 배운 것을 다 못 쓴 것 같아서 아쉬움이 남는다..) 마지막으로 최종 요약하며, 이 글을 쓰는 데 참고 했던 책을 공유하고자 한다.
- 로드 중인 파일이 클러스터의 모든 작업자 노드에서 동일한 상대 경로로 사용 가능한지 확인 필요
- 스파크의 RDD 파일 기반 소스로 HDFS 또는 S3와 같은 분산 파일 시스템을 사용하는 것이 좋음
- 가능한 로컬 파일 시스템에서 분산 파일 시스템으로 업로드한 다음 분산된 객체의 RDD 를 사용
- 그럼에도 로컬 파일 시스템에서 사용하고 싶은 경우, NFS를 사용
Reference
『파이썬을 활용한 스파크 프로그래밍』 - 제프리 에이븐
'Data Engineering > PySpark' 카테고리의 다른 글
[Spark] Local Mode Executor에 대한 고찰 (0) | 2023.07.17 |
---|---|
[PySpark] Dataset (0) | 2023.06.24 |