From f0979a9a1791f93818a0ba76ac450ef4783aca04 Mon Sep 17 00:00:00 2001 From: Felix Leupold Date: Sat, 21 Dec 2024 23:20:33 +0100 Subject: [PATCH] Use pool_pre_ping for postgres connections (#128) Since the SQL engine is created whenever the job is instantiated, a lot of time can pass before a connection is actually used for writing data. In the meantime, the connection may be dropped or stale and thus upon first use a disconnect will be encountered. This can lead to errors like the following:
Error Logs ``` Status: Downloaded newer image for ghcr.io/bh2smith/dune-sync:v0.3.2 2024-12-20 14:59:06 [INFO] dune-sync:job.run:80 - Fetching data for job: pool_statistics 2024-12-20 14:59:06 [INFO] dune_client.api.base:client_async.execute:212 - executing 4335231 on large cluster 2024-12-20 14:59:07 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.PENDING (queue position: None) 2024-12-20 14:59:37 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING 2024-12-20 15:00:08 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING 2024-12-20 15:00:38 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING 2024-12-20 15:01:09 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING 2024-12-20 15:01:39 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING 2024-12-20 15:02:10 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING 2024-12-20 15:02:40 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING 2024-12-20 15:03:11 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING 2024-12-20 15:03:41 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING 2024-12-20 15:04:12 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING 2024-12-20 15:04:42 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING 2024-12-20 15:05:13 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING 2024-12-20 15:05:44 [INFO] dune-sync:job.run:84 - Saving data for job: pool_statistics Error: -20 15:05:44 [ERROR] dune-sync:main.main:44 - Error in job execution: Can't reconnect until invalid transaction is rolled back. Please rollback() fully before proceeding (Background on this error at: https://sqlalche.me/e/20/8s2b) Traceback (most recent call last): File "", line 198, in _run_module_as_main File "", line 88, in _run_code File "/app/src/main.py", line 58, in asyncio.run(main(jobs_to_run)) ~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.13/asyncio/runners.py", line 194, in run return runner.run(main) ~~~~~~~~~~^^^^^^ File "/usr/local/lib/python3.13/asyncio/runners.py", line 118, in run return self._loop.run_until_complete(task) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^ File "/usr/local/lib/python3.13/asyncio/base_events.py", line 720, in run_until_complete return future.result() ~~~~~~~~~~~~~^^ File "/app/src/main.py", line 45, in main raise e File "/app/src/main.py", line 42, in main await completed_task File "/usr/local/lib/python3.13/asyncio/tasks.py", line 634, in _wait_for_one return f.result() if resolve else f ~~~~~~~~^^ File "/app/src/metrics.py", line 64, in wrapper return await func(self, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/app/src/job.py", line 86, in run affected_rows = self.destination.save(df) File "/app/src/destinations/postgres.py", line 158, in save affected_rows = self.replace(data) File "/app/src/destinations/postgres.py", line 1[71](https://github.com/cowprotocol/data-jobs/actions/runs/12433303565/job/34715486180#step:3:72), in replace df.to_sql( ~~~~~~~~~^ self.table_name, ^^^^^^^^^^^^^^^^ ...<4 lines>... dtype=dtypes, ^^^^^^^^^^^^^ ) ^ File "/usr/local/lib/python3.13/site-packages/pandas/util/_decorators.py", line 333, in wrapper return func(*args, **kwargs) File "/usr/local/lib/python3.13/site-packages/pandas/core/generic.py", line 3087, in to_sql return sql.to_sql( ~~~~~~~~~~^ self, ^^^^^ ...<8 lines>... method=method, ^^^^^^^^^^^^^^ ) ```
This PR adds the [pool_pre_ping](https://docs.sqlalchemy.org/en/20/core/pooling.html#dealing-with-disconnects) option to the `create_engine()` call, which makes it so that connections are tested for validity upon checkout to avoid this error. ### Test Plan With this change, the above long-running job no longer fails when it's time to write the results into postgres. --- src/destinations/postgres.py | 4 +++- tests/unit/dune_test.py | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/destinations/postgres.py b/src/destinations/postgres.py index 7756988..be1d2f1 100644 --- a/src/destinations/postgres.py +++ b/src/destinations/postgres.py @@ -52,7 +52,9 @@ def __init__( ): if index_columns is None: index_columns = [] - self.engine: sqlalchemy.engine.Engine = create_engine(db_url) + self.engine: sqlalchemy.engine.Engine = create_engine( + db_url, pool_pre_ping=True + ) self.table_name: str = table_name self.schema = "public" # Split table_name if it contains schema diff --git a/tests/unit/dune_test.py b/tests/unit/dune_test.py index fc5b55b..e723c0b 100644 --- a/tests/unit/dune_test.py +++ b/tests/unit/dune_test.py @@ -16,7 +16,6 @@ class DuneSourceTest(unittest.TestCase): - def test_parse_varchar_type(self): self.assertEqual(7, _parse_varchar_type("varchar(7)")) self.assertEqual(9, _parse_varchar_type("varchar(9)"))