๐Ÿฅ

[Spark] Accumulator and Broadcast (Shared Variables)

๋ฐ์ดํ„ฐ/Spark


•8• 2024. 4. 1. 20:55

Reference: https://spark.apache.org/docs/latest/rdd-programming-guide.html#shared-variables

Shared Variables

์ŠคํŒŒํฌ์—์„œ๋Š” ๋ถ„์‚ฐํ•˜์—ฌ ๋ณ‘๋ ฌ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด ๋ฐ์ดํ„ฐ๋ฅผ ๋ถ„ํ• ํ•ด์„œ(ํŒŒํ‹ฐ์…”๋‹) ์—ฌ๋Ÿฌ ๋จธ์‹ ์—์„œ ๋™์‹œ์— ์ฒ˜๋ฆฌํ•œ๋‹ค. ์ด๋•Œ ์‚ฌ์šฉ๋œ ๋ชจ๋“  ๋ณ€์ˆ˜๋Š” ๋ฐฐํฌ๋  ๋•Œ ๋ณต์‚ฌ๋œ ๋ณต์‚ฌ๋ณธ์œผ๋กœ ์ž‘๋™ํ•˜๋Š”๋ฐ, ์ด๋Ÿฌํ•œ ๋ณ€์ˆ˜์˜ ์—…๋ฐ์ดํŠธ๋Š” driver์— ๋‹ค์‹œ ์ „๋‹ฌ๋˜์ง€ ์•Š๋Š”๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ ์–ด๋–ค ์กฐ๊ฑด(?) ์„ ๊ฐ€์ง€๊ณ  ์žˆ์–ด์•ผ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋Š” ๊ฒฝ์šฐ๊ฐ€ ๋ฐœ์ƒํ•˜๋Š”๋ฐ, ์ด๋Ÿด๋•Œ ์ œํ•œ๋œ ์œ ํ˜•์˜ ๊ณต์œ  ๋ณ€์ˆ˜๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ ํ›„ ์›ํ•˜๋Š” ๊ฐ’์„ ๋ฐ›์„ ์ˆ˜ ์žˆ๋‹ค.

 

Broadcast Variable

A broadcast variable lets you keep a read-only variable cached on each worker node instead of shipping a copy of it with every task.

Once broadcast, it can be reused across actions without being sent again. The object should not be modified after it has been broadcast, so that every node sees the same value.

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value
[1, 2, 3]

After this, broadcastVar is cached on every worker node, and tasks read it through `broadcastVar.value`.

`.unpersist()`: called to release the copies of the broadcast variable held on the executors. If the variable is used again afterwards, it is re-broadcast.

`.destroy()`: called to permanently release all resources used by the broadcast variable. The variable cannot be used after this.

Broadcast variables are most useful when tasks across multiple stages of a job need the same data.

 

Accumulator

A shared variable that accumulates values across all executor nodes in a distributed environment.

Each executor adds what it has processed to this distributed shared variable, and the driver later merges those partial results.

It is mainly used to aggregate data or to track global statistics (such as counters) during a computation.

An accumulator is created with an initial value, as shown below.

accum = sc.accumulator(0)
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
accum.value
# result: 10

 

Unlike an ordinary variable, an accumulator can be updated by executors through methods such as `add` (or `+=`), but executors cannot read its value. Only the driver can read it, through `value` (a property in PySpark).

Built-in accumulators cover numeric types (PySpark picks a suitable one automatically for values such as `int` and `float`), and users can also define their own accumulator types.

For accumulator updates performed inside actions, Spark guarantees that each task's update is applied only once, so restarted (retried) tasks do not update the value again. Updates performed inside transformations have no such guarantee: if a task or stage is re-executed, its update may be applied more than once, so the accumulator can end up with an unexpected value.

Updates inside transformations are also lazy. For example, if an accumulator is updated inside `map`, its value stays 0 until an action actually computes that `map`.

 

AccumulatorParam

์Šค์นผ๋ผ์˜ ๊ฒฝ์šฐ `AccumulatorV2`๋ฅผ, ํŒŒ์ด์ฌ์—์„œ๋Š” `AccumulatorParam`์„ ์ƒ์†๋ฐ›์•„ ์‚ฌ์šฉ์ž ์ •์˜ Accumulator๋ฅผ ๋งŒ๋“ ๋‹ค.

AccumulatorParam์—์„œ๋Š” `zero`์™€ `addInPlace`๋งŒ ๊ตฌํ˜„ํ•ด์ฃผ๋ฉด ๋˜๋Š”๋ฐ, pyspark ์˜ˆ์ œ๋Š” ์•„๋ž˜์™€ ๊ฐ™๋‹ค: ์ถœ์ฒ˜

from pyspark.accumulators import AccumulatorParam

# assumes `sc` is an existing SparkContext (e.g. the pyspark shell)
class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        # zero vector with the same length as the given value
        return [0.0] * len(value)

    def addInPlace(self, val1, val2):
        # add val2 into val1 element by element and return val1
        for i in range(len(val1)):
            val1[i] += val2[i]
        return val1

va = sc.accumulator([1.0, 2.0, 3.0], VectorAccumulatorParam())
va.value
# result: [1.0, 2.0, 3.0]

def g(x):
    global va
    va += [x] * 3  # calls addInPlace on the task's copy

rdd = sc.parallelize([1, 2, 3])
rdd.foreach(g)
va.value
# result: [7.0, 8.0, 9.0]

 

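`zero(value)`: returns the "zero" value for the accumulator's type, compatible in shape with the given value (in the example, a vector of zeros of the same length).

`addInPlace(value1, value2)`: combines two values, where value1 is the value the accumulator currently holds and value2 is the value being added. For efficiency it may update value1 in place and return it.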
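To see why both methods are needed, here is a pure-Python simulation (no Spark involved; it is a simplified model, not PySpark's actual code) of how the example above reaches `[7.0, 8.0, 9.0]`. The split into two partitions is an assumption. Each task starts from `zero(...)` and applies its own updates, then the driver merges every task's partial result into its current value with `addInPlace`.

```python
class VectorAccumulatorParam:
    """Same methods as the example above, without the PySpark base class."""

    def zero(self, value):
        return [0.0] * len(value)

    def addInPlace(self, val1, val2):
        for i in range(len(val1)):
            val1[i] += val2[i]
        return val1


def simulate(initial, partitions, param):
    driver_value = list(initial)  # the value held on the driver
    for part in partitions:
        task_value = param.zero(initial)  # each task starts from zero(...)
        for x in part:
            # equivalent of `va += [x] * 3` inside the task
            task_value = param.addInPlace(task_value, [x] * 3)
        # driver merges the task's partial result
        driver_value = param.addInPlace(driver_value, task_value)
    return driver_value


result = simulate([1.0, 2.0, 3.0], [[1], [2, 3]], VectorAccumulatorParam())
print(result)  # [7.0, 8.0, 9.0]
```

If `zero` returned the initial value instead of zeros, `[1.0, 2.0, 3.0]` would be added once more for every task. Since task results can reach the driver in any order, `addInPlace` should also be associative and commutative.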

 

Accumulators can also be used when the work is organized so that each executor processes whole partitions at a time.

 

Use Case

https://medium.com/@ARishi/mastering-accumulators-in-apache-spark-and-not-screwing-yourself-in-the-process-8708cdb4de27

Accumulators can be used for tasks such as data quality monitoring in an ETL process.

For example, to monitor the number of invalid email addresses during processing, create an accumulator named invalidEmails and add to it every time a row with an invalid email address is found while reading the data.
