diff --git a/src/gribjump/Task.cc b/src/gribjump/Task.cc index 49346e5..2dee08b 100644 --- a/src/gribjump/Task.cc +++ b/src/gribjump/Task.cc @@ -55,11 +55,12 @@ void Task::notifyCancelled() { } void Task::execute() { - // atomically set status to executing, but only if it is currently pending + // atomically set status to executing, but only if it is currently pending (i.e. not cancelled) Status expected = Status::PENDING; if (!status_.compare_exchange_strong(expected, Status::EXECUTING)) { return; } + info(); executeImpl(); notify(); } @@ -85,6 +86,7 @@ void TaskGroup::notify(size_t taskid) { } cv_.notify_one(); + info(); } void TaskGroup::notifyCancelled(size_t taskid) { @@ -92,6 +94,7 @@ void TaskGroup::notifyCancelled(size_t taskid) { nComplete_++; nCancelledTasks_++; cv_.notify_one(); + info(); } void TaskGroup::notifyError(size_t taskid, const std::string& s) { @@ -99,12 +102,17 @@ void TaskGroup::notifyError(size_t taskid, const std::string& s) { errors_.push_back(s); nComplete_++; cv_.notify_one(); + info(); if (cancelOnFirstError) { cancelTasks(); } } +void TaskGroup::info() const { + eckit::Log::status() << nComplete_ << " of " << tasks_.size() << " tasks complete" << std::endl; +} + // Note: This will only affect tasks that have not yet started. Cancelled tasks will call notifyCancelled() when they are executed. // NB: We do not lock a mutex as this will be called from notifyError() void TaskGroup::cancelTasks() { @@ -227,6 +235,10 @@ void FileExtractionTask::extract() { } } +void FileExtractionTask::info() const { + eckit::Log::status() << "Extract " << extractionItems_.size() << " items from " << fname_ << std::endl; +} + //---------------------------------------------------------------------------------------------------------------------- // Forward the work to a remote server, and wait for the results. @@ -242,6 +254,12 @@ void ForwardExtractionTask::executeImpl(){ remoteGribJump.forwardExtract(filemap_); } +void ForwardExtractionTask::info() const { + eckit::Log::status() << "Forward extract to " << endpoint_ << "nfiles=" << filemap_.size() << std::endl; +} + +//---------------------------------------------------------------------------------------------------------------------- + ForwardScanTask::ForwardScanTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, scanmap_t& scanmap, std::atomic& nfields): Task(taskgroup, id), endpoint_(endpoint), @@ -255,6 +273,10 @@ void ForwardScanTask::executeImpl(){ nfields_ += remoteGribJump.forwardScan(scanmap_); } +void ForwardScanTask::info() const { + eckit::Log::status() << "Forward scan to " << endpoint_ << std::endl; +} + //---------------------------------------------------------------------------------------------------------------------- InefficientFileExtractionTask::InefficientFileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, ExtractionItems& extractionItems): FileExtractionTask(taskgroup, id, fname, extractionItems) { @@ -299,6 +321,10 @@ void InefficientFileExtractionTask::extract() { } } +void InefficientFileExtractionTask::info() const { + eckit::Log::status() << "Inefficiently extract " << extractionItems_.size() << " items from " << fname_ << std::endl; +} + //---------------------------------------------------------------------------------------------------------------------- @@ -325,6 +351,10 @@ void FileScanTask::scan() { nfields_ += InfoCache::instance().scan(fname_, offsets_); } +void FileScanTask::info() const { + eckit::Log::status() << "Scan " << offsets_.size() << " offsets in " << fname_ << std::endl; +} + //---------------------------------------------------------------------------------------------------------------------- } // namespace gribjump \ No newline at end of file diff --git a/src/gribjump/Task.h b/src/gribjump/Task.h index 960da5d..13f1948 100644 --- a/src/gribjump/Task.h +++ b/src/gribjump/Task.h @@ -58,11 +58,14 @@ class Task { /// cancels the task. If execute() is called after this, it will return immediately. void cancel(); + /// Write description of task to eckit::Log::status() for monitoring + virtual void info() const = 0; + protected: virtual void executeImpl() = 0; protected: - + TaskGroup& taskGroup_; //< Groups like-tasks to be executed in parallel size_t taskid_; //< Task id within parent request std::atomic status_ = Status::PENDING; @@ -129,6 +132,8 @@ class TaskGroup { return errors_.size(); } + void info() const; + private: void enqueueTask(Task* task); @@ -165,6 +170,8 @@ class FileExtractionTask : public Task { virtual void extract(); + virtual void info() const override; + protected: eckit::PathName fname_; ExtractionItems& extractionItems_; @@ -183,6 +190,8 @@ class InefficientFileExtractionTask : public FileExtractionTask { void extract() override; + virtual void info() const override; + }; //---------------------------------------------------------------------------------------------------------------------- @@ -194,6 +203,8 @@ class ForwardExtractionTask : public Task { void executeImpl() override; + virtual void info() const override; + private: eckit::net::Endpoint endpoint_; filemap_t& filemap_; @@ -207,6 +218,8 @@ class ForwardScanTask : public Task { void executeImpl() override; + virtual void info() const override; + private: eckit::net::Endpoint endpoint_; scanmap_t& scanmap_; @@ -226,6 +239,8 @@ class FileScanTask : public Task { void scan(); + virtual void info() const override; + private: eckit::PathName fname_; std::vector offsets_; diff --git a/src/gribjump/remote/GribJumpUser.cc b/src/gribjump/remote/GribJumpUser.cc index 85fca7e..43c585e 100644 --- a/src/gribjump/remote/GribJumpUser.cc +++ b/src/gribjump/remote/GribJumpUser.cc @@ -98,6 +98,7 @@ void GribJumpUser::processRequest(eckit::Stream& s) { RequestType request(s); MetricsManager::instance().set("elapsed_receive", timer.elapsed()); timer.reset("Request received"); + request.info(); request.execute(); MetricsManager::instance().set("elapsed_execute", timer.elapsed()); diff --git a/src/gribjump/remote/Request.cc b/src/gribjump/remote/Request.cc index 3477fb1..bef006a 100644 --- a/src/gribjump/remote/Request.cc +++ b/src/gribjump/remote/Request.cc @@ -66,6 +66,10 @@ void ScanRequest::replyToClient() { client_ << nFields_; } +void ScanRequest::info() const { + eckit::Log::status() << "New ScanRequest: nRequests=" << requests_.size() << std::endl; +} + //---------------------------------------------------------------------------------------------------------------------- @@ -127,6 +131,10 @@ void ExtractRequest::replyToClient() { LOG_DEBUG_LIB(LibGribJump) << "Sent " << nRequests << " results to client" << std::endl; } +void ExtractRequest::info() const { + eckit::Log::status() << "New ExtractRequest: nRequests=" << requests_.size() << std::endl; +} + //---------------------------------------------------------------------------------------------------------------------- ForwardedExtractRequest::ForwardedExtractRequest(eckit::Stream& stream) : Request(stream) { @@ -178,6 +186,9 @@ void ForwardedExtractRequest::replyToClient() { } } +void ForwardedExtractRequest::info() const { + eckit::Log::status() << "New ForwardedExtractRequest: nItems=" << items_.size() << std::endl; +} //---------------------------------------------------------------------------------------------------------------------- @@ -210,6 +221,10 @@ void ForwardedScanRequest::execute() { void ForwardedScanRequest::replyToClient() { client_ << nfields_; } + +void ForwardedScanRequest::info() const { + eckit::Log::status() << "New ForwardedScanRequest: nfiles=" << scanmap_.size() << std::endl; +} //---------------------------------------------------------------------------------------------------------------------- AxesRequest::AxesRequest(eckit::Stream& stream) : Request(stream) { @@ -246,6 +261,10 @@ void AxesRequest::replyToClient() { } } +void AxesRequest::info() const { + eckit::Log::status() << "New AxesRequest: " << request_ << ", level=" << level_ << std::endl; +} + //---------------------------------------------------------------------------------------------------------------------- } // namespace gribjump diff --git a/src/gribjump/remote/Request.h b/src/gribjump/remote/Request.h index a8e8b4b..0a98a3c 100644 --- a/src/gribjump/remote/Request.h +++ b/src/gribjump/remote/Request.h @@ -42,6 +42,9 @@ class Request { void reportErrors(); + /// Print information about the request to status(), for monitoring + virtual void info() const = 0; + protected: // members eckit::Stream& client_; @@ -63,6 +66,8 @@ class ScanRequest : public Request { void replyToClient() override; + void info() const override; + private: std::vector requests_; @@ -85,6 +90,8 @@ class ExtractRequest : public Request { void replyToClient() override; + void info() const override; + private: std::vector requests_; @@ -105,6 +112,8 @@ class ForwardedExtractRequest : public Request { void replyToClient() override; + void info() const override; + private: std::vector> items_; @@ -127,6 +136,8 @@ class ForwardedScanRequest : public Request { void replyToClient() override; + void info() const override; + private: std::vector> items_; @@ -149,6 +160,8 @@ class AxesRequest : public Request { void replyToClient() override; + void info() const override; + private: std::string request_; /// @todo why is this a string? diff --git a/src/gribjump/remote/WorkQueue.cc b/src/gribjump/remote/WorkQueue.cc index f18e24a..51efb2c 100644 --- a/src/gribjump/remote/WorkQueue.cc +++ b/src/gribjump/remote/WorkQueue.cc @@ -43,6 +43,7 @@ WorkQueue::WorkQueue() : queue_(eckit::Resource("$GRIBJUMP_QUEUESIZE;gri // GribJump gj = GribJump(); // one per thread for (;;) { + eckit::Log::status() << "Waiting for job" << std::endl; WorkItem item; if (queue_.pop(item) == -1) { LOG_DEBUG_LIB(LibGribJump) << "Thread " << std::this_thread::get_id() << " stopping (queue closed)" << std::endl;