Skip to content

Commit

Permalink
v2: Combine output reading and waiting for exit_code using boost::asi…
Browse files Browse the repository at this point in the history
…o::experimental::awaitable_operators
  • Loading branch information
jhasse committed Nov 25, 2024
1 parent d4bede2 commit c19aa9b
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 15 deletions.
53 changes: 39 additions & 14 deletions v2/real_command_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <boost/asio.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/process/v2.hpp>
#include <cassert>

Expand All @@ -31,7 +33,6 @@ class RealCommandRunner : public CommandRunner {
};
std::vector<std::unique_ptr<Subprocess>> running_;
std::vector<std::unique_ptr<Subprocess>> finished_;
std::vector<std::unique_ptr<Subprocess>> graveyard_; // TODO: Shouldn't exist

size_t CanRunMore() const override {
int capacity = config_.parallelism - static_cast<int>(running_.size());
Expand All @@ -52,24 +53,49 @@ class RealCommandRunner : public CommandRunner {
bp::process(gContext, "/bin/sh", { "-c", edge->EvaluateCommand() },
bp::process_stdio{ nullptr, *pipe_stdout, *pipe_stderr }),
edge, std::string{} });
for (auto* pipe : { &pipe_stdout, &pipe_stderr }) {
boost::asio::co_spawn(
gContext,
[pipe = std::move(*pipe),
output = &subprocess->output]() -> boost::asio::awaitable<void> {
boost::asio::co_spawn(
gContext,
[this, subprocess = subprocess.get(),
pipe_stdout = std::move(pipe_stdout),
pipe_stderr =
std::move(pipe_stderr)]() -> boost::asio::awaitable<void> {
auto read_loop =
[output = &subprocess->output](boost::asio::readable_pipe& pipe)
-> boost::asio::awaitable<void> {
while (true) {
std::array<char, 1024> buf;
size_t len = co_await pipe->async_read_some(
boost::asio::buffer(buf), boost::asio::use_awaitable);
if (len == 0 && !pipe->is_open()) {
co_return;
auto [err, len] = co_await pipe.async_read_some(
boost::asio::buffer(buf),
boost::asio::as_tuple(boost::asio::deferred));
if (len == 0 && !pipe.is_open()) {
break;
}
if (err) {
if (err != boost::asio::error::eof) {
Fatal(err.message().c_str());
}
break;
}
assert(*output);
(*output)->append(buf.data(), len);
}
},
boost::asio::detached);
}
};
using namespace boost::asio::experimental::awaitable_operators;
int exit_code = co_await (
read_loop(*pipe_stdout) && read_loop(*pipe_stderr) &&
subprocess->process.async_wait(boost::asio::use_awaitable));
auto it =
std::ranges::find_if(running_, [subprocess](const auto& up) {
return up.get() == subprocess;
});
assert(it != running_.end());
finished_.emplace_back(std::move(*it));
running_.erase(it);
co_return;
},
boost::asio::detached);
running_.emplace_back(std::move(subprocess));
return true;
}
if (!subprocess->process.is_open()) {
return false;
Expand Down Expand Up @@ -104,7 +130,6 @@ class RealCommandRunner : public CommandRunner {
result->output = std::move(*finished_.back()->output);
finished_.back()->output = std::nullopt;
}
graveyard_.emplace_back(std::move(finished_.back()));
finished_.pop_back();
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion v2/status_printer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ void StatusPrinter::StartTimer() {
}

void StatusPrinter::TimerCallback(boost::system::error_code err) {
assert(!err);
// assert(!err);
PrintStatus();
StartTimer();
}
Expand Down

0 comments on commit c19aa9b

Please sign in to comment.