Skip to content

Commit

Permalink
add prefer_ipv6 option, remember if we are in v4 or v6 mode
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Cipperly <[email protected]>
  • Loading branch information
mcipperly committed May 11, 2021
1 parent 4a30c09 commit 291a76a
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 12 deletions.
8 changes: 8 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ can also specify remote logger by passing the options.
# for remote fluent
logger = sender.FluentSender('app', host='host', port=24224)
The logger will prefer using IPv4 and fall back to IPv6 by default. Should you wish to prefer
IPv6 and fall back to IPv4, specify `prefer_ipv6` option as `True`.

.. code:: python
# for remote fluent preferring IPv6, falling back to IPv4
logger = sender.FluentSender('app', host='host', port=24224, prefer_ipv6=True)
For sending event, call `emit` method with your event. Following example will send the event to
fluentd, with tag 'app.follow' and the attributes 'from' and 'to'.

Expand Down
32 changes: 20 additions & 12 deletions fluent/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(self,
buffer_overflow_handler=None,
nanosecond_precision=False,
msgpack_kwargs=None,
prefer_ipv6=False,
**kwargs):
"""
:param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
Expand All @@ -69,6 +70,8 @@ def __init__(self,
self.msgpack_kwargs = {} if msgpack_kwargs is None else msgpack_kwargs

self.socket = None
self.prefer_ipv6 = prefer_ipv6
self.ip_addr_family = None
self.pendings = None
self.lock = threading.Lock()
self._closed = False
Expand Down Expand Up @@ -120,12 +123,19 @@ def close(self):
self._close()
self.pendings = None

def _is_ipv4_host(self):
try:
socket.getaddrinfo(self.host, None, socket.AF_INET)
return True
except socket.error:
return False
def _find_ip_addr_family(self):
if not self.prefer_ipv6:
try:
socket.getaddrinfo(self.host, None, socket.AF_INET)
return socket.AF_INET
except socket.error:
return socket.AF_INET6
else:
try:
socket.getaddrinfo(self.host, None, socket.AF_INET6)
return socket.AF_INET6
except socket.error:
return socket.AF_INET

def _make_packet(self, label, timestamp, data):
if label:
Expand Down Expand Up @@ -208,12 +218,10 @@ def _reconnect(self):
sock.settimeout(self.timeout)
sock.connect(self.host[len('unix://'):])
else:
if self._is_ipv4_host():
sock = socket.socket(socket.AF_INET,
socket.SOCK_STREAM)
else:
sock = socket.socket(socket.AF_INET6,
socket.SOCK_STREAM)
if not self.ip_addr_family:
self.ip_addr_family = self._find_ip_addr_family()
sock = socket.socket(self.ip_addr_family,
socket.SOCK_STREAM)
sock.settimeout(self.timeout)
# This might be controversial and may need to be removed
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
Expand Down
52 changes: 52 additions & 0 deletions tests/test_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,25 @@ def recv(self, bufsize, flags=0):
finally:
self._sender.socket = old_sock

def test_ipv6_preferred_but_not_avail(self):
real_getaddrinfo = socket.getaddrinfo

def _fake_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
if family == socket.AF_INET6:
raise socket.gaierror("mock: IPv4 Only")
else:
return real_getaddrinfo(host, port, family, type, proto, flags)
with patch('socket.getaddrinfo', side_effect=_fake_getaddrinfo):
sender = fluent.sender.FluentSender(tag='test',
host='localhost',
port=self._server.port,
prefer_ipv6=True)
sender.emit('foo', {'bar': 'baz'})
sender._close()
data = self.get_data()
self.assertEqual(len(data), 1)
self.assertEqual(data[0][2], {'bar': 'baz'})

def test_ipv6_only(self):
# Test if our host supports IPv6 before running this test
try:
Expand Down Expand Up @@ -323,6 +342,39 @@ def _fake_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
self.assertEqual(len(data), 1)
self.assertEqual(data[0][2], {'bar': 'baz'})

def test_ipv6_preferred(self):
# Test if our host supports IPv6 before running this test
try:
socket.gethostbyaddr('::1')
except socket.herror:
self.skipTest("Host does not support IPv6, cannot run this test")

self.tearDown()

real_getaddrinfo = socket.getaddrinfo

def _fake_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
if family == socket.AF_INET:
raise socket.gaierror("mock: IPv6 Only")
else:
return real_getaddrinfo(host, port, family, type, proto, flags)

self._server = mockserver.MockRecvServer(host='localhost',
inet_family=socket.AF_INET6)


with patch('socket.getaddrinfo', side_effect=_fake_getaddrinfo):
sender = fluent.sender.FluentSender(tag='test',
host='localhost',
port=self._server.port,
prefer_ipv6=True)
sender.emit('foo', {'bar': 'baz'})
sender._close()
data = self.get_data()
self.assertEqual(len(data), 1)
self.assertEqual(data[0][2], {'bar': 'baz'})


@unittest.skipIf(sys.platform == "win32", "Unix socket not supported")
def test_unix_socket(self):
self.tearDown()
Expand Down

0 comments on commit 291a76a

Please sign in to comment.