diff --git a/common/c-api/util.h b/common/c-api/util.h index 91d6e5c2d..06aeac15e 100644 --- a/common/c-api/util.h +++ b/common/c-api/util.h @@ -202,7 +202,6 @@ template static inline T &getReference(std::shared_ptr &t) { // T is anything that has a .size() method and which can be iterated over for // swss::KeyOpFieldValuesTuple, eg vector or deque template static inline SWSSKeyOpFieldValuesArray makeKeyOpFieldValuesArray(T &&in) { - SWSS_LOG_DEBUG("Entering makeKeyOpFieldValuesArray method"); SWSSKeyOpFieldValues *data = new SWSSKeyOpFieldValues[in.size()]; size_t i = 0; @@ -212,7 +211,6 @@ template static inline SWSSKeyOpFieldValuesArray makeKeyOpFieldValuesA SWSSKeyOpFieldValuesArray out; out.len = (uint64_t)in.size(); out.data = data; -// SWSS_LOG_DEBUG("2::: out.len value is: %ld", out.len); return out; } @@ -257,11 +255,9 @@ static inline std::vector takeKeyOpFieldValuesArray(SWSSKeyOpFieldValuesArray in) { std::vector out; for (uint64_t i = 0; i < in.len; i++) { -// SWSS_LOG_DEBUG("i value is: %ld", i); SWSSKeyOpFieldValues kfv = in.data[i]; out.push_back(takeKeyOpFieldValues(std::move(kfv))); } -// SWSS_LOG_DEBUG("out.len value is: %ld", out.size()); return out; } diff --git a/common/c-api/zmqclient.cpp b/common/c-api/zmqclient.cpp index e2c7636e7..fa1d59ca2 100644 --- a/common/c-api/zmqclient.cpp +++ b/common/c-api/zmqclient.cpp @@ -25,7 +25,6 @@ void SWSSZmqClient_connect(SWSSZmqClient zmqc) { void SWSSZmqClient_sendMsg(SWSSZmqClient zmqc, const char *dbName, const char *tableName, SWSSKeyOpFieldValuesArray arr) { - SWSS_LOG_DEBUG("Inside SWSSZmqClient_sendMsg"); SWSSTry({ vector kcos = takeKeyOpFieldValuesArray(arr); ((ZmqClient *)zmqc) diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index 588fbd49a..de9f9a4f1 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -18,7 +18,6 @@ namespace swss { ZmqClient::ZmqClient(const std::string& endpoint) :ZmqClient(endpoint, "") { - initialize(endpoint); } ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf) @@ -26,8 +25,8 @@ ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf) initialize(endpoint, vrf); } -ZmqClient::ZmqClient(const std::string& endpoint, uint32_t waitTimeMs) : - m_waitTimeMs(waitTimeMs) +ZmqClient::ZmqClient(const std::string& endpoint, uint32_t waitTimeMs) +: m_waitTimeMs(waitTimeMs) { initialize(endpoint); } @@ -63,17 +62,6 @@ void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf) connect(); } -void ZmqClient::initialize(const std::string& endpoint) -{ - m_connected = false; - m_endpoint = endpoint; - m_context = nullptr; - m_socket = nullptr; - m_sendbuffer.resize(MQ_RESPONSE_MAX_COUNT); - - connect(); -} - bool ZmqClient::isConnected() { return m_connected; @@ -81,7 +69,6 @@ bool ZmqClient::isConnected() void ZmqClient::connect() { -SWSS_LOG_ERROR("DIV:: Inside function client connect"); if (m_connected) { SWSS_LOG_DEBUG("Already connected to endpoint: %s", m_endpoint.c_str()); @@ -107,7 +94,6 @@ SWSS_LOG_ERROR("DIV:: Inside function client connect"); m_context = zmq_ctx_new(); m_socket = zmq_socket(m_context, ZMQ_PUSH); - SWSS_LOG_DEBUG("m_socket in client connect() is: %p\n", m_socket); // timeout all pending send package, so zmq will not block in dtor of this class: http://api.zeromq.org/master:zmq-setsockopt int linger = 0; zmq_setsockopt(m_socket, ZMQ_LINGER, &linger, sizeof(linger)); @@ -139,7 +125,6 @@ void ZmqClient::sendMsg( const std::string& tableName, const std::vector& kcos) { -SWSS_LOG_ERROR("DIV:: Inside function client sendMsg"); int serializedlen = (int)BinarySerializer::serializeBuffer( m_sendbuffer.data(), m_sendbuffer.size(), @@ -165,11 +150,8 @@ SWSS_LOG_ERROR("DIV:: Inside function client sendMsg"); std::lock_guard lock(m_socketMutex); // Use none block mode to use all bandwidth: http://api.zeromq.org/2-1%3Azmq-send - rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK); + rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK); } -//SWSS_LOG_DEBUG("Before Sleep() in client sendmsg"); -// usleep(10 * 1000); -//SWSS_LOG_DEBUG("After Sleep() in client sendmsg"); if (rc >= 0) { SWSS_LOG_DEBUG("zmq sended %d bytes", serializedlen); @@ -187,7 +169,7 @@ SWSS_LOG_ERROR("DIV:: Inside function client sendMsg"); // For example when ZMQ socket still not receive reply message from last sended package. // There was state machine inside ZMQ socket, when the socket is not in ready to send state, this error will happen. // for more detail, please check: http://api.zeromq.org/2-1:zmq-send - SWSS_LOG_WARN("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err); + SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err); retry_delay = 0; } @@ -206,7 +188,7 @@ SWSS_LOG_ERROR("DIV:: Inside function client sendMsg"); else { // for other error, send failed immediately. - auto message = "cli: zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc); + auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc); SWSS_LOG_ERROR("%s", message.c_str()); throw system_error(make_error_code(errc::io_error), message); } @@ -215,16 +197,16 @@ SWSS_LOG_ERROR("DIV:: Inside function client sendMsg"); } // failed after retry - auto message = "cli: zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen); + auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen); SWSS_LOG_ERROR("%s", message.c_str()); throw system_error(make_error_code(errc::io_error), message); } +//TODO: To implement the wait() method later. bool ZmqClient::wait(const std::string& dbName, const std::string& tableName, const std::vector>& kcos) { -SWSS_LOG_ERROR("DIV:: Inside function wait"); SWSS_LOG_ENTER(); return false; diff --git a/common/zmqclient.h b/common/zmqclient.h index cd2f3f3e6..fdfe9e343 100644 --- a/common/zmqclient.h +++ b/common/zmqclient.h @@ -31,8 +31,7 @@ class ZmqClient const std::vector>& kcos); private: - void initialize(const std::string& endpoint, const std::string& vrf); - void initialize(const std::string& endpoint); + void initialize(const std::string& endpoint, const std::string& vrf = ""); std::string m_endpoint; @@ -48,7 +47,7 @@ class ZmqClient std::mutex m_socketMutex; - std::vector m_sendbuffer; + std::vector m_sendbuffer; }; } diff --git a/common/zmqconsumerstatetable.cpp b/common/zmqconsumerstatetable.cpp index eddb2cc9d..5f58482f9 100644 --- a/common/zmqconsumerstatetable.cpp +++ b/common/zmqconsumerstatetable.cpp @@ -41,7 +41,6 @@ ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string void ZmqConsumerStateTable::handleReceivedData(const std::vector> &kcos) { - SWSS_LOG_DEBUG("Entering ZmqConsumerStateTable::handleReceivedData"); for (auto kco : kcos) { std::shared_ptr clone = nullptr; @@ -54,7 +53,6 @@ void ZmqConsumerStateTable::handleReceivedData(const std::vector lock(m_receivedQueueMutex); m_receivedOperationQueue.push(kco); -SWSS_LOG_DEBUG("Called m_receivedOperationQueue.push in handleReceivedData()"); } if (m_asyncDBUpdater != nullptr) @@ -75,7 +73,6 @@ void ZmqConsumerStateTable::pops(std::deque &vkco, const // For new data append to m_dataQueue during pops, will not be include in result. count = m_receivedOperationQueue.size(); - //SWSS_LOG_DEBUG("count value: %ld", count); if (!count) { return; @@ -85,7 +82,6 @@ void ZmqConsumerStateTable::pops(std::deque &vkco, const vkco.clear(); for (size_t ie = 0; ie < count; ie++) { - //SWSS_LOG_DEBUG("count inside for loop: %ld", count); auto& kco = *(m_receivedOperationQueue.front()); vkco.push_back(std::move(kco)); diff --git a/common/zmqproducerstatetable.cpp b/common/zmqproducerstatetable.cpp index 1d648a1f3..5aff117b2 100644 --- a/common/zmqproducerstatetable.cpp +++ b/common/zmqproducerstatetable.cpp @@ -148,7 +148,6 @@ void ZmqProducerStateTable::del(const std::vector &keys) void ZmqProducerStateTable::send(const std::vector &kcos) { -SWSS_LOG_DEBUG("DIV:: Inside function ZmqProducerStateTable::send"); m_zmqClient.sendMsg( m_dbName, m_tableNameStr, @@ -169,7 +168,6 @@ bool ZmqProducerStateTable::wait(const std::string& dbName, const std::string& tableName, const std::vector>& kcos) { -SWSS_LOG_DEBUG("DIV:: Inside function ZmqProducerStateTable::wait"); return m_zmqClient.wait(dbName, tableName, kcos); } diff --git a/common/zmqproducerstatetable.h b/common/zmqproducerstatetable.h index 85c7b953c..09778d47a 100644 --- a/common/zmqproducerstatetable.h +++ b/common/zmqproducerstatetable.h @@ -18,7 +18,6 @@ class ZmqProducerStateTable : public ProducerStateTable public: ZmqProducerStateTable(DBConnector *db, const std::string &tableName, ZmqClient &zmqClient, bool dbPersistence = true); ZmqProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, ZmqClient &zmqClient, bool buffered = false, bool dbPersistence = true); -// ~ZmqProducerStateTable() = default; /* Implements set() and del() commands using notification messages */ virtual void set(const std::string &key, @@ -38,8 +37,7 @@ class ZmqProducerStateTable : public ProducerStateTable // Batched send that can include both SET and DEL requests. virtual void send(const std::vector &kcos); - // This method should only be used if the ZmqClient enables one-to-one sync. - + // To wait for the response from the peer. virtual bool wait(const std::string& dbName, const std::string& tableName, const std::vector>& kcos); diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index bffba88d6..5feeddfe9 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -17,18 +17,6 @@ ZmqServer::ZmqServer(const std::string& endpoint) { } -ZmqServer::ZmqServer(const std::string& endpoint, bool zmr_test) -: m_endpoint(endpoint), -m_allowZmqPoll(true) -{ - connect(); - m_buffer.resize(MQ_RESPONSE_MAX_COUNT); - m_mqPollThread = std::make_shared(&ZmqServer::mqPollThread, this); - m_runThread = true; - - SWSS_LOG_DEBUG("DIV: ZmqServer ctor endpoint: %s", endpoint.c_str()); -} - ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf) : m_endpoint(endpoint), m_vrf(vrf) @@ -43,7 +31,6 @@ ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf) ZmqServer::~ZmqServer() { - m_allowZmqPoll = true; m_runThread = false; m_mqPollThread->join(); @@ -53,12 +40,10 @@ ZmqServer::~ZmqServer() void ZmqServer::connect() { -SWSS_LOG_ERROR("DIV:: Inside function server connect"); SWSS_LOG_ENTER(); m_context = zmq_ctx_new(); m_socket = zmq_socket(m_context, ZMQ_PULL); - SWSS_LOG_DEBUG("m_socket in server connect() is: %p\n", m_socket); // Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt int high_watermark = MQ_WATERMARK; zmq_setsockopt(m_socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); @@ -82,7 +67,6 @@ void ZmqServer::registerMessageHandler( const std::string tableName, ZmqMessageHandler* handler) { -SWSS_LOG_ERROR("DIV:: Inside function registerMessageHandler"); auto dbResult = m_HandlerMap.insert(pair>(dbName, map())); if (dbResult.second) { SWSS_LOG_DEBUG("ZmqServer add handler mapping for db: %s", dbName.c_str()); @@ -98,7 +82,6 @@ ZmqMessageHandler* ZmqServer::findMessageHandler( const std::string dbName, const std::string tableName) { -SWSS_LOG_ERROR("DIV:: Inside function findMessageHandler"); auto dbMappingIter = m_HandlerMap.find(dbName); if (dbMappingIter == m_HandlerMap.end()) { SWSS_LOG_DEBUG("ZmqServer can't find any handler for db: %s", dbName.c_str()); @@ -116,7 +99,6 @@ SWSS_LOG_ERROR("DIV:: Inside function findMessageHandler"); void ZmqServer::handleReceivedData(const char* buffer, const size_t size) { -SWSS_LOG_ERROR("DIV:: Inside function handleReceivedData"); std::string dbName; std::string tableName; std::vector> kcos; @@ -134,7 +116,6 @@ SWSS_LOG_ERROR("DIV:: Inside function handleReceivedData"); void ZmqServer::mqPollThread() { -SWSS_LOG_ERROR("DIV:: Inside function mqPollThread"); SWSS_LOG_ENTER(); SWSS_LOG_NOTICE("mqPollThread begin"); @@ -149,11 +130,8 @@ SWSS_LOG_ERROR("DIV:: Inside function mqPollThread"); SWSS_LOG_DEBUG("m_runThread: %d", m_runThread); while (m_runThread) { - m_allowZmqPoll = false; - // receive message auto rc = zmq_poll(&poll_item, 1, 1000); - SWSS_LOG_DEBUG("ZmqServer::mqPollThread: zmq poll: rc value is : %d", rc); if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN)) { // timeout or other event @@ -162,10 +140,7 @@ SWSS_LOG_ERROR("DIV:: Inside function mqPollThread"); } // receive message - SWSS_LOG_DEBUG("m_socket in mqPollThread() server is: %p\n", m_socket); - rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); - SWSS_LOG_DEBUG("ZmqServer::mqPollThread: zmq recv rc value is : %d", rc); if (rc < 0) { int zmq_err = zmq_errno(); @@ -192,85 +167,16 @@ SWSS_LOG_ERROR("DIV:: Inside function mqPollThread"); // deserialize and write to redis: handleReceivedData(m_buffer.data(), rc); -// SWSS_LOG_DEBUG("Before Sleep() in mqPollThread"); -// usleep(10); } SWSS_LOG_NOTICE("mqPollThread end"); } +//TODO: To implement the sendMsg() method later. void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName, const std::vector& values) { return; -SWSS_LOG_ERROR("DIV:: Inside function server sendMsg"); - int serializedlen = (int)BinarySerializer::serializeBuffer( - m_buffer.data(), - m_buffer.size(), - dbName, - tableName, - values); - SWSS_LOG_DEBUG("sending: %d", serializedlen); - int zmq_err = 0; - int retry_delay = 10; - int rc = 0; - for (int i = 0; i <= MQ_MAX_RETRY; ++i) - { - SWSS_LOG_DEBUG("1. m_socket in server sendmsg() is: %p\n", m_socket); - rc = zmq_send(m_socket, m_buffer.data(), serializedlen, 0); - SWSS_LOG_DEBUG("ser: rc value is : %d", rc); - if (rc >= 0) - { - m_allowZmqPoll = true; - SWSS_LOG_DEBUG("zmq sent %d bytes", serializedlen); - return; - } - zmq_err = zmq_errno(); - // sleep (2 ^ retry time) * 10 ms - retry_delay *= 2; - SWSS_LOG_DEBUG("2. m_socket in server sendmsg() is: %p\n", m_socket); - SWSS_LOG_DEBUG("zmq_err is : %d", zmq_err); - - if (zmq_err == EINTR - || zmq_err == EFSM) - { - // EINTR: interrupted by signal - // EFSM: socket state not ready - // For example when ZMQ socket still not receive reply message from last sended package. - // There was state machine inside ZMQ socket, when the socket is not in ready to send state, this - // error will happen. - // for more detail, please check: http://api.zeromq.org/2-1:zmq-send - SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err); - - retry_delay = 0; - } - else if (zmq_err == EAGAIN) - { - // EAGAIN: ZMQ is full to need try again - SWSS_LOG_WARN("zmq is full, will retry in %d ms, endpoint: %s, error: %d", retry_delay, m_endpoint.c_str(), zmq_err); - } - else if (zmq_err == ETERM) - { - auto message = "zmq connection break, endpoint: " + m_endpoint + ", error: " + to_string(rc); - SWSS_LOG_ERROR("%s", message.c_str()); - throw system_error(make_error_code(errc::connection_reset), message); - } - else - { - // for other error, send failed immediately. - auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc); - SWSS_LOG_ERROR("%s", message.c_str()); - SWSS_LOG_DEBUG("3. m_socket in server sendmsg() is: %p\n", m_socket); - throw system_error(make_error_code(errc::io_error), message); - return; - } - usleep(retry_delay * 1000); - } - - // failed after retry - auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen); - SWSS_LOG_ERROR("%s", message.c_str()); - throw system_error(make_error_code(errc::io_error), message); } } diff --git a/common/zmqserver.h b/common/zmqserver.h index e24dcc01d..1b78b7a25 100644 --- a/common/zmqserver.h +++ b/common/zmqserver.h @@ -31,7 +31,6 @@ class ZmqServer static constexpr int DEFAULT_POP_BATCH_SIZE = 128; ZmqServer(const std::string& endpoint); - ZmqServer(const std::string& endpoint, bool zmr_test); ZmqServer(const std::string& endpoint, const std::string& vrf); ~ZmqServer(); @@ -67,8 +66,6 @@ class ZmqServer void* m_socket; - bool m_allowZmqPoll; - std::map> m_HandlerMap; }; diff --git a/pyext/swsscommon.i b/pyext/swsscommon.i index bd185cb43..b3d015e03 100644 --- a/pyext/swsscommon.i +++ b/pyext/swsscommon.i @@ -76,8 +76,6 @@ %template(FieldValuePair) std::pair; %template(FieldValuePairs) std::vector>; %template(FieldValuePairsList) std::vector>>; -%template(KeyFieldValuePairs) std::pair>>; -%template(KeyFieldValuePairsList) std::vector>>>; %template(FieldValueMap) std::map; %template(VectorString) std::vector; %template(ScanResult) std::pair>; @@ -291,22 +289,6 @@ T castSelectableObj(swss::Selectable *temp) %template(hgetall) hgetall>; } -%inline %{ -std::vector>> zmqWait(swss::ZmqProducerStateTable &p) -{ - std::vector>> ret; - std::string db_name; - std::string table_name; - std::vector> kcos_ptr; - p.wait(db_name, table_name, kcos_ptr); - for (const auto kco : kcos_ptr) - { - ret.push_back(std::pair>{kfvKey(*kco), kfvFieldsValues(*kco)}); - } - return ret; -} -%} - %ignore swss::TableEntryPoppable::pops(std::deque &, const std::string &); %apply std::vector& OUTPUT {std::vector &keys}; %apply std::vector& OUTPUT {std::vector &ops}; diff --git a/tests/c_api_ut.cpp b/tests/c_api_ut.cpp index 4e3c78705..864fb8fc2 100644 --- a/tests/c_api_ut.cpp +++ b/tests/c_api_ut.cpp @@ -239,11 +239,9 @@ TEST(c_api, ZmqConsumerProducerStateTable) { ASSERT_EQ(arr.len, 0); freeKeyOpFieldValuesArray(arr); -SWSS_LOG_DEBUG("print7"); // On flag = 0, we use the ZmqProducerStateTable // On flag = 1, we use the ZmqClient directly for (int flag = 0; flag < 2; flag++) { -SWSS_LOG_DEBUG("print7 for loop, flag set is : %d", flag); SWSSFieldValueTuple values_key1_data[2] = {{.field = "myfield1", .value = sm.makeString("myvalue1")}, {.field = "myfield2", .value = sm.makeString("myvalue2")}}; SWSSFieldValueArray values_key1 = { @@ -251,58 +249,29 @@ SWSS_LOG_DEBUG("print7 for loop, flag set is : %d", flag); .data = values_key1_data, }; -SWSS_LOG_DEBUG("print8"); SWSSFieldValueTuple values_key2_data[1] = {{.field = "myfield3", .value = sm.makeString("myvalue3")}}; SWSSFieldValueArray values_key2 = { .len = 1, .data = values_key2_data, }; -SWSS_LOG_DEBUG("print9"); SWSSKeyOpFieldValues arr_data[2] = { {.key = "mykey1", .operation = SWSSKeyOperation_SET, .fieldValues = values_key1}, {.key = "mykey2", .operation = SWSSKeyOperation_SET, .fieldValues = values_key2}}; arr = {.len = 2, .data = arr_data}; -SWSS_LOG_DEBUG("print10"); if (flag == 0) for (uint64_t i = 0; i < arr.len; i++) { -// SWSS_LOG_DEBUG("flag 0 case before calling SWSSZmqProducerStateTable_set, i: %ld", i); SWSSZmqProducerStateTable_set(pst, arr.data[i].key, arr.data[i].fieldValues); } else { - SWSS_LOG_DEBUG("print10 else loop, flag is: %d", flag); SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr); } -SWSS_LOG_DEBUG("print11"); ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 1500, true), SWSSSelectResult_DATA); -/* int retry_cnt = 1; - vector kfvs; - while (true) - { - arr = SWSSZmqConsumerStateTable_pops(cst); - - SWSS_LOG_DEBUG("print12"); - kfvs = takeKeyOpFieldValuesArray(arr); - SWSS_LOG_DEBUG("1 : kfvs.size() is: %ld", kfvs.size()); - sortKfvs(kfvs); - - SWSS_LOG_DEBUG("2 : kfvs.size() is: %ld", kfvs.size()); - SWSS_LOG_DEBUG("print13"); - if(kfvs.size() == 2 || retry_cnt == 3) - break; - retry_cnt++; - SWSS_LOG_DEBUG("Retry count is: %d, Before sleep()", retry_cnt); - usleep(1 * 1000); - SWSS_LOG_DEBUG("Retry count is: %d, After sleep()", retry_cnt); - } */ - - SWSS_LOG_DEBUG("Before sleep(2)"); sleep(2); - SWSS_LOG_DEBUG("After sleep(2)"); arr = SWSSZmqConsumerStateTable_pops(cst); vector kfvs = takeKeyOpFieldValuesArray(arr); @@ -319,7 +288,6 @@ SWSS_LOG_DEBUG("print11"); EXPECT_EQ(fieldValues0[1].first, "myfield2"); EXPECT_EQ(fieldValues0[1].second, "myvalue2"); -SWSS_LOG_DEBUG("print14"); EXPECT_EQ(kfvKey(kfvs[1]), "mykey2"); EXPECT_EQ(kfvOp(kfvs[1]), "SET"); vector> &fieldValues1 = kfvFieldsValues(kfvs[1]); @@ -327,6 +295,7 @@ SWSS_LOG_DEBUG("print14"); EXPECT_EQ(fieldValues1[0].first, "myfield3"); EXPECT_EQ(fieldValues1[0].second, "myvalue3"); + sleep(2); arr = SWSSZmqConsumerStateTable_pops(cst); ASSERT_EQ(arr.len, 0); freeKeyOpFieldValuesArray(arr); @@ -341,40 +310,13 @@ SWSS_LOG_DEBUG("print14"); else SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr); -SWSS_LOG_DEBUG("print15"); ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500, true), SWSSSelectResult_DATA); - -/* retry_cnt = 1; - while (true) - { - arr = SWSSZmqConsumerStateTable_pops(cst); - - kfvs = takeKeyOpFieldValuesArray(arr); - SWSS_LOG_DEBUG("3 : kfvs.size() is: %ld", kfvs.size()); - sortKfvs(kfvs); - SWSS_LOG_DEBUG("4 : kfvs.size() is: %ld", kfvs.size()); - if(kfvs.size() == 2 || retry_cnt == 3) - break; - retry_cnt++; - SWSS_LOG_DEBUG("Retry count is: %d, Before sleep()", retry_cnt); - usleep(1 * 1000); - SWSS_LOG_DEBUG("Retry count is: %d, After sleep()", retry_cnt); - } */ - - SWSS_LOG_DEBUG("Before sleep(2)"); - sleep(2); - SWSS_LOG_DEBUG("After sleep(2)"); arr = SWSSZmqConsumerStateTable_pops(cst); -// SWSS_LOG_DEBUG("5 : kfvs.size() is: %ld", kfvs.size()); -SWSS_LOG_DEBUG("print16"); kfvs = takeKeyOpFieldValuesArray(arr); -// SWSS_LOG_DEBUG("6 : kfvs.size() is: %ld", kfvs.size()); sortKfvs(kfvs); freeKeyOpFieldValuesArray(arr); -// SWSS_LOG_DEBUG("7 : kfvs.size() is: %ld", kfvs.size()); -SWSS_LOG_DEBUG("print17"); ASSERT_EQ(kfvs.size(), 2); EXPECT_EQ(kfvKey(kfvs[0]), "mykey3"); EXPECT_EQ(kfvOp(kfvs[0]), "DEL"); diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index bd056f48e..ca2b7f29c 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -58,11 +58,8 @@ static bool allDataReceived = false; static void producerWorker(string tableName, string endpoint, bool dbPersistence) { -SWSS_LOG_DEBUG("Inside producerWorker"); DBConnector db(TEST_DB, 0, true); -SWSS_LOG_DEBUG("producerWorker: After DBConnector"); ZmqClient client(endpoint); -SWSS_LOG_DEBUG("producerWorker: After zmqclient"); ZmqProducerStateTable p(&db, tableName, client, dbPersistence); cout << "Producer thread started: " << tableName << endl; @@ -485,18 +482,13 @@ static bool zmq_done = false; static void zmqConsumerWorker(string tableName, string endpoint, bool dbPersistence) { -// std::string testTableName = "ZMQ_PROD_CONS_UT"; std::string pushEndpoint = "tcp://localhost:1234"; std::string pullEndpoint = "tcp://*:1234"; - cout << "DIV:: Function zmqConsumerWorker 473" << endl; cout << "Consumer thread started: " << tableName << endl; DBConnector db(TEST_DB, 0, true); - cout << "DIV:: Function zmqConsumerWorker 476" << endl; - ZmqServer server(endpoint, true); - cout << "DIV:: Function zmqConsumerWorker 478" << endl; + ZmqServer server(endpoint, ""); ZmqConsumerStateTable c(&db, tableName, server, 128, 0, dbPersistence); - cout << "DIV:: Function zmqConsumerWorker 480" << endl; //validate received data std::vector values; values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); @@ -514,7 +506,6 @@ static void zmqConsumerWorker(string tableName, string endpoint, bool dbPersiste { deserialized_kcos.push_back(*kco_ptr); } - SWSS_LOG_DEBUG("dbname is: %s & tablename is : %s", rec_dbName.c_str(), rec_tableName.c_str()); EXPECT_EQ(rec_dbName, TEST_DB); EXPECT_EQ(rec_tableName, tableName); EXPECT_EQ(deserialized_kcos, values); @@ -524,7 +515,6 @@ static void zmqConsumerWorker(string tableName, string endpoint, bool dbPersiste allDataReceived = true; if (dbPersistence) { - cout << "DIV:: Function zmqConsumerWorker 509" << endl; // wait all persist data write to redis while (c.dbUpdaterQueueSize() > 0) { @@ -544,27 +534,18 @@ static void ZmqWithResponse(bool producerPersistence) // start consumer first, SHM can only have 1 consumer per table. thread *consumerThread = new thread(zmqConsumerWorker, testTableName, pullEndpoint, !producerPersistence); - cout << "DIV:: Function ZmqWithResponse ut 1 529" << endl; // Wait for the consumer to be ready. sleep(1); DBConnector db(TEST_DB, 0, true); - cout << "DIV:: Function ZmqWithResponse ut 1 533" << endl; ZmqClient client(pushEndpoint, 3000); - cout << "DIV:: Function ZmqWithResponse ut 1 535" << endl; ZmqProducerStateTable p(&db, testTableName, client, true); - cout << "DIV:: Function ZmqWithResponse ut 1 537" << endl; std::vector kcos; kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); - //std::vector> kcos_p; - cout << "DIV:: Function ZmqWithResponse ut 1 541" << endl; - //std::string dbName, tableName; for (int i = 0; i < 5; ++i) { - cout << "DIV:: Function ZmqWithResponse ut 1 545" << endl; p.send(kcos); } - cout << "DIV:: Function ZmqWithResponse ut 1 558" << endl; zmq_done = true; sleep(10); consumerThread->join(); diff --git a/ut_dump_file.txt b/ut_dump_file.txt deleted file mode 100644 index 68146e2d8..000000000 --- a/ut_dump_file.txt +++ /dev/null @@ -1,19 +0,0 @@ -[ -{ - "OP": "SET", - "UT_REDIS:test_key_1": { - "test_field_1": "test_value_1" - } -}, -{ - "OP": "SET", - "UT_REDIS:test_key_2": { - "test_field_1": "test_value_1", - "test_field_2": "test_value_2" - } -}, -{ - "OP": "DEL", - "UT_REDIS:test_key_1": {} -} -]