๐Ÿฅ

[Spark] Spark Join ์ข…๋ฅ˜ ๋ณธ๋ฌธ

๋ฐ์ดํ„ฐ/Spark

[Spark] Spark Join ์ข…๋ฅ˜

•8• 2024. 3. 18. 00:03

์ŠคํŒŒํฌ์—์„œ Join ์‹œ์—๋Š” ์ƒํ™ฉ์— ๋”ฐ๋ผ Shuffle ์—ฐ์‚ฐ์„ ์ˆ˜ํ–‰ํ•ด์•ผ ํ•˜๊ณ , ์ด๋Š” executor ์‚ฌ์ด์˜ ๋ฐฉ๋Œ€ํ•œ ๋ฐ์ดํ„ฐ ์ด๋™์„ ์•ผ๊ธฐํ•œ๋‹ค.

์ŠคํŒŒํฌ์—์„œ ์‚ฌ์šฉ๋˜๋Š” ์กฐ์ธ์˜ ๋ฐฉ์‹์€ ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

1. Broadcast Hash Join

๋‘ ๊ฐœ์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์กฐ์ธํ•  ๋•Œ ํ•œ ์ชฝ์ด ๋งค์šฐ ์ž‘๊ณ  ํ•œ ์ชฝ์€ ๋งค์šฐ ํฐ ์‚ฌ์ด์ฆˆ์˜ ๋ฐ์ดํ„ฐ์ผ ๋•Œ, ๋” ์ž‘์€ ์ชฝ์˜ ๋ฐ์ดํ„ฐ๊ฐ€ Driver์— ์˜ํ•ด ๋ชจ๋“  executor๋กœ ๋ณต์‚ฌ๋˜๋Š” ๋ฐฉ์‹์ด๋‹ค. 

  1. ์‚ฌ์ด์ฆˆ๊ฐ€ ์ž‘์€ ํ…Œ์ด๋ธ”์˜ ํ‚ค ๊ฐ’์„ ํ•ด์‹œ ํ…Œ์ด๋ธ”๋กœ ๋งŒ๋“ ๋‹ค.
  2. ํ•ด์‹œ ํ…Œ์ด๋ธ”์˜ ๊ฐ’์„ ํฐ ํ…Œ์ด๋ธ” ํŒŒํ‹ฐ์…˜์— ๋ณต์‚ฌํ•ด์ค€๋‹ค.
  3. ๊ฐ ํŒŒํ‹ฐ์…˜ ๋‚ด๋ถ€์—์„œ ์กฐ์ธ์„ ์ง„ํ–‰ํ•œ๋‹ค.

์ด ๋ฐฉ์‹์€ shuffle์„ ์ƒ๋žตํ•  ์ˆ˜ ์žˆ๊ธฐ ๋•Œ๋ฌธ์— ๋ฐ์ดํ„ฐ ์ด๋™์— ๋”ฐ๋ฅธ ์ฝ”์ŠคํŠธ๊ฐ€ ์‚ฌ๋ผ์ง„๋‹ค.

๋ชจ๋“  executor์— ๋ฐ์ดํ„ฐ๋ฅผ ๋ณต์‚ฌํ•˜๊ธฐ ๋•Œ๋ฌธ์— ๋ณต์‚ฌํ•˜๋ ค๋Š” dataset์ด ์ผ์ • ํฌ๊ธฐ๋ณด๋‹ค ๋” ์ž‘์•„์•ผ broadcast hash join์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š”๋ฐ, ์ด ๊ฐ’์€ `spark.sql.autoBroadcastJoinThreshold` ์—์„œ ์„ค์ • ๊ฐ€๋Šฅํ•˜๋‹ค. (default 10MB)

 

 

2. Shuffle Sort Merge Join

์ด๋ฆ„ ๊ทธ๋Œ€๋กœ ๋ฐ์ดํ„ฐ๋ฅผ shuffle ํ•˜๊ณ  sort ํ•ด์„œ merge ํ•˜๋Š” join ๋ฐฉ์‹์ด๋‹ค.

`spark.sql.autoBroadcastJoinThreshold` ๊น‚์ด -1๋กœ ์„ค์ •๋˜์–ด ์žˆ๋‹ค๋ฉด ํ•ด๋‹น ์กฐ์ธ์„ ๋จผ์ € ์ˆ˜ํ–‰ํ•œ๋‹ค.

  1. Dataset์—์„œ ๋™์ผํ•œ key๋ฅผ ๊ฐ€์ง„ ๋ ˆ์ฝ”๋“œ๊ฐ€ ๋™์ผ partition์— ์œ„์น˜ํ•  ์ˆ˜ ์žˆ๊ฒŒ๋” shuffle์„ ์ˆ˜ํ–‰ํ•œ๋‹ค. (shuffle)
    ์ด๋ถ€๋ถ„์€ ๋‘ ๋ฐ์ดํ„ฐ์…‹์—์„œ ๊ณตํ†ต๋œ "์ •๋ ฌ๋œ"์ปฌ๋Ÿผ์ด ์ €์žฅ๋œ partitioned bucket์„ ๋งŒ๋“ค๋ฉด ์ƒ๋žต์ด ๊ฐ€๋Šฅํ•˜๋‹ค -> ํ™•์ธํ•„์š”
  2. executor ๋‚ด๋ถ€์—์„œ ๋‘ ๋ฐ์ดํ„ฐ๋ฅผ key๊ฐ’์— ๋”ฐ๋ผ ์ •๋ ฌํ•œ๋‹ค (sort)
  3. ์ •๋ ฌ๋œ key๊ฐ’์„ ๋Œ๋ฉด์„œ key๊ฐ€ ์ผ์น˜ํ•˜๋Š” row๋ผ๋ฆฌ joinํ•œ๋‹ค. (merge)

 

3. Shuffle Hash Join

์ด ์กฐ์ธ ๋ฐฉ๋ฒ•์€ Broadcastํ•˜์ง€ ์•Š๋Š” shuffle hash join์ด๋‹ค. ์ฆ‰, ๋‘ ํ…Œ์ด๋ธ” ๋ชจ๋‘ ์ถฉ๋ถ„ํžˆ ์ž‘์ง€ ์•Š์€ ๊ฒฝ์šฐ ์ˆ˜ํ–‰๋œ๋‹ค.

๋งˆ์ฐฌ๊ฐ€์ง€๋กœ  shuffle์„ ํ†ตํ•ด ๊ฐ™์€ key๋ฅผ ๊ฐ€์ง„ row๋ฅผ ๋™์ผํ•œ executor์— ๋ฐฐ์น˜์‹œํ‚จ๋‹ค. ์ดํ›„ ๊ฐ executor์—์„œ hash join์„ ์ˆ˜ํ–‰ํ•œ๋‹ค.

 

 

sparkSQL ์˜ตํ‹ฐ๋งˆ์ด์ €๊ฐ€ ์•Œ์•„์„œ ์ž˜ ์ ํ•ฉํ•œ ์กฐ์ธ ๋ฐฉ๋ฒ•์„ ์ฐพ์•„์ฃผ๊ฒ ์ง€๋งŒ ์‚ฌ์šฉ์ž๊ฐ€ ์ƒ๊ฐํ•˜๊ธฐ์— Broadcast join์ด ๋” ์ ํ•ฉํ•˜๋‹ค๊ณ  ์ƒ๊ฐ๋  ๋•Œ์—๋Š” ์•„๋ž˜์™€ ๊ฐ™์ด Dataframe API๋ฅผ ํ†ตํ•ด ์˜ตํ‹ฐ๋งˆ์ด์ €์—๊ฒŒ ํžŒํŠธ๋ฅผ ์ œ๊ณตํ•  ์ˆ˜ ์žˆ๋‹ค.

A.join(broadcast(B), some_condition)

 

 

์ŠคํŒŒํฌ ์กฐ์ธ ์ „๋žต ๊ด€๋ จํ•˜์—ฌ ์•„๋ž˜์—์„œ ์ž˜ ์„ค๋ช…๋˜์–ด ์žˆ๋‹ค.

https://blog.clairvoyantsoft.com/apache-spark-join-strategies-e4ebc7624b06