๐Ÿฅ


๋ฐ์ดํ„ฐ/Spark

[Spark] Spark Structured Streaming Overview

2024. 4. 4. 00:17

Spark Streaming ์ด๋ž€

https://www.databricks.com/kr/glossary/what-is-spark-streaming

Spark Streaming is an extension of the core Spark API that supports distributed stream processing.

As shown below, there are two streaming flavors. Spark Streaming is the RDD-based engine; it was supported up to version 2.x and is now a legacy project that no longer receives updates.

 

Types of Spark Streaming

  • Spark Streaming: RDD-based micro-batch execution
  • Spark Structured Streaming: DataFrame-based micro-batch execution; the introduction of a low-latency processing mode makes near-real-time processing possible.

 

Spark Structured Streaming 

ํ”„๋กœ๊ทธ๋ž˜๋ฐ ๋ชจ๋ธ

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

  1. In Structured Streaming, all input data received on the stream at each trigger interval is appended to the `input table`.
  2. To compute the result table that will be written to the output sink, you define a query against the input table as if it were a static table.
  3. Spark automatically converts this batch-style query into a streaming execution plan (= `incrementalization`):
    it figures out what state must be maintained to update the result each time a record arrives.
  4. Finally, every time the query runs according to the specified trigger, Spark checks for new data and updates the result.

Because Structured Streaming processes data by continuously appending it to a data stream rather than in bulk batches, (near-real-time) low-latency processing becomes possible.

https://www.databricks.com/kr/glossary/what-is-structured-streaming
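For reference, here is a minimal word-count sketch of this model, adapted from the official programming guide; a SparkSession named `spark` and the socket host/port are assumed. The `wordCounts` DataFrame defined here is reused in the trigger examples later in this post.

from pyspark.sql.functions import explode, split

# read lines from a socket as an unbounded input table
lines = spark.readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()

# the query is written as if "lines" were a static table;
# Spark incrementalizes it into a streaming execution plan
words = lines.select(explode(split(lines.value, " ")).alias("value"))
wordCounts = words.groupBy("value").count()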

 

Three Output Modes

complete mode

๊ฒฐ๊ณผ ํ…Œ์ด๋ธ”์˜ ์ „์ฒด ์ƒํƒœ๋ฅผ ์ถœ๋ ฅํ•œ๋‹ค.

Looking at a word count example, when the input stream below is received,

hello        # first input stream
hello world  # second input stream
hi world     # third input stream

 

you can see that every batch also emits all results from past batches.

complete output mode example
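For the inputs above, a word-count query in complete mode would emit result tables like the following sketch; note that batch 2 still contains hello even though its count did not change:

Batch: 0            Batch: 1            Batch: 2
+-----+-----+       +-----+-----+       +-----+-----+
|value|count|       |value|count|       |value|count|
+-----+-----+       +-----+-----+       +-----+-----+
|hello|    1|       |hello|    2|       |hello|    2|
+-----+-----+       |world|    1|       |world|    2|
                    +-----+-----+       |   hi|    1|
                                        +-----+-----+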

Because the entire result table must be maintained, this mode requires more resources; it is suited to stateful results where every row may keep changing.

 

update mode

Outputs only the records that changed since the previous output.

์ง‘๊ณ„ ์—ฐ์‚ฐ์„ ํ•˜์ง€ ์•Š๋Š”๋‹ค๋ฉด append ๋ชจ๋“œ์™€ ๋™์ผํ•˜๋‹ค.

hello        # first input stream
hello world  # second input stream
hi world     # third input stream

 

update mode example

๊ฐ ํŠธ๋ฆฌ๊ฑฐ๋งˆ๋‹ค ์—…๋ฐ์ดํŠธ๋œ ๊ฒฐ๊ณผ๋งŒ ๋ณด์—ฌ์ฃผ๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

append mode

When new records are added to the result table, they are emitted according to the trigger the user specified.

์ง‘๊ณ„ ์—ฐ์‚ฐ์„ ํ•˜์ง€ ์•Š๋Š”๋‹ค๋ฉด update mode์™€ ๋™์ผํ•˜์ง€๋งŒ,

์ง‘๊ณ„ ์—ฐ์‚ฐ์ด ํฌํ•จ๋˜์–ด ์žˆ๋‹ค๋ฉด ๋ฐ˜๋“œ์‹œ watermark๋ฅผ ์‚ฌ์šฉํ•ด์•ผ ํ•œ๋‹ค. (๊ด€๋ จ ์ •๋ฆฌ: ๊ฒŒ์‹œ๊ธ€ ์ฐธ๊ณ )

Input Sources

The following built-in input sources are available.

๊ฐ ์†Œ์Šค ๋ณ„ ์„ธ๋ถ€ ์˜ต์…˜์€ ๋ฌธ์„œ ์ฐธ๊ณ 

File Source

Reads files written to a directory as a data stream. Files are processed in order of their modification time.

์ง€์›๋˜๋Š” ํŒŒ์ผ ํ˜•์‹์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค: txt, csv, json, orc, parquet

Kafka Source

Reads data from Kafka. Compatible with Kafka broker versions 0.10.0 and higher.
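A sketch of a Kafka read; the broker address and topic name are placeholders. Kafka delivers key and value as binary, so they are usually cast to strings first:

df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "some_topic") \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")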

Socket Source (for testing)

Reads UTF-8 text data from a socket connection. Note that it does not provide end-to-end fault-tolerance guarantees.

Rate Source (for testing)

์ดˆ๋‹น ์ง€์ •๋œ ํ–‰ ์ˆ˜๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์„ฑํ•œ๋‹ค. ๊ฐ output row์—๋Š” value์™€ timestamp(๋ฉ”์‹œ์ง€ ๋ฐœ์†ก ์‹œ๊ฐ„)๊ฐ€ ํฌํ•จ๋œ๋‹ค. 

Rate Per Micro-Batch Source (for testing)

๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜ ๋‹น ์ง€์ •๋œ ํ–‰ ์ˆ˜๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์„ฑํ•œ๋‹ค. 

 

 

Five Trigger Types

unspecified (๊ธฐ๋ณธ ๋™์ž‘)

stream write์„ ์œ„ํ•œ ํŠธ๋ฆฌ๊ฑฐ ์˜ต์…˜์„ ์„ค์ •ํ•˜์ง€ ์•Š์œผ๋ฉด spark๋Š” ํ˜„์žฌ์˜ micro batch๊ฐ€ ์™„๋ฃŒ๋˜๋Š” ์ฆ‰์‹œ ๋‹ค์Œ ๋ ˆ์ฝ”๋“œ set์„ ์ฒ˜๋ฆฌํ•˜๋ ค๊ณ  ์‹œ๋„ํ•œ๋‹ค. micro-batch์—์„œ๋Š” ์ˆ˜์‹  ๋ ˆ์ฝ”๋“œ๊ฐ€ ์ž‘์€ window๋กœ ๊ทธ๋ฃนํ™”๋˜์–ด ์ฃผ๊ธฐ์ ์œผ๋กœ ์ฒ˜๋ฆฌ๋œ๋‹ค.

https://medium.com/@sdjemails/spark-trigger-options-cd90e3cf6166

# without any trigger: the next micro-batch starts as soon as the current one completes
query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .option("checkpointLocation", "some_path") \
        .start()

One-time micro-batch (deprecated)

Processes data once and then stops the stream. Once the stream is created, all pending records are processed and then the stream terminates.

# trigger once: process everything pending, then stop
query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .trigger(once=True) \
        .option("checkpointLocation", "some_path") \
        .start()

๋ณดํ†ต ํด๋Ÿฌ์Šคํ„ฐ๋ฅผ ๊ณ„์† ๊ฐ€๋™์ค‘์ด ์•„๋‹Œ, ์ฃผ๊ธฐ์ ์œผ๋กœ ํด๋Ÿฌ์Šคํ„ฐ๋ฅผ ์—…-๋‹ค์šด ํ•˜๋Š” ์‹œ๋‚˜๋ฆฌ์˜ค์— ์‚ฌ์šฉ๋œ๋‹ค.

In that case, the job processes everything pending while the cluster is up, then terminates.

 

Available-now micro-batch

Similar to the one-time micro-batch, the query processes all available data and then stops on its own.

์ฐจ์ด์ ์€ ์†Œ์Šค ์˜ต์…˜(maxFilesperTrigger ๋“ฑ)์„ ๊ธฐ๋ฐ˜์œผ๋กœ ์—ฌ๋Ÿฌ ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋ฏ€๋กœ ์ฟผ๋ฆฌ ํ™•์žฅ์„ฑ์ด ํ–ฅ์ƒ๋œ๋‹ค๋Š” ๋ฐ์— ์žˆ๋‹ค.

์ด์ „ ์‹คํ–‰์—์„œ ๋‚จ์€ ๋ฐฐ์น˜ ์ˆ˜์— ๊ด€๊ณ„์—†์ด ์‹คํ–‰ ์‹œ ์‚ฌ์šฉ๊ฐ€๋Šฅํ•œ ๋ชจ๋“  ๋ฐ์ดํ„ฐ๊ฐ€ ์ข…๋ฃŒ๋˜๊ธฐ ์ „์— ์ฒ˜๋ฆฌ๋˜๋„๋ก ๋ณด์žฅํ•œ๋‹ค. 

 

# available-now: process everything available in (possibly) multiple micro-batches, then stop
query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .trigger(availableNow=True) \
        .option("checkpointLocation", "some_path") \
        .start()

Fixed interval micro-batch

The query runs in micro-batch mode, where each micro-batch is kicked off at a user-specified interval.

  • If the previous micro-batch completes within the interval, the engine waits until the next interval before starting the next batch.
  • If the previous micro-batch does not complete within the interval, the next micro-batch starts as soon as the previous one finishes.
  • If no new data is available, no micro-batch is started.

๊ฐ€์žฅ ๋„๋ฆฌ ์‚ฌ์šฉ๋˜๋ฉฐ ๊ถŒ์žฅ๋˜๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค. 

# processing time: a micro-batch starts every 2 seconds
query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .trigger(processingTime='2 seconds') \
        .option("checkpointLocation", "some_path") \
        .start()

Continuous with fixed checkpoint interval (experimental)

spark 2.3์—์„œ experimental๋กœ ๋„์ž…๋˜์—ˆ๋‹ค.

With the continuous option, records are not processed in micro-batches; instead, a long-running task is created per write stream and records are processed as quickly as possible.

`Exactly Once` is not supported; only `At Least Once` is.

https://medium.com/@sdjemails/spark-trigger-options-cd90e3cf6166

  1. Driver์—์„œ ์žฅ๊ธฐ ์‹คํ–‰ ์ž‘์—…(long running task)์„ ์ƒ์„ฑํ•œ๋‹ค.
  2. Input Records๊ฐ€ ์ฒ˜๋ฆฌ๋œ๋‹ค.
  3. ์ฒ˜๋ฆฌ๋œ Records๋Š” Target์— ์ €์žฅ๋œ๋‹ค.
  4. ์ž‘์—…์€ offset์œผ๋กœ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์œ ์ง€ํ•œ๋‹ค.
  5. offset์€ WAL(Write Ahead Log)์— ์ปค๋ฐ‹๋œ๋‹ค.

 

Spark Structured Streaming Example

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("StructuredStreamingSum") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .getOrCreate()

# Create a streaming DataFrame that reads JSON files from the streaming_sample directory.
# maxFilesPerTrigger is a file-source option, not a session config, so it is set on readStream.
df = spark \
    .readStream \
    .format("json") \
    .option("path", "streaming_sample") \
    .option("maxFilesPerTrigger", 1) \
    .load()

df1 = df.select("city")

query = df1 \
            .writeStream \
            .format("json") \
            .option("path", "streaming_output") \
            .option("checkpointLocation", "checkpoint") \
            .outputMode("append") \
            .trigger(processingTime='5 seconds') \
            .start()

query.awaitTermination()

json ํŒŒ์ผ์„ ์ฝ์–ด์™€ output path์— ์“ฐ๋Š” ์˜ˆ์ œ์ด๋‹ค.

You can see output files accumulating in the path configured as the output path.

You will also find an extra directory named `_spark_metadata` under the output directory; an entry is written per batch and records information about the output files.

v1
{"path":"file:///home/jovyan/work/streaming_output/part-00000-a2d3bcce-2148-46c1-b188-6dc20bb722f9-c000.json","size":15,"isDir":false,"modificationTime":1713250145443,"blockReplication":1,"blockSize":33554432,"action":"add"}

 

_spark_metadata

If the output sink is a file sink writing to a file system such as HDFS or S3, Structured Streaming creates a `_spark_metadata` directory. It has the following properties.

  • It cannot be shared between jobs; there is exactly one directory per job.
  • `_spark_metadata` prevents two or more Spark Structured Streaming jobs from writing to the same location.
  • It is created inside the output path.
  • It is how Spark guarantees exactly-once semantics when writing to a file sink.

Deleting this directory raises an exception:

`java.lang.IllegalStateException: /home/jovyan/work/streaming_output/_spark_metadata/0 doesn't exist when compacting batch <batchNumber>`

๋‘ ๊ฐœ ์ด์ƒ์˜ ์ฟผ๋ฆฌ sync

# define the two sinks; note the distinct checkpoint locations (.start() is called below)
file_writer = concat_df \
                .writeStream \
                .queryName("transformed json") \
                .format("json") \
                .outputMode("append") \
                .option("path", "transformed") \
                .option("checkpointLocation", "chk/json")

kafka_writer = output_df \
                .writeStream \
                .queryName("transformed kafka") \
                .format("kafka") \
                .option("kafka.bootstrap.servers", "kafka:9092") \
                .option("checkpointLocation", "chk/kafka") \
                .option("topic", "transformed") \
                .outputMode("append")


# case 1. start each query, then block on one of them
file_writer.start()
kafka_writer.start().awaitTermination()

# case 2. start both queries, then use the built-in helper to block until any terminates
file_writer.start()
kafka_writer.start()
spark.streams.awaitAnyTermination()

When a single application has two or more output sinks:

  • The checkpoint locations must be different for each sink.
  • Use either case 1 or case 2 to run the two queries; prefer the built-in function when possible.

 

References

 

https://vanducng.dev/2020/10/10/Getting-started-with-spark-structure-streaming/#Anatomy-of-structured-streaming

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

https://medium.com/@sdjemails/spark-trigger-options-cd90e3cf6166

https://dev.to/sukumaar/what-is-sparkmetadata-directory-in-spark-structured-streaming--3i42