๐ฅ
[Spark] ์คํํฌ ์คํธ๋ฆฌ๋ฐ์ ์ด๋ฒคํธ ์๊ฐ ์ฒ๋ฆฌ์ Watermark ๋ณธ๋ฌธ
[Spark] ์คํํฌ ์คํธ๋ฆฌ๋ฐ์ ์ด๋ฒคํธ ์๊ฐ ์ฒ๋ฆฌ์ Watermark
•8• 2024. 4. 21. 21:30์คํธ๋ฆฌ๋ฐ ์ฟผ๋ฆฌ์์์ ์ง๊ณ๋ ๋ณดํต ํ์ฌ์๊ฐ๋ถํฐ T์๊ฐ ์ ๊น์ง ์ง๊ณ๋ ๋ฐ์ดํฐ๋ฅผ ์๋ฏธํ๋ค.
๊ทธ๋ฐ๋ฐ ์๋ฅผ๋ค์ด ์ง๋ 15์ด ๋์์ ์ง๊ณ ๋ฐ์ดํฐ๋ผ๊ณ ํ๋ฉด 15์ด์ ์๊ฐ์ ์ธ์ ๋ถํฐ ์ธ์ ๊น์ง์ธ์ง ์๋ฌธ์ด ์๊ธธ ์ ์๋ค.
์คํธ๋ฆผ ์ฒ๋ฆฌ์์ ๋ฐ๋ผ๋ณด๋ ์๊ฐ์ ๋ค์ํ ๊ธฐ์ค์ด ์๋ค.
์คํธ๋ฆผ ์ฒ๋ฆฌ์์์ ์๊ฐ ๊ด์
- ์ด๋ฒคํธ ์๊ฐ (event time)
๋ฐ์ดํฐ ์์กด์ ์ธ ํ์์คํ ํ๋ก, ๋ฐ์ดํฐ ๋ด์ ์กด์ฌํ๋ ๋ฐ์ดํฐ ๋ฐ์ ์๊ฐ์ด๋ค.
๋ฐ์ดํฐ ์์กด์ ์ด๊ธฐ ๋๋ฌธ์ ์ด๋ ํ ๊ฐ์ ํ์์คํฌํ๋ก ๋ฃ์ ๊ฒ์ธ์ง๋ ๋ค๋ฅด๊ฒ ์ง๋ง ์ฃผ๋ก ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ ์๊ฐ์ ๋ง์ด ์ฌ์ฉํ๋ค.
windowing ์ ์ฌ์ฉํ ์ ์๋ ์์ฃผ ์ ์ ํ ์๊ฐ ์ค ํ๋์ด๋ค. - ์์ง ์๊ฐ (ingestion time)
์คํธ๋ฆผ ์ฒ๋ฆฌ ์์ง์ ๋ฐ์ดํฐ๊ฐ ์ฒ์์ผ๋ก ์์ง๋, ์ฆ ๋ค์ด์จ ์๊ฐ์ ๋ํ๋ธ๋ค. - ์ฒ๋ฆฌ ์๊ฐ (processing time)
์ค์ ์คํธ๋ฆผ ์ฒ๋ฆฌ ์์ง์์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ ์๊ฐ์ด๋ค. ์ฆ, ํด๋น ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ ์ฒ๋ฆฌํ ๋ ์ฒ๋ฆฌํ๋ ์๋ฒ์ ์๊ฐ์ด๋ค.
๋น์ทํด ๋ณด์ด์ง๋ง ์๋์ ์ด๋ฏธ์ง์์ ๋ช ํํ๊ฒ ๊ตฌ๋ถํ ์ ์๋ค.
์ด๋ฒคํธ ๊ธฐ๋ฐ ์คํธ๋ฆผ ์ฒ๋ฆฌ์์์ Challenges
์ด๋ฒคํธ ๊ธฐ๋ฐ ์คํธ๋ฆผ ์ฒ๋ฆฌ์์์ ๋ฌธ์ ๋ ์๋์ ๋ด์ฉ์ด ์๋ค.
- late event (์ง์ฐ ์ด๋ฒคํธ)
์์ค์์ ๋ฐ์ํ ์ด๋ฒคํธ๊ฐ ๋คํธ์ํฌ ์ง์ฐ์ด๋ ์์คํ ์ค๋จ์ผ๋ก ์ธํด ์ฒ๋ฆฌ ์์ง์ ๋ฆ๊ฒ ๋์ฐฉ๋ ์ ์๋ค.
์คํํฌ๋ ์ง๋ 15์ด๋์์ ์ด๋ฒคํธ ์๊ฐ ์ฐฝ์ ๋ชจ๋ ์ด๋ฒคํธ๊ฐ ์ฒ๋ฆฌ ์์คํ ์ ๋๋ฌํ๋์ง ์ ์ ์์ด์ผ ํ๋ค.
์คํํฌ๋ ๊ธฐ๋ณธ์ ์ผ๋ก ์ง์ฐ ์ด๋ฒคํธ๋ฅผ ๋ฌด๊ธฐํ ๊ธฐ๋ค๋ฆฐ๋ค. - state management
์ง์ฐ ์ด๋ฒคํธ๊ฐ ๋์ฐฉํ ๊ฒฝ์ฐ (์๋ฅผ๋ค์ด) 15์ด์ ํน์ ์ด๋ฒคํธ ์๊ฐ window์ ๋ํด ์คํํฌ๋ ๋ชจ๋ ์ด๋ฒคํธ๋ฅผ ๋์กฐํ๊ณ ์ง๊ณ๋ฅผ ์ ์ฉํ๋ค. ๊ธฐ๋ณธ์ ์ผ๋ก ์ง์ฐ ์ด๋ฒคํธ๋ฅผ ์์ฉํ๊ธฐ ์ํด์ ์คํํฌ๋ window์ ์ํ๋ฅผ ๋ฉ๋ชจ๋ฆฌ์ ๊ธฐ์ตํด์ผ ํ๋ค.
์๊ฐ์ด ์ง๋จ์ ๋ฐ๋ผ window์ ์๋ ๋์ด๋ ๊ฒ์ด๊ณ , ๋ฆฌ์์ค ์ฌ์ฉ๋๋ ์ฆ๊ฐํ ๊ฒ์ด๋ค. - state recovery
์คํธ๋ฆฌ๋ฐ ์์ ์ด ์คํจํ ๊ฒฝ์ฐ ๋ด๊ฒฐํจ์ฑ ๋ณด์ฅ์ ์ํด ์ํ ๋ณต๊ตฌ๋ฅผ ํ ์ ์์ด์ผ ํ๋ค.
์ด๋ถ๋ถ์ ๋ค๋ฅธ ๊ฒ์๊ธ์ ์ ๋ฆฌํ๋ค.
์ง์ฐ ์ด๋ฒคํธ์ ์ฒ๋ฆฌ
๋คํธ์ํฌ ๋ ์ดํด์ ๋ฑ์ ์์ ๋ก ์ด๋ฒคํธ๊ฐ ์ดํ๋ฆฌ์ผ์ด์ ์ ๋ฆ๊ฒ ๋์ฐฉํ ์ ์๋ค.
์๋์ ์ด๋ฏธ์ง์ ๊ฐ์ด ์๋ฅผ๋ค์ด 12:04์ ์์ฑ๋ dog๊ฐ 12:11์ ์คํํฌ ์คํธ๋ฆฌ๋ฐ์ ์์ ๋๋ ๊ฒฝ์ฐ์๋ [12:00, 12:10) ์๋์ฐ์ ์ ๋ฐ์ดํธ๋ฅผ ํด์ผํ๋ค.
์ฌ์ค ์คํํฌ์์ ์ง์ฐ ์ด๋ฒคํธ๋ฅผ ๋ฌด๊ธฐํ ๊ธฐ๋ค๋ฆฌ๋ฉฐ ์ํ๋ฅผ ๋ฌด์ ํ ์ ์ฅํ๋ ๊ฒ์ ํ์ ๋ ๋ฆฌ์์ค ์ฐจ์์์ ๋ถ๊ฐ๋ฅํ๋ค.
์ผ์ ์๊ณ๊ฐ(threshold)์ ์ ํด๋๊ณ ์๊ณ๊ฐ์ ๋์ด๊ฐ๋ ์ด๋ฒคํธ๋ ๋ฒ๋ฆด ์ ์์ด์ผํ๋ค. ์ฆ ์ด์ ์ง๊ฒ๊ฐ ๋ฉ๋ชจ๋ฆฌ ๋ด state์์ ์ญ์ ๋ ์ ์๋ ์๊ธฐ๋ฅผ ์์คํ ์ด ์์์ผ ํ๋ค๋ ์๋ฏธ์ด๋ค.
์ด๋ ๊ฒ ์ ํ๋ ๋ฐฉ์์ผ๋ก ์ํ๋ฅผ ์ ์ดํ๋ ๋ฉ์ปค๋์ฆ์ธ watermark๋ฅผ ์ฌ์ฉํ ์ ์๋ค.
watermark๋
์ํฐ๋งํฌ๋ ์ง์ฐ ์ด๋ฒคํธ๋ฅผ ์์ฉํ๊ธฐ ์ํด window์ state๋ฅผ ๊ด๋ฆฌํ๋ ๊ธฐ๋ฒ์ด๋ค. state๊ฐ ๋ฌดํ์ ์ปค์ง์ง ๋ชปํ๋๋ก ์ ์ดํ๋ค.
๋ฆ์ ์๊ณ๊ฐ์ ๋ํ ๊ฐ์ ์ ์ํ๊ณ , ์๊ณ๊ฐ์ ํต๊ณผํ๋ฉด ๋ง๋ฃ๋ window์ ์ํ๋ฅผ ์ญ์ ํ๊ณ ๋ง๋ฃ๋ window์ ํด๋นํ๋ ์ง์ฐ ์ด๋ฒคํธ๋ฅผ ์ฟผ๋ฆฌ ์ฒ๋ฆฌ์ ๋ฐ์ํ์ง ์๋๋ค.
์ํฐ๋งํฌ๋ `(max event time seen by the engine - late threshold > T)` ์ธ T ์๊ฐ์ ํด๋นํ๋ ์ง์ฐ ์ด๋ฒคํธ๋ง ์ฒ๋ฆฌํ๋๋ก ํ์ฉํ๋ค.
- ๋์ ์๊ธฐ: Spark 2.1.0
Update Output Mode์์์ Watermark
์ํฐ๋งํฌ์์๋ ์๋์ ๊ฐ์ด ํ๋ก์ธ์ฑ ์ /ํ์ ๊ฐ์ ์ ๋ฐ์ดํธ ํ๋ค.
- before processing
watermark ๊ธฐ๋ฐ์ผ๋ก batch to finalize๋ฅผ ์ ๋ฐ์ดํธ ํ๋ค.
์ํฐ๋งํฌ ๊ฐ๋ณด๋ค ๋ฐฐ์น์ end of boundary ์ค ๊ฐ์ฅ ์์ ๊ฐ์ด ์์ ๊ฒฝ์ฐ์ ์ ๋ฐ์ดํธ๋๋ฉฐ,
๋ ์ด์ ํด๋นํ๋ ๋ฐฐ์น์ ๋ฐ์ดํฐ์๋ ๋ณ๊ฒฝ์ด ์๋ค๋ ์๋ฏธ์ด๋ค. - after trigger
microbatch processing ์ดํ์ watermark ๊ฐ์ด ์ ๋ฐ์ดํธ ๋๋ค.
watermark ๊ฐ์ ์ค๋๋ ๋ฐ์ดํฐ์ limit ์ ์ ํ๋ ์ผ์ข ์ barrier ์ญํ ์ ํ๋ค.
์ wordcount ์ด๋ฏธ์ง๋ฅผ ํตํด ์ดํดํ ์ ์๋ค.
- batch interval: 10๋ถ
- trigger interval: 5๋ถ
- watermark: 10๋ถ
์๊ฐ๋ณ watermark์ ๋ณํ๋ ์๋์ ๊ฐ๋ค.
์ง์ฐ ์ด๋ฒคํธ ํ์ธ
- 12:09, cat (12:15 triggered)
์ด๋๋ watermark ๊ฐ ์์ง ๋ฏธ์ค์ ๋์ด์์ผ๋ฏ๋ก ์ง๊ณ์ ์ฌ์ฉ๋๋ค.
[12:00, 12:10) ์๋์ฐ์ ์ ๋ฐ์ดํธ๋๋ค. - 12:08, dog / 12:13, owl (12:20 triggered)
12:20 ์์ ์ before processing์์ watermark๋ 12:04์ด๋ค,
dog์ owl ๋ชจ๋ 12:04๋ณด๋ค ํฐ ๊ฐ์ด๋ฏ๋ก ์ ๋ฐ์ดํธ๋๋ค. - 12:04, donkey (12:25 triggered)
12:25์์ ์ before processing์์ watermark๋ 12:11์ด๋ค.
donkey์ event time์ธ 12:04๋ watermark๋ณด๋ค ์์ ๊ฐ์ด๋ฏ๋ก dropped๋๋ค. - 12:17, owl (12:25 triggered)
12:25์์ ์ before processing์์ watermark๋ 12:11์ด๋ค.
12:17์ watermark๋ณด๋ค ํฐ ๊ฐ์ด๋ฏ๋ก ์ง๊ณ๋๋ค.
์ถ๊ฐ๋ก 12:21, owl์ ์๊ฐ ํ๋ฆ์ผ๋ก ๋ดค์ ๋ 12:20 ์ด์ ์ event time์ด 12:21 ๋ก ์์ ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
์์ค์ ์ฒ๋ฆฌ์์ง์ clock์ด ๋๊ธฐํ๋์ด ์์ง ์๊ฑฐ๋ ์ฌ๋ฌ device์์ generate ๋๋ ๊ฒฝ์ฐ ์๊ฐ ๋๊ธฐํ ๋ฌธ์ ๋ก ์ผ์ด๋ ์ ์๋ ํํ ์ด์์ด๋ค.
batches to finalize
12:25 ์์ before processing์์ batches to finalize๊ฐ ์ค์ ๋ ๊ฒ์ ๋ณผ ์ ์๋ค.
12:25 before processing ๋์ ์ํฐ๋งํฌ๋ 12:11์ด๊ณ , ์ํฐ๋งํฌ๋ณด๋ค ์์ ๋ฐฐ์น๊ฐ ์๊ฒผ์ผ๋ฏ๋ก ๊ทธ ๋ฐฐ์น์ ํด๋นํ๋ ๋ฐ์ดํฐ๋ค์ ๋์ด์ ์ ๋ฐ์ดํธ๊ฐ ๋์ง ์๋๋ค.
Append Output Mode์์์ Watermark
update ๋์ ๋น์ทํ์ง๋ง ์ฐจ์ด์ ์ ์๋์ ๊ฐ๋ค.
- batch๊ฐ finalized ๋๊ธฐ ์ ๊น์ง ๋ชจ๋ ๋ฐ์ดํฐ๋ internal state์ ์ ์ฅ๋์ด ์๋ค.
์๋์ ํ์์ ๋ณผ ์ ์๋ค์ํผ batches to finalized๊ฐ 12:25 ์ 12:30 ์์ ์์ ์ ํด์ง๊ธฐ ๋๋ฌธ์(์ ๋ฐ์ดํธ๋๊ธฐ ๋๋ฌธ์),
result๋ 12:25 ์์ ๋ถํฐ sinkํ๊ฒ ๋๋ค.
watermark ์ฌ์ฉ ๋ฐฉ๋ฒ
withWatermark๋ฅผ ์ฌ์ฉํ๋ค.
`DataFrame.withWatermark(eventTime, delayThreshold)`
- eventTime: str or column
ํ์ ์ด๋ฒคํธ ์๊ฐ์ ํฌํจํ๋ column ์ด๋ฆ์ด๋ค. - delayThreshold: str
๊ฐ๊ฒฉ ํ์์ผ๋ก ์ฒ๋ฆฌ๋, ์ต์ ๋ ์ฝ๋๋ฅผ ๊ธฐ์ค์ผ๋ก ๋ฐ์ดํฐ๊ฐ ๋ฆ๊ฒ ๋์ฐฉํ ๋๊น์ง ๊ธฐ๋ค๋ฆฌ๋ ์ต์ ์ง์ฐ์๊ฐ์ด๋ค.
windowedCounts = words \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word) \
.count()
update mode ์์๋ stateful ์ฐ์ฐ ์ (์ง๊ณ/join๋ฑ)์๋ ์ ํ์ ์ผ๋ก ์ฌ์ฉ ๊ฐ๋ฅํ๋ฉฐ,
append mode ์์ stateful ์ฐ์ฐ ์ฌ์ฉ ์์๋ (์ง๊ณ/join๋ฑ) ํ์๋ก watermark๊ฐ์ ์ ์ํด์ฃผ์ด์ผ ํ๋ค.
์ฐธ๊ณ
https://seamless.tistory.com/99
https://medium.com/big-data-processing/time-attributes-in-apache-flink-85e2afdda238
https://www.youtube.com/watch?v=XjlKGvUt2dY
'๋ฐ์ดํฐ > Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Spark] Structured Streaming - stateful transformation๊ณผ Window operation (0) | 2024.04.16 |
---|---|
[Spark] Spark Structured Streaming - Fault Tolerance (0) | 2024.04.04 |
[Spark] Spark Structured Streaming ๊ฐ์ (0) | 2024.04.04 |
[Spark] Accumulator์ Broadcast (๊ณต์ ๋ณ์) (0) | 2024.04.01 |
[Spark] SQL Hint (0) | 2024.04.01 |