[Spark] Adaptive Query Execution(AQE)

•8• 2024. 3. 23. 15:45

Spark 2.x optimizes queries with rule-based and cost-based optimization.

Spark 3.0 introduced Adaptive Query Execution (AQE), which can optimize a query at runtime, and AQE has been enabled by default since Spark 3.2.

AQE supports three main features for optimizing queries:

  • Coalescing Post shuffle partitions
  • Switching join strategies
  • Optimizing Skew Join

Coalescing Shuffle Partitions

This feature optimizes the number of shuffle partitions.

Shuffles have a very large impact on the performance of a Spark application. Previously, users had to inspect shuffle sizes and partitions themselves and tune the number of partitions by hand.

AQE instead uses the statistics produced at runtime to optimize the number of partitions.

The number of partitions is driven by `spark.sql.adaptive.advisoryPartitionSizeInBytes`, whose default is 64m (64 MB).

AQE adjusts the number of partitions so that, on average, each partition ends up close to the advisoryPartitionSizeInBytes value.
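The idea of packing small partitions toward the advisory size can be sketched in plain Python. This is a simplified model under stated assumptions, not Spark's actual implementation: the function name `coalesce_partitions` and the sample sizes are hypothetical, and only adjacent partitions are merged.

```python
from typing import List

MB = 1024 * 1024


def coalesce_partitions(sizes: List[int], advisory_size: int = 64 * MB) -> List[List[int]]:
    """Greedily group adjacent shuffle partitions so each group stays at or below advisory_size.

    Hypothetical helper for illustration only; a single partition larger than
    advisory_size is kept as its own group.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for index, size in enumerate(sizes):
        # Close the current group if adding this partition would exceed the target.
        if current and current_bytes + size > advisory_size:
            groups.append(current)
            current, current_bytes = [], 0
        current.append(index)
        current_bytes += size
    if current:
        groups.append(current)
    return groups


# Seven small shuffle partitions (sizes are made up for the example).
sizes = [10 * MB, 20 * MB, 30 * MB, 5 * MB, 40 * MB, 8 * MB, 70 * MB]
groups = coalesce_partitions(sizes)
print(groups)  # [[0, 1, 2], [3, 4, 5], [6]]
```

Seven input partitions collapse into three, which means three tasks instead of seven in the next stage.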

 

https://www.slideshare.net/slideshow/adaptive-query-execution-speeding-up-spark-sql-at-runtime/236693931

As the figure shows, Spark monitors partition sizes as the stages progress and adjusts the number of partitions by coalescing them.

https://blog.cloudera.com/how-does-apache-spark-3-0-increase-the-performance-of-your-sql-workloads/

 

 

https://tech.kakao.com/2022/01/18/aqe-coalescing-post-shuffle-partitions/

 

  • Coalescing Post Shuffle Partitions automatically coalesces partitions after a shuffle.
  • It prevents the creation of too many small partitions (tasks).
  • To use it well, set the initial number of shuffle partitions to a sufficiently large value.

To enable it:

- Set `spark.sql.adaptive.enabled` to true

- Set `spark.sql.adaptive.coalescePartitions.enabled` to true
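As a rough sketch, the settings above can be collected in a dictionary and turned into `spark-submit --conf` arguments. The helper `to_submit_args` is a hypothetical name, and the value 1000 for `spark.sql.shuffle.partitions` is only an illustrative "large enough" starting point, not a recommendation.

```python
from typing import Dict, List

# Configuration keys are real Spark settings; the values are examples.
AQE_COALESCE_CONF: Dict[str, str] = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.advisoryPartitionSizeInBytes": "64m",
    # Assumption: start with many partitions and let AQE coalesce them.
    "spark.sql.shuffle.partitions": "1000",
}


def to_submit_args(conf: Dict[str, str]) -> List[str]:
    """Render a config dict as a flat list of `--conf key=value` arguments."""
    args: List[str] = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


args = to_submit_args(AQE_COALESCE_CONF)
print("spark-submit " + " ".join(args) + " app.py")  # app.py is a placeholder
```

The same keys could also be set on a running session or in spark-defaults.conf; the dictionary form just keeps them in one place.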

Switching Join Strategies

This feature optimizes the join strategy.

In Spark 2.x, the data size estimated at the start of execution can differ from the actual size, so a different join strategy may turn out to be better. Because runtime re-optimization is not possible, the job may run with an inefficient join strategy.

The estimate can be wrong for the following reasons:

  • The statistics used to estimate cardinality or selectivity may be inaccurate.
  • The input may be a complex subtree.
  • Black-box predicates such as UDFs may make it impossible to compute statistics up front.

AQE, however, can change the join strategy partway through, based on the data statistics it collects during execution.

 

Converting Sort-Merge Join to Broadcast Join

 

https://jeevan-madhur22.medium.com/spark-3-0-aqe-dynamically-choosing-join-strategy-explained-part-2-66d0cfb28c42

The figure shows an example of converting a sort-merge join into a broadcast join.

Suppose the size of the right-hand table is estimated at 25 MB. In Spark 2.x, without AQE, the join runs as a sort-merge join even though the table is actually small enough to broadcast.

With AQE enabled, once some of the leaf stages have finished, Spark can see that the actual size of the right-hand table is 8 MB.

The plan is then re-optimized and the join is changed to a broadcast join.
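The 25 MB versus 8 MB example can be expressed as a small decision function. This is a sketch, assuming the default broadcast threshold of 10 MB (`spark.sql.autoBroadcastJoinThreshold`); the function name and the returned labels are hypothetical, and the real planner considers more than size alone.

```python
MB = 1024 * 1024

# Assumption: default value of spark.sql.autoBroadcastJoinThreshold (10 MB).
DEFAULT_BROADCAST_THRESHOLD = 10 * MB


def choose_join_strategy(build_side_bytes: int,
                         broadcast_threshold: int = DEFAULT_BROADCAST_THRESHOLD) -> str:
    """Pick a join strategy from the (estimated or measured) size of the smaller side."""
    if build_side_bytes <= broadcast_threshold:
        return "broadcast_hash_join"
    return "sort_merge_join"


# Before execution: the optimizer only has the 25 MB estimate.
planned = choose_join_strategy(25 * MB)
# After the leaf stages finish: AQE sees the actual 8 MB and re-plans.
replanned = choose_join_strategy(8 * MB)

print(planned, "->", replanned)  # sort_merge_join -> broadcast_hash_join
```

The decision rule is identical in both calls; only the input changes, from a planning-time estimate to a runtime measurement.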

 

You may wonder, though, whether it is efficient to change the join type after the sort-merge join has already started.

Reference: https://spark.apache.org/docs/latest/sql-performance-tuning.html#converting-sort-merge-join-to-broadcast-join

The documentation explains that this is not as efficient as planning a broadcast join from the start, but it is still better than carrying the sort-merge join through to the end. If `spark.sql.adaptive.localShuffleReader.enabled` is true, the shuffle files that have already been written are read locally, which saves network traffic.

 

 

Converting Sort-Merge Join to Shuffled Hash Join

Sort-merge join: both tables are first sorted on the join key, and then their records are compared. The sort makes it slow.

Shuffled hash join: the shuffle places rows with the same key in the same partition, and the rows are then matched with a hash table. AQE makes this conversion automatically when the data is small enough.

Because there is no sort, it is relatively faster than a sort-merge join.
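To make the difference concrete, here is a toy version of both joins on a single partition of `(key, value)` rows. It is an illustrative sketch only: the function names are hypothetical, and Spark's real operators work on its internal row format with spilling and code generation.

```python
from collections import defaultdict
from typing import List, Tuple

Row = Tuple[int, str]


def sort_merge_join(left: List[Row], right: List[Row]) -> List[Tuple[int, str, str]]:
    """Sort both sides on the key, then walk them in step."""
    ls, rs = sorted(left, key=lambda r: r[0]), sorted(right, key=lambda r: r[0])
    out, i, j = [], 0, 0
    while i < len(ls) and j < len(rs):
        lk, rk = ls[i][0], rs[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of right rows sharing this key.
            j_end = j
            while j_end < len(rs) and rs[j_end][0] == lk:
                j_end += 1
            # Pair every left row with that key against the whole run.
            while i < len(ls) and ls[i][0] == lk:
                out.extend((lk, ls[i][1], rs[k][1]) for k in range(j, j_end))
                i += 1
            j = j_end
    return out


def hash_join(left: List[Row], right: List[Row]) -> List[Tuple[int, str, str]]:
    """Build a hash table on one side and probe it with the other; no sorting."""
    table = defaultdict(list)
    for key, value in right:
        table[key].append(value)
    return [(key, lv, rv) for key, lv in left for rv in table.get(key, [])]


left = [(2, "b"), (1, "a"), (2, "c")]
right = [(2, "x"), (3, "y"), (1, "z")]
smj = sort_merge_join(left, right)
shj = hash_join(left, right)
print(sorted(smj) == sorted(shj))  # True
```

Both functions return the same matches; the hash version simply skips the two sorts, at the cost of holding one side's hash table in memory.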

 

Optimizing Skew Join

Reference: https://blog.cloudera.com/how-does-apache-spark-3-0-increase-the-performance-of-your-sql-workloads/

If data is unevenly distributed across partitions, it can degrade query performance during a join.

https://blog.cloudera.com/how-does-apache-spark-3-0-increase-the-performance-of-your-sql-workloads/

For example, in the figure above, partition A0 holds more data than the other partitions.

Without AQE, even after A1-B1, A2-B2, and A3-B3 have all finished, the job still has to wait for A0-B0, which affects the total run time of the application.

https://blog.cloudera.com/how-does-apache-spark-3-0-increase-the-performance-of-your-sql-workloads/

With AQE enabled, Spark detects skewed data such as A0 and splits it into smaller partitions. B0 is duplicated once for each piece that A0 is split into.

As a result, the A0-B0 join takes less time, and the application as a whole can run faster.
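The split-and-duplicate step can be sketched as follows. This is a minimal model, assuming rows are plain Python lists; `split_skewed_join` and the sample data are hypothetical, and Spark itself splits by shuffle block sizes rather than row counts.

```python
from typing import List, Tuple


def split_skewed_join(a_rows: List[int], b_rows: List[str],
                      num_splits: int) -> List[Tuple[List[int], List[str]]]:
    """Split the skewed side (A0) into chunks and pair each chunk with a full copy of B0."""
    if not a_rows or num_splits < 1:
        return []
    chunk = -(-len(a_rows) // num_splits)  # ceiling division
    tasks = []
    for start in range(0, len(a_rows), chunk):
        # Each task gets one slice of A0 and its own copy of B0.
        tasks.append((a_rows[start:start + chunk], list(b_rows)))
    return tasks


a0 = list(range(10))   # stand-in for the oversized partition A0
b0 = ["x", "y"]        # stand-in for the matching partition B0
tasks = split_skewed_join(a0, b0, num_splits=3)
for a_part, b_copy in tasks:
    print(len(a_part), "rows of A0 joined against", len(b_copy), "rows of B0")
```

Each task now handles only a slice of A0, so the slices can run in parallel instead of one long A0-B0 task holding up the stage.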

 

 

 

To enable it:

- Set `spark.sql.adaptive.enabled` to true

- Set `spark.sql.adaptive.skewJoin.enabled` to true

 

The related settings are:

`spark.sql.adaptive.skewJoin.skewedPartitionFactor`

The factor used to decide whether a partition is skewed: a partition is treated as a skew partition if its size is larger than the median partition size multiplied by skewedPartitionFactor and also larger than skewedPartitionThresholdInBytes.

`spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes`

The absolute size threshold below which skew is ignored. It is set to prevent partitions that are too small from being split.
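The two settings combine into a single check, sketched below. The defaults used here (factor 5 and threshold 256 MB) are the documented Spark defaults as far as I know; the function name `is_skewed` and the sample sizes are hypothetical, and Spark's own median calculation may differ in detail.

```python
from statistics import median
from typing import List

MB = 1024 * 1024


def is_skewed(size: int, all_sizes: List[int],
              factor: float = 5.0, threshold: int = 256 * MB) -> bool:
    """A partition counts as skewed only if it passes both the relative and the absolute test."""
    return size > factor * median(all_sizes) and size > threshold


# One very large partition among ordinary ones (made-up sizes).
sizes = [1024 * MB, 100 * MB, 120 * MB, 90 * MB]
print([is_skewed(s, sizes) for s in sizes])

# 50x the median, but far below the absolute threshold, so skew is ignored.
tiny = [50 * MB, 1 * MB, 1 * MB, 1 * MB]
print(is_skewed(tiny[0], tiny))
```

The absolute threshold is what keeps the second case from being split: the partition is large relative to its neighbours but too small for splitting to pay off.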

 
