Skip to content

Commit

Permalink
fix memory leak in case of error on consumer/producer creation
Browse files Browse the repository at this point in the history
Before this patch we didn't free some resources in case of error
in Consumer.create or Producer.create functions. This patch fixes
it.

Closes #108
  • Loading branch information
olegrok committed Apr 30, 2024
1 parent 825579a commit 38ead88
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 87 deletions.
2 changes: 2 additions & 0 deletions kafka/callbacks.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ new_event_queues() {

void
destroy_event_queues(struct lua_State *L, event_queues_t *event_queues) {
if (event_queues == NULL)
return;
if (event_queues->consume_queue != NULL) {
msg_t *msg = NULL;
while (true) {
Expand Down
46 changes: 37 additions & 9 deletions kafka/consumer.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include "consumer.h"

#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
Expand All @@ -10,14 +12,27 @@
#include <queue.h>
#include <consumer_msg.h>

#include "consumer.h"

////////////////////////////////////////////////////////////////////////////////////////////////////

/**
* Consumer poll thread
*/

typedef struct consumer_poller_t {
rd_kafka_t *rd_consumer;
pthread_t thread;
pthread_attr_t attr;
int should_stop;
pthread_mutex_t lock;
} consumer_poller_t;

typedef struct {
rd_kafka_t *rd_consumer;
rd_kafka_topic_partition_list_t *topics;
event_queues_t *event_queues;
consumer_poller_t *poller;
} consumer_t;

static void *
consumer_poll_loop(void *arg) {
set_thread_name("kafka_consumer");
Expand Down Expand Up @@ -605,7 +620,6 @@ lua_create_consumer(struct lua_State *L) {
}

char errstr[512];
rd_kafka_conf_t *rd_config = rd_kafka_conf_new();

rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
lua_pushstring(L, "default_topic_options");
Expand All @@ -618,15 +632,15 @@ lua_create_consumer(struct lua_State *L) {
if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) {
lua_pushnil(L);
lua_pushliteral(L, "consumer config default topic options must contains only string keys and string values");
return 2;
goto topic_error;
}

const char *value = lua_tostring(L, -1);
const char *key = lua_tostring(L, -2);
if (rd_kafka_topic_conf_set(topic_conf, key, value, errstr, sizeof(errstr))) {
lua_pushnil(L);
lua_pushstring(L, errstr);
return 2;
goto topic_error;
}

// pop value, leaving original key
Expand All @@ -636,6 +650,8 @@ lua_create_consumer(struct lua_State *L) {
// stack now contains: -1 => table
}
lua_pop(L, 1);

rd_kafka_conf_t *rd_config = rd_kafka_conf_new();
rd_kafka_conf_set_default_topic_conf(rd_config, topic_conf);

event_queues_t *event_queues = new_event_queues();
Expand Down Expand Up @@ -678,15 +694,15 @@ lua_create_consumer(struct lua_State *L) {
if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) {
lua_pushnil(L);
lua_pushliteral(L, "consumer config options must contains only string keys and string values");
return 2;
goto config_error;
}

const char *value = lua_tostring(L, -1);
const char *key = lua_tostring(L, -2);
if (rd_kafka_conf_set(rd_config, key, value, errstr, sizeof(errstr))) {
lua_pushnil(L);
lua_pushstring(L, errstr);
return 2;
goto config_error;
}

// pop value, leaving original key
Expand All @@ -701,13 +717,14 @@ lua_create_consumer(struct lua_State *L) {
if (!(rd_consumer = rd_kafka_new(RD_KAFKA_CONSUMER, rd_config, errstr, sizeof(errstr)))) {
lua_pushnil(L);
lua_pushstring(L, errstr);
return 2;
goto config_error;
}

rd_config = NULL; // was freed by rd_kafka_new
if (rd_kafka_brokers_add(rd_consumer, brokers) == 0) {
lua_pushnil(L);
lua_pushliteral(L, "No valid brokers specified");
return 2;
goto broker_error;
}

rd_kafka_poll_set_consumer(rd_consumer);
Expand All @@ -728,6 +745,17 @@ lua_create_consumer(struct lua_State *L) {
luaL_getmetatable(L, consumer_label);
lua_setmetatable(L, -2);
return 1;

broker_error:
rd_kafka_destroy(rd_consumer);
config_error:
if (rd_config != NULL)
rd_kafka_conf_destroy(rd_config);
destroy_event_queues(L, event_queues);
return 2;
topic_error:
rd_kafka_topic_conf_destroy(topic_conf);
return 2;
}

int
Expand Down
27 changes: 0 additions & 27 deletions kafka/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,6 @@
#include <lualib.h>
#include <lauxlib.h>

#include <librdkafka/rdkafka.h>

#include <common.h>
#include <queue.h>
#include <callbacks.h>
#include <consumer_msg.h>

////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* Consumer
*/

typedef struct {
rd_kafka_t *rd_consumer;
pthread_t thread;
pthread_attr_t attr;
int should_stop;
pthread_mutex_t lock;
} consumer_poller_t;

typedef struct {
rd_kafka_t *rd_consumer;
rd_kafka_topic_partition_list_t *topics;
event_queues_t *event_queues;
consumer_poller_t *poller;
} consumer_t;

int
lua_consumer_subscribe(struct lua_State *L);

Expand Down
59 changes: 46 additions & 13 deletions kafka/producer.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include "producer.h"

#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
Expand All @@ -10,14 +12,33 @@
#include <callbacks.h>
#include <queue.h>

#include "producer.h"

////////////////////////////////////////////////////////////////////////////////////////////////////

/**
* Producer poll thread
*/

typedef struct producer_poller_t {
rd_kafka_t *rd_producer;
pthread_t thread;
pthread_attr_t attr;
int should_stop;
pthread_mutex_t lock;
} producer_poller_t;

typedef struct producer_topics_t {
rd_kafka_topic_t **elements;
int32_t count;
int32_t capacity;
} producer_topics_t;

typedef struct {
rd_kafka_t *rd_producer;
producer_topics_t *topics;
event_queues_t *event_queues;
producer_poller_t *poller;
} producer_t;

static void *
producer_poll_loop(void *arg) {
set_thread_name("kafka_producer");
Expand Down Expand Up @@ -98,7 +119,7 @@ destroy_producer_poller(producer_poller_t *poller) {
* Producer
*/

producer_topics_t *
static producer_topics_t *
new_producer_topics(int32_t capacity) {
rd_kafka_topic_t **elements;
elements = xmalloc(sizeof(rd_kafka_topic_t *) * capacity);
Expand All @@ -111,7 +132,7 @@ new_producer_topics(int32_t capacity) {
return topics;
}

void
static void
add_producer_topics(producer_topics_t *topics, rd_kafka_topic_t *element) {
if (topics->count >= topics->capacity) {
rd_kafka_topic_t **new_elements = xrealloc(topics->elements, sizeof(rd_kafka_topic_t *) * topics->capacity * 2);
Expand All @@ -133,7 +154,7 @@ find_producer_topic_by_name(producer_topics_t *topics, const char *name) {
return NULL;
}

void
static void
destroy_producer_topics(producer_topics_t *topics) {
rd_kafka_topic_t **topic_p;
rd_kafka_topic_t **end = topics->elements + topics->count;
Expand Down Expand Up @@ -448,8 +469,6 @@ lua_create_producer(struct lua_State *L) {

char errstr[512];

rd_kafka_conf_t *rd_config = rd_kafka_conf_new();

rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
lua_pushstring(L, "default_topic_options");
lua_gettable(L, -2);
Expand All @@ -461,15 +480,15 @@ lua_create_producer(struct lua_State *L) {
if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) {
lua_pushnil(L);
lua_pushliteral(L, "producer config default topic options must contains only string keys and string values");
return 2;
goto topic_error;
}

const char *value = lua_tostring(L, -1);
const char *key = lua_tostring(L, -2);
if (rd_kafka_topic_conf_set(topic_conf, key, value, errstr, sizeof(errstr))) {
lua_pushnil(L);
lua_pushstring(L, errstr);
return 2;
goto topic_error;
}

// pop value, leaving original key
Expand All @@ -479,6 +498,8 @@ lua_create_producer(struct lua_State *L) {
// stack now contains: -1 => table
}
lua_pop(L, 1);

rd_kafka_conf_t *rd_config = rd_kafka_conf_new();
rd_kafka_conf_set_default_topic_conf(rd_config, topic_conf);

event_queues_t *event_queues = new_event_queues();
Expand Down Expand Up @@ -522,15 +543,15 @@ lua_create_producer(struct lua_State *L) {
if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) {
lua_pushnil(L);
lua_pushliteral(L, "producer config options must contains only string keys and string values");
return 2;
goto config_error;
}

const char *value = lua_tostring(L, -1);
const char *key = lua_tostring(L, -2);
if (rd_kafka_conf_set(rd_config, key, value, errstr, sizeof(errstr))) {
lua_pushnil(L);
lua_pushstring(L, errstr);
return 2;
goto config_error;
}

// pop value, leaving original key
Expand All @@ -545,13 +566,14 @@ lua_create_producer(struct lua_State *L) {
if (!(rd_producer = rd_kafka_new(RD_KAFKA_PRODUCER, rd_config, errstr, sizeof(errstr)))) {
lua_pushnil(L);
lua_pushstring(L, errstr);
return 2;
goto config_error;
}

rd_config = NULL; // was freed by rd_kafka_new
if (rd_kafka_brokers_add(rd_producer, brokers) == 0) {
lua_pushnil(L);
lua_pushliteral(L, "No valid brokers specified");
return 2;
goto broker_error;
}

// creating background thread for polling consumer
Expand All @@ -570,6 +592,17 @@ lua_create_producer(struct lua_State *L) {
luaL_getmetatable(L, producer_label);
lua_setmetatable(L, -2);
return 1;

broker_error:
rd_kafka_destroy(rd_producer);
config_error:
if (rd_config != NULL)
rd_kafka_conf_destroy(rd_config);
destroy_event_queues(L, event_queues);
return 2;
topic_error:
rd_kafka_topic_conf_destroy(topic_conf);
return 2;
}

int
Expand Down
38 changes: 0 additions & 38 deletions kafka/producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,6 @@
#include <lualib.h>
#include <lauxlib.h>

#include <librdkafka/rdkafka.h>

#include <queue.h>

////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* Producer
*/

typedef struct {
rd_kafka_t *rd_producer;
pthread_t thread;
pthread_attr_t attr;
int should_stop;
pthread_mutex_t lock;
} producer_poller_t;

typedef struct {
rd_kafka_topic_t **elements;
int32_t count;
int32_t capacity;
} producer_topics_t;

producer_topics_t *new_producer_topics(int32_t capacity);

void
add_producer_topics(producer_topics_t *topics, rd_kafka_topic_t *element);

void
destroy_producer_topics(producer_topics_t *topics);

typedef struct {
rd_kafka_t *rd_producer;
producer_topics_t *topics;
event_queues_t *event_queues;
producer_poller_t *poller;
} producer_t;

int
lua_producer_tostring(struct lua_State *L);

Expand Down
Loading

0 comments on commit 38ead88

Please sign in to comment.