Skip to content

Commit

Permalink
Merge pull request #150 from grc-iit/127-storychunkextractor-using-rdma
Browse files Browse the repository at this point in the history
127 storychunkextractor using rdma
  • Loading branch information
fkengun authored Apr 24, 2024
2 parents a99ea2f + 6bbc65e commit fe87690
Show file tree
Hide file tree
Showing 28 changed files with 959 additions and 1,085 deletions.
4 changes: 2 additions & 2 deletions ChronoGrapher/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ target_sources(chrono_grapher PRIVATE
../ChronoAPI/ChronoLog/src/log.cpp)
target_link_libraries(chrono_grapher chronolog_client thallium)

#configure_file(${CMAKE_CURRENT_SOURCE_DIR}/../default_conf.json.in
# ${CMAKE_CURRENT_BINARY_DIR}/default_conf.json COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/../default_conf.json.in
${CMAKE_CURRENT_BINARY_DIR}/default_conf.json COPYONLY)

set_target_properties(chrono_grapher PROPERTIES INSTALL_RPATH_USE_LINK_PATH TRUE)

Expand Down
108 changes: 54 additions & 54 deletions ChronoGrapher/ChronoGrapher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ service_endpoint_from_dotted_string(std::string const &ip_string, int port, std:
// we will be using a combination of the uint32_t representation of the service IP address
// and uint16_t representation of the port number
// NOTE: both IP and port values in the IdCard are in the host byte order, not the network order)
// to identfy the Chrono process
// to identify the Chrono process

struct sockaddr_in sa;
// translate the recording service dotted IP string into 32bit network byte order representation
Expand Down Expand Up @@ -81,7 +81,7 @@ int main(int argc, char**argv)
}
LOG_INFO("Running ChronoGrapher ");

// Instantiate MemoryDataStore
// Instantiate MemoryDataStore
// instantiate DataStoreAdminService

/// DataStoreAdminService setup ____________________________________________________________________________________
Expand All @@ -104,10 +104,10 @@ int main(int argc, char**argv)
LOG_INFO("[ChronoGrapher] DataStoreAdminService started successfully.");

// Instantiate GrapherRecordingService
std::string RECORDING_SERVICE_PROTOCOL = confManager.GRAPHER_CONF.RECORDING_SERVICE_CONF.PROTO_CONF;
std::string RECORDING_SERVICE_IP = confManager.GRAPHER_CONF.RECORDING_SERVICE_CONF.IP;
uint16_t RECORDING_SERVICE_PORT = confManager.GRAPHER_CONF.RECORDING_SERVICE_CONF.BASE_PORT;
uint16_t recording_service_provider_id = confManager.GRAPHER_CONF.RECORDING_SERVICE_CONF.SERVICE_PROVIDER_ID;
std::string RECORDING_SERVICE_PROTOCOL = confManager.GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.PROTO_CONF;
std::string RECORDING_SERVICE_IP = confManager.GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.IP;
uint16_t RECORDING_SERVICE_PORT = confManager.GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.BASE_PORT;
uint16_t recording_service_provider_id = confManager.GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.SERVICE_PROVIDER_ID;

std::string RECORDING_SERVICE_NA_STRING =
std::string(RECORDING_SERVICE_PROTOCOL) + "://" + std::string(RECORDING_SERVICE_IP) + ":" +
Expand All @@ -132,7 +132,7 @@ int main(int argc, char**argv)
process_id_string << processIdCard;
LOG_INFO("[ChronoGrapher] GrapherIdCard: {}", process_id_string.str());

// Instantiate MemoryDataStore & ExtractorModule
// Instantiate MemoryDataStore & ExtractorModule
chronolog::ChunkIngestionQueue ingestionQueue;
std::string csv_files_directory = confManager.GRAPHER_CONF.EXTRACTOR_CONF.story_files_dir;

Expand Down Expand Up @@ -202,50 +202,50 @@ int main(int argc, char**argv)
return (-1);
}

/// RegistryClient SetUp _____________________________________________________________________________________
// create RegistryClient and register the new Recording service with the Registry
std::string REGISTRY_SERVICE_NA_STRING =
confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.PROTO_CONF + "://" +
confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.IP + ":" +
std::to_string(confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.BASE_PORT);

uint16_t REGISTRY_SERVICE_PROVIDER_ID = confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.SERVICE_PROVIDER_ID;

chronolog::GrapherRegistryClient* grapherRegistryClient = chronolog::GrapherRegistryClient::CreateRegistryClient(
*dataAdminEngine, REGISTRY_SERVICE_NA_STRING, REGISTRY_SERVICE_PROVIDER_ID);

if(nullptr == grapherRegistryClient)
{
LOG_CRITICAL("[ChronoGrapher] failed to create RegistryClient; exiting");
delete grapherRecordingService;
delete keeperDataAdminService;
return (-1);
}

/// Registration with ChronoVisor __________________________________________________________________________________
// try to register with chronoVisor a few times than log ERROR and exit...
int registration_status = chronolog::CL_ERR_UNKNOWN;
int retries = 5;
while((chronolog::CL_SUCCESS != registration_status) && (retries > 0))
{
registration_status = grapherRegistryClient->send_register_msg(
chronolog::GrapherRegistrationMsg(processIdCard, collectionServiceId));
retries--;
}

if(chronolog::CL_SUCCESS != registration_status)
{
LOG_CRITICAL("[ChronoGrapher] Failed to register with ChronoVisor after multiple attempts. Exiting.");
delete grapherRegistryClient;
delete grapherRecordingService;
delete keeperDataAdminService;
return (-1);
}
LOG_INFO("[ChronoGrapher] Successfully registered with ChronoVisor.");
// /// RegistryClient SetUp _____________________________________________________________________________________
// // create RegistryClient and register the new Recording service with the Registry
// std::string REGISTRY_SERVICE_NA_STRING =
// confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.PROTO_CONF + "://" +
// confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.IP + ":" +
// std::to_string(confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.BASE_PORT);
//
// uint16_t REGISTRY_SERVICE_PROVIDER_ID = confManager.GRAPHER_CONF.VISOR_REGISTRY_SERVICE_CONF.SERVICE_PROVIDER_ID;
//
// chronolog::GrapherRegistryClient* grapherRegistryClient = chronolog::GrapherRegistryClient::CreateRegistryClient(
// *dataAdminEngine, REGISTRY_SERVICE_NA_STRING, REGISTRY_SERVICE_PROVIDER_ID);
//
// if(nullptr == grapherRegistryClient)
// {
// LOG_CRITICAL("[ChronoGrapher] failed to create RegistryClient; exiting");
// delete grapherRecordingService;
// delete keeperDataAdminService;
// return (-1);
// }
//
// /// Registration with ChronoVisor __________________________________________________________________________________
// // try to register with chronoVisor a few times, then log ERROR and exit...
// int registration_status = chronolog::CL_ERR_UNKNOWN;
// int retries = 5;
// while((chronolog::CL_SUCCESS != registration_status) && (retries > 0))
// {
// registration_status = grapherRegistryClient->send_register_msg(
// chronolog::GrapherRegistrationMsg(processIdCard, collectionServiceId));
// retries--;
// }
//
// if(chronolog::CL_SUCCESS != registration_status)
// {
// LOG_CRITICAL("[ChronoGrapher] Failed to register with ChronoVisor after multiple attempts. Exiting.");
// delete grapherRegistryClient;
// delete grapherRecordingService;
// delete keeperDataAdminService;
// return (-1);
// }
// LOG_INFO("[ChronoGrapher] Successfully registered with ChronoVisor.");

/// Start data collection and extraction threads ___________________________________________________________________
// services are successfulley created and keeper process had registered with ChronoVisor
// start all dataColelction and Extraction threads...
// services are successfully created and keeper process had registered with ChronoVisor
// start all dataCollection and Extraction threads...
tl::abt scope;
theDataStore.startDataCollection(3);
// start extraction streams & threads
Expand All @@ -263,10 +263,10 @@ int main(int argc, char**argv)
sleep(30);
}

/// Unregister from ChronoVisor ____________________________________________________________________________________
// Unregister from the chronoVisor so that no new story requests would be coming
grapherRegistryClient->send_unregister_msg(processIdCard);
delete grapherRegistryClient;
// /// Unregister from ChronoVisor ____________________________________________________________________________________
// // Unregister from the chronoVisor so that no new story requests would be coming
// grapherRegistryClient->send_unregister_msg(processIdCard);
// delete grapherRegistryClient;

/// Stop services and shut down ____________________________________________________________________________________
LOG_INFO("[ChronoGrapher] Initiating shutdown procedures.");
Expand All @@ -278,7 +278,7 @@ int main(int argc, char**argv)
// Shutdown extraction module
// drain extractionQueue and stop extraction xStreams
storyExtractor.shutdownExtractionThreads();
// these are not probably needed as thalium handles the engine finalization...
// these are probably not needed as thallium handles the engine finalization...
// recordingEngine.finalize();
// collectionEngine.finalize();
delete recordingEngine;
Expand Down
7 changes: 4 additions & 3 deletions ChronoGrapher/ChunkIngestionQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ChunkIngestionQueue
{
if(orphanQueue.empty())
{
LOG_DEBUG("[IngestionQueue] Orphan event queue is empty. No actions taken.");
LOG_DEBUG("[IngestionQueue] Orphan chunk queue is empty. No actions taken.");
return;
}
std::lock_guard <std::mutex> lock(ingestionQueueMutex);
Expand All @@ -84,15 +84,16 @@ class ChunkIngestionQueue
{
// Individual StoryIngestionHandle has its own mutex
(*ingestionHandle_iter).second->ingestChunk(*iter);
// Remove the event from the orphan deque and get the iterator to the next element prior to removal
// Remove the chunk from the orphan deque and get the iterator to the next element prior to removal
iter = orphanQueue.erase(iter);
}
else
{
LOG_DEBUG("[IngestionQueue] Orphan chunk for story {} is still orphaned.", (*iter)->getStoryId());
++iter;
}
}
LOG_DEBUG("[IngestionQueue] Drained {} orphan events into known handles.", orphanQueue.size());
LOG_DEBUG("[IngestionQueue] Drained {} orphan chunks into known handles.", orphanQueue.size());
}

bool is_empty() const
Expand Down
60 changes: 55 additions & 5 deletions ChronoGrapher/GrapherRecordingService.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
#include "KeeperIdCard.h"
#include "chronolog_types.h"
#include "ChunkIngestionQueue.h"
#include "../external_libs/cereal/include/cereal/archives/binary.hpp"

namespace tl = thallium;

namespace chronolog
{
#define MAX_BULK_MEM_SIZE (4 * 1024 * 1024)
class GrapherRecordingService: public tl::provider <GrapherRecordingService>
{
public:
Expand All @@ -31,29 +33,77 @@ class GrapherRecordingService: public tl::provider <GrapherRecordingService>
get_engine().pop_finalize_callback(this);
}

//TODO: replace or augment this with RDMA transfer later on...
// RPC handler that receives one serialized StoryChunk from the caller via a thallium bulk
// transfer, deserializes it with cereal and hands it to the ingestion queue.
//
// NOTE(review): this span comes from a diff view, so it shows both the removed by-value
// handler (the StoryChunk& signature and the three "[KeeperRecordingService]" statements that
// use `chunk`) and the added bulk-based handler (the tl::bulk& signature and everything else).
//
// request - the incoming RPC request; used to obtain the caller's endpoint and to respond.
// b       - the caller's bulk handle describing the serialized StoryChunk bytes.
// Responds to the caller with b.size().
void record_story_chunk(tl::request const &request, StoryChunk & chunk)
void record_story_chunk(tl::request const &request, tl::bulk &b)
{
LOG_DEBUG("[KeeperRecordingService] Recording chunk: {}", chunk.getStoryId());
theIngestionQueue.ingestStoryChunk(&chunk);
request.respond(chronolog::CL_SUCCESS);
// start/end are only assigned inside the #ifndef NDEBUG sections below.
std::chrono::high_resolution_clock::time_point start, end;
LOG_DEBUG("[GrapherRecordingService] StoryChunk recording RPC invoked, ThreadID={}", tl::thread::self_id());
// The caller's endpoint is needed to address the remote side of the bulk transfer.
tl::endpoint ep = request.get_endpoint();
LOG_DEBUG("[GrapherRecordingService] Endpoint obtained, ThreadID={}", tl::thread::self_id());
// Describe the local receive area as a single segment covering the whole mem_vec member buffer.
// NOTE(review): mem_vec is one member buffer used by every invocation of this handler —
// confirm that handlers of this provider can never run concurrently.
std::vector <std::pair <void*, std::size_t>> segments(1);
segments[0].first = (void*)(&mem_vec[0]);
segments[0].second = mem_vec.size();
LOG_DEBUG("[GrapherRecordingService] Bulk memory prepared, size: {}, ThreadID={}", mem_vec.size()
, tl::thread::self_id());
tl::engine tl_engine = get_engine();
LOG_DEBUG("[GrapherRecordingService] Engine addr: {}, ThreadID={}", (void*)&tl_engine, tl::thread::self_id());
// Expose the local buffer to the engine as the write-only target of the transfer.
tl::bulk local = tl_engine.expose(segments, tl::bulk_mode::write_only);
LOG_DEBUG("[GrapherRecordingService] Bulk memory exposed, ThreadID={}", tl::thread::self_id());
// Transfer the caller's bulk data into the local buffer.
// NOTE(review): b.size() is not compared against mem_vec.size() before the transfer —
// confirm how a chunk larger than the local buffer is handled.
b.on(ep) >> local;
LOG_DEBUG("[GrapherRecordingService] Received {} bytes of StoryChunk data, ThreadID={}", b.size()
, tl::thread::self_id());
// for(auto i = 0; i < b.size() - 1; ++i)
// {
// std::cout << (char)*(char*)(&mem_vec[0]+i) << " ";
// }
// std::cout << std::endl;
StoryChunk story_chunk;
#ifndef NDEBUG
start = std::chrono::high_resolution_clock::now();
#endif
// Deserialize all but the last transferred byte.
// NOTE(review): presumably the sender appends one trailing byte to the payload — confirm
// against the sending side; b.size() == 0 would make this length wrap around.
deserializedWithCereal(&mem_vec[0], b.size() - 1, story_chunk);
#ifndef NDEBUG
end = std::chrono::high_resolution_clock::now();
// Elapsed time is measured in nanoseconds and divided by 1000.0 to log microseconds.
LOG_INFO("[GrapherRecordingService] Deserialization took {} us, ThreadID={}",
std::chrono::duration_cast <std::chrono::nanoseconds>(end - start).count() / 1000.0
, tl::thread::self_id());
#endif
LOG_DEBUG("[GrapherRecordingService] StoryChunk received: StoryID: {}, StartTime: {}, ThreadID={}"
, story_chunk.getStoryId(), story_chunk.getStartTime(), tl::thread::self_id());
// The response (the number of bytes in the caller's bulk) is sent before the chunk is ingested.
request.respond(b.size());
LOG_DEBUG("[GrapherRecordingService] StoryChunk recording RPC responded {}, ThreadID={}"
, b.size(), tl::thread::self_id());

// NOTE(review): story_chunk is a local object and its address is what gets handed to the
// ingestion queue. If the queue keeps the pointer beyond this call it will dangle once this
// handler returns — confirm ownership; a heap-allocated chunk may be required here.
theIngestionQueue.ingestStoryChunk(&story_chunk);
LOG_DEBUG("[GrapherRecordingService] Ingested a StoryChunk, StoryID: {}, StartTime: {}, ThreadID={}"
, story_chunk.getStoryId(), story_chunk.getStartTime(), tl::thread::self_id());
}

private:
GrapherRecordingService(tl::engine &tl_engine, uint16_t service_provider_id, ChunkIngestionQueue &ingestion_queue)
    : tl::provider <GrapherRecordingService>(tl_engine, service_provider_id)
    , theIngestionQueue(ingestion_queue)
{
    // Size the member receive buffer once, up front, to MAX_BULK_MEM_SIZE bytes.
    mem_vec.resize(MAX_BULK_MEM_SIZE);

    // Register the record_story_chunk member function as the "record_story_chunk" RPC.
    define("record_story_chunk"
           , &GrapherRecordingService::record_story_chunk
           , tl::ignore_return_value());

    // Should the engine be finalized while this provider is still alive,
    // the callback registered here deletes the provider.
    get_engine().push_finalize_callback(this, [self = this]()
    {
        delete self;
    });
}

void deserializedWithCereal(char *buffer, size_t size, StoryChunk &story_chunk)
{
std::stringstream ss;
ss.write(buffer, size);
cereal::BinaryInputArchive iarchive(ss);
iarchive(story_chunk);
}

GrapherRecordingService(GrapherRecordingService const &) = delete;

GrapherRecordingService &operator=(GrapherRecordingService const &) = delete;

ChunkIngestionQueue &theIngestionQueue;

std::vector <char> mem_vec;
};

}// namespace chronolog
Expand Down
3 changes: 2 additions & 1 deletion ChronoGrapher/KeeperDataStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ int chronolog::KeeperDataStore::stopStoryRecording(chronolog::StoryId const &sto

void chronolog::KeeperDataStore::collectIngestedEvents()
{
LOG_DEBUG("[KeeperDataStore] Initiating collection of ingested events. Current state={}, Active StoryPipelines={}, PipelinesWaitingForExit={}, ThreadID={}"
LOG_DEBUG("[KeeperDataStore] Initiating collection of ingested story chunks. Current state={}, Active "
"StoryPipelines={}, PipelinesWaitingForExit={}, ThreadID={}"
, state, theMapOfStoryPipelines.size(), pipelinesWaitingForExit.size(), tl::thread::self_id());
theIngestionQueue.drainOrphanChunks();

Expand Down
2 changes: 1 addition & 1 deletion ChronoGrapher/StoryChunkExtractor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ void chronolog::StoryChunkExtractorBase::startExtractionThreads(int stream_count
}

extractorState = RUNNING;
LOG_DEBUG("[StoryChunkExtractionBase] Started extraction threads.");

for(int i = 0; i < stream_count; ++i)
{
Expand All @@ -35,6 +34,7 @@ void chronolog::StoryChunkExtractorBase::startExtractionThreads(int stream_count
{ p->drainExtractionQueue(); });
extractionThreads.push_back(std::move(th));
}
LOG_DEBUG("[StoryChunkExtractionBase] Started extraction threads.");
}
//////////////////////////////

Expand Down
1 change: 1 addition & 0 deletions ChronoKeeper/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ target_sources(chrono_keeper PRIVATE
../chrono_common/StoryChunk.cpp
StoryChunkExtractor.cpp
CSVFileChunkExtractor.cpp
StoryChunkExtractorRDMA.cpp
../ChronoAPI/ChronoLog/src/log.cpp)

target_link_libraries(chrono_keeper chronolog_client thallium)
Expand Down
2 changes: 1 addition & 1 deletion ChronoKeeper/CSVFileChunkExtractor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ chronolog::CSVFileStoryChunkExtractor::~CSVFileStoryChunkExtractor()
}

/////////////
void chronolog::CSVFileStoryChunkExtractor::processStoryChunk(chronolog::StoryChunk*story_chunk)
int chronolog::CSVFileStoryChunkExtractor::processStoryChunk(StoryChunk*story_chunk)
{
std::ofstream chunk_fstream;

Expand Down
2 changes: 1 addition & 1 deletion ChronoKeeper/CSVFileChunkExtractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class CSVFileStoryChunkExtractor: public StoryChunkExtractorBase

~CSVFileStoryChunkExtractor();

virtual void processStoryChunk(StoryChunk*);
virtual int processStoryChunk(StoryChunk*);

private:
KeeperIdCard keeperIdCard;
Expand Down
Loading

0 comments on commit fe87690

Please sign in to comment.