Skip to content

Commit

Permalink
Add forwarding of scan requests
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisspyB committed Nov 26, 2024
1 parent ded3024 commit d0b8bf4
Show file tree
Hide file tree
Showing 21 changed files with 434 additions and 184 deletions.
3 changes: 3 additions & 0 deletions src/gribjump/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ if( HAVE_GRIBJUMP_LOCAL_EXTRACT )
Task.h
ExtractionItem.cc
ExtractionItem.h
Forwarder.cc
Forwarder.h
URIHelper.h

tools/EccodesExtract.h
tools/EccodesExtract.cc
Expand Down
4 changes: 2 additions & 2 deletions src/gribjump/Config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Config::Config(const eckit::PathName path) :
path_{path} {
}

std::map<std::string, std::string> Config::loadServerMap() const {
Config::ServerMap Config::loadServerMap() const {
// e.g. yaml
// servermap:
// - fdb: "host1:port1"
Expand All @@ -50,7 +50,7 @@ std::map<std::string, std::string> Config::loadServerMap() const {
// gribjump: "host4:port4"
// becomes map:
// { "host1:port1": "host2:port2", "host3:port3": "host4:port4" }
std::map<std::string, std::string> map;
Config::ServerMap map;
eckit::LocalConfiguration conf = getSubConfiguration("servermap");
std::vector<eckit::LocalConfiguration> serverList = conf.getSubConfigurations();

Expand Down
11 changes: 7 additions & 4 deletions src/gribjump/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,29 @@

#pragma once

#include <map>
#include <unordered_map>
#include "eckit/config/LocalConfiguration.h"
#include "eckit/net/Endpoint.h"

namespace gribjump {

class Config : public eckit::LocalConfiguration {
public: // types
using ServerMap = std::unordered_map<eckit::net::Endpoint, eckit::net::Endpoint>;
public:
Config();
Config(const eckit::PathName);

const std::map<std::string, std::string>& serverMap() const { return serverMap_; }
const ServerMap& serverMap() const { return serverMap_; }

///@note : Will be empty if default config is used
const std::string& path() const { return path_; }

private:
std::map<std::string, std::string> loadServerMap() const;
ServerMap loadServerMap() const;

private:
std::map<std::string, std::string> serverMap_;
ServerMap serverMap_;
std::string path_;
};

Expand Down
140 changes: 42 additions & 98 deletions src/gribjump/Engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "gribjump/Engine.h"
#include "gribjump/ExtractionItem.h"
#include "gribjump/Forwarder.h"
#include "gribjump/remote/WorkQueue.h"
#include "gribjump/jumper/JumperFactory.h"

Expand All @@ -28,15 +29,6 @@ namespace gribjump {
//----------------------------------------------------------------------------------------------------------------------

// Stringify requests and keys alphabetically
namespace {
// ----------------------------------------------------------------------------------------------------------------------

bool isRemote(eckit::URI uri) {
return uri.scheme() == "fdb";
}

} // namespace
//----------------------------------------------------------------------------------------------------------------------

Engine::Engine() {}

Expand Down Expand Up @@ -112,81 +104,26 @@ filemap_t Engine::buildFileMap(const metkit::mars::MarsRequest& unionrequest, Ex
return filemap;
}

void Engine::forwardRemoteExtraction(filemap_t& filemap) {
// get servermap from config, which maps fdb remote uri to gribjump server uri
// format: fdbhost:port -> gjhost:port
/// @todo: dont parse servermap every request
const std::map<std::string, std::string>& servermap_str = LibGribJump::instance().config().serverMap();
ASSERT(!servermap_str.empty());

if (LibGribJump::instance().debug()) {
for (auto& [fdb, gj] : servermap_str) {
LOG_DEBUG_LIB(LibGribJump) << "Servermap: " << fdb << " -> " << gj << std::endl;
}
}
std::unordered_map<eckit::net::Endpoint, eckit::net::Endpoint> servermap;
for (auto& [fdb, gj] : servermap_str) {
eckit::net::Endpoint fdbEndpoint(fdb);
eckit::net::Endpoint gjEndpoint(gj);
servermap[fdbEndpoint] = gjEndpoint;
}

// Match servers with files
std::unordered_map<eckit::net::Endpoint, std::vector<std::string>> serverfiles;
for (auto& [fname, extractionItems] : filemap) {
eckit::URI uri = extractionItems[0]->URI();
eckit::net::Endpoint fdbEndpoint;

if(!isRemote(uri)) {
throw eckit::SeriousBug("URI is not remote: " + fname);
}

fdbEndpoint = eckit::net::Endpoint(uri.host(), uri.port());
void Engine::scheduleExtractionTasks(filemap_t& filemap){

if (servermap.find(fdbEndpoint) == servermap.end()) {
throw eckit::SeriousBug("No gribjump endpoint found for fdb endpoint: " + std::string(fdbEndpoint));
}

serverfiles[servermap[fdbEndpoint]].push_back(fname);
}

// make subfilemaps for each server
std::unordered_map<eckit::net::Endpoint, filemap_t> serverfilemaps;

for (auto& [server, files] : serverfiles) {
filemap_t subfilemap;
for (auto& fname : files) {
subfilemap[fname] = filemap[fname];
}
serverfilemaps[server] = subfilemap;
}

// forward to servers
size_t counter = 0;
for (auto& [endpoint, subfilemap] : serverfilemaps) {
taskGroup_.enqueueTask(new RemoteExtractionTask(taskGroup_, counter++, endpoint, subfilemap));
}

taskGroup_.waitForTasks();
}

void Engine::scheduleTasks(filemap_t& filemap){

bool remoteExtraction = LibGribJump::instance().config().getBool("remoteExtraction", false);
if (remoteExtraction) {
forwardRemoteExtraction(filemap);
bool forwardExtraction = LibGribJump::instance().config().getBool("forwardExtraction", false);
if (forwardExtraction) {
Forwarder forwarder;
forwarder.extract(filemap);
return;
}

bool inefficientExtraction = LibGribJump::instance().config().getBool("inefficientExtraction", false);

size_t counter = 0;
for (auto& [fname, extractionItems] : filemap) {
if (isRemote(extractionItems[0]->URI())) {
// Only possible if we are using remoteFDB, which requires remoteExtraction to be enabled.
// We technically do support it via inefficient extraction, but we are disabling this for now.
// taskGroup_.enqueueTask(new InefficientFileExtractionTask(taskGroup_, counter++, fname, extractionItems));
throw eckit::SeriousBug("Got remote URI from FDB, but remoteExtraction enabled in gribjump config.");
}
else {
if (extractionItems[0]->isRemote()) {
if (inefficientExtraction) {
taskGroup_.enqueueTask(new InefficientFileExtractionTask(taskGroup_, counter++, fname, extractionItems));
} else {
throw eckit::SeriousBug("Got remote URI from FDB, but forwardExtraction enabled in gribjump config.");
}
} else {
taskGroup_.enqueueTask(new FileExtractionTask(taskGroup_, counter++, fname, extractionItems));
}
}
Expand All @@ -204,7 +141,7 @@ ResultsMap Engine::extract(ExtractionRequests& requests) {
MetricsManager::instance().set("elapsed_build_filemap", timer.elapsed());
timer.reset("Gribjump Engine: Built file map");

scheduleTasks(filemap);
scheduleExtractionTasks(filemap);
MetricsManager::instance().set("elapsed_tasks", timer.elapsed());
timer.reset("Gribjump Engine: All tasks finished");

Expand All @@ -230,38 +167,45 @@ ResultsMap Engine::collectResults(ExItemMap& keyToExtractionItem) {

size_t Engine::scan(const MarsRequests& requests, bool byfiles) {

const std::map< eckit::PathName, eckit::OffsetList > filemap = FDBLister::instance().filesOffsets(requests);
std::vector<eckit::URI> uris = FDBLister::instance().URIs(requests);

// forwarded scan requests
if (LibGribJump::instance().config().getBool("forwardScan", false)) {
Forwarder forwarder;
return forwarder.scan(uris);
}

std::map< eckit::PathName, eckit::OffsetList > filemap = FDBLister::instance().filesOffsets(uris);

if (byfiles) { // ignore offsets and scan entire file
std::vector<eckit::PathName> files;
for (auto& [fname, offsets] : filemap) {
files.push_back(fname);
for (auto& [uri, offsets] : filemap) {
offsets.clear();
}

return scan(files);

}

size_t counter = 0;

for (auto& [fname, offsets] : filemap) {
taskGroup_.enqueueTask(new FileScanTask(taskGroup_, counter++, fname, offsets));
}
taskGroup_.waitForTasks();

return filemap.size();
return scan(filemap);
}

size_t Engine::scan(std::vector<eckit::PathName> files) {
size_t counter = 0;


scanmap_t scanmap;
for (auto& fname : files) {
taskGroup_.enqueueTask(new FileScanTask(taskGroup_, counter++, fname, {}));
scanmap[fname] = {};
}

return scan(scanmap);
}

size_t Engine::scan(const scanmap_t& scanmap) {

size_t counter = 0;
std::atomic<size_t> nfields(0);
for (auto& [uri, offsets] : scanmap) {
taskGroup_.enqueueTask(new FileScanTask(taskGroup_, counter++, uri.path(), offsets, nfields));
}
taskGroup_.waitForTasks();

return files.size();
return nfields;
}

std::map<std::string, std::unordered_set<std::string> > Engine::axes(const std::string& request, int level) {
Expand Down
6 changes: 3 additions & 3 deletions src/gribjump/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ class Engine {
// byfiles: scan entire file, not just fields matching request
size_t scan(const MarsRequests& requests, bool byfiles = false);
size_t scan(std::vector<eckit::PathName> files);
size_t scan(const scanmap_t& scanmap);

std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request, int level=3);

void scheduleTasks(filemap_t& filemap);
void scheduleExtractionTasks(filemap_t& filemap);

void reportErrors(eckit::Stream& client_);
void raiseErrors();
Expand All @@ -45,12 +46,11 @@ class Engine {

filemap_t buildFileMap(const metkit::mars::MarsRequest& unionrequest, ExItemMap& keyToExtractionItem);
ResultsMap collectResults(ExItemMap& keyToExtractionItem);
void forwardRemoteExtraction(filemap_t& filemap);
metkit::mars::MarsRequest buildRequestMap(ExtractionRequests& requests, ExItemMap& keyToExtractionItem );

private:

TaskGroup taskGroup_; /// @todo Maybe we should be returning the taskGroup, rather than storing it here.
TaskGroup taskGroup_; /// @todo Maybe we should be returning the taskGroup, rather than storing it here. /// I would really prefer a stateless engine.

};

Expand Down
2 changes: 2 additions & 0 deletions src/gribjump/ExtractionItem.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "gribjump/LibGribJump.h"
#include "gribjump/Types.h"
#include "gribjump/URIHelper.h"
namespace gribjump {


Expand Down Expand Up @@ -62,6 +63,7 @@ class ExtractionItem : public eckit::NonCopyable {
const std::string& gridHash() const { return gridHash_; }

void URI(const eckit::URI& uri) { uri_ = uri; }
bool isRemote() const { return URIHelper::isRemote(uri_); }

void values(ExValues values) { values_ = std::move(values); }
void mask(ExMask mask) { mask_ = std::move(mask); }
Expand Down
Loading

0 comments on commit d0b8bf4

Please sign in to comment.