๐ฅ
Apache Kafka ๋ณธ๋ฌธ
์นดํ์นด
๋ถ์ฐ์์คํ ๋ฉ์์ง ์ฒ๋ฆฌ
broker์ ๋ชจ์ = kafka cluster
zookeeper: ํด๋ฌ์คํฐ ์๋ฒ๋ค์ด ๊ณต์ ํ๋ ๋ฐ์ดํฐ ๊ด๋ฆฌ
→ broker์ ๋ถ์ฐ ์ฒ๋ฆฌ๋ ๋ฉ์์ง ํ์ ์ ๋ณด ๊ด๋ฆฌ(์นดํ์นด์ ๋ ธ๋ ๊ด๋ฆฌ, topic์ offset ์ ๋ณด ์ ์ฅ)
broker: kafka server/ ํ ํด๋ฌ์คํฐ ๋ด์์ ์ฌ๋ฌ๊ฐ์ kafka server(broker)๊ฐ ๋์ํ๋ค.
topic: ๋ฉ์์ง๊ฐ ์์ฑ๋๊ณ ์๋น๋๋ ์ฃผ์ . ๋ค๋ฅธ ํ ํฝ๋ค๊ณผ isolation (๊ตฌ๋ ํ consumer์๊ฒ๋ง ๋ณด์ฌ์ง)
partition: ํ๋ ๋ค๋ชจ๋ค ํ ์ค. topic ๋ด์์ ๋ฉ์์ง๊ฐ ๋ถ์ฐ๋์ด ์ ์ฅ๋๋ ๋จ์. ํ topic์ ์๋ n๊ฐ์ partition์ ๋ฉ์์ง๊ฐ ๋ถ์ฐ๋์ด ์ ์ฅ.
- partition ๋ด์์๋ queue ํํ๋ก ์์ ๋ณด์ฅ (offset ์ด์ฉ)
- partition๋ผ๋ฆฌ๋ ์์๋ณด์ฅ X
- ํ ๋ฒ์ ํ๋์ reader (ํ ํํฐ์ ์ ๋์์ ์ฌ๋ฌ consumer๊ฐ ์ ๊ทผํด์ ์ฝ์ด์ฌ ์ ์๋ค.)
log: ํ๋ ๋ค๋ชจ ํ ์นธ. key-value-timestamp๋ก ๊ตฌ์ฑ
์ค์ ๋ก๋ topic๊ณผ partition์ด ์ ๊ทธ๋ฆผ์ฒ๋ผ ๋ถ์ฐ์ ์ฅ๋์ด์๋ ๋ฏ ํ๋ค.
(broker:3๊ฐ, /// topic๋ช : T1, partition ๊ฐ์: 3 /// topic๋ช : T2, partition ๊ฐ์: 3 /// topic1, 2์ replication=1์ผ๋ก ๊ฐ์ )
๋ถ์ฐ ์ ์ฅ์ ์๋์ผ๋ก ๋์ง๋ง, ์ฌ์ฉ์๊ฐ ์ํ๋ค๋ฉด ์๋์ผ๋ก ์ฌ๋ฐฐ์น ๊ฐ๋ฅ
ํ ํฝ ์์ฑ ์
์๋ก์ด ํ ํฝ์ ์์ฑํ๊ฒ ๋๋ฉด ๋ธ๋ก์ปค ๊ฐ ํํฐ์ ์ ํ ๋นํ๋ ๋ฐฉ๋ฒ์ ๊ฒฐ์ ํ๋ค.
1) partition replica๊ฐ ๋ธ๋ก์ปค ๊ฐ ๊ณ ๋ฅด๊ฒ ๋ถ์ฐ
2) ๊ฐ partition์ replica๋ ์๋ก ๋ค๋ฅธ ๋ธ๋ก์ปค์ ํ ๋น
consumer group
consumer๋ ์ฌ์ค consumer instance ์ฌ๋ฌ๊ฐ๋ก ์ด๋ฃจ์ด์ง consumer group์ด๋ค.
group ๋ด์ instance๋ค์ ์๋ก ์ ๋ณด๋ฅผ ๊ณต์ ํ๊ณ ์์ → ํ ์๋ฒ๊ฐ ๋ค์ด๋์ด๋ ๋์ฒด ๊ฐ๋ฅ
- ๊ฐ์ฉ์ฑ
consumer server ํ๋๊ฐ ๋ค์ด๋์ด๋ ๋ค๋ฅธ ์๋ฒ๊ฐ ๋์ฒดํ ์ ์์
์ฌ๋ฌ ๋์ consumer instance๋ฅผ ์ค์ ํจ์ผ๋ก์จ ์์ ์ฑ ํ๋ณด - offset ๊ด๋ฆฌ
๊ฐ ๊ทธ๋ฃน๋ง๋ค ์๋ก ๋ค๋ฅธ offset์ ์ ์งํจ์ผ๋ก์จ ํ ๊ฐ์ topic์์ ์ฌ๋ฌ๊ฐ์ consumer group์ด ์ ๊ทผํด์ ๊ฐ์ ธ์ฌ ์ ์์
* Partition-Instance ๊ด๊ณ
1:1์ธ๊ฒ ๊ฐ์ฅ ์ด์์
topic์ partition ์๊ฐ ๋ง๋ค๋ฉด ๊ทธ๋งํผ instance ์๋ฅผ ๋๋ ค์ ๋ฐ์ดํฐ๋ฅผ ๋น ๋ฅด๊ฒ ๊ฐ์ ธ์ฌ ์ ์๋ค.
→ ๊ทธ๋ฌ๋ topic partition ์๋ ๋๋ฆด ์ ์์ง๋ง ๋ค์ ์ค์ผ์๋ ์์ผ๋ฏ๋ก ๋ฌด์์ ๋๋ฆฌ๋ฉด ์๋๋ค.
# partitions >= # instances
๋ฉ์์ง ๋ณด์กด ๊ธฐ๊ฐ
consumer๊ฐ ๋ฉ์์ง๋ฅผ ์๋นํ๋ค๊ณ ํด์ ๋ฐ๋ก ์ฌ๋ผ์ง์ง ์๊ณ ์ค์ ๋ ๋ณด์กด๊ธฐ๊ฐ์ด ์ง๋์ผ ์ฌ๋ผ์ง.
default = 7์ผ, log.retention.hours ์ค์ ์ ํตํด ๋ณ๊ฒฝ ๊ฐ๋ฅ
Replication
๋ณต์ ๋ฅผ ํ๋ฉด zookeeper๊ฐ ์์์ broker์ ๋ถ์ฐ์ ์ฅ ์์ผ์ค
๊ธฐ๋ณธ RF(replication factor) = 3
replication์ topic๋ง๋ค ๊ฐ๊ฐ ์ค์ ํ ์ ์๋ค. → ๋ฐ์ดํฐ์ ์ค์๋์ ๋ฐ๋ผ ์ค์ํ topic์ replication↑, ์๋์ ์ผ๋ก ๋ ์ค์ํ topic์ replication↓์ผ๋ก ์ค์ ํ๋ ๋ฐฉ์์ ํตํด ์์ ์ฑ ์ค์ ๊ฐ๋ฅ
leader์ follower
์ ๊ทธ๋ฆผ์์ topic1์ replication์ 2, topic2๋ 3์ผ๋ก ์ค์ ํ ๋ชจ์ต์ด๋ค.
์นดํ์นด์์๋ leader์ follower๋ฅผ ์ค์ ํ๋๋ฐ, producer, consumer์์ ๋ชจ๋ ํต์ ์ leader์์ ์ด๋ฃจ์ด์ง๊ณ follower๋ leader์ ์ ๋ณด๋ฅผ ๋ณต์ฌํ๋ค.
leader๊ฐ ์ฃฝ์ผ๋ฉด follower ์ค ํ๋๊ฐ leader๋ก ์ ์ถ๋๋ฉฐ ์ด๋ zookeeper์ ์ํด ์ด๋ฃจ์ด์ง๋ค.
producer config ์ ๋ณด์์ ack ์ต์
producer๊ฐ ๋ฉ์์ง๋ฅผ ๋ณด๋ผ ๋ ์ต์ ์ ์ ํํด์ ๋ณด๋ผ ์ ์๋๋ฐ, ack๋ ๊ทธ ์ค ํ๋.
- ack=0
producer๊ฐ ๋ฉ์์ง๋ฅผ ๋ณด๋ด๊ณ kafka๋ก๋ถํฐ ๋ณ๋ค๋ฅธ ack๋ฅผ ๋ฐ์ง ์์
๋ฉ์์ง์ ์ค๊ฐ๋ฅ์ฑ ไธ, ์๋ ไธ - ack=1 (default)
producer๊ฐ msg๋ฅผ leader์๊ฒ ๋ณด๋ด๊ณ
ack๋ฅผ producer์๊ฒ ๋ณด๋
→ follwer์๊ฒ ๋ณต์ ๊ฐ ๋๊ธฐ ์ ์ leader์ ์คํจ๊ฐ ๋ฐ์ํ๋ฉด ๋ฉ์์ง ์ ์ค ๊ฐ๋ฅ์ฑ์ด ์์
๋ฉ์์ง์ ์ค๊ฐ๋ฅ์ฑ ไธญ, ์๋ ไธญ - ack=all(-1)
producer๊ฐ msg๋ฅผ leader์๊ฒ ๋ณด๋ด๊ณ
follower๊ฐ ๋ณต์ฌ๋ฅผ ๋ค ํ๋์ง๊น์ง ๊ธฐ๋ค๋ฆผ (๋ชจ๋ ISR ํ์ธ)
→ leader์ ์ด๋ ์๊ฐ ์คํจ๊ฐ ๋ฐ์ํด๋ follower์ ๋ณต์ ๋ ๋ฐ์ดํฐ๊ฐ ํ์คํ๊ฒ ์์ผ๋ฏ๋ก msg ์ ์ค์ด ์์
๋ฉ์์ง์ ์ค๊ฐ๋ฅ์ฑ ไธ, ์๋ ไธ
Exactly-once-Delivery(kafka streams api)
์ ํํ ํ ๋ฒ์ ๋ฉ์์ง ์ ์ก ๋ณด์ฅ. ์ค๋ณต๋ ๋ฉ์์ง์ ์ ์ค๋ ๋ฉ์์ง ๋ชจ๋ ๋นํ์ฉ
enable.idempotence=true๋ก ์ค์
→ ๋ฉฑ๋ฑ์ฑ(=๋์ผ ์ฐ์ฐ์ ์ฌ๋ฌ๋ฒ ์ํํด๋ ๊ฒฐ๊ณผ๋ณํ๊ฐ ์์)
- producer msg ์ ์ก ๋ฉฑ๋ฑ์ฑ ๋ณด์ฅ
๋์ผํ msg ์ ๋ณด๋ฅผ ์ฌ๋ฌ๋ฒ ๋ณด๋ด๋ฉด ์ด ์ ๋ณด๋ broker์ kafka log์๋ง ๊ธฐ๋ก๋จ
→ ์ค๋ณต๋ ๋ฉ์์ง๊ฐ ๋ฐ์ํ ๊ฐ๋ฅ์ฑ ์ฐจ๋จ
TCP๊ฐ ํจํท์ seq ํฌํจํด์ ํ๋ฆ์ ์ ์ดํ๋ ๊ฒ ์ฒ๋ผ ๊ฐ msg์ seq๋ฅผ ๋ฌ์ ์ค๋ณต msg๋ฅผ ์ฒ๋ฆฌํ๋ค. - ํธ๋์ญ์
atomic write ๋ณด์ฅ: ํธ๋์ญ์ api
kafka streams api
'๋ฐ์ดํฐ > ํ๋ก' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
parquet ํ์ผ ์ ๋ณด ๋ณด๊ธฐ (0) | 2022.10.05 |
---|---|
[CM] Yarn Node Manager ์ถ๊ฐ ์ ava.lang.IllegalArgumentException: java.net.UnknownHostException: HOSTNAME (0) | 2022.05.13 |
Python์ ํตํด impala ์ ์ (0) | 2022.03.03 |
Apache Flink (0) | 2022.02.11 |
ํ๋ก ์์ฝ์์คํ (0) | 2022.02.11 |