๐Ÿฅ

[Spark] Spark Structured Streaming - Fault Tolerance ๋ณธ๋ฌธ

๋ฐ์ดํ„ฐ/Spark

[Spark] Spark Structured Streaming - Fault Tolerance

2024. 4. 4. 00:25

 

Background

Because real-time stream processing runs around the clock on a continuous input stream, failures can occur for a variety of reasons.

  • The input stream cannot be processed by the code as written (malformed data)
  • System/cluster failures

Definition of Fault Tolerance in Spark Streaming

์ŠคํŒŒํฌ์˜ ๋ชฉํ‘œ๋Š” end-to-end Exactly Once ๋ณด์žฅ์ด๋‹ค.

 

(Note)

  1. At Most Once: each record is processed once or not at all.
  2. At Least Once: each record is processed one or more times. This is stronger than At Most Once because it guarantees no data is lost, but duplicates are possible.
  3. Exactly Once: each record is processed exactly once. No data is lost and nothing is processed more than once. This is the strongest guarantee.

 

์•„๋ž˜์™€ ๊ฐ™์€ ์กฐ๊ฑด์—์„œ๋Š” spark structured streamng์€ ์–ด๋–ค ์กฐ๊ฑด์—์„œ๋„ End-to-End Exactly Once๋ฅผ ๋ณด์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.

replayable sources

Every streaming source is assumed to have offsets for tracking the read position within the stream.

streaming ์—”์ง„์€ Checkpoint์™€ WAL(write-ahead logs)๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๊ฐ ํŠธ๋ฆฌ๊ทธ์—์„œ ์ฒ˜๋ฆฌ๋˜๋Š” ๋ฐ์ดํ„ฐ์˜ offset ๋ฒ”์œ„๋ฅผ ๊ธฐ๋กํ•œ๋‹ค.

 

idempotent sinks

Streaming sinks are designed to be idempotent so that reprocessing can be handled safely.
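As an illustrative sketch of the idea (not code from the docs): foreachBatch passes a batch_id that stays the same when a micro-batch is replayed, so writes keyed by it overwrite instead of duplicating. All paths here are hypothetical.

def write_batch(batch_df, batch_id):
    # batch_id is deterministic across replays of the same micro-batch,
    # so a replay overwrites the same directory instead of appending duplicates
    batch_df.write.mode("overwrite").parquet(f"/output/batch_id={batch_id}")

aggDF \
    .writeStream \
    .outputMode("update") \
    .foreachBatch(write_batch) \
    .option("checkpointLocation", "/chk/idempotent") \
    .start()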

 

This is well summarized in the official documentation.

 

Checkpoint

์•„๋ž˜์™€ ๊ฐ™์ด checkpoint ์œ„์น˜๋ฅผ ์ง€์ •ํ•ด ์ค„ ์ˆ˜ ์žˆ๋‹ค.

aggDF \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/some/dir") \
    .format("memory") \
    .start()
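If the query is later restarted with the same checkpointLocation, it resumes from the last recorded offsets instead of starting over. Note that, per the Spark documentation, only limited changes to the query (for example, some sink options) are allowed between restarts from the same checkpoint.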

 

Checkpoints are used to handle the following two failure scenarios:

  • Driver process failures
  • Stateful transformation failures: transformations in which processing each micro-batch depends on previous batches of data. If the state is not persisted, the entire chain of dependent state may have to be recomputed.

์ฒดํฌํฌ์ธํŠธ๋Š” 4๊ฐ€์ง€ ์œ ํ˜•์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•œ๋‹ค.

  • sources
  • offsets
  • commits
  • metadata

Except for metadata, these files are written separately for each micro-batch.

(Figure: contents of the commits directory for a streaming query that has been triggered twice)
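The directory layout looks roughly like this (a sketch for a file-source query that has completed two micro-batches; the state directory appears only for stateful queries):

<checkpointLocation>/
├── metadata
├── sources/0/0
├── offsets/0
├── offsets/1
├── commits/0
├── commits/1
└── state/...   (stateful queries only)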

 

sources

์ŠคํŠธ๋ฆฌ๋ฐ ์ฟผ๋ฆฌ์— ์‚ฌ์šฉ๋˜๋Š” ๋‹ค์–‘ํ•œ ์†Œ์Šค์— ๋Œ€ํ•œ ์ •๋ณด๊ฐ€ ํฌํ•จ๋˜์–ด ์žˆ๋‹ค.

 

v1
{"path":"file:///home/jovyan/work/streaming_sample/impression_click.json","timestamp":1712156292816,"batchId":0}
{"path":"file:///home/jovyan/work/streaming_sample/impression_click_leftouter.json","timestamp":1712156292879,"batchId":0}
{"path":"file:///home/jovyan/work/streaming_sample/login_event_sample.json","timestamp":1712156292936,"batchId":0}
{"path":"file:///home/jovyan/work/streaming_sample/price_sample.json","timestamp":1712156292988,"batchId":0}
{"path":"file:///home/jovyan/work/streaming_sample/sample.json","timestamp":1712156293055,"batchId":0}
{"path":"file:///home/jovyan/work/streaming_sample/time_window_sample.json","timestamp":1712156293111,"batchId":0}

offsets

Stores the offsets describing the data to be processed in a given micro-batch execution. This file is created before the batch is physically executed.

Example:

v1
{"batchWatermarkMs":0,"batchTimestampMs":1713249204941,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"logOffset":0}

commits

Contains the offsets that were fully processed, separated per batch. Each file is a kind of marker that also records the watermark information used by the micro-batch.

Because offsets are written before a batch is physically executed and commits are written after a batch has been successfully processed, when a failure occurs and the job is resubmitted with the same checkpoint location, Spark knows exactly which batch to resume from.
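A minimal sketch (not Spark's actual implementation) of that resume decision, reading file names from a local checkpoint directory and assuming at least one batch has been planned:

import os

def batch_to_run(checkpoint_dir):
    # batch ids appear as plain numeric file names under offsets/ and commits/
    offsets = {int(f) for f in os.listdir(os.path.join(checkpoint_dir, "offsets")) if f.isdigit()}
    commits = {int(f) for f in os.listdir(os.path.join(checkpoint_dir, "commits")) if f.isdigit()}
    pending = offsets - commits
    # a planned-but-uncommitted batch is re-run with the same offsets;
    # otherwise the next batch id starts fresh
    return min(pending) if pending else max(offsets) + 1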

Example:

v1
{"nextBatchWatermarkMs":0}

metadata

์ŠคํŠธ๋ฆฌ๋ฐ ์ฟผ๋ฆฌ์˜ ID ๊ฐ’์ด ์ €์žฅ๋˜์–ด ์žˆ๋‹ค. ์ด ๊ฐ’์€ ์–ดํ”Œ๋ฆฌ์ผ€์ด์…˜ ์‹คํ–‰ ์ค‘ ๋ฐ”๋€Œ์ง€ ์•Š๋Š”๋‹ค.

Example:

{"id":"a4f94e97-8c2e-48a1-a2cc-e7aa4c230364"}

 

https://www.waitingforcode.com/apache-spark-structured-streaming/checkpoint-storage-structured-streaming/read

  1. The data to process is recorded in the offset log before the query executes.
  2. For queries that depend on past batches, the state store is loaded.
  3. The query is executed.
  4. After the query finishes, the results are committed to the state store.
  5. The watermark information is recorded.
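A small end-to-end sketch that exercises all five steps (the schema, paths, and window sizes are illustrative assumptions):

from pyspark.sql import SparkSession
from pyspark.sql.functions import window

spark = SparkSession.builder.getOrCreate()

# hypothetical event schema and input directory
events = spark.readStream \
    .schema("ts timestamp, user string") \
    .json("/data/events")

counts = events \
    .withWatermark("ts", "10 minutes") \
    .groupBy(window("ts", "5 minutes"), "user") \
    .count()

counts.writeStream \
    .outputMode("update") \
    .option("checkpointLocation", "/chk/events") \
    .format("console") \
    .start()

Each trigger writes the planned offsets first (step 1), loads and commits the windowed counts in the state store (steps 2-4), and records the new watermark in the commit log (step 5).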

 

Write-ahead Logs

(๊ณผ๊ฑฐ์—) Driver ํ”„๋กœ์„ธ์Šค ์˜ค๋ฅ˜ ๋ณต๊ตฌ ์‹œ์— ์‚ฌ์šฉ๋˜์—ˆ๋‹ค.

structured streaming ์€ WAL ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๋ณต์‚ฌํ•˜๊ณ  ์บ์‹œํ•ด์„œ ์‚ฌ์šฉํ–ˆ๋‹ค. (state)

However, with the change below, it now appears to have been absorbed into the checkpoint mechanism.

  • Changes due to "Retrieve Less"
    With the introduction of "retrieve less", the way Structured Streaming operates was improved: instead of copying the entire data into the WAL, only the offsets are stored.

 

 

์•„๋ž˜ ์ฃผ์„์—์„œ๋„ offsets ๊ด€๋ จ ๋ณ€์ˆ˜์—์„œ write-ahead-log๋ฅผ ์–ธ๊ธ‰ํ•˜๊ณ  ์žˆ๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L232C7-L232C17

/**
   * A write-ahead-log that records the offsets that are present in each batch. In order to ensure
   * that a given batch will always consist of the same data, we write to this log *before* any
   * processing is done.  Thus, the Nth record in this log indicated data that is currently being
   * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
   */
  val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))

  /**
   * A log that records the batch ids that have completed. This is used to check if a batch was
   * fully processed, and its output was committed to the sink, hence no need to process it again.
   * This is used (for instance) during restart, to help identify which batch to run next.
   */
  val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))

 

References

https://spark.apache.org/docs/2.4.8/streaming-programming-guide.html#fault-tolerance-semantics

https://www.waitingforcode.com/apache-spark-structured-streaming/checkpoint-storage-structured-streaming/read

https://community.databricks.com/t5/data-engineering/wal-for-structured-streaming/td-p/63727