From c19aa9bbb06072008a71b024472562c42f636e88 Mon Sep 17 00:00:00 2001 From: Jan Niklas Hasse Date: Mon, 25 Nov 2024 22:59:20 +0100 Subject: [PATCH] v2: Combine output reading and waiting for exit_code using boost::asio::experimental::awaitable_operators --- v2/real_command_runner.cc | 53 ++++++++++++++++++++++++++++----------- v2/status_printer.cc | 2 +- 2 files changed, 40 insertions(+), 15 deletions(-) diff --git a/v2/real_command_runner.cc b/v2/real_command_runner.cc index 17cdf8a0b9..ea4b25e0c3 100644 --- a/v2/real_command_runner.cc +++ b/v2/real_command_runner.cc @@ -11,8 +11,10 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include +#include #include #include @@ -31,7 +33,6 @@ class RealCommandRunner : public CommandRunner { }; std::vector> running_; std::vector> finished_; - std::vector> graveyard_; // TODO: Shouldn't exist size_t CanRunMore() const override { int capacity = config_.parallelism - static_cast(running_.size()); @@ -52,24 +53,49 @@ class RealCommandRunner : public CommandRunner { bp::process(gContext, "/bin/sh", { "-c", edge->EvaluateCommand() }, bp::process_stdio{ nullptr, *pipe_stdout, *pipe_stderr }), edge, std::string{} }); - for (auto* pipe : { &pipe_stdout, &pipe_stderr }) { - boost::asio::co_spawn( - gContext, - [pipe = std::move(*pipe), - output = &subprocess->output]() -> boost::asio::awaitable { + boost::asio::co_spawn( + gContext, + [this, subprocess = subprocess.get(), + pipe_stdout = std::move(pipe_stdout), + pipe_stderr = + std::move(pipe_stderr)]() -> boost::asio::awaitable { + auto read_loop = + [output = &subprocess->output](boost::asio::readable_pipe& pipe) + -> boost::asio::awaitable { while (true) { std::array buf; - size_t len = co_await pipe->async_read_some( - boost::asio::buffer(buf), boost::asio::use_awaitable); - if (len == 0 && !pipe->is_open()) { - co_return; + auto [err, len] = co_await pipe.async_read_some( + boost::asio::buffer(buf), + boost::asio::as_tuple(boost::asio::deferred)); + if (len == 0 && !pipe.is_open()) { + break; + } + if (err) { + if (err != boost::asio::error::eof) { + Fatal(err.message().c_str()); + } + break; } assert(*output); (*output)->append(buf.data(), len); } - }, - boost::asio::detached); - } + }; + using namespace boost::asio::experimental::awaitable_operators; + int exit_code = co_await ( + read_loop(*pipe_stdout) && read_loop(*pipe_stderr) && + subprocess->process.async_wait(boost::asio::use_awaitable)); + auto it = + std::ranges::find_if(running_, [subprocess](const auto& up) { + return up.get() == subprocess; + }); + assert(it != running_.end()); + finished_.emplace_back(std::move(*it)); + running_.erase(it); + co_return; + }, + boost::asio::detached); + running_.emplace_back(std::move(subprocess)); + return true; } if (!subprocess->process.is_open()) { return false; @@ -104,7 +130,6 @@ class RealCommandRunner : public CommandRunner { result->output = std::move(*finished_.back()->output); finished_.back()->output = std::nullopt; } - graveyard_.emplace_back(std::move(finished_.back())); finished_.pop_back(); return true; } diff --git a/v2/status_printer.cc b/v2/status_printer.cc index b7bd18cfc0..97cb680fe9 100644 --- a/v2/status_printer.cc +++ b/v2/status_printer.cc @@ -144,7 +144,7 @@ void StatusPrinter::StartTimer() { } void StatusPrinter::TimerCallback(boost::system::error_code err) { - assert(!err); + // assert(!err); PrintStatus(); StartTimer(); }