๐ฅ
[Spark] ์คํํฌ ์ค์ผ์ฅด๋ง ๋ณธ๋ฌธ
Application ์ค์ผ์ฅด๋ง
๊ด๋ จํ์ฌ https://community.cloudera.com/t5/Community-Articles/Dynamic-Allocation-in-Apache-Spark/ta-p/368095 ์ด์ชฝ์ ์ ๋ฆฌ๊ฐ ์ ๋์ด ์๋ค.
SRA(Static Resource Allocation)
spark ์ดํ๋ฆฌ์ผ์ด์ ์ ์คํ๋๊ธฐ ์ ์ดํ๋ฆฌ์ผ์ด์ ์์ ์ฌ์ฉํ ๋ฆฌ์์ค๋ฅผ ๋ฏธ๋ฆฌ ์์ฝํด๋๋ค.
์ด ๋ฆฌ์์ค์ ์์ ๊ณ ์ ๋์ด์ ๋ฐํ์์ค์๋ ๋ณ๊ฒฝ์ด ๋ถ๊ฐ๋ฅํ๋ค. ๋ง์ฝ ํ ๋น๋ ๊ฒ๋ณด๋ค ๋ ๋ง์ ๋ฆฌ์์ค๊ฐ ํ์ํ๋ค๋ฉด ์คํ์๊ฐ์ด ๊ธธ์ด์ง๊ฑฐ๋ ์คํจํ ์ ์๋ค.
๋ฆฌ์์ค๊ฐ ๊ณ ์ ๋์ด ์๊ธฐ ๋๋ฌธ์ ๋ง์ฝ ํ ๋น๋ ๋ฆฌ์์ค๊ฐ ์ดํ๋ฆฌ์ผ์ด์ ์์ ์ฌ์ฉ๋์ง ์๋๋ค๋ฉด ๋นํจ์จ์ ์ธ ๋ฆฌ์์ค ํ์ฉ์ผ๋ก ์ด์ด์ง๊ฒ ๋๋ค.
๋ํ ๋ฐํ์์ค์ ๋ฆฌ์์ค๋ฅผ ์กฐ์ ํ๋๊ฒ์ด ๋ถ๊ฐ๋ฅํ๊ธฐ ๋๋ฌธ์ ์ค๊ฐ์ ๋ฆฌ์์ค ํฌ์ ์ด ๋ ํ์ํ๋ค๋ฉด out of memory ์๋ฌ๊ฐ ๋ฐ์ํ ์ ์๋ค.
DRA(Dynamic Resource Allocation)
dynamic resource allocation์ ํ์ฑํ์ํค๋ฉด ๋ฐํ์์ค์ ํ์์ ๋ฐ๋ผ ๋ฆฌ์์ค๊ฐ ์ดํ๋ฆฌ์ผ์ด์ ์ ํ ๋น๋๋ค. ์ด๋ฅผ ํตํด ๋ฆฌ์์ค ํ์ฉ๋๊ฐ ํฅ์๋๊ณ ๋ฆฌ์์ค ํ์ฉ๋๊ฐ ๋ฎ๊ฑฐ๋ ๊ณผ๋ํ๊ฒ ํ์ฉํ๋ ๊ฒ์ ๋ฐฉ์งํ ์ ์๋ค. dynamic resouce allocation์์์ ๋ฆฌ์์ค ๊ณต์ ๋จ์๋ Executor์ด๋ฉฐ, ์๋์ ๋์ค๋ ๋ชจ๋ "๋ฆฌ์์ค"๋จ์ด์ ๋จ์๋ Executor์ด๋ค.
๋์ ํ ๋น์ ํจ์ผ๋ก์จ ์๊ธฐ๋ ์ฅ์ ์ ์๋์ ๊ฐ๋ค.
- ๋ฆฌ์์ค๋ฅผ ํจ์จ์ ์ผ๋ก ์ฌ์ฉํ ์ ์๊ณ ํด๋ฌ์คํฐ์ ์ ๋ฐ์ ์ธ ํจ์จ์ฑ์ ํฅ์์ํฌ ์ ์๋ค.
- ์ํฌ๋ก๋์ ๋ฐ๋ผ ํ ๋น๋ ๋ฆฌ์์ค๋ฅผ ํ์ฅํ๊ฑฐ๋ ์ถ์ํ ์ ์๋ค.
- ๋ฆฌ์์ค๋ฅผ ํจ์จ์ ์ผ๋ก ํ ๋นํ๊ธฐ ๋๋ฌธ์ ์ดํ๋ฆฌ์ผ์ด์ ์ ์ฒด ๋น์ฉ์ ์ค์ผ ์ ์๋ค.
- ํ ํด๋ฌ์คํฐ์์ ์ฌ๋ฌ ์ดํ๋ฆฌ์ผ์ด์ ์ด ์คํ๋ ๋ ๊ณต์ ํ๊ฒ ๋ฆฌ์์ค๋ฅผ ํ ๋น๋ฐ์ ์ ์๋ค.
๋ฐ๋๋ก ๋จ์ ์ ์๋์ ๊ฐ๋ค.
- ์คํํฌ์์ ์ํฌ๋ก๋๋ฅผ ์ง์์ ์ผ๋ก ๋ชจ๋ํฐ๋งํ๊ณ ๋ฆฌ์์ค ํ ๋น์ ์กฐ์ ํด์ผํ๊ธฐ ๋๋ฌธ์ ์ถ๊ฐ์ ์ธ ์ค๋ฒํค๋๊ฐ ๋ฐ์ํ๊ณ , ์ด๋ถ๋ถ์ ์ดํ๋ฆฌ์ผ์ด์ ์ฑ๋ฅ์ ์ํฅ์ ๋ฏธ์น ์ ์๋ค.
- ๋ฆฌ์์ค ์กฐ์ ์ ๋ฐ๋ฅธ ๋ ์ดํด์๊ฐ ์์ ์ ์๋ค.
- ๊ด๋ จ ์์ฑ๊ฐ ๊ตฌ์ฑ์ด ์ถ๊ฐ๋ก ํ์ํด ์ดํ๋ฆฌ์ผ์ด์ ๊ด๋ฆฌ ๋ฐ ๋ฐฐํฌ๊ฐ ๋ ๋ณต์กํด์ง ์ ์๋ค.
- ํ ๋น๋ ๋ฆฌ์์ค๊ฐ ์์ฃผ ๋ณ๊ฒฝ๋๋ฉด ์ฌ์ฉํ executor์ ๋ํด์ ์์ธก๋ถ๊ฐ๋ฅํด์ง๋ค.
- ๋งค ๋ฆฌ์์ค ์์ฒญ ๋ฐ ํด์ ์๋ง๋ค node manager์ ํต์ ํด์ผ ํ๊ธฐ ๋๋ฌธ์ ๋คํธ์ํฌ ๋น์ฉ์ ์ฆ๊ฐ์ํฌ ์ ์๋ค.
- ํ ๋น๋ ๋ฆฌ์์ค๊ฐ ๋ถ์กฑํ๋ค๋ฉด ์ ํ ํ๋ก์ธ์ค๋ฅผ ์ฒ๋ฆฌํ๋ yarn shuffle service๊ฐ ์ค๋ฒ๋ก๋ ๋ ์ ์๊ณ , ์ด๋ ์ ์ฒด ํด๋ฌ์คํฐ ์ฑ๋ฅ ์ ํ๋ฅผ ์ผ๊ธฐํ ์ ์๋ค. ๋์ ์ผ๋ก ํ ๋น๋๋ ๋ฆฌ์์ค ๋ชจ๋ํฐ๋ง์ ์ ํด์ผํ๋ค.
๊ด๋ จ ๊ตฌ์ฑ
Property Name | Default Value | Description |
spark.shuffle.service.enabled | false | Enables the external shuffle service. |
spark.dynamicAllocation.enabled | false | Set this to true to enable dynamic allocation. |
spark.dynamicAllocation.minExecutors | 0 | ์ดํ๋ฆฌ์ผ์ด์ ์ ํ ๋นํด์ผ ํ๋ ์ต์ executor ์ |
spark.dynamicAllocation.initialExecutors | spark.dynamicAllocation.minExecutors | ๋์ ํ ๋น์ด ํ์ฑํ๋ ๊ฒฝ์ฐ ์คํํ initial executor ์ `--num-executors` (or `spark.executor.instances`) ๊ฐ ์ค์ ๋์ด ์๊ณ , ์ด ๊ฐ๋ณด๋ค ํฐ ๊ฒฝ์ฐ์ ์ฌ์ฉ๋จ |
spark.dynamicAllocation.maxExecutors | infinity | ์ดํ๋ฆฌ์ผ์ด์ ์ ํ ๋นํด์ผ ํ๋ ์ต๋ executor ์ |
* ์ฐธ๊ณ : yarn shuffle service
https://mallikarjuna_g.gitbooks.io/spark/content/yarn/spark-yarn-YarnShuffleService.html
dynamic resource allocation์ ์ฌ์ฉํ๊ธฐ ์ํด ํ์ฑํํด์ผํ๋ external shuffle service. ์คํํฌ executor๊ฐ ์ ํ ํ์ผ์ ๊ฐ์ ธ์ค๋๋ฐ ์ฌ์ฉ๋๋ฉฐ, executor ํ๋๊ฐ fail๋๋๋ผ๋ shuffled file์ ์์ค๋์ง ์๋๋ค.
Job ์ค์ผ์ฅด๋ง
FIFO ์ค์ผ์ฅด๋ง
ํ job์ ๋ชจ๋ task๊ฐ ์ข ๋ฃ๋๋ฉด ๋ค์ job์ ์คํํ๋ First in first out ์ค์ผ์ฅด๋ง์ด๋ค.
๋ํดํธ๋ก FIFO ๋ฐฉ์์ผ๋ก ๋ฆฌ์์ค๋ฅผ ํ ๋นํ๋ฉฐ, ์ฒซ ๋ฒ์งธ ์ ์๋ ์์ ์ด ์ฌ์ฉ๊ฐ๋ฅํ ๋ชจ๋ ๋ฆฌ์์ค์ ๋ํด ์ฐ์ ์์๋ฅผ ๊ฐ๊ฒ ๋๋ค.
์ฒซ ๋ฒ์งธ job์์ ๋ชจ๋ ๋ฆฌ์์ค๊ฐ ํ์ํ์ง ์๋ค๋ฉด ๋ค์ ์์ ์์๋ ํด๋น ๋ฆฌ์์ค๋ฅผ ์ฌ์ฉํ ์ ์๋ค.
๊ทธ๋ฌ๋ ๋ฌธ์ ๋ ์ ์๋ ์ํฉ์ ์ฒซ ๋ฒ์จฐ Job์์ ์คํํด์ผ ํ๋ task ๊ฐ ๋ง์ ๋ ๋ค์ Job์ task ์๊ฐ ์๋ฌด๋ฆฌ ์์๋ ๋ค์ job์ ์ฒซ ๋ฒ์จฐ job์ด ์๋ฃ๋๊ธฐ๋ฅผ ๊ธฐ๋ค๋ ค์ผ ํ๋ค๋ ์ง์ ์ด๋ค.
์ด๋ฌํ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด FAIR ์ค์ผ์ฅด๋ง์ ์ฌ์ฉํ ์ ์๋ค.
FAIR ์ค์ผ์ฅด๋ง
Fair ์ค์ผ์ฅด๋ง์ ๋ผ์ด๋ ๋ก๋น ๋ฐฉ์์ผ๋ก ์๋ํ๋ค.
์๋ ์ด๋ฏธ์ง์์ ๋ณผ ์ ์๋ค์ํผ job2๋ job1์ด ์ข ๋ฃ๋ ๋๊น์ง ๊ธฐ๋ค๋ฆด ํ์ ์์ด ๊ฐ๋ฅํ ํ ๋นจ๋ฆฌ ์์๋ ์ ์๋ค.
ํ๋์ application ๋ด์์ ์ฌ๋ฌ ์์ ์ ์คํ์๊ฐ์ ์ต์ ํํ ์ ์๋ ๋ชจ๋ ์ค ํ๋์ด๋ค. FIFO ์ ๋ฌ๋ฆฌ ์์ ๊ฐ ๋ฆฌ์์ค๋ฅผ ๊ณต์ ํ๊ธฐ ๋๋ฌธ์ ๊ธด ์๊ฐ ์คํ๋๋ ํ๋์ job๋๋ฌธ์ ๋ฆฌ์์ค๊ฐ lock๋๋ ์ํฉ์ผ ํผํ ์ ์๋ค.
์ฌ์ฉ๋ฐฉ๋ฒ
`spark.scheduler.mode`๋ฅผ FAIR๋ก ์ค์ ํด์ฃผ๋ฉด ๋๋ค.
from pyspark.sql SparkSession
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.scheduler.mode", "FAIR")
spark = SparkSession.builder.appName(...)\
...
.config(conf=conf)\
...
scheduler pool
https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
Job์ pool๋ก ์ผ์ข ์ ๊ทธ๋ฃนํ๋ฅผ ํ์ฌ ๊ด๋ฆฌํ๊ณ ๊ฐ pool์ ๊ฐ์ค์น ๋ฑ ์๋ก ๋ค๋ฅธ ์ต์ ์ค์ ์ ์ง์ํ๋ค.
์ค์ผ์ฅด๋ง ๋ชจ๋๋ฅผ FAIR๋ก ์ค์ ์ ๊ธฐ๋ณธ์ ์ผ๋ก ๊ฐ pool์ ํด๋ฌ์คํฐ์์ ๋์ผํ FAIR ๋ชจ๋๋ก ์คํ๋์ง๋ง, ๊ฐ pool ๋ด์์์ ์์ ์ FIFO๋ก ์ํ๋๋ค.
Pool ๊ด๋ จ ์์ฑ
`schedulingMode`
pool ๋๊ธฐ์ด ๋ด์ ์์ ์ ์ค์ผ์ฅด๋ง ๋ชจ๋์ด๋ค. FIFO or FAIR
`weight`
๋ค๋ฅธ pool์ ๊ธฐ์ค์ผ๋ก ํด๋ฌ์คํฐ์ pool ๊ณต์ ๋ฅผ ์ ์ดํ๋ค. ๋ํดํธ ๊ฐ์ 1๋ก ๋ชจ๋ pool์ ๊ฐ์ค์น๋ 1์ด๋ค.
์๋ฅผ ๋ค์ด ํน์ pool์ ๊ฐ์ค์น๋ฅผ 2๋ก ์ค์ ํ๋ฉด ๋ค๋ฅธ pool๋ค๋ณด๋ค 2๋ฐฐ ๋ ๋ง์ ๋ฆฌ์์ค๋ฅผ ์ป์ ์ ์๋ค.
1000 ๊ณผ ๊ฐ์ ์์ฃผ ํฐ ์๋ก ์ค์ ํด pool๊ฐ์ ์ฐ์ ์์๋ฅผ ๊ตฌํํ ์๋ ์๋ค. ( ๊ฐ์ค์น 1000 ๋ก ์ค์ ํ๋ฉด ์ฆ ํด๋น pool์ ์๋ ์์ ์ด ํ์ฑํ๋ ๋๋ง๋ค ํญ์ ๋จผ์ ์์ ์ ์์ํ๊ฒ ๋๋ค๋ ์๋ฏธ์ด๋ค.)
`minShare`
์ ์ฒด ๊ฐ์ค์น์ ๋ณ๋๋ก ๊ฐ pool์ ์ต์ ๋ฆฌ์์ค(CPU ์ฝ์ด) ๋ฅผ ์ค์ ํ๋ค. ์ค์ผ์ฅด๋ฌ๋ ๊ฐ์ค์น์ ๋ฐ๋ผ ๋ฆฌ์์ค๋ฅผ ์ฌ๋ถ๋ฐฐํ๊ธฐ ์ ์ ํญ์ ๋ชจ๋ ํ์ฑ pool์ minShare๋ฅผ ์ถฉ์กฑํด์ผ ํ๋ค.
๋ฐ๋ผ์ minShare ๊ฐ์ pool ์ด ํญ์ ํน์ ์์ ๋ฆฌ์์ค(CPU ์ฝ์ด)๋ฅผ ๋น ๋ฅด๊ฒ ํ๋ณดํ ์ ์๋๋ก ๋ณด์ฅํ ์ ์๋ ๊ฐ์ด์ด์ผ ํ๋ค. ๋ํดํธ ๊ฐ์ 0์ด๋ค.
Pool ์ฌ์ฉ ๋ฐฉ๋ฒ
๋จผ์ XML ํ์ผ์ ์์ฑํด ๊ฐ Pool ์ ์ ์ํด์ค๋ค.
<?xml version="1.0"?>
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
๊ทธ๋ฆฌ๊ณ spark config ๋ด `spark.scheduler.allocation.file`์ ํด๋น ํ์ผ ์์น๋ฅผ ์์ฑํด์ค๋ค. ์๋๋ฉด conf ๋๋ ํ ๋ฆฌ์ fairscheduler.xml ํ์ผ์ ๊ธฐ์ฌํด์ฃผ์ด๋ ๋๋ค.
# scheduler file at local
conf.set("spark.scheduler.allocation.file", "file:///path/to/file")
# scheduler file at hdfs
conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")
pool์ ์ ์ํ ๋์๋ ์๋์ ๊ฐ์ด ์ด๋ค pool์ธ์ง ์ค์ ํด์ฃผ๋ฉด ๋๋ค. ์ฐธ๊ณ ๋ก ์๋์ ๊ฐ์ด ์ค์ ํ๋ฉด "test"๋ผ๋ ์ด๋ฆ์ pool ์ด ์๋์ง ๋จผ์ spark.scheduler.allocation.file์ ํ์ธํ๊ณ , fairscheduler.xml ํ์ผ์ ํ์ธํ๊ณ , ์ผ์นํ๋ ํ์ด ์๋ค๋ฉด default pool๋ก ์ด๋ํ๋ค.
# Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "test")
์ฐ๊ฒฐ๋ pool์ ์ง์ฐ๊ธฐ ์ํด์๋ ์๋์ ๊ฐ์ด null๋ก ์ค์ ํด์ฃผ๋ฉด ๋๋ค. ๊ทธ๋ผ ํด๋น ์ฐ๋ ๋ ๋ด์์ ์ ์ถ๋ ์์ ๋ค์ ๋ํดํธ pool๋ก ๋ค์ด๊ฐ๋ค.
# Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", None)
์ฐธ๊ณ
https://youtu.be/BLT6eHcT-e8?si=dgDttOPjW7I40mH-
https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
'๋ฐ์ดํฐ > Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Spark] Accumulator์ Broadcast (๊ณต์ ๋ณ์) (0) | 2024.04.01 |
---|---|
[Spark] SQL Hint (0) | 2024.04.01 |
[Spark] cache()์ persist() (0) | 2024.04.01 |
[Spark] Speculative Execution (0) | 2024.04.01 |
[Spark] Logical Plan ๊ณผ Physical Plan (0) | 2024.03.25 |