Skip to content

Commit

Permalink
Implemented a blocking adapter acceptance test for two basic_consume …
Browse files Browse the repository at this point in the history
…consumers on the same channel.

Fix pylint warnings
  • Loading branch information
Vitaly Kruglikov committed Aug 25, 2015
1 parent a9bf96d commit 94f8618
Showing 1 changed file with 112 additions and 13 deletions.
125 changes: 112 additions & 13 deletions tests/acceptance/blocking_adapter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test(self):

class TestConnectionContextManagerClosesConnectionAndPassesOriginalException(BlockingTestCaseBase):
def test(self):
"""BlockingConnection: connection context manager closes connection and passes original exception"""
"""BlockingConnection: connection context manager closes connection and passes original exception""" # pylint: disable=C0301
class MyException(Exception):
pass

Expand All @@ -109,7 +109,7 @@ class MyException(Exception):

class TestConnectionContextManagerClosesConnectionAndPassesSystemException(BlockingTestCaseBase):
def test(self):
"""BlockingConnection: connection context manager closes connection and passes system exception"""
"""BlockingConnection: connection context manager closes connection and passes system exception""" # pylint: disable=C0301
with self.assertRaises(SystemExit):
with self._connect() as connection:
self.assertTrue(connection.is_open)
Expand Down Expand Up @@ -1365,8 +1365,7 @@ def test(self): # pylint: disable=R0914
self.addCleanup(self._connect().channel().queue_delete, q_name)

# Bind the queue to the exchange using routing key
frame = ch.queue_bind(q_name, exchange=exg_name,
routing_key=routing_key)
ch.queue_bind(q_name, exchange=exg_name, routing_key=routing_key)

# Attempt to send an unroutable message in the queue via basic_publish
res = ch.basic_publish(exg_name, routing_key='',
Expand Down Expand Up @@ -1443,8 +1442,7 @@ def test(self): # pylint: disable=R0914,R0915
self.addCleanup(self._connect().channel().queue_delete, q_name)

# Bind the queue to the exchange using routing key
frame = ch.queue_bind(q_name, exchange=exg_name,
routing_key=routing_key)
ch.queue_bind(q_name, exchange=exg_name, routing_key=routing_key)

# Deposit a message in the queue via basic_publish
msg1_headers = dict(
Expand Down Expand Up @@ -1549,6 +1547,111 @@ def test(self): # pylint: disable=R0914,R0915
self.assertEqual(frame.method.consumer_tag, consumer_tag)


class TestTwoBasicConsumersOnSameChannel(BlockingTestCaseBase):

def test(self): # pylint: disable=R0914
"""BlockingChannel: two basic_consume consumers on same channel
"""
connection = self._connect()

ch = connection.channel()

exg_name = 'TestPublishAndConsumeAndQos_exg_' + uuid.uuid1().hex
q1_name = 'TestTwoBasicConsumersOnSameChannel_q1' + uuid.uuid1().hex
q2_name = 'TestTwoBasicConsumersOnSameChannel_q2' + uuid.uuid1().hex
q1_routing_key = 'TestTwoBasicConsumersOnSameChannel1'
q2_routing_key = 'TestTwoBasicConsumersOnSameChannel2'

# Place channel in publisher-acknowledgments mode so that publishing
# with mandatory=True will be synchronous
ch.confirm_delivery()

# Declare a new exchange
ch.exchange_declare(exg_name, exchange_type='direct')
self.addCleanup(connection.channel().exchange_delete, exg_name)

# Declare the two new queues and bind them to the exchange
ch.queue_declare(q1_name, auto_delete=True)
self.addCleanup(self._connect().channel().queue_delete, q1_name)
ch.queue_bind(q1_name, exchange=exg_name, routing_key=q1_routing_key)

ch.queue_declare(q2_name, auto_delete=True)
self.addCleanup(self._connect().channel().queue_delete, q2_name)
ch.queue_bind(q2_name, exchange=exg_name, routing_key=q2_routing_key)

# Deposit messages in the queues
q1_tx_message_bodies = ['q1_message+%s' % (i,)
for i in pika.compat.xrange(100)]
for message_body in q1_tx_message_bodies:
ch.publish(exg_name, q1_routing_key, body=message_body,
mandatory=True)

q2_tx_message_bodies = ['q2_message+%s' % (i,)
for i in pika.compat.xrange(150)]
for message_body in q2_tx_message_bodies:
ch.publish(exg_name, q2_routing_key, body=message_body,
mandatory=True)

# Create the consumers
q1_rx_messages = []
q1_consumer_tag = ch.basic_consume(
lambda *args: q1_rx_messages.append(args),
q1_name,
no_ack=False,
exclusive=False,
arguments=None)

q2_rx_messages = []
q2_consumer_tag = ch.basic_consume(
lambda *args: q2_rx_messages.append(args),
q2_name,
no_ack=False,
exclusive=False,
arguments=None)

# Wait for all messages to be delivered
while (len(q1_rx_messages) < len(q1_tx_message_bodies) or
len(q2_rx_messages) < len(q2_tx_message_bodies)):
connection.process_data_events(time_limit=None)

self.assertEqual(len(q2_rx_messages), len(q2_tx_message_bodies))

# Verify the messages
def validate_messages(rx_messages,
routing_key,
consumer_tag,
tx_message_bodies):
self.assertEqual(len(rx_messages), len(tx_message_bodies))

for msg, expected_body in zip(rx_messages, tx_message_bodies):
self.assertIsInstance(msg, tuple)
rx_ch, rx_method, rx_properties, rx_body = msg
self.assertIs(rx_ch, ch)
self.assertIsInstance(rx_method, pika.spec.Basic.Deliver)
self.assertEqual(rx_method.consumer_tag, consumer_tag)
self.assertFalse(rx_method.redelivered)
self.assertEqual(rx_method.exchange, exg_name)
self.assertEqual(rx_method.routing_key, routing_key)

self.assertIsInstance(rx_properties, pika.BasicProperties)
self.assertEqual(rx_body, as_bytes(expected_body))

# Validate q1 consumed messages
validate_messages(rx_messages=q1_rx_messages,
routing_key=q1_routing_key,
consumer_tag=q1_consumer_tag,
tx_message_bodies=q1_tx_message_bodies)

# Validate q2 consumed messages
validate_messages(rx_messages=q2_rx_messages,
routing_key=q2_routing_key,
consumer_tag=q2_consumer_tag,
tx_message_bodies=q2_tx_message_bodies)

# There shouldn't be any more events now
self.assertFalse(ch._pending_events)


class TestBasicCancelPurgesPendingConsumerCancellationEvt(BlockingTestCaseBase):

def test(self):
Expand Down Expand Up @@ -1622,8 +1725,7 @@ def test(self): # pylint: disable=R0914,R0915
self.addCleanup(self._connect().channel().queue_delete, q_name)

# Bind the queue to the exchange using routing key
frame = ch.queue_bind(q_name, exchange=exg_name,
routing_key=routing_key)
ch.queue_bind(q_name, exchange=exg_name, routing_key=routing_key)

# Deposit a message in the queue via basic_publish and mandatory=True
msg1_headers = dict(
Expand Down Expand Up @@ -1761,8 +1863,7 @@ def on_consume(channel, method, props, body):
arguments=None)

# Consume from destination queue
for _, _, rx_body in ch.consume(dest_q_name,
no_ack=True):
for _, _, rx_body in ch.consume(dest_q_name, no_ack=True):
self.assertEqual(rx_body, as_bytes('via-publish'))
break
else:
Expand Down Expand Up @@ -2215,9 +2316,7 @@ def test(self):

# Consume, but don't ack
num_messages = 0
for rx_method, _, _ in ch.consume(q_name,
no_ack=True,
exclusive=False):
for rx_method, _, _ in ch.consume(q_name, no_ack=True, exclusive=False):
num_messages += 1

self.assertEqual(rx_method.delivery_tag, num_messages)
Expand Down

0 comments on commit 94f8618

Please sign in to comment.