Skip to content

Commit

Permalink
Also set catalog and schema. Refactor code to retrieve property graphs
Browse files Browse the repository at this point in the history
  • Loading branch information
Dtenwolde committed Nov 28, 2024
1 parent d38a8c0 commit 4886c6a
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 203 deletions.
12 changes: 8 additions & 4 deletions src/core/functions/table/create_property_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,12 @@ void CreatePropertyGraphFunction::CreatePropertyGraphFunc(
for (idx_t i = 0; i < v_table->sub_labels.size(); i++) {
insert_info += "'" + v_table->sub_labels[i] + (i == v_table->sub_labels.size() - 1 ? "'" : "', ");
}
insert_info += "]";
insert_info += "],";
} else {
insert_info += "NULL";
insert_info += "NULL,";
}
insert_info += "'" + v_table->catalog_name + "', ";
insert_info += "'" + v_table->schema_name + "'";
insert_info += "), ";
}

Expand Down Expand Up @@ -354,10 +356,12 @@ void CreatePropertyGraphFunction::CreatePropertyGraphFunc(
for (idx_t i = 0; i < e_table->sub_labels.size(); i++) {
insert_info += "'" + e_table->sub_labels[i] + (i == e_table->sub_labels.size() - 1 ? "'" : "', ");
}
insert_info += "]";
insert_info += "], ";
} else {
insert_info += "NULL";
insert_info += "NULL, ";
}
insert_info += "'" + e_table->catalog_name + "', ";
insert_info += "'" + e_table->schema_name + "'";
insert_info += "), ";
}

Expand Down
177 changes: 80 additions & 97 deletions src/duckpgq_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,117 +27,100 @@ DuckPGQState::DuckPGQState(shared_ptr<ClientContext> context) {
RetrievePropertyGraphs(new_conn);
}

void DuckPGQState::RetrievePropertyGraphs(shared_ptr<ClientContext> context) {
auto vertex_property_graphs = context->Query("SELECT * FROM __duckpgq_internal where is_vertex_table", false);
void DuckPGQState::RetrievePropertyGraphs(const shared_ptr<ClientContext> &context) {
// Retrieve and process vertex property graphs
auto vertex_property_graphs = context->Query("SELECT * FROM __duckpgq_internal WHERE is_vertex_table", false);
ProcessPropertyGraphs(vertex_property_graphs, true);

auto &vertex_pg_materialized_result = vertex_property_graphs->Cast<MaterializedQueryResult>();
auto vertex_pg_count = vertex_pg_materialized_result.RowCount();
if (vertex_pg_count == 0) {
return; // no results
}
auto vertex_pg_chunk = vertex_pg_materialized_result.Fetch();
for (idx_t i = 0; i < vertex_pg_count; i++) {
auto table = make_shared_ptr<PropertyGraphTable>();
string property_graph_name = vertex_pg_chunk->GetValue(0, i).GetValue<string>();
table->table_name = vertex_pg_chunk->GetValue(1, i).GetValue<string>();
table->main_label = vertex_pg_chunk->GetValue(2, i).GetValue<string>();
table->is_vertex_table = vertex_pg_chunk->GetValue(3, i).GetValue<bool>();
table->all_columns = true; // TODO Be stricter on properties
string discriminator = vertex_pg_chunk->GetValue(10, i).GetValue<string>();
if (discriminator != "NULL") {
table->discriminator = discriminator;
auto sublabels = ListValue::GetChildren(vertex_pg_chunk->GetValue(11, i));
for (const auto &sublabel : sublabels) {
table->sub_labels.push_back(sublabel.GetValue<string>());
}
}
table->catalog_name = vertex_pg_chunk->GetValue(12, i).GetValue<string>();
table->schema_name = vertex_pg_chunk->GetValue(13, i).GetValue<string>();
// Retrieve and process edge property graphs
auto edge_property_graphs = context->Query("SELECT * FROM __duckpgq_internal WHERE NOT is_vertex_table", false);
ProcessPropertyGraphs(edge_property_graphs, false);
}

if (registered_property_graphs.find(property_graph_name) ==
registered_property_graphs.end()) {
registered_property_graphs[property_graph_name] =
make_uniq<CreatePropertyGraphInfo>(property_graph_name);
void DuckPGQState::ProcessPropertyGraphs(unique_ptr<QueryResult> &property_graphs, bool is_vertex) {
if (!property_graphs || property_graphs->type != QueryResultType::MATERIALIZED_RESULT) {
throw std::runtime_error("Failed to fetch property graphs or invalid result type.");
}
auto &pg_info = registered_property_graphs[property_graph_name]
->Cast<CreatePropertyGraphInfo>();
pg_info.label_map[table->main_label] = table;
if (!table->discriminator.empty()) {
for (const auto &label : table->sub_labels) {
pg_info.label_map[label] = table;
}

auto &materialized_result = property_graphs->Cast<MaterializedQueryResult>();
auto row_count = materialized_result.RowCount();
if (row_count == 0) {
return; // No results
}
pg_info.vertex_tables.push_back(std::move(table));
}

auto edge_property_graphs = context->Query("SELECT * FROM __duckpgq_internal where not is_vertex_table", false);
auto chunk = materialized_result.Fetch();
for (idx_t i = 0; i < row_count; i++) {
auto table = make_shared_ptr<PropertyGraphTable>();

auto &edge_pg_materialized_result = edge_property_graphs->Cast<MaterializedQueryResult>();
auto edge_pg_count = edge_pg_materialized_result.RowCount();
if (edge_pg_count == 0) {
return; // no results
}
auto edge_pg_chunk = edge_pg_materialized_result.Fetch();
for (idx_t i = 0; i < edge_pg_count; i++) {
auto table = make_shared_ptr<PropertyGraphTable>();
string property_graph_name = edge_pg_chunk->GetValue(0, i).GetValue<string>();
table->table_name = edge_pg_chunk->GetValue(1, i).GetValue<string>();
table->main_label = edge_pg_chunk->GetValue(2, i).GetValue<string>();
table->is_vertex_table = edge_pg_chunk->GetValue(3, i).GetValue<bool>();
table->all_columns = true; // TODO Be stricter on properties
if (!table->is_vertex_table) {
// Handle edge table related things.
table->source_reference = edge_pg_chunk->GetValue(4, i).GetValue<string>();
auto source_pk_chunk = ListValue::GetChildren(edge_pg_chunk->GetValue(5, i));
for (const auto &source_pk : source_pk_chunk) {
table->source_pk.push_back(source_pk.GetValue<string>());
}
auto source_fk_chunk = ListValue::GetChildren(edge_pg_chunk->GetValue(6, i));
for (const auto &source_fk : source_fk_chunk) {
table->source_fk.push_back(source_fk.GetValue<string>());
}
table->destination_reference = edge_pg_chunk->GetValue(7, i).GetValue<string>();
auto destination_pk_chunk = ListValue::GetChildren(edge_pg_chunk->GetValue(8, i));
for (const auto &dest_pk : destination_pk_chunk) {
table->destination_pk.push_back(dest_pk.GetValue<string>());
}
auto destination_fk_chunk = ListValue::GetChildren(edge_pg_chunk->GetValue(9, i));
for (const auto &dest_fk : destination_fk_chunk) {
table->destination_fk.push_back(dest_fk.GetValue<string>());
}
// Extract and validate common properties
table->table_name = chunk->GetValue(1, i).GetValue<string>();
table->main_label = chunk->GetValue(2, i).GetValue<string>();
table->is_vertex_table = chunk->GetValue(3, i).GetValue<bool>();
table->all_columns = true; // TODO: Be stricter on properties

// Handle discriminator and sub-labels
const auto &discriminator = chunk->GetValue(10, i).GetValue<string>();
if (discriminator != "NULL") {
table->discriminator = discriminator;
auto sublabels = ListValue::GetChildren(chunk->GetValue(11, i));
for (const auto &sublabel : sublabels) {
table->sub_labels.push_back(sublabel.GetValue<string>());
}
}

// Extract catalog and schema names
table->catalog_name = chunk->GetValue(12, i).GetValue<string>();
table->schema_name = chunk->GetValue(13, i).GetValue<string>();

// Additional edge-specific handling
if (!is_vertex) {
PopulateEdgeSpecificFields(chunk, i, *table);
}

RegisterPropertyGraph(table, chunk->GetValue(0, i).GetValue<string>(), is_vertex);
}
string discriminator = edge_pg_chunk->GetValue(10, i).GetValue<string>();
if (discriminator != "NULL") {
table->discriminator = discriminator;
auto sublabels = ListValue::GetChildren(edge_pg_chunk->GetValue(11, i));
for (const auto &sublabel : sublabels) {
table->sub_labels.push_back(sublabel.GetValue<string>());
}
}

/// Copies the edge-only fields of one result row into `table`.
///
/// Reads columns 4 through 9 of row `row_idx` in `chunk`:
///   - column 4 is stored as `table.source_reference`,
///   - columns 5 and 6 are appended to `table.source_pk` / `table.source_fk`,
///   - column 7 is stored as `table.destination_reference`,
///   - columns 8 and 9 are appended to `table.destination_pk` / `table.destination_fk`.
/// The list columns are expanded through ExtractListValues, which appends to
/// the target vector rather than replacing its contents.
///
/// @param chunk    Data chunk holding the fetched rows.
/// @param row_idx  Index of the row inside `chunk` to read.
/// @param table    Property graph table whose edge fields are populated.
void DuckPGQState::PopulateEdgeSpecificFields(unique_ptr<DataChunk> &chunk, idx_t row_idx, PropertyGraphTable &table) {
	// Column positions this function reads from the row.
	constexpr idx_t SOURCE_REFERENCE_COL = 4;
	constexpr idx_t SOURCE_PK_COL = 5;
	constexpr idx_t SOURCE_FK_COL = 6;
	constexpr idx_t DESTINATION_REFERENCE_COL = 7;
	constexpr idx_t DESTINATION_PK_COL = 8;
	constexpr idx_t DESTINATION_FK_COL = 9;

	// Small accessor so every read below goes through the same row.
	const auto cell_at = [&](idx_t column_idx) { return chunk->GetValue(column_idx, row_idx); };

	// Source side of the edge.
	table.source_reference = cell_at(SOURCE_REFERENCE_COL).GetValue<string>();
	ExtractListValues(cell_at(SOURCE_PK_COL), table.source_pk);
	ExtractListValues(cell_at(SOURCE_FK_COL), table.source_fk);

	// Destination side of the edge.
	table.destination_reference = cell_at(DESTINATION_REFERENCE_COL).GetValue<string>();
	ExtractListValues(cell_at(DESTINATION_PK_COL), table.destination_pk);
	ExtractListValues(cell_at(DESTINATION_FK_COL), table.destination_fk);
}

void DuckPGQState::ExtractListValues(const Value &list_value, vector<string> &output) {
auto children = ListValue::GetChildren(list_value);
for (const auto &child : children) {
output.push_back(child.GetValue<string>());
}
table->catalog_name = vertex_pg_chunk->GetValue(12, i).GetValue<string>();
table->schema_name = vertex_pg_chunk->GetValue(13, i).GetValue<string>();
}

if (registered_property_graphs.find(property_graph_name) ==
registered_property_graphs.end()) {
registered_property_graphs[property_graph_name] =
make_uniq<CreatePropertyGraphInfo>(property_graph_name);
void DuckPGQState::RegisterPropertyGraph(const shared_ptr<PropertyGraphTable> &table, const string &graph_name, bool is_vertex) {
// Ensure the property graph exists in the registry
if (registered_property_graphs.find(graph_name) == registered_property_graphs.end()) {
registered_property_graphs[graph_name] = make_uniq<CreatePropertyGraphInfo>(graph_name);
}
auto &pg_info = registered_property_graphs[property_graph_name]
->Cast<CreatePropertyGraphInfo>();

auto &pg_info = registered_property_graphs[graph_name]->Cast<CreatePropertyGraphInfo>();
pg_info.label_map[table->main_label] = table;

if (!table->discriminator.empty()) {
for (const auto &label : table->sub_labels) {
pg_info.label_map[label] = table;
}
for (const auto &label : table->sub_labels) {
pg_info.label_map[label] = table;
}
}
// TODO verify this works with labels
table->source_pg_table = pg_info.GetTableByName(table->source_reference);
D_ASSERT(table->source_pg_table);
table->destination_pg_table = pg_info.GetTableByName(table->destination_reference);
D_ASSERT(table->destination_pg_table);

pg_info.edge_tables.push_back(std::move(table));
}
if (is_vertex) {
pg_info.vertex_tables.push_back(table);
} else {
table->source_pg_table = pg_info.GetTableByName(table->source_reference);
D_ASSERT(table->source_pg_table);
table->destination_pg_table = pg_info.GetTableByName(table->destination_reference);
D_ASSERT(table->destination_pg_table);
pg_info.edge_tables.push_back(table);
}
}

void DuckPGQState::QueryEnd() {
Expand Down
7 changes: 6 additions & 1 deletion src/include/duckpgq_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ class DuckPGQState : public ClientContextState {
CreatePropertyGraphInfo *GetPropertyGraph(const string &pg_name);
duckpgq::core::CSR *GetCSR(int32_t id);

void RetrievePropertyGraphs(shared_ptr<ClientContext> context);
void RetrievePropertyGraphs(const shared_ptr<ClientContext> &context);
void ProcessPropertyGraphs(unique_ptr<QueryResult> &property_graphs, bool is_vertex);
void PopulateEdgeSpecificFields(unique_ptr<DataChunk> &chunk, idx_t row_idx,
PropertyGraphTable &table);
static void ExtractListValues(const Value &list_value, vector<string> &output);
void RegisterPropertyGraph(const shared_ptr<PropertyGraphTable> &table, const string &graph_name, bool is_vertex);

public:
unique_ptr<ParserExtensionParseData> parse_data;
Expand Down
Loading

0 comments on commit 4886c6a

Please sign in to comment.