๐ฅ
[Spark] Spark Structured Streaming ๊ฐ์ ๋ณธ๋ฌธ
Spark Streaming ์ด๋
core spark API์ ํ์ฅ ํ๋ก๊ทธ๋จ์ผ๋ก ๋ถ์ฐ ์คํธ๋ฆผ ์ฒ๋ฆฌ ํ๋ก์ธ์ฑ์ ์ง์ํ๋ค.
streaming ํ์ ์ผ๋ก๋ ์๋์ ๊ฐ์ด ๋ ์ข ๋ฅ๊ฐ ์๋๋ฐ spark streaming์ RDD ๋ฒ ์ด์ค ์์ง์ผ๋ก, 2.x๋ฒ์ ๊น์ง ์ง์ํ๊ณ ์ดํ ๋์ด์ ์ ๋ฐ์ดํธ ๋์ง ์๋ ๋ ๊ฑฐ์ ํ๋ก์ ํธ์ด๋ค.
Spark Streaming ์ข ๋ฅ
- Spark Streaming: RDD ๊ธฐ๋ฐ์ micro-batch ์ํ
- Spark Structured Streaming: Dataframe ๊ธฐ๋ฐ micro-batch ์ํ, ์ ์ง์ฐ ์ฒ๋ฆฌ ๋ชจ๋๋ฅผ ๋์ ํจ์ผ๋ก์จ ์ค์๊ฐ์ ๊ฐ๊น์ด ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํด์ง.
Spark Structured Streaming
ํ๋ก๊ทธ๋๋ฐ ๋ชจ๋ธ
- structured streaming์์๋ ํธ๋ฆฌ๊ฑฐ ๊ฐ๊ฒฉ๋ง๋ค ์์ ๋๋ ๋ชจ๋ input data stream์ `input table` ์ ์ ๋ฐ์ดํธ ๋๋ค.
- ์ดํ output sync์ ๊ธฐ๋ก๋ ๊ฒฐ๊ณผ ํ ์ด๋ธ์ ๊ณ์ฐํ๊ธฐ ์ํด ๋ง์น static table ์ธ ๊ฒ์ฒ๋ผ input table์ ๋ํ ์ฟผ๋ฆฌ๋ฅผ ์ ์ํ๋ค.
- spark์์๋ ์ด ์ผ๊ด ์ฒ๋ฆฌ๋ฅผ ์คํธ๋ฆฌ๋ฐ ์คํ ๊ณํ์ผ๋ก ์๋๋ณํํ๋ค(= `incrementalization`).
๋ ์ฝ๋๊ฐ ๋์ฐฉํ ๋๋ง๋ค ๊ฒฐ๊ณผ๋ฅผ ์ ๋ฐ์ดํธ ํ๊ธฐ์ํด ์ด๋ค ์ํ๋ฅผ ์ ์งํด์ผ ํ๋์ง ํ์ ํ๋ค. - ๋ง์ง๋ง์ผ๋ก ์ง์ ๋ ํธ๋ฆฌ๊ฑฐ์ ๋ฐ๋ผ ์คํ๋ ๋๋ง๋ค spark์์๋ ์๋ก์ด ๋ฐ์ดํฐ๋ฅผ ํ์ธํ๊ณ ๊ฒฐ๊ณผ๋ฅผ ์ ๋ฐ์ดํธ ํ๋ค.
์ด๋ structured streaming์ ๋ฐฐ์น ์ฒ๋ฆฌ๊ฐ ์๋ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ๊ณ์ ์ถ๊ฐํ์ฌ ์ฒ๋ฆฌํจ์ผ๋ก์จ (์ค์๊ฐ์ ๊ฐ๊น์ด) ์ ์ง์ฐ ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํด์ง๋ค.
3๊ฐ์ง Output Mode
complete mode
๊ฒฐ๊ณผ ํ ์ด๋ธ์ ์ ์ฒด ์ํ๋ฅผ ์ถ๋ ฅํ๋ค.
world count ์์ ๋ก ์ดํด๋ณด๋ฉด ์๋์ ๊ฐ์ด input stream์ด ์์ ๋์์ ๋,
hello # first input stream
hello world # second input stream
hi world # third input stream
๋งค ๋ฐฐ์น๋ง๋ค ๊ณผ๊ฑฐ์ ์์๋ ๋ชจ๋ ๊ฒฐ๊ณผ๋ ํจ๊ป output์ผ๋ก ์ถ๋ ฅ๋๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
๋ชจ๋ ๊ฒฐ๊ณผ ํ ์ด๋ธ์ ์ ์งํด์ผ ํ๋ฏ๋ก ๋ ๋ง์ ๋ฆฌ์์ค๊ฐ ํ์ํ๋ฉฐ,
๋ชจ๋ ๋ฐ์ดํฐ๊ฐ ๊ณ์ํด์ ๋ณ๊ฒฝ๋ ์ ์๋ ์ํ ๊ธฐ๋ฐ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃฐ ๋ ์ฌ์ฉํ ์ ์๋ค.
update mode
์ด์ ์ถ๋ ฅ ๊ฒฐ๊ณผ์์ ๋ณ๊ฒฝ๋ ๋ ์ฝ๋๋ง ์ถ๋ ฅํ๋ค.
์ง๊ณ ์ฐ์ฐ์ ํ์ง ์๋๋ค๋ฉด append ๋ชจ๋์ ๋์ผํ๋ค.
hello # first input stream
hello world # second input stream
hi world # third input stream
๊ฐ ํธ๋ฆฌ๊ฑฐ๋ง๋ค ์ ๋ฐ์ดํธ๋ ๊ฒฐ๊ณผ๋ง ๋ณด์ฌ์ฃผ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
append mode
์๋ก์ด ๋ ์ฝ๋๊ฐ ๊ฒฐ๊ณผ ํ ์ด๋ธ์ ์ถ๊ฐ๋๋ฉด ์ฌ์ฉ์๊ฐ ๋ช ์ํ ํธ๋ฆฌ๊ฑฐ์ ๋ง์ถฐ ์ถ๋ ฅํ๋ค.
์ง๊ณ ์ฐ์ฐ์ ํ์ง ์๋๋ค๋ฉด update mode์ ๋์ผํ์ง๋ง,
์ง๊ณ ์ฐ์ฐ์ด ํฌํจ๋์ด ์๋ค๋ฉด ๋ฐ๋์ watermark๋ฅผ ์ฌ์ฉํด์ผ ํ๋ค. (๊ด๋ จ ์ ๋ฆฌ: ๊ฒ์๊ธ ์ฐธ๊ณ )
Input Sources
input source๋ก๋ ์๋์ ๊ฐ์ ๋ด์ฅ ์์ค๊ฐ ์๋ค.
๊ฐ ์์ค ๋ณ ์ธ๋ถ ์ต์ ์ ๋ฌธ์ ์ฐธ๊ณ
File Source
๋๋ ํ ๋ฆฌ์ ๊ธฐ๋ก๋ ํ์ผ์ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ผ๋ก ์ฝ๋๋ค. ํ์ผ ์์ ์๊ฐ ๊ธฐ๋ฐ์ผ๋ก ํ์ผ์ด ์ฒ๋ฆฌ๋๋ค.
์ง์๋๋ ํ์ผ ํ์์ ๋ค์๊ณผ ๊ฐ๋ค: txt, csv, json, orc, parquet
Kafka Source
์นดํ์นด๋ก๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ์ฝ๋๋ค. kafka broker version 0.10.0 ์ด์๋ถํฐ ํธํ๋๋ค.
Socket Source (ํ ์คํธ์ฉ)
์์ผ ์ฐ๊ฒฐ์์ UTF8 ํ ์คํธ ๋ฐ์ดํฐ๋ฅผ ์ฝ๋๋ค. end point ๊ฐ fault tolerance๋ฅผ ๋ณด์ฅํ์ง ์๋๋ค.
Rate Source (ํ ์คํธ ์ฉ)
์ด๋น ์ง์ ๋ ํ ์๋ก ๋ฐ์ดํฐ๋ฅผ ์์ฑํ๋ค. ๊ฐ output row์๋ value์ timestamp(๋ฉ์์ง ๋ฐ์ก ์๊ฐ)๊ฐ ํฌํจ๋๋ค.
Rate Per Micro-Batch source (ํ ์คํธ ์ฉ)
๋ง์ดํฌ๋ก ๋ฐฐ์น ๋น ์ง์ ๋ ํ ์๋ก ๋ฐ์ดํฐ๋ฅผ ์์ฑํ๋ค.
5๊ฐ์ง Trigger Type
unspecified (๊ธฐ๋ณธ ๋์)
stream write์ ์ํ ํธ๋ฆฌ๊ฑฐ ์ต์ ์ ์ค์ ํ์ง ์์ผ๋ฉด spark๋ ํ์ฌ์ micro batch๊ฐ ์๋ฃ๋๋ ์ฆ์ ๋ค์ ๋ ์ฝ๋ set์ ์ฒ๋ฆฌํ๋ ค๊ณ ์๋ํ๋ค. micro-batch์์๋ ์์ ๋ ์ฝ๋๊ฐ ์์ window๋ก ๊ทธ๋ฃนํ๋์ด ์ฃผ๊ธฐ์ ์ผ๋ก ์ฒ๋ฆฌ๋๋ค.
# without any trigger
query = wordCounts \
.format("console")\
.option("checkpointLocation", "some_path")\
.start()
One-time micro-batch (deprecated)
ํ ๋ฒ๋ง ์ฒ๋ฆฌํ ํ ์คํธ๋ฆผ์ ์ข ๋ฃํ๋ค. ์คํธ๋ฆผ์ด ํ ๋ฒ ์์ฑ๋๋ฉด ๋ณด๋ฅ์ค์ธ ๋ ์ฝ๋๊ฐ ๋ชจ๋ ์ฒ๋ฆฌ๋ ํ ์คํธ๋ฆผ์ด ์ข ๋ฃ๋๋ค.
# trigger once
query = wordCounts \
.format("console")\
.trigger(once=True)
.option("checkpointLocation", "some_path")\
.start()
๋ณดํต ํด๋ฌ์คํฐ๋ฅผ ๊ณ์ ๊ฐ๋์ค์ด ์๋, ์ฃผ๊ธฐ์ ์ผ๋ก ํด๋ฌ์คํฐ๋ฅผ ์ -๋ค์ด ํ๋ ์๋๋ฆฌ์ค์ ์ฌ์ฉ๋๋ค.
์ด ๊ฒฝ์ฐ ํด๋ฌ์คํฐ๊ฐ ๊ฐ๋์ค์ผ ๋ ์ฒ๋ฆฌํด์ผ ํ๋ ๋ชจ๋ ์คํธ๋ฆผ์ ์ฒ๋ฆฌํ๊ณ ์ข ๋ฃํ๋ค.
Available-now micro-batch
one-time micro-batch์ ์ ์ฌํ๊ฒ ์ฟผ๋ฆฌ๋ ์ฌ์ฉ ๊ฐ๋ฅํ ๋ชจ๋ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ๋ค์ ์์ฒด์ ์ผ๋ก ์ค์ง๋๋ค.
์ฐจ์ด์ ์ ์์ค ์ต์ (maxFilesperTrigger ๋ฑ)์ ๊ธฐ๋ฐ์ผ๋ก ์ฌ๋ฌ ๋ง์ดํฌ๋ก ๋ฐฐ์น๋ก ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ฏ๋ก ์ฟผ๋ฆฌ ํ์ฅ์ฑ์ด ํฅ์๋๋ค๋ ๋ฐ์ ์๋ค.
์ด์ ์คํ์์ ๋จ์ ๋ฐฐ์น ์์ ๊ด๊ณ์์ด ์คํ ์ ์ฌ์ฉ๊ฐ๋ฅํ ๋ชจ๋ ๋ฐ์ดํฐ๊ฐ ์ข ๋ฃ๋๊ธฐ ์ ์ ์ฒ๋ฆฌ๋๋๋ก ๋ณด์ฅํ๋ค.
# available-now
query = wordCounts \
.format("console")\
.trigger(availableNow=True)
.option("checkpointLocation", "some_path")\
.start()
Fixed interval micro-batch
์ฟผ๋ฆฌ๊ฐ micro-batch ๋ชจ๋๋ก ์คํ๋๋ฉฐ, ์ฌ๊ธฐ์ micro-batch๋ ์ฌ์ฉ์๊ฐ ์ง์ ํ ๊ฐ๊ฒฉ์ผ๋ก ์์๋๋ค.
- ์ด์ micro-batch ๊ฐ interval ๋ด์ ์๋ฃ๋๋ฉด ์์ง์ ๋ค์ batch๊ฐ ๋์์ฌ ๋๊น์ง ๊ธฐ๋ค๋ฆฐ๋ค.
- ์ด์ micro-batch๊ฐ interval ๋ด์ ์๋ฃ๋์ง ์๋๋ค๋ฉด ์ด์ batch๊ฐ ์๋ฃ๋์๋ง์ ๋ค์ micro-batch๊ฐ ์์๋๋ค.
- ์๋ก์ด ๋ฐ์ดํฐ๊ฐ ์กด์ฌํ์ง ์๋๋ค๋ฉด micro-batch๊ฐ ์์๋์ง ์๋๋ค.
๊ฐ์ฅ ๋๋ฆฌ ์ฌ์ฉ๋๋ฉฐ ๊ถ์ฅ๋๋ ๋ฐฉ๋ฒ์ด๋ค.
# processing time
query = wordCounts \
.format("console")\
.trigger(processingTime='2 seconds')
.option("checkpointLocation", "some_path")\
.start()
Continuous with fixed checkpoint interval (experimental)
spark 2.3์์ experimental๋ก ๋์ ๋์๋ค.
countinuous ์ต์ ์์๋ ๋ ์ฝ๋๊ฐ micro-batch๋ก ์ฒ๋ฆฌ๋์ง ์๊ณ ์ฅ๊ธฐ ์คํ ์์ ์ด write stream๋ณ๋ก ์์ฑ๋์ด ์ต๋ํ ๋นจ๋ฆฌ ์ฒ๋ฆฌ๋๋ค.
`Exactly Once`๋ ์ง์๋์ง ์์ผ๋ฉฐ, `At Least Once`๋ง ์ง์๋๋ค.
- Driver์์ ์ฅ๊ธฐ ์คํ ์์ (long running task)์ ์์ฑํ๋ค.
- Input Records๊ฐ ์ฒ๋ฆฌ๋๋ค.
- ์ฒ๋ฆฌ๋ Records๋ Target์ ์ ์ฅ๋๋ค.
- ์์ ์ offset์ผ๋ก ๋น๋๊ธฐ์ ์ผ๋ก ์ ์งํ๋ค.
- offset์ WAL(Write Ahead Log)์ ์ปค๋ฐ๋๋ค.
Spark Structured Streaming ์์
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("StructuredStreamingSum") \
.config("spark.streaming.stopGracefullyOnShutdown", "true") \
.config("spark.sql.streaming.schemaInference", "true") \
.config("maxFilesPerTrigger", 1) \
.getOrCreate()
# Create DataFrame representing the stream of input lines from connection to localhost:9999
df = spark \
.readStream \
.format("json") \
.option("path", "streaming_sample") \
.load()
df1 = df.select("city")
query = df1 \
.writeStream \
.format("json") \
.option("path", "streaming_output") \
.option("checkpointLocation", "checkpoint") \
.outputMode("append") \
.trigger(processingTime='5 seconds') \
.start()
query.awaitTermination()
json ํ์ผ์ ์ฝ์ด์ output path์ ์ฐ๋ ์์ ์ด๋ค.
output path๋ก ์ค์ ํ ๊ฒฝ๋ก์ ์์ด๋ ๊ฒ์ ํ์ธํ ์ ์์๋ค.
ํ์ ๋๋ ํ ๋ฆฌ์ `_spark_metadata`๋ผ๋ ๋๋ ํ ๋ฆฌ๋ฅผ ์ถ๊ฐ๋ก ๋ฐ๊ฒฌํ ์ ์๋๋ฐ, ๋งค ๋ฐฐ์น๋ง๋ค ์์ฑ๋๋ฉฐ output ํ์ผ์ ๋ํ ์ ๋ณด๊ฐ์ ํ์ธํ ์ ์๋ค.
v1
{"path":"file:///home/jovyan/work/streaming_output/part-00000-a2d3bcce-2148-46c1-b188-6dc20bb722f9-c000.json","size":15,"isDir":false,"modificationTime":1713250145443,"blockReplication":1,"blockSize":33554432,"action":"add"}
_spark_metadata
output sync๊ฐ HDFS, S3 ๋ฑ ํ์ผ์์คํ ์ ์ฐ๋ file sync ๋ผ๋ฉด structured streaming์์๋ `_spark_metadata` ๋๋ ํฐ๋ฆฌ๋ฅผ ์์ฑํ๋ค. ์๋์ ๊ฐ์ ํน์ฑ์ ๊ฐ๋๋ค.
- ์์ ๊ฐ์ ๊ณต์ ํ ์ ์์ผ๋ฉฐ, ์์ ๋น ํ๋์ ๋๋ ํ ๋ฆฌ๋ง ์๋ค.
- `_spark_metadata` ๋ ๋์ผํ ์์น์ ์๋ ๋ ์ด์์ spark structured streaming ์ฐ๊ธฐ๋ฅผ ๋ฐฉ์งํ๋ค.
- output path ๋ด์ ์์ฑ๋๋ค.
- spark๊ฐ ํ์ผ ์ฑํฌ ์ exactly-once ๋ฅผ ๋ณด์ฅํ ์ ์๋ ๋ฐฉ๋ฒ์ด๋ค.
์ด ๋๋ ํ ๋ฆฌ๋ฅผ ์ญ์ ํ๋ฉด Exception์ด ๋ฐ์ํ๋ค.
`java.lang.IllegalStateException: /home/jovyan/work/streaming_output/_spark_metadata/0 doesn't exist when compacting batch <batchNumber>`
๋ ๊ฐ ์ด์์ ์ฟผ๋ฆฌ sync
file_writer = concat_df \
.writeStream \
.queryName("transformed json") \
.format("json") \
.outputMode("append") \
.option("path", "transformed") \
.option("checkpointLocation", "chk/json") \
.start()
kafka_writer = output_df \
.writeStream \
.queryName("transformed kafka") \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("checkpointLocation", "chk/kafka") \
.option("topic", "transformed") \
.outputMode("append") \
.start()
# case 1. ์ฟผ๋ฆฌ๋ณ๋ก start
file_writer.start()
kafka_writer.start().awaitTermination()
# case 2. built-int function ์ฌ์ฉ
sparkSession.streams.awaitAnyTermination()
ํ๋์ application์์ ๋ ๊ฐ ์ด์์ output sync๊ฐ ์์ ๋๋
- checkpoint ์์น๋ฅผ ๋ค๋ฅด๊ฒ ํด์ฃผ์ด์ผ ํ๋ค.
- ๋ ๊ฐ์ ์ฟผ๋ฆฌ ์คํ์ ์ํด case1, case2 ์ค ํ๋ ์ฌ์ฉ. ์ ๋งํ๋ฉด ๋ด์ฅํจ์ ์ฌ์ฉํ์.
์ฐธ๊ณ
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
https://medium.com/@sdjemails/spark-trigger-options-cd90e3cf6166
https://dev.to/sukumaar/what-is-sparkmetadata-directory-in-spark-structured-streaming--3i42
'๋ฐ์ดํฐ > Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Spark] Structured Streaming - stateful transformation๊ณผ Window operation (0) | 2024.04.16 |
---|---|
[Spark] Spark Structured Streaming - Fault Tolerance (0) | 2024.04.04 |
[Spark] Accumulator์ Broadcast (๊ณต์ ๋ณ์) (0) | 2024.04.01 |
[Spark] SQL Hint (0) | 2024.04.01 |
[Spark] ์คํํฌ ์ค์ผ์ฅด๋ง (0) | 2024.04.01 |