From 258b4456773cc4760ff0d0fabcc6fcad6f1ab530 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Guz?= Date: Wed, 29 Jan 2020 13:07:51 +0100 Subject: [PATCH 1/4] Add queue overflow handler in asyncsender. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Guz --- fluent/asyncsender.py | 11 +++++++- tests/test_asynchandler.py | 52 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/fluent/asyncsender.py b/fluent/asyncsender.py index 7f8dc02..3b460ab 100644 --- a/fluent/asyncsender.py +++ b/fluent/asyncsender.py @@ -55,6 +55,7 @@ def __init__(self, msgpack_kwargs=None, queue_maxsize=DEFAULT_QUEUE_MAXSIZE, queue_circular=DEFAULT_QUEUE_CIRCULAR, + queue_overflow_handler=None, **kwargs): """ :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version. @@ -66,6 +67,10 @@ def __init__(self, **kwargs) self._queue_maxsize = queue_maxsize self._queue_circular = queue_circular + if queue_circular and queue_overflow_handler: + self._queue_overflow_handler = queue_overflow_handler + else: + self._queue_overflow_handler = self._queue_overflow_handler_default self._thread_guard = threading.Event() # This ensures visibility across all variables self._closed = False @@ -109,7 +114,8 @@ def _send(self, bytes_): if self._queue_circular and self._queue.full(): # discard oldest try: - self._queue.get(block=False) + discarded_bytes = self._queue.get(block=False) + self._queue_overflow_handler(discarded_bytes) except Empty: # pragma: no cover pass try: @@ -132,5 +138,8 @@ def _send_loop(self): finally: self._close() + def _queue_overflow_handler_default(self, discarded_bytes): + pass + def __exit__(self, exc_type, exc_val, exc_tb): self.close() diff --git a/tests/test_asynchandler.py b/tests/test_asynchandler.py index 52d9182..477b066 100644 --- a/tests/test_asynchandler.py +++ b/tests/test_asynchandler.py @@ -4,6 +4,9 @@ import sys import unittest +from mock import patch +from unittest import mock + import fluent.asynchandler import fluent.handler from tests import mockserver @@ -309,3 +312,52 @@ def test_simple(self): eq('userB', el[2]['to']) self.assertTrue(el[1]) self.assertTrue(isinstance(el[1], int)) + + +class QueueOverflowException(Exception): + pass + + +def queue_overflow_handler(discarded_bytes): + raise QueueOverflowException(discarded_bytes) + + + + +class TestHandlerWithCircularQueueHandler(unittest.TestCase): + Q_SIZE = 1 + + def setUp(self): + super(TestHandlerWithCircularQueueHandler, self).setUp() + self._server = mockserver.MockRecvServer('localhost') + self._port = self._server.port + + def tearDown(self): + self._server.close() + + def get_handler_class(self): + # return fluent.handler.FluentHandler + return fluent.asynchandler.FluentHandler + + @patch.object(fluent.asynchandler.asyncsender.Queue, 'full', mock.Mock(return_value=True)) + def test_simple(self): + handler = self.get_handler_class()('app.follow', port=self._port, + queue_maxsize=self.Q_SIZE, + queue_circular=True, + queue_overflow_handler=queue_overflow_handler) + with handler: + self.assertEqual(handler.sender.queue_circular, True) + self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE) + + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + + log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) + with self.assertRaises(QueueOverflowException): + log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) + log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) + with self.assertRaises(QueueOverflowException): + log.info({'cnt': 4, 'from': 'userA', 'to': 'userB'}) + From 8e70e6a3addb47a896816b6771ec4c4be251e26d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Guz?= Date: Thu, 30 Jan 2020 18:35:01 +0100 Subject: [PATCH 2/4] Fix tests. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Guz --- tests/test_asynchandler.py | 45 ++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/tests/test_asynchandler.py b/tests/test_asynchandler.py index 477b066..ccd4069 100644 --- a/tests/test_asynchandler.py +++ b/tests/test_asynchandler.py @@ -4,8 +4,16 @@ import sys import unittest -from mock import patch -from unittest import mock +try: + from unittest import mock +except ImportError: + import mock +try: + from unittest.mock import patch +except ImportError: + from mock import patch + + import fluent.asynchandler import fluent.handler @@ -322,8 +330,6 @@ def queue_overflow_handler(discarded_bytes): raise QueueOverflowException(discarded_bytes) - - class TestHandlerWithCircularQueueHandler(unittest.TestCase): Q_SIZE = 1 @@ -339,25 +345,30 @@ def get_handler_class(self): # return fluent.handler.FluentHandler return fluent.asynchandler.FluentHandler - @patch.object(fluent.asynchandler.asyncsender.Queue, 'full', mock.Mock(return_value=True)) def test_simple(self): handler = self.get_handler_class()('app.follow', port=self._port, queue_maxsize=self.Q_SIZE, queue_circular=True, queue_overflow_handler=queue_overflow_handler) with handler: - self.assertEqual(handler.sender.queue_circular, True) - self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE) + def custom_full_queue(): + handler.sender._queue.put(b'Mock', block=True) + return True - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) + with patch.object(fluent.asynchandler.asyncsender.Queue, 'full', mock.Mock(side_effect=custom_full_queue)): + self.assertEqual(handler.sender.queue_circular, True) + self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE) - log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) - with self.assertRaises(QueueOverflowException): - log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) - log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) - with self.assertRaises(QueueOverflowException): - log.info({'cnt': 4, 'from': 'userA', 'to': 'userB'}) + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + + with self.assertRaises(QueueOverflowException): + log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) + + with self.assertRaises(QueueOverflowException): + log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) + with self.assertRaises(QueueOverflowException): + log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) From 84808c305bca80f78bfd4e39527ed85ea93774c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Guz?= Date: Mon, 24 Feb 2020 11:08:12 +0100 Subject: [PATCH 3/4] Execute queue overflow handler in case of no errors. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Guz --- fluent/asyncsender.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fluent/asyncsender.py b/fluent/asyncsender.py index 3b460ab..e140774 100644 --- a/fluent/asyncsender.py +++ b/fluent/asyncsender.py @@ -115,9 +115,10 @@ def _send(self, bytes_): # discard oldest try: discarded_bytes = self._queue.get(block=False) - self._queue_overflow_handler(discarded_bytes) except Empty: # pragma: no cover pass + else: + self._queue_overflow_handler(discarded_bytes) try: self._queue.put(bytes_, block=(not self._queue_circular)) except Full: # pragma: no cover From 478bd02ff69c3c4afca25694025f8548898d96ff Mon Sep 17 00:00:00 2001 From: pguz Date: Mon, 25 May 2020 13:21:24 +0200 Subject: [PATCH 4/4] Respect multithreading in asynchandler test. Signed-off-by: pguz --- tests/test_asynchandler.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/tests/test_asynchandler.py b/tests/test_asynchandler.py index ccd4069..e88a041 100644 --- a/tests/test_asynchandler.py +++ b/tests/test_asynchandler.py @@ -322,7 +322,7 @@ def test_simple(self): self.assertTrue(isinstance(el[1], int)) -class QueueOverflowException(Exception): +class QueueOverflowException(BaseException): pass @@ -364,11 +364,24 @@ def custom_full_queue(): handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - with self.assertRaises(QueueOverflowException): + exc_counter = 0 + + try: log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) + except QueueOverflowException: + exc_counter += 1 - with self.assertRaises(QueueOverflowException): + try: log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) + except QueueOverflowException: + exc_counter += 1 - with self.assertRaises(QueueOverflowException): + try: log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) + except QueueOverflowException: + exc_counter += 1 + + # we can't be sure to have exception in every case due to multithreading, + # so we can test only for a cautelative condition here + print('Exception raised: {} (expected 3)'.format(exc_counter)) + assert exc_counter >= 0