Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZMQ lib change. #958

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions common/c-api/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ template <class T> static inline T &getReference(std::shared_ptr<T> &t) {
// T is anything that has a .size() method and which can be iterated over for
// swss::KeyOpFieldValuesTuple, eg vector or deque
template <class T> static inline SWSSKeyOpFieldValuesArray makeKeyOpFieldValuesArray(T &&in) {
SWSS_LOG_DEBUG("Entering makeKeyOpFieldValuesArray method");
SWSSKeyOpFieldValues *data = new SWSSKeyOpFieldValues[in.size()];

size_t i = 0;
Expand All @@ -211,6 +212,7 @@ template <class T> static inline SWSSKeyOpFieldValuesArray makeKeyOpFieldValuesA
SWSSKeyOpFieldValuesArray out;
out.len = (uint64_t)in.size();
out.data = data;
// SWSS_LOG_DEBUG("2::: out.len value is: %ld", out.len);
divyagayathri-hcl marked this conversation as resolved.
Show resolved Hide resolved
return out;
}

Expand Down Expand Up @@ -255,9 +257,11 @@ static inline std::vector<swss::KeyOpFieldsValuesTuple>
takeKeyOpFieldValuesArray(SWSSKeyOpFieldValuesArray in) {
std::vector<swss::KeyOpFieldsValuesTuple> out;
for (uint64_t i = 0; i < in.len; i++) {
// SWSS_LOG_DEBUG("i value is: %ld", i);
SWSSKeyOpFieldValues kfv = in.data[i];
out.push_back(takeKeyOpFieldValues(std::move(kfv)));
}
// SWSS_LOG_DEBUG("out.len value is: %ld", out.size());
return out;
}

Expand Down
1 change: 1 addition & 0 deletions common/c-api/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ void SWSSZmqClient_connect(SWSSZmqClient zmqc) {

void SWSSZmqClient_sendMsg(SWSSZmqClient zmqc, const char *dbName, const char *tableName,
SWSSKeyOpFieldValuesArray arr) {
SWSS_LOG_DEBUG("Inside SWSSZmqClient_sendMsg");
SWSSTry({
vector<KeyOpFieldsValuesTuple> kcos = takeKeyOpFieldValuesArray(arr);
((ZmqClient *)zmqc)
Expand Down
44 changes: 38 additions & 6 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,20 @@ namespace swss {
ZmqClient::ZmqClient(const std::string& endpoint)
:ZmqClient(endpoint, "")
{
initialize(endpoint);
divyagayathri-hcl marked this conversation as resolved.
Show resolved Hide resolved
}

ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf)
{
initialize(endpoint, vrf);
}

ZmqClient::ZmqClient(const std::string& endpoint, uint32_t waitTimeMs) :
m_waitTimeMs(waitTimeMs)
{
initialize(endpoint);
}

ZmqClient::~ZmqClient()
{
std::lock_guard<std::mutex> lock(m_socketMutex);
Expand Down Expand Up @@ -55,6 +62,17 @@ void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf)

connect();
}

void ZmqClient::initialize(const std::string& endpoint)
divyagayathri-hcl marked this conversation as resolved.
Show resolved Hide resolved
{
m_connected = false;
m_endpoint = endpoint;
m_context = nullptr;
m_socket = nullptr;
m_sendbuffer.resize(MQ_RESPONSE_MAX_COUNT);

connect();
}

bool ZmqClient::isConnected()
{
Expand All @@ -63,6 +81,7 @@ bool ZmqClient::isConnected()

void ZmqClient::connect()
{
SWSS_LOG_ERROR("DIV:: Inside function client connect");
divyagayathri-hcl marked this conversation as resolved.
Show resolved Hide resolved
if (m_connected)
{
SWSS_LOG_DEBUG("Already connected to endpoint: %s", m_endpoint.c_str());
Expand All @@ -88,6 +107,7 @@ void ZmqClient::connect()
m_context = zmq_ctx_new();
m_socket = zmq_socket(m_context, ZMQ_PUSH);

SWSS_LOG_DEBUG("m_socket in client connect() is: %p\n", m_socket);
// timeout all pending send package, so zmq will not block in dtor of this class: http://api.zeromq.org/master:zmq-setsockopt
int linger = 0;
zmq_setsockopt(m_socket, ZMQ_LINGER, &linger, sizeof(linger));
Expand Down Expand Up @@ -119,6 +139,7 @@ void ZmqClient::sendMsg(
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos)
{
SWSS_LOG_ERROR("DIV:: Inside function client sendMsg");
int serializedlen = (int)BinarySerializer::serializeBuffer(
m_sendbuffer.data(),
m_sendbuffer.size(),
Expand All @@ -137,16 +158,18 @@ void ZmqClient::sendMsg(
int zmq_err = 0;
int retry_delay = 10;
int rc = 0;
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
{
{
// ZMQ socket is not thread safe: http://api.zeromq.org/2-1:zmq
std::lock_guard<std::mutex> lock(m_socketMutex);

// Use none block mode to use all bandwidth: http://api.zeromq.org/2-1%3Azmq-send
rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK);
rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK);
}

//SWSS_LOG_DEBUG("Before Sleep() in client sendmsg");
// usleep(10 * 1000);
//SWSS_LOG_DEBUG("After Sleep() in client sendmsg");
if (rc >= 0)
{
SWSS_LOG_DEBUG("zmq sended %d bytes", serializedlen);
Expand All @@ -164,7 +187,7 @@ void ZmqClient::sendMsg(
// For example when ZMQ socket still not receive reply message from last sended package.
// There was state machine inside ZMQ socket, when the socket is not in ready to send state, this error will happen.
// for more detail, please check: http://api.zeromq.org/2-1:zmq-send
SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err);
SWSS_LOG_WARN("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err);

retry_delay = 0;
}
Expand All @@ -183,7 +206,7 @@ void ZmqClient::sendMsg(
else
{
// for other error, send failed immediately.
auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
auto message = "cli: zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}
Expand All @@ -192,9 +215,18 @@ void ZmqClient::sendMsg(
}

// failed after retry
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
auto message = "cli: zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}

bool ZmqClient::wait(const std::string& dbName,
const std::string& tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{
SWSS_LOG_ERROR("DIV:: Inside function wait");
SWSS_LOG_ENTER();

return false;
}
}
14 changes: 12 additions & 2 deletions common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ namespace swss {
class ZmqClient
{
public:

ZmqClient(const std::string& endpoint);
ZmqClient(const std::string& endpoint, const std::string& vrf);
ZmqClient(const std::string& endpoint, uint32_t waitTimeMs);
~ZmqClient();

bool isConnected();
Expand All @@ -23,8 +25,14 @@ class ZmqClient
void sendMsg(const std::string& dbName,
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos);

bool wait(const std::string& dbName,
const std::string& tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);

private:
void initialize(const std::string& endpoint, const std::string& vrf);
void initialize(const std::string& endpoint);

std::string m_endpoint;

Expand All @@ -36,9 +44,11 @@ class ZmqClient

bool m_connected;

uint32_t m_waitTimeMs;

std::mutex m_socketMutex;
std::vector<char> m_sendbuffer;

std::vector<char> m_sendbuffer;
};

}
4 changes: 4 additions & 0 deletions common/zmqconsumerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string

void ZmqConsumerStateTable::handleReceivedData(const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos)
{
SWSS_LOG_DEBUG("Entering ZmqConsumerStateTable::handleReceivedData");
for (auto kco : kcos)
{
std::shared_ptr<KeyOpFieldsValuesTuple> clone = nullptr;
Expand All @@ -53,6 +54,7 @@ void ZmqConsumerStateTable::handleReceivedData(const std::vector<std::shared_ptr
{
std::lock_guard<std::mutex> lock(m_receivedQueueMutex);
m_receivedOperationQueue.push(kco);
SWSS_LOG_DEBUG("Called m_receivedOperationQueue.push in handleReceivedData()");
}

if (m_asyncDBUpdater != nullptr)
Expand All @@ -73,6 +75,7 @@ void ZmqConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const

// For new data append to m_dataQueue during pops, will not be include in result.
count = m_receivedOperationQueue.size();
//SWSS_LOG_DEBUG("count value: %ld", count);
if (!count)
{
return;
Expand All @@ -82,6 +85,7 @@ void ZmqConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const
vkco.clear();
for (size_t ie = 0; ie < count; ie++)
{
//SWSS_LOG_DEBUG("count inside for loop: %ld", count);
auto& kco = *(m_receivedOperationQueue.front());
vkco.push_back(std::move(kco));

Expand Down
9 changes: 9 additions & 0 deletions common/zmqproducerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ void ZmqProducerStateTable::del(const std::vector<std::string> &keys)

void ZmqProducerStateTable::send(const std::vector<KeyOpFieldsValuesTuple> &kcos)
{
SWSS_LOG_DEBUG("DIV:: Inside function ZmqProducerStateTable::send");
m_zmqClient.sendMsg(
m_dbName,
m_tableNameStr,
Expand All @@ -164,6 +165,14 @@ void ZmqProducerStateTable::send(const std::vector<KeyOpFieldsValuesTuple> &kcos
}
}

bool ZmqProducerStateTable::wait(const std::string& dbName,
const std::string& tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{
SWSS_LOG_DEBUG("DIV:: Inside function ZmqProducerStateTable::wait");
return m_zmqClient.wait(dbName, tableName, kcos);
}

size_t ZmqProducerStateTable::dbUpdaterQueueSize()
{
if (m_asyncDBUpdater == nullptr)
Expand Down
7 changes: 7 additions & 0 deletions common/zmqproducerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class ZmqProducerStateTable : public ProducerStateTable
public:
ZmqProducerStateTable(DBConnector *db, const std::string &tableName, ZmqClient &zmqClient, bool dbPersistence = true);
ZmqProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, ZmqClient &zmqClient, bool buffered = false, bool dbPersistence = true);
// ~ZmqProducerStateTable() = default;

/* Implements set() and del() commands using notification messages */
virtual void set(const std::string &key,
Expand All @@ -37,6 +38,12 @@ class ZmqProducerStateTable : public ProducerStateTable
// Batched send that can include both SET and DEL requests.
virtual void send(const std::vector<KeyOpFieldsValuesTuple> &kcos);

// This method should only be used if the ZmqClient enables one-to-one sync.
divyagayathri-hcl marked this conversation as resolved.
Show resolved Hide resolved

virtual bool wait(const std::string& dbName,
const std::string& tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);

size_t dbUpdaterQueueSize();
private:
void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence);
Expand Down
Loading
Loading