๐ฅ
[Spark] RDD vs Dataframe ๋ณธ๋ฌธ
์คํํฌ์์ ๋ฐ์ดํฐ ๊ตฌ์กฐ ์ข ๋ฅ์๋ ์ธ ๊ฐ๊ฐ ์๋ค.
- RDD: spark 1.0 ๋์
- Dataframe: spark 1.3 ๋์
- Dataset: spark 1.6 ๋์
์ฐธ๊ณ ๋ก spark 2.0๋ถํฐ dataframe api๋ datasets๊ณผ ํตํฉ๋์๋ค. (์ค์นผ๋ผ์ ์๋ฐ์์)
` Unifying DataFrames and Datasets in Scala/Java: Starting in Spark 2.0, DataFrame is just a type alias for Dataset of Row. `
1. RDD๋?
Reillient Distributed Data๋ก ํ์ด์ฐ์ฌ์ง๋ RDD๋ ํ๋ณต๋ ฅ ์๋/๋ถ๋ณ์ ๋ถ์ฐ ๋ฐ์ดํฐ ์ ๋๋ก ํด์๋ ์ ์๋ค.
์คํํฌ์ ๊ฐ์ฅ ๊ธฐ๋ณธ์ ์ธ ๋ฐ์ดํฐ ๋จ์์ด๋ค.
๋ถ๋ณ์ ํน์ฑ
RDD๋ ์ด๋ฆ(Reillient)์์๋ ์ ์ ์๋ฏ์ด ๋ถ๋ณ(Read only)์ ํน์ง์ ๊ฐ์ง๋ค.
์๋ฅผ๋ค์ด ์ด๋ ํ RDD ์ฐ์ฐ์ด ์์ ๊ฐ์ DAG(=lineage)์ ์์๋ก ์ํ๋๋ค๊ณ ๊ฐ์ ํ๋ฉด,
RDD๋ read only ์ด๊ธฐ ๋๋ฌธ์ ํ ๋จ๊ณ์ ์ฐ์ฐ์ ๊ฑฐ์น ๋๋ง๋ค ๊ธฐ์กด source RDD๋ฅผ modifyํ๋ ๊ฒ์ด ์๋ ์๋ก์ด RDD๋ฅผ ์์ฑํ๋ค.
์ด์ ๊ฐ์ ํน์ฑ๋๋ถ์ ์ค๊ฐ ๋จ๊ณ์์ ๋ ธ๋ ๊ฐ ์ปค๋ฅ์ ๋ฑ์ ์ด์๋ก RDD๊ฐ ์์ค๋์๋๋ผ๋, DAG๋ฅผ ํตํด ๋ค์ ๋ณต๊ธฐํ๊ณ ๋ณต๊ตฌ๋ ์ ์๋ค.(=fault tolerant)
Lazy Evaluation (๋๊ธํ ์ฐ์ฐ)
RDD์ ์ฐ์ฐ์๋ ํฌ๊ฒ ๋ ๊ฐ์ง์ ์ข ๋ฅ๊ฐ ์๋๋ฐ, Transformation ๊ณผ Action์ด๋ค.
Transformation๊ณผ Action ์ฐ์ฐ์ ์ข ๋ฅ๋ ์๋์ ๋ฌธ์์ ์ ๋์ด๋์ด ์๋ค.
https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
Transformation
์๋ก์ด RDD๋ฅผ ๋ฆฌํดํ๋ค. ๋จ์ํ ์ด๋ ํ ์ฐ์ฐ์ธ์ง๋ฅผ ๊ธฐ๋กํ ๋ฟ์ด๊ณ , ์ค์ ๋ก๋ ์๋ฌด๋ฐ ์ฐ์ฐ๋ ์ํ๋์ง ์๋๋ค.
Action
์ค์ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ์ํํ๋ ์ฐ์ฐ์์ด๋ค. ๊ธฐ๋ก๋ ๋ชจ๋ Transformation ์์ ์ ์ํํ๋ค. ์ค์ ๋ฐ์ดํฐ๋ฅผ ๋ฆฌํดํด์ค๋ค.
Transformation ์ฐ์ฐ์ด ์๋ฌด๋ฆฌ ๋ง์ด ์์ด๋ ์ค์ Action ์ฐ์ฐ์๊ฐ ์๊ธฐ ์ ๊น์ง๋ ์๋ฌด๋ฐ ์ฐ์ฐ๋ ์ํ๋์ง ์๋๋ฐ, ์ด๋ฌํ ๋์ ๋ฐฉ์์ Lazy evaluation์ผ๋ก ๋ถ๋ฅด๊ณ , RDD ๋์์ ํต์ฌ์ด ๋๋ค.
์์ Spark Lineage๋ฅผ ์๋ก ๋ค๋ฉด Filter/Aggregate ์์๋ ์ค์ ์ด๋ ํ ์ฐ์ฐ๋ ์ํ๋์ง ์๊ณ ์ผ์ข ์ ๋ฉํ๋ฐ์ดํฐ๋ง ๊ธฐ๋ก๋๋ค๊ฐ, ๋ง์ง๋ง Output์ ๋ง๋ค๊ธฐ ์ํ Action ์ฐ์ฐ์๊ฐ ํธ์ถ๋ ๋ ์ค์ ์ฐ์ฐ์ด ์ํ๋๋ ๊ฒ์ด๋ค.
2. Dataframe & Datasets
Dataframe์ extended RDD๋ก, RDD์ ์์ ๊ฐ๋ ์ ์ํ๋ค.
RDD๋ non-structured data ํํ๋ฅผ ๋ค๋ฃจ์ง๋ง, dataframe์ ํ ์ด๋ธ๊ณผ ๊ฐ์ ํํ์ ์คํค๋ง๋ฅผ ๊ฐ์ง๊ณ ์๋ structured data ๊ตฌ์กฐ์ด๋ค
Dataset์ extension of Dataframe์ผ๋ก DataFrame = row ํ์ ์ Dataset ( Dataset[Row] ) ์ด๋ค.
python์ ์ฌ์ฉํด์ dataset์ ๊ฐ๋ ์ด ์ข ์ด๋ ค์ ๋๋ฐ, Dataframe์์ Dataset์ผ๋ก ๋ณํํ๋ ๋ฐฉ์์ ์ดํด๋ณด๋ ๋๋์ด ์๋ค.
case class Person(name: String, age: Int) // ๋ฐ์ดํฐ ํ์
์ ์
val Persondf = spark.read.... // ๋ฐ์ดํฐํ๋ ์ ๋ก๋
val PersonDS = df.as[Person] // Dataset์ผ๋ก ๋ณํ
Dataframe์ ์์ฑ์์๋ Person ๊ฐ์ฒด๋ฅผ Rowํ์ ์ ์ง๋ ฌํํ ๋ฐ์ด๋๋ฆฌ ๊ตฌ์กฐ๋ก ๋ณํํ๋ค.
Dataset ์์ฑ์์๋ ์ธ์ฝ๋(encoder)๊ฐ ๋ฐํ์ ์์ Person ๊ฐ์ฒด๋ฅผ ๋ฐ์ด๋๋ฆฌ ๊ตฌ์กฐ๋ก ์์ฑํ๋ค. DatasetAPI ์ฌ์ฉ ์์ ์คํํฌ๋ Dataset์ ์ ๊ทผํ ๋ Row ํฌ๋งท์ด ์๋ ์ฌ์ฉ์ ์ ์ ๋ฐ์ดํฐํ์ ์ ๋ฐํํ๋ค.
- ์ฅ์ : ์ฌ์ฉ์์๊ฒ ์ ์ฐ์ฑ ์ ๊ณต
- ๋จ์ : ์ฌ์ฉ์ ์ ์ ๋ฐ์ดํฐ ํ์ ์ผ๋ก์ ๋ณํ์์ ์ด ์๊ธฐ ๋๋ฌธ์ Dataframe ๋๋น ๋๋ฆฐ ์ฑ๋ฅ
Dataset์ JVM์ ์ด์ฉํ๋ ์ค์นผ๋ผ์ ์๋ฐ์์๋ง ์ฌ์ฉํ ์ ์๋ค.
์ ํ์์ ๋ณผ ์ ์๋ค์ํผ, Dataset์ JVM ํ์ ์ ๊ฐ์ฒด๋ก ๋ฐ์ดํฐ๊ฐ ํํ๋๊ธฐ ๋๋ฌธ์ type-safety๋ฅผ ์ ๊ณตํ๊ณ , ์ปดํ์ผ ์์ ์์ ํ์ ์ ๊ฒ์ฌํ ์ ์์ด analysis error๋ฅผ ๋ฐ๊ฒฌํ ์ ์๋ค.
3. ์ฌ์ฉ ์๊ธฐ
์ผ๋ฐ์ ์ผ๋ก ๊ฐ API ๋ณ๋ก ์ฅ๋จ์ ์ด ๋ช ํํด์ ์ฌ์ฉ๋์ด์ผ ํ๋ ์ผ์ด์ค๋ฅผ ์๋์ ๊ฐ์ด ์ ๋ฆฌํด๋ณผ ์ ์์ ๊ฒ ๊ฐ๋ค.
RDD
Low-Level API๋ฅผ ์ฌ์ฉํด์ผ ํ ๋
์์ฒ๋ฐ์ดํฐ๊ฐ unstructured ํํ์ผ ๋
์คํค๋ง๋ฅผ ์ ๊ฒฝ์ฐ๊ณ ์ถ์ง ์์ ๋
์ต์ ํ๋ฅผ ์ ๊ฒฝ์ฐ์ง ์์ ๋
๋ฐ์ดํฐ๋ฅผ ํจ์ํ ํ๋ก๊ทธ๋๋ฐ์ผ๋ก ์กฐ์ํ๊ณ ์ถ์ ๋ (udf ๋ฑ)
Dataframe
๋ฐ์ดํฐ ์ถ์ํ๊ฐ ํ์ํ ๋, ๋๋ฉ์ธ๋ณ API๋ฅผ ์ฌ์ฉํ๊ณ ์ถ์ ๋
๋จ์ํ๋ API๋ฅผ ์ฌ์ฉํ๊ณ ์ถ์ ๋, ๊ฐ๊ฒฐํ ์ฝ๋๋ฅผ ์์ฑํ๊ณ ์ถ์ ๋
Dataset
dataframe์ด ํ์ํ ์ํฉ์์ ์ถ๊ฐ๋ก ํ์ ์์ ์ฑ์ด ํ์ํ ๋
dataframe ๋๋น ๋๋ฆฐ ์ฑ๋ฅ์ ๊ฐ์ํ ์ ์์ ๋
[์ถ๊ฐ] Spark RDD/Dataframe API Category
์์์๋ ์ค๋ช ํ๋ค์ํผ, ์คํํฌ์ API๋ ํฌ๊ฒ transformations์ actions๋ก ๋๋๋ค.
1. Transformation
๋ฐ์ดํฐ๋ฅผ ๋ณํํ ๋ ์ฌ์ฉํ๋ API. action ์ ๊น์ง๋ ๋ชจ๋ transformation์ ์คํ๋์ง ์๋๋ค.
1-1. narrow dependency
๋ฐ์ดํฐ๊ฐ ์ฌ๋ฌ๊ฐ์ ํํฐ์ ์ผ๋ก ๋๋์ด์ ธ์ ๋ณ๋ ฌ์ ์ผ๋ก ์ํ๋ ์ ์๋ API.
๋ถ์ฐ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํ๊ธฐ ๋๋ฌธ์ ์๋์ ์ผ๋ก ๋ฎ์ ๋น์ฉ์ transformation์ด๋ค.
RDD์๋ `map()`, `mapPartition()`, `flatMap()`, `filter()`, `union()` ์ด ์๊ณ ,
Dataframe์๋ `select()`, `filter()`, `withColumn()`, `drop()`, `where()` ๋ฑ์ด ์๋ค.
1-2. wide dependency
๋ชจ๋ ๋ฐ์ดํฐ๋ฅผ ๋ชจ์์ ์ฒ๋ฆฌํด์ผ ํ๋ API.
ํ๊บผ๋ฒ์ ๋ชจ์์ ์ฒ๋ฆฌ๋๊ธฐ ๋๋ฌธ์ narrow ์ ๋นํด์๋ ๋์ ๋น์ฉ์ transformation์ด๋ค.
RDD์๋ `groupByKey()`, `aggregateByKey()`, `sortByKey()`, `reduceByKey()`, `aggregate()`, `join()`, `repartitions()` ๋ฑ์ด ์๊ณ ,
Dataframe์๋ `groupBy()`, `join()`, `agg()`, `cube()`, `rollup()`, `repartitions()` ๋ฑ์ด ์๋ค.
2. Action
์ค์ ๋ก job์ด ์์๋๋ ์ฐ์ฐ์ด๋ค. action ๋จ๊ณ์์ ์ค์ ๋ก ๋ชจ๋ ์ฐ์ฐ์ด ์ํ๋๊ธฐ ๋๋ฌธ์ staging์ด๋ผ๊ณ ๋ณผ ์ ์๋ค.
์คํํฌ๋ staging ๋จ์๋ก logical plan์ ์์ฑํ๊ธฐ ๋๋ฌธ์ action์ด ์ด๋ค ์๊ฐ์ ์ํ๋๋์ ๋ฐ๋ผ optimization ๋ฐฉ์์ด ๋ฌ๋ผ์ง ์ ์๋ค.
RDD์๋ `collect()`, `count()`, `first()`, `take()`, `reduce()`, `saveAsTextFile()` ๋ฑ์ด ์๊ณ ,
Dataframe์๋ `show()`, `head()`, `count()`, `collect()` ๋ฑ์ด ์๋ค.
'๋ฐ์ดํฐ > Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Spark] Speculative Execution (0) | 2024.04.01 |
---|---|
[Spark] Logical Plan ๊ณผ Physical Plan (0) | 2024.03.25 |
[Spark] spark-submit ๊ณผ ์ต์ (1) | 2024.03.24 |
[Spark] Adaptive Query Execution(AQE) (0) | 2024.03.23 |
[Spark] ์คํํฌ์ Executor Memory ๊ตฌ์กฐ (0) | 2024.03.23 |