Skip to content

Commit

Permalink
Bugfix: Ignore id when copying nested tables after a full load
Browse files Browse the repository at this point in the history
  • Loading branch information
JKoetsier authored and jjmurre committed Oct 25, 2023
1 parent e5e0b2f commit 7e56532
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 2023-10-18 (5.17.17)

* Bugfix: Ignore id when copying data from temp table to main table for nested tables.

# 2023-10-18 (5.17.16)

* Bugfix: Snake case temp table schema name in EventProcessor.
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = amsterdam-schema-tools
version = 5.17.16
version = 5.17.17
url = https://github.com/amsterdam/schema-tools
license = Mozilla Public 2.0
author = Team Data Diensten, van het Dataplatform onder de Directie Digitale Voorzieningen (Gemeente Amsterdam)
Expand Down
19 changes: 11 additions & 8 deletions src/schematools/events/full.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,12 @@ def _after_process(self, run_configuration: RunConfiguration, event_meta: dict):
dataset_id = event_meta["dataset_id"]
table_id = event_meta["table_id"]

nested_tables = [
to_snake_case(f.nested_table.id) for f in run_configuration.nested_table_fields
]
table_ids_to_replace = [
table_id,
] + [to_snake_case(f.nested_table.id) for f in run_configuration.nested_table_fields]
] + nested_tables

logger.info("End of full load sequence. Replacing active table.")
with self.conn.begin():
Expand All @@ -288,11 +291,12 @@ def _after_process(self, run_configuration: RunConfiguration, event_meta: dict):
full_load_table, full_load_schema_table = self._get_full_load_tables(
dataset_id, to_snake_case(t_id)
)
full_load_tables.append(full_load_table)
full_load_tables.append((full_load_table, full_load_schema_table))

fieldnames = ", ".join(
[field.db_name for field in full_load_schema_table.get_db_fields()]
)
fields = [field.db_name for field in full_load_schema_table.get_db_fields()]
if t_id in nested_tables:
fields.remove("id") # Let PG generate the id field for nested tables.
fieldnames = ", ".join(fields)

self.conn.execute(f"TRUNCATE {table_to_replace.fullname}")
self.conn.execute(
Expand All @@ -302,17 +306,16 @@ def _after_process(self, run_configuration: RunConfiguration, event_meta: dict):
if run_configuration.update_parent_table_configuration:
self._update_parent_table_bulk(run_configuration)

for full_load_table in full_load_tables:
for full_load_table, full_load_schema_table in full_load_tables:
self.conn.execute(f"DROP TABLE {full_load_table.fullname} CASCADE")
self.full_load_tables[dataset_id].pop(to_snake_case(full_load_schema_table.id))

# Copy full_load lasteventid to active table and set full_load lasteventid to None
self.lasteventids.copy_lasteventid(
self.conn, run_configuration.table_name, table_to_replace.name
)
self.lasteventids.update_eventid(self.conn, run_configuration.table_name, None)

self.full_load_tables[dataset_id].pop(to_snake_case(table_id))

def _prepare_row(
self,
run_configuration: RunConfiguration,
Expand Down
44 changes: 44 additions & 0 deletions tests/test_events_full.py
Original file line number Diff line number Diff line change
Expand Up @@ -1281,3 +1281,47 @@ def test_reset_lasteventid_after_incomplete_full_load(
"SELECT * FROM benk_lasteventids WHERE \"table\" = 'nap_peilmerken_full_load'"
).fetchone()
assert lasteventrecord["last_event_id"] == 1


def test_avoid_duplicate_key_after_full_load(
    here, db_schema, tconn, local_metadata, bag_verblijfsobjecten_schema
):
    """Regression test: a regular event after a full-load sequence must not raise a
    duplicate-key error on the serial id column of the nested table."""

    def _make_event(num_gebruiksdoelen: int, event_id: int, identificatie: str, **extra_headers):
        # Build (header, body) for an ADD event carrying the requested number of
        # nested "gebruiksdoel" objects.
        header = {
            "event_type": "ADD",
            "event_id": event_id,
            "dataset_id": "bag",
            "table_id": "verblijfsobjecten",
        }
        header.update(extra_headers)
        nested = []
        for i in range(1, num_gebruiksdoelen + 1):
            nested.append({"code": i, "omschrijving": f"doel {i}"})
        body = {
            "identificatie": identificatie,
            "volgnummer": 1,
            "gebruiksdoel": nested,
            "toegang": None,
            "ligt_in_buurt": {},
            "begin_geldigheid": "2018-10-22T00:00:00.000000",
            "eind_geldigheid": None,
        }
        return header, body

    processor = EventsProcessor(
        [bag_verblijfsobjecten_schema], tconn, local_metadata=local_metadata
    )

    # Full-load sequence: two objects, each with 2 nested objects (4 nested rows total).
    processor.process_events(
        [
            _make_event(2, 1, "VB1", full_load_sequence=True, first_of_sequence=True),
            _make_event(2, 2, "VB2", full_load_sequence=True, last_of_sequence=True),
        ]
    )

    # A subsequent regular event must insert cleanly (PG generates the nested id).
    processor.process_events([_make_event(1, 3, "VB3")])

0 comments on commit 7e56532

Please sign in to comment.