๐Ÿฅ

[Spark] Repartition๊ณผ Coalesce ๋ณธ๋ฌธ

์นดํ…Œ๊ณ ๋ฆฌ ์—†์Œ

[Spark] Repartition and Coalesce

2024. 4. 1. 15:21

When you run computations in Spark, you can end up with far too many partitions, or too few. In those situations you can set the number of partitions with repartition and coalesce.

Both `repartition()` and `coalesce()` are functions that set the number of partitions.

Both adjust the partition count, but they differ in that `coalesce()` can only reduce the number of partitions.


1. Execution example

repartition๊ณผ coalesce์˜ ๊ฐ€์žฅ ํฐ ์ฐจ์ด์ ์€ shuffle ์ˆ˜ํ–‰ ์—ฌ๋ถ€์— ์žˆ๋‹ค.


When `coalesce` is used to reduce the number of partitions, you can see in the Spark UI that the coalesce runs in stage 2.

```python
# df1_path / df2_path / some_conditions are placeholders for the real reads and join keys
df1 = spark.read.parquet(df1_path)   # load df1
df2 = spark.read.parquet(df2_path)   # load df2
df3 = df1.join(df2, some_conditions, 'inner')
df3.coalesce(5).write.parquet('my_path')
```


๊ทธ๋Ÿฐ๋ฐ ์ž์„ธํžˆ ๋ณด๋ฉด default ํŒŒํ‹ฐ์…˜ ์ˆ˜๋Š” 200์ผํ…๋ฐ task๊ฐ€ 200์ด ์žˆ๋Š” stage๋Š” ๋ณด์ด์ง€ ์•Š๋Š”๋‹ค.

์ด๋ถ€๋ถ„์€ "5"๋ผ๋Š” ํŒŒํ‹ฐ์…˜ ๊ฐ’์ด ์ด์ „์˜ ๋ถ€๋ชจ rdd ์— overwrite ๋˜์—ˆ๊ณ  stage2์—์„œ 5๊ฐœ์˜ task๋กœ ์ˆ˜ํ–‰๋˜์—ˆ๋‹ค.


`repartition` ์‚ฌ์šฉ์˜ ๊ฒฝ์šฐ์—๋Š” `coalesce`์™€๋Š” ๋‹ฌ๋ฆฌ 4๊ฐœ์˜ stage๊ฐ€ ์ˆ˜ํ–‰๋˜์—ˆ๋‹ค.

```python
# same placeholder setup as above, writing with repartition instead of coalesce
df1 = spark.read.parquet(df1_path)   # load df1
df2 = spark.read.parquet(df2_path)   # load df2
df3 = df1.join(df2, some_conditions, 'inner')
df3.repartition(5).write.parquet('my_path')
```

stage 5์—์„œ Join์ด ํŒŒํ‹ฐ์…˜ ๊ฐœ์ˆ˜๋งŒํผ 200๊ฐœ์˜ ๋ณ‘๋ ฌ task๋กœ ์‹คํ–‰๋˜์—ˆ๊ณ , ์ด๋ถ€๋ถ„์—์„œ shuffle์ด ๋ฐœ์ƒํ•œ ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ๋‹ค.


2. How they work

repartition์˜ ๊ฒฝ์šฐ์—๋Š” full shuffle์ด ์ผ์–ด๋‚˜๋ฉด์„œ ํŒŒํ‹ฐ์…˜ ์ˆ˜๊ฐ€ ์กฐ์ •์ด ๋˜๊ณ ,

coalesce๋Š” ์…”ํ”Œ์„ ์‚ฌ์šฉํ•˜์ง€ ์•Š๊ณ  ๋‹จ์ˆœ์ด ํŒŒํ‹ฐ์…˜์„ combineํ•˜๋Š” ๋ฐฉ์‹์œผ๋กœ ํŒŒํ‹ฐ์…˜ ์กฐ์ •์ด ์ˆ˜ํ–‰๋˜๋Š” ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ๋‹ค.

Source: https://dongza.tistory.com/18
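The same behavior is easy to observe at the RDD level (a minimal local sketch, assuming a `SparkSession` named `spark`): `coalesce` merges existing partitions where they already sit, while `repartition` reshuffles every record.

```python
rdd = spark.sparkContext.parallelize(range(12), 6)  # 6 input partitions of 2 elements

# coalesce(3): existing partitions are merged, no shuffle is triggered
print(rdd.coalesce(3).glom().collect())
# e.g. [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]

# repartition(3): a full shuffle redistributes all elements across 3 partitions
print(rdd.repartition(3).glom().collect())
```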


repartition์€ execution time์€ ์ฆ๊ฐ€ํ•˜์ง€๋งŒ shuffle ๋•๋ถ„์— ํŒŒํ‹ฐ์…˜๋งˆ๋‹ค ๊ณ ๋ฅธ ๋ฐ์ดํ„ฐ ๋ถ„ํฌ๋ฅผ ๊ฐ€์งˆ ์ˆ˜ ์žˆ๋‹ค.

๋ฐ˜๋Œ€๋กœ coalesce๋Š” ๋ถˆ๊ท ํ˜•ํ•œ ๋ถ„ํฌ๋ฅผ ๊ฐ€์ง„ ํŒŒํ‹ฐ์…˜์œผ๋กœ ์žฌ์กฐ์ •๋  ์ˆ˜ ์žˆ๋‹ค.
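A quick way to compare the resulting distributions (a sketch; the range and the filter are made up for illustration) is to count rows per partition with `spark_partition_id()`:

```python
from pyspark.sql.functions import spark_partition_id

def rows_per_partition(df):
    """Count how many rows landed in each partition."""
    return (df.withColumn("pid", spark_partition_id())
              .groupBy("pid").count()
              .orderBy("pid"))

# a filter that empties most of the 20 input partitions, leaving skewed sizes
filtered = spark.range(0, 100_000, numPartitions=20).where("id < 30000")

rows_per_partition(filtered.coalesce(4)).show()     # merged partitions, uneven counts
rows_per_partition(filtered.repartition(4)).show()  # shuffled, roughly even counts
```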


For reference, if you look at the repartition function itself, you can see that it simply calls coalesce with shuffle set to `true`.

(Reference: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L479)

```scala
  /**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }
```
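The same relationship shows up in PySpark's RDD API (a small sketch): passing `shuffle=True` to `coalesce` gives the repartition behavior, including the ability to increase the partition count.

```python
rdd = spark.sparkContext.parallelize(range(1000), 4)

# without a shuffle, coalesce cannot go above the current partition count
print(rdd.coalesce(8).getNumPartitions())                # 4
# with shuffle=True it behaves like repartition and can also increase it
print(rdd.coalesce(8, shuffle=True).getNumPartitions())  # 8
print(rdd.repartition(8).getNumPartitions())             # 8
```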


https://www.youtube.com/watch?v=ijD5zuEV8U8

https://blog.51cto.com/u_14009243/5975125