Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add status logging for monitoring #38

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion src/gribjump/Task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ void Task::notifyCancelled() {
}

void Task::execute() {
// atomically set status to executing, but only if it is currently pending
// atomically set status to executing, but only if it is currently pending (i.e. not cancelled)
Status expected = Status::PENDING;
if (!status_.compare_exchange_strong(expected, Status::EXECUTING)) {
return;
}
info();
executeImpl();
notify();
}
Expand All @@ -85,26 +86,33 @@ void TaskGroup::notify(size_t taskid) {
}

cv_.notify_one();
info();
}

void TaskGroup::notifyCancelled(size_t taskid) {
std::lock_guard<std::mutex> lock(m_);
nComplete_++;
nCancelledTasks_++;
cv_.notify_one();
info();
}

void TaskGroup::notifyError(size_t taskid, const std::string& s) {
std::lock_guard<std::mutex> lock(m_);
errors_.push_back(s);
nComplete_++;
cv_.notify_one();
info();

if (cancelOnFirstError) {
cancelTasks();
}
}

void TaskGroup::info() const {
eckit::Log::status() << nComplete_ << " of " << tasks_.size() << " tasks complete" << std::endl;
}

// Note: This will only affect tasks that have not yet started. Cancelled tasks will call notifyCancelled() when they are executed.
// NB: We do not lock a mutex as this will be called from notifyError()
void TaskGroup::cancelTasks() {
Expand Down Expand Up @@ -227,6 +235,10 @@ void FileExtractionTask::extract() {
}
}

void FileExtractionTask::info() const {
eckit::Log::status() << "Extract " << extractionItems_.size() << " items from " << fname_ << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

// Forward the work to a remote server, and wait for the results.
Expand All @@ -242,6 +254,12 @@ void ForwardExtractionTask::executeImpl(){
remoteGribJump.forwardExtract(filemap_);
}

void ForwardExtractionTask::info() const {
eckit::Log::status() << "Forward extract to " << endpoint_ << "nfiles=" << filemap_.size() << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

ForwardScanTask::ForwardScanTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, scanmap_t& scanmap, std::atomic<size_t>& nfields):
Task(taskgroup, id),
endpoint_(endpoint),
Expand All @@ -255,6 +273,10 @@ void ForwardScanTask::executeImpl(){
nfields_ += remoteGribJump.forwardScan(scanmap_);
}

void ForwardScanTask::info() const {
eckit::Log::status() << "Forward scan to " << endpoint_ << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------
InefficientFileExtractionTask::InefficientFileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, ExtractionItems& extractionItems):
FileExtractionTask(taskgroup, id, fname, extractionItems) {
Expand Down Expand Up @@ -299,6 +321,10 @@ void InefficientFileExtractionTask::extract() {
}
}

void InefficientFileExtractionTask::info() const {
eckit::Log::status() << "Inefficiently extract " << extractionItems_.size() << " items from " << fname_ << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------


Expand All @@ -325,6 +351,10 @@ void FileScanTask::scan() {
nfields_ += InfoCache::instance().scan(fname_, offsets_);
}

void FileScanTask::info() const {
eckit::Log::status() << "Scan " << offsets_.size() << " offsets in " << fname_ << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

} // namespace gribjump
17 changes: 16 additions & 1 deletion src/gribjump/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@ class Task {
/// cancels the task. If execute() is called after this, it will return immediately.
void cancel();

/// Write description of task to eckit::Log::status() for monitoring
virtual void info() const = 0;

protected:
virtual void executeImpl() = 0;

protected:

TaskGroup& taskGroup_; //< Groups like-tasks to be executed in parallel
size_t taskid_; //< Task id within parent request
std::atomic<Status> status_ = Status::PENDING;
Expand Down Expand Up @@ -129,6 +132,8 @@ class TaskGroup {
return errors_.size();
}

void info() const;

private:

void enqueueTask(Task* task);
Expand Down Expand Up @@ -165,6 +170,8 @@ class FileExtractionTask : public Task {

virtual void extract();

virtual void info() const override;

protected:
eckit::PathName fname_;
ExtractionItems& extractionItems_;
Expand All @@ -183,6 +190,8 @@ class InefficientFileExtractionTask : public FileExtractionTask {

void extract() override;

virtual void info() const override;

};

//----------------------------------------------------------------------------------------------------------------------
Expand All @@ -194,6 +203,8 @@ class ForwardExtractionTask : public Task {

void executeImpl() override;

virtual void info() const override;

private:
eckit::net::Endpoint endpoint_;
filemap_t& filemap_;
Expand All @@ -207,6 +218,8 @@ class ForwardScanTask : public Task {

void executeImpl() override;

virtual void info() const override;

private:
eckit::net::Endpoint endpoint_;
scanmap_t& scanmap_;
Expand All @@ -226,6 +239,8 @@ class FileScanTask : public Task {

void scan();

virtual void info() const override;

private:
eckit::PathName fname_;
std::vector<eckit::Offset> offsets_;
Expand Down
1 change: 1 addition & 0 deletions src/gribjump/remote/GribJumpUser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ void GribJumpUser::processRequest(eckit::Stream& s) {
RequestType request(s);
MetricsManager::instance().set("elapsed_receive", timer.elapsed());
timer.reset("Request received");
request.info();

request.execute();
MetricsManager::instance().set("elapsed_execute", timer.elapsed());
Expand Down
19 changes: 19 additions & 0 deletions src/gribjump/remote/Request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ void ScanRequest::replyToClient() {
client_ << nFields_;
}

void ScanRequest::info() const {
eckit::Log::status() << "New ScanRequest: nRequests=" << requests_.size() << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------


Expand Down Expand Up @@ -127,6 +131,10 @@ void ExtractRequest::replyToClient() {
LOG_DEBUG_LIB(LibGribJump) << "Sent " << nRequests << " results to client" << std::endl;
}

void ExtractRequest::info() const {
eckit::Log::status() << "New ExtractRequest: nRequests=" << requests_.size() << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

ForwardedExtractRequest::ForwardedExtractRequest(eckit::Stream& stream) : Request(stream) {
Expand Down Expand Up @@ -178,6 +186,9 @@ void ForwardedExtractRequest::replyToClient() {
}
}

void ForwardedExtractRequest::info() const {
eckit::Log::status() << "New ForwardedExtractRequest: nItems=" << items_.size() << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -210,6 +221,10 @@ void ForwardedScanRequest::execute() {
void ForwardedScanRequest::replyToClient() {
client_ << nfields_;
}

void ForwardedScanRequest::info() const {
eckit::Log::status() << "New ForwardedScanRequest: nfiles=" << scanmap_.size() << std::endl;
}
//----------------------------------------------------------------------------------------------------------------------

AxesRequest::AxesRequest(eckit::Stream& stream) : Request(stream) {
Expand Down Expand Up @@ -246,6 +261,10 @@ void AxesRequest::replyToClient() {
}
}

void AxesRequest::info() const {
eckit::Log::status() << "New AxesRequest: " << request_ << ", level=" << level_ << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

} // namespace gribjump
13 changes: 13 additions & 0 deletions src/gribjump/remote/Request.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class Request {

void reportErrors();

/// Print information about the request to status(), for monitoring
virtual void info() const = 0;

protected: // members

eckit::Stream& client_;
Expand All @@ -63,6 +66,8 @@ class ScanRequest : public Request {

void replyToClient() override;

void info() const override;

private:

std::vector<metkit::mars::MarsRequest> requests_;
Expand All @@ -85,6 +90,8 @@ class ExtractRequest : public Request {

void replyToClient() override;

void info() const override;

private:
std::vector<ExtractionRequest> requests_;

Expand All @@ -105,6 +112,8 @@ class ForwardedExtractRequest : public Request {

void replyToClient() override;

void info() const override;

private:

std::vector<std::unique_ptr<ExtractionItem>> items_;
Expand All @@ -127,6 +136,8 @@ class ForwardedScanRequest : public Request {

void replyToClient() override;

void info() const override;

private:

std::vector<std::unique_ptr<ExtractionItem>> items_;
Expand All @@ -149,6 +160,8 @@ class AxesRequest : public Request {

void replyToClient() override;

void info() const override;

private:

std::string request_; /// @todo why is this a string?
Expand Down
1 change: 1 addition & 0 deletions src/gribjump/remote/WorkQueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ WorkQueue::WorkQueue() : queue_(eckit::Resource<size_t>("$GRIBJUMP_QUEUESIZE;gri
// GribJump gj = GribJump(); // one per thread

for (;;) {
eckit::Log::status() << "Waiting for job" << std::endl;
WorkItem item;
if (queue_.pop(item) == -1) {
LOG_DEBUG_LIB(LibGribJump) << "Thread " << std::this_thread::get_id() << " stopping (queue closed)" << std::endl;
Expand Down
Loading