๐Ÿฅ

Apache Iceberg Use Case (Adopting Apache Iceberg at LINE Data Platform)

๋ฐ์ดํ„ฐ/ํ•˜๋‘ก

Apache Iceberg ์‚ฌ์šฉ ์‚ฌ๋ก€ (LINE Data Platform์—์„œ Apache Iceberg ๋„์ž…)

•8• 2024. 5. 23. 00:19

https://www.youtube.com/watch?v=7y9gNwqLNtU

These are notes summarizing the video above.

It covers a case study of migrating from Hive to Iceberg.

 

The existing approach of the LINE data platform

Characteristics of the data platform

  • The size, variety, and usage scale of the data are all large
  • Number of machines: 5,000+
  • Data size: 290 PB+
  • Number of Hive tables: 40,000 (all external tables)
  • Log ingestion rate: 17.5M+ per second
  • Jobs run per day: 150,000+
  • Platform users: 700+

 

Query Processing

The platform is SQL-based and uses Spark, Hive, Trino, and Flink as distributed SQL query engines.

The engines need to know where the data files are and how to read and write them.

How the tables behind a query are managed is defined by the table format.

 

* What is a table format:

It specifies how files are managed so that a dataset can be represented as a single table.

At query execution time it mainly plays the following two roles.

  1. It tells the engine which files make up the table's data
  2. It tells the engine how those files can be read and written

→ Metadata such as partitioning and schema definitions must also be managed

 

Metadata management: Hive Table Format (de-facto standard)

Data files are managed through file system directories, while changes to and lookups of table metadata such as partitions and schemas go through the metastore server.

In the Hive Metastore backend, the metadata is stored in an RDBMS called the metastore DB. (The Thrift API is used.)
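
To make the directory-based layout concrete, here is a minimal Python sketch of how a Hive-style table maps a partition to a directory. The table root, the partition columns (`dt`, `hour`), and the helper name `hive_partition_dir` are hypothetical and only serve as illustration; a real engine would additionally ask the metastore for the partition list and then list each directory on the file system.

```python
from pathlib import PurePosixPath


def hive_partition_dir(table_root: str, partition: dict[str, str]) -> str:
    """Build the directory that holds one partition's data files.

    Hive-style layouts encode each partition column as a `key=value`
    path segment, in the order the partition columns are given.
    """
    path = PurePosixPath(table_root)
    for key, value in partition.items():
        path = path / f"{key}={value}"
    return str(path)


# Hypothetical table root and partition columns, for illustration only.
example_dir = hive_partition_dir(
    "/warehouse/logs.db/access_log", {"dt": "2024-05-23", "hour": "00"}
)
print(example_dir)
```

Because the files are found by walking directories like this one, every additional partition means more metastore lookups and more directory listings.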

 

 

End-to-end pipeline for logs

  • Source -> Kafka -> Flink -> RAW Table -> Tez on YARN -> ORC table -> User

To take advantage of a columnar file format, a job converts the RAW table into an ORC table.

 

 

Problems with the existing approach

Capacity of the Hive Metastore and its DB

The metadata of every table is stored in the Hive Metastore, which puts a heavy load on the Metastore DB.

The DB handled about 5,000 QPS with CPU usage at 50~60% or higher, and abnormal usage (90% or higher) was observed from time to time.

Large partition lookups caused memory pressure, and the Metastore went down because of OOM.

Lookup performance is limited, yet scaling out is difficult.

 

Summary

1. bottleneck

2. inefficient data access

3. fewer opportunities for optimization

 

High end-to-end latency of the log pipeline

1. Delay caused by RAW table truncate

When small files pile up in large numbers, NameNode memory usage grows.

To mitigate this, Flink's BucketingSink, which supports truncate, was used and flushed only periodically so that as few files as possible were created.

This flush interval adds up to 1 hour of latency.

 

2. ORC table conversion

There are many tables, so the conversion takes time (about 50 minutes).

 

ํŒŒ์ดํ”„๋ผ์ธ์˜ Robustness

1. ์˜์กดํ•˜๊ณ  ์žˆ๋Š” componets๊ฐ€ ๋„ˆ๋ฌด ๋งŽ์Œ

  • Flink, Watcher, HDFS, Hive Metastore, Hiveserver2, YARN

๋ชจ๋“  components๊ฐ€ ์ •์ƒ์ ์œผ๋กœ ์ž‘๋™ํ•˜์ง€ ์•Š์œผ๋ฉด ํŒŒ์ดํ”„๋ผ์ธ์ด ์ค‘์ง€๋˜๊ธฐ ๋•Œ๋ฌธ์—

์žฅ์•  ๋ฐœ์ƒ ํ™•๋ฅ ์ด ๋†’์•„์ง€๊ณ  ์žฅ์•  ๋ฐœ์ƒ ์‹œ ์›์ธ ํŒŒ์•… ๋‚œ์ด๋„๋„ ๋†’์•„์ง.
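
The effect of a long dependency chain can be illustrated with a small calculation. This is a sketch under two loud assumptions that are not from the talk: each component fails independently, and each is up 99.9% of the time (a hypothetical figure).

```python
import math


def pipeline_availability(component_availabilities: list[float]) -> float:
    """Availability of a serial pipeline: every component must be up at once."""
    return math.prod(component_availabilities)


components = ["Flink", "Watcher", "HDFS", "Hive Metastore", "HiveServer2", "YARN"]
# Hypothetical figure: each component is independently up 99.9% of the time.
per_component = 0.999
overall = pipeline_availability([per_component] * len(components))
print(f"{len(components)} components at {per_component:.1%} each -> {overall:.2%} overall")
```

Whatever the real per-component numbers are, the product is always lower than the weakest single component, which is why removing dependencies improves robustness.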

 

2. Table operation issues

Table partition operations are automated, but special requests from users sometimes have to be handled manually.

This is why two kinds of Hive external tables are needed.

 

3. High load

There is too much metadata per partition and per file, which causes the following problems.

  • heavy partition scans on the Hive Metastore
  • heavy directory scans on the NameNode

 

Apache Iceberg

Creating a table produces the following layout

  • ./data 
  • ./metadata

Key Concept: Snapshot

Iceberg tracks data by managing the state of the table's files as snapshots. From a snapshot you can obtain reference information for every file that makes up the table: the file paths and the statistics of the data.

(Each write & commit creates a new snapshot.)
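
The snapshot idea can be sketched with a small in-memory model. This is not the Iceberg API: the `Snapshot` and `Table` classes and the file names are hypothetical, and a real snapshot points to a manifest list rather than holding file paths directly.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Snapshot:
    """Immutable view of the table: every data file visible at commit time."""
    snapshot_id: int
    data_files: tuple[str, ...]


@dataclass
class Table:
    snapshots: list[Snapshot] = field(default_factory=list)

    def commit_append(self, new_files: list[str]) -> Snapshot:
        """Write + commit: add a new snapshot, leaving older ones untouched."""
        previous = self.snapshots[-1].data_files if self.snapshots else ()
        snapshot = Snapshot(len(self.snapshots), previous + tuple(new_files))
        self.snapshots.append(snapshot)
        return snapshot


table = Table()
s0 = table.commit_append(["data/a.parquet"])
s1 = table.commit_append(["data/b.parquet"])
print(s0.data_files)  # the older snapshot still describes the earlier state
print(s1.data_files)
```

Because earlier snapshots are never modified, a reader that started on `s0` keeps a consistent view while a writer commits `s1`.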

 

The decisive difference from Hive is that Iceberg tracks table state through files (snapshot, manifest list file, manifest file), which brings the following benefits.

  • The dependency on the Hive Metastore can be removed
  • A file system that scales out easily, such as HDFS, can be used to store the metadata

How Iceberg tracks files (source: https://www.dremio.com/resources/guides/apache-iceberg-an-architectural-look-under-the-covers/)

 

Iceberg tracks a table's files through the following steps.

  1. Look at the current snapshot to find the manifest list file it points to. (s0, s1 in the image)
  2. From the manifest list file, obtain 1) the path of each manifest file and 2) the upper and lower bounds of the partition values of the data managed by that manifest file.
    * If the query has a partition filter, these ranges can be used to filter out manifest files that do not need to be read.
  3. Read the manifest files to obtain the data file paths.
    Manifest files store per-file, per-column statistics, so the set of files actually read at query time can be reduced even further.
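
The steps above can be sketched as a small planning function. This is a simplified in-memory model, not Iceberg's actual metadata format or API: the class names, the single integer column used for statistics, and the sample paths are all hypothetical.

```python
from dataclasses import dataclass


@dataclass
class ManifestListEntry:
    manifest_path: str
    partition_min: str  # lower bound of partition values in this manifest
    partition_max: str  # upper bound of partition values in this manifest


@dataclass
class DataFileEntry:
    file_path: str
    col_min: int  # per-file lower bound of one (hypothetical) column
    col_max: int  # per-file upper bound of the same column


def plan_files(manifest_list, manifests, partition, value):
    """Return the data files that may contain `value` in `partition`."""
    selected = []
    for entry in manifest_list:  # step 2: walk the manifest list
        if not (entry.partition_min <= partition <= entry.partition_max):
            continue  # partition range rules out the whole manifest
        for data_file in manifests[entry.manifest_path]:  # step 3: read the manifest
            if data_file.col_min <= value <= data_file.col_max:
                selected.append(data_file.file_path)  # column stats may match
    return selected


manifest_list = [
    ManifestListEntry("metadata/m0.avro", "2024-05-21", "2024-05-22"),
    ManifestListEntry("metadata/m1.avro", "2024-05-23", "2024-05-23"),
]
manifests = {
    "metadata/m0.avro": [DataFileEntry("data/f0.parquet", 1, 200)],
    "metadata/m1.avro": [
        DataFileEntry("data/f1.parquet", 1, 100),
        DataFileEntry("data/f2.parquet", 101, 200),
    ],
}
files_to_read = plan_files(manifest_list, manifests, "2024-05-23", 150)
print(files_to_read)
```

In this example the first manifest is skipped by its partition range and `data/f1.parquet` is skipped by its column bounds, so only one of the three data files has to be opened.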

The differences between Hive and Iceberg, and the benefits of Iceberg, can be summarized as follows.

                               | Hive           | Apache Iceberg | Benefit of Iceberg
Metadata stored at             | Hive Metastore | files          | scalability
Finer partitioning granularity | limited        | unblocked      | efficiency
Stats support                  | per partition  | per file       | performance

 

 

Improvements from adopting Iceberg

Solving the load problem by removing the Hive Metastore dependency

Iceberg does not manage its metadata in a DB; it tracks data through files.

As a result, a file system that scales out easily, such as HDFS, can serve as the metadata store, which secured the needed capacity.

 

Improving the log pipeline

  • Source -> Kafka -> Flink -> Iceberg Table -> User
                                      |
                                Spark on YARN

Flink writes Iceberg files directly (Parquet/ORC format),

and the pipeline was changed so that data is flushed once every 5 minutes using Spark.

Spark performs the following tasks.

  • Merge small files
  • Expire snapshots
  • Rewrite manifests
  • Remove orphans
  • Delete expired records
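
The notes do not say how these tasks are implemented. One common way to run them from Spark is through Iceberg's Spark stored procedures (`rewrite_data_files`, `expire_snapshots`, `rewrite_manifests`, `remove_orphan_files`) plus a plain `DELETE`. The sketch below only builds the SQL strings; the catalog, table, cutoff, and timestamp column are hypothetical, and the string formatting does no escaping.

```python
def maintenance_statements(catalog: str, table: str, cutoff: str, ts_column: str) -> list[str]:
    """Build one SQL statement per maintenance task listed above.

    `table` is `db.name` inside `catalog`; `cutoff` is a timestamp literal.
    All arguments are placeholders chosen by the caller.
    """
    return [
        # Merge small files into larger ones.
        f"CALL {catalog}.system.rewrite_data_files(table => '{table}')",
        # Drop snapshots older than the cutoff.
        f"CALL {catalog}.system.expire_snapshots(table => '{table}', older_than => TIMESTAMP '{cutoff}')",
        # Compact and reorganize manifest files.
        f"CALL {catalog}.system.rewrite_manifests(table => '{table}')",
        # Remove files no snapshot references any more.
        f"CALL {catalog}.system.remove_orphan_files(table => '{table}', older_than => TIMESTAMP '{cutoff}')",
        # Delete records past their retention period.
        f"DELETE FROM {catalog}.{table} WHERE {ts_column} < TIMESTAMP '{cutoff}'",
    ]


statements = maintenance_statements(
    "my_catalog", "logs.access_log", "2024-05-01 00:00:00", "event_time"
)
for statement in statements:
    print(statement)  # with a SparkSession available: spark.sql(statement)
```

Keeping statement construction separate from execution makes it easy to review or log exactly what a scheduled maintenance job will run.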

The resulting benefits are as follows.

  1. Low latency: reduced from 2 hours to 5 minutes
  2. A simpler, more scalable architecture
    - The dependencies on YARN and HiveServer2 are gone
    - Hidden partitioning makes partition management easier (the partition operations required for external tables are no longer needed)
    - The load on the NameNode and Hive Metastore is reduced
  3. Schema evolution
    Iceberg does not identify columns by column name or position
    Each field is assigned a unique ID
    Because every change (adding, renaming, deleting, inserting, or moving a field) is based on the field ID, the schema can be changed easily.
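
The field ID idea can be illustrated with a minimal sketch. It is a toy model rather than Iceberg's real file or schema format: the rows, column names, and the `read_row` helper are hypothetical.

```python
# A data file stores values keyed by field ID, not by column name.
data_file_row = {1: "u-123", 2: "2024-05-23T00:19:00"}

# Each schema version maps field ID -> current column name.
schema_v1 = {1: "user_id", 2: "ts"}
schema_v2 = {1: "user_id", 2: "event_time", 3: "country"}  # rename ID 2, add ID 3


def read_row(row_by_id: dict[int, object], schema: dict[int, str]) -> dict[str, object]:
    """Project a stored row through a schema; IDs missing in the file read as None."""
    return {name: row_by_id.get(field_id) for field_id, name in schema.items()}


old_view = read_row(data_file_row, schema_v1)
new_view = read_row(data_file_row, schema_v2)
print(old_view)
print(new_view)
```

The file written under the old schema is read correctly under the new one without being rewritten: the renamed column resolves through ID 2, and the added column simply has no value for older rows.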