Skip to content

Commit

Permalink
Merge pull request #153 from grc-iit/126-grapher-registration-with-ch…
Browse files Browse the repository at this point in the history
…ronovisor

126 grapher registration with chronovisor, RecordingGroup, uniform distribution of Stories to REcordingGroups
  • Loading branch information
ibrodkin authored Apr 26, 2024
2 parents fe87690 + 06295ec commit 763dfb7
Show file tree
Hide file tree
Showing 19 changed files with 1,080 additions and 415 deletions.
94 changes: 48 additions & 46 deletions ChronoGrapher/ChronoGrapher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ int main(int argc, char**argv)
LOG_INFO("[ChronoGrapher] DataStoreAdminService started successfully.");

// Instantiate GrapherRecordingService
chronolog::RecordingGroupId recording_group_id = confManager.GRAPHER_CONF.RECORDING_GROUP;
std::string RECORDING_SERVICE_PROTOCOL = confManager.GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.PROTO_CONF;
std::string RECORDING_SERVICE_IP = confManager.GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.IP;
uint16_t RECORDING_SERVICE_PORT = confManager.GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.BASE_PORT;
Expand All @@ -125,8 +126,8 @@ int main(int argc, char**argv)
LOG_INFO("[ChronoGrapher] RecordingService started successfully.");

// create GrapherIdCard to identify this Grapher process in ChronoVisor's Registry
chronolog::GrapherIdCard processIdCard(recording_endpoint.first, recording_endpoint.second
, recording_service_provider_id);
chronolog::GrapherIdCard processIdCard(recording_group_id, recording_endpoint.first, recording_endpoint.second,
recording_service_provider_id);

std::stringstream process_id_string;
process_id_string << processIdCard;
Expand Down Expand Up @@ -202,46 +203,47 @@ int main(int argc, char**argv)
return (-1);
}

// /// RegistryClient SetUp _____________________________________________________________________________________
// // create RegistryClient and register the new Recording service with the Registry
// std::string REGISTRY_SERVICE_NA_STRING =
// confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.PROTO_CONF + "://" +
// confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.IP + ":" +
// std::to_string(confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.BASE_PORT);
//
// uint16_t REGISTRY_SERVICE_PROVIDER_ID = confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.SERVICE_PROVIDER_ID;
//
// chronolog::GrapherRegistryClient* grapherRegistryClient = chronolog::GrapherRegistryClient::CreateRegistryClient(
// *dataAdminEngine, REGISTRY_SERVICE_NA_STRING, REGISTRY_SERVICE_PROVIDER_ID);
//
// if(nullptr == grapherRegistryClient)
// {
// LOG_CRITICAL("[ChronoGrapher] failed to create RegistryClient; exiting");
// delete grapherRecordingService;
// delete keeperDataAdminService;
// return (-1);
// }
//
// /// Registration with ChronoVisor __________________________________________________________________________________
// // try to register with chronoVisor a few times than log ERROR and exit...
// int registration_status = chronolog::CL_ERR_UNKNOWN;
// int retries = 5;
// while((chronolog::CL_SUCCESS != registration_status) && (retries > 0))
// {
// registration_status = grapherRegistryClient->send_register_msg(
// chronolog::GrapherRegistrationMsg(processIdCard, collectionServiceId));
// retries--;
// }
//
// if(chronolog::CL_SUCCESS != registration_status)
// {
// LOG_CRITICAL("[ChronoGrapher] Failed to register with ChronoVisor after multiple attempts. Exiting.");
// delete grapherRegistryClient;
// delete grapherRecordingService;
// delete keeperDataAdminService;
// return (-1);
// }
// LOG_INFO("[ChronoGrapher] Successfully registered with ChronoVisor.");
/// RegistryClient SetUp _____________________________________________________________________________________
// create RegistryClient and register the new Recording service with the Registry
std::string REGISTRY_SERVICE_NA_STRING =
confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.PROTO_CONF + "://" +
confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.IP + ":" +
std::to_string(confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.BASE_PORT);

uint16_t REGISTRY_SERVICE_PROVIDER_ID = confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.SERVICE_PROVIDER_ID;

chronolog::GrapherRegistryClient* grapherRegistryClient = chronolog::GrapherRegistryClient::CreateRegistryClient(
*dataAdminEngine, REGISTRY_SERVICE_NA_STRING, REGISTRY_SERVICE_PROVIDER_ID);

if(nullptr == grapherRegistryClient)
{
LOG_CRITICAL("[ChronoGrapher] failed to create RegistryClient; exiting");
delete grapherRecordingService;
delete keeperDataAdminService;
return (-1);
}

/// Registration with ChronoVisor __________________________________________________________________________________
// try to register with chronoVisor a few times than log ERROR and exit...
int registration_status = chronolog::CL_ERR_UNKNOWN;
int retries = 5;
while((chronolog::CL_SUCCESS != registration_status) && (retries > 0))
{
registration_status = grapherRegistryClient->send_register_msg(
chronolog::GrapherRegistrationMsg(processIdCard, collectionServiceId));
sleep(5);
retries--;
}

if(chronolog::CL_SUCCESS != registration_status)
{
LOG_CRITICAL("[ChronoGrapher] Failed to register with ChronoVisor after multiple attempts. Exiting.");
delete grapherRegistryClient;
delete grapherRecordingService;
delete keeperDataAdminService;
return (-1);
}
LOG_INFO("[ChronoGrapher] Successfully registered with ChronoVisor.");

/// Start data collection and extraction threads ___________________________________________________________________
// services are successfully created and keeper process had registered with ChronoVisor
Expand All @@ -263,10 +265,10 @@ int main(int argc, char**argv)
sleep(30);
}

// /// Unregister from ChronoVisor ____________________________________________________________________________________
// // Unregister from the chronoVisor so that no new story requests would be coming
// grapherRegistryClient->send_unregister_msg(processIdCard);
// delete grapherRegistryClient;
/// Unregister from ChronoVisor ____________________________________________________________________________________
// Unregister from the chronoVisor so that no new story requests would be coming
grapherRegistryClient->send_unregister_msg(processIdCard);
delete grapherRegistryClient;

/// Stop services and shut down ____________________________________________________________________________________
LOG_INFO("[ChronoGrapher] Initiating shutdown procedures.");
Expand Down
5 changes: 4 additions & 1 deletion ChronoKeeper/CSVFileChunkExtractor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <thallium.hpp>

#include "chronolog_types.h"
#include "chronolog_errcode.h"
#include "KeeperIdCard.h"
#include "CSVFileChunkExtractor.h"

Expand Down Expand Up @@ -44,6 +45,8 @@ int chronolog::CSVFileStoryChunkExtractor::processStoryChunk(StoryChunk*story_ch
chunk_fstream << event << std::endl;
}
chunk_fstream.close();
LOG_INFO("[CSVFileStoryChunkExtractor] Finished processing StoryChunk. File={}", chunk_filename);
LOG_DEBUG("[CSVFileStoryChunkExtractor] Finished processing StoryChunk. File={}", chunk_filename);

return chronolog::CL_SUCCESS;
}

4 changes: 1 addition & 3 deletions ChronoKeeper/ChronoKeeperInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
#include "cmd_arg_parse.h"
#include "StoryChunkExtractorRDMA.h"

#define KEEPER_GROUP_ID 7

// we will be using a combination of the uint32_t representation of the service IP address
// and uint16_t representation of the port number
int
Expand Down Expand Up @@ -88,7 +86,6 @@ int main(int argc, char**argv)

// Instantiate ChronoKeeper MemoryDataStore
// instantiate DataStoreAdminService
uint64_t keeper_group_id = KEEPER_GROUP_ID;

/// DataStoreAdminService setup ____________________________________________________________________________________
std::string datastore_service_ip = confManager.KEEPER_CONF.KEEPER_DATA_STORE_ADMIN_SERVICE_CONF.RPC_CONF.IP;
Expand Down Expand Up @@ -132,6 +129,7 @@ int main(int argc, char**argv)
LOG_INFO("[ChronoKeeperInstance] KeeperRecordingService started successfully.");

// create KeeperIdCard to identify this Keeper process in ChronoVisor's KeeperRegistry
chronolog::RecordingGroupId keeper_group_id = confManager.KEEPER_CONF.RECORDING_GROUP;
chronolog::KeeperIdCard keeperIdCard(keeper_group_id, recording_endpoint.first, recording_endpoint.second
, recording_service_provider_id);

Expand Down
5 changes: 1 addition & 4 deletions ChronoKeeper/StoryChunkExtractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ class StoryChunkExtractorBase

void drainExtractionQueue();

virtual int processStoryChunk(StoryChunk*) //=0
{
LOG_WARNING("[StoryChunkExtraction] Base processStoryChunk method called. Derived class should implement specific logic.");
}
virtual int processStoryChunk(StoryChunk*) = 0;

void startExtractionThreads(int);

Expand Down
1 change: 1 addition & 0 deletions ChronoVisor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ target_sources(chronovisor_server PRIVATE
./src/ClientRegistryRecord.cpp
./src/ChronicleMetaDirectory.cpp
./src/KeeperRegistry.cpp
../chrono_common/ConfigurationManager.cpp
../ChronoAPI/ChronoLog/src/city.cpp
../ChronoAPI/ChronoLog/src/log.cpp)

Expand Down
6 changes: 2 additions & 4 deletions ChronoVisor/include/ChronicleMetaDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,15 @@ class ChronicleMetaDirectory

int destroy_chronicle(const std::string &name);

//int create_story(std::string const& chronicle_name, const std::string& story_name,
// const std::unordered_map<std::string, std::string>& attrs);
int destroy_story(std::string const &chronicle_name, const std::string &story_name);

int
acquire_story(chronolog::ClientId const &client_id, const std::string &chronicle_name, const std::string &story_name
, const std::unordered_map <std::string, std::string> &attrs, int &flags, StoryId &, bool &);
, const std::unordered_map <std::string, std::string> &attrs, int &flags, StoryId &);

int
release_story(chronolog::ClientId const &client_id, const std::string &chronicle_name, const std::string &story_name
, StoryId &, bool &);
, StoryId &);

int get_chronicle_attr(std::string const &name, const std::string &key, std::string &value);

Expand Down
Loading

0 comments on commit 763dfb7

Please sign in to comment.