๐Ÿฅ


[Spark] cache() and persist()

2024. 4. 1. 13:52

When you process data with Spark, the data is loaded at the moment an action is executed.

If the same computation is repeated, you can use cache() and persist() to keep the data resident in memory instead of reloading it on every action.

Cached data is kept in storage memory.

 

1. cache() and persist()

cache and persist behave the same, except that persist lets you specify a storageLevel.

`cache()` is equivalent to `persist(storageLevel=MEMORY_AND_DISK)` for a DataFrame, and to `persist(storageLevel=MEMORY_ONLY)` for an RDD.

https://spark.apache.org/docs/latest/sql-ref-syntax-aux-cache-cache-table.html

 

 

I compared the query execution stages and memory usage with and without running `df.cache()`.

# my_path and table_schema are placeholders for your own path and schema
csv_file_path = my_path
df = spark.read.schema(table_schema).csv(csv_file_path)

df.cache()   # marks the DataFrame for caching (lazy)
df.count()   # an action materializes the cache

 

(Spark UI screenshots: storage memory usage without cache vs. with cache)

 

Storage memory usage was higher with cache than without it.

When `cache()` was executed, an extra `InMemoryTableScan` step appeared in the query plan.

When another operation ran on the cached DataFrame, the 1000 rows were loaded through `InMemoryTableScan` instead of being reloaded from the CSV file.

 

To release the data from memory, call `unpersist()`.

df.unpersist()

 

 

2. StorageLevel

The storage levels are as follows.

| Level | Space Used | CPU Time | In memory | On disk | Nodes with data |
|---|---|---|---|---|---|
| MEMORY_ONLY | High | Low | Y | N | 1 |
| MEMORY_ONLY_2 | High | Low | Y | N | 2 |
| MEMORY_ONLY_SER | Low | High | Y | N | 1 |
| MEMORY_ONLY_SER_2 | Low | High | Y | N | 2 |
| MEMORY_AND_DISK | High | Medium | Some | Some | 1 |
| MEMORY_AND_DISK_2 | High | Medium | Some | Some | 2 |
| MEMORY_AND_DISK_SER | Low | High | Some | Some | 1 |
| MEMORY_AND_DISK_SER_2 | Low | High | Some | Some | 2 |
| DISK_ONLY | Low | High | N | Y | 1 |
| DISK_ONLY_2 | Low | High | N | Y | 2 |

 

Serialization

When an RDD is cached with a serialized level, it is stored as serialized Java objects. Serialization reduces the storage cost, but CPU time is higher because the data has to be deserialized whenever it is read.

Spark uses the Java serializer by default. In Java and Scala you can switch to the Kryo serializer, which is reported to have a better storage cost than the Java serializer.
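Switching the serializer is a configuration change made when the session is built. A sketch of enabling Kryo (the app name is arbitrary, and the commented-out registration option is optional):

```python
from pyspark.sql import SparkSession

# Configuration sketch: use Kryo instead of the default Java serializer.
spark = (
    SparkSession.builder
    .appName("kryo-example")  # arbitrary name for this sketch
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # Optional: fail fast when a class has not been registered with Kryo
    # .config("spark.kryo.registrationRequired", "true")
    .getOrCreate()
)
```

Note that Kryo mainly affects RDD caching and shuffle data; DataFrames use Spark's internal Tungsten binary format for their own operations.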

 

I compared the memory usage when caching with and without serialization.

 

1. MEMORY_ONLY

from pyspark.storagelevel import StorageLevel

df = spark.read.schema(table_schema).csv(csv_file_path)

# MEMORY_ONLY: StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
df.persist(StorageLevel(False, True, False, True, 1))
df.show()

StorageLevel: MEMORY_ONLY

 

2. MEMORY_ONLY_SER

from pyspark.storagelevel import StorageLevel

df = spark.read.schema(table_schema).csv(csv_file_path)

# MEMORY_ONLY_SER: deserialized=False, so the data is stored serialized
df.persist(StorageLevel(False, True, False, False, 1))
df.show()

 

StorageLevel: MEMORY_ONLY_SER

With serialization, a smaller size was loaded into memory than without it.

 

(storageLevel reference: https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/api/java/StorageLevels.java#L25)

 

 

3. When to use it?

Use caching when the data is expected to be reused several times.

It is best to cache after the transformations are complete, right before the action.

 

For example, when there are two actions as below:

df = spark.read.schema(table_schema).csv(csv_file_path)

# Action 1
df.show()

# Action 2
B = df.count()
print(B)

Two separate queries are created, and each query performs its own scan.



 

If we instead add cache before the actions:

df = spark.read.schema(table_schema).csv(csv_file_path)

df.cache()

# Action 1
df.show()

# Action 2
B = df.count()
print(B)

 

Two queries are still created, but the scan happens only in the first action; the subsequent count() loads the data via InMemoryTableScan.

 

In this example the data was small and there was no downstream work or complex logic, so the run time was the same either way; with many transformations or larger data, using cache appropriately should give a performance gain.
