Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

126 grapher registration with chronovisor #153

Merged
merged 42 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
1285245
remove deprecated ChronoKeeper/chrono_common/*
ibrodkin Mar 4, 2024
4607871
corrected include references
ibrodkin Mar 7, 2024
8c80d4e
StoryChunk bound: start
ibrodkin Mar 7, 2024
6cde88b
chrono_grapher POC
ibrodkin Mar 13, 2024
8b41747
GrapherRecordingService
ibrodkin Mar 20, 2024
1fdef9b
StoryChunk serialization
ibrodkin Mar 22, 2024
72f40e1
Grapher Configuration and GrapherRegClient
ibrodkin Mar 27, 2024
d9baa37
grapher registration checkpoint
ibrodkin Apr 3, 2024
1405bb4
Grapher registration checkpoint
ibrodkin Apr 5, 2024
4edff43
Use getent to get Visor IP
fkengun Mar 12, 2024
2c5af08
Make sure there is only one Visor
fkengun Mar 12, 2024
aae479d
Keepers need to have service_ip for KeeperRecordingService and Keeper…
fkengun Mar 13, 2024
6cece7a
Add conf_file command line argument
fkengun Mar 21, 2024
b74d952
Change default log file size to 1MB
fkengun Mar 21, 2024
bfadff4
Update variables on non-default paths from command line
fkengun Mar 26, 2024
45bdc41
FEAT: Updated README.md
EnekoGonzalez3 Mar 28, 2024
aeee569
Delete unnecessary horizontal lines at README.md
EnekoGonzalez3 Mar 28, 2024
b1fda0e
Update spack and website links
EnekoGonzalez3 Mar 28, 2024
6358cfb
Updated readme with requested changes
EnekoGonzalez3 Apr 2, 2024
28b3e08
Updated readme with requested changes
EnekoGonzalez3 Apr 4, 2024
c6b6cef
Move environment installation after checking the status
EnekoGonzalez3 Apr 5, 2024
655b174
StoryChunk serialization fixed
ibrodkin Apr 5, 2024
209969d
CMakeLists.txt changes
ibrodkin Apr 5, 2024
b5d615b
Grapher Configuration and GrapherRegClient
ibrodkin Mar 27, 2024
4250ac1
RecordingGroupId, ServiceID.h and registration message reorg
ibrodkin Apr 11, 2024
bd7c226
grapher delayed exit
ibrodkin Apr 11, 2024
b1ba901
moved KeeperProcess map to RecordingGroup level
ibrodkin Apr 11, 2024
e8c103f
concept of activeGroups, uniform_group_distribution, mersene twister …
ibrodkin Apr 17, 2024
d8d3488
reworked recordingGroup notifications about story Start/stop
ibrodkin Apr 18, 2024
25994b9
Fix typo in help page
fkengun Apr 10, 2024
cb83038
Add -l for local deployment, and -e for verbose output
fkengun Apr 11, 2024
6cec659
Print default values in usage output
fkengun Apr 11, 2024
df82027
Grapher Configuration and GrapherRegClient
ibrodkin Mar 27, 2024
c3238f9
Merge branch 'develop' into 126-grapher-registration-with-chronovisor
ibrodkin Apr 18, 2024
dd4dd21
HOTFIX: keeper chunk_filename=storyId.chunkStartTime.keeperIP.port.csv
ibrodkin Apr 18, 2024
d727547
added operator +=(string,..) for ServiceId, KeeperIdCard, GrapherIdCard
ibrodkin Apr 23, 2024
4fd6c2b
better tracking of recording group activity status
ibrodkin Apr 24, 2024
77677e4
Merge branch 'develop' into 126-grapher-registration-with-chronovisor
ibrodkin Apr 25, 2024
1b49647
fixed no-return in CSVFileExtractor
ibrodkin Apr 25, 2024
f25955b
fixed merging issues
ibrodkin Apr 25, 2024
e623b7b
uint64_t story_start_time
ibrodkin Apr 25, 2024
06295ec
addeed RecordingGroup to configuration
ibrodkin Apr 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 48 additions & 46 deletions ChronoGrapher/ChronoGrapher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ int main(int argc, char**argv)
LOG_INFO("[ChronoGrapher] DataStoreAdminService started successfully.");

// Instantiate GrapherRecordingService
chronolog::RecordingGroupId recording_group_id = confManager.GRAPHER_CONF.RECORDING_GROUP;
std::string RECORDING_SERVICE_PROTOCOL = confManager.GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.PROTO_CONF;
std::string RECORDING_SERVICE_IP = confManager.GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.IP;
uint16_t RECORDING_SERVICE_PORT = confManager.GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.BASE_PORT;
Expand All @@ -125,8 +126,8 @@ int main(int argc, char**argv)
LOG_INFO("[ChronoGrapher] RecordingService started successfully.");

// create GrapherIdCard to identify this Grapher process in ChronoVisor's Registry
chronolog::GrapherIdCard processIdCard(recording_endpoint.first, recording_endpoint.second
, recording_service_provider_id);
chronolog::GrapherIdCard processIdCard(recording_group_id, recording_endpoint.first, recording_endpoint.second,
recording_service_provider_id);

std::stringstream process_id_string;
process_id_string << processIdCard;
Expand Down Expand Up @@ -202,46 +203,47 @@ int main(int argc, char**argv)
return (-1);
}

// /// RegistryClient SetUp _____________________________________________________________________________________
// // create RegistryClient and register the new Recording service with the Registry
// std::string REGISTRY_SERVICE_NA_STRING =
// confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.PROTO_CONF + "://" +
// confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.IP + ":" +
// std::to_string(confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.BASE_PORT);
//
// uint16_t REGISTRY_SERVICE_PROVIDER_ID = confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.SERVICE_PROVIDER_ID;
//
// chronolog::GrapherRegistryClient* grapherRegistryClient = chronolog::GrapherRegistryClient::CreateRegistryClient(
// *dataAdminEngine, REGISTRY_SERVICE_NA_STRING, REGISTRY_SERVICE_PROVIDER_ID);
//
// if(nullptr == grapherRegistryClient)
// {
// LOG_CRITICAL("[ChronoGrapher] failed to create RegistryClient; exiting");
// delete grapherRecordingService;
// delete keeperDataAdminService;
// return (-1);
// }
//
// /// Registration with ChronoVisor __________________________________________________________________________________
// // try to register with chronoVisor a few times than log ERROR and exit...
// int registration_status = chronolog::CL_ERR_UNKNOWN;
// int retries = 5;
// while((chronolog::CL_SUCCESS != registration_status) && (retries > 0))
// {
// registration_status = grapherRegistryClient->send_register_msg(
// chronolog::GrapherRegistrationMsg(processIdCard, collectionServiceId));
// retries--;
// }
//
// if(chronolog::CL_SUCCESS != registration_status)
// {
// LOG_CRITICAL("[ChronoGrapher] Failed to register with ChronoVisor after multiple attempts. Exiting.");
// delete grapherRegistryClient;
// delete grapherRecordingService;
// delete keeperDataAdminService;
// return (-1);
// }
// LOG_INFO("[ChronoGrapher] Successfully registered with ChronoVisor.");
/// RegistryClient SetUp _____________________________________________________________________________________
// create RegistryClient and register the new Recording service with the Registry
std::string REGISTRY_SERVICE_NA_STRING =
confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.PROTO_CONF + "://" +
confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.IP + ":" +
std::to_string(confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.BASE_PORT);

uint16_t REGISTRY_SERVICE_PROVIDER_ID = confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.SERVICE_PROVIDER_ID;

chronolog::GrapherRegistryClient* grapherRegistryClient = chronolog::GrapherRegistryClient::CreateRegistryClient(
*dataAdminEngine, REGISTRY_SERVICE_NA_STRING, REGISTRY_SERVICE_PROVIDER_ID);

if(nullptr == grapherRegistryClient)
{
LOG_CRITICAL("[ChronoGrapher] failed to create RegistryClient; exiting");
delete grapherRecordingService;
delete keeperDataAdminService;
return (-1);
}

/// Registration with ChronoVisor __________________________________________________________________________________
// try to register with chronoVisor a few times than log ERROR and exit...
int registration_status = chronolog::CL_ERR_UNKNOWN;
int retries = 5;
while((chronolog::CL_SUCCESS != registration_status) && (retries > 0))
{
registration_status = grapherRegistryClient->send_register_msg(
chronolog::GrapherRegistrationMsg(processIdCard, collectionServiceId));
sleep(5);
retries--;
}

if(chronolog::CL_SUCCESS != registration_status)
{
LOG_CRITICAL("[ChronoGrapher] Failed to register with ChronoVisor after multiple attempts. Exiting.");
delete grapherRegistryClient;
delete grapherRecordingService;
delete keeperDataAdminService;
return (-1);
}
LOG_INFO("[ChronoGrapher] Successfully registered with ChronoVisor.");

/// Start data collection and extraction threads ___________________________________________________________________
// services are successfully created and keeper process had registered with ChronoVisor
Expand All @@ -263,10 +265,10 @@ int main(int argc, char**argv)
sleep(30);
}

// /// Unregister from ChronoVisor ____________________________________________________________________________________
// // Unregister from the chronoVisor so that no new story requests would be coming
// grapherRegistryClient->send_unregister_msg(processIdCard);
// delete grapherRegistryClient;
/// Unregister from ChronoVisor ____________________________________________________________________________________
// Unregister from the chronoVisor so that no new story requests would be coming
grapherRegistryClient->send_unregister_msg(processIdCard);
delete grapherRegistryClient;

/// Stop services and shut down ____________________________________________________________________________________
LOG_INFO("[ChronoGrapher] Initiating shutdown procedures.");
Expand Down
5 changes: 4 additions & 1 deletion ChronoKeeper/CSVFileChunkExtractor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <thallium.hpp>

#include "chronolog_types.h"
#include "chronolog_errcode.h"
#include "KeeperIdCard.h"
#include "CSVFileChunkExtractor.h"

Expand Down Expand Up @@ -44,6 +45,8 @@ int chronolog::CSVFileStoryChunkExtractor::processStoryChunk(StoryChunk*story_ch
chunk_fstream << event << std::endl;
}
chunk_fstream.close();
LOG_INFO("[CSVFileStoryChunkExtractor] Finished processing StoryChunk. File={}", chunk_filename);
LOG_DEBUG("[CSVFileStoryChunkExtractor] Finished processing StoryChunk. File={}", chunk_filename);

return chronolog::CL_SUCCESS;
}

4 changes: 1 addition & 3 deletions ChronoKeeper/ChronoKeeperInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
#include "cmd_arg_parse.h"
#include "StoryChunkExtractorRDMA.h"

#define KEEPER_GROUP_ID 7

// we will be using a combination of the uint32_t representation of the service IP address
// and uint16_t representation of the port number
int
Expand Down Expand Up @@ -88,7 +86,6 @@ int main(int argc, char**argv)

// Instantiate ChronoKeeper MemoryDataStore
// instantiate DataStoreAdminService
uint64_t keeper_group_id = KEEPER_GROUP_ID;

/// DataStoreAdminService setup ____________________________________________________________________________________
std::string datastore_service_ip = confManager.KEEPER_CONF.KEEPER_DATA_STORE_ADMIN_SERVICE_CONF.RPC_CONF.IP;
Expand Down Expand Up @@ -132,6 +129,7 @@ int main(int argc, char**argv)
LOG_INFO("[ChronoKeeperInstance] KeeperRecordingService started successfully.");

// create KeeperIdCard to identify this Keeper process in ChronoVisor's KeeperRegistry
chronolog::RecordingGroupId keeper_group_id = confManager.KEEPER_CONF.RECORDING_GROUP;
chronolog::KeeperIdCard keeperIdCard(keeper_group_id, recording_endpoint.first, recording_endpoint.second
, recording_service_provider_id);

Expand Down
5 changes: 1 addition & 4 deletions ChronoKeeper/StoryChunkExtractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ class StoryChunkExtractorBase

void drainExtractionQueue();

virtual int processStoryChunk(StoryChunk*) //=0
{
LOG_WARNING("[StoryChunkExtraction] Base processStoryChunk method called. Derived class should implement specific logic.");
}
virtual int processStoryChunk(StoryChunk*) = 0;

void startExtractionThreads(int);

Expand Down
1 change: 1 addition & 0 deletions ChronoVisor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ target_sources(chronovisor_server PRIVATE
./src/ClientRegistryRecord.cpp
./src/ChronicleMetaDirectory.cpp
./src/KeeperRegistry.cpp
../chrono_common/ConfigurationManager.cpp
../ChronoAPI/ChronoLog/src/city.cpp
../ChronoAPI/ChronoLog/src/log.cpp)

Expand Down
6 changes: 2 additions & 4 deletions ChronoVisor/include/ChronicleMetaDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,15 @@ class ChronicleMetaDirectory

int destroy_chronicle(const std::string &name);

//int create_story(std::string const& chronicle_name, const std::string& story_name,
// const std::unordered_map<std::string, std::string>& attrs);
int destroy_story(std::string const &chronicle_name, const std::string &story_name);

int
acquire_story(chronolog::ClientId const &client_id, const std::string &chronicle_name, const std::string &story_name
, const std::unordered_map <std::string, std::string> &attrs, int &flags, StoryId &, bool &);
, const std::unordered_map <std::string, std::string> &attrs, int &flags, StoryId &);

int
release_story(chronolog::ClientId const &client_id, const std::string &chronicle_name, const std::string &story_name
, StoryId &, bool &);
, StoryId &);

int get_chronicle_attr(std::string const &name, const std::string &key, std::string &value);

Expand Down
Loading