Skip to content

Commit

Permalink
Unify even more, obviate the need for the thread body helper function
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterTh committed Nov 25, 2024
1 parent 7de39cd commit 051d708
Show file tree
Hide file tree
Showing 14 changed files with 39 additions and 59 deletions.
4 changes: 0 additions & 4 deletions include/affinity.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ class thread_pinner {
bool m_successfully_initialized = false;
};

// Performs both naming and pinning (if enabled for this thread) of the invoking thread
// This should be the first thing called in any thread that is part of the Celerity runtime
void name_and_pin_thread(const named_threads::thread_type t_type);

// Pins the invoking thread of type `t_type` according to the current configuration
// This is a no-op if the thread pinning machinery is not currently initialized (by a `thread_pinner` instance)
void pin_this_thread(const named_threads::thread_type t_type);
Expand Down
13 changes: 3 additions & 10 deletions include/named_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,8 @@ thread_type tt_test(const uint32_t n);
// Converts a thread type to a canoncial string representation
std::string thread_type_to_string(const thread_type t_type);

// Sets the name for the invoking thread to its canonical string representation using OS-specific functions, if available
void set_current_thread_name(const thread_type t_type);

// Retrieves the name for the given thread_handle
// This function is only intended to be used in tests, and only works if CELERITY_DETAIL_HAS_NAMED_THREADS is true
std::string get_thread_name(const std::thread::native_handle_type thread_handle);

// Retrieves the name for the current thread
// This function is only intended to be used in tests, and only works if CELERITY_DETAIL_HAS_NAMED_THREADS is true
std::string get_current_thread_name();
// Performs naming, pinning and tracy ordering (if enabled for this thread) of the invoking thread
// This should be the first thing called in any thread that is part of the Celerity runtime
void name_and_pin_and_order_this_thread(const named_threads::thread_type t_type);

} // namespace celerity::detail::named_threads
26 changes: 3 additions & 23 deletions include/thread_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,15 @@
#include "async_event.h"
#include "double_buffered_queue.h"
#include "named_threads.h"
#include "tracy.h"
#include "utils.h"

#include <chrono>
#include <concepts>
#include <exception>
#include <future>
#include <thread>
#include <type_traits>
#include <variant>

namespace celerity::detail {

// A helper to implement the body of a worker thread while taking care of all boilerplate code
void thread_body(const named_threads::thread_type t_type, const std::invocable auto& fun) {
thread_pinning::name_and_pin_thread(t_type);
const auto name = named_threads::thread_type_to_string(t_type);
CELERITY_DETAIL_TRACY_SET_THREAD_NAME_AND_ORDER(t_type);
try {
fun();
}
// LCOV_EXCL_START
catch(std::exception& e) { //
utils::panic("exception in {}: {}", name, e.what());
} catch(...) { //
utils::panic("exception in {}", name);
}
// LCOV_EXCL_STOP
}

/// A single-thread job queue accepting functors and returning events that conditionally forward job results.
class thread_queue {
public:
Expand All @@ -42,7 +21,7 @@ class thread_queue {

/// Spawns a new thread queue with the given thread name. If `enable_profiling` is set to `true`, completed events from this thread queue will report a
/// non-nullopt duration.
explicit thread_queue(named_threads::thread_type t_type, const bool enable_profiling = false) : m_impl(new impl(t_type, enable_profiling)) {}
explicit thread_queue(const named_threads::thread_type t_type, const bool enable_profiling = false) : m_impl(new impl(t_type, enable_profiling)) {}

// thread_queue is movable, but not copyable.
thread_queue(const thread_queue&) = delete;
Expand Down Expand Up @@ -161,7 +140,8 @@ class thread_queue {
}

void thread_main(const named_threads::thread_type t_type) {
thread_body(t_type, [this] { loop(); });
name_and_pin_and_order_this_thread(t_type);
loop();
}
};

Expand Down
5 changes: 0 additions & 5 deletions src/affinity.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ namespace {
}
} // namespace

void name_and_pin_thread(const named_threads::thread_type t_type) {
set_current_thread_name(t_type);
pin_this_thread(t_type);
}

environment_configuration parse_validate_env(const std::string_view str) {
using namespace std::string_view_literals;
constexpr const char* error_msg =
Expand Down
1 change: 1 addition & 0 deletions src/backend/sycl_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "nd_memory.h"
#include "system_info.h"
#include "thread_queue.h"
#include "tracy.h"
#include "types.h"

#include <atomic>
Expand Down
2 changes: 1 addition & 1 deletion src/dry_run_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void dry_run_executor::submit(std::vector<const instruction*> instructions, std:
}

void dry_run_executor::thread_main(executor::delegate* const dlg) {
thread_pinning::name_and_pin_thread(named_threads::thread_type::executor);
name_and_pin_and_order_this_thread(named_threads::thread_type::executor);
// For simplicity we keep all executor state within this function.
std::unordered_map<host_object_id, std::unique_ptr<host_object_instance>> host_object_instances;
bool shutdown = false;
Expand Down
7 changes: 2 additions & 5 deletions src/live_executor.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include "live_executor.h"
#include "affinity.h"
#include "backend/backend.h"
#include "closure_hydrator.h"
#include "communicator.h"
Expand All @@ -10,7 +9,6 @@
#include "out_of_order_engine.h"
#include "receive_arbiter.h"
#include "system_info.h"
#include "thread_queue.h"
#include "tracy.h"
#include "types.h"
#include "utils.h"
Expand Down Expand Up @@ -950,9 +948,8 @@ void live_executor::submit(std::vector<const instruction*> instructions, std::ve
}

void live_executor::thread_main(std::unique_ptr<backend> backend, executor::delegate* const dlg, const policy_set& policy) {
thread_body(named_threads::thread_type::executor, [&] { //
live_executor_detail::executor_impl(std::move(backend), m_root_comm.get(), m_submission_queue, dlg, policy).run();
});
name_and_pin_and_order_this_thread(named_threads::thread_type::executor);
live_executor_detail::executor_impl(std::move(backend), m_root_comm.get(), m_submission_queue, dlg, policy).run();
}

} // namespace celerity::detail
15 changes: 14 additions & 1 deletion src/named_threads.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

#include <fmt/format.h>

#include "affinity.h"
#include "tracy.h"

namespace celerity::detail::named_threads {

thread_type tt_device_submitter(const uint32_t n) {
Expand All @@ -28,7 +31,7 @@ std::string thread_type_to_string(const thread_type t_type) {
default: break;
}
if(t_type >= thread_type::first_device_submitter && t_type < thread_type::first_host_queue) { //
return fmt::format("cy-be-sub-{}", static_cast<uint32_t>(t_type) - static_cast<uint32_t>(thread_type::first_device_submitter));
return fmt::format("cy-dev-sub-{}", static_cast<uint32_t>(t_type) - static_cast<uint32_t>(thread_type::first_device_submitter));
}
if(t_type >= thread_type::first_host_queue && t_type < thread_type::first_test) { //
return fmt::format("cy-host-{}", static_cast<uint32_t>(t_type) - static_cast<uint32_t>(thread_type::first_host_queue));
Expand All @@ -39,4 +42,14 @@ std::string thread_type_to_string(const thread_type t_type) {
return fmt::format("unknown({})", static_cast<uint32_t>(t_type));
}

// Sets the name for the invoking thread to its canonical string representation using OS-specific functions, if available
// Has a per-platform implementation in the platform-specific files
void set_current_thread_name(const thread_type t_type);

void name_and_pin_and_order_this_thread(const named_threads::thread_type t_type) {
set_current_thread_name(t_type);
thread_pinning::pin_this_thread(t_type);
CELERITY_DETAIL_TRACY_SET_THREAD_NAME_AND_ORDER(t_type);
}

} // namespace celerity::detail::named_threads
2 changes: 1 addition & 1 deletion src/platform_specific/affinity.unix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ void pin_this_thread(const thread_type t_type) {
if(!g_state.initialized || !g_state.config.enabled) return;

if(!g_state.thread_pinning_plan.contains(t_type)) {
// this is fine, not all threads need to be pinned in each plan, but all should use the `name_and_pin_thread` function for consistency and potential
// this is fine, not all threads need to be pinned in each plan, but all should call this function for consistency and potential
// future (e.g. NUMA-aware) pinning strategies
CELERITY_TRACE("Affinity: thread '{}' is not pinned.", thread_type_to_string(t_type));
return;
Expand Down
2 changes: 1 addition & 1 deletion src/runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ namespace detail {
.hardcoded_core_ids = pin_cfg.hardcoded_core_ids,
};
m_thread_pinner = std::make_unique<thread_pinning::thread_pinner>(thread_pinning_cfg);
thread_pinning::name_and_pin_thread(named_threads::thread_type::application);
name_and_pin_and_order_this_thread(named_threads::thread_type::application);
}

const sycl_backend::configuration backend_config = {
Expand Down
6 changes: 2 additions & 4 deletions src/scheduler.cc
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
#include "scheduler.h"

#include "affinity.h"
#include "command_graph_generator.h"
#include "instruction_graph_generator.h"
#include "log.h"
#include "named_threads.h"
#include "recorders.h"
#include "thread_queue.h"
#include "tracy.h"

#include <matchbox.hh>
Expand Down Expand Up @@ -125,7 +122,8 @@ namespace detail {
}

void scheduler::thread_main() {
thread_body(named_threads::thread_type::scheduler, [&] { schedule(); });
name_and_pin_and_order_this_thread(named_threads::thread_type::scheduler);
schedule();
}

} // namespace detail
Expand Down
2 changes: 1 addition & 1 deletion test/dag_benchmarks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ class restartable_thread {

void main() {
// This thread is used for scheduling, so pin it to the scheduler core
thread_pinning::name_and_pin_thread(named_threads::thread_type::scheduler);
name_and_pin_and_order_this_thread(named_threads::thread_type::scheduler);
std::unique_lock lk{m_mutex};
for(;;) {
m_update.wait(lk, [this] { return !std::holds_alternative<empty>(m_next); });
Expand Down
11 changes: 9 additions & 2 deletions test/named_threads_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ TEST_CASE("semantic thread type enum entries can be constructed and turned into
CHECK(thread_type_to_string(thread_type::scheduler) == "cy-scheduler");
CHECK(thread_type_to_string(thread_type::executor) == "cy-executor");
CHECK(thread_type_to_string(thread_type::alloc) == "cy-alloc");
CHECK(thread_type_to_string(thread_type::first_device_submitter) == "cy-be-sub-0");
CHECK(thread_type_to_string(tt_device_submitter(13)) == "cy-be-sub-13");
CHECK(thread_type_to_string(thread_type::first_device_submitter) == "cy-dev-sub-0");
CHECK(thread_type_to_string(tt_device_submitter(13)) == "cy-dev-sub-13");
CHECK(thread_type_to_string(thread_type::first_host_queue) == "cy-host-0");
CHECK(thread_type_to_string(tt_host_queue(42)) == "cy-host-42");
CHECK(thread_type_to_string(thread_type::first_test) == "cy-test-0");
Expand All @@ -33,6 +33,13 @@ TEST_CASE("semantic thread type enum entries can be constructed and turned into

#if CELERITY_DETAIL_HAS_NAMED_THREADS

namespace celerity::detail::named_threads {
// These functions have a per-platform implementation in the platform-specific files
// They only work if CELERITY_DETAIL_HAS_NAMED_THREADS is defined
std::string get_thread_name(const std::thread::native_handle_type thread_handle);
std::string get_current_thread_name();
} // namespace celerity::detail::named_threads

TEST_CASE_METHOD(test_utils::runtime_fixture, "thread names are set", "[named_threads]") {
queue q;

Expand Down
2 changes: 1 addition & 1 deletion test/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ namespace test_utils {
.use_backend_device_submission_threads = false,
};
m_thread_pinner.emplace(cfg);
detail::thread_pinning::name_and_pin_thread(detail::named_threads::thread_type::application);
name_and_pin_and_order_this_thread(detail::named_threads::thread_type::application);
}

std::optional<detail::thread_pinning::thread_pinner> m_thread_pinner;
Expand Down

0 comments on commit 051d708

Please sign in to comment.