๐ฅ
[Spark] cache()์ persist() ๋ณธ๋ฌธ
์คํํฌ๋ก ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ฉด action ์ฐ์ฐ์ ์ํ ์์ ๋ฐ์ดํฐ๊ฐ ๋ก๋๋๋ค.
๋ฐ๋ณต๋ ๋์ผํ ์ฐ์ฐ์ ํ๋ค๋ฉด ๋งค๋ฒ action ๋ ๋ก๋ํ์ง ์๊ณ cache()์ persist()๋ฅผ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ๋ฅผ ๋ฉ๋ชจ๋ฆฌ์ ์์ฃผ์ํฌ ์ ์๋ค.
์บ์๋ ๋ฐ์ดํฐ๋ storage memory์ ๋ณด๊ด๋๋ค.
1. cache() ์ persist()
cache์ persist ๋ชจ๋ ๋์ผํ๋ฐ persist์์๋ storageLevel์ ์ค์ ํด์ค ์ ์๋ค๋ ์ ์์ ์ฐจ์ด๊ฐ ์๋ค.
`cache()`๋ Dataframe ์ ๊ฒฝ์ฐ`persist(storageLevel=MEMORY_AND_DISK)` , RDD์ ๊ฒฝ์ฐ ` persist(storageLevel=MEMORY_ONLY)` ์ ๊ฐ์ ํจ์์ด๋ค.
https://spark.apache.org/docs/latest/sql-ref-syntax-aux-cache-cache-table.html
`df.cache()`๋ฅผ ์คํํ์ ๋์ ์คํํ์ง ์์์ ๋์ ์ฟผ๋ฆฌ ์ํ ๋จ๊ณ์ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋์ ๋น๊ตํด๋ณด์๋ค.
csv_file_path = my_path
df = spark.read.schema(table_schema).csv(csv_file_path)
df.cache()
df.count()
cache ๋ฏธ์ํ ์ | cache ์ํ ์ |
storage memory์์ cache ๋ฏธ์ํ์๋ณด๋ค ์ํ์์ ์ฌ์ฉ๋์ด ์ฆ๊ฐํ ๊ฒ์ ํ์ธํ ์ ์์๋ค.
`cache()` ๋ฅผ ์คํํ์ ๋ ์ค๊ฐ์ `InMemoryTableScan`์ด๋ผ๋ ๋จ๊ณ๊ฐ ์ถ๊ฐ๋ ๊ฒ์ ๋ณผ ์ ์๋ค.
์บ์๋ ์ํ์์ ๋ค๋ฅธ ์ฐ์ฐ์ ์ํํ์ ๋์๋ ์๋์ ๊ฐ์ด csv ํ์ผ์์ ๋ค์ ๋ก๋ํ์ง ์๊ณ `InMemoryTableScan`์ ํตํด 1000๊ฐ์ rows๋ฅผ ๋ถ๋ฌ์ค๋ ๊ฒ์ ํ์ธํ ์ ์์๋ค.
๋ฉ๋ชจ๋ฆฌ์์ ํด์ ํ๊ณ ์ถ์ ๋์๋ `unpersist()`๋ฅผ ์ฌ์ฉํด์ฃผ๋ฉด ๋๋ค.
df.unpersist()
2. StorageLevel
storageLevel ์ ์ดํด๋ณด๋ฉด ์๋์ ๊ฐ๋ค.
Level |
Space Used |
CPU Time |
In memory |
On disk |
Nodes with data |
MEMORY_ONLY |
High |
Low |
Y |
N |
1 |
MEMORY_ONLY_2 |
High |
Low |
Y |
N |
2 |
MEMORY_ONLY_SER |
Low |
High |
Y |
N |
1 |
MEMORY_ONLY_SER_2 |
Low |
High |
Y |
N |
2 |
MEMORY_AND_DISK |
High |
Medium |
Some |
Some |
1 |
MEMORY_AND_DISK_2 |
High |
Medium |
Some |
Some |
2 |
MEMORY_AND_DISK_SER |
Low |
High |
Some |
Some |
1 |
MEMORY_AND_DISK_SER_2 |
Low |
High |
Some |
Some |
2 |
DISK_ONLY |
Low |
High |
N |
Y |
1 |
DISK_ONLY_2 |
Low |
High |
N |
Y |
2 |
Serialization
RDD ์บ์ฑ์ ์ง๋ ฌํ๋ java ๊ฐ์ฒด๋ก ์ ์ฅํ๋ค. ์ง๋ ฌํ๋ฅผ ํตํด storage cost๋ฅผ ์ค์ผ ์ ์์ผ๋, deserialize๋ฅผ ํด์ผํ๊ธฐ ๋๋ฌธ์ CPU Time์ด ๋ ๋๋ค.
spark์์๋ java serializer๋ฅผ ๊ธฐ๋ณธ serializer๋ก ์ฌ์ฉํ๊ณ ์๋ค. java์ scala์์๋ kryo serializer๋ฅผ ์ฌ์ฉํ ์ ์๋๋ฐ java serializer๋ณด๋ค storage cost ๊ฐ ๋ ์ข๋ค๊ณ ํ๋ค.
serialization ํ๋ ์บ์ฑ๊ณผ ํ์ง ์๋ ์บ์ฑ ์์ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋์ ํ์ธํด๋ดค๋ค.
1. MEMORY_ONLY
df = spark.read.schema(table_schema).csv(csv_file_path)
# MEMORY_ONLY
df.persist(StorageLevel(False, True, False, True, 1))
df.show()
2. MEMORY_ONLY_SER
df = spark.read.schema(table_schema).csv(csv_file_path)
# MEMORY_ONLY_SER
df.persist(StorageLevel(False, True, False, False, 1))
df.show()
serialization ๋์์ ๋๊ฐ ์๋์์ ๋๋ณด๋ค ๋ ์์ ์ฌ์ด์ฆ๊ฐ ๋ฉ๋ชจ๋ฆฌ์ ๋ก๋๋ ๊ฒ์ ํ์ธํ ์ ์์๋ค.
(storagelevel ์ฐธ๊ณ : https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/api/java/StorageLevels.java#L25)
3.์ธ์ ?
์ฌ๋ฌ๋ฒ ์ฌ์ฌ์ฉ์ ํ ๊ฒ์ผ๋ก ์์๋ ๋ ์ฌ์ฉํ๋ค.
transformation์ ์ถฉ๋ถํ ๋ง์น ํ action ์ง์ ์ ์บ์ฑํด์ฃผ๋ฉด ์ข๋ค.
์๋ฅผ๋ค์ด ์๋์ ๊ฐ์ด action ์ฐ์ฐ์ด 2ํ ์๋ ๊ฒฝ์ฐ,
df = spark.read.schema(table_schema).csv(csv_file_path)
# Action 1
df.show()
# Action 2
B = df.count()
print(B)
์๋์ ๊ฐ์ด ๋ ๊ฐ์ ์ฟผ๋ฆฌ๊ฐ ๋ง๋ค์ด์ง๊ณ , ๋ ๊ฐ์ ์ฟผ๋ฆฌ์์ ๊ฐ๊ฐ scan์ ํ๋ ๊ฒ์ ์ ์ ์๋ค.
์๋์ ๊ฐ์ด action ์ ์ cache๋ฅผ ๋ฃ์ด์ฃผ๋ฉด,
df = spark.read.schema(table_schema).csv(csv_file_path)
df.cache()
# Action 1
df.show()
# Action 2
B = df.count()
print(B)
๋ง์ฐฌ๊ฐ์ง๋ก ๋ ๊ฐ์ ์ฟผ๋ฆฌ๊ฐ ๋ง๋ค์ด์ง์ง๋ง ์ฒซ๋ฒ์งธ action์์๋ง scan์ด ์ผ์ด๋๊ณ ์ดํ count() ์์๋ InMemoryTableScan์ ํตํด ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํ ๊ฒ์ ์ ์ ์๋ค.
์์์์๋ ๋ฐ์ดํฐ ์ฌ์ด์ฆ๋ ์๊ณ downstream ์์ ์ด๋ ๋ณต์กํ ๋ก์ง์ด ์์ด ์ํ์๊ฐ์ ๋์ผํ๋๋ฐ, transformation์ด ๋ง๊ฑฐ๋ ๋ ํฐ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃฐ ๋์๋ ์บ์๋ฅผ ์ ์ ํ๊ฒ ์ฌ์ฉํ๋ฉด ์ฑ๋ฅํฅ์์ ๋ณผ ์ ์์ ๊ฒ ๊ฐ๋ค.
'๋ฐ์ดํฐ > Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Spark] SQL Hint (0) | 2024.04.01 |
---|---|
[Spark] ์คํํฌ ์ค์ผ์ฅด๋ง (0) | 2024.04.01 |
[Spark] Speculative Execution (0) | 2024.04.01 |
[Spark] Logical Plan ๊ณผ Physical Plan (0) | 2024.03.25 |
[Spark] RDD vs Dataframe (2) | 2024.03.24 |