๐Ÿฅ

Apache Kafka ๋ณธ๋ฌธ

๋ฐ์ดํ„ฐ/ํ•˜๋‘ก

Apache Kafka

•8• 2022. 2. 11. 14:07

์นดํ”„์นด

๋ถ„์‚ฐ์‹œ์Šคํ…œ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ

broker์˜ ๋ชจ์ž„ = kafka cluster

zookeeper: ํด๋Ÿฌ์Šคํ„ฐ ์„œ๋ฒ„๋“ค์ด ๊ณต์œ ํ•˜๋Š” ๋ฐ์ดํ„ฐ ๊ด€๋ฆฌ

→ broker์— ๋ถ„์‚ฐ ์ฒ˜๋ฆฌ๋œ ๋ฉ”์‹œ์ง€ ํ์˜ ์ •๋ณด ๊ด€๋ฆฌ(์นดํ”„์นด์˜ ๋…ธ๋“œ ๊ด€๋ฆฌ, topic์˜ offset ์ •๋ณด ์ €์žฅ)

broker: kafka server/ ํ•œ ํด๋Ÿฌ์Šคํ„ฐ ๋‚ด์—์„œ ์—ฌ๋Ÿฌ๊ฐœ์˜ kafka server(broker)๊ฐ€ ๋™์ž‘ํ•œ๋‹ค.

topic: ๋ฉ”์‹œ์ง€๊ฐ€ ์ƒ์„ฑ๋˜๊ณ  ์†Œ๋น„๋˜๋Š” ์ฃผ์ œ. ๋‹ค๋ฅธ ํ† ํ”ฝ๋“ค๊ณผ isolation (๊ตฌ๋…ํ•œ consumer์—๊ฒŒ๋งŒ ๋ณด์—ฌ์ง)

partition: ํŒŒ๋ž€ ๋„ค๋ชจ๋“ค ํ•œ ์ค„. topic ๋‚ด์—์„œ ๋ฉ”์‹œ์ง€๊ฐ€ ๋ถ„์‚ฐ๋˜์–ด ์ €์žฅ๋˜๋Š” ๋‹จ์œ„. ํ•œ topic์— ์žˆ๋Š” n๊ฐœ์˜ partition์— ๋ฉ”์‹œ์ง€๊ฐ€ ๋ถ„์‚ฐ๋˜์–ด ์ €์žฅ.

  • partition ๋‚ด์—์„œ๋Š” queue ํ˜•ํƒœ๋กœ ์ˆœ์„œ ๋ณด์žฅ (offset ์ด์šฉ)
  • partition๋ผ๋ฆฌ๋Š” ์ˆœ์„œ๋ณด์žฅ X
  • ํ•œ ๋ฒˆ์— ํ•˜๋‚˜์˜ reader (ํ•œ ํŒŒํ‹ฐ์…˜์— ๋™์‹œ์— ์—ฌ๋Ÿฌ consumer๊ฐ€ ์ ‘๊ทผํ•ด์„œ ์ฝ์–ด์˜ฌ ์ˆ˜ ์—†๋‹ค.)

log: ํŒŒ๋ž€ ๋„ค๋ชจ ํ•œ ์นธ. key-value-timestamp๋กœ ๊ตฌ์„ฑ

 

 

์‹ค์ œ๋กœ๋Š” topic๊ณผ partition์ด ์œ„ ๊ทธ๋ฆผ์ฒ˜๋Ÿผ ๋ถ„์‚ฐ์ €์žฅ๋˜์–ด์žˆ๋Š” ๋“ฏ ํ•˜๋‹ค.

(broker:3๊ฐœ, /// topic๋ช…: T1, partition ๊ฐœ์ˆ˜: 3 /// topic๋ช…: T2, partition ๊ฐœ์ˆ˜: 3 /// topic1, 2์˜ replication=1์œผ๋กœ ๊ฐ€์ •)

๋ถ„์‚ฐ ์ €์žฅ์€ ์ž๋™์œผ๋กœ ๋˜์ง€๋งŒ, ์‚ฌ์šฉ์ž๊ฐ€ ์›ํ•œ๋‹ค๋ฉด ์ˆ˜๋™์œผ๋กœ ์žฌ๋ฐฐ์น˜ ๊ฐ€๋Šฅ

ํ† ํ”ฝ ์ƒ์„ฑ ์‹œ

    ์ƒˆ๋กœ์šด ํ† ํ”ฝ์„ ์ƒ์„ฑํ•˜๊ฒŒ ๋˜๋ฉด ๋ธŒ๋กœ์ปค ๊ฐ„ ํŒŒํ‹ฐ์…˜์„ ํ• ๋‹นํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๊ฒฐ์ •ํ•œ๋‹ค.

    1) partition replica๊ฐ€ ๋ธŒ๋กœ์ปค ๊ฐ„ ๊ณ ๋ฅด๊ฒŒ ๋ถ„์‚ฐ

    2) ๊ฐ partition์˜ replica๋Š” ์„œ๋กœ ๋‹ค๋ฅธ ๋ธŒ๋กœ์ปค์— ํ• ๋‹น


consumer group

consumer๋Š” ์‚ฌ์‹ค consumer instance ์—ฌ๋Ÿฌ๊ฐœ๋กœ ์ด๋ฃจ์–ด์ง„ consumer group์ด๋‹ค.

group ๋‚ด์˜ instance๋“ค์€ ์„œ๋กœ ์ •๋ณด๋ฅผ ๊ณต์œ ํ•˜๊ณ  ์žˆ์Œ → ํ•œ ์„œ๋ฒ„๊ฐ€ ๋‹ค์šด๋˜์–ด๋„ ๋Œ€์ฒด ๊ฐ€๋Šฅ

  1. ๊ฐ€์šฉ์„ฑ
    consumer server ํ•˜๋‚˜๊ฐ€ ๋‹ค์šด๋˜์–ด๋„ ๋‹ค๋ฅธ ์„œ๋ฒ„๊ฐ€ ๋Œ€์ฒดํ•  ์ˆ˜ ์žˆ์Œ
    ์—ฌ๋Ÿฌ ๋Œ€์˜ consumer instance๋ฅผ ์„ค์ •ํ•จ์œผ๋กœ์จ ์•ˆ์ •์„ฑ ํ™•๋ณด
  2. offset ๊ด€๋ฆฌ
    ๊ฐ ๊ทธ๋ฃน๋งˆ๋‹ค ์„œ๋กœ ๋‹ค๋ฅธ offset์„ ์œ ์ง€ํ•จ์œผ๋กœ์จ ํ•œ ๊ฐœ์˜ topic์—์„œ ์—ฌ๋Ÿฌ๊ฐœ์˜ consumer group์ด ์ ‘๊ทผํ•ด์„œ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ์Œ

* Partition-Instance ๊ด€๊ณ„

       1:1์ธ๊ฒŒ ๊ฐ€์žฅ ์ด์ƒ์ 

       topic์˜ partition ์ˆ˜๊ฐ€ ๋งŽ๋‹ค๋ฉด ๊ทธ๋งŒํผ instance ์ˆ˜๋ฅผ ๋Š˜๋ ค์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๋น ๋ฅด๊ฒŒ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ๋‹ค.

       → ๊ทธ๋Ÿฌ๋‚˜ topic partition ์ˆ˜๋Š” ๋Š˜๋ฆด ์ˆ˜ ์žˆ์ง€๋งŒ ๋‹ค์‹œ ์ค„์ผ์ˆ˜๋Š” ์—†์œผ๋ฏ€๋กœ ๋ฌด์ž‘์ • ๋Š˜๋ฆฌ๋ฉด ์•ˆ๋œ๋‹ค.

       # partitions >= # instances

 

๋ฉ”์‹œ์ง€ ๋ณด์กด ๊ธฐ๊ฐ„

       consumer๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ์†Œ๋น„ํ–ˆ๋‹ค๊ณ  ํ•ด์„œ ๋ฐ”๋กœ ์‚ฌ๋ผ์ง€์ง€ ์•Š๊ณ  ์„ค์ •๋œ ๋ณด์กด๊ธฐ๊ฐ„์ด ์ง€๋‚˜์•ผ ์‚ฌ๋ผ์ง.

       default = 7์ผ, log.retention.hours ์„ค์ •์„ ํ†ตํ•ด ๋ณ€๊ฒฝ ๊ฐ€๋Šฅ

 


Replication

๋ณต์ œ๋ฅผ ํ•˜๋ฉด zookeeper๊ฐ€ ์•Œ์•„์„œ broker์— ๋ถ„์‚ฐ์ €์žฅ ์‹œ์ผœ์คŒ

๊ธฐ๋ณธ RF(replication factor) = 3

replication์€ topic๋งˆ๋‹ค ๊ฐ๊ฐ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋‹ค. → ๋ฐ์ดํ„ฐ์˜ ์ค‘์š”๋„์— ๋”ฐ๋ผ ์ค‘์š”ํ•œ topic์€ replication↑, ์ƒ๋Œ€์ ์œผ๋กœ ๋œ ์ค‘์š”ํ•œ topic์€ replication↓์œผ๋กœ ์„ค์ •ํ•˜๋Š” ๋ฐฉ์‹์„ ํ†ตํ•ด ์•ˆ์ •์„ฑ ์„ค์ • ๊ฐ€๋Šฅ

 

leader์™€ follower

์œ„ ๊ทธ๋ฆผ์—์„œ topic1์€ replication์„ 2, topic2๋Š” 3์œผ๋กœ ์„ค์ •ํ•œ ๋ชจ์Šต์ด๋‹ค.

์นดํ”„์นด์—์„œ๋Š” leader์™€ follower๋ฅผ ์„ค์ •ํ•˜๋Š”๋ฐ, producer, consumer์™€์˜ ๋ชจ๋“  ํ†ต์‹ ์€ leader์—์„œ ์ด๋ฃจ์–ด์ง€๊ณ  follower๋Š” leader์˜ ์ •๋ณด๋ฅผ ๋ณต์‚ฌํ•œ๋‹ค.

leader๊ฐ€ ์ฃฝ์œผ๋ฉด follower ์ค‘ ํ•˜๋‚˜๊ฐ€ leader๋กœ ์„ ์ถœ๋˜๋ฉฐ ์ด๋Š” zookeeper์— ์˜ํ•ด ์ด๋ฃจ์–ด์ง„๋‹ค.

producer config ์ •๋ณด์—์„œ ack ์˜ต์…˜

       producer๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ผ ๋•Œ ์˜ต์…˜์„ ์„ ํƒํ•ด์„œ ๋ณด๋‚ผ ์ˆ˜ ์žˆ๋Š”๋ฐ, ack๋„ ๊ทธ ์ค‘ ํ•˜๋‚˜.

  1. ack=0
    producer๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๊ณ  kafka๋กœ๋ถ€ํ„ฐ ๋ณ„๋‹ค๋ฅธ ack๋ฅผ ๋ฐ›์ง€ ์•Š์Œ
    ๋ฉ”์‹œ์ง€์œ ์‹ค๊ฐ€๋Šฅ์„ฑ ไธŠ, ์†๋„ ไธŠ
  2. ack=1 (default)
    producer๊ฐ€ msg๋ฅผ leader์—๊ฒŒ ๋ณด๋‚ด๊ณ 
    ack๋ฅผ producer์—๊ฒŒ ๋ณด๋ƒ„
    → follwer์—๊ฒŒ ๋ณต์ œ๊ฐ€ ๋˜๊ธฐ ์ „์— leader์— ์‹คํŒจ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด ๋ฉ”์‹œ์ง€ ์œ ์‹ค ๊ฐ€๋Šฅ์„ฑ์ด ์žˆ์Œ
    ๋ฉ”์‹œ์ง€์œ ์‹ค๊ฐ€๋Šฅ์„ฑ ไธญ, ์†๋„ ไธญ
  3. ack=all(-1)
    producer๊ฐ€ msg๋ฅผ leader์—๊ฒŒ ๋ณด๋‚ด๊ณ 
    follower๊ฐ€ ๋ณต์‚ฌ๋ฅผ ๋‹ค ํ–ˆ๋Š”์ง€๊นŒ์ง€ ๊ธฐ๋‹ค๋ฆผ (๋ชจ๋“  ISR ํ™•์ธ)
    → leader์— ์–ด๋Š ์ˆœ๊ฐ„ ์‹คํŒจ๊ฐ€ ๋ฐœ์ƒํ•ด๋„ follower์— ๋ณต์ œ๋œ ๋ฐ์ดํ„ฐ๊ฐ€ ํ™•์‹คํ•˜๊ฒŒ ์žˆ์œผ๋ฏ€๋กœ msg ์œ ์‹ค์ด ์—†์Œ
    ๋ฉ”์‹œ์ง€์œ ์‹ค๊ฐ€๋Šฅ์„ฑ ไธ‹, ์†๋„ ไธ‹

Exactly-once-Delivery(kafka streams api)

       ์ •ํ™•ํžˆ ํ•œ ๋ฒˆ์˜ ๋ฉ”์‹œ์ง€ ์ „์†ก ๋ณด์žฅ. ์ค‘๋ณต๋œ ๋ฉ”์‹œ์ง€์™€ ์œ ์‹ค๋œ ๋ฉ”์‹œ์ง€ ๋ชจ๋‘ ๋น„ํ—ˆ์šฉ

 

enable.idempotence=true๋กœ ์„ค์ •

→ ๋ฉฑ๋“ฑ์„ฑ(=๋™์ผ ์—ฐ์‚ฐ์„ ์—ฌ๋Ÿฌ๋ฒˆ ์ˆ˜ํ–‰ํ•ด๋„ ๊ฒฐ๊ณผ๋ณ€ํ™”๊ฐ€ ์—†์Œ)

  1. producer msg ์ „์†ก ๋ฉฑ๋“ฑ์„ฑ ๋ณด์žฅ
    ๋™์ผํ•œ msg ์ •๋ณด๋ฅผ ์—ฌ๋Ÿฌ๋ฒˆ ๋ณด๋‚ด๋ฉด ์ด ์ •๋ณด๋Š” broker์˜ kafka log์—๋งŒ ๊ธฐ๋ก๋จ
    → ์ค‘๋ณต๋œ ๋ฉ”์‹œ์ง€๊ฐ€ ๋ฐœ์ƒํ•  ๊ฐ€๋Šฅ์„ฑ ์ฐจ๋‹จ
    TCP๊ฐ€ ํŒจํ‚ท์— seq ํฌํ•จํ•ด์„œ ํ๋ฆ„์„ ์ œ์–ดํ•˜๋Š” ๊ฒƒ ์ฒ˜๋Ÿผ ๊ฐ msg์— seq๋ฅผ ๋‘ฌ์„œ ์ค‘๋ณต msg๋ฅผ ์ฒ˜๋ฆฌํ•œ๋‹ค.
  2. ํŠธ๋žœ์žญ์…˜
    atomic write ๋ณด์žฅ: ํŠธ๋žœ์žญ์…˜ api
    kafka streams api