๐ฅ
[Kafka] kafka produce ํ๊ธฐ (with C) ๋ณธ๋ฌธ
์ฒ์์ ๊ทธ๋ฅ ํ์ด์ฌ์ผ๋ก ํ์ผ์ ์ฝ์ด์ ์นดํ์นด ๋ฉ์์ง๋ฅผ ๋ณด๋ด๋ ค ํ๋๋ฐ ์๊ฐ๋ณด๋ค ๋๋ ธ๋ค..
์นดํ์นด๋ก ๋ณด๋ด์ผํ๋ ํ์ผ์ ์์ฑํ๋ ์ฝ๋๊ฐ c/c++๋ก ๋์ด์๊ธฐ๋ ํ๊ณ ๋ฉ์์ง ์์ฑ ์ฆ์ ๋ฐ๋ก ๋ณด๋ด๋ฉด ๋ ๋นจ๋ผ์ง ๊ฒ ๊ฐ์ C๋ก ์์ฑํด๋ดค๋ค.
1. librdkafka ํจํค์ง ๋ค์ด๋ก๋
๋๋ centos ์จ์ ์๋์ ๊ฐ์ด ์ค์นํ๋ค.
yum install librdkafka-devel
๋ค๋ฅธ ์ด์์ฒด์ ๋ ์๋ github ํ์ด์ง ์ฐธ๊ณ
https://github.com/confluentinc/librdkafka#installation
2. librdkafka ์ค์น ์์น ํ์ธ
์ค์น ์์น ํ์ธํ๋ ๋ฐฉ๋ฒ ์ฐธ๊ณ : https://quackstudy.tistory.com/entry/CentOS-yum%EC%9C%BC%EB%A1%9C-%EC%84%A4%EC%B9%98%ED%95%9C-%ED%8C%A8%ED%82%A4%EC%A7%80-%EC%84%A4%EC%B9%98-%EA%B2%BD%EB%A1%9C-%ED%99%95%EC%9D%B8%ED%95%98%EB%8A%94-%EB%B0%A9%EB%B2%95
/usr/include/librdkafka/rdkafka.h
/usr/lib64/librdkafka.so
๋น๋ํ ๋์๋ ์ ํ์ผ ์ฌ์ฉ ์์
3. C ์ฝ๋ ์์ฑ
์ฐธ๊ณ ํ ์ฝ๋:https://github.com/confluentinc/librdkafka/blob/master/examples/producer.c
3-0. librdkafka import
#include "rdkafka.h"
3-1. producer instance ์์ฑ
int
createKafkaProducer(rd_kafka_t **rk, const char *brokers){
rd_kafka_conf_t *conf;
char errstr[512];
conf = rd_kafka_conf_new()
/* kafka configuration ์ธํ
*/
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr))!=RD_KAFKA_CONF_OK){
printf("%s\n", errstr);
return -1;
}
/* kafka producer instance ์์ฑ */
*rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if(!rk){
printf("Failed to create new producer: %s\n",errstr);
return 1;
}
return 0;
}
3-2. message produce
void
producetoKafka(rd_kafka_t *rk, const char *topic, const char *message){
size_t message_len = strlen(message);
rd_kafka_resp_err_t err;
retry:
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(message, message_len),
RD_KAFKA_V_OPAQUE(NULL),
RD_KAFKA_V_END);
if(err){
fprintf(stderr, "Failed to produce to topic %s: %s\n", topic, rd_kafka_err2str(err));
/* local queue๊ฐ ๊ฐ๋์ฐจ๋ฉด produce ์ฌ์๋ */
if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
fprintf(stderr, "poll...\n");
rd_kafka_poll(rk,1000);
goto retry;
}
} else {
printf("Enqueued message (%zd bytes) for topic %s\n", message_len, topic);
}
rd_kafka_poll(rk, 0);
}
3-3. Destroy producer
void
destroyKafkaProducer(rd_kafka_t *rk){
printf("Flushing final messages..\n");
rd_kafka_flush(rk, 10*1000); //timeout value (ms๋จ์)
if(rd_kafka_outq_len(rk)>0){
fprintf(stderr,"%d message(s) were not delivered\n", rd_kafka_outq_len(rk));
}
rd_kafka_destroy(rk);
}
3-4. Makefile ์์ฑ
2๋ฒ์์ ํ์ธํ๋ ํค๋ํ์ผ ๋๋ ํ ๋ฆฌ๋ฅผ ์ถ๊ฐํด์ฃผ๋ฉด ๋๋ค.
-I/usr/include/librdkafka
-lrdkafka
.so ํ์ผ ์ฌ์ฉ์ ์ํด LD_LIBRARY_PATH
์๋ ๊ฒฝ๋ก ์ถ๊ฐํด์คฌ๋ค.
LD_LIBRARY_PATH=some/path....:/usr/lib64/pkgconfig
ํจ์๋ณ ์์ธํ ์ค๋ช ์ `rdkafka.h`ํ์ผ์ ์ฃผ์ ์ฝ์ด๋ณด๊ธฐ
'๋ฐ์ดํฐ' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Airflow] Nifi Trigger Operatorย ๋ง๋ค๊ธฐ (0) | 2024.04.04 |
---|---|
[Airflow] Telegram ์๋ฆผ operator ๋ง๋ค๊ธฐ (requests ์ฌ์ฉ) (0) | 2024.03.29 |
Python์ ํตํด trino ์ ์ (0) | 2023.06.16 |
TRINO -> Hive metastore ์ฌ์ฉ ์ HIVE_METASTORE_ERROR ์ค๋ฅ ์กฐ์น (0) | 2023.04.21 |
parquet ํ์ผ ๊ตฌ์กฐ (0) | 2022.11.09 |