๐ฅ
[Spark] Spark Structured Streaming - Fault Tolerance ๋ณธ๋ฌธ
Background
์ค์๊ฐ stream ์ฒ๋ฆฌ๋ ์ง์์ ์ธ input stream์ ํน์ฑ ์ ์ค๋จ๋์ง ์๊ณ 24์๊ฐ ์คํ๋๋ฏ๋ก ์ฌ๋ฌ ์ค๋ฅ ์์ธ์ผ๋ก failure์ด ๋ฐ์ํ ์ ์๋ค.
- input stream์ด ์์ฑ๋ ์ฝ๋๋ก ์ฒ๋ฆฌ๋ ์ ์๋ ๊ฒฝ์ฐ (์๋ชป๋ ํ์์ ๋ฐ์ดํฐ)
- ์์คํ /cluster ์ค๋ฅ์ ๊ฒฝ์ฐ
Spark Streaming์์์ Fault Tolerance ์ ์
์คํํฌ์ ๋ชฉํ๋ end-to-end Exactly Once ๋ณด์ฅ์ด๋ค.
(์ฐธ๊ณ )
- At Most Once: ๊ฐ ๋ ์ฝ๋๋ ํ ๋ฒ๋ง ์ฒ๋ฆฌ๋๊ฑฐ๋ ์์ ์ฒ๋ฆฌ๋์ง ์๋๋ค.
- At Least Once: ๊ฐ ๋ ์ฝ๋๊ฐ ํ ๋ฒ ์ด์ ์ฒ๋ฆฌ๋๋ค. ๋ฐ์ดํฐ๊ฐ ์์ค๋์ง ์๋๋ก ๋ณด์ฅํ๋ฏ๋ก At Most Once๋ณด๋ค ๊ฐ๋ ฅํ์ง๋ง ์ค๋ณต์ด ์์ ์ ์๋ค.
- Exactly Once: ๊ฐ ๋ ์ฝ๋๋ ์ ํํ ํ ๋ฒ ์ฒ๋ฆฌ๋๋ค. ๋ฐ์ดํฐ๊ฐ ์์ค๋์ง ์์ผ๋ฉฐ ๋ฐ์ดํฐ๊ฐ ์ฌ๋ฌ ๋ฒ ์ฒ๋ฆฌ๋์ง ์๋๋ค. ๊ฐ์ฅ ๊ฐ๋ ฅํ ๋ณด์ฅ์ด๋ค.
์๋์ ๊ฐ์ ์กฐ๊ฑด์์๋ spark structured streamng์ ์ด๋ค ์กฐ๊ฑด์์๋ End-to-End Exactly Once๋ฅผ ๋ณด์ ํ ์ ์๋ค.
replayable sources (์ฌ์ ๊ฐ๋ฅํ ์์ค)
๋ชจ๋ ์คํธ๋ฆฌ๋ฐ ์์ค์๋ ์คํธ๋ฆผ์ ์ฝ๊ธฐ ์์น๋ฅผ ์ถ์ ํ๊ธฐ ์ํ Offset์ด ์๋ ๊ฒ์ผ๋ก ๊ฐ์ ๋๋ค.
streaming ์์ง์ Checkpoint์ WAL(write-ahead logs)๋ฅผ ์ฌ์ฉํ์ฌ ๊ฐ ํธ๋ฆฌ๊ทธ์์ ์ฒ๋ฆฌ๋๋ ๋ฐ์ดํฐ์ offset ๋ฒ์๋ฅผ ๊ธฐ๋กํ๋ค.
idempotent sinks (๋ฉฑ๋ฑ์ฑ ์ฑํฌ)
streaming sink๋ reprocessing ์ฒ๋ฆฌํ๊ธฐ ์ํด ๋ฉฑ๋ฑ์ฑ์ ๊ฐ๋๋ก ์ค๊ณ๋์๋ค.
๋ฌธ์์ ์ ์ ๋ฆฌ๋์ด ์๋ค..</p
Checkpoint
์๋์ ๊ฐ์ด checkpoint ์์น๋ฅผ ์ง์ ํด ์ค ์ ์๋ค.
aggDF \
.writeStream \
.outputMode("complete") \
.option("checkpointLocation", "path/to/some/dir") \
.format("memory") \
.start()
checkpoint๋ ์๋์ ๋ ์ข ๋ฅ์ ์ค๋ฅ ์๋๋ฆฌ์ค๋ฅผ ์ฒ๋ฆฌํ๋๋ฐ์ ์ฌ์ฉ๋๋ค.
- Driver ํ๋ก์ธ์ค ์ค๋ฅ
- stateful transformations failures: ์ด์ ๋ฐ์ดํฐ ๋ฐฐ์น์ ์์กดํ๋ ๊ฐ micro-batch ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ํฌํจํ๋ ๋ณํ. ์ํ๊ฐ ์ ์ฅ๋์ง ์์ผ๋ฉด ์ด์ ์ข ์ ์ํ๊ฐ ์ ์ฒด์ ์ผ๋ก ๋ค์ ๊ณ์ฐ๋ ์ ์๋ค.
์ฒดํฌํฌ์ธํธ๋ 4๊ฐ์ง ์ ํ์ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๋ค.
- sources
- offsets
- commits
- metadata
metadata๋ฅผ ์ ์ธํ ํ์ผ๋ค์ microbatch๋ณ๋ก ๊ฐ๊ฐ ์ ์ฅ๋๋ค.
sources
์คํธ๋ฆฌ๋ฐ ์ฟผ๋ฆฌ์ ์ฌ์ฉ๋๋ ๋ค์ํ ์์ค์ ๋ํ ์ ๋ณด๊ฐ ํฌํจ๋์ด ์๋ค.
v1
{"path":"file:///home/jovyan/work/streaming_sample/impression_click.json","timestamp":1712156292816,"batchId":0}
{"path":"file:///home/jovyan/work/streaming_sample/impression_click_leftouter.json","timestamp":1712156292879,"batchId":0}
{"path":"file:///home/jovyan/work/streaming_sample/login_event_sample.json","timestamp":1712156292936,"batchId":0}
{"path":"file:///home/jovyan/work/streaming_sample/price_sample.json","timestamp":1712156292988,"batchId":0}
{"path":"file:///home/jovyan/work/streaming_sample/sample.json","timestamp":1712156293055,"batchId":0}
{"path":"file:///home/jovyan/work/streaming_sample/time_window_sample.json","timestamp":1712156293111,"batchId":0}
offsets
์ฃผ์ด์ง micro-batch ์คํ์์ ์ฒ๋ฆฌ๋ ๋ฐ์ดํฐ์ ๋ํ ์ ๋ณด(offset)๋ฅผ ์ ์ฅํ๋ค. ์ด๋ ๋ฐฐ์น๊ฐ ๋ฌผ๋ฆฌ์ ์ผ๋ก ์คํ๋๊ธฐ ์ ์ ์์ฑ๋๋ค.
์์:
v1
{"batchWatermarkMs":0,"batchTimestampMs":1713249204941,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"logOffset":0}
commits
๋ฐฐ์น๋ณ๋ก ์์ ํ ์ฒ๋ฆฌ๋ ์คํ์ ์ด per batch๋ก ๋ถ๋ฆฌ๋์ด ์๋ค. micro-batch์ ์ฌ์ฉ๋๋ watermark ์ ๋ณด๊ฐ ํฌํจ๋ ์ผ์ข ์ marker file์ด๋ค.
offsets๋ ๋ฐฐ์น์ ๋ฌผ๋ฆฌ์ ์คํ ์ , commits๋ ๋ฐฐ์น์ ์ฑ๊ณต์ ์ธ ์ฒ๋ฆฌ ํ์ write ๋๋ฏ๋ก spark๊ฐ failure ๋ฐ์ ํ๊ณ ์์ ์ด ๋์ผํ ์ฒดํฌํฌ์ธํธ ์์น๋ก ๋ค์ ์ ์ถ๋ ๋ ๋ฐฐ์น๋ฅผ ์ด๋์์๋ถํฐ ์์ํด์ผํ๋์ง๋ฅผ ์ ์ ์๊ฒ ๋๋ค.
์์:
v1
{"nextBatchWatermarkMs":0}
metadata
์คํธ๋ฆฌ๋ฐ ์ฟผ๋ฆฌ์ ID ๊ฐ์ด ์ ์ฅ๋์ด ์๋ค. ์ด ๊ฐ์ ์ดํ๋ฆฌ์ผ์ด์ ์คํ ์ค ๋ฐ๋์ง ์๋๋ค.
์์:
{"id":"a4f94e97-8c2e-48a1-a2cc-e7aa4c230364"}
1. ์ฒ๋ฆฌํ ๋ฐ์ดํฐ๋ ์ฟผ๋ฆฌ ์คํ ์ ์ offset log์ ๊ธฐ๋ก๋๋ค.
2. ๊ณผ๊ฑฐ ๋ฐฐ์น์ ์์กด ์ฟผ๋ฆฌ์ ๊ฒฝ์ฐ state store๋ฅผ ๋ถ๋ฌ์จ๋ค.
3. ์ฟผ๋ฆฌ๋ฅผ ์คํํ๋ค.
4. ์ฟผ๋ฆฌ ์ข ๋ฃ ํ state์ ๊ฒฐ๊ณผ๋ฅผ ์ปค๋ฐํ๋ค.
5. watermark ์ ๋ณด๋ฅผ ๊ธฐ๋กํ๋ค.
Write-ahead Logs
(๊ณผ๊ฑฐ์) Driver ํ๋ก์ธ์ค ์ค๋ฅ ๋ณต๊ตฌ ์์ ์ฌ์ฉ๋์๋ค.
structured streaming ์ WAL ์์ ๋ฐ์ดํฐ๋ฅผ ๋ณต์ฌํ๊ณ ์บ์ํด์ ์ฌ์ฉํ๋ค. (state)
๊ทธ๋ฌ๋ ์๋์ ๊ฐ์ ๋ณ๊ฒฝ์ผ๋ก ํ์ฌ๋ checkpoint์ ํธ์ ๋ ๋ฏ ํ๋ค.
- Retrieve Less๋ก ์ธํ ๋ณ๊ฒฝ์ฌํญ
retrive less๊ฐ ๋์ ๋๋ฉฐ structured streaming ๋์ ๋ฐฉ์์ด ๊ฐ์ ๋์๋ค.
์ด์ ๋ฐ๋ผ ์ ์ฒด ๋ฐ์ดํฐ๋ฅผ WAL์ ๋ณต์ฌํ์ง ์๊ณ ์คํ์ ๋ง ์ ์ฅํ๋ค.
์๋ ์ฃผ์์์๋ offsets ๊ด๋ จ ๋ณ์์์ write-ahead-log๋ฅผ ์ธ๊ธํ๊ณ ์๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
/**
* A write-ahead-log that records the offsets that are present in each batch. In order to ensure
* that a given batch will always consist of the same data, we write to this log *before* any
* processing is done. Thus, the Nth record in this log indicated data that is currently being
* processed and the N-1th entry indicates which offsets have been durably committed to the sink.
*/
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
/**
* A log that records the batch ids that have completed. This is used to check if a batch was
* fully processed, and its output was committed to the sink, hence no need to process it again.
* This is used (for instance) during restart, to help identify which batch to run next.
*/
val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))
์ฐธ๊ณ
https://spark.apache.org/docs/2.4.8/streaming-programming-guide.html#fault-tolerance-semantics
https://community.databricks.com/t5/data-engineering/wal-for-structured-streaming/td-p/63727
'๋ฐ์ดํฐ > Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Spark] ์คํํฌ ์คํธ๋ฆฌ๋ฐ์ ์ด๋ฒคํธ ์๊ฐ ์ฒ๋ฆฌ์ Watermark (0) | 2024.04.21 |
---|---|
[Spark] Structured Streaming - stateful transformation๊ณผ Window operation (0) | 2024.04.16 |
[Spark] Spark Structured Streaming ๊ฐ์ (0) | 2024.04.04 |
[Spark] Accumulator์ Broadcast (๊ณต์ ๋ณ์) (0) | 2024.04.01 |
[Spark] SQL Hint (0) | 2024.04.01 |