diff --git a/ChronoGrapher/ChronoGrapher.cpp b/ChronoGrapher/ChronoGrapher.cpp index 2daddd52..5b257d29 100644 --- a/ChronoGrapher/ChronoGrapher.cpp +++ b/ChronoGrapher/ChronoGrapher.cpp @@ -104,6 +104,7 @@ int main(int argc, char**argv) LOG_INFO("[ChronoGrapher] DataStoreAdminService started successfully."); // Instantiate GrapherRecordingService + chronolog::RecordingGroupId recording_group_id = confManager.GRAPHER_CONF.RECORDING_GROUP; std::string RECORDING_SERVICE_PROTOCOL = confManager.GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.PROTO_CONF; std::string RECORDING_SERVICE_IP = confManager.GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.IP; uint16_t RECORDING_SERVICE_PORT = confManager.GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.BASE_PORT; @@ -125,8 +126,8 @@ int main(int argc, char**argv) LOG_INFO("[ChronoGrapher] RecordingService started successfully."); // create GrapherIdCard to identify this Grapher process in ChronoVisor's Registry - chronolog::GrapherIdCard processIdCard(recording_endpoint.first, recording_endpoint.second - , recording_service_provider_id); + chronolog::GrapherIdCard processIdCard(recording_group_id, recording_endpoint.first, recording_endpoint.second, + recording_service_provider_id); std::stringstream process_id_string; process_id_string << processIdCard; @@ -202,46 +203,47 @@ int main(int argc, char**argv) return (-1); } -// /// RegistryClient SetUp _____________________________________________________________________________________ -// // create RegistryClient and register the new Recording service with the Registry -// std::string REGISTRY_SERVICE_NA_STRING = -// confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.PROTO_CONF + "://" + -// confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.IP + ":" + -// std::to_string(confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.BASE_PORT); -// -// uint16_t REGISTRY_SERVICE_PROVIDER_ID = confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.SERVICE_PROVIDER_ID; -// -// 
chronolog::GrapherRegistryClient* grapherRegistryClient = chronolog::GrapherRegistryClient::CreateRegistryClient( -// *dataAdminEngine, REGISTRY_SERVICE_NA_STRING, REGISTRY_SERVICE_PROVIDER_ID); -// -// if(nullptr == grapherRegistryClient) -// { -// LOG_CRITICAL("[ChronoGrapher] failed to create RegistryClient; exiting"); -// delete grapherRecordingService; -// delete keeperDataAdminService; -// return (-1); -// } -// -// /// Registration with ChronoVisor __________________________________________________________________________________ -// // try to register with chronoVisor a few times than log ERROR and exit... -// int registration_status = chronolog::CL_ERR_UNKNOWN; -// int retries = 5; -// while((chronolog::CL_SUCCESS != registration_status) && (retries > 0)) -// { -// registration_status = grapherRegistryClient->send_register_msg( -// chronolog::GrapherRegistrationMsg(processIdCard, collectionServiceId)); -// retries--; -// } -// -// if(chronolog::CL_SUCCESS != registration_status) -// { -// LOG_CRITICAL("[ChronoGrapher] Failed to register with ChronoVisor after multiple attempts. 
Exiting."); -// delete grapherRegistryClient; -// delete grapherRecordingService; -// delete keeperDataAdminService; -// return (-1); -// } -// LOG_INFO("[ChronoGrapher] Successfully registered with ChronoVisor."); + /// RegistryClient SetUp _____________________________________________________________________________________ + // create RegistryClient and register the new Recording service with the Registry + std::string REGISTRY_SERVICE_NA_STRING = + confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.PROTO_CONF + "://" + + confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.IP + ":" + + std::to_string(confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.BASE_PORT); + + uint16_t REGISTRY_SERVICE_PROVIDER_ID = confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.SERVICE_PROVIDER_ID; + + chronolog::GrapherRegistryClient* grapherRegistryClient = chronolog::GrapherRegistryClient::CreateRegistryClient( + *dataAdminEngine, REGISTRY_SERVICE_NA_STRING, REGISTRY_SERVICE_PROVIDER_ID); + + if(nullptr == grapherRegistryClient) + { + LOG_CRITICAL("[ChronoGrapher] failed to create RegistryClient; exiting"); + delete grapherRecordingService; + delete keeperDataAdminService; + return (-1); + } + + /// Registration with ChronoVisor __________________________________________________________________________________ + // try to register with chronoVisor a few times than log ERROR and exit... + int registration_status = chronolog::CL_ERR_UNKNOWN; + int retries = 5; + while((chronolog::CL_SUCCESS != registration_status) && (retries > 0)) + { + registration_status = grapherRegistryClient->send_register_msg( + chronolog::GrapherRegistrationMsg(processIdCard, collectionServiceId)); + sleep(5); + retries--; + } + + if(chronolog::CL_SUCCESS != registration_status) + { + LOG_CRITICAL("[ChronoGrapher] Failed to register with ChronoVisor after multiple attempts. 
Exiting."); + delete grapherRegistryClient; + delete grapherRecordingService; + delete keeperDataAdminService; + return (-1); + } + LOG_INFO("[ChronoGrapher] Successfully registered with ChronoVisor."); /// Start data collection and extraction threads ___________________________________________________________________ // services are successfully created and keeper process had registered with ChronoVisor @@ -263,10 +265,10 @@ int main(int argc, char**argv) sleep(30); } -// /// Unregister from ChronoVisor ____________________________________________________________________________________ -// // Unregister from the chronoVisor so that no new story requests would be coming -// grapherRegistryClient->send_unregister_msg(processIdCard); -// delete grapherRegistryClient; + /// Unregister from ChronoVisor ____________________________________________________________________________________ + // Unregister from the chronoVisor so that no new story requests would be coming + grapherRegistryClient->send_unregister_msg(processIdCard); + delete grapherRegistryClient; /// Stop services and shut down ____________________________________________________________________________________ LOG_INFO("[ChronoGrapher] Initiating shutdown procedures."); diff --git a/ChronoKeeper/CSVFileChunkExtractor.cpp b/ChronoKeeper/CSVFileChunkExtractor.cpp index cc6d6910..cabcab6b 100644 --- a/ChronoKeeper/CSVFileChunkExtractor.cpp +++ b/ChronoKeeper/CSVFileChunkExtractor.cpp @@ -3,6 +3,7 @@ #include #include "chronolog_types.h" +#include "chronolog_errcode.h" #include "KeeperIdCard.h" #include "CSVFileChunkExtractor.h" @@ -44,6 +45,8 @@ int chronolog::CSVFileStoryChunkExtractor::processStoryChunk(StoryChunk*story_ch chunk_fstream << event << std::endl; } chunk_fstream.close(); - LOG_INFO("[CSVFileStoryChunkExtractor] Finished processing StoryChunk. File={}", chunk_filename); + LOG_DEBUG("[CSVFileStoryChunkExtractor] Finished processing StoryChunk. 
File={}", chunk_filename); + + return chronolog::CL_SUCCESS; } diff --git a/ChronoKeeper/ChronoKeeperInstance.cpp b/ChronoKeeper/ChronoKeeperInstance.cpp index c08cce51..cd863e37 100644 --- a/ChronoKeeper/ChronoKeeperInstance.cpp +++ b/ChronoKeeper/ChronoKeeperInstance.cpp @@ -19,8 +19,6 @@ #include "cmd_arg_parse.h" #include "StoryChunkExtractorRDMA.h" -#define KEEPER_GROUP_ID 7 - // we will be using a combination of the uint32_t representation of the service IP address // and uint16_t representation of the port number int @@ -88,7 +86,6 @@ int main(int argc, char**argv) // Instantiate ChronoKeeper MemoryDataStore // instantiate DataStoreAdminService - uint64_t keeper_group_id = KEEPER_GROUP_ID; /// DataStoreAdminService setup ____________________________________________________________________________________ std::string datastore_service_ip = confManager.KEEPER_CONF.KEEPER_DATA_STORE_ADMIN_SERVICE_CONF.RPC_CONF.IP; @@ -132,6 +129,7 @@ int main(int argc, char**argv) LOG_INFO("[ChronoKeeperInstance] KeeperRecordingService started successfully."); // create KeeperIdCard to identify this Keeper process in ChronoVisor's KeeperRegistry + chronolog::RecordingGroupId keeper_group_id = confManager.KEEPER_CONF.RECORDING_GROUP; chronolog::KeeperIdCard keeperIdCard(keeper_group_id, recording_endpoint.first, recording_endpoint.second , recording_service_provider_id); diff --git a/ChronoKeeper/StoryChunkExtractor.h b/ChronoKeeper/StoryChunkExtractor.h index c1f4671b..6b170007 100644 --- a/ChronoKeeper/StoryChunkExtractor.h +++ b/ChronoKeeper/StoryChunkExtractor.h @@ -46,10 +46,7 @@ class StoryChunkExtractorBase void drainExtractionQueue(); - virtual int processStoryChunk(StoryChunk*) //=0 - { - LOG_WARNING("[StoryChunkExtraction] Base processStoryChunk method called. 
Derived class should implement specific logic."); - } + virtual int processStoryChunk(StoryChunk*) = 0; void startExtractionThreads(int); diff --git a/ChronoVisor/CMakeLists.txt b/ChronoVisor/CMakeLists.txt index 922c1d0a..fadae6c2 100644 --- a/ChronoVisor/CMakeLists.txt +++ b/ChronoVisor/CMakeLists.txt @@ -15,6 +15,7 @@ target_sources(chronovisor_server PRIVATE ./src/ClientRegistryRecord.cpp ./src/ChronicleMetaDirectory.cpp ./src/KeeperRegistry.cpp + ../chrono_common/ConfigurationManager.cpp ../ChronoAPI/ChronoLog/src/city.cpp ../ChronoAPI/ChronoLog/src/log.cpp) diff --git a/ChronoVisor/include/ChronicleMetaDirectory.h b/ChronoVisor/include/ChronicleMetaDirectory.h index 3474573f..ae302711 100644 --- a/ChronoVisor/include/ChronicleMetaDirectory.h +++ b/ChronoVisor/include/ChronicleMetaDirectory.h @@ -32,17 +32,15 @@ class ChronicleMetaDirectory int destroy_chronicle(const std::string &name); - //int create_story(std::string const& chronicle_name, const std::string& story_name, - // const std::unordered_map& attrs); int destroy_story(std::string const &chronicle_name, const std::string &story_name); int acquire_story(chronolog::ClientId const &client_id, const std::string &chronicle_name, const std::string &story_name - , const std::unordered_map &attrs, int &flags, StoryId &, bool &); + , const std::unordered_map &attrs, int &flags, StoryId &); int release_story(chronolog::ClientId const &client_id, const std::string &chronicle_name, const std::string &story_name - , StoryId &, bool &); + , StoryId &); int get_chronicle_attr(std::string const &name, const std::string &key, std::string &value); diff --git a/ChronoVisor/include/KeeperRegistry.h b/ChronoVisor/include/KeeperRegistry.h index cc199cc5..1954bfee 100644 --- a/ChronoVisor/include/KeeperRegistry.h +++ b/ChronoVisor/include/KeeperRegistry.h @@ -1,16 +1,20 @@ #ifndef KEEPER_REGISTRY_H #define KEEPER_REGISTRY_H +#include +#include #include +#include #include -#include -#include + #include #include 
"chronolog_types.h" #include "KeeperIdCard.h" #include "KeeperStatsMsg.h" #include "KeeperRegistrationMsg.h" +#include "GrapherIdCard.h" +#include "GrapherRegistrationMsg.h" #include "ConfigurationManager.h" namespace chronolog @@ -29,94 +33,148 @@ class DataStoreAdminClient; class KeeperRegistryService; -class KeeperRegistry +class KeeperProcessEntry { +public: + KeeperProcessEntry(KeeperIdCard const& keeper_id_card, ServiceId const& admin_service_id) + : idCard(keeper_id_card) + , adminServiceId(admin_service_id) + , keeperAdminClient(nullptr) + , active(false) + , lastStatsTime(0) + , activeStoryCount(0) + { + idCardString += idCard; + } + + KeeperProcessEntry(KeeperProcessEntry const& other) = default; + ~KeeperProcessEntry() = default;// Registry is reponsible for creating & deleting keeperAdminClient + + KeeperIdCard idCard; + ServiceId adminServiceId; + std::string idCardString; + DataStoreAdminClient* keeperAdminClient; + bool active; + uint64_t lastStatsTime; + uint32_t activeStoryCount; + std::list> delayedExitClients; +}; + +class GrapherProcessEntry +{ +public: + GrapherProcessEntry(GrapherIdCard const& id_card, ServiceId const& admin_service_id) + : idCard(id_card) + , adminServiceId(admin_service_id) + , adminClient(nullptr) + , active(false) + , lastStatsTime(0) + , activeStoryCount(0) + { + idCardString += idCard; + } + + GrapherProcessEntry(GrapherProcessEntry const& other) = default; + ~GrapherProcessEntry() = default;// Registry is reponsible for creating & deleting keeperAdminClient + + GrapherIdCard idCard; + ServiceId adminServiceId; + std::string idCardString; + DataStoreAdminClient* adminClient; + bool active; + uint64_t lastStatsTime; + uint32_t activeStoryCount; + std::list> delayedExitGrapherClients; + }; - struct KeeperProcessEntry { + + class RecordingGroup + { public: - KeeperProcessEntry(KeeperIdCard const& keeper_id_card, ServiceId const& admin_service_id) - : idCard(keeper_id_card) - , adminServiceId(admin_service_id) - , 
keeperAdminClient(nullptr) - , active(false) - , lastStatsTime(0) - , activeStoryCount(0) + RecordingGroup(RecordingGroupId group_id, GrapherProcessEntry* grapher_ptr = nullptr) + : groupId(group_id) + , activeKeeperCount(0) + , grapherProcess(grapher_ptr) {} - KeeperProcessEntry(KeeperProcessEntry const &other) = default; + RecordingGroup(RecordingGroup const& other) = default; + ~RecordingGroup() = default; - void reset() - { - keeperAdminClient = nullptr; - active = false; - lastStatsTime = 0; - activeStoryCount = 0; - } - - ~KeeperProcessEntry() = default; // Registry is reponsible for creating & deleting keeperAdminClient - - KeeperIdCard idCard; - ServiceId adminServiceId; - DataStoreAdminClient* keeperAdminClient; - bool active; - uint64_t lastStatsTime; - uint32_t activeStoryCount; - std::list> delayedExitClients; - }; + bool isActive() const; + void startDelayedGrapherExit(GrapherProcessEntry&, std::time_t); + void clearDelayedExitGrapher(GrapherProcessEntry&, std::time_t); + void startDelayedKeeperExit(KeeperProcessEntry&, std::time_t); + void clearDelayedExitKeeper(KeeperProcessEntry&, std::time_t); - enum RegistryState - { - UNKNOWN = 0, INITIALIZED = 1, // RegistryService is initialized, no active keepers - RUNNING = 2, // RegistryService and active Keepers - SHUTTING_DOWN = 3 // Shutting down services - }; + std::vector& getActiveKeepers(std::vector& keeper_id_cards); -public: - KeeperRegistry(): registryState(UNKNOWN), registryEngine(nullptr), keeperRegistryService(nullptr) - {} + RecordingGroupId groupId; + size_t activeKeeperCount; + GrapherProcessEntry* grapherProcess; + std::map, KeeperProcessEntry> keeperProcesses; + }; - ~KeeperRegistry(); + class KeeperRegistry + { + enum RegistryState + { + UNKNOWN = 0, + INITIALIZED = 1, // RegistryService is initialized, no active keepers + RUNNING = 2, // RegistryService and active Keepers + SHUTTING_DOWN = 3// Shutting down services + }; - bool is_initialized() const - { return (INITIALIZED == 
registryState); } + public: + KeeperRegistry(); - bool is_running() const - { return (RUNNING == registryState); } + ~KeeperRegistry(); - bool is_shutting_down() const - { return (SHUTTING_DOWN == registryState); } + bool is_initialized() const { return (INITIALIZED == registryState); } - int InitializeRegistryService(ChronoLog::ConfigurationManager const &); + bool is_running() const { return (RUNNING == registryState); } - int ShutdownRegistryService(); + bool is_shutting_down() const { return (SHUTTING_DOWN == registryState); } - int registerKeeperProcess(KeeperRegistrationMsg const &keeper_reg_msg); + int InitializeRegistryService(ChronoLog::ConfigurationManager const&); - int unregisterKeeperProcess(KeeperIdCard const &keeper_id_card); + int ShutdownRegistryService(); - void updateKeeperProcessStats(KeeperStatsMsg const &keeperStatsMsg); + int registerKeeperProcess(KeeperRegistrationMsg const& keeper_reg_msg); - std::vector &getActiveKeepers(std::vector &keeper_id_cards); + int unregisterKeeperProcess(KeeperIdCard const& keeper_id_card); - int notifyKeepersOfStoryRecordingStart(std::vector&, ChronicleName const&, StoryName const&, - StoryId const&); + void updateKeeperProcessStats(KeeperStatsMsg const& keeperStatsMsg); - int notifyKeepersOfStoryRecordingStop(std::vector const &, StoryId const &); + int notifyRecordingGroupOfStoryRecordingStart(ChronicleName const&, StoryName const&, StoryId const&, + std::vector&); + int notifyRecordingGroupOfStoryRecordingStop(StoryId const&); + int registerGrapherProcess(GrapherRegistrationMsg const& reg_msg); + int unregisterGrapherProcess(GrapherIdCard const& id_card); -private: + private: + KeeperRegistry(KeeperRegistry const&) = delete;//disable copying + KeeperRegistry& operator=(KeeperRegistry const&) = delete; - KeeperRegistry(KeeperRegistry const &) = delete; //disable copying - KeeperRegistry &operator=(KeeperRegistry const &) = delete; + int notifyGrapherOfStoryRecordingStart(RecordingGroup&, ChronicleName const&, 
StoryName const&, StoryId const&, + uint64_t); + int notifyGrapherOfStoryRecordingStop(RecordingGroup&, StoryId const&); + int notifyKeepersOfStoryRecordingStart(RecordingGroup&, std::vector&, ChronicleName const&, + StoryName const&, StoryId const&, uint64_t); + int notifyKeepersOfStoryRecordingStop(RecordingGroup&, std::vector const&, StoryId const&); - RegistryState registryState; - std::mutex registryLock; - std::map , KeeperProcessEntry> keeperProcessRegistry; - thallium::engine*registryEngine; - KeeperRegistryService*keeperRegistryService; - size_t delayedDataAdminExitSeconds; -}; + RegistryState registryState; + std::mutex registryLock; + thallium::engine* registryEngine; + KeeperRegistryService* keeperRegistryService; + size_t delayedDataAdminExitSeconds; + std::map recordingGroups; + std::vector activeGroups; + std::mt19937 mt_random;//mersene twister random int generator + std::uniform_int_distribution group_id_distribution; + std::map activeStories; + }; } #endif diff --git a/ChronoVisor/include/KeeperRegistryService.h b/ChronoVisor/include/KeeperRegistryService.h index 32a054c2..12bc59a8 100644 --- a/ChronoVisor/include/KeeperRegistryService.h +++ b/ChronoVisor/include/KeeperRegistryService.h @@ -2,13 +2,14 @@ #define KEEPER_REGISTRY_SERVICE_H #include -//#include #include #include #include "KeeperIdCard.h" #include "KeeperRegistrationMsg.h" #include "KeeperStatsMsg.h" +#include "GrapherIdCard.h" +#include "GrapherRegistrationMsg.h" #include "log.h" #include "KeeperRegistry.h" @@ -53,6 +54,23 @@ class KeeperRegistryService: public tl::provider theKeeperProcessRegistry.updateKeeperProcessStats(keeper_stats_msg); } + void register_grapher(tl::request const &request, chronolog::GrapherRegistrationMsg const & registrationMsg) + { + int return_code = 0; + std::stringstream ss; + ss << registrationMsg; + LOG_INFO("[KeeperRegistryService] register_grapher: {}", ss.str()); + return_code = theKeeperProcessRegistry.registerGrapherProcess(registrationMsg); + 
request.respond(return_code); + } + + void unregister_grapher(tl::request const &request, chronolog::GrapherIdCard const &id_card) + { + int return_code = 0; + return_code = theKeeperProcessRegistry.unregisterGrapherProcess(id_card); + request.respond(return_code); + } + KeeperRegistryService(tl::engine &tl_engine, uint16_t service_provider_id, KeeperRegistry &keeperRegistry) : tl::provider (tl_engine, service_provider_id), theKeeperProcessRegistry( keeperRegistry) @@ -60,6 +78,8 @@ class KeeperRegistryService: public tl::provider define("register_keeper", &KeeperRegistryService::register_keeper); define("unregister_keeper", &KeeperRegistryService::unregister_keeper); define("handle_stats_msg", &KeeperRegistryService::handle_stats_msg, tl::ignore_return_value()); + define("register_grapher", &KeeperRegistryService::register_grapher); + define("unregister_grapher", &KeeperRegistryService::unregister_grapher); //setup finalization callback in case this ser vice provider is still alive when the engine is finalized get_engine().push_finalize_callback(this, [p = this]() { delete p; }); diff --git a/ChronoVisor/src/ChronicleMetaDirectory.cpp b/ChronoVisor/src/ChronicleMetaDirectory.cpp index 0b02f1ab..29146b34 100644 --- a/ChronoVisor/src/ChronicleMetaDirectory.cpp +++ b/ChronoVisor/src/ChronicleMetaDirectory.cpp @@ -171,53 +171,6 @@ int ChronicleMetaDirectory::destroy_chronicle(const std::string &name) } } -/** - * Create a Story - * @param chronicle_name: name of the Chronicle that the Story belongs to - * @param story_name: name of the Story - * @param attrs: attributes associated with the Story - * @return CL_SUCCESS if succeed to create the Story \n - * CL_ERR_NOT_EXIST if the Chronicle does not exist \n - * CL_ERR_STORY_EXISTS if a Story with the same name already exists \n - * CL_ERR_UNKNOWN otherwise - */ -/*int ChronicleMetaDirectory::create_story(std::string const& chronicle_name, - const std::string &story_name, - const std::unordered_map &attrs) { - 
LOG_DEBUG("creating Story name=%s in Chronicle name=%s", story_name.c_str(), chronicle_name.c_str()); - std::chrono::steady_clock::time_point t1, t2; - t1 = std::chrono::steady_clock::now(); - std::lock_guard chronicleMapLock(g_chronicleMetaDirectoryMutex_); - // First check if Chronicle exists, fail if false - uint64_t cid; -// auto name2IdRecord = chronicleName2IdMap_->find(chronicle_name); -// if (name2IdRecord != chronicleName2IdMap_->end()) { -// cid = name2IdRecord->second; - cid = CityHash64(chronicle_name.c_str(), chronicle_name.length()); - auto chronicleMapRecord = chronicleMap_->find(cid); - if (chronicleMapRecord != chronicleMap_->end()) { - Chronicle *pChronicle = chronicleMap_->find(cid)->second; - LOG_DEBUG("Chronicle@%p", &(*pChronicle)); - //TODO: check if the story exists and handle it gracefully - uint64_t sid = pChronicle->getStoryId(story_name); - if (sid> 0) { - LOG_DEBUG("StoryID=%lu name=%s exists", sid, story_name.c_str()); - return CL_ERR_STORY_EXISTS; - } - else - { - CL_Status res = pChronicle->addStory(chronicle_name, cid, story_name, attrs); - t2 = std::chrono::steady_clock::now(); - std::chrono::duration duration = (t2 - t1); - LOG_DEBUG("time in %s: %lf ns", __FUNCTION__, duration.count()); - return res; - } - } else { - LOG_DEBUG("Chronicle name=%s does not exist", chronicle_name.c_str()); - return CL_ERR_NOT_EXIST; - } -} -*/ /** * Destroy a Story @@ -285,7 +238,6 @@ int ChronicleMetaDirectory::destroy_story(std::string const &chronicle_name, con * @param story_name: name of the Story * @param flags: flags * @param story_id to populate with the story_id assigned to the story - * @param notify_keepers , bool value that would be set to true if this is the first client to acquire the story * @return CL_SUCCESS if succeed to destroy the Story \n * CL_ERR_NOT_EXIST if the Chronicle does not exist \n * CL_ERR_UNKNOWN otherwise @@ -293,7 +245,7 @@ int ChronicleMetaDirectory::destroy_story(std::string const &chronicle_name, con int 
ChronicleMetaDirectory::acquire_story(chl::ClientId const &client_id, const std::string &chronicle_name , const std::string &story_name , const std::unordered_map &attrs, int &flags - , StoryId &story_id, bool &notify_keepers) + , StoryId &story_id) { LOG_DEBUG("[ChronicleMetaDirectory] ClientID={} acquiring StoryName={} in ChronicleName={} with Flags={}", client_id , story_name.c_str(), chronicle_name.c_str(), flags); @@ -332,7 +284,6 @@ int ChronicleMetaDirectory::acquire_story(chl::ClientId const &client_id, const /* All checks passed, manipulate metadata */ story_id = pStory->getSid(); - notify_keepers = (pStory->getAcquisitionCount() == 0 ? true : false); /* Increment AcquisitionCount */ pStory->incrementAcquisitionCount(); @@ -350,14 +301,13 @@ int ChronicleMetaDirectory::acquire_story(chl::ClientId const &client_id, const * @param story_name: name of the Story * @param flags: flags * @param story_id to populate with the story_id assigned to the story - * @param notify_keepers , bool value that would be set to true if this is the last client to release the story * @return CL_SUCCESS if succeed to destroy the Story \n * CL_ERR_NOT_EXIST if the Chronicle does not exist \n * CL_ERR_UNKNOWN otherwise */ //TO_DO return acquisition_count after the story has been released int ChronicleMetaDirectory::release_story(chl::ClientId const &client_id, const std::string &chronicle_name - , const std::string &story_name, StoryId &story_id, bool &notify_keepers) + , const std::string &story_name, StoryId &story_id) { LOG_DEBUG("[ChronicleMetaDirectory] ClientID={} releasing StoryName={} in ChronicleName={}", client_id , story_name.c_str(), chronicle_name.c_str()); @@ -389,7 +339,6 @@ int ChronicleMetaDirectory::release_story(chl::ClientId const &client_id, const /* Decrement AcquisitionCount */ pStory->decrementAcquisitionCount(); story_id = pStory->getSid(); - notify_keepers = (pStory->getAcquisitionCount() == 0 ? 
true : false); /* Remove this client from acquirerClientList of the Story */ pStory->removeAcquirerClient(client_id); /* Remove this Story from acquiredStoryMap for this client */ diff --git a/ChronoVisor/src/KeeperRegistry.cpp b/ChronoVisor/src/KeeperRegistry.cpp index 502fd7c6..0822c1f3 100644 --- a/ChronoVisor/src/KeeperRegistry.cpp +++ b/ChronoVisor/src/KeeperRegistry.cpp @@ -50,6 +50,7 @@ int KeeperRegistry::InitializeRegistryService(ChronoLog::ConfigurationManager co keeperRegistryService = KeeperRegistryService::CreateKeeperRegistryService(*registryEngine, provider_id, *this); delayedDataAdminExitSeconds = confManager.VISOR_CONF.DELAYED_DATA_ADMIN_EXIT_IN_SECS; + registryState = INITIALIZED; status = chronolog::CL_SUCCESS; } @@ -61,6 +62,21 @@ int KeeperRegistry::InitializeRegistryService(ChronoLog::ConfigurationManager co return status; } + +KeeperRegistry::KeeperRegistry() + : registryState(UNKNOWN) + , registryEngine(nullptr) + , keeperRegistryService(nullptr) + , delayedDataAdminExitSeconds(3) +{ + // INNA: I'm using current time for seeding Mersene Twister number generator + // there are different opinions on the use of std::random_device for seeding of Mersene Twister.. 
+ // TODO: reseach the seeding of Mersense Twister int number generator + + size_t new_seed = std::chrono::high_resolution_clock::to_time_t(std::chrono::high_resolution_clock::now()); + mt_random.seed(new_seed);//initial seed for the 32 int Mersene Twister generator +} + ///////////////// int KeeperRegistry::ShutdownRegistryService() @@ -78,58 +94,100 @@ int KeeperRegistry::ShutdownRegistryService() registryState = SHUTTING_DOWN; LOG_INFO("[KeeperRegistry] Shutting down..."); - // send out shutdown instructions to - // all active keeper processes - // then drain the registry - while(!keeperProcessRegistry.empty()) + activeGroups.clear(); + activeStories.clear(); + + while(!recordingGroups.empty()) { - for(auto process_iter = keeperProcessRegistry.begin(); process_iter != keeperProcessRegistry.end();) + std::time_t current_time = + std::chrono::high_resolution_clock::to_time_t(std::chrono::high_resolution_clock::now()); + std::time_t delayedExitTime = std::chrono::high_resolution_clock::to_time_t( + std::chrono::high_resolution_clock::now() + std::chrono::seconds(delayedDataAdminExitSeconds)); + + for(auto group_iter = recordingGroups.begin(); group_iter != recordingGroups.end();) { - std::time_t current_time = - std::chrono::high_resolution_clock::to_time_t(std::chrono::high_resolution_clock::now()); - std::stringstream id_string; - id_string << (*process_iter).second.idCard; + RecordingGroup& recording_group = ((*group_iter).second); - if((*process_iter).second.active) + if(recording_group.grapherProcess != nullptr) { - LOG_INFO("[KeeperRegistry] Sending shutdown to keeper {}", id_string.str()); - (*process_iter).second.keeperAdminClient->shutdown_collection(); + std::stringstream id_string; + id_string << recording_group.grapherProcess->idCard; - (*process_iter).second.active = false; + chl::GrapherProcessEntry* grapher_process = recording_group.grapherProcess; + if(grapher_process->active) + { + LOG_INFO("[KeeperRegistry] Sending shutdown to grapher {}", 
id_string.str()); + if(grapher_process->adminClient != nullptr) { grapher_process->adminClient->shutdown_collection(); } + + // start delayed destruction for the lingering Adminclient to be safe... + recording_group.startDelayedGrapherExit(*grapher_process, delayedExitTime); + } + else + { + //check if any existing delayed exit grapher processes can be cleared... + recording_group.clearDelayedExitGrapher(*grapher_process, current_time); + } - if((*process_iter).second.keeperAdminClient != nullptr) + if(grapher_process->delayedExitGrapherClients.empty()) { - std::time_t delayedExitTime = std::chrono::high_resolution_clock::to_time_t( - std::chrono::high_resolution_clock::now() + std::chrono::seconds(delayedDataAdminExitSeconds)); - LOG_INFO("[KeeperRegistry] shutdown: starting delayedExit for keeperProcess {} current_time={} delayedExitTime={}",id_string.str(), ctime(&current_time), std::ctime(&delayedExitTime));; - (*process_iter) - .second.delayedExitClients.push_back(std::pair( - delayedExitTime, (*process_iter).second.keeperAdminClient)); - (*process_iter).second.keeperAdminClient = nullptr; + LOG_INFO("[KeeperRegistry] shudown: destroyed old entry for grapher {}", id_string.str()); + delete grapher_process; + recording_group.grapherProcess = nullptr; } } - while(!(*process_iter).second.delayedExitClients.empty() && - (current_time >= (*process_iter).second.delayedExitClients.front().first)) + // send out shutdown instructions to all active keeper processes + // then start delayedExit procedure for them + for(auto process_iter = recording_group.keeperProcesses.begin(); + process_iter != recording_group.keeperProcesses.end();) { - auto dataStoreClientPair = (*process_iter).second.delayedExitClients.front(); - LOG_INFO("[KeeperRegistry] shutdown() destroys old dataAdminClient for keeper {} delayedTime={}", id_string.str(), ctime(&(dataStoreClientPair.first))); - if(dataStoreClientPair.second != nullptr) { delete dataStoreClientPair.second; } - 
(*process_iter).second.delayedExitClients.pop_front(); - } + std::stringstream id_string; + id_string << (*process_iter).second.idCard; + + if((*process_iter).second.active) + { + LOG_INFO("[KeeperRegistry] Sending shutdown to keeper {}", id_string.str()); + (*process_iter).second.keeperAdminClient->shutdown_collection(); - if((*process_iter).second.delayedExitClients.empty()) + LOG_INFO("[KeeperRegistry] shutdown: starting delayedExit for keeper {} delayedExitTime={}", + id_string.str(), std::ctime(&delayedExitTime)); + recording_group.startDelayedKeeperExit((*process_iter).second, delayedExitTime); + } + else + { + LOG_INFO("[KeeperRegistry] shutdown: clear delayedAdminClient for keeper {}", id_string.str()); + recording_group.clearDelayedExitKeeper((*process_iter).second, current_time); + } + + if((*process_iter).second.delayedExitClients.empty()) + { + LOG_INFO("[KeeperRegistry] shutdown : destroys old keeperProcessEntry for {}", id_string.str()); + process_iter = recording_group.keeperProcesses.erase(process_iter); + } + else + { + LOG_INFO("[KeeperRegistry] shutdown: old dataAdminClient for {} can't yet be destroyed", + id_string.str()); + ++process_iter; + } + } + if(recording_group.grapherProcess == nullptr && recording_group.keeperProcesses.empty()) { - LOG_INFO("[KeeperRegistry] registerKeeperProcess() destroys old keeperProcessEntry for {}",id_string.str()); - process_iter = keeperProcessRegistry.erase(process_iter); + LOG_INFO("[KeeperRegistry] shutdown: recordingGroup {} is destroyed", recording_group.groupId); + group_iter = recordingGroups.erase(group_iter); } else { - LOG_INFO("[KeeperRegistry] registerKeeperProcess() old dataAdminClient for {} can't yet be destroyed", id_string.str()); - ++process_iter; + LOG_INFO("[KeeperRegistry] shutdown: recordingGroup {} can't yet be destroyed", + recording_group.groupId); + ++group_iter; } } + + if(!recordingGroups.empty()) { sleep(1); } } + + if(nullptr != keeperRegistryService) { delete 
keeperRegistryService; } return chronolog::CL_SUCCESS; @@ -157,47 +215,58 @@ int KeeperRegistry::registerKeeperProcess(KeeperRegistrationMsg const &keeper_re KeeperIdCard keeper_id_card = keeper_reg_msg.getKeeperIdCard(); ServiceId admin_service_id = keeper_reg_msg.getAdminServiceId(); + + //find the group that keeper belongs to in the registry + auto group_iter = recordingGroups.find(keeper_id_card.getGroupId()); + if(group_iter == recordingGroups.end()) + { + auto insert_return = recordingGroups.insert(std::pair( + keeper_id_card.getGroupId(), RecordingGroup(keeper_id_card.getGroupId()))); + if(false == insert_return.second) + { + LOG_ERROR("[KeeperRegistry] keeper registration failed to find RecordingGroup {}", + keeper_id_card.getGroupId()); + return chronolog::CL_ERR_UNKNOWN; + } + else { group_iter = insert_return.first; } + } + + RecordingGroup& recording_group = (*group_iter).second; + // unlikely but possible that the Registry still retains the record of the previous re-incarnation of hte Keeper process // running on the same host... check for this case and clean up the leftover record... - auto keeper_process_iter = keeperProcessRegistry.find( + auto keeper_process_iter = recording_group.keeperProcesses.find( std::pair(keeper_id_card.getIPaddr(), keeper_id_card.getPort())); std::stringstream id_string; id_string << keeper_id_card; - if(keeper_process_iter != keeperProcessRegistry.end()) + if(keeper_process_iter != recording_group.keeperProcesses.end()) { // must be a case of the KeeperProcess exiting without unregistering or some unexpected break in communication... // start delayed destruction process for hte lingering keeperAdminclient to be safe... 
- std::time_t current_time = - std::chrono::high_resolution_clock::to_time_t(std::chrono::high_resolution_clock::now()); - (*keeper_process_iter).second.active = false; - - if((*keeper_process_iter).second.keeperAdminClient != nullptr) + if((*keeper_process_iter).second.active) { - std::time_t delayedExitTime = std::chrono::high_resolution_clock::to_time_t( std::chrono::high_resolution_clock::now() + std::chrono::seconds(delayedDataAdminExitSeconds)); - LOG_WARNING("[KeeperRegistry] registerKeeperProcess for keeper {} found old instance of dataAdminclient; starting delayedExit current_time={} delayedExitTime={}",id_string.str(), ctime(&current_time), std::ctime(&delayedExitTime));; + LOG_WARNING("[KeeperRegistry] registerKeeperProcess: found old instance of dataAdminclient for {} " + "delayedExitTime={}", + id_string.str(), std::ctime(&delayedExitTime)); + ; - (*keeper_process_iter) - .second.delayedExitClients.push_back(std::pair( - delayedExitTime, (*keeper_process_iter).second.keeperAdminClient)); - (*keeper_process_iter).second.keeperAdminClient = nullptr; + recording_group.startDelayedKeeperExit((*keeper_process_iter).second, delayedExitTime); } - - while(!(*keeper_process_iter).second.delayedExitClients.empty() && - (current_time >= (*keeper_process_iter).second.delayedExitClients.front().first)) + else { - auto dataStoreClientPair = (*keeper_process_iter).second.delayedExitClients.front(); - LOG_INFO("[KeeperRegistry] registerKeeperProcess destroys delayed dataAdmindClient for {}",id_string.str()); - if(dataStoreClientPair.second != nullptr) { delete dataStoreClientPair.second; } - (*keeper_process_iter).second.delayedExitClients.pop_front(); + std::time_t current_time = + std::chrono::high_resolution_clock::to_time_t(std::chrono::high_resolution_clock::now()); + LOG_INFO("[KeeperRegistry] registerKeeperProcess tries to clear dataAdmindClient for {}", id_string.str()); + recording_group.clearDelayedExitKeeper((*keeper_process_iter).second, current_time); }
if((*keeper_process_iter).second.delayedExitClients.empty()) { LOG_INFO("[KeeperRegistry] registerKeeperProcess has destroyed old entry for keeper {}",id_string.str()); - keeperProcessRegistry.erase(keeper_process_iter); + recording_group.keeperProcesses.erase(keeper_process_iter); } else { @@ -207,7 +276,7 @@ int KeeperRegistry::registerKeeperProcess(KeeperRegistrationMsg const &keeper_re } //create a client of Keeper's DataStoreAdminService listenning at adminServiceId - std::string service_na_string("ofi+sockets://"); + std::string service_na_string("ofi+sockets://"); //TODO: add protocol to serviceId and keeperIdCard service_na_string = admin_service_id.getIPasDottedString(service_na_string) + ":" + std::to_string(admin_service_id.port); @@ -222,9 +291,10 @@ int KeeperRegistry::registerKeeperProcess(KeeperRegistrationMsg const &keeper_re } //now create a new KeeperRecord with the new DataAdminclient - auto insert_return = keeperProcessRegistry.insert(std::pair , KeeperProcessEntry>( - std::pair (keeper_id_card.getIPaddr(), keeper_id_card.getPort()), KeeperProcessEntry( - keeper_id_card, admin_service_id))); + auto insert_return = + recording_group.keeperProcesses.insert(std::pair, KeeperProcessEntry>( + std::pair(keeper_id_card.getIPaddr(), keeper_id_card.getPort()), + KeeperProcessEntry(keeper_id_card, admin_service_id))); if(false == insert_return.second) { LOG_ERROR("[KeeperRegistry] registration failed for Keeper {}", id_string.str()); @@ -234,17 +304,28 @@ int KeeperRegistry::registerKeeperProcess(KeeperRegistrationMsg const &keeper_re (*insert_return.first).second.keeperAdminClient = collectionClient; (*insert_return.first).second.active = true; + recording_group.activeKeeperCount += 1; LOG_INFO("[KeeperRegistry] Register Keeper: KeeperIdCard: {} created DataStoreAdminClient for {}: provider_id={}" , id_string.str(), service_na_string, admin_service_id.provider_id); - // now that communnication with the Keeper is established and we still holding 
registryLock - // update registryState in case this is the first KeeperProcess registration - if(keeperProcessRegistry.size() > 0) - { registryState = RUNNING; + LOG_INFO("[KeeperRegistry] RecordingGroup {} has {} keepers", recording_group.groupId, + recording_group.keeperProcesses.size()); - LOG_INFO("[KeeperRegistry] RUNNING with {} KeeperProcesses", keeperProcessRegistry.size()); + // check if this is the first keeper for the recording group and the group is ready to be part of + // the activeGroups rotation + + if(recording_group.isActive() && recording_group.activeKeeperCount == 1) + { + activeGroups.push_back(&recording_group); + size_t new_seed = std::chrono::high_resolution_clock::to_time_t(std::chrono::high_resolution_clock::now()); + mt_random.seed(new_seed);//re-seed the mersene_twister_generator + group_id_distribution = + std::uniform_int_distribution(0, activeGroups.size() - 1);//reset the distribution range } + + LOG_INFO("[KeeperRegistry] has {} activeGroups; {} RecordingGroups ", activeGroups.size(), recordingGroups.size()); + if(activeGroups.size() > 0) { registryState = RUNNING; } return chronolog::CL_SUCCESS; } ///////////////// @@ -259,44 +340,51 @@ int KeeperRegistry::unregisterKeeperProcess(KeeperIdCard const &keeper_id_card) if(is_shutting_down()) { return chronolog::CL_ERR_UNKNOWN; } - auto keeper_process_iter = keeperProcessRegistry.find( + auto group_iter = recordingGroups.find(keeper_id_card.getGroupId()); + if(group_iter == recordingGroups.end()) { return chronolog::CL_SUCCESS; } + + RecordingGroup& recording_group = (*group_iter).second; + + auto keeper_process_iter = recording_group.keeperProcesses.find( std::pair(keeper_id_card.getIPaddr(), keeper_id_card.getPort())); - if(keeper_process_iter != keeperProcessRegistry.end()) + if(keeper_process_iter == recording_group.keeperProcesses.end()) + { + //we don't have a record of this keeper, we have nothing to do + return CL_SUCCESS; + } + else { + // check if the group is active and the 
keeper we are about to unregister is the only one this group has + // and the group needs to be removed from the active group rotation + if(recording_group.isActive() && (*keeper_process_iter).second.active && recording_group.activeKeeperCount == 1) + { + activeGroups.erase(std::remove(activeGroups.begin(), activeGroups.end(), &recording_group)); + if(activeGroups.size() > 0) + {//reset the group distribution + size_t new_seed = std::chrono::high_resolution_clock::to_time_t(std::chrono::high_resolution_clock::now()); + mt_random.seed(new_seed);//re-seed the mersene_twister_generator + group_id_distribution = + std::uniform_int_distribution(0, activeGroups.size() - 1);//reset the distribution range + } + } + // we mark the keeperProcessEntry as inactive and set the time it would be safe to delete. // we delay the destruction of the keeperEntry & keeperAdminClient by 5 secs // to prevent the case of deleting the keeperAdminClient while it might be waiting for rpc response on the // other thread - std::stringstream id_string; - id_string << keeper_id_card; - (*keeper_process_iter).second.active = false; - - if((*keeper_process_iter).second.keeperAdminClient != nullptr) - { - - std::time_t current_time = std::chrono::high_resolution_clock::to_time_t(std::chrono::high_resolution_clock::now()); - std::time_t delayedExitTime = std::chrono::high_resolution_clock::to_time_t(std::chrono::high_resolution_clock::now() + std::chrono::seconds(delayedDataAdminExitSeconds)); - LOG_INFO("[KeeperRegistry] unregisterKeeperProcess() starting delayedExit for keeper {} current_time={} delayedExitTime={}",id_string.str(), std::ctime(&current_time), std::ctime(&delayedExitTime));; + std::time_t delayedExitTime = std::chrono::high_resolution_clock::to_time_t( + std::chrono::high_resolution_clock::now() + std::chrono::seconds(delayedDataAdminExitSeconds)); + recording_group.startDelayedKeeperExit((*keeper_process_iter).second, delayedExitTime); + } - (*keeper_process_iter) -
.second.delayedExitClients.push_back(std::pair( - delayedExitTime, (*keeper_process_iter).second.keeperAdminClient)); - (*keeper_process_iter).second.keeperAdminClient = nullptr; - } + LOG_INFO("[KeeperRegistry] RecordingGroup {} has {} keepers", recording_group.groupId, + recording_group.keeperProcesses.size()); - /*if( (*keeper_process_iter).second.keeperAdminClient != nullptr) - { delete (*keeper_process_iter).second.keeperAdminClient; } - keeperProcessRegistry.erase(keeper_process_iter); - */ - } - // now that we are still holding registryLock + LOG_INFO("[KeeperRegistry] has {} activeGroups; {} RecordingGroups ", activeGroups.size(), recordingGroups.size()); + // update registryState if needed - if(!is_shutting_down() && (1 == keeperProcessRegistry.size())) - { - registryState = INITIALIZED; - LOG_INFO("[KeeperRegistry] INITIALIZED with {} KeeperProcesses", keeperProcessRegistry.size()); - } + if(!is_shutting_down() && activeGroups.size() == 0) { registryState = INITIALIZED; } return chronolog::CL_SUCCESS; } @@ -312,9 +400,13 @@ void KeeperRegistry::updateKeeperProcessStats(KeeperStatsMsg const &keeperStatsM { return; } KeeperIdCard keeper_id_card = keeperStatsMsg.getKeeperIdCard(); - auto keeper_process_iter = keeperProcessRegistry.find( - std::pair(keeper_id_card.getIPaddr(), keeper_id_card.getPort())); - if(keeper_process_iter == keeperProcessRegistry.end() || !((*keeper_process_iter).second.active)) + auto group_iter = recordingGroups.find(keeper_id_card.getGroupId()); + if(group_iter == recordingGroups.end()) { return; } + + auto keeper_process_iter = (*group_iter) + .second.keeperProcesses.find(std::pair( + keeper_id_card.getIPaddr(), keeper_id_card.getPort())); + if(keeper_process_iter == (*group_iter).second.keeperProcesses.end() || !((*keeper_process_iter).second.active)) {// however unlikely it is that the stats msg would be delivered for the keeper that's already unregistered // we should probably log a warning here... 
return; @@ -325,29 +417,34 @@ void KeeperRegistry::updateKeeperProcessStats(KeeperStatsMsg const &keeperStatsM } ///////////////// -std::vector &KeeperRegistry::getActiveKeepers(std::vector &keeper_id_cards) -{ //the process of keeper selection will probably get more nuanced; - //for now just return all the keepers registered - if(is_shutting_down()) - { return keeper_id_cards; } +// NOTE: RecordingGroup methods are not currently protected by lock +// the assumptions is that the caller would use RegistryLock before calling the RecordingGroup method +// we may decide to revisit this and introduce RecordingGroup level locks later on.. - std::lock_guard lock(registryLock); - if(is_shutting_down()) - { return keeper_id_cards; } +std::vector& RecordingGroup::getActiveKeepers(std::vector& keeper_id_cards) +{ + // NOTE: RecordingGroup methods are not currently protected by lock + // the assumptions is that the caller would use RegistryLock before calling the RecordingGroup + // method + // we may decide to revisit this and introduce RecordingGroup level locks later on.. 
keeper_id_cards.clear(); + + // pick recording_group from uniform group id distribution using a random int value + // generated by Mirsene Twister generator + + std::time_t current_time = std::chrono::high_resolution_clock::to_time_t(std::chrono::high_resolution_clock::now()); - for(auto iter = keeperProcessRegistry.begin(); iter != keeperProcessRegistry.end();) + for(auto iter = keeperProcesses.begin(); iter != keeperProcesses.end();) { - std::stringstream id_string; - id_string << (*iter).second.idCard; // first check if there are any delayedExit DataStoreClients to be deleted while(!(*iter).second.delayedExitClients.empty() && (current_time >= (*iter).second.delayedExitClients.front().first)) { auto dataStoreClientPair = (*iter).second.delayedExitClients.front(); - LOG_INFO("[KeeperRegistry] getActiveKeepers() destroys dataAdminClient for keeper {} current_time={} delayedExitTime={}",id_string.str(), ctime(&current_time), ctime(&(dataStoreClientPair.first))); + LOG_INFO("[KeeperRegistry] getActiveKeepers() destroys dataAdminClient for keeper {} current_time={} delayedExitTime={}", + (*iter).second.idCardString, ctime(&current_time)); if(dataStoreClientPair.second != nullptr) { delete dataStoreClientPair.second; } (*iter).second.delayedExitClients.pop_front(); } @@ -361,18 +458,14 @@ std::vector &KeeperRegistry::getActiveKeepers(std::vector &KeeperRegistry::getActiveKeepers(std::vector & vectorOfKeepers) +{ + vectorOfKeepers.clear(); -int KeeperRegistry::notifyKeepersOfStoryRecordingStart(std::vector& vectorOfKeepers, - ChronicleName const& chronicle, StoryName const& story, - StoryId const& storyId) + RecordingGroup* recording_group = nullptr; + + { + //lock KeeperRegistry and choose the recording group for this story + //NOTE we only keep the lock within this paragraph...
+ std::lock_guard lock(registryLock); + if(!is_running()) + { + LOG_ERROR("[KeeperRegistry] Registry has no active RecordingGroups to start recording story {}", story_id); + return chronolog::CL_ERR_NO_KEEPERS; + } + + + //first check if we are already recording this story for another client and have a recording group assigned to it + + auto story_iter = activeStories.find(story_id); + + if(story_iter != activeStories.end() && (*story_iter).second != nullptr) + { + //INNA:TODO: we should probably check if the group's active status hasn't changed + //and implement group re-assignment procedure when we have recording processes dynamically coming and going.. + + recording_group = (*story_iter).second; + recording_group->getActiveKeepers(vectorOfKeepers); + + //no need for notification , group processes are already recording this story + LOG_INFO("[Registry] RecordingGroup {} is already recording story {}", recording_group->groupId,story_id); + + return chronolog::CL_SUCCESS; + } + + // select recording_group from the group id distribution using a random int value + // generated by Mirsene Twister generator + // NOTE: using uniform_distribution for now, we might add discrete distribution with weights later... + + recording_group = activeGroups[group_id_distribution(mt_random)]; + activeStories[story_id] = recording_group; + } + + LOG_INFO("[Registry] selected RecordingGroup {} for story {}", recording_group->groupId, story_id); + + uint64_t story_start_time = std::chrono::high_resolution_clock::now().time_since_epoch().count(); + + // the registryLock is released by this point.. + // notify Grapher and notifyKeepers functions use delayedExit logic to protect + // the rpc code from DataAdminClients being destroyed while notification is in progress.. 
+ int rpc_return = notifyGrapherOfStoryRecordingStart(*recording_group, chronicle, story, story_id, story_start_time); + + if(rpc_return == CL_SUCCESS) + { + recording_group->getActiveKeepers(vectorOfKeepers); + rpc_return = notifyKeepersOfStoryRecordingStart(*recording_group, vectorOfKeepers, chronicle, story, story_id, + story_start_time); + } + + return rpc_return; +} + +//////////////// +int KeeperRegistry::notifyGrapherOfStoryRecordingStart(RecordingGroup& recordingGroup, ChronicleName const& chronicle, + StoryName const& story, StoryId const& storyId, + uint64_t story_start_time) { + int return_code = chronolog::CL_ERR_NO_KEEPERS; + if(!is_running()) { - LOG_ERROR("[KeeperRegistry] Registry has no Keeper processes to start recording story {}", storyId); + LOG_ERROR("[KeeperRegistry] Registry has no active RecordingGroups to start recording story {}", storyId); return chronolog::CL_ERR_NO_KEEPERS; } - std::chrono::time_point time_now = std::chrono::system_clock::now(); - uint64_t story_start_time = time_now.time_since_epoch().count(); + DataStoreAdminClient* dataAdminClient = nullptr; + + { + // NOTE: we release the registryLock before sending rpc request so that we do not hold it for the duration of rpc communication. 
+ // We delay the destruction of unactive adminClients that might be triggered by the unregister call from a different thread + // to protect us from the unfortunate case of aminClient object being deleted + // while this thread is waiting for rpc response + std::lock_guard lock(registryLock); + + if(recordingGroup.grapherProcess != nullptr && recordingGroup.grapherProcess->active && + recordingGroup.grapherProcess->adminClient != nullptr) + { + dataAdminClient = recordingGroup.grapherProcess->adminClient; + } + else + { + LOG_WARNING("[KeeperRegistry] grapher for recordingGroup {} is not available for notification", + recordingGroup.groupId); + } + } + + if(dataAdminClient == nullptr) { return return_code; } + + try + { + return_code = dataAdminClient->send_start_story_recording(chronicle, story, storyId, story_start_time); + if(return_code != CL_SUCCESS) + { + LOG_WARNING("[KeeperRegistry] Registry failed RPC notification to {}", recordingGroup.grapherProcess->idCardString); + } + else + { + LOG_INFO("[KeeperRegistry] Registry notified {} to start recording StoryID={} with StartTime={}", + recordingGroup.grapherProcess->idCardString, storyId, story_start_time); + } + } + catch(thallium::exception const& ex) + { + LOG_WARNING("[KeeperRegistry] Registry failed RPC notification to grapher {}", recordingGroup.grapherProcess->idCardString); + } + + return return_code; +} + +/////////////// +int KeeperRegistry::notifyGrapherOfStoryRecordingStop(RecordingGroup& recordingGroup, StoryId const& storyId) +{ + int return_code = chronolog::CL_ERR_NO_KEEPERS; + + if(!is_running()) + { + LOG_ERROR("[KeeperRegistry] Registry has no active RecordingGroups to start recording story {}", storyId); + return chronolog::CL_ERR_NO_KEEPERS; + } + + DataStoreAdminClient* dataAdminClient = nullptr; + + std::stringstream id_string; + + { + // NOTE: we release the registryLock before sending rpc request so that we do not hold it for the duration of rpc communication. 
+ // We delay the destruction of unactive adminClients that might be triggered by the unregister call from a different thread + // to protect us from the unfortunate case of aminClient object being deleted + // while this thread is waiting for rpc response + std::lock_guard lock(registryLock); + + if(recordingGroup.grapherProcess != nullptr && recordingGroup.grapherProcess->active && + recordingGroup.grapherProcess->adminClient != nullptr) + { + id_string << recordingGroup.grapherProcess->idCard; + dataAdminClient = recordingGroup.grapherProcess->adminClient; + } + else + { + LOG_WARNING("[KeeperRegistry] grapher for recordingGroup {} is not available for notification", + recordingGroup.groupId); + } + } + + if(dataAdminClient == nullptr) { return return_code; } + + try + { + return_code = dataAdminClient->send_stop_story_recording(storyId); + if(return_code != CL_SUCCESS) + { + LOG_WARNING("[KeeperRegistry] Registry failed RPC notification to {}", id_string.str()); + } + else + { + LOG_INFO("[KeeperRegistry] Registry notified grapher {} to stop recording StoryID={} ", id_string.str(), + storyId); + } + } + catch(thallium::exception const& ex) + { + LOG_WARNING("[KeeperRegistry] Registry failed RPC notification to grapher {}", id_string.str()); + } + + return return_code; +} +///////////////////// + +int KeeperRegistry::notifyKeepersOfStoryRecordingStart(RecordingGroup& recordingGroup, + std::vector& vectorOfKeepers, + ChronicleName const& chronicle, StoryName const& story, + StoryId const& storyId, uint64_t story_start_time) +{ + + // if there are no activeGroups ready for recording + // we are out of luck... 
+ if(!is_running()) + { + LOG_ERROR("[KeeperRegistry] Registry has no active RecordingGroups to start recording story {}", storyId); + return chronolog::CL_ERR_NO_KEEPERS; + } std::vector vectorOfKeepersToNotify = vectorOfKeepers; vectorOfKeepers.clear(); + + auto keeper_processes = recordingGroup.keeperProcesses; + for(KeeperIdCard keeper_id_card: vectorOfKeepersToNotify) { DataStoreAdminClient* dataAdminClient = nullptr; @@ -407,9 +683,9 @@ int KeeperRegistry::notifyKeepersOfStoryRecordingStart(std::vector // (see unregisterKeeperProcess()) to protect us from the unfortunate case of keeperProcessEntry.dataAdminClient object being deleted // while this thread is waiting for rpc response std::lock_guard lock(registryLock); - auto keeper_process_iter = keeperProcessRegistry.find( + auto keeper_process_iter = keeper_processes.find( std::pair(keeper_id_card.getIPaddr(), keeper_id_card.getPort())); - if((keeper_process_iter != keeperProcessRegistry.end() && (*keeper_process_iter).second.active) && + if((keeper_process_iter != keeper_processes.end() && (*keeper_process_iter).second.active) && ((*keeper_process_iter).second.keeperAdminClient != nullptr)) { dataAdminClient = (*keeper_process_iter).second.keeperAdminClient; @@ -430,7 +706,8 @@ int KeeperRegistry::notifyKeepersOfStoryRecordingStart(std::vector } else { - LOG_INFO("[KeeperRegistry] Registry notified keeper {} to start recording StoryID={} with StartTime={}", id_string.str(), storyId, story_start_time); + LOG_INFO("[KeeperRegistry] Registry notified {} to start recording StoryID={} with StartTime={}", + id_string.str(), storyId, story_start_time); vectorOfKeepers.push_back(keeper_id_card); } } @@ -449,8 +726,53 @@ int KeeperRegistry::notifyKeepersOfStoryRecordingStart(std::vector return chronolog::CL_SUCCESS; } ///////////////// +int KeeperRegistry::notifyRecordingGroupOfStoryRecordingStop(StoryId const& story_id) +{ + RecordingGroup* recording_group = nullptr; -int 
KeeperRegistry::notifyKeepersOfStoryRecordingStop(std::vector const& vectorOfKeepers, + std::vector vectorOfKeepers; + + { + //lock KeeperRegistry and choose the recording group for this story + //NOTE we only keep the lock within this paragraph... + std::lock_guard lock(registryLock); + if(!is_running()) + { + LOG_ERROR("[KeeperRegistry] Registry has no active RecordingGroups to start recording story {}", story_id); + return chronolog::CL_ERR_NO_KEEPERS; + } + + + auto story_iter = activeStories.find(story_id); + + if(story_iter == activeStories.end()) + { + //we don't know of this story + return CL_SUCCESS; + } + + recording_group = (*story_iter).second; + if(recording_group != nullptr) { recording_group->getActiveKeepers(vectorOfKeepers); } + + activeStories.erase(story_iter); + } + + if(recording_group != nullptr) + { + // the registryLock is released by this point.. + // notify Grapher and notifyKeepers functions use delayedExit logic to protect + // the rpc code from DataAdminClients being destroyed while notification is in progress.. 
+ + notifyKeepersOfStoryRecordingStop(*recording_group, vectorOfKeepers, story_id); + + notifyGrapherOfStoryRecordingStop(*recording_group, story_id); + } + + return CL_SUCCESS; +} +////////////// +int KeeperRegistry::notifyKeepersOfStoryRecordingStop(RecordingGroup& recordingGroup, + std::vector const& vectorOfKeepers, StoryId const& storyId) { if(!is_running()) @@ -459,7 +781,8 @@ int KeeperRegistry::notifyKeepersOfStoryRecordingStop(std::vector return chronolog::CL_ERR_NO_KEEPERS; } - size_t keepers_left_to_notify = vectorOfKeepers.size(); + auto keeper_processes = recordingGroup.keeperProcesses; + for(KeeperIdCard keeper_id_card: vectorOfKeepers) { DataStoreAdminClient* dataAdminClient = nullptr; @@ -471,9 +794,9 @@ int KeeperRegistry::notifyKeepersOfStoryRecordingStop(std::vector // (see unregisterKeeperProcess()) to protect us from the unfortunate case of keeperProcessEntry.dataAdminClient object being deleted // while this thread is waiting for rpc response std::lock_guard lock(registryLock); - auto keeper_process_iter = keeperProcessRegistry.find( + auto keeper_process_iter = keeper_processes.find( std::pair(keeper_id_card.getIPaddr(), keeper_id_card.getPort())); - if((keeper_process_iter != keeperProcessRegistry.end() && (*keeper_process_iter).second.active) && + if((keeper_process_iter != keeper_processes.end() && (*keeper_process_iter).second.active) && ((*keeper_process_iter).second.keeperAdminClient != nullptr)) { dataAdminClient = (*keeper_process_iter).second.keeperAdminClient; @@ -493,7 +816,7 @@ int KeeperRegistry::notifyKeepersOfStoryRecordingStop(std::vector } else { - LOG_INFO("[KeeperRegistry] Registry notified keeper {} to stop recording story {}", id_string.str(), storyId); + LOG_INFO("[KeeperRegistry] Registry notified {} to stop recording story {}", id_string.str(), storyId); } } catch(thallium::exception const& ex) @@ -505,7 +828,270 @@ int KeeperRegistry::notifyKeepersOfStoryRecordingStop(std::vector return chronolog::CL_SUCCESS; } 
+//////////////////////////// + +int KeeperRegistry::registerGrapherProcess(GrapherRegistrationMsg const & reg_msg) +{ + if(is_shutting_down()) { return chronolog::CL_ERR_UNKNOWN; } + + GrapherIdCard grapher_id_card = reg_msg.getGrapherIdCard(); + RecordingGroupId group_id = grapher_id_card.getGroupId(); + ServiceId admin_service_id = reg_msg.getAdminServiceId(); + + std::lock_guard lock(registryLock); + //re-check state after ther lock is aquired + if(is_shutting_down()) { return chronolog::CL_ERR_UNKNOWN; } + + //find the group that grapher belongs to in the registry + auto group_iter = recordingGroups.find(group_id); + if(group_iter == recordingGroups.end()) + { + auto insert_return = + recordingGroups.insert(std::pair(group_id, RecordingGroup(group_id))); + if(false == insert_return.second) + { + LOG_ERROR("[KeeperRegistry] keeper registration failed to find RecordingGroup {}", group_id); + return chronolog::CL_ERR_UNKNOWN; + } + else { group_iter = insert_return.first; } + } + + RecordingGroup& recording_group = ((*group_iter).second); + + // it is possible that the Registry still retains the record of the previous re-incarnation of the grapher process + // check for this case and clean up the leftover record... + + std::stringstream id_string; + id_string << grapher_id_card; + if(recording_group.grapherProcess != nullptr) + { + // start delayed destruction for the lingering Adminclient to be safe... + + chl::GrapherProcessEntry* grapher_process = recording_group.grapherProcess; + if(grapher_process->active) + { + // start delayed destruction for the lingering Adminclient to be safe... + + std::time_t delayedExitTime = std::chrono::high_resolution_clock::to_time_t( + std::chrono::high_resolution_clock::now() + std::chrono::seconds(delayedDataAdminExitSeconds)); + + recording_group.startDelayedGrapherExit(*grapher_process, delayedExitTime); + } + else + { + //check if any existing delayed exit grapher processes can be cleared... 
+ std::time_t current_time = + std::chrono::high_resolution_clock::to_time_t(std::chrono::high_resolution_clock::now()); + recording_group.clearDelayedExitGrapher(*grapher_process, current_time); + } + + if(grapher_process->delayedExitGrapherClients.empty()) + { + LOG_INFO("[KeeperRegistry] registerGrapherProcess has destroyed old entry for grapher {}", id_string.str()); + delete grapher_process; + recording_group.grapherProcess = nullptr; + } + else + { + LOG_INFO("[KeeperRegistry] registration for Grapher{} cant's proceed as previous grapherClient isn't yet " + "dismantled", + id_string.str()); + return CL_ERR_UNKNOWN; + } + } + + //create a client of the new grapher's DataStoreAdminService listenning at adminServiceId + std::string service_na_string("ofi+sockets://"); //TODO: add protocol string to serviceIdCard + service_na_string = + admin_service_id.getIPasDottedString(service_na_string) + ":" + std::to_string(admin_service_id.port); + + DataStoreAdminClient* collectionClient = DataStoreAdminClient::CreateDataStoreAdminClient( + *registryEngine, service_na_string, admin_service_id.provider_id); + if(nullptr == collectionClient) + { + LOG_ERROR("[KeeperRegistry] Register Grapher {} failed to create DataStoreAdminClient for {}: provider_id={}", + id_string.str(), service_na_string, admin_service_id.provider_id); + return chronolog::CL_ERR_UNKNOWN; + } + + //now create a new GrapherProcessEntry with the new DataAdminclient + recording_group.grapherProcess = new GrapherProcessEntry(grapher_id_card, admin_service_id); + recording_group.grapherProcess->adminClient = collectionClient; + recording_group.grapherProcess->active = true; + + LOG_INFO("[KeeperRegistry] Register grapher {} created DataStoreAdminClient for {}: provider_id={}", + id_string.str(), service_na_string, admin_service_id.provider_id); + + LOG_INFO("[KeeperRegistry] RecordingGroup {} has a grappher and {} keepers", recording_group.groupId, + recording_group.keeperProcesses.size()); + + // now that 
communnication with the Grapher is established and we are still holding registryLock + // check if the group is ready for active group rotation + if(recording_group.isActive()) + { + activeGroups.push_back(&recording_group); + + //reset the group distribution + size_t new_seed = std::chrono::high_resolution_clock::to_time_t(std::chrono::high_resolution_clock::now()); + mt_random.seed(new_seed);//re-seed the mersene_twister_generator + group_id_distribution = std::uniform_int_distribution(0, activeGroups.size() - 1); + } + + LOG_INFO("[KeeperRegistry] has {} activeGroups; {} RecordingGroups ", activeGroups.size(), recordingGroups.size()); + // we still holding registryLock + // update registryState if needed + if(activeGroups.size() > 0) + { + registryState = RUNNING; + } + return chronolog::CL_SUCCESS; +} +///////////////// + +int KeeperRegistry::unregisterGrapherProcess(GrapherIdCard const& grapher_id_card) +{ + std::lock_guard lock(registryLock); + //check again after the lock is acquired + if(is_shutting_down()) { return chronolog::CL_ERR_UNKNOWN; } + + auto group_iter = recordingGroups.find(grapher_id_card.getGroupId()); + if(group_iter == recordingGroups.end()) { return chronolog::CL_SUCCESS; } + + RecordingGroup& recording_group = ((*group_iter).second); + + // we are about to unregister the grapher so the group can't perform recording duties + // if it were an active recordingGroup before remove it from rotation + if(recording_group.isActive()) + { + auto active_group_iter = activeGroups.begin(); + while (active_group_iter != activeGroups.end()) + { + if((*active_group_iter) != &recording_group) + { ++active_group_iter;} + else + { break; } + } + + if(active_group_iter != activeGroups.end()) + { + //INNA: what do we do with any active Stories that this group were recordng? force release them and notify clients? + // wait for the new grapher? 
+ LOG_INFO("[KeeperRegistry] RecordingGroup {} is not active; activeGroups.size{}", recording_group.groupId,activeGroups.size()); + activeGroups.erase(active_group_iter); + if(activeGroups.size() > 0) + { + //reset the group distribution + size_t new_seed = std::chrono::high_resolution_clock::to_time_t(std::chrono::high_resolution_clock::now()); + mt_random.seed(new_seed);//re-seed the mersene_twister_generator + group_id_distribution = + std::uniform_int_distribution(0, activeGroups.size() - 1);//reset the distribution range + } + } + } + + if(recording_group.grapherProcess != nullptr && recording_group.grapherProcess->active) + { + // start delayed destruction for the lingering Adminclient to be safe... + // to prevent the case of deleting the keeperAdminClient while it might be waiting for rpc response on the + // other thread + + std::time_t delayedExitTime = std::chrono::high_resolution_clock::to_time_t( + std::chrono::high_resolution_clock::now() + std::chrono::seconds(delayedDataAdminExitSeconds)); + LOG_INFO("[KeeperRegistry] grapher {} starting delayedExit for grapher {} delayedExitTime={}", recording_group.grapherProcess->idCardString, + std::ctime(&delayedExitTime)); + + recording_group.startDelayedGrapherExit(*(recording_group.grapherProcess), delayedExitTime); + } + + // now that we are still holding registryLock + // update registryState if needed + LOG_INFO("[KeeperRegistry] RecordingGroup {} has no grapher and {} keepers", recording_group.groupId, + recording_group.keeperProcesses.size()); + + LOG_INFO("[KeeperRegistry] has {} activeGroups; {} RecordingGroups ", activeGroups.size(), recordingGroups.size()); + + // update registryState in case this was the last active recordingGroup + if(activeGroups.size() == 0) { registryState = INITIALIZED; } + + return chronolog::CL_SUCCESS; +} }//namespace chronolog +/////////////// + +bool chl::RecordingGroup::isActive() const +{ + //TODO: we might add a check for time since the last stats message received from 
+ // the processes listed as active + + if(grapherProcess != nullptr && grapherProcess->active && activeKeeperCount >0) + { + LOG_DEBUG("[REGISTRY] RecordingGroup {} is active", groupId); + return true; + } + else + { + LOG_DEBUG("[REGISTRY] RecordingGroup {} is not active", groupId); + return false; + } +} + +void chl::RecordingGroup::startDelayedGrapherExit(chl::GrapherProcessEntry& grapher_process, + std::time_t delayedExitTime) +{ + grapher_process.active = false; + + LOG_INFO("[KeeperRegistry] recording_group {} starts delayedExit for {}", groupId, grapher_process.idCardString); + if(grapher_process.adminClient != nullptr) + { + grapher_process.delayedExitGrapherClients.push_back( + std::pair(delayedExitTime, grapher_process.adminClient)); + grapher_process.adminClient = nullptr; + } +} + +void chl::RecordingGroup::clearDelayedExitGrapher(chl::GrapherProcessEntry& grapher_process, std::time_t current_time) +{ + while(!grapher_process.delayedExitGrapherClients.empty() && + (current_time >= grapher_process.delayedExitGrapherClients.front().first)) + { + LOG_INFO("[KeeperRegistry] recording_Group {}, destroys delayed dataAdmindClient for {}", groupId, + grapher_process.idCardString); + auto dataStoreClientPair = grapher_process.delayedExitGrapherClients.front(); + if(dataStoreClientPair.second != nullptr) { delete dataStoreClientPair.second; } + grapher_process.delayedExitGrapherClients.pop_front(); + } +} + +void chl::RecordingGroup::startDelayedKeeperExit(chl::KeeperProcessEntry& keeper_process, std::time_t delayedExitTime) +{ + // we mark the keeperProcessEntry as inactive and set the time it would be safe to delete. 
+ // we delay the destruction of the keeperEntry & keeperAdminClient by 5 secs + // to prevent the case of deleting the keeperAdminClient while it might be waiting for rpc response on the + // other thread + + keeper_process.active = false; + activeKeeperCount -= 1; + + LOG_INFO("[KeeperRegistry] recording_group {} starts delayedExit for {}", groupId, keeper_process.idCardString); + if(keeper_process.keeperAdminClient != nullptr) + { + keeper_process.delayedExitClients.push_back( + std::pair(delayedExitTime, keeper_process.keeperAdminClient)); + keeper_process.keeperAdminClient = nullptr; + } +} + +void chl::RecordingGroup::clearDelayedExitKeeper(chl::KeeperProcessEntry& keeper_process, std::time_t current_time) +{ + while(!keeper_process.delayedExitClients.empty() && + (current_time >= keeper_process.delayedExitClients.front().first)) + { + LOG_INFO("[KeeperRegistry] recording_group {} destroys delayed dataAdminClient for {}", groupId, keeper_process.idCardString); + auto dataStoreClientPair = keeper_process.delayedExitClients.front(); + if(dataStoreClientPair.second != nullptr) { delete dataStoreClientPair.second; } + keeper_process.delayedExitClients.pop_front(); + } +} diff --git a/ChronoVisor/src/VisorClientPortal.cpp b/ChronoVisor/src/VisorClientPortal.cpp index fab6dd68..0f52cd92 100644 --- a/ChronoVisor/src/VisorClientPortal.cpp +++ b/ChronoVisor/src/VisorClientPortal.cpp @@ -206,16 +206,12 @@ chronolog::VisorClientPortal::AcquireStory(chl::ClientId const &client_id, std:: { return chronolog::AcquireStoryResponseMsg(CL_ERR_NOT_AUTHORIZED, story_id, recording_keepers); } int ret = CL_ERR_UNKNOWN; - //ret = chronicleMetaDirectory.create_story(chronicle_name, story_name, attrs); - //if (ret != CL_SUCCESS && ret != CL_ERR_STORY_EXISTS) - //{ return chronolog::AcquireStoryResponseMsg(ret, story_id, recording_keepers); } - - bool notify_keepers = false; - ret = chronicleMetaDirectory.acquire_story(client_id, chronicle_name, story_name, attrs, flags, story_id - , 
notify_keepers); + ret = chronicleMetaDirectory.acquire_story(client_id, chronicle_name, story_name, attrs, flags, story_id); + if(ret != chronolog::CL_SUCCESS) { + // return the error with the empty recording_keepers vector return chronolog::AcquireStoryResponseMsg(ret, story_id, recording_keepers); } else @@ -224,21 +220,20 @@ chronolog::VisorClientPortal::AcquireStory(chl::ClientId const &client_id, std:: , getpid(), client_id, chronicle_name.c_str(), story_name.c_str(), flags); } - recording_keepers = theKeeperRegistry->getActiveKeepers(recording_keepers); - // if this is the first client to acquire this story we need to notify the recording Keepers + // if this is the first client to acquire this story we need to choose an active recording group + // for the new story and notify the recording Keepers & Graphers // so that they are ready to start recording this story - if(notify_keepers) + + if(chronolog::CL_SUCCESS != theKeeperRegistry->notifyRecordingGroupOfStoryRecordingStart( + chronicle_name, story_name, story_id, recording_keepers)) { - if(chronolog::CL_SUCCESS != - theKeeperRegistry->notifyKeepersOfStoryRecordingStart(recording_keepers, chronicle_name, story_name - , story_id)) - { // RPC notification to the keepers might have failed, release the newly acquired story - chronicleMetaDirectory.release_story(client_id, chronicle_name, story_name, story_id, notify_keepers); - //we do know that there's no need notify keepers of the story ending in this case as it hasn't started... - //return CL_ERR_NO_KEEPERS; - return chronolog::AcquireStoryResponseMsg(chronolog::CL_ERR_NO_KEEPERS, story_id, recording_keepers); - } + // RPC notification to the keepers might have failed, release the newly acquired story + chronicleMetaDirectory.release_story(client_id, chronicle_name, story_name, story_id); + //we do know that there's no need notify keepers of the story ending in this case as it hasn't started... 
+ recording_keepers.clear(); + return chronolog::AcquireStoryResponseMsg(chronolog::CL_ERR_NO_KEEPERS, story_id, recording_keepers); } + return chronolog::AcquireStoryResponseMsg(chronolog::CL_SUCCESS, story_id, recording_keepers); } @@ -253,18 +248,12 @@ int chronolog::VisorClientPortal::ReleaseStory(chl::ClientId const &client_id, s { return CL_ERR_NOT_AUTHORIZED; } StoryId story_id(0); - bool notify_keepers = false; - auto return_code = chronicleMetaDirectory.release_story(client_id, chronicle_name, story_name, story_id - , notify_keepers); + auto return_code = chronicleMetaDirectory.release_story(client_id, chronicle_name, story_name, story_id); if(chronolog::CL_SUCCESS != return_code) { return return_code; } - if(notify_keepers && theKeeperRegistry->is_running()) - { - std::vector recording_keepers; - theKeeperRegistry->notifyKeepersOfStoryRecordingStop(theKeeperRegistry->getActiveKeepers(recording_keepers) - , story_id); - } + theKeeperRegistry->notifyRecordingGroupOfStoryRecordingStop(story_id); + return chronolog::CL_SUCCESS; } diff --git a/chrono_common/ConfigurationManager.cpp b/chrono_common/ConfigurationManager.cpp index c6f0787f..90ffc87d 100644 --- a/chrono_common/ConfigurationManager.cpp +++ b/chrono_common/ConfigurationManager.cpp @@ -5,7 +5,13 @@ void ChronoLog::ConfigurationManager::parseGrapherConf(json_object*json_conf) { json_object_object_foreach(json_conf, key, val) { - if(strcmp(key, "KeeperGrapherDrainService") == 0) + if(strcmp(key, "RecordingGroup") == 0) + { + assert(json_object_is_type(val, json_type_int)); /* "RecordingGroup" holds a plain integer (read with json_object_get_int below), not a nested object */ + int value = json_object_get_int(val); + GRAPHER_CONF.RECORDING_GROUP = (value >= 0 ? 
value : 0); + } + else if(strcmp(key, "KeeperGrapherDrainService") == 0) { assert(json_object_is_type(val, json_type_object)); json_object*keeper_grapher_drain_service_conf = json_object_object_get(json_conf diff --git a/chrono_common/ConfigurationManager.h b/chrono_common/ConfigurationManager.h index 0779ad6f..5b861e62 100644 --- a/chrono_common/ConfigurationManager.h +++ b/chrono_common/ConfigurationManager.h @@ -166,6 +166,7 @@ typedef struct VisorConf_ typedef struct KeeperConf_ { + uint32_t RECORDING_GROUP; KeeperRecordingServiceConf KEEPER_RECORDING_SERVICE_CONF; KeeperDataStoreAdminServiceConf KEEPER_DATA_STORE_ADMIN_SERVICE_CONF; VisorKeeperRegistryServiceConf VISOR_KEEPER_REGISTRY_SERVICE_CONF; @@ -175,7 +176,8 @@ typedef struct KeeperConf_ [[nodiscard]] std::string to_String() const { - return "[KEEPER_RECORDING_SERVICE_CONF: " + KEEPER_RECORDING_SERVICE_CONF.to_String() + + return "[CHRONO_KEEPER_CONFIGURATION : RECORDING_GROUP: "+ std::to_string(RECORDING_GROUP) + + ", KEEPER_RECORDING_SERVICE_CONF: " + KEEPER_RECORDING_SERVICE_CONF.to_String() + ", KEEPER_DATA_STORE_ADMIN_SERVICE_CONF: " + KEEPER_DATA_STORE_ADMIN_SERVICE_CONF.to_String() + ", VISOR_KEEPER_REGISTRY_SERVICE_CONF: " + VISOR_KEEPER_REGISTRY_SERVICE_CONF.to_String() + ", STORY_FILES_DIR:" + STORY_FILES_DIR + ", KEEPER_LOG_CONF:" + KEEPER_LOG_CONF.to_String() + "]"; @@ -206,6 +208,7 @@ typedef struct ExtractorConf_ typedef struct GrapherConf_ { + uint32_t RECORDING_GROUP; RPCProviderConf KEEPER_GRAPHER_DRAIN_SERVICE_CONF; RPCProviderConf DATA_STORE_ADMIN_SERVICE_CONF; RPCProviderConf VISOR_REGISTRY_SERVICE_CONF; @@ -215,7 +218,8 @@ typedef struct GrapherConf_ [[nodiscard]] std::string to_String() const { - return "[CHRONO_GRAPHER_CONFIGURATION : KEEPER_GRAPHER_DRAIN_SERVICE_CONF: " + KEEPER_GRAPHER_DRAIN_SERVICE_CONF.to_String() + + return "[CHRONO_GRAPHER_CONFIGURATION : RECORDING_GROUP: "+ std::to_string(RECORDING_GROUP) + + ", KEEPER_GRAPHER_DRAIN_SERVICE_CONF: " + 
KEEPER_GRAPHER_DRAIN_SERVICE_CONF.to_String() + ", DATA_STORE_ADMIN_SERVICE_CONF: " + DATA_STORE_ADMIN_SERVICE_CONF.to_String() + ", VISOR_REGISTRY_SERVICE_CONF: " + VISOR_REGISTRY_SERVICE_CONF.to_String() + ", LOG_CONF:" + LOG_CONF.to_String() + @@ -280,6 +284,7 @@ class ConfigurationManager VISOR_CONF.DELAYED_DATA_ADMIN_EXIT_IN_SECS = 3; /* Keeper-related configurations */ + KEEPER_CONF.RECORDING_GROUP = 0; KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.RPC_IMPLEMENTATION = CHRONOLOG_THALLIUM_SOCKETS; KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.PROTO_CONF = "ofi+sockets"; KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.IP = "127.0.0.1"; @@ -301,6 +306,7 @@ class ConfigurationManager KEEPER_CONF.STORY_FILES_DIR = "/tmp/"; /* Grapher-related configurations */ + GRAPHER_CONF.RECORDING_GROUP = 0; GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_IMPLEMENTATION = CHRONOLOG_THALLIUM_SOCKETS; GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.PROTO_CONF = "ofi+sockets"; GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.IP = "127.0.0.1"; @@ -801,7 +807,13 @@ class ConfigurationManager { json_object_object_foreach(json_conf, key, val) { - if(strcmp(key, "KeeperRecordingService") == 0) + if(strcmp(key, "RecordingGroup") == 0) + { + assert(json_object_is_type(val, json_type_int)); /* "RecordingGroup" holds a plain integer (read with json_object_get_int below), not a nested object */ + int value = json_object_get_int(val); + KEEPER_CONF.RECORDING_GROUP = (value >= 0 ? 
value : 0); + } + else if(strcmp(key, "KeeperRecordingService") == 0) { assert(json_object_is_type(val, json_type_object)); json_object*keeper_recording_service_conf = json_object_object_get(json_conf, "KeeperRecordingService"); @@ -904,7 +916,6 @@ class ConfigurationManager void parseClientConf(json_object*json_conf) { - const char*string_value = json_object_get_string(json_conf); json_object_object_foreach(json_conf, key, val) { if(strcmp(key, "VisorClientPortalService") == 0) diff --git a/chrono_common/GrapherIdCard.h b/chrono_common/GrapherIdCard.h index fccef30a..df67fd3a 100644 --- a/chrono_common/GrapherIdCard.h +++ b/chrono_common/GrapherIdCard.h @@ -5,6 +5,8 @@ #include +#include "ServiceId.h" + // this class wrapps ChronoGrapher Process identification // that will be used by all the ChronoLog Processes // to both identofy the process and create RPC client channels @@ -15,25 +17,30 @@ namespace chronolog class GrapherIdCard { - + RecordingGroupId groupId; uint32_t ip_addr; //IP address as uint32_t in host byte order uint16_t port; //port number as uint16_t in host byte order uint16_t tl_provider_id; // id of thallium service provider public: - - - GrapherIdCard( uint32_t addr = 0, uint16_t a_port=0, uint16_t provider_id=0) - : ip_addr(addr), port(a_port),tl_provider_id(provider_id) + GrapherIdCard(RecordingGroupId group_id = 0, uint32_t addr = 0, uint16_t a_port = 0, uint16_t provider_id = 0) + : groupId(group_id) + , ip_addr(addr) + , port(a_port) + , tl_provider_id(provider_id) {} - GrapherIdCard( GrapherIdCard const& other) - : ip_addr(other.getIPaddr()), port(other.getPort()),tl_provider_id(other.getProviderId()) - {} + GrapherIdCard(GrapherIdCard const& other) + : groupId(other.getGroupId()) + , ip_addr(other.getIPaddr()) + , port(other.getPort()) + , tl_provider_id(other.getProviderId()) + {} ~GrapherIdCard()=default; - uint32_t getIPaddr() const {return ip_addr; } + RecordingGroupId getGroupId() const { return groupId; } + uint32_t getIPaddr() const 
{ return ip_addr; } uint16_t getPort() const { return port;} uint16_t getProviderId () const { return tl_provider_id; } @@ -43,6 +50,7 @@ class GrapherIdCard template void serialize( SerArchiveT & serT) { + serT& groupId; serT & ip_addr; serT & port; serT & tl_provider_id; @@ -66,18 +74,25 @@ class GrapherIdCard inline bool operator==(chronolog::GrapherIdCard const& card1, chronolog::GrapherIdCard const& card2) { - return ( (card1.getIPaddr()==card2.getIPaddr() && card1.getPort() == card2.getPort() - && card1.getProviderId() == card2.getProviderId()) ? true : false ); - + return ((card1.getGroupId() == card2.getGroupId() && card1.getIPaddr() == card2.getIPaddr() && + card1.getPort() == card2.getPort() && card1.getProviderId() == card2.getProviderId()) + ? true + : false); } inline std::ostream & operator<< (std::ostream & out , chronolog::GrapherIdCard const & id_card) { std::string a_string; - out << "GrapherIdCard{" - <<":"< #include -#include "KeeperIdCard.h" + #include "GrapherIdCard.h" -#include "KeeperRegistrationMsg.h" namespace chronolog { @@ -17,12 +16,11 @@ class GrapherRegistrationMsg ServiceId adminServiceId; public: + GrapherRegistrationMsg(GrapherIdCard const& id_card = GrapherIdCard{0, 0, 0}, + ServiceId const& admin_service_id = ServiceId{0, 0, 0}) + : grapherIdCard(id_card) + , adminServiceId(admin_service_id) - - GrapherRegistrationMsg(GrapherIdCard const &id_card = GrapherIdCard{0, 0, 0} - , ServiceId const &admin_service_id = ServiceId{0, 0, 0}) - : grapherIdCard(id_card) - , adminServiceId(admin_service_id) {} ~GrapherRegistrationMsg() = default; diff --git a/chrono_common/KeeperIdCard.h b/chrono_common/KeeperIdCard.h index c8d01fc3..369bcc1e 100644 --- a/chrono_common/KeeperIdCard.h +++ b/chrono_common/KeeperIdCard.h @@ -5,6 +5,8 @@ #include +#include "ServiceId.h" + // this class wrapps ChronoKeeper Process identification // that will be used by all the ChronoLog Processes // to both identofy the Keepr process and create RPC client channels 
@@ -20,14 +22,12 @@ typedef uint32_t in_addr_t; typedef uint16_t in_port_t; typedef std::pair service_endpoint; -// KeeperGroup is the logical grouping of KeeperProcesses -typedef uint64_t KeeperGroupId; - +typedef uint32_t KeeperGroupId; class KeeperIdCard { - uint64_t keeper_group_id; + RecordingGroupId keeper_group_id; uint32_t ip_addr; //IP address as uint32_t in host byte order uint16_t port; //port number as uint16_t in host byte order uint16_t tl_provider_id; // id of thallium service provider @@ -35,7 +35,7 @@ class KeeperIdCard public: - KeeperIdCard( uint64_t group_id = 0, uint32_t addr = 0, uint16_t a_port=0, uint16_t provider_id=0) + KeeperIdCard( uint32_t group_id = 0, uint32_t addr = 0, uint16_t a_port=0, uint16_t provider_id=0) : keeper_group_id(group_id), ip_addr(addr), port(a_port),tl_provider_id(provider_id) {} @@ -45,7 +45,7 @@ class KeeperIdCard ~KeeperIdCard()=default; - uint64_t getGroupId() const { return keeper_group_id; } + RecordingGroupId getGroupId() const { return keeper_group_id; } uint32_t getIPaddr() const {return ip_addr; } uint16_t getPort() const { return port;} uint16_t getProviderId () const { return tl_provider_id; } @@ -84,6 +84,7 @@ inline bool operator==(chronolog::KeeperIdCard const& card1, chronolog::KeeperId && card1.getProviderId() == card2.getProviderId()) ? 
true : false ); } + inline std::ostream & operator<< (std::ostream & out , chronolog::KeeperIdCard const & keeper_id_card) { std::string a_string; @@ -93,5 +94,12 @@ inline std::ostream & operator<< (std::ostream & out , chronolog::KeeperIdCard c return out; } +inline std::string& operator+= (std::string& a_string, chronolog::KeeperIdCard const& keeper_id_card) +{ + std::string ip_string; /* scratch buffer for the dotted IP; getIPasDottedString() takes an output string (presumably appending, like ServiceId's) and must not be handed a_string itself */ + a_string += std::string("KeeperIdCard{") + std::to_string(keeper_id_card.getGroupId()) + ":" + + keeper_id_card.getIPasDottedString(ip_string) + ":" + std::to_string(keeper_id_card.getPort()) + ":" + std::to_string(keeper_id_card.getProviderId()) + "}"; + return a_string; +} #endif diff --git a/chrono_common/KeeperRegistrationMsg.h b/chrono_common/KeeperRegistrationMsg.h index cb0f5a27..c6d7fb1f 100644 --- a/chrono_common/KeeperRegistrationMsg.h +++ b/chrono_common/KeeperRegistrationMsg.h @@ -3,46 +3,13 @@ #include #include -#include "KeeperIdCard.h" +#include "KeeperIdCard.h" +#include "ServiceId.h" namespace chronolog { -class ServiceId -{ -public: - ServiceId(uint32_t addr, uint16_t a_port, uint16_t a_provider_id): ip_addr(addr), port(a_port), provider_id( - a_provider_id) - {} - - ~ServiceId() = default; - - uint32_t ip_addr; //32int IP representation in host notation - uint16_t port; //16int port representation in host notation - uint16_t provider_id; //thalium provider id - - template - void serialize(SerArchiveT &serT) - { - serT&ip_addr; - serT&port; - serT&provider_id; - } - - std::string &getIPasDottedString(std::string &a_string) const - { - - char buffer[INET_ADDRSTRLEN]; - // convert ip from host to network byte order uint32_t - uint32_t ip_net_order = htonl(ip_addr); - // convert network byte order uint32_t to a dotted string - if(NULL != inet_ntop(AF_INET, &ip_net_order, buffer, INET_ADDRSTRLEN)) - { a_string += 
std::string(buffer); } - return a_string; - } -}; - class KeeperRegistrationMsg { @@ -50,11 +17,10 @@ class KeeperRegistrationMsg ServiceId adminServiceId; public: - - - 
KeeperRegistrationMsg(KeeperIdCard const &keeper_card = KeeperIdCard{0, 0, 0} - , ServiceId const &admin_service_id = ServiceId{0, 0, 0}): keeperIdCard(keeper_card) - , adminServiceId(admin_service_id) + KeeperRegistrationMsg(KeeperIdCard const& keeper_card = KeeperIdCard{0, 0, 0}, + ServiceId const& admin_service_id = ServiceId{0, 0, 0}) + : keeperIdCard(keeper_card) + , adminServiceId(admin_service_id) {} ~KeeperRegistrationMsg() = default; @@ -76,14 +42,6 @@ class KeeperRegistrationMsg }//namespace -inline std::ostream &operator<<(std::ostream &out, chronolog::ServiceId const serviceId) -{ - std::string a_string; - out << "{" << serviceId.getIPasDottedString(a_string) << ":" << serviceId.port << ":" << serviceId.provider_id - << "}"; - return out; -} - inline std::ostream &operator<<(std::ostream &out, chronolog::KeeperRegistrationMsg const &msg) { out << "KeeperRegistrationMsg{" << msg.getKeeperIdCard() << "}{admin:" << msg.getAdminServiceId() << "}"; diff --git a/chrono_common/ServiceId.h b/chrono_common/ServiceId.h new file mode 100644 index 00000000..b10c894d --- /dev/null +++ b/chrono_common/ServiceId.h @@ -0,0 +1,66 @@ +#ifndef CHRONO_SERVICE_ID_H +#define CHRONO_SERVICE_ID_H + +#include +#include + + +namespace chronolog +{ + +typedef uint32_t RecordingGroupId; + +class ServiceId +{ +public: + ServiceId(uint32_t addr, uint16_t a_port, uint16_t a_provider_id) + : ip_addr(addr) + , port(a_port) + , provider_id(a_provider_id) + {} + + ~ServiceId() = default; + + uint32_t ip_addr; //32int IP representation in host notation + uint16_t port; //16int port representation in host notation + uint16_t provider_id;//thalium provider id + + template + void serialize(SerArchiveT& serT) + { + serT& ip_addr; + serT& port; + serT& provider_id; + } + + std::string& getIPasDottedString(std::string& a_string) const + { + + char buffer[INET_ADDRSTRLEN]; + // convert ip from host to network byte order uint32_t + uint32_t ip_net_order = htonl(ip_addr); + // convert network byte 
order uint32_t to a dotted string + if(NULL != inet_ntop(AF_INET, &ip_net_order, buffer, INET_ADDRSTRLEN)) { a_string += std::string(buffer); } + return a_string; + } +}; + +}//namespace chronolog + + +inline std::ostream& operator<<(std::ostream& out, chronolog::ServiceId const serviceId) +{ + std::string a_string; + out << "ServiceId{" << serviceId.getIPasDottedString(a_string) << ":" << serviceId.port << ":" << serviceId.provider_id + << "}"; + return out; +} + +inline std::string& operator+= (std::string& a_string, chronolog::ServiceId const& serviceId) +{ + std::string ip_string; /* scratch buffer: getIPasDottedString() appends to its argument, so it must not be handed a_string itself */ + a_string += std::string("ServiceId{") + serviceId.getIPasDottedString(ip_string) + ":" + std::to_string(serviceId.port) + ":" + std::to_string(serviceId.provider_id) + "}"; + return a_string; +} + +#endif diff --git a/default_conf.json.in b/default_conf.json.in index cce60df0..fc0fdba1 100644 --- a/default_conf.json.in +++ b/default_conf.json.in @@ -41,6 +41,7 @@ "delayed_data_admin_exit_in_secs": 3 }, "chrono_keeper": { + "RecordingGroup": 7, "KeeperRecordingService": { "rpc": { "rpc_implementation": "Thallium_sockets", @@ -91,6 +92,7 @@ "story_files_dir": "/tmp/" }, "chrono_grapher": { + "RecordingGroup": 7, "KeeperGrapherDrainService": { "rpc": { "rpc_implementation": "Thallium_sockets",