๐Ÿฅ

[Spark] ์ŠคํŒŒํฌ ์ŠคํŠธ๋ฆฌ๋ฐ์˜ ์ด๋ฒคํŠธ ์‹œ๊ฐ„ ์ฒ˜๋ฆฌ์™€ Watermark ๋ณธ๋ฌธ

๋ฐ์ดํ„ฐ/Spark

[Spark] ์ŠคํŒŒํฌ ์ŠคํŠธ๋ฆฌ๋ฐ์˜ ์ด๋ฒคํŠธ ์‹œ๊ฐ„ ์ฒ˜๋ฆฌ์™€ Watermark

•8• 2024. 4. 21. 21:30

์ŠคํŠธ๋ฆฌ๋ฐ ์ฟผ๋ฆฌ์—์„œ์˜ ์ง‘๊ณ„๋Š” ๋ณดํ†ต ํ˜„์žฌ์‹œ๊ฐ„๋ถ€ํ„ฐ T์‹œ๊ฐ„ ์ „๊นŒ์ง€ ์ง‘๊ณ„๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์˜๋ฏธํ•œ๋‹ค.

๊ทธ๋Ÿฐ๋ฐ ์˜ˆ๋ฅผ๋“ค์–ด ์ง€๋‚œ 15์ดˆ ๋™์•ˆ์˜ ์ง‘๊ณ„ ๋ฐ์ดํ„ฐ๋ผ๊ณ  ํ•˜๋ฉด 15์ดˆ์˜ ์‹œ๊ฐ„์€ ์–ธ์ œ๋ถ€ํ„ฐ ์–ธ์ œ๊นŒ์ง€์ธ์ง€ ์˜๋ฌธ์ด ์ƒ๊ธธ ์ˆ˜ ์žˆ๋‹ค.

์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ์—์„œ ๋ฐ”๋ผ๋ณด๋Š” ์‹œ๊ฐ„์€ ๋‹ค์–‘ํ•œ ๊ธฐ์ค€์ด ์žˆ๋‹ค.

์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ์—์„œ์˜ ์‹œ๊ฐ„ ๊ด€์ 

  • ์ด๋ฒคํŠธ ์‹œ๊ฐ„ (event time)
    ๋ฐ์ดํ„ฐ ์˜์กด์ ์ธ ํƒ€์ž„์Šคํ…œํ”„๋กœ, ๋ฐ์ดํ„ฐ ๋‚ด์— ์กด์žฌํ•˜๋Š” ๋ฐ์ดํ„ฐ ๋ฐœ์ƒ ์‹œ๊ฐ„์ด๋‹ค.
    ๋ฐ์ดํ„ฐ ์˜์กด์ ์ด๊ธฐ ๋•Œ๋ฌธ์— ์–ด๋– ํ•œ ๊ฐ’์„ ํƒ€์ž„์Šคํƒฌํ”„๋กœ ๋„ฃ์„ ๊ฒƒ์ธ์ง€๋Š” ๋‹ค๋ฅด๊ฒ ์ง€๋งŒ ์ฃผ๋กœ ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒํ•œ ์‹œ๊ฐ„์„ ๋งŽ์ด ์‚ฌ์šฉํ•œ๋‹ค.
    windowing ์— ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š” ์•„์ฃผ ์ ์ ˆํ•œ ์‹œ๊ฐ„ ์ค‘ ํ•˜๋‚˜์ด๋‹ค.
  • ์ˆ˜์ง‘ ์‹œ๊ฐ„ (ingestion time)
    ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ ์—”์ง„์— ๋ฐ์ดํ„ฐ๊ฐ€ ์ฒ˜์Œ์œผ๋กœ ์ˆ˜์ง‘๋œ, ์ฆ‰ ๋“ค์–ด์˜จ ์‹œ๊ฐ„์„ ๋‚˜ํƒ€๋‚ธ๋‹ค.
  • ์ฒ˜๋ฆฌ ์‹œ๊ฐ„ (processing time)
    ์‹ค์ œ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ ์—”์ง„์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ์‹œ๊ฐ„์ด๋‹ค. ์ฆ‰, ํ•ด๋‹น ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์•„ ์ฒ˜๋ฆฌํ•  ๋•Œ ์ฒ˜๋ฆฌํ•˜๋Š” ์„œ๋ฒ„์˜ ์‹œ๊ฐ„์ด๋‹ค.

๋น„์Šทํ•ด ๋ณด์ด์ง€๋งŒ ์•„๋ž˜์˜ ์ด๋ฏธ์ง€์—์„œ ๋ช…ํ™•ํ•˜๊ฒŒ ๊ตฌ๋ถ„ํ•  ์ˆ˜ ์žˆ๋‹ค.

https://medium.com/big-data-processing/time-attributes-in-apache-flink-85e2afdda238

 

์ด๋ฒคํŠธ ๊ธฐ๋ฐ˜ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ์—์„œ์˜ Challenges

์ด๋ฒคํŠธ ๊ธฐ๋ฐ˜ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ์—์„œ์˜ ๋ฌธ์ œ๋Š” ์•„๋ž˜์˜ ๋‚ด์šฉ์ด ์žˆ๋‹ค.

  • late event (์ง€์—ฐ ์ด๋ฒคํŠธ)
    ์†Œ์Šค์—์„œ ๋ฐœ์ƒํ•œ ์ด๋ฒคํŠธ๊ฐ€ ๋„คํŠธ์›Œํฌ ์ง€์—ฐ์ด๋‚˜ ์‹œ์Šคํ…œ ์ค‘๋‹จ์œผ๋กœ ์ธํ•ด ์ฒ˜๋ฆฌ ์—”์ง„์— ๋Šฆ๊ฒŒ ๋„์ฐฉ๋  ์ˆ˜ ์žˆ๋‹ค.
    ์ŠคํŒŒํฌ๋Š” ์ง€๋‚œ 15์ดˆ๋™์•ˆ์˜ ์ด๋ฒคํŠธ ์‹œ๊ฐ„ ์ฐฝ์˜ ๋ชจ๋“  ์ด๋ฒคํŠธ๊ฐ€ ์ฒ˜๋ฆฌ ์‹œ์Šคํ…œ์— ๋„๋‹ฌํ–ˆ๋Š”์ง€ ์•Œ ์ˆ˜ ์žˆ์–ด์•ผ ํ•œ๋‹ค.
    ์ŠคํŒŒํฌ๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ ์ง€์—ฐ ์ด๋ฒคํŠธ๋ฅผ ๋ฌด๊ธฐํ•œ ๊ธฐ๋‹ค๋ฆฐ๋‹ค.
  • state management
    ์ง€์—ฐ ์ด๋ฒคํŠธ๊ฐ€ ๋„์ฐฉํ•œ ๊ฒฝ์šฐ (์˜ˆ๋ฅผ๋“ค์–ด) 15์ดˆ์˜ ํŠน์ • ์ด๋ฒคํŠธ ์‹œ๊ฐ„ window์— ๋Œ€ํ•ด ์ŠคํŒŒํฌ๋Š” ๋ชจ๋“  ์ด๋ฒคํŠธ๋ฅผ ๋Œ€์กฐํ•˜๊ณ  ์ง‘๊ณ„๋ฅผ ์ ์šฉํ•œ๋‹ค. ๊ธฐ๋ณธ์ ์œผ๋กœ ์ง€์—ฐ ์ด๋ฒคํŠธ๋ฅผ ์ˆ˜์šฉํ•˜๊ธฐ ์œ„ํ•ด์„œ ์ŠคํŒŒํฌ๋Š” window์˜ ์ƒํƒœ๋ฅผ ๋ฉ”๋ชจ๋ฆฌ์— ๊ธฐ์–ตํ•ด์•ผ ํ•œ๋‹ค.
    ์‹œ๊ฐ„์ด ์ง€๋‚จ์— ๋”ฐ๋ผ window์˜ ์ˆ˜๋Š” ๋Š˜์–ด๋‚  ๊ฒƒ์ด๊ณ , ๋ฆฌ์†Œ์Šค ์‚ฌ์šฉ๋Ÿ‰๋„ ์ฆ๊ฐ€ํ•  ๊ฒƒ์ด๋‹ค.
  • state recovery
    ์ŠคํŠธ๋ฆฌ๋ฐ ์ž‘์—…์ด ์‹คํŒจํ•œ ๊ฒฝ์šฐ ๋‚ด๊ฒฐํ•จ์„ฑ ๋ณด์žฅ์„ ์œ„ํ•ด ์ƒํƒœ ๋ณต๊ตฌ๋ฅผ ํ•  ์ˆ˜ ์žˆ์–ด์•ผ ํ•œ๋‹ค.
    ์ด๋ถ€๋ถ„์€ ๋‹ค๋ฅธ ๊ฒŒ์‹œ๊ธ€์— ์ •๋ฆฌํ–ˆ๋‹ค.

 

์ง€์—ฐ ์ด๋ฒคํŠธ์˜ ์ฒ˜๋ฆฌ

๋„คํŠธ์›Œํฌ ๋ ˆ์ดํ„ด์‹œ ๋“ฑ์˜ ์ž‰์œ ๋กœ ์ด๋ฒคํŠธ๊ฐ€ ์–ดํ”Œ๋ฆฌ์ผ€์ด์…˜์— ๋Šฆ๊ฒŒ ๋„์ฐฉํ•  ์ˆ˜ ์žˆ๋‹ค.

์•„๋ž˜์˜ ์ด๋ฏธ์ง€์™€ ๊ฐ™์ด ์˜ˆ๋ฅผ๋“ค์–ด 12:04์— ์ƒ์„ฑ๋œ dog๊ฐ€ 12:11์— ์ŠคํŒŒํฌ ์ŠคํŠธ๋ฆฌ๋ฐ์— ์ˆ˜์‹ ๋˜๋Š” ๊ฒฝ์šฐ์—๋Š” [12:00, 12:10) ์œˆ๋„์šฐ์— ์—…๋ฐ์ดํŠธ๋ฅผ ํ•ด์•ผํ•œ๋‹ค.

์‚ฌ์‹ค ์ŠคํŒŒํฌ์—์„œ ์ง€์—ฐ ์ด๋ฒคํŠธ๋ฅผ ๋ฌด๊ธฐํ•œ ๊ธฐ๋‹ค๋ฆฌ๋ฉฐ ์ƒํƒœ๋ฅผ ๋ฌด์ œํ•œ ์ €์žฅํ•˜๋Š” ๊ฒƒ์€ ํ•œ์ •๋œ ๋ฆฌ์†Œ์Šค ์ฐจ์›์—์„œ ๋ถˆ๊ฐ€๋Šฅํ•˜๋‹ค.

์ผ์ • ์ž„๊ณ„๊ฐ’(threshold)์„ ์ •ํ•ด๋‘๊ณ  ์ž„๊ณ„๊ฐ’์„ ๋„˜์–ด๊ฐ€๋Š” ์ด๋ฒคํŠธ๋Š” ๋ฒ„๋ฆด ์ˆ˜ ์žˆ์–ด์•ผํ•œ๋‹ค. ์ฆ‰ ์ด์ „ ์ง‘๊ฒŒ๊ฐ€ ๋ฉ”๋ชจ๋ฆฌ ๋‚ด state์—์„œ ์‚ญ์ œ๋  ์ˆ˜ ์žˆ๋Š” ์‹œ๊ธฐ๋ฅผ ์‹œ์Šคํ…œ์ด ์•Œ์•„์•ผ ํ•œ๋‹ค๋Š” ์˜๋ฏธ์ด๋‹ค.

์ด๋ ‡๊ฒŒ ์ œํ•œ๋œ ๋ฐฉ์‹์œผ๋กœ ์ƒํƒœ๋ฅผ ์ œ์–ดํ•˜๋Š” ๋ฉ”์ปค๋‹ˆ์ฆ˜์ธ watermark๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

 

 

watermark๋ž€

์›Œํ„ฐ๋งˆํฌ๋Š” ์ง€์—ฐ ์ด๋ฒคํŠธ๋ฅผ ์ˆ˜์šฉํ•˜๊ธฐ ์œ„ํ•ด window์˜ state๋ฅผ ๊ด€๋ฆฌํ•˜๋Š” ๊ธฐ๋ฒ•์ด๋‹ค. state๊ฐ€ ๋ฌดํ•œ์ • ์ปค์ง€์ง€ ๋ชปํ•˜๋„๋ก ์ œ์–ดํ•œ๋‹ค.

๋Šฆ์€ ์ž„๊ณ„๊ฐ’์— ๋Œ€ํ•œ ๊ฐ’์„ ์ •์˜ํ•˜๊ณ , ์ž„๊ณ„๊ฐ’์„ ํ†ต๊ณผํ•˜๋ฉด ๋งŒ๋ฃŒ๋œ window์˜ ์ƒํƒœ๋ฅผ ์‚ญ์ œํ•˜๊ณ  ๋งŒ๋ฃŒ๋œ window์— ํ•ด๋‹นํ•˜๋Š” ์ง€์—ฐ ์ด๋ฒคํŠธ๋ฅผ ์ฟผ๋ฆฌ ์ฒ˜๋ฆฌ์— ๋ฐ˜์˜ํ•˜์ง€ ์•Š๋Š”๋‹ค.

์›Œํ„ฐ๋งˆํฌ๋Š” `(max event time seen by the engine - late threshold > T)` ์ธ T ์‹œ๊ฐ„์— ํ•ด๋‹นํ•˜๋Š” ์ง€์—ฐ ์ด๋ฒคํŠธ๋งŒ ์ฒ˜๋ฆฌํ•˜๋„๋ก ํ—ˆ์šฉํ•œ๋‹ค.

  • ๋„์ž…์‹œ๊ธฐ: Spark 2.1.0

 

Update Output Mode์—์„œ์˜ Watermark

์›Œํ„ฐ๋งˆํฌ์—์„œ๋Š” ์•„๋ž˜์™€ ๊ฐ™์ด ํ”„๋กœ์„ธ์‹ฑ ์ „/ํ›„์— ๊ฐ’์„ ์—…๋ฐ์ดํŠธ ํ•œ๋‹ค.

  • before processing
    watermark ๊ธฐ๋ฐ˜์œผ๋กœ batch to finalize๋ฅผ ์—…๋ฐ์ดํŠธ ํ•œ๋‹ค.
    ์›Œํ„ฐ๋งˆํฌ ๊ฐ’๋ณด๋‹ค ๋ฐฐ์น˜์˜ end of boundary ์ค‘ ๊ฐ€์žฅ ์ž‘์€ ๊ฐ’์ด ์ž‘์€ ๊ฒฝ์šฐ์— ์—…๋ฐ์ดํŠธ๋˜๋ฉฐ,
    ๋” ์ด์ƒ ํ•ด๋‹นํ•˜๋Š” ๋ฐฐ์น˜์˜ ๋ฐ์ดํ„ฐ์—๋Š” ๋ณ€๊ฒฝ์ด ์—†๋‹ค๋Š” ์˜๋ฏธ์ด๋‹ค.
  • after trigger
    microbatch processing ์ดํ›„์— watermark ๊ฐ’์ด ์—…๋ฐ์ดํŠธ ๋œ๋‹ค.
    watermark ๊ฐ’์€ ์˜ค๋ž˜๋œ ๋ฐ์ดํ„ฐ์˜ limit ์„ ์ •ํ•˜๋Š” ์ผ์ข…์˜ barrier ์—ญํ• ์„ ํ•œ๋‹ค.

์œ„ wordcount ์ด๋ฏธ์ง€๋ฅผ ํ†ตํ•ด ์ดํ•ดํ•  ์ˆ˜ ์žˆ๋‹ค.

  • batch interval: 10๋ถ„
  • trigger interval: 5๋ถ„
  • watermark: 10๋ถ„

์‹œ๊ฐ„๋ณ„ watermark์˜ ๋ณ€ํ™”๋Š” ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

 

์ง€์—ฐ ์ด๋ฒคํŠธ ํ™•์ธ

  • 12:09, cat (12:15 triggered)
    ์ด๋•Œ๋Š” watermark ๊ฐ€ ์•„์ง ๋ฏธ์„ค์ •๋˜์–ด์žˆ์œผ๋ฏ€๋กœ ์ง‘๊ณ„์— ์‚ฌ์šฉ๋œ๋‹ค.
    [12:00, 12:10) ์œˆ๋„์šฐ์— ์—…๋ฐ์ดํŠธ๋œ๋‹ค.
  • 12:08, dog / 12:13, owl (12:20 triggered)
    12:20 ์‹œ์ ์˜ before processing์—์„œ watermark๋Š” 12:04์ด๋‹ค,
    dog์™€ owl ๋ชจ๋‘ 12:04๋ณด๋‹ค ํฐ ๊ฐ’์ด๋ฏ€๋กœ ์—…๋ฐ์ดํŠธ๋œ๋‹ค.
  • 12:04, donkey (12:25 triggered)
    12:25์‹œ์ ์˜ before processing์—์„œ watermark๋Š” 12:11์ด๋‹ค.
    donkey์˜ event time์ธ 12:04๋Š” watermark๋ณด๋‹ค ์ž‘์€ ๊ฐ’์ด๋ฏ€๋กœ dropped๋œ๋‹ค.
  • 12:17, owl (12:25 triggered)
    12:25์‹œ์ ์˜ before processing์—์„œ watermark๋Š” 12:11์ด๋‹ค.
    12:17์€ watermark๋ณด๋‹ค ํฐ ๊ฐ’์ด๋ฏ€๋กœ ์ง‘๊ณ„๋œ๋‹ค.

์ถ”๊ฐ€๋กœ 12:21, owl์€ ์‹œ๊ฐ„ ํ๋ฆ„์œผ๋กœ ๋ดค์„ ๋•Œ 12:20 ์ด์ „์— event time์ด 12:21 ๋กœ ์ˆ˜์‹ ๋œ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

์†Œ์Šค์™€ ์ฒ˜๋ฆฌ์—”์ง„์˜ clock์ด ๋™๊ธฐํ™”๋˜์–ด ์žˆ์ง€ ์•Š๊ฑฐ๋‚˜ ์—ฌ๋Ÿฌ device์—์„œ generate ๋˜๋Š” ๊ฒฝ์šฐ ์‹œ๊ฐ„ ๋™๊ธฐํ™” ๋ฌธ์ œ๋กœ ์ผ์–ด๋‚  ์ˆ˜ ์žˆ๋Š” ํ”ํ•œ ์ด์Šˆ์ด๋‹ค.

 

batches to finalize

12:25 ์‹œ์  before processing์—์„œ batches to finalize๊ฐ€ ์„ค์ •๋œ ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

12:25 before processing ๋•Œ์˜ ์›Œํ„ฐ๋งˆํฌ๋Š” 12:11์ด๊ณ , ์›Œํ„ฐ๋งˆํฌ๋ณด๋‹ค ์ž‘์€ ๋ฐฐ์น˜๊ฐ€ ์ƒ๊ฒผ์œผ๋ฏ€๋กœ ๊ทธ ๋ฐฐ์น˜์— ํ•ด๋‹นํ•˜๋Š” ๋ฐ์ดํ„ฐ๋“ค์€ ๋”์ด์ƒ ์—…๋ฐ์ดํŠธ๊ฐ€ ๋˜์ง€ ์•Š๋Š”๋‹ค.

 

Append Output Mode์—์„œ์˜ Watermark

update ๋•Œ์™€ ๋น„์Šทํ•˜์ง€๋งŒ ์ฐจ์ด์ ์€ ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

  • batch๊ฐ€ finalized ๋˜๊ธฐ ์ „๊นŒ์ง€ ๋ชจ๋“  ๋ฐ์ดํ„ฐ๋Š” internal state์— ์ €์žฅ๋˜์–ด ์žˆ๋‹ค.

 

์•„๋ž˜์˜ ํ‘œ์—์„œ ๋ณผ ์ˆ˜ ์žˆ๋‹ค์‹œํ”ผ batches to finalized๊ฐ€ 12:25 ์™€ 12:30 ์‹œ์ ์—์„œ ์ •ํ•ด์ง€๊ธฐ ๋•Œ๋ฌธ์—(์—…๋ฐ์ดํŠธ๋˜๊ธฐ ๋•Œ๋ฌธ์—),

result๋Š” 12:25 ์‹œ์ ๋ถ€ํ„ฐ sinkํ•˜๊ฒŒ ๋œ๋‹ค.

 

watermark ์‚ฌ์šฉ ๋ฐฉ๋ฒ•

withWatermark๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

`DataFrame.withWatermark(eventTime, delayThreshold)`

  • eventTime: str or column
    ํ–‰์˜ ์ด๋ฒคํŠธ ์‹œ๊ฐ„์„ ํฌํ•จํ•˜๋Š” column ์ด๋ฆ„์ด๋‹ค.
  • delayThreshold: str
    ๊ฐ„๊ฒฉ ํ˜•์‹์œผ๋กœ ์ฒ˜๋ฆฌ๋œ, ์ตœ์‹  ๋ ˆ์ฝ”๋“œ๋ฅผ ๊ธฐ์ค€์œผ๋กœ ๋ฐ์ดํ„ฐ๊ฐ€ ๋Šฆ๊ฒŒ ๋„์ฐฉํ•  ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ฆฌ๋Š” ์ตœ์†Œ ์ง€์—ฐ์‹œ๊ฐ„์ด๋‹ค.
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()

 

update mode ์—์„œ๋Š” stateful ์—ฐ์‚ฐ ์‹œ (์ง‘๊ณ„/join๋“ฑ)์—๋Š” ์„ ํƒ์ ์œผ๋กœ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•˜๋ฉฐ,

append mode ์—์„œ stateful ์—ฐ์‚ฐ ์‚ฌ์šฉ ์‹œ์—๋Š” (์ง‘๊ณ„/join๋“ฑ) ํ•„์ˆ˜๋กœ watermark๊ฐ’์„ ์ •์˜ํ•ด์ฃผ์–ด์•ผ ํ•œ๋‹ค.

 

์ฐธ๊ณ 

https://seamless.tistory.com/99

https://medium.com/big-data-processing/time-attributes-in-apache-flink-85e2afdda238

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking

https://www.youtube.com/watch?v=XjlKGvUt2dY

https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.withWatermark.html

https://stackoverflow.com/questions/44403690/empty-output-for-watermarked-aggregation-query-in-append-mode