Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZMQ lib change. #958

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf)
initialize(endpoint, vrf);
}

ZmqClient::ZmqClient(const std::string& endpoint, uint32_t waitTimeMs)
: m_waitTimeMs(waitTimeMs)
{
initialize(endpoint);
}

ZmqClient::~ZmqClient()
{
std::lock_guard<std::mutex> lock(m_socketMutex);
Expand Down Expand Up @@ -55,7 +61,7 @@ void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf)

connect();
}

bool ZmqClient::isConnected()
{
return m_connected;
Expand Down Expand Up @@ -137,7 +143,7 @@ void ZmqClient::sendMsg(
int zmq_err = 0;
int retry_delay = 10;
int rc = 0;
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
{
{
// ZMQ socket is not thread safe: http://api.zeromq.org/2-1:zmq
Expand All @@ -146,7 +152,6 @@ void ZmqClient::sendMsg(
// Use none block mode to use all bandwidth: http://api.zeromq.org/2-1%3Azmq-send
rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK);
}

if (rc >= 0)
{
SWSS_LOG_DEBUG("zmq sended %d bytes", serializedlen);
Expand Down Expand Up @@ -197,4 +202,14 @@ void ZmqClient::sendMsg(
throw system_error(make_error_code(errc::io_error), message);
}

//TODO: To implement the wait() method later.
divyagayathri-hcl marked this conversation as resolved.
Show resolved Hide resolved
// To implement ZMQ_CLIENT and ZMQ_SERVER socket types for response path.
bool ZmqClient::wait(const std::string& dbName,
const std::string& tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{
SWSS_LOG_ENTER();

return false;
}
}
13 changes: 11 additions & 2 deletions common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ namespace swss {
class ZmqClient
{
public:

ZmqClient(const std::string& endpoint);
ZmqClient(const std::string& endpoint, const std::string& vrf);
ZmqClient(const std::string& endpoint, uint32_t waitTimeMs);
~ZmqClient();

bool isConnected();
Expand All @@ -23,8 +25,13 @@ class ZmqClient
void sendMsg(const std::string& dbName,
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos);

bool wait(const std::string& dbName,
const std::string& tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);

private:
void initialize(const std::string& endpoint, const std::string& vrf);
void initialize(const std::string& endpoint, const std::string& vrf = "");

std::string m_endpoint;

Expand All @@ -36,8 +43,10 @@ class ZmqClient

bool m_connected;

uint32_t m_waitTimeMs;

std::mutex m_socketMutex;

std::vector<char> m_sendbuffer;
};

Expand Down
7 changes: 7 additions & 0 deletions common/zmqproducerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ void ZmqProducerStateTable::send(const std::vector<KeyOpFieldsValuesTuple> &kcos
}
}

bool ZmqProducerStateTable::wait(const std::string& dbName,
const std::string& tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{
return m_zmqClient.wait(dbName, tableName, kcos);
}

size_t ZmqProducerStateTable::dbUpdaterQueueSize()
{
if (m_asyncDBUpdater == nullptr)
Expand Down
5 changes: 5 additions & 0 deletions common/zmqproducerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class ZmqProducerStateTable : public ProducerStateTable
// Batched send that can include both SET and DEL requests.
virtual void send(const std::vector<KeyOpFieldsValuesTuple> &kcos);

// To wait for the response from the peer.
virtual bool wait(const std::string& dbName,
const std::string& tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);

size_t dbUpdaterQueueSize();
private:
void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence);
Expand Down
68 changes: 40 additions & 28 deletions common/zmqserver.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <unistd.h>
#include <string>
#include <deque>
#include <limits>
Expand All @@ -20,6 +21,7 @@ ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
: m_endpoint(endpoint),
m_vrf(vrf)
{
connect();
m_buffer.resize(MQ_RESPONSE_MAX_COUNT);
m_runThread = true;
m_mqPollThread = std::make_shared<std::thread>(&ZmqServer::mqPollThread, this);
Expand All @@ -31,6 +33,33 @@ ZmqServer::~ZmqServer()
{
m_runThread = false;
m_mqPollThread->join();

zmq_close(m_socket);
zmq_ctx_destroy(m_context);
}

void ZmqServer::connect()
{
SWSS_LOG_ENTER();
m_context = zmq_ctx_new();
m_socket = zmq_socket(m_context, ZMQ_PULL);

// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(m_socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));

if (!m_vrf.empty())
{
zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
}

int rc = zmq_bind(m_socket, m_endpoint.c_str());
if (rc != 0)
{
SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d",
m_endpoint.c_str(),
zmq_errno());
}
}

void ZmqServer::registerMessageHandler(
Expand Down Expand Up @@ -90,40 +119,19 @@ void ZmqServer::mqPollThread()
SWSS_LOG_ENTER();
SWSS_LOG_NOTICE("mqPollThread begin");

// Producer/Consumer state table are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket
void* context = zmq_ctx_new();;
void* socket = zmq_socket(context, ZMQ_PULL);

// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));

if (!m_vrf.empty())
{
zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
}

int rc = zmq_bind(socket, m_endpoint.c_str());
if (rc != 0)
{
SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s",
m_endpoint.c_str(),
zmq_errno(),
strerror(zmq_errno()));
}

// zmq_poll will use less CPU
zmq_pollitem_t poll_item;
poll_item.fd = 0;
poll_item.socket = socket;
poll_item.socket = m_socket;
poll_item.events = ZMQ_POLLIN;
poll_item.revents = 0;

SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str());
SWSS_LOG_DEBUG("m_runThread: %d", m_runThread);
while (m_runThread)
{
// receive message
rc = zmq_poll(&poll_item, 1, 1000);
auto rc = zmq_poll(&poll_item, 1, 1000);
if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN))
{
// timeout or other event
Expand All @@ -132,7 +140,7 @@ void ZmqServer::mqPollThread()
}

// receive message
rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
if (rc < 0)
{
int zmq_err = zmq_errno();
Expand Down Expand Up @@ -160,11 +168,15 @@ void ZmqServer::mqPollThread()
// deserialize and write to redis:
handleReceivedData(m_buffer.data(), rc);
}
SWSS_LOG_NOTICE("mqPollThread end");
}

zmq_close(socket);
zmq_ctx_destroy(context);
//TODO: To implement the sendMsg() method later.
void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName,
const std::vector<swss::KeyOpFieldsValuesTuple>& values)
{

SWSS_LOG_NOTICE("mqPollThread end");
return;
}

}
10 changes: 10 additions & 0 deletions common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ class ZmqServer
const std::string tableName,
ZmqMessageHandler* handler);

void sendMsg(const std::string& dbName, const std::string& tableName,
const std::vector<swss::KeyOpFieldsValuesTuple>& values);

private:

void connect();

void handleReceivedData(const char* buffer, const size_t size);

void mqPollThread();
Expand All @@ -56,6 +62,10 @@ class ZmqServer

std::string m_vrf;

void* m_context;

void* m_socket;

std::map<std::string, std::map<std::string, ZmqMessageHandler*>> m_HandlerMap;
};

Expand Down
4 changes: 2 additions & 2 deletions tests/c_api_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,7 @@ TEST(c_api, SubscriberStateTable) {
TEST(c_api, ZmqConsumerProducerStateTable) {
clearDB();
SWSSStringManager sm;

SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true);

SWSSZmqServer srv = SWSSZmqServer_new("tcp://127.0.0.1:42312");
SWSSZmqClient cli = SWSSZmqClient_new("tcp://127.0.0.1:42312");
EXPECT_TRUE(SWSSZmqClient_isConnected(cli));
Expand Down Expand Up @@ -267,6 +265,7 @@ TEST(c_api, ZmqConsumerProducerStateTable) {
SWSSZmqProducerStateTable_set(pst, arr.data[i].key, arr.data[i].fieldValues);
else
SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr);
sleep(2);

ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 1500, true), SWSSSelectResult_DATA);
arr = SWSSZmqConsumerStateTable_pops(cst);
Expand Down Expand Up @@ -305,6 +304,7 @@ TEST(c_api, ZmqConsumerProducerStateTable) {
SWSSZmqProducerStateTable_del(pst, arr.data[i].key);
else
SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr);
sleep(2);

ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500, true), SWSSSelectResult_DATA);
arr = SWSSZmqConsumerStateTable_pops(cst);
Expand Down
Loading
Loading