Remove unused code. Adapt shortest path to use a lock-free method
Dtenwolde committed Jan 14, 2025
1 parent b4f423c commit 4067057
Showing 8 changed files with 43 additions and 175 deletions.
68 changes: 2 additions & 66 deletions src/core/operator/bfs_state.cpp
@@ -33,7 +33,6 @@ BFSState::BFSState(shared_ptr<DataChunk> pairs_, CSR *csr_, idx_t num_threads_,
visit1 = vector<std::bitset<LANE_LIMIT>>(v_size);
visit2 = vector<std::bitset<LANE_LIMIT>>(v_size);
seen = vector<std::bitset<LANE_LIMIT>>(v_size);
element_locks = vector<std::mutex>(v_size);
parents_ve = std::vector<std::array<ve, LANE_LIMIT>>(
v_size, std::array<ve, LANE_LIMIT>{});

@@ -46,9 +45,8 @@ BFSState::BFSState(shared_ptr<DataChunk> pairs_, CSR *csr_, idx_t num_threads_,
// Initialize the thread assignment vector
thread_assignment = std::vector<int64_t>(v_size, -1);

CreateTasks();
scheduled_threads = std::min(num_threads, (idx_t)global_task_queue.size());
barrier = make_uniq<Barrier>(scheduled_threads);
// CreateTasks();
barrier = make_uniq<Barrier>(num_threads);
}

void BFSState::Clear() {
@@ -72,67 +70,6 @@ void BFSState::Clear() {
lane_completed.reset();
}

void BFSState::CreateTasks() {
// workerTasks[workerId] = [task1, task2, ...]
// vector<vector<pair<idx_t, idx_t>>> worker_tasks(num_threads);
// auto cur_worker = 0;
int64_t *v = (int64_t *)csr->v;
int64_t current_task_edges = 0;
idx_t current_task_start = 0;
for (idx_t v_idx = 0; v_idx < (idx_t)v_size; v_idx++) {
auto number_of_edges = v[v_idx + 1] - v[v_idx];
if (current_task_edges + number_of_edges > split_size) {
global_task_queue.push_back({current_task_start, v_idx});
current_task_start = v_idx;
current_task_edges = 0; // reset
}
current_task_edges += number_of_edges;
}

// Final task if there are any remaining edges
if (current_task_start < (idx_t)v_size) {
global_task_queue.push_back({current_task_start, v_size});
}
}


shared_ptr<std::pair<idx_t, idx_t>> BFSState::FetchTask() {
std::unique_lock<std::mutex> lock(queue_mutex); // Lock the mutex to access the queue

// Avoid unnecessary waiting if no tasks are available
if (current_task_index >= global_task_queue.size()) {
// std::cout << "FetchTask: No more tasks available. Exiting." << std::endl;
return nullptr; // No more tasks
}

// Wait until a task is available or the queue is finalized
queue_cv.wait(lock, [this]() {
return current_task_index < global_task_queue.size();
});

// Fetch the next task and increment the task index
if (current_task_index < global_task_queue.size()) {
auto task = make_shared_ptr<std::pair<idx_t, idx_t>>(global_task_queue[current_task_index]);
current_task_index++;

return task;
}
return nullptr;
}

void BFSState::ResetTaskIndex() {
std::lock_guard<std::mutex> lock(queue_mutex); // Lock to reset index safely
current_task_index = 0; // Reset the task index for the next stage
queue_cv.notify_all(); // Notify all threads that tasks are available
}

pair<idx_t, idx_t> BFSState::BoundaryCalculation(idx_t worker_id) const {
idx_t block_size = ceil((double)v_size / num_threads);
block_size = block_size == 0 ? 1 : block_size;
idx_t left = block_size * worker_id;
idx_t right = std::min(block_size * (worker_id + 1), (idx_t)v_size);
return {left, right};
}

void BFSState::InitializeLanes() {
auto &result_validity = FlatVector::Validity(pf_results->data[0]);
@@ -159,7 +96,6 @@ void BFSState::InitializeLanes() {
for (int64_t i = 0; i < v_size; i++) {
seen[i] = seen_mask;
}

}

void BFSState::ScheduleBFSBatch(Pipeline &pipeline, Event &event, const PhysicalPathFinding *op) {
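Note on the change above: the per-vertex element_locks and the dynamic task queue (CreateTasks / FetchTask / ResetTaskIndex / BoundaryCalculation) are deleted because vertices are now statically partitioned across workers via thread_assignment[n] = n % num_threads. Since exactly one worker owns each destination vertex, writes to next[n] and parents_ve[n] are race-free without locks. A minimal sketch of this owner-computes idea (OwnerComputes and Owns are illustrative names, not part of the extension):

    #include <cstdint>
    #include <vector>

    // Sketch: static round-robin ownership of vertices. Each vertex has one
    // owning worker, so per-vertex writes need no mutex.
    struct OwnerComputes {
      std::vector<int64_t> owner; // owner[v] = worker responsible for vertex v

      OwnerComputes(int64_t num_vertices, int64_t num_workers) : owner(num_vertices) {
        for (int64_t v = 0; v < num_vertices; v++) {
          owner[v] = v % num_workers;
        }
      }

      // During expansion, a worker skips any destination it does not own.
      bool Owns(int64_t worker_id, int64_t v) const { return owner[v] == worker_id; }
    };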
3 changes: 1 addition & 2 deletions src/core/operator/event/iterative_length_event.cpp
@@ -14,9 +14,8 @@ IterativeLengthEvent::IterativeLengthEvent(shared_ptr<BFSState> gbfs_state_p,

void IterativeLengthEvent::Schedule() {
auto &context = pipeline->GetClientContext();
// std::cout << gbfs_state->csr->ToString();
vector<shared_ptr<Task>> bfs_tasks;
for (idx_t tnum = 0; tnum < gbfs_state->scheduled_threads; tnum++) {
for (idx_t tnum = 0; tnum < gbfs_state->num_threads; tnum++) {
bfs_tasks.push_back(make_uniq<IterativeLengthTask>(
shared_from_this(), context, gbfs_state, tnum, op));
}
2 changes: 1 addition & 1 deletion src/core/operator/event/shortest_path_event.cpp
@@ -16,7 +16,7 @@ void ShortestPathEvent::Schedule() {
auto &context = pipeline->GetClientContext();
// std::cout << gbfs_state->csr->ToString();
vector<shared_ptr<Task>> bfs_tasks;
for (idx_t tnum = 0; tnum < gbfs_state->scheduled_threads; tnum++) {
for (idx_t tnum = 0; tnum < gbfs_state->num_threads; tnum++) {
bfs_tasks.push_back(make_uniq<ShortestPathTask>(
shared_from_this(), context, gbfs_state, tnum, op));
}
25 changes: 9 additions & 16 deletions src/core/operator/task/iterative_length_task.cpp
@@ -10,26 +10,19 @@ IterativeLengthTask::IterativeLengthTask(shared_ptr<Event> event_p,
const PhysicalOperator &op_p)
: ExecutorTask(context, std::move(event_p), op_p), context(context),
state(state), worker_id(worker_id) {
left = right = UINT64_MAX; // NOLINT
}

bool IterativeLengthTask::SetTaskRange() {
auto task = state->FetchTask();
if (task == nullptr) {
return false;
}
left = task->first;
right = task->second;
return true;
}

void IterativeLengthTask::CheckChange(vector<std::bitset<LANE_LIMIT>> &seen,
vector<std::bitset<LANE_LIMIT>> &next,
bool &change) const {
for (auto i = 0; i < state->v_size; i++) {
auto updated = next[i] & ~seen[i];
seen[i] |= updated;
change |= updated.any();
if (next[i].any()) {
next[i] &= ~seen[i];
seen[i] |= next[i];
if (next[i].any()) {
change = true;
}
}
}
}
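CheckChange (in both its old and new form) folds the next frontier into seen and records whether any lane reached a new vertex. A standalone sketch of that bitset bookkeeping (the LANE_LIMIT value and the free function FoldFrontier are illustrative):

    #include <bitset>
    #include <vector>

    constexpr int LANE_LIMIT = 512; // placeholder width; the extension defines its own

    // Returns true if any lane discovered a vertex it had not seen before.
    bool FoldFrontier(std::vector<std::bitset<LANE_LIMIT>> &seen,
                      std::vector<std::bitset<LANE_LIMIT>> &next) {
      bool change = false;
      for (size_t i = 0; i < next.size(); i++) {
        next[i] &= ~seen[i];     // keep only newly reached lanes
        seen[i] |= next[i];      // mark them as seen
        change |= next[i].any(); // any survivor means another BFS round
      }
      return change;
    }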

@@ -40,7 +33,7 @@ void IterativeLengthTask::CheckChange(vector<std::bitset<LANE_LIMIT>> &seen,
barrier->Wait();
if (worker_id == 0) {
for (auto n = 0; n < state->v_size; n++) {
state->thread_assignment[n] = n % state->scheduled_threads;
state->thread_assignment[n] = n % state->num_threads;
}
state->InitializeLanes();
}
@@ -102,11 +95,11 @@ void IterativeLengthTask::CheckChange(vector<std::bitset<LANE_LIMIT>> &seen,
}
}

barrier->Wait();

// Check and process tasks for the next phase
change = false;

barrier->Wait();
if (worker_id == 0) {
CheckChange(seen, next, change);
}
96 changes: 30 additions & 66 deletions src/core/operator/task/shortest_path_task.cpp
@@ -10,34 +10,30 @@ ShortestPathTask::ShortestPathTask(shared_ptr<Event> event_p,
const PhysicalOperator &op_p)
: ExecutorTask(context, std::move(event_p), op_p), context(context),
state(state), worker_id(worker_id) {
left = right = UINT64_MAX; // NOLINT
}

TaskExecutionResult ShortestPathTask::ExecuteTask(TaskExecutionMode mode) {
auto &barrier = state->barrier;
while (state->started_searches < state->pairs->size()) {
barrier->Wait();
if (worker_id == 0) {
for (idx_t n = 0; n < state->v_size; n++) {
state->thread_assignment[n] = n % state->num_threads;
}
state->InitializeLanes();
}

barrier->Wait();
do {
IterativePath();

// Synchronize after IterativePath
barrier->Wait();
if (worker_id == 0) {
state->ResetTaskIndex();
}
barrier->Wait();

if (worker_id == 0) {
ReachDetect();
}
barrier->Wait();
if (worker_id == 0) {
state->ResetTaskIndex();
}
barrier->Wait();
} while (state->change);

barrier->Wait();
@@ -59,16 +55,6 @@ TaskExecutionResult ShortestPathTask::ExecuteTask(TaskExecutionMode mode) {
return TaskExecutionResult::TASK_FINISHED;
}
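ExecuteTask now drives the search in barrier-separated rounds: every worker expands in parallel, then worker 0 alone performs the serial bookkeeping (ReachDetect, recomputing change) between two barriers, so all threads observe a consistent change flag before deciding whether to loop. A minimal runnable sketch of that two-barrier round structure, using C++20 std::barrier in place of the extension's own Barrier class:

    #include <atomic>
    #include <barrier>
    #include <thread>
    #include <vector>

    int main() {
      constexpr int kWorkers = 4;
      std::atomic<bool> change{true};
      int rounds = 0;
      std::barrier sync(kWorkers);

      auto worker = [&](int worker_id) {
        do {
          // ... parallel expansion of the vertices this worker owns ...
          sync.arrive_and_wait(); // all expansion finished
          if (worker_id == 0) {
            change = (++rounds < 3); // stand-in for CheckChange / ReachDetect
          }
          sync.arrive_and_wait(); // every worker sees the same change flag
        } while (change);
      };

      std::vector<std::thread> pool;
      for (int t = 0; t < kWorkers; t++) pool.emplace_back(worker, t);
      for (auto &t : pool) t.join();
    }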

bool ShortestPathTask::SetTaskRange() {
auto task = state->FetchTask();
if (task == nullptr) {
return false;
}
left = task->first;
right = task->second;
return true;
}

void ShortestPathTask::IterativePath() {
auto &seen = state->seen;
auto &visit = state->iter & 1 ? state->visit1 : state->visit2;
@@ -79,84 +65,63 @@ void ShortestPathTask::IterativePath() {
auto &edge_ids = state->csr->edge_ids;
auto &parents_ve = state->parents_ve;
auto &change = state->change;

bool has_tasks = SetTaskRange();
auto &thread_assignment = state->thread_assignment;

// Attempt to get a task range
while (has_tasks) {
for (auto i = left; i < right; i++) {
if (worker_id == 0) {
for (auto i = 0; i < state->v_size; i++) {
next[i].reset();
}
has_tasks = SetTaskRange();
}

barrier->Wait([&]() { state->ResetTaskIndex(); });

// Synchronize after clearing
barrier->Wait();

has_tasks = SetTaskRange();
// Main processing loop
while (has_tasks) {

for (auto i = left; i < right; i++) {
if (visit[i].any()) {
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
auto n = e[offset];
for (auto i = 0; i < state->v_size; i++) {
if (visit[i].any()) {
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
auto n = e[offset];
if (thread_assignment[n] == worker_id) {
auto edge_id = edge_ids[offset];
{
std::lock_guard<std::mutex> lock(state->element_locks[n]);
next[n] |= visit[i];
for (auto l = 0; l < LANE_LIMIT; l++) {
// Create the mask: true (-1 in all bits) if condition is met, else
// 0
uint64_t mask = ((parents_ve[n][l].GetV() == -1) && visit[i][l])
? ~uint64_t(0)
: 0;

// Use the mask to conditionally update the `value` field of the
// `ve` struct
uint64_t new_value =
(static_cast<uint64_t>(i) << parents_ve[n][l].e_bits) |
(edge_id & parents_ve[n][l].e_mask);
parents_ve[n][l].value =
(mask & new_value) | (~mask & parents_ve[n][l].value);
}
next[n] |= visit[i];
for (auto l = 0; l < LANE_LIMIT; l++) {
// Create the mask: true (-1 in all bits) if condition is met, else
// 0
uint64_t mask = ((parents_ve[n][l].GetV() == -1) && visit[i][l])
? ~uint64_t(0)
: 0;

// Use the mask to conditionally update the `value` field of the
// `ve` struct
uint64_t new_value =
(static_cast<uint64_t>(i) << parents_ve[n][l].e_bits) |
(edge_id & parents_ve[n][l].e_mask);
parents_ve[n][l].value =
(mask & new_value) | (~mask & parents_ve[n][l].value);
}

}
}
}

// Check for a new task range
has_tasks = SetTaskRange();
}

// Synchronize at the end of the main processing
barrier->Wait([&]() { state->ResetTaskIndex(); });
barrier->Wait();

// Second processing stage (if needed)
has_tasks = SetTaskRange();
change = false;
barrier->Wait();
while (has_tasks) {
for (auto i = left; i < right; i++) {
if (worker_id == 0) {
for (auto i = 0; i < state->v_size; i++) {
if (next[i].any()) {
next[i] &= ~seen[i];
seen[i] |= next[i];
if (next[i].any()) {
std::lock_guard<std::mutex> lock(state->change_lock);
change = true;
}
}
}
has_tasks = SetTaskRange();
}
// std::cout << "Worker: " << worker_id << " Iteration: " << state->iter << " Change: " << change << std::endl;

// Synchronize again
barrier->Wait([&]() { state->ResetTaskIndex(); });
barrier->Wait();
}
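The parents_ve update above is written branch-free: a mask of all ones (condition holds) or all zeros (it does not) blends the candidate value with the current one, so a parent/edge pair is recorded only the first time a lane reaches the vertex. A reduced sketch of that blend (free-standing function with illustrative names):

    #include <cstdint>

    // Returns candidate if (unset && lane_active), otherwise current, with no
    // branch on the data path — mirroring the parents_ve update in IterativePath.
    uint64_t BlendIfUnset(uint64_t current, uint64_t candidate,
                          bool unset, bool lane_active) {
      uint64_t mask = (unset && lane_active) ? ~uint64_t(0) : uint64_t(0);
      return (mask & candidate) | (~mask & current);
    }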

@@ -190,7 +155,6 @@ void ShortestPathTask::PathConstruction() {
auto result_data =
FlatVector::GetData<list_entry_t>(state->pf_results->data[0]);
auto &result_validity = FlatVector::Validity(state->pf_results->data[0]);
// std::cout << "Iterations: " << state->iter << std::endl;
//! Reconstruct the paths
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
int64_t search_num = state->lane_to_num[lane];
16 changes: 0 additions & 16 deletions src/include/duckpgq/core/operator/bfs_state.hpp
@@ -24,17 +24,12 @@ class BFSState : public enable_shared_from_this<BFSState> {
void InitializeLanes();
void Clear();

void CreateTasks();
shared_ptr<pair<idx_t, idx_t>> FetchTask(); // Function to fetch a task
void ResetTaskIndex();

pair<idx_t, idx_t> BoundaryCalculation(idx_t worker_id) const;
shared_ptr<DataChunk> pairs; // (src, dst) pairs
CSR *csr;
string mode;
shared_ptr<DataChunk> pf_results; // results of path-finding
LogicalType bfs_type;
// const PhysicalPathFinding *op;
int64_t iter;
int64_t v_size; // Number of vertices
bool change;
@@ -62,20 +57,9 @@

idx_t total_pairs_processed;
idx_t num_threads;
idx_t scheduled_threads;

// task_queues[workerId] = {curTaskIdx, queuedTasks}
// queuedTasks[curTaskIx] = {start, end}
vector<pair<idx_t, idx_t>> global_task_queue;
std::mutex queue_mutex; // Mutex for synchronizing access
std::condition_variable queue_cv; // Condition variable for task availability
size_t current_task_index = 0; // Index to track the current task
int64_t split_size = GetPathFindingTaskSize(context);

unique_ptr<Barrier> barrier;

// lock for next
mutable vector<mutex> element_locks;
mutex change_lock;
};
