๐ฅ
[Spark] Accumulator์ Broadcast (๊ณต์ ๋ณ์) ๋ณธ๋ฌธ
์ฐธ๊ณ : https://spark.apache.org/docs/latest/rdd-programming-guide.html#shared-variables
๊ณต์ ๋ณ์
์คํํฌ์์๋ ๋ถ์ฐํ์ฌ ๋ณ๋ ฌ์ฒ๋ฆฌํ๊ธฐ ์ํด ๋ฐ์ดํฐ๋ฅผ ๋ถํ ํด์(ํํฐ์ ๋) ์ฌ๋ฌ ๋จธ์ ์์ ๋์์ ์ฒ๋ฆฌํ๋ค. ์ด๋ ์ฌ์ฉ๋ ๋ชจ๋ ๋ณ์๋ ๋ฐฐํฌ๋ ๋ ๋ณต์ฌ๋ ๋ณต์ฌ๋ณธ์ผ๋ก ์๋ํ๋๋ฐ, ์ด๋ฌํ ๋ณ์์ ์ ๋ฐ์ดํธ๋ driver์ ๋ค์ ์ ๋ฌ๋์ง ์๋๋ค. ๊ทธ๋ฌ๋ ์ด๋ค ์กฐ๊ฑด(?) ์ ๊ฐ์ง๊ณ ์์ด์ผ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ์ ์๋ ๊ฒฝ์ฐ๊ฐ ๋ฐ์ํ๋๋ฐ, ์ด๋ด๋ ์ ํ๋ ์ ํ์ ๊ณต์ ๋ณ์๋ฅผ ์ฌ์ฉํ์ฌ ๋ณ๋ ฌ ์ฒ๋ฆฌ ํ ์ํ๋ ๊ฐ์ ๋ฐ์ ์ ์๋ค.
Broadcast Variable
๋ธ๋ก๋์บ์คํธ ๋ณ์๋ฅผ ์ฌ์ฉํ๋ฉด ๋ณ์์ ๋ณต์ฌ๋ณธ์ ์ ๋ฌํ๋๊ฒ ์๋, Read Only ๋ณ์๋ฅผ ๊ฐ ์์ปค ๋ ธ๋์ ์บ์๋ ์ํ๋ก ์ ์งํ ์ ์๋ค.
์ก์ ๋ง๋ค ์ฌ์ ์ก ์์ด ์ฌ์ฉํ ์ ์๋ ๋ณ์์ด๋ค. ๋ธ๋ก๋์บ์คํธ๋ ํ์๋ ๊ฐ์ฒด๋ฅผ ์์ ํด์๋ ์๋๋ค.
>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>
>>> broadcastVar.value
[1, 2, 3]
์ด๋ ๊ฒ ๋๋ฉด broadcastVar์ ๋ชจ๋ ์์ปค๋ ธ๋์ ์บ์๋๋ค.
`.unpersist()`: ๋ธ๋ก๋์บ์คํธ ๋ณ์๊ฐ executor์ ๋ณต์ฌ๋ ๋ฆฌ์์ค๋ฅผ ํด์ ํ ๋ ํธ
`.destroy()`: ๋ธ๋ก๋์บ์คํธ ๋ณ์์์ ์ฌ์ฉํ๋ ๋ชจ๋ ๋ฆฌ์์ค๋ฅผ ์๊ตฌ์ ์ผ๋ก ํด์ ํ ๋ ํธ์ถ
์ฌ๋ฌ ๋จ๊ณ์ ์์ ์ ๋์ผํ ๋ฐ์ดํฐ๊ฐ ํ์ํ ๊ฒฝ์ฐ์ ์ฌ์ฉํ๋ฉด ์ข๋ค.
Accumulator
๋ถ์ฐ ํ๊ฒฝ์ executor node ์ ์ฒด์ ๊ฐ์ ๋์ ํ ์ ์๋ ๊ณต์ ๋ณ์์ด๋ค.
๊ฐ executor ๋ณ๋ก ์ฒ๋ฆฌํ ๋ด์ญ์ Accumulator๋ผ๋ ๋ถ์ฐ ๊ณต์ ํ ๋ณ์์ ๊ฐฑ์ ํ์ฌ ์ฒ๋ฆฌํ ๊ฒฐ๊ณผ๋ฅผ ์ถํ driver์์ ๋จธ์งํ ์ ์๋ค.
์ฃผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ง๊ณํ๊ฑฐ๋ ๊ณ์ฐ ์ค ์ ์ญ ํต๊ณ๋ฅผ ์ถ์ ํ ๋ ์ฌ์ฉํ๋ค.
์๋์ ๊ฐ์ด ์ด๊ธฐ๊ฐ์ ์ค์ ํ ์ ์๋ค.
accum = sc.accumulator(0)
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
accum.value
# result: 10
์ผ๋ฐ ๋ณ์์ ๋ฌ๋ฆฌ accumulator ๋ณ์๋ executor๊ฐ ์ด ๋ณ์์ `add` ๋ฑ์ ๋ฉ์๋๋ฅผ ํตํด ๊ฐ์ ์ ๋ฐ์ดํธํ ์๋ ์์ง๋ง ๊ฐ ์์ฒด๋ฅผ ์ฝ์ด์ฌ ์๋ ์๊ณ , driver ์์๋ง `value` ๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ์ฝ์ด์ฌ ์ ์๋ค.
Int ๊ฐ๋ง ์ง์ํ์ง๋ง ์ฌ์ฉ์๊ฐ ์ง์ Accumulator์ ๋ง๋ค์๋ ์๋ค.
accumulator์ ๋ํ ์ ๋ฐ์ดํธ๊ฐ action ๋ด์์ ์ํ๋๊ธฐ ๋๋ฌธ์ ์์ ๋น ํ ๋ฒ๋ง ์ด๋ฃจ์ด์ง๋๋ก ๋ณด์ฅํ๋ค. ์ฆ retry๋ tasks ํด๋น ๊ฐ์ ์ ๋ฐ์ดํธํ์ง ์๋๋ค. ๊ทธ๋ฌ๋ transformation ์์ ์ค accumualtor ๊ฐ์ ์ ๊ทผํ๋ ค๊ณ ํ๋ฉด ์์์น ๋ชปํ ๊ฐ์ ์ป์ ์ ์๋ค.
์๋ฅผ๋ค์ด ์์ ์์๊ฐ์ ๊ฒฝ์ฐ `map` ์์์ accumulator์ ์ ๋ฐ์ดํธ๋ฅผ ์ํํ๊ณ ์์ผ๋ action์ด ์ด๋ฃจ์ด์ง์ง ์์๊ธฐ ๋๋ฌธ์ `va` ๊ฐ์ ํญ์ 0์ด๋ค.
AccumulatorParam
์ค์นผ๋ผ์ ๊ฒฝ์ฐ `AccumulatorV2`๋ฅผ, ํ์ด์ฌ์์๋ `AccumulatorParam`์ ์์๋ฐ์ ์ฌ์ฉ์ ์ ์ Accumulator๋ฅผ ๋ง๋ ๋ค.
AccumulatorParam์์๋ `zero`์ `addInPlace`๋ง ๊ตฌํํด์ฃผ๋ฉด ๋๋๋ฐ, pyspark ์์ ๋ ์๋์ ๊ฐ๋ค: ์ถ์ฒ
from pyspark.accumulators import AccumulatorParam
class VectorAccumulatorParam(AccumulatorParam):
def zero(self, value):
return [0.0] * len(value)
def addInPlace(self, val1, val2):
for i in range(len(val1)):
val1[i] += val2[i]
return val1
va = sc.accumulator([1.0, 2.0, 3.0], VectorAccumulatorParam())
va.value
# result: [1.0, 2.0, 3.0]
def g(x):
global va
va += [x] * 3
rdd = sc.parallelize([1,2,3])
rdd.foreach(g)
va.value
# result: [7.0, 8.0, 9.0]
`zero(value)`: ์ ๋ก๊ฐ์ ์ด๋ค ๊ฐ์ผ๋ก ํ ๊ฒ์ธ์ง ์ค์ ํ๋ค.
`addInPlace(value1, value2)`: value1์ ํ์ฌ accumulator์์ ๊ฐ์ง๊ณ ์๋ ๊ฐ, value2๋ ๊ฐฑ์ ์ ์์ฒญํ๋ ๊ฐ์ด๋ค.
executor ๋ณ๋ก partition์ ์ฒ๋ฆฌํ๋๋ก ๊ตฌ์ฑํ ๋ accumulator๋ฅผ ํ์ฉํ ์ ์๋ค.
ํ์ฉ ์์
ETL ํ๋ก์ธ์ค ์ค ๋ฐ์ดํฐ ํ์ง ๋ชจ๋ํฐ๋ง๊ณผ ๊ฐ์ ์์ ์ด ํ์ํ ๋ accumulator๋ฅผ ์ฌ์ฉํ ์ ์๋ค.
์๋ฅผ ๋ค์ด ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์ค ์ ํจํ์ง ์์ ์ด๋ฉ์ผ ์ฃผ์์ ์๋ฅผ ๋ชจ๋ํฐ๋ง ํ๋ ๊ฒฝ์ฐ์, invalidEmails ๋ผ๋ accumulator๋ฅผ ๋ง๋ค๊ณ , ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์ฌ ๋ ๊ฐ row๋ง๋ค ์ ํจํ์ง ์์ ์ด๋ฉ์ผ์ ๋ฐ๊ฒฌํ ๋๋ง๋ค accumulator์ ๊ฐ์ ์ถ๊ฐํ๋ค.
'๋ฐ์ดํฐ > Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Spark] Spark Structured Streaming - Fault Tolerance (0) | 2024.04.04 |
---|---|
[Spark] Spark Structured Streaming ๊ฐ์ (0) | 2024.04.04 |
[Spark] SQL Hint (0) | 2024.04.01 |
[Spark] ์คํํฌ ์ค์ผ์ฅด๋ง (0) | 2024.04.01 |
[Spark] cache()์ persist() (0) | 2024.04.01 |