용로그
article thumbnail
Published 2024. 3. 18. 03:50
[Spark] Apach Spark DataFrame Apache/Spark

들어가며


지난 글에서 Spark RDD에 대해서 알아보았다. 이번 글에서는 DataFrame에 대해서 알아보겠다. 

 

RDD의 문제점


DataFrame이 등장한 이유는 기존 RDD에 문제점이 있었기 때문일 것이다. 그렇다면 왜 RDD가 있었음에도 불구하고 DataFrame이라는 새로운 데이터 구조가 생긴 것일까?

 

여러가지 이유가 있었는데, 주된 내용은 성능 이슈였다.

  • 메모리 사용 및 디스크 I/O
    • RDD는 디스크에서 데이터를 읽고 쓸 수 있으며, RDD의 메모리 사용은 비효율적이다. 또한 메모리나 디스크에 용량이 충분하지 않으면 제대로 동작하지 않는다.
  • 데이터 처리의 효율성이 떨어짐
    • 스키마 개념이 없어, 구조화된 데이터와 비구조화 데이터를 함께 저장하여 효율성이 떨어진다.
  • 객체 직렬화 오버헤드
    • RDD는 자바의 직렬화와, GC(Garbage Collection)를 사용한다. 이는 메모리의 오버헤드를 증가시킨다.
  • 옵티마이저의 부재
    • RDD는 자체 옵티마이저가 존재하지 않는다. 따라서 개발자가 직접 RDD를 최적화해야 한다.

DataFrame의 등장


이러한 이유들로 스파크는 RDD의 한계를 극복하기 위해 DataFrame이라는 개념을 도입했다. Python Pandas 또는 R의 Dataframe과 유사한 개념이다. DataFrame은 행과 열로 구성된 데이터 분산 컬렉션이다. 쉽게 생각하면 행과 열로 이루어진 RDB의 테이블 구조와 비슷하다고 할 수 있다.

 

DataFrame 구조

 

DataFrame을 사용한 처리는 데이터 처리에 최적화된 기법을 사용하기 때문에 RDD를 사용한 것 보다 훨씬 빠른 처리 속도를 보여주며, 사용자 입장에서 효율성이 증대되었다.

  • 구조화된 데이터 구조
    • DataFrame은 구조화된 데이터를 다루기 쉽게 하기 위해 만들어졌다. RDB와 비슷하게 SparkSQL 등을 통해 데이터를 쿼리로 처리할 수 있게 되었다.
  • GC 오버헤드 감소
    • RDD는 데이터를 메모리에 저장하지만, DataFrame은 데이터를 오프-힙(GC의 영향을 받지 않는 영역) 영역에 저장한다.
  • 직렬화 오버헤드 감소
    • 직렬화 또한 오프-힙 메모리를 사용하여 오버헤드를 크게 감소시켰다.
  • 유연성 - 다양한 형태의 데이터 지원
    • 다양한 형식의 데이터를 처리할 수 있다. 구조화된 데이터 파일CSV, JSON, Parquet..)뿐만 아니라 DB 테이블, 외부 API Response 등 다양한 형태의 데이터를 지원한다.

기존 RDD에서 지원하던 Immutable, Lazy Evaluation, Distributed Processing, Parallelism, 다양한 언어 지원과 같은 특징들은 모두 DataFrame에서도 동일하게 제공된다.

Spark DataFrame example

DataFrame을 생성하고 간단한 작업을 실행하는 코드이다. 여러가지 메서드 및 SparkSQL을 간단하게 다루겠다.

 

Spark Session 생성

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("demo").getOrCreate()

 

일부 스파크 런타임 환경에는 사전에 인스턴스화된 스파크 세션이 함께 제공된다. getOrCreate() 메서드는 기존 스파크 세션을 사용하거나 없는 경우 새로운 스파크 세션을 생성한다.

 

Spark DataFrame 생성

df = spark.createDataFrame(
    [
        ("sue", 32),
        ("li", 3),
        ("bob", 75),
        ("heo", 13),
    ],
    ["first_name", "age"],
)

df.show()

# 결과
+----------+---+
|first_name|age|
+----------+---+
|       sue| 32|
|        li|  3|
|       bob| 75|
|       heo| 13|
+----------+---+

 

createDataFrame() 메서드로 DataFrame을 만들 수 있으며 결과는 행과 열로 이루어진 형태로 나온다. 이해하기 어렵지 않을 것이다. 이제 몇 가지 데이터 처리 작업을 수행해보겠다.

 

DataFrame에 열 추가

life_stage 연령이 12세 이하이면 "child", 13세에서 19세 사이면 "teenager", 20세 이상이면 "adult"를 반환하는 로직이다.

 

from pyspark.sql.functions import col, when

df1 = df.withColumn(
    "life_stage",
    when(col("age") < 13, "child")
    .when(col("age").between(13, 19), "teenager")
    .otherwise("adult"),
)

# 새롭게 추가된 DataFrame
df1.show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|        li|  3|     child|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+

# 원본 데이터
df.show()
+----------+---+
|first_name|age|
+----------+---+
|       sue| 32|
|        li|  3|
|       bob| 75|
|       heo| 13|
+----------+---+

 

스파크는 위의 결과와 같이 기존의 DataFrame 형태를 변경하지 않는다. 새로운 작업을 위해서는 새로운 결과를 새로운 변수에 할당해야 한다.

 

DataFrame 필터링

df1.where(col("life_stage").isin(["teenager", "adult"])).show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+

 

"teenager"와 "adult"만 조회하는 로직이다.

 

DataFrame 그룹화 및 집계

df1.select(avg("age")).show()

+--------+
|avg(age)|
+--------+
|   30.75|
+--------+

df1.groupBy("life_stage").avg().show()

+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|     child|     3.0|
|  teenager|    13.0|
+----------+--------+

 

DataFrame Query - SparkSQL

spark.sql("select life_stage, avg(age) from {df1} group by life_stage", df1=df1).show()

+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|     child|     3.0|
|  teenager|    13.0|
+----------+--------+

 

 

문법을 보면 ANSI SQL과 매우 유사하게 동작한다. 조회 뿐만 아니라 데이터 추가도 가능하다.

 

spark.sql("INSERT INTO some_people VALUES ('frank', 4, 'child')")
spark.sql("select * from some_people").show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       heo| 13|  teenager|
|       sue| 32|     adult|
|       bob| 75|     adult|
|        li|  3|     child|
|     frank|  4|     child|
+----------+---+----------+

 

RDD를 사용하는 예시

DataFrame은 구조화된 데이터에 대해 더 쉽고 효율적으로 동작하기 때문에 구조화 되지 않은 데이터를 다룰 때는 기존의 RDD가 적합할 수 있다. "some_text.txt" 파일이 있다고 가정하자.

 

text_file = spark.sparkContext.textFile("some_words.txt")

counts = (
    text_file.flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)

 

텍스트 파일에 있는 각 단어의 개수를 RDD를 사용하여 계산하는 로직은 다음과 같다.

 

text_file = spark.sparkContext.textFile("some_words.txt")

counts = (
    text_file.flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)

counts.collect()

# 결과
[('these', 2),
 ('are', 2),
 ('more', 1),
 ('in', 1),
 ('words', 3),
 ('english', 1)]

 

마무리하며


이번 글에서는 스파크에서 DataFrame의 등장 배경과 사용 법을 간단하게 살펴보았다. 기존 스파크와 RDD의 장점들을 유지하며, 단점을 보완한 형태이기 때문에 매우 매력적일 수 밖에 없다.

'Apache > Spark' 카테고리의 다른 글

[Spark] Apache Spark RDD  (3) 2024.03.16
profile

용로그

@용로그

벨덩보단 용덩 github.com/wonyongChoi05