Skip to content

Commit

Permalink
Optimizing path logging with one-dimensional array
Browse files Browse the repository at this point in the history
  • Loading branch information
SiberiaWolfP committed Mar 6, 2024
1 parent e6ab7f1 commit a1c6833
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ static void ShortestPathLowerBoundFunction(DataChunk &args,

CreateScalarFunctionInfo DuckPGQFunctions::GetShortestPathLowerBoundFunction() {
auto fun = ScalarFunction(
"shortestpath_lowerbound",
"shortestpath_two_phase",
{LogicalType::INTEGER, LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT, LogicalType::BIGINT},
LogicalType::LIST(LogicalType::BIGINT), ShortestPathLowerBoundFunction,
Expand Down
227 changes: 61 additions & 166 deletions duckpgq/src/duckpgq/functions/scalar/shortest_path_two_phase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@

namespace duckdb {

static bool IterativeLengthPhaseOne(int64_t v_size, int64_t *V, vector<int64_t> &E,
int64_t iter, vector<int64_t> &edge_ids,
vector<vector<unordered_map<int64_t, int64_t>>> &paths_v,
vector<vector<unordered_map<int64_t, int64_t>>> &paths_e,
static bool IterativeLengthPhaseOne(int64_t v_size, int64_t iter, int64_t *V,
vector<int64_t> &E, vector<int64_t> &edge_ids,
vector<pair<int64_t, int64_t>> &paths_ve,
vector<std::bitset<LANE_LIMIT>> &visit,
vector<std::bitset<LANE_LIMIT>> &next) {
bool change = false;
Expand All @@ -30,8 +29,8 @@ static bool IterativeLengthPhaseOne(int64_t v_size, int64_t *V, vector<int64_t>
next[n] = next[n] | visit[v];
for (auto lane = 0; lane < LANE_LIMIT; lane++) {
if (visit[v][lane]) {
paths_v[n][lane][iter] = v;
paths_e[n][lane][iter] = edge_id;
// paths_ve[iter][n][lane] = {v, edge_id};
paths_ve[((iter - 1) * v_size + n) * LANE_LIMIT + lane] = {v, edge_id};
}
}
}
Expand All @@ -46,8 +45,8 @@ static bool IterativeLengthPhaseOne(int64_t v_size, int64_t *V, vector<int64_t>

static bool IterativeLengthPhaseTwo(int64_t v_size, int64_t *V, vector<int64_t> &E,
vector<int64_t> &edge_ids,
vector<std::vector<int64_t>> &parents_v,
vector<std::vector<int64_t>> &parents_e,
int16_t lane_to_num[LANE_LIMIT],
vector<pair<int64_t, int64_t>> &paths_ve,
vector<std::bitset<LANE_LIMIT>> &seen,
vector<std::bitset<LANE_LIMIT>> &visit,
vector<std::bitset<LANE_LIMIT>> &next) {
Expand All @@ -62,12 +61,10 @@ static bool IterativeLengthPhaseTwo(int64_t v_size, int64_t *V, vector<int64_t>
auto n = E[e];
auto edge_id = edge_ids[e];
next[n] = next[n] | visit[v];
for (auto l = 0; l < LANE_LIMIT; l++) {
parents_v[n][l] =
((parents_v[n][l] == -1) && visit[v][l]) ? v : parents_v[n][l];
parents_e[n][l] = ((parents_e[n][l] == -1) && visit[v][l])
? edge_id
: parents_e[n][l];
for (auto lane = 0; lane < LANE_LIMIT; lane++) {
// paths_ve[n][lane] = {v, edge_id};
if (lane_to_num[lane] >= 0 && visit[v][lane])
paths_ve[(n * LANE_LIMIT + lane)] = {v, edge_id};
}
}
}
Expand All @@ -81,97 +78,6 @@ static bool IterativeLengthPhaseTwo(int64_t v_size, int64_t *V, vector<int64_t>
return change;
}

static vector<int64_t> ShortestPathInternal(int64_t lane, int64_t v_size, int64_t destination,
int64_t bound,
int64_t *v, vector<int64_t> &e, vector<int64_t> &edge_ids,
vector<std::bitset<LANE_LIMIT>> &visit,
vector<std::bitset<LANE_LIMIT>> &seen,
vector<std::bitset<LANE_LIMIT>> &visit1,
vector<std::bitset<LANE_LIMIT>> &visit2,
vector<std::vector<int64_t>> &parents_v,
vector<std::vector<int64_t>> &parents_e) {
vector<int64_t> src;
vector<vector<int64_t>> results;
for (int64_t v = 0; v < v_size; v++) {
if (visit[v][lane]) {
src.push_back(v);
}
}

// maps lane to search number
int16_t lane_to_num[LANE_LIMIT];
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
lane_to_num[lane] = -1; // inactive
}

idx_t started_searches = 0;
while (started_searches < src.size()) {
for (auto i = 0; i < v_size; i++) {
seen[i] = 0;
visit1[i] = 0;
for (auto j = 0; j < LANE_LIMIT; j++) {
parents_v[i][j] = -1;
parents_e[i][j] = -1;
}
}
// add search jobs to free lanes
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
if (started_searches < src.size()) {
int64_t search_num = started_searches++;
visit1[src[search_num]][lane] = true;
lane_to_num[lane] = search_num;
} else {
break;
}
}

for (int64_t iter = 1; iter <= bound; iter++) {
if (!IterativeLengthPhaseTwo(v_size, v, e, edge_ids, parents_v, parents_e,
seen, (iter & 1) ? visit1 : visit2,
(iter & 1) ? visit2 : visit1)) {
break;
}
// detect lanes that found the destination
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
if (seen[destination][lane]) {
auto search_num = lane_to_num[lane];
vector<int64_t> result;

// found the destination, reconstruct the path
auto parent_vertex = parents_v[destination][lane];
auto parent_edge = parents_e[destination][lane];

result.push_back(destination);
result.push_back(parent_edge);
while (parent_vertex != src[search_num]) {

result.push_back(parent_vertex);
parent_edge = parents_e[parent_vertex][lane];
parent_vertex = parents_v[parent_vertex][lane];
result.push_back(parent_edge);
}
result.push_back(src[search_num]);
std::reverse(result.begin(), result.end());
results.push_back(result);
break;
}
}
}
}
size_t min_size = INT64_MAX;
size_t min_index = -1;
for (size_t i = 0; i < results.size(); i++) {
if (results[i].size() < min_size) {
min_size = results[i].size();
min_index = i;
}
}
if (min_index >= 0) {
return results[min_index];
}
return {};
}

static void ShortestPathTwoPhaseFunction(DataChunk &args, ExpressionState &state,
Vector &result) {
auto &func_expr = (BoundFunctionExpression &)state.expr;
Expand Down Expand Up @@ -219,14 +125,12 @@ static void ShortestPathTwoPhaseFunction(DataChunk &args, ExpressionState &state
ValidityMask &result_validity = FlatVector::Validity(result);

// create temp SIMD arrays
vector<std::bitset<LANE_LIMIT>> seen(v_size);
vector<std::bitset<LANE_LIMIT>> visit1(v_size);
vector<std::bitset<LANE_LIMIT>> visit2(v_size);

vector<vector<unordered_map<int64_t, int64_t>>> paths_v(v_size,
std::vector<unordered_map<int64_t, int64_t>>(LANE_LIMIT));
vector<vector<unordered_map<int64_t, int64_t>>> paths_e(v_size,
std::vector<unordered_map<int64_t, int64_t>>(LANE_LIMIT));

vector<pair<int64_t, int64_t>> paths_ve_one(v_size * LANE_LIMIT * (lower_bound - 1), {-1, -1});
vector<pair<int64_t, int64_t>> paths_ve_two(v_size * LANE_LIMIT, {-1, -1});

// maps lane to search number
int16_t lane_to_num[LANE_LIMIT];
Expand All @@ -240,11 +144,8 @@ static void ShortestPathTwoPhaseFunction(DataChunk &args, ExpressionState &state

// empty visit vectors
for (auto i = 0; i < v_size; i++) {
seen[i] = 0;
visit1[i] = 0;
for (auto j = 0; j < LANE_LIMIT; j++) {
paths_v[i][j].clear();
paths_v[i][j].clear();
}
}

// add search jobs to free lanes
Expand All @@ -265,69 +166,63 @@ static void ShortestPathTwoPhaseFunction(DataChunk &args, ExpressionState &state

int64_t iter = 1;
for (; iter < lower_bound; iter++) {
if (!IterativeLengthPhaseOne(v_size, v, e, iter, edge_ids, paths_v, paths_e,
if (!IterativeLengthPhaseOne(v_size, iter, v, e, edge_ids, paths_ve_one,
(iter & 1) ? visit1 : visit2,
(iter & 1) ? visit2 : visit1)) {
break;
}
}
if (iter == lower_bound) {
// resource reuse
vector<std::bitset<LANE_LIMIT>> seen_in(v_size);
vector<std::bitset<LANE_LIMIT>> visit1_in(v_size);
vector<std::bitset<LANE_LIMIT>> visit2_in(v_size);

vector<std::vector<int64_t>> parents_v_in(v_size,
std::vector<int64_t>(LANE_LIMIT, -1));
vector<std::vector<int64_t>> parents_e_in(v_size,
std::vector<int64_t>(LANE_LIMIT, -1));
for (; iter <= upper_bound; iter++) {
if (!IterativeLengthPhaseTwo(v_size, v, e, edge_ids, lane_to_num,
paths_ve_two, seen,
(iter & 1) ? visit1 : visit2,
(iter & 1) ? visit2 : visit1)) {
break;
}

for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
auto search_num = lane_to_num[lane];
if (search_num >= 0) {
int64_t dst_pos = vdata_dst.sel->get_index(search_num);
auto phase_two_result = ShortestPathInternal(lane, v_size, dst_data[dst_pos],
upper_bound - lower_bound + 1, v, e, edge_ids, (iter & 1) ? visit1 : visit2,
seen_in, visit1_in, visit2_in, parents_v_in, parents_e_in);
if (phase_two_result.size() > 0) {
vector<int64_t> output_vector;
auto phase_two_src = phase_two_result[0];
// construct the path of phase one
if (paths_v[phase_two_src][lane].size() > 0) {
auto iterations = lower_bound - 1;
auto parent_vertex = paths_v[phase_two_src][lane][iterations];
auto parent_edge = paths_e[phase_two_src][lane][iterations];
// detect lanes that finished
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
int64_t search_num = lane_to_num[lane];
if (search_num >= 0) { // active lane
//! Check if dst for a source has been seen
int64_t dst_pos = vdata_dst.sel->get_index(search_num);
if (seen[dst_data[dst_pos]][lane]) {
vector<int64_t> output_vector(2 * iter + 1);

//! construct the phase two path
auto iteration = iter;
auto parent_vertex = dst_data[dst_pos];
while (iteration >= lower_bound) {
output_vector[2 * iteration - 1] = paths_ve_two[(parent_vertex * LANE_LIMIT + lane)].second;
output_vector[2 * iteration - 2] = paths_ve_two[(parent_vertex * LANE_LIMIT + lane)].first;
parent_vertex = output_vector[2 * iteration - 2];
iteration--;
}

while (iterations > 0) {
output_vector.push_back(parent_edge);
output_vector.push_back(parent_vertex);
iterations--;
parent_edge = paths_e[parent_vertex][lane][iterations];
parent_vertex = paths_v[parent_vertex][lane][iterations];
//! construct the phase one path
while (iteration > 0) {
output_vector[2 * iteration - 1] = paths_ve_one[((iteration - 1) * v_size + parent_vertex) * LANE_LIMIT + lane].second;
output_vector[2 * iteration - 2] = paths_ve_one[((iteration - 1) * v_size + parent_vertex) * LANE_LIMIT + lane].first;
parent_vertex = output_vector[2 * iteration - 2];
iteration--;
}
std::reverse(output_vector.begin(), output_vector.end());
}

// construct the path of phase two
for (auto val : phase_two_result) {
output_vector.push_back(val);
}
output_vector.back() = dst_data[dst_pos];

// construct the output
auto output = make_uniq<Vector>(LogicalType::LIST(LogicalType::BIGINT));
for (auto val : output_vector) {
Value value_to_insert = val;
ListVector::PushBack(*output, value_to_insert);
auto output =
make_uniq<Vector>(LogicalType::LIST(LogicalType::BIGINT));
for (auto val : output_vector) {
Value value_to_insert = val;
ListVector::PushBack(*output, value_to_insert);
}
result_data[search_num].length = ListVector::GetListSize(*output);
result_data[search_num].offset = total_len;
ListVector::Append(result, ListVector::GetEntry(*output),
ListVector::GetListSize(*output));
total_len += result_data[search_num].length;
lane_to_num[lane] = -1; // mark inactive
}
result_data[search_num].length = ListVector::GetListSize(*output);
result_data[search_num].offset = total_len;
ListVector::Append(result, ListVector::GetEntry(*output),
ListVector::GetListSize(*output));
total_len += result_data[search_num].length;
lane_to_num[lane] = -1; // mark inactive
} else {
result_validity.SetInvalid(search_num);
lane_to_num[lane] = -1; // mark inactive
}
}
}
Expand All @@ -347,7 +242,7 @@ static void ShortestPathTwoPhaseFunction(DataChunk &args, ExpressionState &state
CreateScalarFunctionInfo
DuckPGQFunctions::GetShortestPathTwoPhaseFunction() {
auto fun = ScalarFunction(
"shortestpath_two_phase",
"shortestpath_lowerbound",
{LogicalType::INTEGER, LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT, LogicalType::BIGINT},
LogicalType::LIST(LogicalType::BIGINT),
Expand Down

0 comments on commit a1c6833

Please sign in to comment.