๐ฅ
[Spark] Repartition๊ณผ Coalesce ๋ณธ๋ฌธ
spark๋ฅผ ํตํด ์ฐ์ฐ์ ํ๋ค ๋ณด๋ฉด ํํฐ์ ์ด ๊ณผ๋คํ๊ฒ ๋ง๊ฑฐ๋ ๋๋ฌด ์ ์ ์ํฉ์ด ์์ ์ ์๋ค. ์ด๋ด ๋์๋ repartition๊ณผ coalesce๋ฅผ ํตํด ํํฐ์ ๊ฐ์๋ฅผ ์ง์ ํด์ค ์ ์๋ค.
`repartition()`๊ณผ `coalesce()` ๋ชจ๋ ํํฐ์ ๊ฐ์๋ฅผ ์ค์ ํด์ฃผ๋ ํจ์์ด๋ค.
ํํฐ์ ์ ๊ฐ์๋ฅผ ์กฐ์ ํด์ฃผ์ง๋ง `coalesce()`๋ ํํฐ์ ์ ๊ฐ์๋ฅผ ์ค์ด๋ ๊ฒ๋ง ๊ฐ๋ฅํ๋ค๋ ์ ์์ ์ฐจ์ด์ ์ด ์๋ค.
1. ์คํ ์์
repartition๊ณผ coalesce์ ๊ฐ์ฅ ํฐ ์ฐจ์ด์ ์ shuffle ์ํ ์ฌ๋ถ์ ์๋ค.
`coalesce`๋ฅผ ์ฌ์ฉํด์ ํํฐ์ ์๋ฅผ ์ค์ธ ๊ฒฝ์ฐ, stage2์์ coalesce ๊ฐ ์ํ๋๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
load df1
load df2
df3 = df1.join(df2, some_conditions, 'inner')
df3.coalesce(5).write.parquet('my_path')
๊ทธ๋ฐ๋ฐ ์์ธํ ๋ณด๋ฉด default ํํฐ์ ์๋ 200์ผํ ๋ฐ task๊ฐ 200์ด ์๋ stage๋ ๋ณด์ด์ง ์๋๋ค.
์ด๋ถ๋ถ์ "5"๋ผ๋ ํํฐ์ ๊ฐ์ด ์ด์ ์ ๋ถ๋ชจ rdd ์ overwrite ๋์๊ณ stage2์์ 5๊ฐ์ task๋ก ์ํ๋์๋ค.
`repartition` ์ฌ์ฉ์ ๊ฒฝ์ฐ์๋ `coalesce`์๋ ๋ฌ๋ฆฌ 4๊ฐ์ stage๊ฐ ์ํ๋์๋ค.
load df1
load df2
df3 = df1.join(df2, some_conditions, 'inner')
df3.repartition(5).write.parquet('my_path')
stage 5์์ Join์ด ํํฐ์ ๊ฐ์๋งํผ 200๊ฐ์ ๋ณ๋ ฌ task๋ก ์คํ๋์๊ณ , ์ด๋ถ๋ถ์์ shuffle์ด ๋ฐ์ํ ๊ฒ์ ์ ์ ์๋ค.
2. ์ํ๋ฐฉ๋ฒ
repartition์ ๊ฒฝ์ฐ์๋ full shuffle์ด ์ผ์ด๋๋ฉด์ ํํฐ์ ์๊ฐ ์กฐ์ ์ด ๋๊ณ ,
coalesce๋ ์ ํ์ ์ฌ์ฉํ์ง ์๊ณ ๋จ์์ด ํํฐ์ ์ combineํ๋ ๋ฐฉ์์ผ๋ก ํํฐ์ ์กฐ์ ์ด ์ํ๋๋ ๊ฒ์ ์ ์ ์๋ค.
repartition์ execution time์ ์ฆ๊ฐํ์ง๋ง shuffle ๋๋ถ์ ํํฐ์ ๋ง๋ค ๊ณ ๋ฅธ ๋ฐ์ดํฐ ๋ถํฌ๋ฅผ ๊ฐ์ง ์ ์๋ค.
๋ฐ๋๋ก coalesce๋ ๋ถ๊ท ํํ ๋ถํฌ๋ฅผ ๊ฐ์ง ํํฐ์ ์ผ๋ก ์ฌ์กฐ์ ๋ ์ ์๋ค.
์ฐธ๊ณ ๋ก repartition ํจ์๋ฅผ ์ดํด๋ณด๋ฉด ๊ทธ๋ฅ shuffle ๊ฐ์ True๋ก ๋๊ณ coalesce๋ฅผ ํธ์ถํ๋ ๊ฒ์ ๋ณผ ์ ์๋ค.
(์ฐธ๊ณ : https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L479)
/**
* Return a new RDD that has exactly numPartitions partitions.
*
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses
* a shuffle to redistribute data.
*
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}