๐Ÿฅ

[Kafka] kafka produce ํ•˜๊ธฐ (with C) ๋ณธ๋ฌธ

๋ฐ์ดํ„ฐ

[Kafka] kafka produce ํ•˜๊ธฐ (with C)

•8• 2023. 8. 31. 16:09

์ฒ˜์Œ์—” ๊ทธ๋ƒฅ ํŒŒ์ด์ฌ์œผ๋กœ ํŒŒ์ผ์„ ์ฝ์–ด์„œ ์นดํ”„์นด ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๋ ค ํ–ˆ๋Š”๋ฐ ์ƒ๊ฐ๋ณด๋‹ค ๋Š๋ ธ๋‹ค.. 

์นดํ”„์นด๋กœ ๋ณด๋‚ด์•ผํ•˜๋Š” ํŒŒ์ผ์„ ์ƒ์„ฑํ•˜๋Š” ์ฝ”๋“œ๊ฐ€ c/c++๋กœ ๋˜์–ด์žˆ๊ธฐ๋„ ํ•˜๊ณ  ๋ฉ”์‹œ์ง€ ์ƒ์„ฑ ์ฆ‰์‹œ ๋ฐ”๋กœ ๋ณด๋‚ด๋ฉด ๋” ๋นจ๋ผ์งˆ ๊ฒƒ ๊ฐ™์•„ C๋กœ ์ž‘์„ฑํ•ด๋ดค๋‹ค.

 

1. librdkafka ํŒจํ‚ค์ง€ ๋‹ค์šด๋กœ๋“œ

๋‚˜๋Š” centos ์จ์„œ ์•„๋ž˜์™€ ๊ฐ™์ด ์„ค์น˜ํ–ˆ๋‹ค.

yum install librdkafka-devel

๋‹ค๋ฅธ ์šด์˜์ฒด์ œ๋Š” ์•„๋ž˜ github ํŽ˜์ด์ง€ ์ฐธ๊ณ 
https://github.com/confluentinc/librdkafka#installation

2. librdkafka ์„ค์น˜ ์œ„์น˜ ํ™•์ธ

์„ค์น˜ ์œ„์น˜ ํ™•์ธํ•˜๋Š” ๋ฐฉ๋ฒ• ์ฐธ๊ณ : https://quackstudy.tistory.com/entry/CentOS-yum%EC%9C%BC%EB%A1%9C-%EC%84%A4%EC%B9%98%ED%95%9C-%ED%8C%A8%ED%82%A4%EC%A7%80-%EC%84%A4%EC%B9%98-%EA%B2%BD%EB%A1%9C-%ED%99%95%EC%9D%B8%ED%95%98%EB%8A%94-%EB%B0%A9%EB%B2%95

/usr/include/librdkafka/rdkafka.h
/usr/lib64/librdkafka.so

๋นŒ๋“œํ• ๋•Œ์—๋Š” ์œ„ ํŒŒ์ผ ์‚ฌ์šฉ ์˜ˆ์ •

3. C ์ฝ”๋“œ ์ž‘์„ฑ

์ฐธ๊ณ ํ•œ ์ฝ”๋“œ:https://github.com/confluentinc/librdkafka/blob/master/examples/producer.c

3-0. librdkafka import

#include "rdkafka.h"

3-1. producer instance ์ƒ์„ฑ

int
createKafkaProducer(rd_kafka_t **rk, const char *brokers){
	rd_kafka_conf_t *conf;
    char errstr[512];
    
    conf = rd_kafka_conf_new()
    

    /* kafka configuration ์„ธํŒ… */
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr))!=RD_KAFKA_CONF_OK){
            printf("%s\n", errstr);
            return -1;
    }

    /* kafka producer instance ์ƒ์„ฑ */
    *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if(!rk){
            printf("Failed to create new producer: %s\n",errstr);
            return 1;
    }
    
    return 0;
  
 }

3-2. message produce

void
producetoKafka(rd_kafka_t *rk, const char *topic, const char *message){

        size_t message_len = strlen(message);
        rd_kafka_resp_err_t err;
retry:

        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic),
                                                        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                                                        RD_KAFKA_V_VALUE(message, message_len),
                                                        RD_KAFKA_V_OPAQUE(NULL),
                                                        RD_KAFKA_V_END);

        if(err){
            fprintf(stderr, "Failed to produce to topic %s: %s\n", topic,  rd_kafka_err2str(err));
            
            /* local queue๊ฐ€ ๊ฐ€๋“์ฐจ๋ฉด produce ์žฌ์‹œ๋„ */
            if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                fprintf(stderr, "poll...\n");
                rd_kafka_poll(rk,1000);
                goto retry;
            }
        } else {
                printf("Enqueued message (%zd bytes) for topic %s\n", message_len, topic);
        }

        rd_kafka_poll(rk, 0);

}

3-3. Destroy producer

void
destroyKafkaProducer(rd_kafka_t *rk){
    printf("Flushing final messages..\n");
    rd_kafka_flush(rk, 10*1000); //timeout value (ms๋‹จ์œ„)

    if(rd_kafka_outq_len(rk)>0){
        fprintf(stderr,"%d message(s) were not delivered\n", rd_kafka_outq_len(rk));
    }
    rd_kafka_destroy(rk);
}

3-4. Makefile ์ž‘์„ฑ

2๋ฒˆ์—์„œ ํ™•์ธํ–ˆ๋˜ ํ—ค๋”ํŒŒ์ผ ๋””๋ ‰ํ† ๋ฆฌ๋ฅผ ์ถ”๊ฐ€ํ•ด์ฃผ๋ฉด ๋œ๋‹ค.

-I/usr/include/librdkafka
-lrdkafka

.so ํŒŒ์ผ ์‚ฌ์šฉ์„ ์œ„ํ•ด LD_LIBRARY_PATH์—๋„ ๊ฒฝ๋กœ ์ถ”๊ฐ€ํ•ด์คฌ๋‹ค.

LD_LIBRARY_PATH=some/path....:/usr/lib64/pkgconfig

 

 

ํ•จ์ˆ˜๋ณ„ ์ž์„ธํ•œ ์„ค๋ช…์€  `rdkafka.h`ํŒŒ์ผ์— ์ฃผ์„ ์ฝ์–ด๋ณด๊ธฐ