๐ฅ
[Spark] Adaptive Query Execution(AQE) ๋ณธ๋ฌธ
Spark 2.x ์์๋ rule-based/cost-based์ ๋ฐฉ์์ผ๋ก ์ฟผ๋ฆฌ๋ฅผ ์ต์ ํํ๋ค.
Spark 3.0๋ถํฐ๋ ๋ฐํ์์ ์ต์ ํํ ์ ์๋ AQE๊ฐ ๋์ ๋์๊ณ , Spark 3.2 ๋ถํฐ๋ AQE ํ์ฑํ๊ฐ ๋ํดํธ ๋ฒ์ ์ด๋ค.
AQE์์๋ ์ฟผ๋ฆฌ๋ฅผ ์ต์ ํํ๊ธฐ ์ํด ํฌ๊ฒ ์๋์ ์ธ ๊ฐ์ง์ ๊ธฐ๋ฅ์ ์ง์ํ๋ค.
- Coalescing Post shuffle partitions
- Switching join strategies
- Optimizing Skew Join
Coalescing Shuffle Partitions
์ ํ ํํฐ์ ์ ์๋ฅผ ์ต์ ํํด์ฃผ๋ ๊ธฐ๋ฅ์ด๋ค.
์ ํ์ ์คํํฌ ์ดํ๋ฆฌ์ผ์ด์ ์ ์ฑ๋ฅ์ ๋งค์ฐ ํฐ ์ญํ ์ ์ฐจ์งํ๊ณ , ๊ธฐ์กด์๋ ์ฌ์ฉ์๊ฐ ์ ํ ํฌ๊ธฐ ๋ฐ ํํฐ์ ์ ์ง์ ํ์ธํ๋ฉฐ ์๋์ผ๋ก ํํฐ์ ์๋ฅผ ์กฐ์ ํด์ฃผ์ด์ผ ํ๋ค.
AQE์์๋ ๋ฐํ์ ์ค ์์ฑ๋๋ ๋ค์ํ ํต๊ณ์๋ฃ๋ฅผ ํ์ธํ์ฌ ํํฐ์ ์๋ฅผ ์ต์ ํํ๋ค.
ํํฐ์ ์ ์๋ ` spark.sql.adaptive.advisoryPartitionSizeInBytes`๊ฐ ๊ฒฐ์ ํ๋๋ฐ, ๊ธฐ๋ณธ๊ฐ์ 64m(64MB)์ด๋ค.
ํ๊ท ์ ์ผ๋ก advisoryPartitionSizeInBytes ๊ฐ ๊ทผ์ฒ์์ partition ํฌ๊ธฐ๊ฐ ๊ฒฐ์ ๋ ์ ์๋๋ก ํํฐ์ ์๋ฅผ ์กฐ์ ํ๋ค.
์ด๋ฏธ์ง์ ๊ฐ์ด stage๋ฅผ ์งํํด๊ฐ๋ฉฐ ํํฐ์ ์ ํฌ๊ธฐ๋ฅผ ๋ชจ๋ํฐ๋งํ๋ฉด์ COALESCE ๋ฅผ ํตํด ํํฐ์ ์๋ฅผ ์กฐ์ ํ๋ ๊ฒ์ ๋ณผ ์ ์๋ค.
https://tech.kakao.com/2022/01/18/aqe-coalescing-post-shuffle-partitions/
- Coalescing Post Shuffle Partition์ shuffle ์ดํ coalesce๋ฅผ ์๋์ผ๋ก ํด์ค๋ค.
- ์ด ๊ธฐ๋ฅ์ ๋๋ฌด ์๊ณ ๋ง์ partition(task)์ ์์ฑ์ ๋ฐฉ์งํ๋ค.
- ์ ํ์ฉํ๋ ค๋ฉด shuffle partition์ ์๋ฅผ ์ฒ์์ ์ถฉ๋ถํ ํฐ ์๋ฅผ ์ค์ ํด์ผ ํ๋ค.
ํ์ฑํ ํ๋ ๋ฐฉ๋ฒ์ ์๋์ ๊ฐ๋ค
- `spark.sql.adaptive.enabled` ๊ฐ true๋ก ์ค์
- `spark.sql.adaptive.coalescePartitions.enabled` ๊ฐ true๋ก ์ค์
Switching Join Strategies
์กฐ์ธ ์ ๋ต์ ์ต์ ํํ๋ ๊ธฐ๋ฅ์ด๋ค.
Spark2.x์์๋ ์ํ ์ด๊ธฐ ์์ ์ ์์ธกํ ๋ฐ์ดํฐ์ ํฌ๊ธฐ๊ฐ ์ค์ ์ ๋ฌ๋ผ์ ธ ๋ค๋ฅธ ์กฐ์ธ ์ ๋ต์ด ์ ๋ฆฌํ ์ ์์์๋ ๋ฐํ์ ์ต์ ํ๊ฐ ๋ถ๊ฐ๋ฅํ๊ธฐ ๋๋ฌธ์ ๋นํจ์จ์ ์ธ ์กฐ์ธ ์ ๋ต์ผ๋ก ์์ ์ด ์งํ๋ ์ ์๋ค.
์์ธก์ ์๋์ ์ด์ ๋ก ํ๋ฆด ์ ์๋ค:
- cardinality ๋๋ selectivity ์์ธก์ ์ํ ํต๊ณ๊ฐ ๋ถ์ ํํ ์ ์์.
- ๋์ ๋ฐ์ดํฐ๊ฐ ๋ณต์กํ ์๋ธํธ๋ฆฌ์ผ ์ ์์
- UDFs์ ๊ฐ์ด ๋ธ๋๋ฐ์ค predicates์ฌ์ ์ด๊ธฐ ์์ ์ ํต๊ณ์ง ๊ณ์ฐ์ด ๋ถ๊ฐ๋ฅํ ์ ์์
๊ทธ๋ฌ๋ AQE ๋ ์คํ์ค์ ์์งํ ๋ฐ์ดํฐ ์ ๋ณด๋ฅผ ๋ฐํ์ผ๋ก ์ค๊ฐ์ Join ์ ๋ต์ ๋ณ๊ฒฝํ ์ ์๋ค.
Converting Sort-Merge Join to Broadcast Join
Sort Merge Join์ Broadcast join์ผ๋ก ๋ณ๊ฒฝํ๋ ์์๋ฅผ ์ดํด๋ณด๋ฉด ์ด๋ฏธ์ง์ ๊ฐ๋ค.
์ค๋ฅธ์ชฝ ํ ์ด๋ธ ํฌ๊ธฐ๊ฐ ์๋ฅผ๋ค์ด 25MB๋ก ์์ธก๋์๋ค๊ณ ๊ฐ์ ํ๋ฉด, AQE๊ฐ ์๋ Spark2.x๋ฒ์ ์์๋ ์ถฉ๋ถํ Broadcast๋ฅผ ํ ์ ์์์๋ Sort merge join์ผ๋ก ์ํ์ด ๋ ๊ฒ์ด๋ค.
AQE enabled ์์๋ ์ผ๋ถ leaf stage ๋ฅผ ์๋ฃํ ์ดํ ์ค๋ฅธ์ชฝ ํ ์ด๋ธ์ actual size๊ฐ 8MB๋ผ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
์ดํ ์ต์ ํ ๊ณผ์ ์ด ์ผ์ด๋ Broadcast ์กฐ์ธ์ผ๋ก ๋ณ๊ฒฝ๋๋ค.
๊ทธ๋ฌ๋ ์ด๋ฏธ Sort Merge Join์ด ์์๋์ด๋ฒ๋ ธ๋๋ฐ ๋ค์ ์กฐ์ธ ์ ํ์ ๋ณ๊ฒฝํ๋ ๊ฒ ํจ์จ์ ์ธ์ง ์๋ฌธ์ด ์์ ์ ์๋ค.
๋ํ๋จผํธ์์๋ Sort Merge Join์ ๊ณ์ ๋๊น์ง ํ๋ ๊ฒ ๋ณด๋ค๋ ๋ซ๋ค๊ณ ์ค๋ช ํ๊ณ ์๋ค. `spark.sql.adaptive.localShuffleReader.enabled` ๊ฐ์ด True๋ผ๋ฉด ์ด๋ฏธ ์ ํ๋ ๋ฐ์ดํฐ๋ ๋ก์ปฌ(executor์ธ์ง?) ์ ์ ์ฅ๋๊ณ ํด๋น ํ์ผ์ ๋ก์ปฌ์์ ์ฝ์ด ๋คํธ์ํฌ ํธ๋ํฝ์ ์ ์ฝํ ์ ์๋ค.
Converting sort-merge join to shuffled hash join
์ํธ๋จธ์ง์กฐ์ธ: ์ผ๋จ ๋ ๊ฐ์ ํ ์ด๋ธ์ด ์์ ๋ ์ผ๋จ ์ํ ์ ํ๊ณ ํ ์ด๋ธ๊ฐ record๋ฅผ ๋น๊ตํจ -> ๋๋ฆผ
shuffled hash join: ๋คํธ์ํน์ ํ ๋ ์๋์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ partition์ ๋์ -> ๋ฐ์ดํฐ ์์ฒด๊ฐ ์์ ๋ ์๋์ผ๋ก ์ผ์ด๋จ
์ํ ์ด ์๊ธฐ ๋๋ฌธ์ ์๋์ ์ผ๋ก sort merge๋ณด๋จ ๋น ๋ฆ
Optimizing Skew Join
์ฐธ๊ณ : https://blog.cloudera.com/how-does-apache-spark-3-0-increase-the-performance-of-your-sql-workloads/
ํํฐ์ ๋ฐ์ดํฐ๊ฐ ๋ถ๊ท ํํ๋ค๋ฉด ์กฐ์ธ ์ ์ฟผ๋ฆฌ ์ฑ๋ฅ์ ์ ํ์ํฌ ์ ์๋ค.
์๋ฅผ๋ค์ด ์ ์ด๋ฏธ์ง์ ๊ฒฝ์ฐ A0 ํํฐ์ ์ ๋ค๋ฅธ ํํฐ์ ๋ณด๋ค ๋ฐ์ดํฐ๊ฐ ๋ชฐ๋ ค์์์ ํ์ธํ ์ ์๋ค.
AQE ๋ฅผ ์ฌ์ฉํ์ง ์์ผ๋ฉด A1-B1, A2-B2, A3-B3์ ์ฒ๋ฆฌ๊ฐ ๋ชจ๋ ์๋ฃ๋์ด๋ A0-B0์ ์ฒ๋ฆฌ๋ฅผ ๊ธฐ๋ค๋ ค์ผ ํ๊ณ ์ด๋ ์ ์ฒด ์ดํ๋ฆฌ์ผ์ด์ ์คํ ์๊ฐ์ ์ํฅ์ ๋ผ์น๋ค.
AQE๋ฅผ ํ์ฑํํ๋ค๋ฉด A0๊ณผ ๊ฐ์ skew data๋ฅผ ๊ฐ์งํ๊ณ ๋ ์์ ํํฐ์ ์ผ๋ก ์ชผ๊ฐ ๋ค. A0์ด ์ชผ๊ฐ์ง ๊ฐ์๋งํผ B0์ ๋ณต์ ๋๋ค.
์ด๋ ๊ฒ ๋๋ฉด A0-B0์ ์กฐ์ธ ์ฒ๋ฆฌ ์๊ฐ์ด ์ค์ด๋ค๊ณ ์ ์ฒด ์ดํ๋ฆฌ์ผ์ด์ ์ฒ๋ฆฌ ์๋๊ฐ ํฅ์๋ ์ ์๋ค.
ํ์ฑํ ํ๋ ๋ฐฉ๋ฒ์ ์๋์ ๊ฐ๋ค
- `spark.sql.adaptive.enabled` ๊ฐ true๋ก ์ค์
- `spark.sql.adaptive.skewJoin.enabled` ๊ฐ true๋ก ์ค์
๊ด๋ จ ์ค์ ๊ฐ์ ์๋์ ๊ฐ๋ค
`spark.sql.adaptive.skeyJon.skewedPartitionFactor`
์คํ ํํฐ์ ์์ ํ๋จํ๋ ๊ฐ: ํํฐ์ ํฌ๊ธฐ์ ์ค์๊ฐ * skewedPartitionFactor ๋ณด๋ค ํฌ๋ค๋ฉด ํด๋น ํํฐ์ ์ skew partition์ผ๋ก ๊ฐ์ฃผํ๋ค.
` Spark.sql.adaptive.skewedPartitionThresholdInBytes`
์ ๋๊ฐ์ผ๋ก skew๊ฐ ๋ฌด์๋๋ ์๊ณ๊ฐ์ด๋ค. ๋๋ฌด ์์ ํํฐ์ ์ด ๋ถํ ๋๋ ๊ฒ์ ๋ง๊ธฐ ์ํด ์ค์ ํ๋ค.
'๋ฐ์ดํฐ > Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Spark] RDD vs Dataframe (2) | 2024.03.24 |
---|---|
[Spark] spark-submit ๊ณผ ์ต์ (1) | 2024.03.24 |
[Spark] ์คํํฌ์ Executor Memory ๊ตฌ์กฐ (0) | 2024.03.23 |
[Spark] GraphX (0) | 2024.03.18 |
[Spark] Spark Join ์ข ๋ฅ (0) | 2024.03.18 |