๐ฅ
Apache Iceberg ์ฌ์ฉ ์ฌ๋ก (LINE Data Platform์์ Apache Iceberg ๋์ ) ๋ณธ๋ฌธ
Apache Iceberg ์ฌ์ฉ ์ฌ๋ก (LINE Data Platform์์ Apache Iceberg ๋์ )
•8• 2024. 5. 23. 00:19https://www.youtube.com/watch?v=7y9gNwqLNtU
์ ๋์์ ์ ๋ฆฌํจ
hive -> iceberg๋ก ๋ณ๊ฒฝํ ์ฌ๋ก
Line ๋ฐ์ดํฐ ํ๋ซํผ์ ๊ธฐ์กด ๋ฐฉ์
๋ฐ์ดํฐ ํ๋ซํผ์ ํน์ง
- ๋ฐ์ดํฐ์ ํฌ๊ธฐ/์ข ๋ฅ/ํ์ฉ ๊ท๋ชจ๊ฐ ํฌ๋ค
- ๋จธ์ ์: 5000 ๋+
- ๋ฐ์ดํฐ ํฌ๊ธฐ: 290 PB+
- ํ์ด๋ธ ํ ์ด๋ธ ์: 4๋ง๊ฐ (๋ชจ๋ external table)
- log ingestion rate: ์ด๋น 17.5M+
- ํ๋ฃจ์ ๊ตฌ๋๋๋ job ์: 15๋ง+
- ํ๋ซํผ ์ฌ์ฉ์ ์: 700+
Query Processing
SQL ๊ธฐ๋ฐ, spark, hive, trino, flink๋ฅผ ๋ถ์ฐ sql ์ฟผ๋ฆฌ ์์ง์ผ๋ก ์ฌ์ฉ
๋ฐ์ดํฐํ์ผ์ด ์ด๋์ ์๋์ง, ์ด๋ป๊ฒ ์ฝ๊ณ ์ธ์์๋์ง์ ๋ํ ์ ๋ณด ํ์
์ฟผ๋ฆฌ์ ๋ํ ํ ์ด๋ธ์ ์ด๋ป๊ฒ ๊ด๋ฆฌํ ์ง๋ ํ ์ด๋ธ ํฌ๋งท์ ์ํด ์ ์๋จ
* Table Format์ด๋:
์ด๋ค ๋ฐ์ดํฐ ์ ์ ํ๋์ ํ ์ด๋ธ๋ก ํํํ ์ ์๋๋ก ํ์ผ ๊ด๋ฆฌ ๋ฐฉ์ ๊ท์
์ฟผ๋ฆฌ ์คํ ์์ ์ฃผ๋ก ์๋์ ๋ ๊ฐ์ง์ ์ญํ ์ ์ํํ๋ค.
- ์ด๋ ํ์ผ์ด ํ ์ด๋ธ ๋ฐ์ดํฐ๋ฅผ ๊ตฌ์ฑํ๋์ง ์๋ ค์ค
- ๊ทธ ํ์ผ์ ์ด๋ป๊ฒ ์ฝ๊ณ ์ธ ์ ์๋์ง ์๋ ค์ค
→ ํํฐ์ ๋/์คํค๋ง ์ ์ ๋ฑ ๋ฉํ ๋ฐ์ดํฐ์ ๊ด๋ฆฌ๋ ํ์ํจ
๋ฉํ ๋ฐ์ดํฐ ๊ด๋ฆฌ: Hive Table Format (de-facto standard)
๋ฐ์ดํฐ ํ์ผ์ ํ์ผ ์์คํ ์ ๋๋ ํ ๋ฆฌ์ ์ํด ๊ด๋ฆฌํ๋ฉฐ, ํ ์ด๋ธ ๊ด๋ฆฌ์ ์ฌ์ฉ๋๋ ํํฐ์ /์คํค๋ง ๋ฑ์ ๋ฉํ๋ฐ์ดํฐ ๋ณ๊ฒฝ ๋ฐ ์กฐํ๋ metastore ์๋ธ๋ฅผ ์ด์ฉํด ์ด๋ฃจ์ด์ง๋ค.
hive metastore์ ๋ฐฑ์๋์์๋ ๋ฉํ๋ฐ์ดํฐ๊ฐ metastore db๋ผ๊ณ ํ๋ rdbms์ ์ ์ฅ๋๋ค. (Thrift API ์ฌ์ฉ)
Log ๊ด๋ จ End-to-End ํ์ดํ๋ผ์ธ
- Source -> Kafka -> Flink -> RAW Table -> Tez on YARN -> ORC table -> User
columnar ํ์ผ ํฌ๋งท ์ฌ์ฉ์ ์ํด Raw Table์ ORC Table๋ก ๋ณํํ๋ ์์ ์ํ
๊ธฐ์กด ๋ฐฉ์์ ๋ฌธ์ ์
Hive metastore์ DB์ Capacity
๋ชจ๋ ํ ์ด๋ธ์ ๋ฉํ์คํ ์ด๊ฐ Hive Metastore์ ์ ์ฅ๋์ด ์์ด Metastore DB ์ ๋ํ ๋ถํ ๋ฌธ์ ๊ฐ ์์.
DB์ QPS ๋ 5์ฒ๊ฐ๋ก, CPU ์ฌ์ฉ๋ฅ ์ด 50~60% ์ด์์ด์๊ณ , Abnormalํ ์ฌ์ฉ๋(90% ์ด์)๋ ์ข ์ข ๊ด์ฐฐ๋จ.
๋๋์ ํํฐ์ ์กฐํ๋ก Memory Pressure, OOM์ผ๋ก ์ธํ Metastore ๋ค์ด ๋ฌธ์ ๊ฐ ์์.
์กฐํ์ ํผํฌ๋จผ์ค๊ฐ ์ ํ๋์ง๋ง, Scale-out ํ๊ธฐ๊ฐ ์ด๋ ค์.
์ ๋ฆฌ
1. bottleneck
2. inefficient data access
3. less opportunites for optimization
Log ํ์ดํ๋ผ์ธ End-to-End์ high latency
1. RAW Table Truncate๋ก ์ธํ ์ง์ฐ
small files๊ฐ ๋๋์ผ๋ก ์กด์ฌํ๊ฒ ๋๋ฉด Namenode์ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋์ด ์ฆ๊ฐํ๋ค.
์ด๋ฌํ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด truncate๋ฅผ ์ง์ํ๋ Flink์ BucketingSink๋ฅผ ์ฌ์ฉํ์ฌ ๊ฐ๋ฅํ ํ ํ์ผ์ ์์ฑํ์ง ์๋๋ก ์ฃผ๊ธฐ์ ์ผ๋ก Flush ํจ
์ด Flush ์ฃผ๊ธฐ๋๋ฌธ์ ์ต๋ 1์๊ฐ์ Latency ๋ฐ์
2. ORC ํ ์ด๋ธ ๋ณํ
ํ ์ด๋ธ ์๊ฐ ๋ง์ ๋ณํ์ ์๊ฐ์ด ๊ฑธ๋ฆผ(์ฝ 50๋ถ)
ํ์ดํ๋ผ์ธ์ Robustness
1. ์์กดํ๊ณ ์๋ componets๊ฐ ๋๋ฌด ๋ง์
- Flink, Watcher, HDFS, Hive Metastore, Hiveserver2, YARN
๋ชจ๋ components๊ฐ ์ ์์ ์ผ๋ก ์๋ํ์ง ์์ผ๋ฉด ํ์ดํ๋ผ์ธ์ด ์ค์ง๋๊ธฐ ๋๋ฌธ์
์ฅ์ ๋ฐ์ ํ๋ฅ ์ด ๋์์ง๊ณ ์ฅ์ ๋ฐ์ ์ ์์ธ ํ์ ๋์ด๋๋ ๋์์ง.
2. Table ์ด์ ๋ฌธ์
Table Partition์ ์กฐ์์ ์๋ํ ๋์ด ์์ง๋ง ์ ์ ๋ก๋ถํฐ ํน์ํ ์์ฒญ์ด ์์ ๋์๋ ์๋์ผ๋ก ๊ด๋ฆฌํด์ผํ ๋๊ฐ ์๋ค.
์ด๋ก ์ธํด ๋ ์ข ๋ฅ์ Hive External Table ์ด ํ์ํด์ง๋ค.
3. ๊ณ ๋ถํ ๋ฌธ์
partition/file๋ง๋ค ๋๋ฌด ๋ง์ metadata๊ฐ ์์ด ์๋์ ๊ฐ์ ๋ฌธ์ ๋ฅผ ์ผ๊ธฐํ๋ค.
- hive metastore์ heavy partition scan
- namenode์ heavy directory scan
Apache Iceberg
table ์์ฑ ํ๋ฉด ์๋์ ๊ฐ์ ๋ ์ด์์์ด ์์ฑ๋จ
- ./data
- ./metadata
Key Concept: Snapshot
iceberg๋ ํ ์ด๋ธ ํ์ผ ์ํ๋ฅผ ์ค๋ ์ท์ผ๋ก ๊ด๋ฆฌํจ์ผ๋ก์จ ๋ฐ์ดํฐ๋ฅผ ์ถ์ ํ๋ฉฐ, ์ค๋ ์ท์ ํตํด ํ ์ด๋ธ์ ๊ตฌ์ฑํ๋ ๋ชจ๋ ํ์ผ์ ์ฐธ์กฐ ์ ๋ณด ์ ์ฅ ํ์ผ Path, ๋ฐ์ดํฐ์ ํต๊ณ์ ๋ณด ๋ฅผ ์ป์ ์ ์๋ค..
(wirte & commit ์ด ์ผ์ด๋๋ฉด ์๋ก์ด ์ค๋ ์ท์ ์์ฑํจ)
hive์ ๊ฒฐ์ ์ ์ผ๋ก ๋ค๋ฅธ ๋ถ๋ถ์ผ๋ก iceberg๋ ํ ์ด๋ธ ์ํ๊ฐ ํ์ผ(snapshot, manifast list file, manifest file)์ ์ํด ์ถ์ ์ด ๋๊ธฐ ๋๋ฌธ์ ์๋์ ๊ฐ์ ์ด์ ์ ์ป์ ์ ์๋ค.
- hive metastore ์ ๋ํ ์์กด์ ์์จ ์ ์์
- hdfs ๋ฑ scale out ํ๊ธฐ ์ฌ์ด ํ์ผ์์คํ ์ ๋ฉํ๋ฐ์ดํฐ์ ๋ณด๊ด ์ฅ์๋ก ์ด์ฉํ ์ ์์
Iceberg๋ ๋ค์๊ณผ ๊ฐ์ ๊ณผ์ ์ ํตํด ํ ์ด๋ธ์ ํ์ผ์ ์ถ์ ํ๋ค.
- ํ์ฌ Snapshot์ ์ฐธ์กฐํ์ฌ ์ง์ ๋ Manifest list ํ์ผ์ ์ฐพ๋๋ค. (์ด๋ฏธ์ง์์ s0, s1)
- Manifest List ํ์ผ์ 1) Manifest File์ ๊ฒฝ๋ก, 2) ๊ทธ Manifest File๋ก ๊ด๋ฆฌ๋๋ ๋ฐ์ดํฐ์ Partition Value์ ์ํ์น/ํํ์น๋ฅผ ๊ฐ์ ธ์จ๋ค.
* ์ด๋ ์ฟผ๋ฆฌ์ ํํฐ์ ํํฐ๊ฐ ์๋ ๊ฒฝ์ฐ Range๋ฅผ ์ฌ์ฉํ์ฌ ์ค์ ๋ก ์ฝ๋ Manifest File์ ํํฐ๋งํ ์ ์๋ค. - Manifest File์ ์ฝ์ด์ Data path file์ ์ทจ๋ํ๋ค.
Menifest File์์๋ ํ์ผ๋ณ/์ปฌ๋ผ๋ณ Status๋ฅผ ์ ์ฅํ๊ณ ์์ด ์ฟผ๋ฆฌ ์์ ์ค์ ๋ก ์ฝ๋ ํ์ผ์ ๋ ์ค์ผ ์ ์๋ค.
Hive์ Iceberg์ ์ฐจ์ด์ ๊ณผ ์ด์ ์ ์๋์ ๊ฐ์ด ์ ๋ฆฌ๋ ์ ์๋ค.
hive | apache iceberg | iceberg์ ์ด์ | |
Metadata stored at | hive metastore | files | scalability |
Finer partitioning granularity | limited | unblokced | efficiency |
stats support | per partition | per file | performance |
Iceberg ๋์ ์ ํตํ ๊ฐ์ ๋ด์ฉ
Hive Metastore ์์กด ์ญ์ ๋ฅผ ํตํ ๋ถํ ๋ฌธ์ ํด๊ฒฐ
iceberg๋ metastore์ db๋ก ๊ด๋ฆฌํ์ง ์๊ณ ํ์ผ์ ์ด์ฉํด์ ๋ฐ์ดํฐ๋ฅผ ์ถ์ ํ๋ค.
๊ทธ๋ ๊ธฐ๋๋ฌธ์ HDFS์ ๊ฐ์ Scale-out์ด ์ฌ์ด ํ์ผ ์์คํ ์ ๋ฉํ๋ฐ์ดํฐ ์ ์ฅ์๋ก ์ฌ์ฉํจ์ผ๋ก์ Capablity ๋ฅผ ํ๋ณดํ ์ ์์์.
๋ก๊ทธ ํ์ดํ๋ผ์ธ ๊ฐ์
- Source -> Kafka -> Flink -> Iceberg Table -> User
|
spark on YARN
Flink์์๋ Iceberg file์ ๋ค์ด๋ ํธ๋ก writeํ๊ณ , (parquet/ORC ํฌ๋งท)
Spark ๋ฅผ ์ด์ฉํด 5๋ถ์ ํ ๋ฒ์ฉ Flushํ๋๋ก ๋ณ๊ฒฝํจ
Spark์์๋ ์๋์ ์์ ์ ์ํํ๋ค.
- merge Small File
- Expire Snapshots
- Rewrite manifests
- Remove orphans
- delete expired records
์ด๋ก ์ธํด ์ป๋ ์ด์ ์ ์๋์ ๊ฐ๋ค.
- low latency: 2์๊ฐ -> 5๋ถ์ผ๋ก ๋จ์ถ
- simple/scalableํ ์ํคํ
์ฒ๋ก ๋ณ๊ฒฝ
- yarn, hiveserver2์ ๋ํ๋์๊ฐ ์ฌ๋ผ์ง
- Hidden partition ๊ธฐ๋ฅ์ผ๋ก ํํฐ์ ๊ด๋ฆฌ๊ฐ ์ฉ์ดํด์ง (external table์์ ์ฌ์ฉํด์ผ ํ๋ partition operation์ ๋ถํ์ํด์ง)
- Namenode/Hive metastore์ ๋ํ ๋ถ๋ด์ด ๊ฒฝ๊ฐ๋จ - Schema Evolution
Iceberg์์๋ ์ปฌ๋ผ์ column name, position์ผ๋ก ๊ตฌ๋ถํ์ง ์์
๊ฐ ํ๋๋ง๋ค ์ ๋ํฌํ ID๊ฐ ๋ถ์ฌ๋์ด ์์
์ด๋ก ์ธํด ํ๋ ์ถ๊ฐ/ Rename/ ์ญ์ / ์ฝ์ / ์ด๋ ๋ฑ ๋ชจ๋ ๋ณ๊ฒฝ์ด Field ID ์ ๊ธฐ๋ฐํ์ฌ ์ฝ๊ฒ ์คํค๋ง๋ฅผ ๋ณ๊ฒฝํ ์ ์๋ค.