Skip to content

Commit

Permalink
loadAt
Browse files Browse the repository at this point in the history
loadAt is new optional storage interface that is intended to replace loadBefore
with more clean and uniform semantic. Compared to loadBefore, loadAt:

1) returns data=None and serial of the removal, when loaded object was found to
   be deleted. loadBefore is returning only data=None in such case. This loadAt
   property allows to fix DemoStorage data corruption when whiteouts in overlay
   part were not previously correctly taken into account.

   #318

2) for regular data records, does not require storages to return next_serial,
   in addition to (data, serial). loadBefore requirement to return both
   serial and next_serial is constraining storages unnecessarily, and,
   while for FileStorage it is free to implement, for other storages it is
   not - for example for NEO and RelStorage, finding out next_serial, after
   looking up oid@at data record, costs one more SQL query:

   https://lab.nexedi.com/nexedi/neoppod/blob/fb746e6b/neo/storage/database/mysqldb.py#L484-508
   https://lab.nexedi.com/nexedi/neoppod/blob/fb746e6b/neo/storage/database/mysqldb.py#L477-482

   https://github.com/zodb/relstorage/blob/3.1.1-1-ge7628f9/src/relstorage/storage/load.py#L259-L264
   https://github.com/zodb/relstorage/blob/3.1.1-1-ge7628f9/src/relstorage/adapters/mover.py#L177-L199

   next_serial is not only about execution overhead - it is semantically
   redundant to be there and can be removed from load return. The reason
   I say that next_serial can be removed is that in ZODB/py the only place,
   that I could find, where next_serial is used on client side is in client
   cache (e.g. in NEO client cache), and that cache can be remade to
   work without using that next_serial at all. In simple words whenever
   after

     loadAt(oid, at)  ->  (data, serial)

   query, the cache can remember data for oid in [serial, at] range.

   Next, when invalidation message from server is received, cache entries,
   that had at == client_head, are extended (at -> new_head) for oids that
   are not present in invalidation message, while for oids that are present
   in invalidation message no such extension is done. This allows to
   maintain cache in correct state, invalidate it when there is a need to
   invalidate, and not to throw away cache entries that should remain live.
   This of course requires ZODB server to include both modified and
   just-created objects into invalidation messages

     ( zopefoundation/ZEO#160 ,
       #319 ).

   Switching to loadAt should thus allow storages like NEO and, maybe,
   RelStorage, to do 2x less SQL queries on every object access.

   #318 (comment)

In other words loadAt unifies return signature to always be

   (data, serial)

instead of

   POSKeyError				object does not exist at all
   None					object was removed
   (data, serial, next_serial)		regular data record

used by loadBefore.

This patch:

- introduces new interface.
- introduces ZODB.utils.loadAt helper, that uses either storage.loadAt,
  or, if the storage does not implement loadAt interface, tries to mimic
  loadAt semantic via storage.loadBefore to possible extent + emits
  corresponding warning.
- converts MVCCAdapter to use loadAt instead of loadBefore.
- changes DemoStorage to use loadAt, and this way fixes above-mentioned
  data corruption issue; adds corresponding test; converts
  DemoStorage.loadBefore to be a wrapper around DemoStorage.loadAt.
- adds loadAt implementation to FileStorage and MappingStorage.
- adapts other tests/code correspondingly.

/cc @jimfulton, @jamadden, @vpelletier, @jmuchemb, @arnaud-fontaine, @gidzit, @klawlf82, @hannosch
  • Loading branch information
navytux committed Jul 27, 2020
1 parent b446045 commit f2036f2
Show file tree
Hide file tree
Showing 17 changed files with 339 additions and 79 deletions.
7 changes: 3 additions & 4 deletions src/ZODB/BaseStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class BaseStorage(UndoLogCompatible):
If it stores multiple revisions, it should implement
loadSerial()
loadBefore()
loadAt()
Each storage will have two locks that are accessed via lock
acquire and release methods bound to the instance. (Yuck.)
Expand Down Expand Up @@ -267,9 +267,8 @@ def loadSerial(self, oid, serial):
raise POSException.Unsupported(
"Retrieval of historical revisions is not supported")

def loadBefore(self, oid, tid):
"""Return most recent revision of oid before tid committed."""
return None
# do not provide loadAt/loadBefore here in BaseStorage - if child forgets
# to override it - storage will always return "no data" instead of failing.

def copyTransactionsFrom(self, other, verbose=0):
"""Copy transactions from another storage.
Expand Down
3 changes: 1 addition & 2 deletions src/ZODB/DB.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,8 +726,7 @@ def open(self, transaction_manager=None, at=None, before=None):
- `before`: like `at`, but opens the readonly state before the
tid or datetime.
"""
# `at` is normalized to `before`, since we use storage.loadBefore
# as the underlying implementation of both.
# `at` is normalized to `before`.
before = getTID(at, before)
if (before is not None and
before > self.lastTransaction() and
Expand Down
85 changes: 48 additions & 37 deletions src/ZODB/DemoStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import random
import weakref
import tempfile
import warnings
import ZODB.BaseStorage
import ZODB.blob
import ZODB.interfaces
Expand Down Expand Up @@ -223,45 +224,55 @@ def __len__(self):
# still want load for old clients (e.g. zeo servers)
load = load_current

def loadBefore(self, oid, tid):
try:
result = self.changes.loadBefore(oid, tid)
except ZODB.POSException.POSKeyError:
# The oid isn't in the changes, so defer to base
return self.base.loadBefore(oid, tid)
def loadAt(self, oid, at):
data, serial = ZODB.utils.loadAt(self.changes, oid, at)
if (data is not None) or (serial != ZODB.utils.z64):
# object is present in changes either as data or deletion record.
return data, serial

if result is None:
# The oid *was* in the changes, but there aren't any
# earlier records. Maybe there are in the base.
try:
result = self.base.loadBefore(oid, tid)
except ZODB.POSException.POSKeyError:
# The oid isn't in the base, so None will be the right result
pass
# object is not present in changes at all - use base
return ZODB.utils.loadAt(self.base, oid, at)

def loadBefore(self, oid, before):
warnings.warn("loadBefore is deprecated - use loadAt instead",
DeprecationWarning, stacklevel=2)
p64 = ZODB.utils.p64
u64 = ZODB.utils.u64

if before in (maxtid, ZODB.utils.z64):
at = before
else:
at = p64(u64(before)-1)

data, serial = self.loadAt(oid, at)

# find out next_serial.
# it is ok to use dumb/slow implementation since loadBefore should not
# be used and is provided only for backward compatibility.
next_serial = maxtid
while 1:
_, s = self.loadAt(oid, p64(u64(next_serial)-1))
assert s >= serial
if s == serial:
# found - next_serial is serial of the next data record
break
next_serial = s

if next_serial == maxtid:
next_serial = None

# next_serial found -> return/raise what loadBefore users expect
if data is None:
if next_serial is None:
# object was never created
raise ZODB.POSException.POSKeyError(oid)
else:
if result and not result[-1]:
# The oid is current in the base. We need to find
# the end tid in the base by fining the first tid
# in the changes. Unfortunately, there isn't an
# api for this, so we have to walk back using
# loadBefore.

if tid == maxtid:
# Special case: we were looking for the
# current value. We won't find anything in
# changes, so we're done.
return result

end_tid = maxtid
t = self.changes.loadBefore(oid, end_tid)
while t:
end_tid = t[1]
t = self.changes.loadBefore(oid, end_tid)
result = result[:2] + (
end_tid if end_tid != maxtid else None,
)

return result
# object was deleted
return None

# regular data record
return data, serial, next_serial


def loadBlob(self, oid, serial):
try:
Expand Down
36 changes: 36 additions & 0 deletions src/ZODB/FileStorage/FileStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import logging
import os
import time
import warnings
from struct import pack
from struct import unpack

Expand Down Expand Up @@ -58,6 +59,7 @@
from ZODB.interfaces import IStorageRestoreable
from ZODB.interfaces import IStorageUndoable
from ZODB.interfaces import IStorageLastPack
from ZODB.interfaces import IStorageLoadAt
from ZODB.POSException import ConflictError
from ZODB.POSException import MultipleUndoErrors
from ZODB.POSException import POSKeyError
Expand Down Expand Up @@ -135,6 +137,7 @@ def __init__(self, afile):
IExternalGC,
IStorage,
IStorageLastPack,
IStorageLoadAt,
)
class FileStorage(
FileStorageFormatter,
Expand Down Expand Up @@ -566,7 +569,40 @@ def loadSerial(self, oid, serial):
else:
return self._loadBack_impl(oid, h.back)[0]

def loadAt(self, oid, at):
"""loadAt implements IStorageLoadAt."""
with self._files.get() as _file:
try:
pos = self._lookup_pos(oid)
except POSKeyError:
# object does not exist
return None, z64

while 1:
h = self._read_data_header(pos, oid, _file)
if h.tid <= at:
break
pos = h.prev
if not pos:
# object not yet created as of at
return None, z64

# h is the most recent DataHeader with .tid <= at
if h.plen:
# regular data record
return _file.read(h.plen), h.tid
elif h.back:
# backpointer
data, _, _, _ = self._loadBack_impl(oid, h.back,
fail=False, _file=_file)
return data, h.tid
else:
# deletion
return None, h.tid

def loadBefore(self, oid, tid):
warnings.warn("loadBefore is deprecated - use loadAt instead",
DeprecationWarning, stacklevel=2)
with self._files.get() as _file:
pos = self._lookup_pos(oid)
end_tid = None
Expand Down
19 changes: 19 additions & 0 deletions src/ZODB/MappingStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import BTrees
import time
import warnings
import ZODB.BaseStorage
import ZODB.interfaces
import ZODB.POSException
Expand All @@ -31,6 +32,7 @@
ZODB.interfaces.IStorage,
ZODB.interfaces.IStorageIteration,
ZODB.interfaces.IStorageLastPack,
ZODB.interfaces.IStorageLoadAt,
)
class MappingStorage(object):
"""In-memory storage implementation
Expand Down Expand Up @@ -149,9 +151,26 @@ def __len__(self):

load = ZODB.utils.load_current

# ZODB.interfaces.IStorageLoadAt
@ZODB.utils.locked(opened)
def loadAt(self, oid, at):
z64 = ZODB.utils.z64
tid_data = self._data.get(oid)
if not tid_data:
return None, z64
if at == z64:
return None, z64
tids_at = tid_data.keys(None, at)
if not tids_at:
return None, z64
serial = tids_at[-1]
return tid_data[serial], serial

# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def loadBefore(self, oid, tid):
warnings.warn("loadBefore is deprecated - use loadAt instead",
DeprecationWarning, stacklevel=2)
tid_data = self._data.get(oid)
if tid_data:
before = ZODB.utils.u64(tid)
Expand Down
7 changes: 3 additions & 4 deletions src/ZODB/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,10 +866,10 @@ def undo(self, serial_id, transaction):
for oid in self.fshelper.getOIDsForSerial(serial_id):
# we want to find the serial id of the previous revision
# of this blob object.
load_result = self.loadBefore(oid, serial_id)

if load_result is None:
at_before = utils.p64(utils.u64(serial_id)-1)
_, serial_before = utils.loadAt(self, oid, at_before)

if serial_before == utils.z64:
# There was no previous revision of this blob
# object. The blob was created in the transaction
# represented by serial_id. We copy the blob data
Expand All @@ -884,7 +884,6 @@ def undo(self, serial_id, transaction):
# transaction implied by "serial_id". We copy the blob
# data to a new file that references the undo transaction
# in case a user wishes to undo this undo.
data, serial_before, serial_after = load_result
orig_fn = self.fshelper.getBlobFilename(oid, serial_before)
new_fn = self.fshelper.getBlobFilename(oid, undo_serial)
with open(orig_fn, "rb") as orig:
Expand Down
9 changes: 4 additions & 5 deletions src/ZODB/historical_connections.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ development continues on a "development" head.

A database can be opened historically ``at`` or ``before`` a given transaction
serial or datetime. Here's a simple example. It should work with any storage
that supports ``loadBefore``.
that supports ``loadAt`` or ``loadBefore``.

We'll begin our example with a fairly standard set up. We

Expand Down Expand Up @@ -138,10 +138,9 @@ root.
>>> historical_conn.root()['first']['count']
0

In fact, ``at`` arguments are translated into ``before`` values because the
underlying mechanism is a storage's loadBefore method. When you look at a
connection's ``before`` attribute, it is normalized into a ``before`` serial,
no matter what you pass into ``db.open``.
In fact, ``at`` arguments are translated into ``before`` values.
When you look at a connection's ``before`` attribute, it is normalized into a
``before`` serial, no matter what you pass into ``db.open``.

>>> print(conn.before)
None
Expand Down
21 changes: 21 additions & 0 deletions src/ZODB/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,9 @@ def __len__():
def loadBefore(oid, tid):
"""Load the object data written before a transaction id
( This method is deprecated and kept for backward-compatibility.
Please use loadAt instead. )
If there isn't data before the object before the given
transaction, then None is returned, otherwise three values are
returned:
Expand Down Expand Up @@ -907,6 +910,24 @@ def lastPack(): # -> pack-cut-point (tid)
to perform round-trip and synchronize with the server.
"""

class IStorageLoadAt(Interface):

def loadAt(oid, at): # -> (data, serial)
"""Load object data as observed at given database state.
loadAt returns data for object with given object ID as observed by
database state ≤ at. Two values are returned:
- The data record,
- The transaction ID of the data record.
If the object does not exist, or is deleted as of `at` database state,
loadAt returns data=None, and serial indicates transaction ID of the
most recent deletion done in transaction with ID ≤ at, or null tid if
there is no such deletion.
Note: no POSKeyError is raised even if object id is not in the storage.
"""

class IMultiCommitStorage(IStorage):
"""A multi-commit storage can commit multiple transactions at once.
Expand Down
18 changes: 10 additions & 8 deletions src/ZODB/mvccadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import zope.interface

from . import interfaces, serialize, POSException
from .utils import p64, u64, Lock, oid_repr, tid_repr
from .utils import p64, u64, Lock, loadAt, oid_repr, tid_repr

class Base(object):

Expand Down Expand Up @@ -99,7 +99,7 @@ class MVCCAdapterInstance(Base):
'checkCurrentSerialInTransaction', 'tpc_abort',
)

_start = None # Transaction start time
_start = None # Transaction start time (before)
_ltid = b'' # Last storage transaction id

def __init__(self, base):
Expand Down Expand Up @@ -151,8 +151,9 @@ def poll_invalidations(self):

def load(self, oid):
assert self._start is not None
r = self._storage.loadBefore(oid, self._start)
if r is None:
at = p64(u64(self._start)-1)
data, serial = loadAt(self._storage, oid, at)
if data is None:
# object was deleted or not-yet-created.
# raise POSKeyError, or ReadConflictError, if the deletion is
# potentially due to simultaneous pack: a pack(t+δ) could be
Expand Down Expand Up @@ -186,7 +187,7 @@ def load(self, oid):
# no simultaneous pack detected, or lastPack was before our view of the database
raise POSException.POSKeyError(oid)

return r[:2]
return data, serial

def prefetch(self, oids):
try:
Expand Down Expand Up @@ -265,10 +266,11 @@ def poll_invalidations(self):
new_oid = pack = store = read_only_writer

def load(self, oid, version=''):
r = self._storage.loadBefore(oid, self._before)
if r is None:
at = p64(u64(self._before)-1)
data, serial = loadAt(self._storage, oid, at)
if data is None:
raise POSException.POSKeyError(oid)
return r[:2]
return data, serial


class UndoAdapterInstance(Base):
Expand Down
Loading

0 comments on commit f2036f2

Please sign in to comment.