๐Ÿฅ

[Spark] Speculative Execution ๋ณธ๋ฌธ

๋ฐ์ดํ„ฐ/Spark

[Spark] Speculative Execution

•8• 2024. 4. 1. 13:27

In a distributed environment, task execution often slows down for reasons such as uneven resources or uneven data load.

Speculative Execution

Tasks within a single stage can have different execution times. Some tasks may run markedly slower than the others, for reasons such as differences in node configuration or performance within the cluster, or network fluctuations.

When a task in a stage stays unfinished for a long time, speculative execution launches a copy of the same task on another node. Whichever attempt finishes first is marked as successful, and the remaining attempts are killed.

Enabling speculative execution is a good idea when node performance varies across the cluster.
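The "first attempt to finish wins" rule can be illustrated with a small simulation. This is only a sketch in plain Python, not Spark code: the `Attempt` class, the node names, and the finish times are all hypothetical and exist only for illustration.

```python
from dataclasses import dataclass


@dataclass
class Attempt:
    node: str              # hypothetical node name
    finish_time_s: float   # simulated time at which this attempt would finish
    speculative: bool = False


def resolve_attempts(attempts):
    """Return (winner, killed): the earliest finisher wins, the rest are killed."""
    winner = min(attempts, key=lambda a: a.finish_time_s)
    killed = [a for a in attempts if a is not winner]
    return winner, killed


# The original attempt is stuck on a slow node; the copy runs on another node.
original = Attempt(node="node-a", finish_time_s=120.0)
copy = Attempt(node="node-b", finish_time_s=45.0, speculative=True)

winner, killed = resolve_attempts([original, copy])
print(winner.node, [a.node for a in killed])
```

Here the speculative copy on `node-b` finishes first, so it is treated as the successful attempt and the original on `node-a` is the one that gets killed.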

(Figure: speculative execution)

 

Main configuration options

  • spark.speculation (default = false)
    When set to true, speculative execution is performed.
    Since: 0.6.0
  • spark.speculation.interval (default = 100ms)
    How often Spark checks for tasks to speculate.
    Since: 0.6.0
  • spark.speculation.multiplier (default = 1.5)
    How many times slower than the median task duration a task must be before it is considered for speculative execution.
    Since: 0.6.0
  • spark.speculation.quantile (default = 0.75)
    Fraction of tasks that must be complete before speculation is enabled for a stage.
    Since: 0.6.0
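As a rough sketch of how these four options might be passed to a job, the snippet below collects them in a dictionary and renders them as `--conf key=value` arguments for `spark-submit`. The helper `to_submit_args` and the script name `my_job.py` are hypothetical; the values are simply the defaults listed above with `spark.speculation` switched on.

```python
# Defaults from the list above, with speculation turned on.
speculation_conf = {
    "spark.speculation": "true",
    "spark.speculation.interval": "100ms",
    "spark.speculation.multiplier": "1.5",
    "spark.speculation.quantile": "0.75",
}


def to_submit_args(conf):
    """Render a config dict as a flat list of spark-submit --conf arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# my_job.py is a placeholder for the actual application script.
command = ["spark-submit", *to_submit_args(speculation_conf), "my_job.py"]
print(" ".join(command))
```

Keeping the settings in one dictionary makes it easy to drop speculation again once the root cause of a slowdown has been fixed.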

 

spark.speculation์ด ํ™œ์„ฑํ™” ๋œ ํ›„ ๊ฐ ์ฃผ์š” ๊ตฌ์„ฑ ์˜ต์…˜์˜ ๊ธฐ๋ณธ๊ฐ’์„ ํ•ด์„ํ•ด๋ณด๋ฉด:

  • ๋งค 100ms ๋งˆ๋‹ค (spark.speculation.interval) spark๋Š” ๋Š๋ฆฌ๊ฒŒ ์‹คํ–‰๋˜๋Š” ์ž‘์—…์„ ํ™•์ธํ•œ๋‹ค.
  • ์™„๋ฃŒ๋œ ์ž‘์—…์˜ ์‹คํ–‰์‹œ๊ฐ„์˜ ์ค‘์•™๊ฐ’ * 1.5 < target task ์‹คํ–‰์‹œ๊ฐ„ ์ผ ๋•Œ ๋Š๋ฆฌ๊ฒŒ ์‹คํ–‰๋˜๋Š” ์ž‘์—…์œผ๋กœ ๊ฐ„์ฃผํ•œ๋‹ค. (spark.speculation.multiplier)
  • ์ŠคํŒŒํฌ๋Š” ์ „์ฒด ์˜ˆ์•ฝ๋œ ์ž‘์—…์˜ 75%๊ฐ€ ์™„๋ฃŒ๋˜๊ธฐ ์ „๊นŒ์ง€๋Š” speculation์„ ์‹คํ–‰ํ•˜์ง€ ์•Š๋Š”๋‹ค. (spark.speculation.quantile)

https://kb.databricks.com/scala/understanding-speculative-execution

speculative execution์ด ํ™œ์„ฑํ™”๋˜๊ณ  ๋Š๋ฆฐ ์ž‘์—…์œผ๋กœ ๊ฐ„์ฃผ๋œ task์˜ ๊ฒฝ์šฐ ์ƒํƒœ๊ฐ€ success์ธ ์ž‘์—…๊ณผ taskKilled์ธ ์ž‘์—… ๋‘ ๊ฐœ๊ฐ€ ํ‘œ์‹œ๋œ๋‹ค.

 

speculative execution์„ ํ™œ์„ฑํ™”ํ•˜๋ฉด ์ข‹์€ ๊ฒฝ์šฐ

  • <๋ช‡ ๊ฐ€์ง€ ์ž‘์—…๋•Œ๋ฌธ์— ์ „์ฒด ์‹คํ–‰์‹œ๊ฐ„์ด ๋„ˆ๋ฌด ๊ธธ์–ด์ง€๋Š” ๊ฒฝ์šฐ>
    ์ผ๋ถ€ ์ž‘์—…์ด ๋„ˆ๋ฌด ์˜ค๋žซ๋™์•ˆ ์‹คํ–‰๋˜๊ณ  ์›์ธ์ด ํ™•์ธ๋˜์ง€ ์•Š์€ ๊ฒฝ์šฐ์—๋Š” speculative execution์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.
    ๊ทธ๋Ÿฌ๋‚˜ ๊ทผ๋ณธ์ ์ธ ์›์ธ์ด ํ™•์ธ๋˜๋ฉด ํ•ด๊ฒฐํ•˜๊ณ  ๋น„ํ™œ์„ฑํ™” ํ•ด์•ผํ•œ๋‹ค.
  • <์ž˜๋ชป๋œ VM ์ธ์Šคํ„ด์Šค๊ฐ€ ์žˆ๋Š” ๊ฒฝ์šฐ>
    speculative exectuion์€ ์›๋ž˜ ์ž‘์—…๊ณผ ๋™์ผํ•œ ๋…ธ๋“œ์—์„œ ์˜ˆ์•ฝ๋˜์ง€ ์•Š์œผ๋ฏ€๋กœ ํŠน์ • vm์ด ๋น„์ •์ƒ์ ์ด๊ฑฐ๋‚˜ task๋ฅผ ์‹คํ–‰ํ•˜๊ธฐ์— ์ ํ•ฉํ•˜์ง€ ์•Š์€ ๋…ธ๋“œ๊ฐ€ ์ŠคํŒŒํฌ ํด๋Ÿฌ์Šคํ„ฐ์˜ ์ผ๋ถ€๋ผ๋ฉด ํ•ด๋‹น ์˜ต์…˜์„ ํ™œ์„ฑํ™”ํ•˜์—ฌ ์™„ํ™”ํ•  ์ˆ˜ ์žˆ๋‹ค.

 

speculative exection์„ ํ™œ์„ฑํ™”ํ•˜๋ฉด ์•ˆ์ข‹์€ ๊ฒฝ์šฐ

  • ํ”„๋กœ๋•์…˜ ์ž‘์—…์ผ ๋•Œ
    ํ”„๋กœ๋•์…˜ ์ž‘์—…์—์„œ speculation์„ ์žฅ๊ธฐ๊ฐ„ ์‚ฌ์šฉํ•˜๋ฉด ์ž‘์—…์˜ ์‹คํŒจ ํ™•๋ฅ ์ด ๋†’์•„์ง„๋‹ค.
  • ์ž‘์—…์ด ๋ฉฑ๋“ฑ์„ฑ์ด ์•„๋‹Œ ๊ฒฝ์šฐ
    ์ถ”์ธก์‹คํ–‰์„ ํ™œ์„ฑํ™” ํ•˜๋ฉด retry ํ›„ result๊ฐ€ ๋‹ฌ๋ผ์งˆ ์ˆ˜ ์žˆ๋‹ค.
  • skewed data๋กœ ์ธํ•ด ์˜ค๋ž˜ ๊ฑธ๋ฆฌ๋Š” ์ž‘์—…์˜ ๊ฒฝ์šฐ
    skewed data์˜ ๊ฒฝ์šฐ speculative execution task๊ฐ€ ์›๋ž˜ ์ž‘์—…๋งŒํผ ์˜ค๋ž˜ ๊ฑธ๋ฆด ์ˆ˜๋„ ์žˆ๋‹ค.
    speculative exectuion์€ ์›๋ž˜ ์ž‘์—…๋ณด๋‹ค ๋จผ์ € ์™„๋ฃŒ๋œ๋‹ค๋Š” ๊ฒƒ์„ ๋ณด์žฅํ•˜์ง€ ์•Š์œผ๋ฏ€๋กœ ํ™œ์„ฑํ™”ํ•ด๋ดค์ž ์›๋ž˜ ์ž‘์—…์ด ์„ฑ๊ณตํ•˜๊ณ  speculative execution์ด ์ข…๋ฃŒ๋  ๊ฐ€๋Šฅ์„ฑ์ด ์žˆ๋‹ค.
  • ์„ฑ๋Šฅ ์˜ํ–ฅ
    ํ™œ์„ฑํ™”ํ•˜๋ฉด ์„ฑ๋Šฅ์— ์˜ํ–ฅ์„ ๋ฏธ์น  ์ˆ˜ ์žˆ๊ธฐ ๋•Œ๋ฌธ์— ์ž‘์—… ์†๋„ ์ €ํ•˜ ์‹œ ๋‹จ๊ธฐ์ ์ธ ๋ฌธ์ œ ํ•ด๊ฒฐ์šฉ์œผ๋กœ๋งŒ ์‚ฌ์šฉํ•ด์•ผ ํ•œ๋‹ค.
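The idempotency point can be made concrete with a toy example. The sketch below is plain Python, not Spark code, and assumes that both the original and the speculative attempt reach their side effect before one of them is killed. The two write functions and the sinks are hypothetical stand-ins for an external system.

```python
def append_write(sink, partition_id, rows):
    """Non-idempotent: every attempt appends its rows again."""
    sink.extend(rows)


def keyed_write(sink, partition_id, rows):
    """Idempotent: a second attempt overwrites the same key with the same rows."""
    sink[partition_id] = list(rows)


rows = [1, 2, 3]
log_sink = []     # stands in for an append-only external sink
table_sink = {}   # stands in for a sink keyed by partition

# Both attempts of the same task process partition 0.
for _attempt in ("original", "speculative"):
    append_write(log_sink, 0, rows)
    keyed_write(table_sink, 0, rows)

print(len(log_sink), len(table_sink[0]))
```

The append-only sink ends up with the rows twice, while the keyed sink holds them once, which is why operations with external side effects deserve extra care before speculation is switched on.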

 

Additional configuration options

The time threshold is defined as max(spark.speculation.multiplier * successfulTaskDurations.median, spark.speculation.minTaskRuntime).

  • spark.speculation.minTaskRuntime (default = 100ms)
    Minimum time a task must run before it is considered for speculation.
    Used to avoid speculative execution of very short tasks.
    Since: 3.2.0
  • spark.speculation.efficiency.processRateMultiplier (default = 0.75)
    Multiplier used when evaluating inefficient tasks.
    The higher the value, the more tasks may be considered inefficient.
    `calculator.getRunningTasksProcessRate(tid) <
         calculator.getAvgTaskProcessRate() * efficientTaskProcessMultiplier`
    Since: 3.4.0
  • spark.speculation.efficiency.longRunTaskFactor (default = 2)
    Set so that long-running tasks are not missed, regardless of whether their process rate is good.
    A speculative attempt is launched for any task whose duration exceeds spark.speculation.efficiency.longRunTaskFactor * time threshold.
    Since: 3.4.0
  • spark.speculation.efficiency.enabled (default = true)
    When set to true, a task is judged inefficient and speculated if
    1) its processing rate is lower than (average of all successful tasks) * multiplier, or
    2) its duration exceeds spark.speculation.efficiency.longRunTaskFactor * time threshold.
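A worked sketch of how these options combine is given below. It is a simplified model in plain Python, not the TaskSetManager implementation: the function names are hypothetical, the process rate is an abstract "records per second" figure, and the time threshold is assumed to be the larger of multiplier * median and minTaskRuntime, which is how the linked source appears to compute it.

```python
def time_threshold_ms(median_duration_ms, multiplier=1.5, min_task_runtime_ms=100):
    """Assumed threshold: the larger of multiplier * median and minTaskRuntime."""
    return max(multiplier * median_duration_ms, min_task_runtime_ms)


def is_inefficient(runtime_ms, process_rate, avg_process_rate, threshold_ms,
                   process_rate_multiplier=0.75, long_run_task_factor=2):
    """Apply the two conditions listed under spark.speculation.efficiency.enabled."""
    # Condition 1: the task processes data much more slowly than the average.
    slow_rate = process_rate < avg_process_rate * process_rate_multiplier
    # Condition 2: the task has run far past the time threshold.
    long_running = runtime_ms > long_run_task_factor * threshold_ms
    return slow_rate or long_running


# Very short tasks: 1.5 * 40 ms = 60 ms, so minTaskRuntime (100 ms) wins.
print(time_threshold_ms(40))

# Longer tasks: 1.5 * 10000 ms = 15000 ms.
threshold = time_threshold_ms(10_000)

print(is_inefficient(20_000, 90, 100, threshold))   # healthy rate, not long enough
print(is_inefficient(20_000, 50, 100, threshold))   # rate below 0.75 * average
print(is_inefficient(31_000, 90, 100, threshold))   # longer than 2 * threshold
```

The first case shows why minTaskRuntime exists: without the floor, a stage of 40 ms tasks would start speculating after only 60 ms.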

 

์ฃผ์˜

speculative execution์€ execution time์„ ๊ฐœ์„ ํ•˜๊ธฐ ์œ„ํ•ด ๋ฆฌ์†Œ์Šค๊ฐ€ ๋” ๋งŽ์ด ํˆฌ์ž…๋˜๋Š” ์‹คํ–‰ ๋ฐฉ์‹์ด๋ฉฐ,

๋งŒ์•ฝ Speculative Task ์ž์ฒด๊ฐ€ ๋น„ํšจ์œจ์ ์ธ Task(Slow Task)๊ฐ€ ๋œ๋‹ค๋ฉด ๋ฆฌ์†Œ์Šค๋„ ์“ฐ๊ณ  ์‹œ๊ฐ„์  ์ด์ ๋„ ์—†๋Š” ์ƒํ™ฉ์ด ์ดˆ๋ž˜๋  ์ˆ˜ ์žˆ๋”ฐ.

 

References

https://spark.apache.org/docs/latest/configuration.html#scheduling

https://kb.databricks.com/scala/understanding-speculative-execution

https://github.com/apache/spark/blob/9b1b2b30d5919c4de358e82786dc631a3ab914ea/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L1208

'๋ฐ์ดํ„ฐ > Spark' ์นดํ…Œ๊ณ ๋ฆฌ์˜ ๋‹ค๋ฅธ ๊ธ€

[Spark] ์ŠคํŒŒํฌ ์Šค์ผ€์ฅด๋ง  (0) 2024.04.01
[Spark] cache()์™€ persist()  (0) 2024.04.01
[Spark] Logical Plan ๊ณผ Physical Plan  (0) 2024.03.25
[Spark] RDD vs Dataframe  (2) 2024.03.24
[Spark] spark-submit ๊ณผ ์˜ต์…˜  (1) 2024.03.24