Use pool_pre_ping for postgres connections (#128)
Since the SQL engine is created when the job is instantiated, a lot of time
can pass before a connection is actually used to write data. In the meantime
the connection may have been dropped or gone stale, so the first attempt to
use it hits a disconnect.

This can lead to errors like the following:
<details><summary>Error Logs</summary>

```
Status: Downloaded newer image for ghcr.io/bh2smith/dune-sync:v0.3.2
2024-12-20 14:59:06 [INFO] dune-sync:job.run:80 - Fetching data for job: pool_statistics
2024-12-20 14:59:06 [INFO] dune_client.api.base:client_async.execute:212 - executing 4335231 on large cluster
2024-12-20 14:59:07 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.PENDING (queue position: None)
2024-12-20 14:59:37 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING
2024-12-20 15:00:08 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING
2024-12-20 15:00:38 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING
2024-12-20 15:01:09 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING
2024-12-20 15:01:39 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING
2024-12-20 15:02:10 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING
2024-12-20 15:02:40 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING
2024-12-20 15:03:11 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING
2024-12-20 15:03:41 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING
2024-12-20 15:04:12 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING
2024-12-20 15:04:42 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING
2024-12-20 15:05:13 [INFO] dune_client.api.base:client_async._refresh:585 - waiting for query execution 01JFJ98NQ07AA896NGNHHEC5RC to complete: ExecutionState.EXECUTING
2024-12-20 15:05:44 [INFO] dune-sync:job.run:84 - Saving data for job: pool_statistics
2024-12-20 15:05:44 [ERROR] dune-sync:main.main:44 - Error in job execution: Can't reconnect until invalid transaction is rolled back.  Please rollback() fully before proceeding (Background on this error at: https://sqlalche.me/e/20/8s2b)
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/app/src/main.py", line 58, in <module>
    asyncio.run(main(jobs_to_run))
    ~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.13/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "/usr/local/lib/python3.13/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "/usr/local/lib/python3.13/asyncio/base_events.py", line 720, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
  File "/app/src/main.py", line 45, in main
    raise e
  File "/app/src/main.py", line 42, in main
    await completed_task
  File "/usr/local/lib/python3.13/asyncio/tasks.py", line 634, in _wait_for_one
    return f.result() if resolve else f
           ~~~~~~~~^^
  File "/app/src/metrics.py", line 64, in wrapper
    return await func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/src/job.py", line 86, in run
    affected_rows = self.destination.save(df)
  File "/app/src/destinations/postgres.py", line 158, in save
    affected_rows = self.replace(data)
  File "/app/src/destinations/postgres.py", line 1[71](https://github.com/cowprotocol/data-jobs/actions/runs/12433303565/job/34715486180#step:3:72), in replace
    df.to_sql(
    ~~~~~~~~~^
        self.table_name,
        ^^^^^^^^^^^^^^^^
    ...<4 lines>...
        dtype=dtypes,
        ^^^^^^^^^^^^^
    )
    ^
  File "/usr/local/lib/python3.13/site-packages/pandas/util/_decorators.py", line 333, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.13/site-packages/pandas/core/generic.py", line 3087, in to_sql
    return sql.to_sql(
           ~~~~~~~~~~^
        self,
        ^^^^^
    ...<8 lines>...
        method=method,
        ^^^^^^^^^^^^^^
    )
```
</details>

This PR adds the
[pool_pre_ping](https://docs.sqlalchemy.org/en/20/core/pooling.html#dealing-with-disconnects)
option to the `create_engine()` call, which tests each connection for liveness
when it is checked out of the pool and transparently recycles stale ones,
avoiding this error.
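
For reference, a minimal standalone sketch of the same setting (the DSN,
DataFrame, and table name below are placeholders for illustration, not the
job's real configuration):

```python
# Minimal sketch: pool_pre_ping=True makes SQLAlchemy emit a lightweight "ping"
# on every connection checkout and transparently replace connections the server
# has already dropped. The DSN and table name are placeholders.
import pandas as pd
import sqlalchemy

engine = sqlalchemy.create_engine(
    "postgresql://user:password@localhost:5432/example_db",  # placeholder DSN
    pool_pre_ping=True,
)

# Even if the engine sat idle for a long time (e.g. while a Dune query was still
# executing), the write below starts from a validated connection rather than a
# stale one that would raise on first use.
df = pd.DataFrame({"pool": ["0xabc"], "volume": [42.0]})
df.to_sql("pool_statistics_example", engine, if_exists="replace", index=False)
```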

### Test Plan

With this change, the above long-running job no longer fails when it's
time to write the results into postgres.
fleupold authored Dec 21, 2024
1 parent 0c91d16 commit f0979a9
Showing 2 changed files with 3 additions and 2 deletions.
4 changes: 3 additions & 1 deletion src/destinations/postgres.py
```diff
@@ -52,7 +52,9 @@ def __init__(
     ):
         if index_columns is None:
             index_columns = []
-        self.engine: sqlalchemy.engine.Engine = create_engine(db_url)
+        self.engine: sqlalchemy.engine.Engine = create_engine(
+            db_url, pool_pre_ping=True
+        )
         self.table_name: str = table_name
         self.schema = "public"
         # Split table_name if it contains schema
```
1 change: 0 additions & 1 deletion tests/unit/dune_test.py
```diff
@@ -16,7 +16,6 @@
 
 
 class DuneSourceTest(unittest.TestCase):
-
     def test_parse_varchar_type(self):
         self.assertEqual(7, _parse_varchar_type("varchar(7)"))
         self.assertEqual(9, _parse_varchar_type("varchar(9)"))
```
