Skip to content

Commit

Permalink
feat: add psycopg3 pool (#298)
Browse files Browse the repository at this point in the history
Co-authored-by: Gorshkov Nikolay <[email protected]>
  • Loading branch information
akerlay and kalombos authored Jan 8, 2025
1 parent 18c2bd7 commit 922068c
Show file tree
Hide file tree
Showing 19 changed files with 278 additions and 129 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
jobs:
build:

runs-on: ubuntu-latest
runs-on: ubuntu-22.04
strategy:
max-parallel: 4
matrix:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
.vscode
.venv/
.env
load-testing/logs

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
2 changes: 2 additions & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Contributors

* @rudyryk | Alexey Kinev <[email protected]>
* @kalombos | Nikolay Gorshkov <[email protected]>
* @akerlay | Kirill Mineev
* @mrbox | Jakub Paczkowski
* @CyberROFL | Ilnaz Nizametdinov
* @insolite | Oleg
Expand Down
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,18 @@ http://peewee-async-lib.readthedocs.io
Install
-------

Install with `pip` for PostgreSQL:
Install with `pip` for PostgreSQL aiopg backend:

```bash
pip install peewee-async[postgresql]
```

or for PostgreSQL psycopg3 backend:

```bash
pip install peewee-async[psycopg]
```

or for MySQL:

```bash
Expand Down
3 changes: 3 additions & 0 deletions docs/peewee_async/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ Databases

.. automethod:: peewee_async.databases.AioDatabase.aio_atomic

.. autoclass:: peewee_async.PsycopgDatabase
:members: init

.. autoclass:: peewee_async.PooledPostgresqlDatabase
:members: init

Expand Down
13 changes: 13 additions & 0 deletions load-testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,16 @@ Run yandex-tank:
```bash
docker run -v $(pwd):/var/loadtest --net host -it yandex/yandex-tank
```

Firewall rulle to make postgreql connection unreacheable

```bash
sudo iptables -I INPUT -p tcp --dport 5432 -j DROP
```

Revert the rule back:


```bash
sudo iptables -D INPUT 1
```
129 changes: 69 additions & 60 deletions load-testing/app.py
Original file line number Diff line number Diff line change
@@ -1,116 +1,125 @@
import logging
import random
from contextlib import asynccontextmanager

import peewee
import uvicorn
from fastapi import FastAPI
import asyncio
from aiopg.connection import Connection
from aiopg.pool import Pool
from fastapi import FastAPI

acquire = Pool.acquire
cursor = Connection.cursor


def new_acquire(self):
choice = random.randint(1, 5)
if choice == 5:
raise Exception("some network error") # network error imitation
return acquire(self)


def new_cursor(self):
choice = random.randint(1, 5)
if choice == 5:
raise Exception("some network error") # network error imitation
return cursor(self)
import random

Connection.cursor = new_cursor
Pool.acquire = new_acquire

import peewee_async


logging.basicConfig()
database = peewee_async.PooledPostgresqlDatabase(
aiopg_database = peewee_async.PooledPostgresqlDatabase(
database='postgres',
user='postgres',
password='postgres',
host='localhost',
port=5432,
max_connections=3
pool_params = {
"minsize": 0,
"maxsize": 3,
}
)

psycopg_database = peewee_async.PsycopgDatabase(
database='postgres',
user='postgres',
password='postgres',
host='localhost',
port=5432,
pool_params = {
"min_size": 0,
"max_size": 3,
}
)

database = psycopg_database


def patch_aiopg():
acquire = Pool.acquire
cursor = Connection.cursor


def new_acquire(self):
choice = random.randint(1, 5)
if choice == 5:
raise Exception("some network error") # network error imitation
return acquire(self)


def new_cursor(self):
choice = random.randint(1, 5)
if choice == 5:
raise Exception("some network error") # network error imitation
return cursor(self)

Connection.cursor = new_cursor
Pool.acquire = new_acquire


def setup_logging():
logging.basicConfig()
# logging.getLogger("psycopg.pool").setLevel(logging.DEBUG)
logger = logging.getLogger("uvicorn.error")
handler = logging.FileHandler(filename="app.log", mode="w")
logger.addHandler(handler)


class MySimplestModel(peewee_async.AioModel):
id = peewee.IntegerField(primary_key=True, sequence=True)
class AppTestModel(peewee_async.AioModel):
text = peewee.CharField(max_length=100)

class Meta:
database = database


@asynccontextmanager
async def lifespan(app: FastAPI):
await database.aio_execute_sql('CREATE TABLE IF NOT EXISTS MySimplestModel (id SERIAL PRIMARY KEY);')
await database.aio_execute_sql('TRUNCATE TABLE MySimplestModel;')
AppTestModel.drop_table()
AppTestModel.create_table()
await AppTestModel.aio_create(id=1, text="1")
await AppTestModel.aio_create(id=2, text="2")
setup_logging()
yield
# Clean up the ML models and release the resources
await database.aio_close()

app = FastAPI(lifespan=lifespan)
errors = set()


@app.get("/select")
@app.get("/errors")
async def select():
try:
await MySimplestModel.select().aio_execute()
except Exception as e:
errors.add(str(e))
raise
return errors


async def nested_transaction():
async with database.aio_atomic():
await MySimplestModel.update(id=1).aio_execute()
@app.get("/select")
async def select():
await AppTestModel.select().aio_execute()


async def nested_atomic():

@app.get("/transaction")
async def transaction() -> None:
async with database.aio_atomic():
await MySimplestModel.update(id=1).aio_execute()
await AppTestModel.update(text="5").where(AppTestModel.id==1).aio_execute()
await AppTestModel.update(text="10").where(AppTestModel.id==1).aio_execute()


@app.get("/transaction")
async def transaction():
try:
async with database.aio_atomic():
await MySimplestModel.update(id=1).aio_execute()
await nested_transaction()
except Exception as e:
errors.add(str(e))
raise
return errors
async def nested_atomic() -> None:
async with database.aio_atomic():
await AppTestModel.update(text="1").where(AppTestModel.id==2).aio_execute()


@app.get("/atomic")
async def atomic():
try:
async with database.aio_atomic():
await MySimplestModel.update(id=1).aio_execute()
await nested_atomic()
except Exception as e:
errors.add(str(e))
raise
return errors
@app.get("/savepoint")
async def savepoint():
async with database.aio_atomic():
await AppTestModel.update(text="2").where(AppTestModel.id==2).aio_execute()
await nested_atomic()



@app.get("/recreate_pool")
Expand Down
3 changes: 1 addition & 2 deletions load-testing/load.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ phantom:
address: 127.0.0.1:8000 # [Target's address]:[target's port]
uris:
- /select
- /atomic
- /transaction
- /recreate_pool
- /savepoint
load_profile:
load_type: rps # schedule load by defining requests per second
schedule: const(100, 10m) # starting from 1rps growing linearly to 10rps during 10 minutes
Expand Down
2 changes: 2 additions & 0 deletions peewee_async/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
PooledPostgresqlDatabase,
PooledPostgresqlExtDatabase,
PooledMySQLDatabase,
PsycopgDatabase,
)
from .pool import PostgresqlPoolBackend, MysqlPoolBackend
from .transactions import Transaction
Expand All @@ -43,4 +44,5 @@

register_database(PooledPostgresqlDatabase, 'postgres+pool+async', 'postgresql+pool+async')
register_database(PooledPostgresqlExtDatabase, 'postgresext+pool+async', 'postgresqlext+pool+async')
register_database(PsycopgDatabase, 'psycopg+pool+async', 'psycopg+pool+async')
register_database(PooledMySQLDatabase, 'mysql+pool+async')
2 changes: 1 addition & 1 deletion peewee_async/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ async def __aexit__(
) -> None:
if self.resuing_connection is False:
if self.connection_context is not None:
self.pool_backend.release(self.connection_context.connection)
await self.pool_backend.release(self.connection_context.connection)
connection_context.set(None)
42 changes: 32 additions & 10 deletions peewee_async/databases.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import contextlib
import logging
import warnings
from typing import Type, Optional, Any, AsyncIterator, Iterator, Dict, List

import peewee
from playhouse import postgres_ext as ext

from .connection import connection_context, ConnectionContextManager
from .pool import PoolBackend, PostgresqlPoolBackend, MysqlPoolBackend
from .pool import PoolBackend, PostgresqlPoolBackend, MysqlPoolBackend, PsycopgPoolBackend
from .transactions import Transaction
from .utils import aiopg, aiomysql, __log__, FetchResults
from .utils import aiopg, aiomysql, psycopg, __log__, FetchResults


class AioDatabase(peewee.Database):
Expand Down Expand Up @@ -46,12 +47,17 @@ def init_pool_params_defaults(self) -> None:

def init_pool_params(self) -> None:
self.init_pool_params_defaults()
self.pool_params.update(
{
"minsize": self.connect_params.pop("min_connections", 1),
"maxsize": self.connect_params.pop("max_connections", 20),
}
)
if "min_connections" in self.connect_params or "max_connections" in self.connect_params:
warnings.warn(
"`min_connections` and `max_connections` are deprecated, use `pool_params` instead.",
DeprecationWarning
)
self.pool_params.update(
{
"minsize": self.connect_params.pop("min_connections", 1),
"maxsize": self.connect_params.pop("max_connections", 20),
}
)
pool_params = self.connect_params.pop('pool_params', {})
self.pool_params.update(pool_params)
self.pool_params.update(self.connect_params)
Expand Down Expand Up @@ -178,9 +184,25 @@ async def aio_execute(self, query: Any, fetch_results: Optional[FetchResults] =
return await self.aio_execute_sql(sql, params, fetch_results=fetch_results)


class PsycopgDatabase(AioDatabase, peewee.PostgresqlDatabase):
"""Extension for `peewee.PostgresqlDatabase` providing extra methods
for managing async connection based on psycopg3 pool backend.
See also:
https://peewee.readthedocs.io/en/latest/peewee/api.html#PostgresqlDatabase
"""

pool_backend_cls = PsycopgPoolBackend

def init(self, database: Optional[str], **kwargs: Any) -> None:
if not psycopg:
raise Exception("Error, psycopg is not installed!")
super().init(database, **kwargs)


class PooledPostgresqlDatabase(AioDatabase, peewee.PostgresqlDatabase):
"""Extension for `peewee.PostgresqlDatabase` providing extra methods
for managing async connection.
for managing async connection based on aiopg pool backend.
See also:
https://peewee.readthedocs.io/en/latest/peewee/api.html#PostgresqlDatabase
Expand All @@ -202,7 +224,7 @@ class PooledPostgresqlExtDatabase(
ext.PostgresqlExtDatabase
):
"""PosgtreSQL database extended driver providing **single drop-in sync**
connection and **async connections pool** interface.
connection and **async connections pool** interface based on aiopg pool backend.
JSON fields support is enabled by default, HStore supports is disabled by
default, but can be enabled through pool_params or with ``register_hstore=False`` argument.
Expand Down
Loading

0 comments on commit 922068c

Please sign in to comment.