Skip to content

Commit

Permalink
Move hottest part of code to separate function
Browse files Browse the repository at this point in the history
  • Loading branch information
Dtenwolde committed Jan 14, 2025
1 parent 4067057 commit fc13438
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 67 deletions.
83 changes: 45 additions & 38 deletions src/core/operator/task/iterative_length_task.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "duckpgq/core/operator/task/iterative_length_task.hpp"
#include <duckpgq/core/operator/physical_path_finding_operator.hpp>
#include <chrono>

namespace duckpgq {
namespace core {
Expand Down Expand Up @@ -28,42 +29,59 @@ void IterativeLengthTask::CheckChange(vector<std::bitset<LANE_LIMIT>> &seen,


TaskExecutionResult IterativeLengthTask::ExecuteTask(TaskExecutionMode mode) {
auto &barrier = state->barrier;
while (state->started_searches < state->pairs->size()) {
barrier->Wait();
if (worker_id == 0) {
for (auto n = 0; n < state->v_size; n++) {
state->thread_assignment[n] = n % state->num_threads;
}
state->InitializeLanes();
auto &barrier = state->barrier;
while (state->started_searches < state->pairs->size()) {
barrier->Wait();
if (worker_id == 0) {
for (auto n = 0; n < state->v_size; n++) {
state->thread_assignment[n] = n % state->num_threads;
}
state->InitializeLanes();
}
barrier->Wait();
do {
IterativeLength();
barrier->Wait();
do {
IterativeLength();
barrier->Wait();

if (worker_id == 0) {
ReachDetect();
}
barrier->Wait();
} while (state->change);
if (worker_id == 0) {
UnReachableSet();
}

// Final synchronization before finishing
barrier->Wait();
if (worker_id == 0) {
state->Clear();
ReachDetect();
}
barrier->Wait();
} while (state->change);
if (worker_id == 0) {
UnReachableSet();
}

event->FinishTask();
return TaskExecutionResult::TASK_FINISHED;
// Final synchronization before finishing
barrier->Wait();
if (worker_id == 0) {
state->Clear();
}
barrier->Wait();
}

void IterativeLengthTask::IterativeLength() {
event->FinishTask();
return TaskExecutionResult::TASK_FINISHED;
}

void IterativeLengthTask::Explore(vector<std::bitset<LANE_LIMIT>> &visit,
vector<std::bitset<LANE_LIMIT>> &next, int64_t *v,
vector<int64_t> &e,
std::vector<int64_t> &thread_assignment) {
for (auto i = 0; i < state->v_size; i++) {
if (visit[i].any()) {
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
auto n = e[offset];
// Check if this thread is responsible for the destination
if (thread_assignment[n] == worker_id) {
next[n] |= visit[i];
}
}
}
}
}

void IterativeLengthTask::IterativeLength() {
auto &seen = state->seen;
auto &visit = state->iter & 1 ? state->visit1 : state->visit2;
auto &next = state->iter & 1 ? state->visit2 : state->visit1;
Expand All @@ -83,18 +101,7 @@ void IterativeLengthTask::CheckChange(vector<std::bitset<LANE_LIMIT>> &seen,
// Synchronize after clearing
barrier->Wait();

for (auto i = 0; i < state->v_size; i++) {
if (visit[i].any()) {
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
auto n = e[offset];
// Check if this thread is responsible for the destination
if (thread_assignment[n] == worker_id) {
next[n] |= visit[i];
}
}
}
}

Explore(visit, next, v, e, thread_assignment);

// Check and process tasks for the next phase
change = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ class IterativeLengthTask : public ExecutorTask {
bool &change) const;
void UnReachableSet() const;

void Explore(vector<std::bitset<LANE_LIMIT>> &visit,
vector<std::bitset<LANE_LIMIT>> &next, int64_t *v,
vector<int64_t> &e,
std::vector<int64_t> &thread_assignment);

private:
ClientContext &context;
shared_ptr<BFSState> &state;
Expand Down
58 changes: 29 additions & 29 deletions test/snb1_perf.test
Original file line number Diff line number Diff line change
Expand Up @@ -50,33 +50,33 @@ with csr_cte as (
JOIN person c on c.id = k.person2id)
SELECT src as source, dst as destination, iterativelengthoperator(src, dst, csr_id) as path FROM snb_pairs, csr_cte;

# query III nosort lengthpairs
# WITH cte1 AS (
# SELECT CREATE_CSR_EDGE(
# 0,
# (SELECT count(a.id) FROM person a),
# CAST (
# (SELECT sum(CREATE_CSR_VERTEX(
# 0,
# (SELECT count(a.id) FROM person a),
# sub.dense_id,
# sub.cnt)
# )
# FROM (
# SELECT a.rowid as dense_id, count(k.person1id) as cnt
# FROM person a
# LEFT JOIN person_knows_person k ON k.person1id = a.id
# GROUP BY a.rowid) sub
# )
# AS BIGINT),
# (select count(*) from person_knows_person k JOIN person a on a.id = k.person1id JOIN person c on c.id = k.person2id),
# a.rowid,
# c.rowid,
# k.rowid) as temp
# FROM person_knows_person k
# JOIN person a on a.id = k.person1id
# JOIN person c on c.id = k.person2id
# ) SELECT src as source, dst as destination, iterativelength(0, (select count(*) from person), snb_pairs.src, snb_pairs.dst) as path
# FROM snb_pairs, (select count(cte1.temp) * 0 as temp from cte1) __x
# WHERE __x.temp * 0 = 0;
query III nosort lengthpairs
WITH cte1 AS (
SELECT CREATE_CSR_EDGE(
0,
(SELECT count(a.id) FROM person a),
CAST (
(SELECT sum(CREATE_CSR_VERTEX(
0,
(SELECT count(a.id) FROM person a),
sub.dense_id,
sub.cnt)
)
FROM (
SELECT a.rowid as dense_id, count(k.person1id) as cnt
FROM person a
LEFT JOIN person_knows_person k ON k.person1id = a.id
GROUP BY a.rowid) sub
)
AS BIGINT),
(select count(*) from person_knows_person k JOIN person a on a.id = k.person1id JOIN person c on c.id = k.person2id),
a.rowid,
c.rowid,
k.rowid) as temp
FROM person_knows_person k
JOIN person a on a.id = k.person1id
JOIN person c on c.id = k.person2id
) SELECT src as source, dst as destination, iterativelength(0, (select count(*) from person), snb_pairs.src, snb_pairs.dst) as path
FROM snb_pairs, (select count(cte1.temp) * 0 as temp from cte1) __x
WHERE __x.temp * 0 = 0;
# ----

0 comments on commit fc13438

Please sign in to comment.