용로그
article thumbnail
Published 2024. 3. 16. 21:24
[Spark] Apache Spark RDD Apache/Spark

들어가며


이번 글에서는 Spark의 데이터 구조인 RDD를 알아볼 것이다. 최근에는 DataFrame과 DataSet을 주로 사용하지만 이는 다음글에서 다루겠다. RDD를 살펴보기에 앞서 하둡과 스파크의 관계를 간단하게 짚고 넘어가겠다.

HDFS와 Map Reduce

하둡은 HDFS(Hadoop Distributed File System)라고 불리는 분산 파일 시스템을 기반으로 동작한다. 이를 통해 굉장히 방대한 양의 데이터를 저장/관리할 수 있게 되었고, MapReduce라고 불리는 데이터셋 병렬 처리 방식으로 데이터를 처리했다.

 

하지만 HDFS도 치명적인 문제점이 있었는데, 바로 HDFS가 Distk I/O를 기반으로 동작하는 것이었다. 또한 실시간(Realtime)에 대한 니즈가 크게 증가하기 시작했는데, MapReduce는 스트리밍 처리가 불가능했다. 기본적으로 결과를 얻기 위해서는 수십초에서 몇 분까지 걸리는 것은 당연한 것이었다.

 

Apache Spark란?

이런 문제를 해결하기 위해 등장한 것이 Apache Spark(이하 스파크)다. 아파치 스파크는 HDFS와 달리 In-Memory로 동작하기 때문에 기존 하둡보다 데이터 처리 속도가 훨씬 빠르다. 스파크는 이를 통해 실시간 스트리밍 처리라는 니즈를 충족시켜주었고, 빅데이터 프레임워크 시장에서 빠른 속도로 영향력을 키워나갔다.

 

최근에는 Hadoop과 Spark를 함께 사용하여 효율을 극대화하기 시작했다. 하둡의 YARN 영역에 스파크를 올리고, 빠른 데이터 조회가 필요한 부분은 스파크로 처리하는 방식을 사용한다.

 

스파크는 스스로를 대규모 데이터 처리를 위한 통합 분석 엔진이라고 칭하고 있다. Java, Scala, Python, R등의 언어를 지원한다. 또한 SQL 및 구조화된 데이터 처리를 위한 Spark SQL, Pandas 워크로드를 위한 Spark의 Pandas API, 머신러닝을 위한 MLlib, 그래프 처리를 위한 GraphX, 증분 계산 및 스트리밍 처리를 위한 Structured Streaming을 지원한다. 특히 MLlib은 최근 들어 각광받고 있다. 한 마디로 정리하면 "빅데이터 분산 처리 엔진"으로 표현할 수 있다.

 

Spark의 데이터 처리 모델


스파크는 데이터 처리를 할 때 RDD라는 자료구조를 이용한다. 스파크의 프로그래밍 모델은 RDD를 가공해 새로운 RDD를 만들고, 이런 처리를 반복하여 원하는 결과를 얻는 방식이다. 

Spark RDD(Resillient Distributed Data)

RDD Lineage

RDD는 위의 그림과 같은 Lineage(혈통)을 가진다. Resillient의 또 다른 뜻에는 불변이라는 뜻도 존재한다. 그렇기 때문에 원하는 동작을 하기 위해서는 기존의 RDD를 변경하는 것이 아니라, 새로운 RDD를 생성할 수 밖에 없다. (위 그림에서는 Hadoop RDD를 Filter RDD로 변경하는 것이 아닌 Filter RDD를 추가하는 방식이다.)

 

그래서 위 처럼 Spark 내의 데이터 처리에 있어 수많은 RDD들이 생성된 것이다. 특정 동작에 의해 생성되는 RDD Lineage의 그래프를 그려놓고 보면 DAG(Directed Acyclic Graph)의 형태를 가진다. Lineage는 모든 RDD 생성 과정을 저장하여 관리하기 때문에 데이터가 유실되면 Lineage 기록에 따라 유실되었던 RDD를 다시 실행한다.

 

RDD의 명령어는 크게 Transformation과 Action으로 나눌 수 있는데, Filter, Read 함수 등이 Transformation에 속하고 first, show, head 등의 명령어가 Action에 속한다. Transformation 명령은 기존의 RDD에서 새로운 RDD를 만들어 낸다. 어떤 연산을 할 때 데이터 셋이 읽기 전용이므로 데이터에 어떤 변형을 가할 때 계속 새로운 RDD를 만들어 낸다.

 

스파크가 클러스터 환경에서 RDD를 처리하는 방법

스파크는 여러 머신으로 구성된 클러스터(Cluster, 여러 대의 머신이 하나의 머신처럼 동작하는 환경) 환경에서의 동작을 전제로 설계되었고, 내부는 파티션이라는 단위로 나뉜다. RDD를 파티션 단위로 여러 머신에서 처리하므로 한 대의 머신으로 처리할 수 있는 것 보다 더 큰 데이터를 다룰 수 있다.

 

스파크를 포함해 클러스터 환경에서 처리를 수행하는 일반적인 시스템은 클러스터 내의 계산 리소스를 관리하는 기능이 필요하다. 스파크는 다음 3가지 종류의 클러스터 관리 시스템을 지원한다.

  • YARN
  • Mesos
  • Spark Standalone

클러스터 관리 시스템 아래에서 각 머신은 마스터 노드(Master Node)워커 노드(Worker Node)로 동작한다. 마스터 노드와 복수의 워커 노드로 구성된 클러스터 환경에서는 다음과 같은 과정으로 분산처리가 이루어진다.

 

첫 번째, 애플리케이션 배포와 계산 리소스 요구

스파크로 분산처리할 때는 RDD 생성과 일련의 변환으로 구성된 스파크 애플리케이션을 클라이언트가 클러스터에 배포한다. 클라이언트는 애플리케이션을 배포함과 동시에 애플리케이션 실행에 필요한 executor의 스펙을 설정해야 한다. executor란 워커 노드에서 구동하여 스파크 애플리케이션을 분산처리하는 프로세스를 말한다.

 

두 번째, 클러스터 내 워커 노드 리소스 확보

클러스터에 애플리케이션이 배포되면, 마스터 노드는 각 워커 노드의 이용 가능한 리소스 양과 클라이언트가 요청하는 executor 스펙을 고려하여 하나 이상의 워커 노드에 executor의 구동을 요구한다.

 

세 번째, 드라이버 프로그램 구동

클러스터 내에 리소스가 확보된과 동시에 드라이버 프로그램이 구동된다. 드라이버 프로그램이란 사용자에 의해 정의되는 스파크 애플리케이션의 entry point(함수 실행 시작점)가 되는 프로그램이다. 사용자가 기술한 RDD 생성/변환 로직을 이용해 애플리케이션을 제어하는 역할을 담당한다. 아래는 드라이버 프로그램 예시이다.

 

// HDFS등의 파일 시스템에 저장된 거대한 파일을 가지고 RDD 생성
val textRDD = sc.textFile("/path")

// RDD의 각 요소를 Transform
val mappedRDD = textRDD.map(text -> someFunction(text))
val filteredRDD = textRDD.filter(processedText -> filterFunction(processedText))

// RDD에 액션 적용 후 결과 저장
processedRDD.saveAsTextFile("/path/result")

 

네 번째, RDD 생성부터 액션 적용까지

스파크 애플리케이션에서는 RDD 생성부터 액션 적용까지 잡(job)이라는 단위로 처리한다. 잡은 또 한번 태스크(task)라는 단위로 분할되며, 태스크는 파티션 단위로 데이터를 로드하고 변환/액션을 적용하는 처리 단위이다.

 

모든 것을 종합해서 간단하게 코드와 그림으로 나타내면 아래와 같다.

from pyspark import SparkContext

# SparkContext 생성
sc = SparkContext("local", "Shuffle RDD Example")

# 데이터 생성
data = [(1, 'a'), (2, 'b'), (3, 'c')]

# RDD 생성
rdd = sc.parallelize(data)

 

클러스터 환경에서 하나의 RDD를 여러 개의 파티션으로 처리

Lazy evaluation과 영속화

Lazy evaluation

스파크는 바로바로 RDD를 새로 정의하고 처리하지는 않는다. 스파크는 RDD를 lazy evaluation(여유로운 수행)으로 처음 액션을 사용하는 시점에 처리하게 되는데, 이런 방식은 Hibernate의 Lazy Loading의 개념과 비슷하다.

 

예를 들어, 1만 줄의 텍스트 파일을 읽는 코드(RDD)가 있고, 그 다음에는 '스파크'로 시작하는 문장까지 fetch하도록 필터링을 수행하는 코드가 있다. 만약 스파크가 RDD를 즉시 실행하면 1만 줄의 텍스트를 모두 읽을 것이다. 하지만, Lazy evaluation을 사용한다면 2번째 RDD인 filteredRDD까지 파악하여 필요한 텍스트까지만 읽을 것이다.

 

이렇게 스파크는 RDD를 즉시 실행하지는 않지만 이러한 명령이 있었다는 메타데이터만 기록해놓는다. RDD에 데이터를 로드하는 것도 Transformation 작업과 마찬가지로 Lazy evaluaction이 적용된다.

 

RDD 영속화

스파크의 RDD들은 기본적으로 액션이 실행될 때마다 매번 새롭게 연산을 수행한다. 만약 여러 액션에서 새롭게 수행하는 것이 아닌, RDD 하나를 재사용하고 싶으면 스파크에게 RDD.persist()를 사용하여 계속 결과를 유지하도록 요청할 수 있다.

 

첫 연산이 이루어진 후 스파크는 RDD의 내용을 메모리에 저장하게 되며, 이후의 액션들에서 재사용한다. 메모리 대신 디스크에 RDD를 저장하는 것도 가능하다. RDD를 재사용하지 않기로 한다면 스파크가 일회성 데이터를 가져와 계산하고 굳이 저장하지 않아도 된다.

 

executer는 태스크 처리를 끝낼 때, 처리 과정에서 생성된 파티션 인스턴스를 영속화하는 경우가 있다. 스파크에서는 RDD가 다음 조건 중 어느 하나라도 만족하면 영속화된다.

  • 셔플이 발생하는 변환을 실행하기 직전의 RDD
    • 셔플을 실행할 때, 변환하기 전의 내용을 로컬 디스크에 저장함으로써 파티션을 영속화
  • 사용자에 의해서 명시적으로 영속화가 선언된 RDD
    • 사용자가 드라이버 프로그램에서 선언하면, executor는 해당 RDD에 포함된 파티션을 영속화
from pyspark import SparkContext

# SparkContext 생성
sc = SparkContext("local", "Shuffle RDD Example")

# 데이터 생성
data1 = [(1, 'a'), (2, 'b'), (3, 'c')]
data2 = [(1, 'x'), (2, 'y'), (3, 'z')]

# RDD 생성
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)

# RDD 직접 영속화
# rdd1.persist()

# shuffle하여 두 RDD를 결합(rdd1, rdd2 자동 persist)
shuffled_rdd = rdd1.join(rdd2)

# SparkContext 종료
sc.stop()

 

위 코드는 rdd1.persist 부분에서 직접 영속화를 시도할 수도 있고, join 함수를 사용하여 rdd 데이터를 섞기 전에 rdd1, rdd2를 영속화 시키는 코드이다.

 

RDD를 이용한 Fault-Tolerance

데이터를 실시간으로 처리하는 도중 해당 데이터를 처리하던 서버에 장애가 생기면 어떻게 될까? 당연하게도 해당 서버의 스파크 프로세서는 중단되고, 해당 스파크가 처리하던 데이터도 유실될 것이다. 스파크는 자체적으로 Fault-tolerant(작업 중 장애가 발생하였을 때 예비 부품이나 절차가 즉시 그 역할을 대체 수행함으로써 서비스의 중단이 없게 하는 특성)를 보장하는 강력한 기능을 가지고 있다.

 

스파크에서는 파티션의 executor에 장애가 발생하는 등의 이유로 태스크 실행에 실패하면 다른 executor로 재실행된다. 이 경우에도 태스크 내용을 바탕으로 파티션을 순서대로 인스턴스화해 처리한다. 스파크는 이것을 recompute라고 한다. RDD는 불변(immutable)한 특징을 가지므로, recompute 해도 동일한 결과를 얻을 수 있다.

 

Immutable

한 번 생성된 인스턴스는 임의로 값을 바꿀 수 없기 때문에 읽기만 할 수 있다.(Read-Only) 하나의 파티션에서 장애가 발생했을 때 해당 처리에서 사용된 RDD를 다 버리고, 다시 처음부터 동일한 처리를 해도 원래 기대했던 값을 얻을 수 있다. RDD의 Fault-Tolerant를 실현하는 중요한 특성이다.

 

마무리하며


하둡과 HDFS, MapReduce와 Spark RDD까지 꽤 많은 내용들을 다루었는데, 짧게 정리해보겠다.

  • 빠른 처리 속도
    • Disk I/O로 연산을 수행하던 MapReduce와 달리 Spark RDD 데이터 모델은 In-Memory로 처리하기 때문에 Map Reduce에 비해 굉장히 빠른 속도로 연산을 수행할 수 있다.
  • Fault-Tolerance
    • 데이터를 처리하는 executor에서 문제가 생겨 데이터 유실이 발생해도 Read-Only 특성의 RDD가 기록되어 있기 때문에 다른 노드에서 재실행만 하면 똑같은 결괏값을 얻을 수 있다.

사실 RDD는 스파크의 데이터 타입 중 하나일 뿐이며, 요즘엔 좀 더 효율적인 처리/분석을 위해 DataFrame/DataSet이라는 데이터 타입을 사용한다. 다음 글에서는 DataFrame과 DataSet에 관하여 알아보겠다.

'Apache > Spark' 카테고리의 다른 글

[Spark] Apach Spark DataFrame  (0) 2024.03.18
profile

용로그

@용로그

벨덩보단 용덩 github.com/wonyongChoi05