diff --git a/src/core/operator/bfs_state.cpp b/src/core/operator/bfs_state.cpp
index c651338..59d3de9 100644
--- a/src/core/operator/bfs_state.cpp
+++ b/src/core/operator/bfs_state.cpp
@@ -33,7 +33,6 @@ BFSState::BFSState(shared_ptr<DataChunk> pairs_, CSR *csr_, idx_t num_threads_,
   visit1 = vector<std::bitset<LANE_LIMIT>>(v_size);
   visit2 = vector<std::bitset<LANE_LIMIT>>(v_size);
   seen = vector<std::bitset<LANE_LIMIT>>(v_size);
-  element_locks = vector<mutex>(v_size);
   parents_ve = std::vector<std::array<ve, LANE_LIMIT>>(
       v_size, std::array<ve, LANE_LIMIT>{});
 
@@ -46,9 +45,8 @@ BFSState::BFSState(shared_ptr<DataChunk> pairs_, CSR *csr_, idx_t num_threads_,
   // Initialize the thread assignment vector
   thread_assignment = std::vector<int64_t>(v_size, -1);
 
-  CreateTasks();
-  scheduled_threads = std::min(num_threads, (idx_t)global_task_queue.size());
-  barrier = make_uniq<Barrier>(scheduled_threads);
+  // CreateTasks();
+  barrier = make_uniq<Barrier>(num_threads);
 }
 
 void BFSState::Clear() {
@@ -72,67 +70,6 @@ void BFSState::Clear() {
   lane_completed.reset();
 }
 
-void BFSState::CreateTasks() {
-  // workerTasks[workerId] = [task1, task2, ...]
-  // vector<vector<pair<idx_t, idx_t>>> worker_tasks(num_threads);
-  // auto cur_worker = 0;
-  int64_t *v = (int64_t *)csr->v;
-  int64_t current_task_edges = 0;
-  idx_t current_task_start = 0;
-  for (idx_t v_idx = 0; v_idx < (idx_t)v_size; v_idx++) {
-    auto number_of_edges = v[v_idx + 1] - v[v_idx];
-    if (current_task_edges + number_of_edges > split_size) {
-      global_task_queue.push_back({current_task_start, v_idx});
-      current_task_start = v_idx;
-      current_task_edges = 0; // reset
-    }
-    current_task_edges += number_of_edges;
-  }
-
-  // Final task if there are any remaining edges
-  if (current_task_start < (idx_t)v_size) {
-    global_task_queue.push_back({current_task_start, v_size});
-  }
-}
-
-
-shared_ptr<pair<idx_t, idx_t>> BFSState::FetchTask() {
-  std::unique_lock lock(queue_mutex); // Lock the mutex to access the queue
-
-  // Avoid unnecessary waiting if no tasks are available
-  if (current_task_index >= global_task_queue.size()) {
-    // std::cout << "FetchTask: No more tasks available. Exiting." << std::endl;
-    return nullptr; // No more tasks
-  }
-
-  // Wait until a task is available or the queue is finalized
-  queue_cv.wait(lock, [this]() {
-    return current_task_index < global_task_queue.size();
-  });
-
-  // Fetch the next task and increment the task index
-  if (current_task_index < global_task_queue.size()) {
-    auto task =
-        make_shared_ptr<pair<idx_t, idx_t>>(global_task_queue[current_task_index]);
-    current_task_index++;
-
-    return task;
-  }
-  return nullptr;
-}
-
-void BFSState::ResetTaskIndex() {
-  std::lock_guard lock(queue_mutex); // Lock to reset index safely
-  current_task_index = 0;            // Reset the task index for the next stage
-  queue_cv.notify_all();             // Notify all threads that tasks are available
-}
-
-pair<idx_t, idx_t> BFSState::BoundaryCalculation(idx_t worker_id) const {
-  idx_t block_size = ceil((double)v_size / num_threads);
-  block_size = block_size == 0 ? 1 : block_size;
-  idx_t left = block_size * worker_id;
-  idx_t right = std::min(block_size * (worker_id + 1), (idx_t)v_size);
-  return {left, right};
-}
 
 void BFSState::InitializeLanes() {
   auto &result_validity = FlatVector::Validity(pf_results->data[0]);
@@ -159,7 +96,6 @@ void BFSState::InitializeLanes() {
   for (int64_t i = 0; i < v_size; i++) {
     seen[i] = seen_mask;
   }
-
 }
 
 void BFSState::ScheduleBFSBatch(Pipeline &pipeline, Event &event, const PhysicalPathFinding *op) {
diff --git a/src/core/operator/event/iterative_length_event.cpp b/src/core/operator/event/iterative_length_event.cpp
index 2e4f087..a163c4f 100644
--- a/src/core/operator/event/iterative_length_event.cpp
+++ b/src/core/operator/event/iterative_length_event.cpp
@@ -14,9 +14,8 @@ IterativeLengthEvent::IterativeLengthEvent(shared_ptr<BFSState> gbfs_state_p,
 
 void IterativeLengthEvent::Schedule() {
   auto &context = pipeline->GetClientContext();
-  // std::cout << gbfs_state->csr->ToString();
   vector<shared_ptr<Task>> bfs_tasks;
-  for (idx_t tnum = 0; tnum < gbfs_state->scheduled_threads; tnum++) {
+  for (idx_t tnum = 0; tnum < gbfs_state->num_threads; tnum++) {
     bfs_tasks.push_back(make_uniq<IterativeLengthTask>(
         shared_from_this(), context, gbfs_state, tnum, op));
   }
diff --git a/src/core/operator/event/shortest_path_event.cpp b/src/core/operator/event/shortest_path_event.cpp
index e5e38bf..0bb7337 100644
--- a/src/core/operator/event/shortest_path_event.cpp
+++ b/src/core/operator/event/shortest_path_event.cpp
@@ -16,7 +16,7 @@ void ShortestPathEvent::Schedule() {
   auto &context = pipeline->GetClientContext();
   // std::cout << gbfs_state->csr->ToString();
   vector<shared_ptr<Task>> bfs_tasks;
-  for (idx_t tnum = 0; tnum < gbfs_state->scheduled_threads; tnum++) {
+  for (idx_t tnum = 0; tnum < gbfs_state->num_threads; tnum++) {
     bfs_tasks.push_back(make_uniq<ShortestPathTask>(
         shared_from_this(), context, gbfs_state, tnum, op));
   }
diff --git a/src/core/operator/task/iterative_length_task.cpp b/src/core/operator/task/iterative_length_task.cpp
index f326a59..9132afd 100644
--- a/src/core/operator/task/iterative_length_task.cpp
+++ b/src/core/operator/task/iterative_length_task.cpp
@@ -10,26 +10,19 @@ IterativeLengthTask::IterativeLengthTask(shared_ptr<Event> event_p,
                                          const PhysicalOperator &op_p)
     : ExecutorTask(context, std::move(event_p), op_p), context(context),
       state(state), worker_id(worker_id) {
-  left = right = UINT64_MAX; // NOLINT
-}
-
-bool IterativeLengthTask::SetTaskRange() {
-  auto task = state->FetchTask();
-  if (task == nullptr) {
-    return false;
-  }
-  left = task->first;
-  right = task->second;
-  return true;
 }
 
 void IterativeLengthTask::CheckChange(vector<std::bitset<LANE_LIMIT>> &seen,
                                       vector<std::bitset<LANE_LIMIT>> &next,
                                       bool &change) const {
   for (auto i = 0; i < state->v_size; i++) {
-    auto updated = next[i] & ~seen[i];
-    seen[i] |= updated;
-    change |= updated.any();
+    if (next[i].any()) {
+      next[i] &= ~seen[i];
+      seen[i] |= next[i];
+      if (next[i].any()) {
+        change = true;
+      }
+    }
   }
 }
@@ -40,7 +33,7 @@ void IterativeLengthTask::CheckChange(vector<std::bitset<LANE_LIMIT>> &seen,
     barrier->Wait();
     if (worker_id == 0) {
       for (auto n = 0; n < state->v_size; n++) {
-        state->thread_assignment[n] = n % state->scheduled_threads;
+        state->thread_assignment[n] = n % state->num_threads;
       }
       state->InitializeLanes();
     }
@@ -102,11 +95,11 @@ void IterativeLengthTask::CheckChange(vector<std::bitset<LANE_LIMIT>> &seen,
       }
     }
 
-    barrier->Wait();
     // Check and process tasks for the next phase
     change = false;
+    barrier->Wait();
     if (worker_id == 0) {
       CheckChange(seen, next, change);
     }
diff --git a/src/core/operator/task/shortest_path_task.cpp b/src/core/operator/task/shortest_path_task.cpp
index d6963b9..9472c0e 100644
--- a/src/core/operator/task/shortest_path_task.cpp
+++ b/src/core/operator/task/shortest_path_task.cpp
@@ -10,7 +10,6 @@ ShortestPathTask::ShortestPathTask(shared_ptr<Event> event_p,
                                    const PhysicalOperator &op_p)
     : ExecutorTask(context, std::move(event_p), op_p), context(context),
       state(state), worker_id(worker_id) {
-  left = right = UINT64_MAX; // NOLINT
 }
 
 TaskExecutionResult ShortestPathTask::ExecuteTask(TaskExecutionMode mode) {
@@ -18,26 +17,23 @@ TaskExecutionResult ShortestPathTask::ExecuteTask(TaskExecutionMode mode) {
   while (state->started_searches < state->pairs->size()) {
     barrier->Wait();
     if (worker_id == 0) {
+      for (idx_t n = 0; n < state->v_size; n++) {
+        state->thread_assignment[n] = n % state->num_threads;
+      }
       state->InitializeLanes();
     }
+    barrier->Wait();
 
     do {
       IterativePath();
       // Synchronize after IterativePath
       barrier->Wait();
-      if (worker_id == 0) {
-        state->ResetTaskIndex();
-      }
-      barrier->Wait();
+
       if (worker_id == 0) {
         ReachDetect();
       }
       barrier->Wait();
-      if (worker_id == 0) {
-        state->ResetTaskIndex();
-      }
-      barrier->Wait();
     } while (state->change);
 
     barrier->Wait();
@@ -59,16 +55,6 @@ TaskExecutionResult ShortestPathTask::ExecuteTask(TaskExecutionMode mode) {
   return TaskExecutionResult::TASK_FINISHED;
 }
 
-bool ShortestPathTask::SetTaskRange() {
-  auto task = state->FetchTask();
-  if (task == nullptr) {
-    return false;
-  }
-  left = task->first;
-  right = task->second;
-  return true;
-}
-
 void ShortestPathTask::IterativePath() {
   auto &seen = state->seen;
   auto &visit = state->iter & 1 ? state->visit1 : state->visit2;
@@ -79,84 +65,63 @@ void ShortestPathTask::IterativePath() {
   auto &edge_ids = state->csr->edge_ids;
   auto &parents_ve = state->parents_ve;
   auto &change = state->change;
-
-  bool has_tasks = SetTaskRange();
+  auto &thread_assignment = state->thread_assignment;
 
   // Attempt to get a task range
-  while (has_tasks) {
-    for (auto i = left; i < right; i++) {
+  if (worker_id == 0) {
+    for (auto i = 0; i < state->v_size; i++) {
       next[i].reset();
     }
-    has_tasks = SetTaskRange();
   }
-  barrier->Wait([&]() { state->ResetTaskIndex(); });
 
   // Synchronize after clearing
   barrier->Wait();
-  has_tasks = SetTaskRange();
 
   // Main processing loop
-  while (has_tasks) {
-
-    for (auto i = left; i < right; i++) {
-      if (visit[i].any()) {
-        for (auto offset = v[i]; offset < v[i + 1]; offset++) {
-          auto n = e[offset];
+  for (auto i = 0; i < state->v_size; i++) {
+    if (visit[i].any()) {
+      for (auto offset = v[i]; offset < v[i + 1]; offset++) {
+        auto n = e[offset];
+        if (thread_assignment[n] == worker_id) {
           auto edge_id = edge_ids[offset];
-          {
-            std::lock_guard lock(state->element_locks[n]);
-            next[n] |= visit[i];
-            for (auto l = 0; l < LANE_LIMIT; l++) {
-              // Create the mask: true (-1 in all bits) if condition is met, else
-              // 0
-              uint64_t mask = ((parents_ve[n][l].GetV() == -1) && visit[i][l])
-                                  ? ~uint64_t(0)
-                                  : 0;
-              // Use the mask to conditionally update the `value` field of the
-              // `ve` struct
-              uint64_t new_value =
-                  (static_cast<uint64_t>(i) << parents_ve[n][l].e_bits) |
-                  (edge_id & parents_ve[n][l].e_mask);
-              parents_ve[n][l].value =
-                  (mask & new_value) | (~mask & parents_ve[n][l].value);
-            }
+          next[n] |= visit[i];
+          for (auto l = 0; l < LANE_LIMIT; l++) {
+            // Create the mask: true (-1 in all bits) if condition is met, else
+            // 0
+            uint64_t mask = ((parents_ve[n][l].GetV() == -1) && visit[i][l])
+                                ? ~uint64_t(0)
+                                : 0;
+
+            // Use the mask to conditionally update the `value` field of the
+            // `ve` struct
+            uint64_t new_value =
+                (static_cast<uint64_t>(i) << parents_ve[n][l].e_bits) |
+                (edge_id & parents_ve[n][l].e_mask);
+            parents_ve[n][l].value =
+                (mask & new_value) | (~mask & parents_ve[n][l].value);
          }
        }
      }
    }
-    // Check for a new task range
-    has_tasks = SetTaskRange();
   }
 
-  // Synchronize at the end of the main processing
-  barrier->Wait([&]() { state->ResetTaskIndex(); });
   barrier->Wait();
 
-  // Second processing stage (if needed)
-  has_tasks = SetTaskRange();
   change = false;
   barrier->Wait();
-  while (has_tasks) {
-    for (auto i = left; i < right; i++) {
+  if (worker_id == 0) {
+    for (auto i = 0; i < state->v_size; i++) {
       if (next[i].any()) {
         next[i] &= ~seen[i];
         seen[i] |= next[i];
         if (next[i].any()) {
-          std::lock_guard lock(state->change_lock);
           change = true;
         }
       }
     }
-    has_tasks = SetTaskRange();
   }
-  // std::cout << "Worker: " << worker_id << " Iteration: " << state->iter << " Change: " << change << std::endl;
-
-  // Synchronize again
-  barrier->Wait([&]() { state->ResetTaskIndex(); });
   barrier->Wait();
 }
@@ -190,7 +155,6 @@ void ShortestPathTask::PathConstruction() {
   auto result_data =
       FlatVector::GetData<list_entry_t>(state->pf_results->data[0]);
   auto &result_validity = FlatVector::Validity(state->pf_results->data[0]);
-  // std::cout << "Iterations: " << state->iter << std::endl;
   //! Reconstruct the paths
   for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
     int64_t search_num = state->lane_to_num[lane];
diff --git a/src/include/duckpgq/core/operator/bfs_state.hpp b/src/include/duckpgq/core/operator/bfs_state.hpp
index 9e35f79..2c06e3a 100644
--- a/src/include/duckpgq/core/operator/bfs_state.hpp
+++ b/src/include/duckpgq/core/operator/bfs_state.hpp
@@ -24,17 +24,12 @@ class BFSState : public enable_shared_from_this<BFSState> {
   void InitializeLanes();
   void Clear();
 
-  void CreateTasks();
-  shared_ptr<pair<idx_t, idx_t>> FetchTask(); // Function to fetch a task
-  void ResetTaskIndex();
-  pair<idx_t, idx_t> BoundaryCalculation(idx_t worker_id) const;
 
   shared_ptr<DataChunk> pairs; // (src, dst) pairs
   CSR *csr;
   string mode;
   shared_ptr<DataChunk> pf_results; // results of path-finding
   LogicalType bfs_type;
-  // const PhysicalPathFinding *op;
   int64_t iter;
   int64_t v_size; // Number of vertices
   bool change;
@@ -62,20 +57,9 @@ class BFSState : public enable_shared_from_this<BFSState> {
   idx_t total_pairs_processed;
   idx_t num_threads;
-  idx_t scheduled_threads;
-
-  // task_queues[workerId] = {curTaskIdx, queuedTasks}
-  // queuedTasks[curTaskIx] = {start, end}
-  vector<pair<idx_t, idx_t>> global_task_queue;
-  std::mutex queue_mutex; // Mutex for synchronizing access
-  std::condition_variable queue_cv; // Condition variable for task availability
-  size_t current_task_index = 0; // Index to track the current task
-  int64_t split_size = GetPathFindingTaskSize(context);
-
   unique_ptr<Barrier> barrier;
 
   // lock for next
-  mutable vector<mutex> element_locks;
   mutex change_lock;
 };
diff --git a/src/include/duckpgq/core/operator/task/iterative_length_task.hpp b/src/include/duckpgq/core/operator/task/iterative_length_task.hpp
index 1da4ac9..99fd357 100644
--- a/src/include/duckpgq/core/operator/task/iterative_length_task.hpp
+++ b/src/include/duckpgq/core/operator/task/iterative_length_task.hpp
@@ -21,14 +21,10 @@ class IterativeLengthTask : public ExecutorTask {
                    vector<std::bitset<LANE_LIMIT>> &next, bool &change) const;
   void UnReachableSet() const;
 
-  bool SetTaskRange();
-
 private:
   ClientContext &context;
   shared_ptr<BFSState> &state;
-  // [left, right)
-  idx_t left;
-  idx_t right;
   idx_t worker_id;
 };
diff --git a/src/include/duckpgq/core/operator/task/shortest_path_task.hpp b/src/include/duckpgq/core/operator/task/shortest_path_task.hpp
index c1c99e2..e2ee604 100644
--- a/src/include/duckpgq/core/operator/task/shortest_path_task.hpp
+++ b/src/include/duckpgq/core/operator/task/shortest_path_task.hpp
@@ -21,13 +21,9 @@ class ShortestPathTask : public ExecutorTask {
 
   void PathConstruction();
 
-  bool SetTaskRange();
-
   ClientContext &context;
   shared_ptr<BFSState> &state;
-  // [left, right)
-  idx_t left;
-  idx_t right;
   idx_t worker_id;
 };
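Note on the approach, appended for review (not part of the patch): the dynamic task queue (CreateTasks / FetchTask / ResetTaskIndex, with queue_mutex and queue_cv) and the per-vertex element_locks are dropped in favor of static ownership. Vertex n is owned by worker n % num_threads (recorded in thread_assignment); every worker scans the whole frontier but writes only to the destination vertices it owns, and barriers separate the clear, expand, and merge phases, with worker 0 alone folding next into seen. Below is a minimal standalone sketch of that pattern, not the duckpgq code itself: Graph, ExpandFrontier, and the toy main are invented names, C++20 std::barrier stands in for the project's own Barrier, and plain char flags stand in for the LANE_LIMIT-wide bitset lanes.

#include <barrier>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <thread>
#include <vector>

struct Graph {
  std::vector<int64_t> v; // CSR offsets, size = num_vertices + 1
  std::vector<int64_t> e; // CSR adjacency list
};

// One BFS frontier expansion, executed concurrently by every worker.
void ExpandFrontier(const Graph &g, const std::vector<char> &visit,
                    std::vector<char> &next, std::vector<char> &seen,
                    std::barrier<> &sync, unsigned worker_id,
                    unsigned num_workers, bool &change) {
  // Phase 1: each worker clears only the slice of `next` it owns.
  for (std::size_t n = worker_id; n < next.size(); n += num_workers)
    next[n] = 0;
  sync.arrive_and_wait();

  // Phase 2: every worker scans the whole frontier but claims a destination
  // only if it owns it (n % num_workers == worker_id), so the writes to
  // `next` are disjoint and need no per-element lock.
  for (std::size_t i = 0; i + 1 < g.v.size(); i++) {
    if (!visit[i])
      continue;
    for (int64_t off = g.v[i]; off < g.v[i + 1]; off++) {
      auto n = static_cast<std::size_t>(g.e[off]);
      if (n % num_workers == worker_id)
        next[n] = 1;
    }
  }
  sync.arrive_and_wait();

  // Phase 3: worker 0 alone folds `next` into `seen` and raises `change`,
  // mirroring the single-writer CheckChange step in the patch.
  if (worker_id == 0) {
    change = false;
    for (std::size_t n = 0; n < next.size(); n++) {
      if (next[n] && !seen[n]) {
        seen[n] = 1;
        change = true;
      }
    }
  }
  sync.arrive_and_wait();
}

int main() {
  // Toy graph: 0 -> {1, 2}, 1 -> {3}, 3 -> {0}; the frontier starts at 0.
  Graph g{{0, 2, 3, 3, 4}, {1, 2, 3, 0}};
  const unsigned num_workers = 2;
  std::vector<char> visit{1, 0, 0, 0}, next(4, 0), seen{1, 0, 0, 0};
  bool change = false;
  std::barrier<> sync(num_workers);
  std::vector<std::thread> workers;
  for (unsigned w = 0; w < num_workers; w++)
    workers.emplace_back(ExpandFrontier, std::cref(g), std::cref(visit),
                         std::ref(next), std::ref(seen), std::ref(sync), w,
                         num_workers, std::ref(change));
  for (auto &t : workers)
    t.join();
  // After one step: next == {0, 1, 1, 0}, seen == {1, 1, 1, 0}, change == true.
  return change ? 0 : 1;
}

The trade-off this sketch mirrors: every worker now reads the whole frontier instead of fetching private vertex ranges, but all writes are contention-free by construction, which is why the element_locks around next/parents_ve and the change_lock around the change flag can go.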