126 grapher registration with chronovisor #153

Merged
merged 42 commits into from
Apr 26, 2024
Changes from 37 commits
1285245
remove deprecated ChronoKeeper/chrono_common/*
ibrodkin Mar 4, 2024
4607871
corrected include references
ibrodkin Mar 7, 2024
8c80d4e
StoryChunk bound: start
ibrodkin Mar 7, 2024
6cde88b
chrono_grapher POC
ibrodkin Mar 13, 2024
8b41747
GrapherRecordingService
ibrodkin Mar 20, 2024
1fdef9b
StoryChunk serialization
ibrodkin Mar 22, 2024
72f40e1
Grapher Configuration and GrapherRegClient
ibrodkin Mar 27, 2024
d9baa37
grapher registration checkpoint
ibrodkin Apr 3, 2024
1405bb4
Grapher registration checkpoint
ibrodkin Apr 5, 2024
4edff43
Use getent to get Visor IP
fkengun Mar 12, 2024
2c5af08
Make sure there is only one Visor
fkengun Mar 12, 2024
aae479d
Keepers need to have service_ip for KeeperRecordingService and Keeper…
fkengun Mar 13, 2024
6cece7a
Add conf_file command line argument
fkengun Mar 21, 2024
b74d952
Change default log file size to 1MB
fkengun Mar 21, 2024
bfadff4
Update variables on non-default paths from command line
fkengun Mar 26, 2024
45bdc41
FEAT: Updated README.md
EnekoGonzalez3 Mar 28, 2024
aeee569
Delete unnecessary horizontal lines at README.md
EnekoGonzalez3 Mar 28, 2024
b1fda0e
Update spack and website links
EnekoGonzalez3 Mar 28, 2024
6358cfb
Updated readme with requested changes
EnekoGonzalez3 Apr 2, 2024
28b3e08
Updated readme with requested changes
EnekoGonzalez3 Apr 4, 2024
c6b6cef
Move environment installation after checking the status
EnekoGonzalez3 Apr 5, 2024
655b174
StoryChunk serialization fixed
ibrodkin Apr 5, 2024
209969d
CMakeLists.txt changes
ibrodkin Apr 5, 2024
b5d615b
Grapher Configuration and GrapherRegClient
ibrodkin Mar 27, 2024
4250ac1
RecordingGroupId, ServiceID.h and registration message reorg
ibrodkin Apr 11, 2024
bd7c226
grapher delayed exit
ibrodkin Apr 11, 2024
b1ba901
moved KeeperProcess map to RecordingGroup level
ibrodkin Apr 11, 2024
e8c103f
concept of activeGroups, uniform_group_distribution, mersene twister …
ibrodkin Apr 17, 2024
d8d3488
reworked recordingGroup notifications about story Start/stop
ibrodkin Apr 18, 2024
25994b9
Fix typo in help page
fkengun Apr 10, 2024
cb83038
Add -l for local deployment, and -e for verbose output
fkengun Apr 11, 2024
6cec659
Print default values in usage output
fkengun Apr 11, 2024
df82027
Grapher Configuration and GrapherRegClient
ibrodkin Mar 27, 2024
c3238f9
Merge branch 'develop' into 126-grapher-registration-with-chronovisor
ibrodkin Apr 18, 2024
dd4dd21
HOTFIX: keeper chunk_filename=storyId.chunkStartTime.keeperIP.port.csv
ibrodkin Apr 18, 2024
d727547
added operator +=(string,..) for ServiceId, KeeperIdCard, GrapherIdCard
ibrodkin Apr 23, 2024
4fd6c2b
better tracking of recording group activity status
ibrodkin Apr 24, 2024
77677e4
Merge branch 'develop' into 126-grapher-registration-with-chronovisor
ibrodkin Apr 25, 2024
1b49647
fixed no-return in CSVFileExtractor
ibrodkin Apr 25, 2024
f25955b
fixed merging issues
ibrodkin Apr 25, 2024
e623b7b
uint64_t story_start_time
ibrodkin Apr 25, 2024
06295ec
addeed RecordingGroup to configuration
ibrodkin Apr 26, 2024
5 changes: 3 additions & 2 deletions ChronoGrapher/ChronoGrapher.cpp
@@ -104,6 +104,7 @@ int main(int argc, char**argv)
LOG_INFO("[ChronoGrapher] DataStoreAdminService started successfully.");

// Instantiate GrapherRecordingService
chronolog::RecordingGroupId recording_group_id = 7;
std::string RECORDING_SERVICE_PROTOCOL = confManager.GRAPHER_CONF.RECORDING_SERVICE_CONF.PROTO_CONF;
std::string RECORDING_SERVICE_IP = confManager.GRAPHER_CONF.RECORDING_SERVICE_CONF.IP;
uint16_t RECORDING_SERVICE_PORT = confManager.GRAPHER_CONF.RECORDING_SERVICE_CONF.BASE_PORT;
@@ -125,8 +126,8 @@ int main(int argc, char**argv)
LOG_INFO("[ChronoGrapher] RecordingService started successfully.");

// create GrapherIdCard to identify this Grapher process in ChronoVisor's Registry
chronolog::GrapherIdCard processIdCard(recording_endpoint.first, recording_endpoint.second
, recording_service_provider_id);
chronolog::GrapherIdCard processIdCard(recording_group_id, recording_endpoint.first, recording_endpoint.second,
recording_service_provider_id);

std::stringstream process_id_string;
process_id_string << processIdCard;
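
The hunk above adds the recording group id as the first constructor argument of `GrapherIdCard` and then streams the card into a string. A minimal sketch of that pattern follows, assuming a hypothetical `GrapherIdCardSketch` type: the field names, the integer widths, the host-byte-order IP, and the text format are all illustrative assumptions, not the project's actual definitions.

```cpp
#include <cstdint>
#include <ostream>
#include <sstream>
#include <string>

// Hypothetical stand-in for chronolog::GrapherIdCard; all fields are assumptions.
struct GrapherIdCardSketch
{
    uint32_t groupId;    // recording group this grapher belongs to
    uint32_t ip;         // IPv4 address, assumed to be in host byte order
    uint16_t port;       // recording service port
    uint16_t providerId; // recording service provider id
};

// Stream the card in an illustrative text format (not the project's format).
inline std::ostream& operator<<(std::ostream& out, GrapherIdCardSketch const& card)
{
    out << "GrapherIdCard{group=" << card.groupId << ", ip="
        << ((card.ip >> 24) & 0xFF) << "." << ((card.ip >> 16) & 0xFF) << "."
        << ((card.ip >> 8) & 0xFF) << "." << (card.ip & 0xFF)
        << ":" << card.port << ", provider=" << card.providerId << "}";
    return out;
}

// Append the textual form of the card to an existing string,
// mirroring the `idCardString += idCard;` usage seen in KeeperRegistry.h.
inline std::string& operator+=(std::string& target, GrapherIdCardSketch const& card)
{
    std::ostringstream ss;
    ss << card;
    target += ss.str();
    return target;
}
```

With this sketch, `std::string s; s += card;` yields a printable id that can be logged or cached once per process entry.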
8 changes: 6 additions & 2 deletions ChronoKeeper/CSVFileChunkExtractor.cpp
@@ -23,14 +23,18 @@ chronolog::CSVFileStoryChunkExtractor::~CSVFileStoryChunkExtractor()
void chronolog::CSVFileStoryChunkExtractor::processStoryChunk(chronolog::StoryChunk*story_chunk)
{
std::ofstream chunk_fstream;

// chunk_filename: /rootDirectory/storyId.chunkStartTime.keeperIP.port.csv

std::string chunk_filename(rootDirectory);
chunk_filename += "/" + std::to_string(story_chunk->getStoryId()) + "." + std::to_string(story_chunk->getStartTime() / 1000000000) + ".";
keeperIdCard.getIPasDottedString(chunk_filename);
chunk_filename += "." + std::to_string(story_chunk->getStoryId()) + "." +
std::to_string(story_chunk->getStartTime() / 1000000000) + ".csv";
chunk_filename += "." + std::to_string(keeperIdCard.getPort()) + ".csv";

tl::xstream es = tl::xstream::self();
LOG_INFO("[CSVFileStoryChunkExtractor] Processing StoryChunk: ES={}, ULT={}, StoryID={}, StartTime={}", es.get_rank()
, tl::thread::self_id(), story_chunk->getStoryId(), story_chunk->getStartTime());

// current thread is the only one that has this storyChunk and the only one that's writing to this chunk csv file
// thus no additional locking is needed ...
chunk_fstream.open(chunk_filename, std::ofstream::out|std::ofstream::app);
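
The new naming scheme stated in the comment above, `/rootDirectory/storyId.chunkStartTime.keeperIP.port.csv`, can be sketched as a standalone function. This is an illustration under stated assumptions: `makeChunkFilename` and `ipToDottedString` are hypothetical helpers, the start time is assumed to be in nanoseconds (suggested by the division by 1000000000 in the diff), and the IP is assumed to be in host byte order.

```cpp
#include <cstdint>
#include <string>

// Hypothetical helper: render an IPv4 address (assumed host byte order) as a.b.c.d.
inline std::string ipToDottedString(uint32_t ip)
{
    return std::to_string((ip >> 24) & 0xFF) + "." + std::to_string((ip >> 16) & 0xFF) + "."
         + std::to_string((ip >> 8) & 0xFF) + "." + std::to_string(ip & 0xFF);
}

// Hypothetical helper: compose rootDirectory/storyId.chunkStartTime.keeperIP.port.csv.
// startTimeNs is assumed to be nanoseconds; the filename carries whole seconds.
inline std::string makeChunkFilename(std::string const& rootDirectory, uint64_t storyId,
                                     uint64_t startTimeNs, uint32_t keeperIp, uint16_t keeperPort)
{
    return rootDirectory + "/" + std::to_string(storyId) + "."
         + std::to_string(startTimeNs / 1000000000ULL) + "."
         + ipToDottedString(keeperIp) + "." + std::to_string(keeperPort) + ".csv";
}
```

Including the keeper IP and port in the name keeps files from different keepers distinct when they write chunks of the same story and start time into a shared directory.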
1 change: 1 addition & 0 deletions ChronoVisor/CMakeLists.txt
@@ -15,6 +15,7 @@ target_sources(chronovisor_server PRIVATE
./src/ClientRegistryRecord.cpp
./src/ChronicleMetaDirectory.cpp
./src/KeeperRegistry.cpp
../chrono_common/ConfigurationManager.cpp
../ChronoAPI/ChronoLog/src/city.cpp
../ChronoAPI/ChronoLog/src/log.cpp)

6 changes: 2 additions & 4 deletions ChronoVisor/include/ChronicleMetaDirectory.h
@@ -32,17 +32,15 @@ class ChronicleMetaDirectory

int destroy_chronicle(const std::string &name);

//int create_story(std::string const& chronicle_name, const std::string& story_name,
// const std::unordered_map<std::string, std::string>& attrs);
int destroy_story(std::string const &chronicle_name, const std::string &story_name);

int
acquire_story(chronolog::ClientId const &client_id, const std::string &chronicle_name, const std::string &story_name
, const std::unordered_map <std::string, std::string> &attrs, int &flags, StoryId &, bool &);
, const std::unordered_map <std::string, std::string> &attrs, int &flags, StoryId &);

int
release_story(chronolog::ClientId const &client_id, const std::string &chronicle_name, const std::string &story_name
, StoryId &, bool &);
, StoryId &);

int get_chronicle_attr(std::string const &name, const std::string &key, std::string &value);

188 changes: 123 additions & 65 deletions ChronoVisor/include/KeeperRegistry.h
@@ -1,16 +1,20 @@
#ifndef KEEPER_REGISTRY_H
#define KEEPER_REGISTRY_H

#include <chrono>
#include <map>
#include <mutex>
#include <random>
#include <vector>
#include <map>
#include <chrono>

#include <thallium.hpp>

#include "chronolog_types.h"
#include "KeeperIdCard.h"
#include "KeeperStatsMsg.h"
#include "KeeperRegistrationMsg.h"
#include "GrapherIdCard.h"
#include "GrapherRegistrationMsg.h"
#include "ConfigurationManager.h"

namespace chronolog
@@ -29,94 +33,148 @@ class DataStoreAdminClient;

class KeeperRegistryService;

class KeeperRegistry
class KeeperProcessEntry
{
public:
KeeperProcessEntry(KeeperIdCard const& keeper_id_card, ServiceId const& admin_service_id)
: idCard(keeper_id_card)
, adminServiceId(admin_service_id)
, keeperAdminClient(nullptr)
, active(false)
, lastStatsTime(0)
, activeStoryCount(0)
{
idCardString += idCard;
}

KeeperProcessEntry(KeeperProcessEntry const& other) = default;
~KeeperProcessEntry() = default;// Registry is responsible for creating & deleting keeperAdminClient

KeeperIdCard idCard;
ServiceId adminServiceId;
std::string idCardString;
DataStoreAdminClient* keeperAdminClient;
bool active;
uint64_t lastStatsTime;
uint32_t activeStoryCount;
std::list<std::pair<std::time_t, DataStoreAdminClient*>> delayedExitClients;
};

class GrapherProcessEntry
{
public:
GrapherProcessEntry(GrapherIdCard const& id_card, ServiceId const& admin_service_id)
: idCard(id_card)
, adminServiceId(admin_service_id)
, adminClient(nullptr)
, active(false)
, lastStatsTime(0)
, activeStoryCount(0)
{
idCardString += idCard;
}

GrapherProcessEntry(GrapherProcessEntry const& other) = default;
~GrapherProcessEntry() = default;// Registry is responsible for creating & deleting adminClient

GrapherIdCard idCard;
ServiceId adminServiceId;
std::string idCardString;
DataStoreAdminClient* adminClient;
bool active;
uint64_t lastStatsTime;
uint32_t activeStoryCount;
std::list<std::pair<std::time_t, DataStoreAdminClient*>> delayedExitGrapherClients;
};

struct KeeperProcessEntry {

class RecordingGroup
{
public:
KeeperProcessEntry(KeeperIdCard const& keeper_id_card, ServiceId const& admin_service_id)
: idCard(keeper_id_card)
, adminServiceId(admin_service_id)
, keeperAdminClient(nullptr)
, active(false)
, lastStatsTime(0)
, activeStoryCount(0)
RecordingGroup(RecordingGroupId group_id, GrapherProcessEntry* grapher_ptr = nullptr)
: groupId(group_id)
, activeKeeperCount(0)
, grapherProcess(grapher_ptr)
{}

KeeperProcessEntry(KeeperProcessEntry const &other) = default;
RecordingGroup(RecordingGroup const& other) = default;
~RecordingGroup() = default;

void reset()
{
keeperAdminClient = nullptr;
active = false;
lastStatsTime = 0;
activeStoryCount = 0;
}

~KeeperProcessEntry() = default; // Registry is responsible for creating & deleting keeperAdminClient

KeeperIdCard idCard;
ServiceId adminServiceId;
DataStoreAdminClient* keeperAdminClient;
bool active;
uint64_t lastStatsTime;
uint32_t activeStoryCount;
std::list<std::pair<std::time_t, DataStoreAdminClient*>> delayedExitClients;
};
bool isActive() const;
void startDelayedGrapherExit(GrapherProcessEntry&, std::time_t);
void clearDelayedExitGrapher(GrapherProcessEntry&, std::time_t);
void startDelayedKeeperExit(KeeperProcessEntry&, std::time_t);
void clearDelayedExitKeeper(KeeperProcessEntry&, std::time_t);

enum RegistryState
{
UNKNOWN = 0, INITIALIZED = 1, // RegistryService is initialized, no active keepers
RUNNING = 2, // RegistryService and active Keepers
SHUTTING_DOWN = 3 // Shutting down services
};
std::vector<KeeperIdCard>& getActiveKeepers(std::vector<KeeperIdCard>& keeper_id_cards);

public:
KeeperRegistry(): registryState(UNKNOWN), registryEngine(nullptr), keeperRegistryService(nullptr)
{}
RecordingGroupId groupId;
size_t activeKeeperCount;
GrapherProcessEntry* grapherProcess;
std::map<std::pair<uint32_t, uint16_t>, KeeperProcessEntry> keeperProcesses;
};

~KeeperRegistry();
class KeeperRegistry
{
enum RegistryState
{
UNKNOWN = 0,
INITIALIZED = 1, // RegistryService is initialized, no active keepers
RUNNING = 2, // RegistryService and active Keepers
SHUTTING_DOWN = 3// Shutting down services
};

bool is_initialized() const
{ return (INITIALIZED == registryState); }
public:
KeeperRegistry();

bool is_running() const
{ return (RUNNING == registryState); }
~KeeperRegistry();

bool is_shutting_down() const
{ return (SHUTTING_DOWN == registryState); }
bool is_initialized() const { return (INITIALIZED == registryState); }

int InitializeRegistryService(ChronoLog::ConfigurationManager const &);
bool is_running() const { return (RUNNING == registryState); }

int ShutdownRegistryService();
bool is_shutting_down() const { return (SHUTTING_DOWN == registryState); }

int registerKeeperProcess(KeeperRegistrationMsg const &keeper_reg_msg);
int InitializeRegistryService(ChronoLog::ConfigurationManager const&);

int unregisterKeeperProcess(KeeperIdCard const &keeper_id_card);
int ShutdownRegistryService();

void updateKeeperProcessStats(KeeperStatsMsg const &keeperStatsMsg);
int registerKeeperProcess(KeeperRegistrationMsg const& keeper_reg_msg);

std::vector <KeeperIdCard> &getActiveKeepers(std::vector <KeeperIdCard> &keeper_id_cards);
int unregisterKeeperProcess(KeeperIdCard const& keeper_id_card);

int notifyKeepersOfStoryRecordingStart(std::vector<KeeperIdCard>&, ChronicleName const&, StoryName const&,
StoryId const&);
void updateKeeperProcessStats(KeeperStatsMsg const& keeperStatsMsg);

int notifyKeepersOfStoryRecordingStop(std::vector <KeeperIdCard> const &, StoryId const &);
int notifyRecordingGroupOfStoryRecordingStart(ChronicleName const&, StoryName const&, StoryId const&,
std::vector<KeeperIdCard>&);
int notifyRecordingGroupOfStoryRecordingStop(StoryId const&);

int registerGrapherProcess(GrapherRegistrationMsg const& reg_msg);
int unregisterGrapherProcess(GrapherIdCard const& id_card);

private:
private:
KeeperRegistry(KeeperRegistry const&) = delete;//disable copying
KeeperRegistry& operator=(KeeperRegistry const&) = delete;

KeeperRegistry(KeeperRegistry const &) = delete; //disable copying
KeeperRegistry &operator=(KeeperRegistry const &) = delete;
int notifyGrapherOfStoryRecordingStart(RecordingGroup&, ChronicleName const&, StoryName const&, StoryId const&,
uint64_t);
int notifyGrapherOfStoryRecordingStop(RecordingGroup&, StoryId const&);
int notifyKeepersOfStoryRecordingStart(RecordingGroup&, std::vector<KeeperIdCard>&, ChronicleName const&,
StoryName const&, StoryId const&, uint64_t);
int notifyKeepersOfStoryRecordingStop(RecordingGroup&, std::vector<KeeperIdCard> const&, StoryId const&);

RegistryState registryState;
std::mutex registryLock;
std::map <std::pair <uint32_t, uint16_t>, KeeperProcessEntry> keeperProcessRegistry;
thallium::engine*registryEngine;
KeeperRegistryService*keeperRegistryService;
size_t delayedDataAdminExitSeconds;
};
RegistryState registryState;
std::mutex registryLock;
thallium::engine* registryEngine;
KeeperRegistryService* keeperRegistryService;
size_t delayedDataAdminExitSeconds;

std::map<RecordingGroupId, RecordingGroup> recordingGroups;
std::vector<RecordingGroup*> activeGroups;
std::mt19937 mt_random;//Mersenne Twister random int generator
std::uniform_int_distribution<size_t> group_id_distribution;
std::map<StoryId, RecordingGroup*> activeStories;
};
}

#endif
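
The new `KeeperRegistry` members (`activeGroups`, `mt_random`, `group_id_distribution`, `activeStories`) suggest that a story is assigned to one of the active recording groups by a uniformly random draw and then remembered until recording stops. The sketch below shows one way that could work. It is not the PR's implementation: `GroupSketch`, `GroupSelectorSketch`, their method names, and the `RecordingGroupId`/`StoryId` typedefs are assumptions made for illustration.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <random>
#include <utility>
#include <vector>

using RecordingGroupId = uint32_t; // assumption: actual typedef lives in chronolog headers
using StoryId = uint64_t;          // assumption

// Hypothetical, reduced stand-in for RecordingGroup.
struct GroupSketch
{
    RecordingGroupId groupId;
    size_t activeKeeperCount;
};

// Hypothetical selector that mirrors the registry's random-choice members.
class GroupSelectorSketch
{
public:
    explicit GroupSelectorSketch(uint32_t seed): mt_random(seed) {}

    // Replace the set of active groups and resize the distribution to match.
    void setActiveGroups(std::vector<GroupSketch*> groups)
    {
        activeGroups = std::move(groups);
        if(!activeGroups.empty())
        {
            group_id_distribution = std::uniform_int_distribution<size_t>(0, activeGroups.size() - 1);
        }
    }

    // Return the group already recording this story, or pick one uniformly at random.
    GroupSketch* acquireGroupForStory(StoryId story_id)
    {
        auto found = activeStories.find(story_id);
        if(found != activeStories.end()) { return found->second; }
        if(activeGroups.empty()) { return nullptr; }

        GroupSketch* group = activeGroups[group_id_distribution(mt_random)];
        activeStories[story_id] = group;
        return group;
    }

    // Forget the story once recording stops.
    void releaseStory(StoryId story_id) { activeStories.erase(story_id); }

private:
    std::vector<GroupSketch*> activeGroups;
    std::mt19937 mt_random; // Mersenne Twister generator
    std::uniform_int_distribution<size_t> group_id_distribution;
    std::map<StoryId, GroupSketch*> activeStories;
};
```

Keeping the story-to-group map means a later stop notification can be routed to the same group without the caller passing the keeper list again, which matches the single-argument `notifyRecordingGroupOfStoryRecordingStop(StoryId const&)` signature in the header.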
22 changes: 21 additions & 1 deletion ChronoVisor/include/KeeperRegistryService.h
@@ -2,13 +2,14 @@
#define KEEPER_REGISTRY_SERVICE_H

#include <iostream>
//#include <margo.h>
#include <thallium.hpp>
#include <thallium/serialization/stl/string.hpp>

#include "KeeperIdCard.h"
#include "KeeperRegistrationMsg.h"
#include "KeeperStatsMsg.h"
#include "GrapherIdCard.h"
#include "GrapherRegistrationMsg.h"
#include "log.h"
#include "KeeperRegistry.h"

@@ -53,13 +54,32 @@ class KeeperRegistryService: public tl::provider <KeeperRegistryService>
theKeeperProcessRegistry.updateKeeperProcessStats(keeper_stats_msg);
}

void register_grapher(tl::request const &request, chronolog::GrapherRegistrationMsg const & registrationMsg)
{
int return_code = 0;
std::stringstream ss;
ss << registrationMsg;
LOG_INFO("[KeeperRegistryService] register_grapher: {}", ss.str());
return_code = theKeeperProcessRegistry.registerGrapherProcess(registrationMsg);
request.respond(return_code);
}

void unregister_grapher(tl::request const &request, chronolog::GrapherIdCard const &id_card)
{
int return_code = 0;
return_code = theKeeperProcessRegistry.unregisterGrapherProcess(id_card);
request.respond(return_code);
}

KeeperRegistryService(tl::engine &tl_engine, uint16_t service_provider_id, KeeperRegistry &keeperRegistry)
: tl::provider <KeeperRegistryService>(tl_engine, service_provider_id), theKeeperProcessRegistry(
keeperRegistry)
{
define("register_keeper", &KeeperRegistryService::register_keeper);
define("unregister_keeper", &KeeperRegistryService::unregister_keeper);
define("handle_stats_msg", &KeeperRegistryService::handle_stats_msg, tl::ignore_return_value());
define("register_grapher", &KeeperRegistryService::register_grapher);
define("unregister_grapher", &KeeperRegistryService::unregister_grapher);
//setup finalization callback in case this service provider is still alive when the engine is finalized
get_engine().push_finalize_callback(this, [p = this]()
{ delete p; });