๐ฅ
[Spark] ์คํํฌ์ Executor Memory ๊ตฌ์กฐ ๋ณธ๋ฌธ
์ฐธ๊ณ : https://community.cloudera.com/t5/Community-Articles/Spark-Memory-Management/ta-p/317794
Spark Executor Memory ๊ตฌ์กฐ
Executor Container์ ๋ฉ๋ชจ๋ฆฌ ๊ตฌ์กฐ๋ ์ด๋ฏธ์ง์ ๊ฐ์ด ํฌ๊ฒ
- Memory Inside of JVM: spark.executor.memory
- Memory Outside of JVM: spark.yarn.executor.MemroyOverHead
๋ก ๋๋๋ค.
Object์ ์ฝ๊ธฐ/์ฐ๊ธฐ ์๋๋ `on-heap > off-heap > disk` ์์๋ก ๋น ๋ฅด๋ค.
Memory Inside of JVM (=On-Heap Memory = In-Memory)
java GC ๊ด๋ฆฌ ํ์ ์๋ ๋ฉ๋ชจ๋ฆฌ ์์ญ์ด๋ค. ์ดํ๋ฆฌ์ผ์ด์ ๋๋ถ๋ถ์ ๋ฐ์ดํฐ๊ฐ on-heap ๋ฉ๋ชจ๋ฆฌ์ ์ ์ฅ๋๋ค.
`spark.executor.memory`๋ ์คํํฌ์ ์ค์ ์ต์ ์ค ํ๋๋ก, executor ์ ํ ๋น๋๋ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ์ค์ ํ๋๋ฐ์ ์ฌ์ฉ๋๋ conf๊ฐ์ด๋ค. ๋ฉ๋ชจ๋ฆฌ ๊ฐ์ด ๋๋ฌด ์์ผ๋ฉด ์์ ์ด ์ถฉ๋ถํ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ์ฌ์ฉํ์ง ๋ชปํด ์ฑ๋ฅ ์ ํ ๋๋ OOM์ด ๋ฐ์ํ ์ ์๊ณ , ๋๋ฌด ํฌ๋ฉด ํด๋ฌ์คํฐ์ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋์ด ๋์์ ธ ๋ค๋ฅธ ์ดํ๋ฆฌ์ผ์ด์ ์ ์ํฅ์ ์ค ์ ์๋ค.
on-heap ๋ฉ๋ชจ๋ฆฌ์์๋ ์ธ ๊ฐ์ง ๋ฉ๋ชจ๋ฆฌ ์์ญ์ด ์๋๋ฐ ์ ์ด๋ฏธ์ง์ ๊ฐ๋ค.
Spark Memory
์คํํฌ์์ ๊ด๋ฆฌํ๋ ๋ฉ๋ชจ๋ฆฌ ์์ญ์ผ๋ก, ์คํํฌ๊ฐ ์ธ๋ฉ๋ชจ๋ฆฌ ์ปดํจํ ์ ํ ๋ ์ค๊ฐ ์ํ๋ฅผ spark memory์ ์ ์ฅํ๋ค.
(Java Heap — Reserved Memory) * spark.memory.fraction
์ฒซ ๋ฒ์งธ ์ด๋ฏธ์ง ์ฌ์ง์์ ๋ณด๋ฉด spark memory ์์ญ์ด ๋ ๊ฐ๋ก ๋๋์ด ์๋ ๊ฒ์ ๋ณผ ์ ์๋ค.
- `spark.memory.fraction - spark.memory.storageFraction`: Execution
- `spark.memory.storageFraction`: Storage
fraction ์์ญ์ด ์ ํ, ์กฐ์ธ, ์ ๋ ฌ, ์ง๊ณ ๋ฑ ์ฐ์ฐ ์์ ์ ์ฌ์ฉ๋๊ณ , storageFraction ์์ญ์ผ๋ก ํ์๋ ๋ถ๋ถ์ด ๋ฐ์ดํฐ ํํฐ์ ์ ์บ์ํ๋ ๋ฐ์ ์ฌ์ฉ๋๋ค.
์ด ์์ญ์ `spark.memory.stroageFraction` ๊ฐ์ผ๋ก ์กฐ์ ํ ์ ์์ผ๋ฉฐ ๊ธฐ๋ณธ๊ฐ์ 0.5 (50%) ์ด๋ค.
a. Storage Memory
storage memory๋ ์บ์ ๋ฐ์ดํฐ, broadcast variable ๋ฑ์ ์ ์ฅํ๋๋ฐ์ ์ฌ์ฉ๋๋ค. serialized data -> deserialized ๋ก ๋ณํํ๋๋ฐ์ ์ฌ์ฉ๋๋ ํ๋ก์ธ์ค์ธ " unroll" ๋ storage memory์ ์ฌ๋ผ๊ฐ์๋ค.
(Java Heap — Reserved Memory) * spark.memory.fraction * spark.memory.storageFraction
b. Execution Memory
์คํํฌ ์์ ์ ์คํํ๋ ๋์ ํ์ํ object๋ฅผ ์ ์ฅํ๋๋ฐ์ ์ฌ์ฉ๋๋ค.
์ ํ, ์กฐ์ธ, ์ํธ, ์ง๊ณ ๋ฑ ์ฐ์ฐ ์ ์ค๊ฐ ๋ฒํผ(์์ ๋ฐ์ดํฐ)๋ฅผ ์ ์ฅํ๊ฑฐ๋ ํด์ ํ ์ด๋ธ์ execution memory ์ ์ ์ฅํ๋ค.
storage memory๋ณด๋ค ์๋ช ์ด ์งง์์ task๊ฐ ๋๋๋ฉด ๋ฐ๋ก ๋น์์ง๊ณ ๋ค์ task ์์ ์ ์ํด์ ๊ณต๊ฐ์ ๋ง๋ค์ด์ค๋ค.
(Java Heap — Reserved Memory) * spark.memory.fraction * (1.0 - spark.memory.storageFraction)
User Memory
user memory๋ user defined data structure, ์คํํฌ ๋ด๋ถ ๋ฉํ๋ฐ์ดํฐ, UDF, RDD ๋ณํ์์ ์ ํ์ํ ๋ฐ์ดํฐ(RDD ์ข ์์ฑ ๋ฑ)๋ฅผ ์ ์ฅํ๋๋ฐ์ ์ฌ์ฉ๋๋ ๋ฉ๋ชจ๋ฆฌ์ด๋ค.
(Java Heap — Reserved Memory) * (1.0 — spark.memory.fraction)
Reserved Memory
์์คํ ์ฉ์ผ๋ก ์์ฝ๋ ๋ฉ๋ชจ๋ฆฌ ์์ญ์ผ๋ก, Spark์ ๋ด๋ถ object๋ฅผ ์ ์ฅํ๋๋ฐ์ ์ฌ์ฉ๋๋ค.
300MB๋ก ํ๋์ฝ๋ฉ ๋์ด ์์ผ๋ฉฐ, ๊ด๋ จ ๊ฐ์ ๋ฐ๊พธ๊ณ ์คํํฌ๋ฅผ ๋ค์ ์ปดํ์ผํด์ ๋ณ๊ฒฝ๋ ๊ฐ์ ์ฌ์ฉํ ์๋ ์์ง๋ง ์ด์ ํ๊ฒฝ์์๋ ์ถ์ฒํ์ง ์๋๋ค๊ณ ํ๋ค.
// https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala#L198
val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
์ถ๊ฐ๋ก `spark.executor.memory` < `Reserved Memory * 1.5 ` ์ธ ๊ฒฝ์ฐ session์ ์ด๊ธฐํํ์ง ๋ชปํ๊ณ ์คํจ๋๋ค.
Memory Outside of JVM (=Off-Heap Memory = External-Memory)
* ์ฐธ๊ณ
์คํํฌ 2.2 ์ดํ: `spark.yarn.executor.memoryOverhead`
์คํํฌ 2.3 ์ด์: `spark.executor.memoryOverhead`
cluster manager๊ฐ yarn ๋ฟ๋ง ์๋๋ผ kubernetes ๋ฑ ๋ค์ํ๊ฒ ์๊ธฐ ๋๋ฌธ์ yarndp ์ข ์๋๋ฏํ ์ค์ ๋ช ์ ์ผ๋ฐ์ ์ค์ ๋ช ์ผ๋ก ๋ณ๊ฒฝํ๋ค๊ณ ํจ
๋ฆฌ์์ค ๋งค๋์ ๊ด๋ฆฌ ํ์ ๋ฉ๋ชจ๋ฆฌ
JVM GC ๊ด๋ฆฌ ๋ฒ์ ๋ฐ์ ์๋ ๋ฉ๋ชจ๋ฆฌ ์์ญ์ด๋ค.
๊ธฐ๋ณธ์ ์ผ๋ก executor memory๋ฅผ ์ค์ ํ๋ฉด memoryOverhead ํฌ๊ธฐ๋ ์๋์ ๊ฐ์ด ์ค์ ๋๋ค.
MAX(spark.executor.memory*0.1, 384MB)
`spark.executor.memoryOverhead` ๊ฐ ์ง์ ์ ํตํด ์ง์ ์ค์ ํด์ค ์๋ ์๋ค.
On-Heap๊ณผ Off-Heap
๋ง์ฝ executor-memory=5g๋ก ์ค์ ํ๋ค๋ฉด
- on-heap: 5G
- off-heap: min(5G*0.1, 384MB) = 500MB
์ด 5GB + 500MB๋ฅผ ํ ๋น๋ฐ๊ฒ ๋๋ค.
๋ฉ๋ชจ๋ฆฌ๊ฐ ๋ถ์กฑํ๋ค๋ ๋ฉ์์ง๊ฐ ์์ ๋ ์ด๋ค ๋ฉ๋ชจ๋ฆฌ๋ฅผ ๋๋ ค์ผ ํ ์ง ๊ณ ๋ฏผ์ด ๋ ๋๊ฐ ์๋ค.
์๋์ ๊ฒฝ์ฐ์๋ executor-memory๋ฅผ ๋๋ ค์ผ ํ๋ค.
- GC๊ฐ ์์ฃผ ๋ฐ์ํ๋ ๊ฒฝ์ฐ: on heap ๋ฉ๋ชจ๋ฆฌ๊ฐ ๋ถ์กฑํด์ ๊ฐ๋น์ง ์ปฌ๋ ์ ์ด ์์ฃผ ๋ฐ์ํ๋ค๋ ๋ป์ด๋ฏ๋ก on heap ๋ฉ๋ชจ๋ฆฌ๋ฅผ ๋๋ ค์ผํ๋ค.
๋ฐ๋๋ก ์๋์ ๊ฒฝ์ฐ์๋ memoryOverhead๋ฅผ ๋๋ ค์ผ ํ๋ค.
- GC๋ ์์ฃผ ๋ฐ์ํ์ง ์์ง๋ง cluster manager์ ์ํด executor ๊ฐ ์๊พธ ์ฃฝ์ ๊ฒฝ์ฐ
Reason: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.โ
Spark Core ๊ด๋ฆฌ ํ์ ๋ฉ๋ชจ๋ฆฌ
Off-heap ๋ฉ๋ชจ๋ฆฌ ๊ด๋ จ ์ค์ ๊ฐ์ด ์๋์ ๊ฐ์ด ์ถ๊ฐ๋ก ๋ ์๋ค.
- spark.memory.offHeap.enabled: ์คํํ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ ์ฌ๋ถ (๋ํดํธ๋ false)
- spark.memory.offHeap.size: ์คํ ํ ์ฌ์ด์ฆ (๋ํดํธ 0)
spark.yarn.executor.memoryOverhead ์์ ์ฐจ์ด์ ์ ์๋์ ๊ฐ๋ค:
- spark.yarn.executor.memoryOverhead: yarn๊ณผ ๊ฐ์ ๋ฆฌ์์ค ๋งค๋์ ์ ์ํด ๊ด๋ฆฌ๋จ
- spark.memory.offHeap.size: spark core์ ์ํด ๊ด๋ฆฌ๋จ (tungsten ์์ง)
spark.yarn.executor.memoryOverhead ์ spark.memory.offHeap.size ์ ๊ด๊ณ๋ ๋ค์๊ณผ ๊ฐ๋ค:
- spark 1.x and 2.x: ์ ์ฒด Off-heap ๋ฉ๋ชจ๋ฆฌ = spark.yarn.executor.memoryOverhead
spark.yarn.executor.memoryOverhead ๊ฐ spark.memory.offHeap.size๋ฅผ ํฌํจํจ - spark 3.x: ์ ์ฒด Off-heap ๋ฉ๋ชจ๋ฆฌ = spark.yarn.executor.memoryOverhead + spark.memory.offHeap.size
Spark์ Tungsten ์์ง์ ๊ฐ๋น์ง ์ปฌ๋ ์ ์ค๋ฒํค๋๋ฅผ ์ต์ํํ๋๋ฐ ๊ธฐ์ฌํ๋ค. GC ์ค๋ฒํค๋๋ฅผ ์ด๋ป๊ฒ ์ต์ํํ๋ค๋ ๊ฒ์ผ๊น,,
์ผ๋ฐ์ ์ผ๋ก JVM์์ GC ๊ฐ ๋ฐ์ํ ๋ stop-the-world ๋ผ๊ณ ํํํ๋ ์ดํ๋ฆฌ์ผ์ด์ ์ค๋จ ํ์์ด ์ผ์ด๋๋ค. ์ด ๊ณผ์ ์ ์ดํ๋ฆฌ์ผ์ด์ ์คํ์ ์ง์ฐ์ํค๊ณ ์ ์ฒด ์คํ ์๊ฐ์ ๋๋ฆฐ๋ค.
Tungsten์์๋ (c์ malloc ๊ฐ์) sun.misc.Unsafe ๊ธฐ๋ฐ์ผ๋ก off heap์ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ํ ๋นํ ์ ์๋๋ก ํ๋ค.
Off heap ๋ฉ๋ชจ๋ฆฌ๋ JVM ๋ฐ๊นฅ์ ์กด์ฌํ๊ธฐ ๋๋ฌธ์ GC ๊ด๋ฆฌ ๋ฐ์ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ์ฌ์ฉํ๋ฉด stop-the-world ๋ฅผ ์ต์ํํ ์ ์๋ค๋ ์ ์์ ์ฑ๋ฅ ํฅ์์ ๋ณผ ์ ์์ ๊ฒ ๊ฐ๋ค.
๋จ์ ์ ์๋์ ๊ฐ๋ค.
- on-heap ๋ฉ๋ชจ๋ฆฌ๋ณด๋ค ์ฝ๊ณ ์ฐ๊ธฐ๊ฐ ๋๋ฆฌ๋ค.
- ๋ฉ๋ชจ๋ฆฌ๋ฅผ ์ง์ ์๋์ผ๋ก ๊ด๋ฆฌํด์ฃผ์ด์ผ ํ๋ค.
์คํ ์ค GC ๋จ๊ณ๊ฐ ๋๋ฌด ๊ธธ์ด์ง๋ฉด ์ฐพ์๋ณผ๋งํ๋ค.
'๋ฐ์ดํฐ > Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Spark] spark-submit ๊ณผ ์ต์ (1) | 2024.03.24 |
---|---|
[Spark] Adaptive Query Execution(AQE) (0) | 2024.03.23 |
[Spark] GraphX (0) | 2024.03.18 |
[Spark] Spark Join ์ข ๋ฅ (0) | 2024.03.18 |
[Spark] spark์์ s3 ์ ๊ทผํ๊ธฐ (ACCESS_KEY, SECRET_KEY) (0) | 2023.12.19 |