๐ฅ
[Spark] Speculative Execution ๋ณธ๋ฌธ
๋ถ์ฐ ํ๊ฒฝ์์๋ ๊ณ ๋ฅด์ง ๋ชปํ ๋ฆฌ์์ค, ๋ฐ์ดํฐ ๋ก๋ ๋ฑ task ์คํ ์๋๊ฐ ๋๋ ค์ง๋ ์ํฉ์ด ๋ง์ด ์๋ค.
Speculative Execution
ํ ๋จ๊ณ์์ ์ฌ๋ฌ ์์ ์ ์คํ ์๊ฐ์ด ๋ค๋ฅผ ์ ์๋ค. ๋ค๋ฅธ task์ ๋น๊ตํ์ฌ ํนํ ๋๋ฆฌ๊ฒ ์คํ๋๋ Task๊ฐ ์์ ์ ์๋๋ฐ, ํด๋ฌ์คํฐ ๋ด ๋ ธ๋์ ๊ตฌ์ฑ ์ฑ๋ฅ ์ฐจ์ด, ๋คํธ์ํฌ ๋ณ๋ ๋ฑ์ ์ด์ ๊ฐ ์์ ์ ์๋ค.
speculative execution์ ๋์ผํ ๋จ๊ณ์ ์ค๋ซ๋์ ์๋ฃ๋์ง ์๋ ์์ ์ด ์์ ๊ฒฝ์ฐ, ๋ค๋ฅธ ๋ ธ๋์์ ๋์ผํ ์์ ์ ์์ํ๊ณ , ๋จผ์ ์๋ฃ๋๋ task๋ฅผ ์ฑ๊ณต์ฒ๋ฆฌํ๊ณ , ์ด์ธ์ ๋ค๋ฅธ ์๋๋ ์ข ๋ฃํ๋ค.
ํด๋ฌ์คํฐ ๋ด ๋ ธ๋ ๊ฐ ์ฑ๋ฅ ์ฐจ์ด๊ฐ ์๋ ๊ฒฝ์ฐ speculative execution ๊ธฐ๋ฅ์ ํ์ฑํํ๋ ๊ฒ์ด ์ข๋ค.
์ฃผ์ ๊ตฌ์ฑ ์ต์
- spark.speculation (default=false)
true ์ค์ ์ speculative execution์ ์คํํ๋ค.
๋์ ์๊ธฐ: 0.6.0 - spark.speculation.interval (default=100ms)
spark๊ฐ speculate ํ ์์ ์ ํ์ธํ๋ ๋น๋
๋์ ์๊ธฐ: 0.6.0 - spark.speculation.multiplier (default = 1.5)
task ์คํ์๊ฐ์ ์ค์๊ฐ๋ณด๋ค ์ผ๋ง๋ ๋๋ฆด ๋ speculative execution์ด ๊ณ ๋ ค๋์ด์ผ ํ๋์ง์ ๋ํ ๊ฐ
๋์ ์๊ธฐ: 0.6.0 - spark.speculation.quantile (default = 0.75)
speculation์ด ํ์ฑํ๋๊ธฐ ์ ์ ์๋ฃ๋์ด์ผ ํ๋ task์ ๋น์จ
๋์ ์๊ธฐ: 0.6.0
spark.speculation์ด ํ์ฑํ ๋ ํ ๊ฐ ์ฃผ์ ๊ตฌ์ฑ ์ต์ ์ ๊ธฐ๋ณธ๊ฐ์ ํด์ํด๋ณด๋ฉด:
- ๋งค 100ms ๋ง๋ค (spark.speculation.interval) spark๋ ๋๋ฆฌ๊ฒ ์คํ๋๋ ์์ ์ ํ์ธํ๋ค.
- ์๋ฃ๋ ์์ ์ ์คํ์๊ฐ์ ์ค์๊ฐ * 1.5 < target task ์คํ์๊ฐ ์ผ ๋ ๋๋ฆฌ๊ฒ ์คํ๋๋ ์์ ์ผ๋ก ๊ฐ์ฃผํ๋ค. (spark.speculation.multiplier)
- ์คํํฌ๋ ์ ์ฒด ์์ฝ๋ ์์ ์ 75%๊ฐ ์๋ฃ๋๊ธฐ ์ ๊น์ง๋ speculation์ ์คํํ์ง ์๋๋ค. (spark.speculation.quantile)
speculative execution์ด ํ์ฑํ๋๊ณ ๋๋ฆฐ ์์ ์ผ๋ก ๊ฐ์ฃผ๋ task์ ๊ฒฝ์ฐ ์ํ๊ฐ success์ธ ์์ ๊ณผ taskKilled์ธ ์์ ๋ ๊ฐ๊ฐ ํ์๋๋ค.
speculative execution์ ํ์ฑํํ๋ฉด ์ข์ ๊ฒฝ์ฐ
- <๋ช ๊ฐ์ง ์์
๋๋ฌธ์ ์ ์ฒด ์คํ์๊ฐ์ด ๋๋ฌด ๊ธธ์ด์ง๋ ๊ฒฝ์ฐ>
์ผ๋ถ ์์ ์ด ๋๋ฌด ์ค๋ซ๋์ ์คํ๋๊ณ ์์ธ์ด ํ์ธ๋์ง ์์ ๊ฒฝ์ฐ์๋ speculative execution์ ์ฌ์ฉํ ์ ์๋ค.
๊ทธ๋ฌ๋ ๊ทผ๋ณธ์ ์ธ ์์ธ์ด ํ์ธ๋๋ฉด ํด๊ฒฐํ๊ณ ๋นํ์ฑํ ํด์ผํ๋ค. - <์๋ชป๋ VM ์ธ์คํด์ค๊ฐ ์๋ ๊ฒฝ์ฐ>
speculative exectuion์ ์๋ ์์ ๊ณผ ๋์ผํ ๋ ธ๋์์ ์์ฝ๋์ง ์์ผ๋ฏ๋ก ํน์ vm์ด ๋น์ ์์ ์ด๊ฑฐ๋ task๋ฅผ ์คํํ๊ธฐ์ ์ ํฉํ์ง ์์ ๋ ธ๋๊ฐ ์คํํฌ ํด๋ฌ์คํฐ์ ์ผ๋ถ๋ผ๋ฉด ํด๋น ์ต์ ์ ํ์ฑํํ์ฌ ์ํํ ์ ์๋ค.
speculative exection์ ํ์ฑํํ๋ฉด ์์ข์ ๊ฒฝ์ฐ
- ํ๋ก๋์
์์
์ผ ๋
ํ๋ก๋์ ์์ ์์ speculation์ ์ฅ๊ธฐ๊ฐ ์ฌ์ฉํ๋ฉด ์์ ์ ์คํจ ํ๋ฅ ์ด ๋์์ง๋ค. - ์์
์ด ๋ฉฑ๋ฑ์ฑ์ด ์๋ ๊ฒฝ์ฐ
์ถ์ธก์คํ์ ํ์ฑํ ํ๋ฉด retry ํ result๊ฐ ๋ฌ๋ผ์ง ์ ์๋ค. - skewed data๋ก ์ธํด ์ค๋ ๊ฑธ๋ฆฌ๋ ์์
์ ๊ฒฝ์ฐ
skewed data์ ๊ฒฝ์ฐ speculative execution task๊ฐ ์๋ ์์ ๋งํผ ์ค๋ ๊ฑธ๋ฆด ์๋ ์๋ค.
speculative exectuion์ ์๋ ์์ ๋ณด๋ค ๋จผ์ ์๋ฃ๋๋ค๋ ๊ฒ์ ๋ณด์ฅํ์ง ์์ผ๋ฏ๋ก ํ์ฑํํด๋ดค์ ์๋ ์์ ์ด ์ฑ๊ณตํ๊ณ speculative execution์ด ์ข ๋ฃ๋ ๊ฐ๋ฅ์ฑ์ด ์๋ค. - ์ฑ๋ฅ ์ํฅ
ํ์ฑํํ๋ฉด ์ฑ๋ฅ์ ์ํฅ์ ๋ฏธ์น ์ ์๊ธฐ ๋๋ฌธ์ ์์ ์๋ ์ ํ ์ ๋จ๊ธฐ์ ์ธ ๋ฌธ์ ํด๊ฒฐ์ฉ์ผ๋ก๋ง ์ฌ์ฉํด์ผ ํ๋ค.
์ถ๊ฐ ๊ตฌ์ฑ ์ต์
timethreshold = (spark.speculation.multiplier * successfulTaskDurations.median) or (spark.speculation.minTaskRunTime) ๋ก ์ ์
- spark.speculation.minTaskRuntime (defaul = 100ms)
speculation์ ๊ณ ๋ คํ๊ธฐ ์ ์ ์์ ์ด ์คํ๋๋ ์ต์ ์๊ฐ.
๋งค์ฐ ์งง์ task์ speculative execution์ ๋ฐฉ์งํ๊ธฐ ์ํด ์ฌ์ฉ๋๋ค.
๋์ ์๊ธฐ: 3.2.0 - spark.speculation.efficiency.processRateMultiplier (default = 0.75)
๋นํจ์จ์ ์ธ ์์ ์ ํ๊ฐํ ๋ ์ฌ์ฉ๋๋ multiplier.
๊ฐ์ด ๋์ ์๋ก ๋ ๋ง์ task๊ฐ ๋นํจ์จ์ ์ธ ์์ ์ผ๋ก ๊ฐ์ฃผ๋ ์ ์๋ค.
`calculator.getRunningTasksProcessRate(tid) <
calculator.getAvgTaskProcessRate() * efficientTaskProcessMultiplier`
๋์ ์๊ธฐ: 3.4.0 - spark.speculation.efficiency.longRunTaskFactor (default = 2)
task ๋ด์ process rate ์ํธ ์ฌ๋ถ์ ์๊ด์์ด ๊ธธ๊ฒ ์คํ๋๋ Task ๋๋ฝ์๋ฐฉ์งํ๊ธฐ ์ํด ์ค์ ํ๋ ๊ฐ
spark.speculation.efficiency.longRunFactor * time threshold์ ์ด๊ณผํ๋ ์์ ์ speculative ์์ ์ด ์คํ๋๋ค.
๋์ ์๊ธฐ: 3.4.0 - spark.speculation.efficiency.enabled (default = true)
true ์ค์ ์
1) processing rate๊ฐ ์ฑ๊ณตํ ๋ชจ๋ task์ ํ๊ท * multiplier ๋ณด๋ค ์๊ฑฐ๋
2) duration์ด spark.speculation.efficiency.longRunTaskFactor * time thishold์ ์ด๊ณผํ ๊ฒฝ์ฐ
๋นํจ์จ ์ ์ธ task๋ก ํ๋จ ํ speculateํ๋ค.
์ฃผ์
speculative execution์ execution time์ ๊ฐ์ ํ๊ธฐ ์ํด ๋ฆฌ์์ค๊ฐ ๋ ๋ง์ด ํฌ์ ๋๋ ์คํ ๋ฐฉ์์ด๋ฉฐ,
๋ง์ฝ Speculative Task ์์ฒด๊ฐ ๋นํจ์จ์ ์ธ Task(Slow Task)๊ฐ ๋๋ค๋ฉด ๋ฆฌ์์ค๋ ์ฐ๊ณ ์๊ฐ์ ์ด์ ๋ ์๋ ์ํฉ์ด ์ด๋๋ ์ ์๋ฐ.
์ฐธ๊ณ
https://spark.apache.org/docs/latest/configuration.html#scheduling
https://kb.databricks.com/scala/understanding-speculative-execution
'๋ฐ์ดํฐ > Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Spark] ์คํํฌ ์ค์ผ์ฅด๋ง (0) | 2024.04.01 |
---|---|
[Spark] cache()์ persist() (0) | 2024.04.01 |
[Spark] Logical Plan ๊ณผ Physical Plan (0) | 2024.03.25 |
[Spark] RDD vs Dataframe (2) | 2024.03.24 |
[Spark] spark-submit ๊ณผ ์ต์ (1) | 2024.03.24 |