Skip to content

Commit

Permalink
Refactoring: introduce a single source of truth for thread names
Browse files Browse the repository at this point in the history
- also deduplicate thread body boilerplate
  • Loading branch information
PeterTh committed Nov 25, 2024
1 parent 2cfcb69 commit e797b17
Show file tree
Hide file tree
Showing 23 changed files with 247 additions and 206 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ set(SOURCES
src/grid.cc
src/instruction_graph_generator.cc
src/live_executor.cc
src/named_threads.cc
src/out_of_order_engine.cc
src/print_graph.cc
src/recorders.cc
Expand Down
21 changes: 4 additions & 17 deletions include/affinity.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#pragma once

#include <cstdint>
#include <string>
#include <string_view>
#include <vector>

#include "named_threads.h"

// The goal of this thread pinning mechanism, when enabled, is to ensure that threads which benefit from fast communication
// are pinned to cores that are close to each other in terms of cache hierarchy.
// It currently accomplishes this by pinning threads to cores in a round-robin fashion according to their order in the `thread_type` enum.
// It currently accomplishes this by pinning threads to cores in a round-robin fashion according to their order in the `named_threads::thread_type` enum.
//
// In terms of interface design, the goal is to provide a very simple entry point (`pin_this_thread`), that is safe to use from any thread at any time,
// and does not require polluting any other modules with state related to thread pinning. The `thread_pinner` RAII class offers the only way to manage the
Expand All @@ -16,20 +17,6 @@
// TODO: A future extension would be to respect NUMA for threads performing memory operations, but this requires in-depth knowledge of the system's topology.
namespace celerity::detail::thread_pinning {

constexpr uint32_t thread_type_step = 10000;

// The threads Celerity interacts with ("application") and creates (everything else), identified for the purpose of pinning.
// Note: this is not an enum class to make interactions such as specifying `first_backend_worker+i` easier
enum thread_type : uint32_t {
application = 0 * thread_type_step,
scheduler = 1 * thread_type_step,
executor = 2 * thread_type_step,
first_device_submitter = 3 * thread_type_step,
first_host_queue = 4 * thread_type_step,
max = 5 * thread_type_step,
};
std::string thread_type_to_string(const thread_type t_type);

// User-level configuration of the thread pinning mechanism (set by the user via environment variables)
struct environment_configuration {
bool enabled = true; // we want thread pinning to be enabled by default
Expand Down Expand Up @@ -85,6 +72,6 @@ class thread_pinner {

// Pins the invoking thread of type `t_type` according to the current configuration
// This is a no-op if the thread pinning machinery is not currently initialized (by a `thread_pinner` instance)
void pin_this_thread(const thread_type t_type);
void pin_this_thread(const named_threads::thread_type t_type);

} // namespace celerity::detail::thread_pinning
30 changes: 25 additions & 5 deletions include/named_threads.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,34 @@
#pragma once

#include <cstdint>
#include <string>
#include <thread>

namespace celerity::detail {
namespace celerity::detail::named_threads {

std::thread::native_handle_type get_current_thread_handle();
constexpr uint32_t thread_type_step = 10000;

void set_thread_name(const std::thread::native_handle_type thread_handle, const std::string& name);
// The threads Celerity interacts with ("application") and creates (everything else), identified for the purpose of naming and pinning.
enum class thread_type : uint32_t {
application = 0 * thread_type_step, // pinned
scheduler = 1 * thread_type_step, // pinned
executor = 2 * thread_type_step, // pinned
alloc = 3 * thread_type_step, //
first_device_submitter = 4 * thread_type_step, // pinned
first_host_queue = 5 * thread_type_step, //
first_test = 6 * thread_type_step, //
max = 7 * thread_type_step, //
};
// Builds the n-th thread types of various kinds
thread_type tt_device_submitter(const uint32_t n);
thread_type tt_host_queue(const uint32_t n);
thread_type tt_test(const uint32_t n);

std::string get_thread_name(const std::thread::native_handle_type thread_handle);
// Converts a thread type to a canoncial string representation
std::string thread_type_to_string(const thread_type t_type);

} // namespace celerity::detail
// Performs naming, pinning and tracy ordering (if enabled for this thread) of the invoking thread
// This should be the first thing called in any thread that is part of the Celerity runtime
void name_and_pin_and_order_this_thread(const named_threads::thread_type t_type);

} // namespace celerity::detail::named_threads
22 changes: 7 additions & 15 deletions include/thread_queue.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
#pragma once

#include "affinity.h"
#include "async_event.h"
#include "double_buffered_queue.h"
#include "named_threads.h"
#include "tracy.h"
#include "utils.h"

#include <chrono>
#include <future>
Expand All @@ -22,7 +21,7 @@ class thread_queue {

/// Spawns a new thread queue with the given thread name. If `enable_profiling` is set to `true`, completed events from this thread queue will report a
/// non-nullopt duration.
explicit thread_queue(std::string thread_name, const bool enable_profiling = false) : m_impl(new impl(std::move(thread_name), enable_profiling)) {}
explicit thread_queue(const named_threads::thread_type t_type, const bool enable_profiling = false) : m_impl(new impl(t_type, enable_profiling)) {}

// thread_queue is movable, but not copyable.
thread_queue(const thread_queue&) = delete;
Expand Down Expand Up @@ -113,7 +112,8 @@ class thread_queue {
const bool enable_profiling;
std::thread thread;

explicit impl(std::string name, const bool enable_profiling) : enable_profiling(enable_profiling), thread(&impl::thread_main, this, std::move(name)) {}
explicit impl(const named_threads::thread_type t_type, const bool enable_profiling)
: enable_profiling(enable_profiling), thread(&impl::thread_main, this, t_type) {}

void execute(job& job) const {
std::chrono::steady_clock::time_point start;
Expand All @@ -139,17 +139,9 @@ class thread_queue {
}
}

void thread_main(const std::string& name) {
set_thread_name(get_current_thread_handle(), name);
CELERITY_DETAIL_TRACY_SET_THREAD_NAME_AND_ORDER(tracy_detail::leak_name(name), tracy_detail::thread_order::thread_queue);

try {
loop();
} catch(std::exception& e) { //
utils::panic("exception in {}: {}", name, e.what());
} catch(...) { //
utils::panic("exception in {}", name);
}
void thread_main(const named_threads::thread_type t_type) {
name_and_pin_and_order_this_thread(t_type);
loop();
}
};

Expand Down
20 changes: 17 additions & 3 deletions include/tracy.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "version.h"

#if CELERITY_TRACY_SUPPORT

#include "config.h"
Expand All @@ -18,7 +20,7 @@ namespace celerity::detail::tracy_detail {
// This is intentionally not an atomic, as parts of Celerity (= live_executor) expect it not to change after runtime startup.
// We start with `full` tracing to see the runtime startup trigger (i.e. buffer / queue construction), and adjust the setting in runtime::runtime() immediately
// after parsing the config.
inline tracy_mode g_tracy_mode = tracy_mode::full;
inline tracy_mode g_tracy_mode = tracy_mode::full; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)

/// Tracy is enabled via environment variable, either in fast or full mode.
inline bool is_enabled() { return g_tracy_mode != tracy_mode::off; }
Expand Down Expand Up @@ -71,11 +73,22 @@ enum thread_order : int32_t {

/// Tracy requires thread and fiber names to be live for the duration of the program, so if they are formatted dynamically, we need to leak them.
inline const char* leak_name(const std::string& name) {
auto* leaked = malloc(name.size() + 1);
auto* leaked = malloc(name.size() + 1); // NOLINT
memcpy(leaked, name.data(), name.size() + 1);
return static_cast<const char*>(leaked);
}

inline void set_thread_name_and_order(const named_threads::thread_type t_type) {
const auto name = named_threads::thread_type_to_string(t_type);
int32_t order = 0;
switch(t_type) {
case named_threads::thread_type::scheduler: order = thread_order::scheduler; break;
case named_threads::thread_type::executor: order = thread_order::executor; break;
default: order = thread_order::thread_queue; break;
}
::tracy::SetThreadNameWithHint(leak_name(name), order);
}

} // namespace celerity::detail::tracy_detail

#define CELERITY_DETAIL_IF_TRACY_SUPPORTED(...) __VA_ARGS__
Expand All @@ -102,4 +115,5 @@ inline const char* leak_name(const std::string& name) {
CELERITY_DETAIL_TRACY_ZONE_SCOPED(TAG, COLOR_NAME); \
CELERITY_DETAIL_TRACY_ZONE_NAME(__VA_ARGS__);

#define CELERITY_DETAIL_TRACY_SET_THREAD_NAME_AND_ORDER(NAME, ORDER) CELERITY_DETAIL_IF_TRACY_ENABLED(::tracy::SetThreadNameWithHint(NAME, ORDER))
#define CELERITY_DETAIL_TRACY_SET_THREAD_NAME_AND_ORDER(THREAD_TYPE) \
CELERITY_DETAIL_IF_TRACY_ENABLED(::celerity::detail::tracy_detail::set_thread_name_and_order(THREAD_TYPE))
14 changes: 0 additions & 14 deletions src/affinity.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,6 @@

namespace celerity::detail::thread_pinning {

std::string thread_type_to_string(const thread_type t_type) {
switch(t_type) {
case thread_type::application: return "application";
case thread_type::scheduler: return "scheduler";
case thread_type::executor: return "executor";
default: break;
}
if(t_type >= thread_type::first_device_submitter && t_type < thread_type::first_host_queue) {
return fmt::format("device_submitter_{}", t_type - thread_type::first_device_submitter);
}
if(t_type >= thread_type::first_host_queue && t_type < thread_type::max) { return fmt::format("host_queue_{}", t_type - thread_type::first_host_queue); }
return fmt::format("unknown({})", static_cast<uint32_t>(t_type));
}

namespace {
// When we no longer need to support compilers without a working std::views::split, get rid of this function
std::vector<std::string> split(const std::string_view str, const char delim) {
Expand Down
14 changes: 6 additions & 8 deletions src/backend/sycl_backend.cc
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#include "backend/sycl_backend.h"

#include "affinity.h"
#include "closure_hydrator.h"
#include "dense_map.h"
#include "named_threads.h"
#include "nd_memory.h"
#include "system_info.h"
#include "thread_queue.h"
#include "tracy.h"
#include "types.h"

#include <atomic>
Expand Down Expand Up @@ -107,7 +108,7 @@ struct sycl_backend::impl {
// - TODO assert that all devices belong to the same platform + backend here
// - TODO test Celerity on a (SimSYCL) system without GPUs
: sycl_context(all_devices.at(0)), //
alloc_queue("cy-alloc", enable_profiling) {}
alloc_queue(named_threads::thread_type::alloc, enable_profiling) {}
};

system_info system;
Expand Down Expand Up @@ -152,7 +153,7 @@ struct sycl_backend::impl {

thread_queue& get_host_queue(const size_t lane) {
assert(lane <= host.queues.size());
if(lane == host.queues.size()) { host.queues.emplace_back(fmt::format("cy-host-{}", lane), config.profiling); }
if(lane == host.queues.size()) { host.queues.emplace_back(named_threads::tt_host_queue(lane), config.profiling); }
return host.queues[lane];
}

Expand All @@ -172,12 +173,9 @@ sycl_backend::sycl_backend(const std::vector<sycl::device>& devices, const confi
// Initialize a submission thread with hydrator for each device, if they are enabled
if(m_impl->config.per_device_submission_threads) {
for(device_id did = 0; did < m_impl->system.devices.size(); ++did) {
m_impl->devices[did].submission_thread.emplace(fmt::format("cy-be-sub-{}", did.value), m_impl->config.profiling);
m_impl->devices[did].submission_thread.emplace(named_threads::tt_device_submitter(did.value), m_impl->config.profiling);
// no need to wait for the event -> will happen before the first task is submitted
(void)m_impl->devices[did].submission_thread->submit([did] {
thread_pinning::pin_this_thread(thread_pinning::thread_type(thread_pinning::thread_type::first_device_submitter + did.value));
closure_hydrator::make_available();
});
(void)m_impl->devices[did].submission_thread->submit([did] { closure_hydrator::make_available(); });
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/dry_run_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@

namespace celerity::detail {

dry_run_executor::dry_run_executor(executor::delegate* const dlg) : m_thread(&dry_run_executor::thread_main, this, dlg) {
set_thread_name(m_thread.native_handle(), "cy-executor");
}
dry_run_executor::dry_run_executor(executor::delegate* const dlg) : m_thread(&dry_run_executor::thread_main, this, dlg) {}

dry_run_executor::~dry_run_executor() { m_thread.join(); }

Expand All @@ -33,6 +31,7 @@ void dry_run_executor::submit(std::vector<const instruction*> instructions, std:
}

void dry_run_executor::thread_main(executor::delegate* const dlg) {
name_and_pin_and_order_this_thread(named_threads::thread_type::executor);
// For simplicity we keep all executor state within this function.
std::unordered_map<host_object_id, std::unique_ptr<host_object_instance>> host_object_instances;
bool shutdown = false;
Expand Down
21 changes: 3 additions & 18 deletions src/live_executor.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include "live_executor.h"
#include "affinity.h"
#include "backend/backend.h"
#include "closure_hydrator.h"
#include "communicator.h"
Expand Down Expand Up @@ -924,10 +923,7 @@ std::unique_ptr<boundary_check_info> executor_impl::attach_boundary_check_info(s
namespace celerity::detail {

live_executor::live_executor(std::unique_ptr<backend> backend, std::unique_ptr<communicator> root_comm, executor::delegate* const dlg, const policy_set& policy)
: m_root_comm(std::move(root_comm)), m_thread(&live_executor::thread_main, this, std::move(backend), dlg, policy) //
{
set_thread_name(m_thread.native_handle(), "cy-executor");
}
: m_root_comm(std::move(root_comm)), m_thread(&live_executor::thread_main, this, std::move(backend), dlg, policy) {}

live_executor::~live_executor() {
m_thread.join(); // thread_main will exit only after executing shutdown epoch
Expand All @@ -952,19 +948,8 @@ void live_executor::submit(std::vector<const instruction*> instructions, std::ve
}

void live_executor::thread_main(std::unique_ptr<backend> backend, executor::delegate* const dlg, const policy_set& policy) {
CELERITY_DETAIL_TRACY_SET_THREAD_NAME_AND_ORDER("cy-executor", tracy_detail::thread_order::executor);

thread_pinning::pin_this_thread(thread_pinning::thread_type::executor);

try {
live_executor_detail::executor_impl(std::move(backend), m_root_comm.get(), m_submission_queue, dlg, policy).run();
}
// LCOV_EXCL_START
catch(const std::exception& e) {
CELERITY_CRITICAL("[executor] {}", e.what());
std::abort();
}
// LCOV_EXCL_STOP
name_and_pin_and_order_this_thread(named_threads::thread_type::executor);
live_executor_detail::executor_impl(std::move(backend), m_root_comm.get(), m_submission_queue, dlg, policy).run();
}

} // namespace celerity::detail
55 changes: 55 additions & 0 deletions src/named_threads.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#include "named_threads.h"

#include <cassert>

#include <fmt/format.h>

#include "affinity.h"
#include "tracy.h"

namespace celerity::detail::named_threads {

thread_type tt_device_submitter(const uint32_t n) {
assert(n < thread_type_step);
return thread_type(static_cast<uint32_t>(thread_type::first_device_submitter) + n); // NOLINT(clang-analyzer-optin.core.EnumCastOutOfRange)
}
thread_type tt_host_queue(const uint32_t n) {
assert(n < thread_type_step);
return thread_type(static_cast<uint32_t>(thread_type::first_host_queue) + n); // NOLINT(clang-analyzer-optin.core.EnumCastOutOfRange)
}
thread_type tt_test(const uint32_t n) {
assert(n < thread_type_step);
return thread_type(static_cast<uint32_t>(thread_type::first_test) + n); // NOLINT(clang-analyzer-optin.core.EnumCastOutOfRange)
}

std::string thread_type_to_string(const thread_type t_type) {
switch(t_type) {
case thread_type::application: return "cy-application";
case thread_type::scheduler: return "cy-scheduler";
case thread_type::executor: return "cy-executor";
case thread_type::alloc: return "cy-alloc";
default: break;
}
if(t_type >= thread_type::first_device_submitter && t_type < thread_type::first_host_queue) { //
return fmt::format("cy-dev-sub-{}", static_cast<uint32_t>(t_type) - static_cast<uint32_t>(thread_type::first_device_submitter));
}
if(t_type >= thread_type::first_host_queue && t_type < thread_type::first_test) { //
return fmt::format("cy-host-{}", static_cast<uint32_t>(t_type) - static_cast<uint32_t>(thread_type::first_host_queue));
}
if(t_type >= thread_type::first_test && t_type <= thread_type::max) { //
return fmt::format("cy-test-{}", static_cast<uint32_t>(t_type) - static_cast<uint32_t>(thread_type::first_test));
}
return fmt::format("unknown({})", static_cast<uint32_t>(t_type));
}

// Sets the name for the invoking thread to its canonical string representation using OS-specific functions, if available
// Has a per-platform implementation in the platform-specific files
void set_current_thread_name(const thread_type t_type);

void name_and_pin_and_order_this_thread(const named_threads::thread_type t_type) {
set_current_thread_name(t_type);
thread_pinning::pin_this_thread(t_type);
CELERITY_DETAIL_TRACY_SET_THREAD_NAME_AND_ORDER(t_type);
}

} // namespace celerity::detail::named_threads
Loading

0 comments on commit e797b17

Please sign in to comment.