Skip to content

Commit

Permalink
Fix for non-blocking sendall (#378)
Browse files Browse the repository at this point in the history
* Removed BufferedSocket and non-blocking func

* FIx for py35
  • Loading branch information
technige authored Mar 27, 2020
1 parent f1b34a8 commit f9d15f9
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 192 deletions.
92 changes: 1 addition & 91 deletions neo4j/io/_bolt3.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def __init__(self, unresolved_address, sock, *, auth=None, **config):
self.socket = sock
self.server = ServerInfo(Address(sock.getpeername()), Bolt3.PROTOCOL_VERSION)
self.outbox = Outbox()
self.inbox = Inbox(BufferedSocket(self.socket, 32768), on_error=self._set_defunct)
self.inbox = Inbox(self.socket, on_error=self._set_defunct)
self.packer = Packer(self.outbox)
self.unpacker = Unpacker(self.inbox)
self.responses = deque()
Expand Down Expand Up @@ -463,96 +463,6 @@ def view(self):
return memoryview(self._data[:end])


class BufferedSocket:
""" Wrapper for a regular socket, with an added a dynamically-resizing
receive buffer to reduce the number of calls to recv.
NOTE: not all socket methods are implemented yet
"""

def __init__(self, socket_, initial_capacity=0):
self.socket = socket_
self.buffer = bytearray(initial_capacity)
self.r_pos = 0
self.w_pos = 0

def _fill_buffer(self, min_bytes):
""" Fill the buffer with at least `min_bytes` bytes, requesting more if
the buffer has space. Internally, this method attempts to do as little
allocation as possible and make as few calls to socket.recv as
possible.
"""
# First, we need to calculate how much spare space exists between the
# write cursor and the end of the buffer.
space_at_end = len(self.buffer) - self.w_pos
if min_bytes <= space_at_end:
# If there's at least enough here for the minimum number of bytes
# we need, then do nothing
#
pass
elif min_bytes <= space_at_end + self.r_pos:
# If the buffer contains enough space, but it's split between the
# end of the buffer and recyclable space at the start of the
# buffer, then recycle that space by pushing the remaining data
# towards the front.
#
# print("Recycling {} bytes".format(self.r_pos))
size = self.w_pos - self.r_pos
view = memoryview(self.buffer)
self.buffer[0:size] = view[self.r_pos:self.w_pos]
self.r_pos = 0
self.w_pos = size
else:
# Otherwise, there's just not enough space whichever way you shake
# it. So, rebuild the buffer from scratch, taking the unread data
# and appending empty space big enough to hold the minimum number
# of bytes we're looking for.
#
# print("Rebuilding buffer from {} bytes ({} used) to "
# "{} bytes".format(len(self.buffer),
# self.w_pos - self.r_pos,
# self.w_pos - self.r_pos + min_bytes))
self.buffer = (self.buffer[self.r_pos:self.w_pos] +
bytearray(min_bytes))
self.w_pos -= self.r_pos
self.r_pos = 0
min_end = self.w_pos + min_bytes
end = len(self.buffer)
view = memoryview(self.buffer)
self.socket.setblocking(0)
while self.w_pos < min_end:
ready_to_read, _, _ = select([self.socket], [], [])
subview = view[self.w_pos:end]
n = self.socket.recv_into(subview, end - self.w_pos)
if n == 0:
raise OSError("No data")
self.w_pos += n

def recv_into(self, buffer, n_bytes=0, flags=0):
""" Intercepts a regular socket.recv_into call, taking data from the
internal buffer, if available. If not enough data exists in the buffer,
more will be retrieved first.
Unlike the lower-level call, this method will never return 0, instead
raising an OSError if no data is returned on the underlying socket.
:param buffer:
:param n_bytes:
:param flags:
:raises OSError:
:return:
"""
available = self.w_pos - self.r_pos
required = n_bytes - available
if required > 0:
self._fill_buffer(required)
view = memoryview(self.buffer)
end = self.r_pos + n_bytes
buffer[:] = view[self.r_pos:end]
self.r_pos = end
return n_bytes


class Inbox(MessageInbox):

def __next__(self):
Expand Down
92 changes: 1 addition & 91 deletions neo4j/io/_bolt4x0.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def __init__(self, unresolved_address, sock, *, auth=None, **config):
self.socket = sock
self.server = ServerInfo(Address(sock.getpeername()), Bolt4x0.PROTOCOL_VERSION)
self.outbox = Outbox()
self.inbox = Inbox(BufferedSocket(self.socket, 32768), on_error=self._set_defunct)
self.inbox = Inbox(self.socket, on_error=self._set_defunct)
self.packer = Packer(self.outbox)
self.unpacker = Unpacker(self.inbox)
self.responses = deque()
Expand Down Expand Up @@ -461,96 +461,6 @@ def view(self):
return memoryview(self._data[:end])


class BufferedSocket:
""" Wrapper for a regular socket, with an added a dynamically-resizing
receive buffer to reduce the number of calls to recv.
NOTE: not all socket methods are implemented yet
"""

def __init__(self, socket_, initial_capacity=0):
self.socket = socket_
self.buffer = bytearray(initial_capacity)
self.r_pos = 0
self.w_pos = 0

def _fill_buffer(self, min_bytes):
""" Fill the buffer with at least `min_bytes` bytes, requesting more if
the buffer has space. Internally, this method attempts to do as little
allocation as possible and make as few calls to socket.recv as
possible.
"""
# First, we need to calculate how much spare space exists between the
# write cursor and the end of the buffer.
space_at_end = len(self.buffer) - self.w_pos
if min_bytes <= space_at_end:
# If there's at least enough here for the minimum number of bytes
# we need, then do nothing
#
pass
elif min_bytes <= space_at_end + self.r_pos:
# If the buffer contains enough space, but it's split between the
# end of the buffer and recyclable space at the start of the
# buffer, then recycle that space by pushing the remaining data
# towards the front.
#
# print("Recycling {} bytes".format(self.r_pos))
size = self.w_pos - self.r_pos
view = memoryview(self.buffer)
self.buffer[0:size] = view[self.r_pos:self.w_pos]
self.r_pos = 0
self.w_pos = size
else:
# Otherwise, there's just not enough space whichever way you shake
# it. So, rebuild the buffer from scratch, taking the unread data
# and appending empty space big enough to hold the minimum number
# of bytes we're looking for.
#
# print("Rebuilding buffer from {} bytes ({} used) to "
# "{} bytes".format(len(self.buffer),
# self.w_pos - self.r_pos,
# self.w_pos - self.r_pos + min_bytes))
self.buffer = (self.buffer[self.r_pos:self.w_pos] +
bytearray(min_bytes))
self.w_pos -= self.r_pos
self.r_pos = 0
min_end = self.w_pos + min_bytes
end = len(self.buffer)
view = memoryview(self.buffer)
self.socket.setblocking(0)
while self.w_pos < min_end:
ready_to_read, _, _ = select([self.socket], [], [])
subview = view[self.w_pos:end]
n = self.socket.recv_into(subview, end - self.w_pos)
if n == 0:
raise OSError("No data")
self.w_pos += n

def recv_into(self, buffer, n_bytes=0, flags=0):
""" Intercepts a regular socket.recv_into call, taking data from the
internal buffer, if available. If not enough data exists in the buffer,
more will be retrieved first.
Unlike the lower-level call, this method will never return 0, instead
raising an OSError if no data is returned on the underlying socket.
:param buffer:
:param n_bytes:
:param flags:
:raises OSError:
:return:
"""
available = self.w_pos - self.r_pos
required = n_bytes - available
if required > 0:
self._fill_buffer(required)
view = memoryview(self.buffer)
end = self.r_pos + n_bytes
buffer[:] = view[self.r_pos:end]
self.r_pos = end
return n_bytes


class Inbox(MessageInbox):

def __next__(self):
Expand Down
3 changes: 0 additions & 3 deletions tests/unit/io/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ def __init__(self, address):
self.captured = b""
self.messages = MessageInbox(self, on_error=print)

def setblocking(self, flag):
pass

def getsockname(self):
return "127.0.0.1", 0xFFFF

Expand Down
3 changes: 0 additions & 3 deletions tests/unit/io/test_direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ class FakeSocket:
def __init__(self, address):
self.address = address

def setblocking(self, flag):
pass

def getpeername(self):
return self.address

Expand Down
15 changes: 11 additions & 4 deletions tests/unit/test_addressing.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,14 @@ def test_address_resolve_with_custom_resolver():
resolved = address.resolve(resolver=custom_resolver)
assert isinstance(resolved, Address) is False
assert isinstance(resolved, list) is True
assert len(resolved) == 3
assert resolved[0] == IPv4Address(('127.0.0.1', 7687))
assert resolved[1] == IPv6Address(('::1', 1234, 0, 0))
assert resolved[2] == IPv4Address(('127.0.0.1', 1234))
if len(resolved) == 2:
# IPv4 only
assert resolved[0] == IPv4Address(('127.0.0.1', 7687))
assert resolved[1] == IPv4Address(('127.0.0.1', 1234))
elif len(resolved) == 3:
# IPv4 and IPv6
assert resolved[0] == IPv4Address(('127.0.0.1', 7687))
assert resolved[1] == IPv6Address(('::1', 1234, 0, 0))
assert resolved[2] == IPv4Address(('127.0.0.1', 1234))
else:
assert False

0 comments on commit f9d15f9

Please sign in to comment.