Skip to content

Commit

Permalink
Fix tx modes
Browse files Browse the repository at this point in the history
  • Loading branch information
vgvoleg committed Oct 31, 2024
1 parent 4c0afa1 commit 5870e24
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 28 deletions.
9 changes: 3 additions & 6 deletions tests/test_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,24 @@ def _test_isolation_level_read_only(
cursor = connection.cursor()
with suppress(dbapi.DatabaseError):
maybe_await(cursor.execute("DROP TABLE foo"))

cursor = connection.cursor()
maybe_await(cursor.execute(
"CREATE TABLE foo(id Int64 NOT NULL, PRIMARY KEY (id))"
))

connection.set_isolation_level(isolation_level)
cursor = connection.cursor()

query = "UPSERT INTO foo(id) VALUES (1)"
if read_only:
with pytest.raises(dbapi.DatabaseError):
maybe_await(cursor.execute(query))

else:
maybe_await(cursor.execute(query))

maybe_await(connection.rollback())

connection.set_isolation_level("AUTOCOMMIT")

cursor = connection.cursor()

maybe_await(cursor.execute("DROP TABLE foo"))

def _test_connection(self, connection: dbapi.Connection) -> None:
Expand Down Expand Up @@ -211,7 +206,9 @@ def connect() -> dbapi.AsyncConnection:
try:
yield conn
finally:
await greenlet_spawn(conn.close)
def close() -> None:
maybe_await(conn.close())
await greenlet_spawn(close)

@pytest.mark.asyncio
@pytest.mark.parametrize(
Expand Down
46 changes: 24 additions & 22 deletions ydb_dbapi/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,24 @@ class IsolationLevel(str, Enum):


class _IsolationSettings(NamedTuple):
ydb_mode: ydb.BaseQueryTxMode
ydb_mode: ydb.BaseQueryTxMode | None
interactive: bool


_ydb_isolation_settings_map = {
IsolationLevel.AUTOCOMMIT: _IsolationSettings(
ydb.QuerySerializableReadWrite(), interactive=False
),
IsolationLevel.AUTOCOMMIT: _IsolationSettings(None, interactive=False),
IsolationLevel.SERIALIZABLE: _IsolationSettings(
ydb.QuerySerializableReadWrite(), interactive=True
),
IsolationLevel.ONLINE_READONLY: _IsolationSettings(
ydb.QueryOnlineReadOnly(), interactive=True
ydb.QueryOnlineReadOnly(), interactive=False
),
IsolationLevel.ONLINE_READONLY_INCONSISTENT: _IsolationSettings(
ydb.QueryOnlineReadOnly().with_allow_inconsistent_reads(),
interactive=True,
interactive=False,
),
IsolationLevel.STALE_READONLY: _IsolationSettings(
ydb.QueryStaleReadOnly(), interactive=True
ydb.QueryStaleReadOnly(), interactive=False
),
IsolationLevel.SNAPSHOT_READONLY: _IsolationSettings(
ydb.QuerySnapshotReadOnly(), interactive=True
Expand Down Expand Up @@ -78,10 +76,11 @@ def __init__(

self.connection_kwargs: dict = kwargs

self._tx_mode: ydb.BaseQueryTxMode = ydb.QuerySerializableReadWrite()
self._shared_session_pool: bool = False

self._tx_context: TxContext | AsyncTxContext | None = None
self._tx_mode: ydb.BaseQueryTxMode | None = None
self.interactive_transaction: bool = False
self._shared_session_pool: bool = False

if ydb_session_pool is not None:
self._shared_session_pool = True
Expand All @@ -99,21 +98,24 @@ def __init__(
self._session: ydb.QuerySession | ydb.aio.QuerySession | None = None

def set_isolation_level(self, isolation_level: IsolationLevel) -> None:
ydb_isolation_settings = _ydb_isolation_settings_map[isolation_level]
if self._tx_context and self._tx_context.tx_id:
raise InternalError(
"Failed to set transaction mode: transaction is already began"
)

ydb_isolation_settings = _ydb_isolation_settings_map[isolation_level]

self._tx_context = None
self._tx_mode = ydb_isolation_settings.ydb_mode
self.interactive_transaction = ydb_isolation_settings.interactive

def get_isolation_level(self) -> str:
if self._tx_mode.name == ydb.QuerySerializableReadWrite().name:
if self.interactive_transaction:
return IsolationLevel.SERIALIZABLE
if self._tx_mode is None:
return IsolationLevel.AUTOCOMMIT
if self._tx_mode.name == ydb.QuerySerializableReadWrite().name:
return IsolationLevel.SERIALIZABLE
if self._tx_mode.name == ydb.QueryOnlineReadOnly().name:
if self._tx_mode.settings.allow_inconsistent_reads:
if self._tx_mode.allow_inconsistent_reads:
return IsolationLevel.ONLINE_READONLY_INCONSISTENT
return IsolationLevel.ONLINE_READONLY
if self._tx_mode.name == ydb.QueryStaleReadOnly().name:
Expand All @@ -123,6 +125,12 @@ def get_isolation_level(self) -> str:
msg = f"{self._tx_mode.name} is not supported"
raise NotSupportedError(msg)

def _maybe_init_tx(
self, session: ydb.QuerySession | ydb.aio.QuerySession
) -> None:
if self._tx_context is None and self._tx_mode is not None:
self._tx_context = session.transaction(self._tx_mode)


class Connection(BaseConnection):
_driver_cls = ydb.Driver
Expand Down Expand Up @@ -154,10 +162,7 @@ def cursor(self) -> Cursor:
if self._session is None:
raise RuntimeError("Connection is not ready, use wait_ready.")

if self.interactive_transaction:
self._tx_context = self._session.transaction(self._tx_mode)
else:
self._tx_context = None
self._maybe_init_tx(self._session)

self._current_cursor = self._cursor_cls(
session=self._session,
Expand Down Expand Up @@ -281,10 +286,7 @@ def cursor(self) -> AsyncCursor:
if self._session is None:
raise RuntimeError("Connection is not ready, use wait_ready.")

if self.interactive_transaction:
self._tx_context = self._session.transaction(self._tx_mode)
else:
self._tx_context = None
self._maybe_init_tx(self._session)

self._current_cursor = self._cursor_cls(
session=self._session,
Expand Down

0 comments on commit 5870e24

Please sign in to comment.