diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index a91349b28b2..5075958c753 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import jakarta.jms.CompletionListener; import jakarta.jms.Connection; import jakarta.jms.ConnectionConsumer; import jakarta.jms.ConnectionMetaData; @@ -1439,6 +1440,68 @@ public void onCompletion(FutureResponse resp) { } } + /** + * Send a packet through a Connection - for internal use only + * + * @param command + * + * @throws JMSException + */ + public void syncSendPacket(final Command command, final CompletionListener completionListener) throws JMSException { + if(completionListener==null) { + syncSendPacket(command); + } else { + if (isClosed()) { + throw new ConnectionClosedException(); + } + try { + this.transport.asyncRequest(command, resp -> { + Response response; + Throwable exception = null; + try { + response = resp.getResult(); + if (response.isException()) { + ExceptionResponse er = (ExceptionResponse)response; + exception = er.getException(); + } + } catch (Exception e) { + exception = e; + } + if (exception != null) { + if ( exception instanceof JMSException) { + completionListener.onException((jakarta.jms.Message) command, (JMSException) exception); + } else { + if (isClosed() || closing.get()) { + LOG.debug("Received an exception but connection is closing"); + } + JMSException jmsEx = null; + try { + jmsEx = JMSExceptionSupport.create(exception); + } catch(Throwable e) { + LOG.error("Caught an exception trying to create a JMSException for " +exception,e); + } + // dispose of transport for security exceptions on connection initiation + if (exception instanceof SecurityException && command instanceof ConnectionInfo){ + try { + forceCloseOnSecurityException(exception); + } catch (Throwable t) { + // We throw the original error from the ExceptionResponse instead. + } + } + if (jmsEx != null) { + completionListener.onException((jakarta.jms.Message) command, jmsEx); + } + } + } else { + completionListener.onCompletion((jakarta.jms.Message) command); + } + }); + } catch (IOException e) { + throw JMSExceptionSupport.create(e); + } + } + } + private void forceCloseOnSecurityException(Throwable exception) { LOG.trace("force close on security exception:{}, transport={}", this, transport, exception); onException(new IOException("Force close due to SecurityException on connect", exception)); diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java index 185ebffd41b..6b352c7705b 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java @@ -23,10 +23,10 @@ import jakarta.jms.CompletionListener; import jakarta.jms.Destination; import jakarta.jms.IllegalStateException; +import jakarta.jms.IllegalStateRuntimeException; import jakarta.jms.InvalidDestinationException; import jakarta.jms.JMSException; import jakarta.jms.Message; - import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerId; @@ -83,11 +83,13 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl private final long startTime; private MessageTransformer transformer; private MemoryUsage producerWindow; + private final ThreadLocal inCompletionListenerCallback = new ThreadLocal<>(); protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException { super(session); this.info = new ProducerInfo(producerId); this.info.setWindowSize(session.connection.getProducerWindowSize()); + inCompletionListenerCallback.set(false); // Allows the options on the destination to configure the producerInfo if (destination != null && destination.getOptions() != null) { Map options = IntrospectionSupport.extractProperties( @@ -168,6 +170,9 @@ public Destination getDestination() throws JMSException { */ @Override public void close() throws JMSException { + if (inCompletionListenerCallback.get()) { + throw new IllegalStateRuntimeException("Can't close message producer within CompletionListener"); + } if (!closed) { dispose(); this.session.asyncSendPacket(info.createRemoveCommand()); @@ -239,27 +244,88 @@ public void send(Destination destination, Message message, int deliveryMode, int */ @Override public void send(Message message, CompletionListener completionListener) throws JMSException { - throw new UnsupportedOperationException("send(Message, CompletionListener) is not supported"); + this.send(getDestination(), + message, + defaultDeliveryMode, + defaultPriority, + defaultTimeToLive, + completionListener); } + @Override public void send(Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException { - throw new UnsupportedOperationException("send(Message, deliveryMode, priority, timetoLive, CompletionListener) is not supported"); + this.send(this.getDestination(), + message, + deliveryMode, + priority, + timeToLive, + completionListener); } @Override public void send(Destination destination, Message message, CompletionListener completionListener) throws JMSException { - throw new UnsupportedOperationException("send(Destination, Message, CompletionListener) is not supported"); + this.send(destination, + message, + defaultDeliveryMode, + defaultPriority, + defaultTimeToLive, + completionListener); } @Override public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException { - throw new UnsupportedOperationException("send(Destination, Message, deliveryMode, priority, timetoLive, CompletionListener) is not supported"); + this.send(destination, message, deliveryMode, priority, timeToLive, + getDisableMessageID(), getDisableMessageTimestamp(), completionListener); + } + + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean disableMessageID, boolean disableMessageTimestamp, CompletionListener completionListener) throws JMSException { + checkClosed(); + if (destination == null) { + if (info.getDestination() == null) { + throw new UnsupportedOperationException("A destination must be specified."); + } + throw new InvalidDestinationException("Don't understand null destinations"); + } + + ActiveMQDestination dest; + if (destination.equals(info.getDestination())) { + dest = (ActiveMQDestination)destination; + } else if (info.getDestination() == null) { + dest = ActiveMQDestination.transform(destination); + } else { + throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName()); + } + if (dest == null) { + throw new JMSException("No destination specified"); + } + + if (transformer != null) { + Message transformedMessage = transformer.producerTransform(session, this, message); + if (transformedMessage != null) { + message = transformedMessage; + } + } + + if (producerWindow != null) { + try { + producerWindow.waitForSpace(); + } catch (InterruptedException e) { + throw new JMSException("Send aborted due to thread interrupt."); + } + } + + this.session.send(this, dest, message, deliveryMode, priority, timeToLive, disableMessageID, + disableMessageTimestamp, producerWindow, sendTimeout, completionListener, inCompletionListenerCallback); + + stats.onMessage(); } - public void send(Message message, AsyncCallback onComplete) throws JMSException { + + public void send(Message message, AsyncCallback onComplete) throws JMSException { this.send(this.getDestination(), message, this.defaultDeliveryMode, diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java index dc7311981e4..fd316efef6b 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java @@ -56,6 +56,7 @@ public class ActiveMQProducer implements JMSProducer { // Properties applied to all messages on a per-JMS producer instance basis private Map messageProperties = null; + private CompletionListener completionListener = null; ActiveMQProducer(ActiveMQContext activemqContext, ActiveMQMessageProducer activemqMessageProducer) { this.activemqContext = activemqContext; @@ -86,8 +87,7 @@ public JMSProducer send(Destination destination, Message message) { message.setObjectProperty(propertyEntry.getKey(), propertyEntry.getValue()); } } - - activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), null); + activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), getAsync()); } catch (JMSException e) { throw JMSExceptionSupport.convertToJMSRuntimeException(e); } @@ -253,12 +253,13 @@ public long getDeliveryDelay() { @Override public JMSProducer setAsync(CompletionListener completionListener) { - throw new UnsupportedOperationException("setAsync(CompletionListener) is not supported"); + this.completionListener = completionListener; + return this; } @Override public CompletionListener getAsync() { - throw new UnsupportedOperationException("getAsync() is not supported"); + return this.completionListener; } @Override diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 766005f2872..c0a2a2ac3a9 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -31,8 +31,10 @@ import java.util.concurrent.atomic.AtomicInteger; import jakarta.jms.BytesMessage; +import jakarta.jms.CompletionListener; import jakarta.jms.Destination; import jakarta.jms.IllegalStateException; +import jakarta.jms.IllegalStateRuntimeException; import jakarta.jms.InvalidDestinationException; import jakarta.jms.InvalidSelectorException; import jakarta.jms.JMSException; @@ -57,7 +59,6 @@ import jakarta.jms.TopicSession; import jakarta.jms.TopicSubscriber; import jakarta.jms.TransactionRolledBackException; - import org.apache.activemq.blob.BlobDownloader; import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.blob.BlobUploader; @@ -93,6 +94,7 @@ import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.util.Callback; +import org.apache.activemq.util.CountdownLock; import org.apache.activemq.util.LongSequenceGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -234,6 +236,8 @@ public static interface DeliveryListener { private MessageTransformer transformer; private BlobTransferPolicy blobTransferPolicy; private long lastDeliveredSequenceId = -2; + private CountdownLock numIncompletedAsyncSend = new CountdownLock(); + private ThreadLocal inCompletionListenerCallback = new ThreadLocal<>(); /** * Construct the Session @@ -253,6 +257,7 @@ protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, in this.asyncDispatch = asyncDispatch; this.sessionAsyncDispatch = sessionAsyncDispatch; this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); + inCompletionListenerCallback.set(false); setTransactionContext(new TransactionContext(connection)); stats = new JMSSessionStatsImpl(producers, consumers); this.connection.asyncSendPacket(info); @@ -576,12 +581,16 @@ public int getAcknowledgeMode() throws JMSException { @Override public void commit() throws JMSException { checkClosed(); + if (inCompletionListenerCallback.get()) { + throw new IllegalStateRuntimeException("Can't commit transacted session within CompletionListener"); + } if (!getTransacted()) { throw new jakarta.jms.IllegalStateException("Not a transacted session"); } if (LOG.isDebugEnabled()) { LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId()); } + waitForAsyncSendToFinish(); transactionContext.commit(); } @@ -597,12 +606,16 @@ public void commit() throws JMSException { @Override public void rollback() throws JMSException { checkClosed(); + if (inCompletionListenerCallback.get()) { + throw new IllegalStateRuntimeException("Can't rollback transacted session within CompletionListener"); + } if (!getTransacted()) { throw new jakarta.jms.IllegalStateException("Not a transacted session"); } if (LOG.isDebugEnabled()) { LOG.debug(getSessionId() + " Transaction Rollback, txid:" + transactionContext.getTransactionId()); } + waitForAsyncSendToFinish(); transactionContext.rollback(); } @@ -637,6 +650,9 @@ public void rollback() throws JMSException { @Override public void close() throws JMSException { if (!closed) { + if (inCompletionListenerCallback.get()) { + throw new IllegalStateRuntimeException("Can't close session within CompletionListener"); + } if (getTransactionContext().isInXATransaction()) { if (!synchronizationRegistered) { synchronizationRegistered = true; @@ -722,7 +738,8 @@ void deliverAcks() { public synchronized void dispose() throws JMSException { if (!closed) { - + // Wait for Incompleted async send to finish per Jakarta Messaging 3.1 section 3.7.4 + waitForAsyncSendToFinish(); try { executor.close(); @@ -2049,6 +2066,144 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin } } + /** + * Sends the message for dispatch by the broker. + * + * @param producer - message producer. + * @param destination - message destination. + * @param message - message to be sent. + * @param deliveryMode - JMS message delivery mode. + * @param priority - message priority. + * @param timeToLive - message expiration. + * @param disableTimestamp - disable timestamp. + * @param disableMessageID - optionally, disable messageID. + * @param producerWindow + * @param completionListener + * @param producerInCompletionListenerCallback + * @throws JMSException + */ + protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean disableMessageID, boolean disableMessageTimestamp, MemoryUsage producerWindow, int sendTimeout, + CompletionListener completionListener, ThreadLocal producerInCompletionListenerCallback) throws JMSException { + + checkClosed(); + if (destination.isTemporary() && connection.isDeleted(destination)) { + throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); + } + synchronized (sendMutex) { + // tell the Broker we are about to start a new transaction + doStartTransaction(); + if (transactionContext.isRollbackOnly()) { + throw new IllegalStateException("transaction marked rollback only"); + } + TransactionId txid = transactionContext.getTransactionId(); + long sequenceNumber = producer.getMessageSequence(); + + //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 + message.setJMSDeliveryMode(deliveryMode); + long expiration = 0L; + long timeStamp = System.currentTimeMillis(); + if (timeToLive > 0) { + expiration = timeToLive + timeStamp; + } + + if(!(message instanceof ActiveMQMessage)) { + setForeignMessageDeliveryTime(message, timeStamp); + } else { + message.setJMSDeliveryTime(timeStamp); + } + if (!disableMessageTimestamp && !producer.getDisableMessageTimestamp()) { + message.setJMSTimestamp(timeStamp); + } else { + message.setJMSTimestamp(0l); + } + message.setJMSExpiration(expiration); + message.setJMSPriority(priority); + message.setJMSRedelivered(false); + + // transform to our own message format here + ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); + msg.setDestination(destination); + msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); + + // Set the message id. + if (msg != message) { + message.setJMSMessageID(msg.getMessageId().toString()); + // Make sure the JMS destination is set on the foreign messages too. + message.setJMSDestination(destination); + } + //clear the brokerPath in case we are re-sending this message + msg.setBrokerPath(null); + + msg.setTransactionId(txid); + if (connection.isCopyMessageOnSend()) { + msg = (ActiveMQMessage)msg.copy(); + } + msg.setConnection(connection); + msg.onSend(); + msg.setProducerId(msg.getMessageId().getProducerId()); + if (LOG.isTraceEnabled()) { + LOG.trace(getSessionId() + " sending message: " + msg); + } + if (completionListener==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { + this.connection.asyncSendPacket(msg); + if (producerWindow != null) { + // Since we defer lots of the marshaling till we hit the + // wire, this might not + // provide and accurate size. We may change over to doing + // more aggressive marshaling, + // to get more accurate sizes.. this is more important once + // users start using producer window + // flow control. + int size = msg.getSize(); + producerWindow.increaseUsage(size); + } + } else { + if (sendTimeout > 0 && completionListener==null) { + this.connection.syncSendPacket(msg, sendTimeout); + }else { + CompletionListener wrapperCompletionListener = null; + if (completionListener != null) { + // Make the Message object unaccessible and unmutable + // per Jakarta Messaging 3.1 spec section 7.3.9 and 7.3.6 + numIncompletedAsyncSend.doIncrement(); + wrapperCompletionListener = new CompletionListener() { + @Override + public void onCompletion(Message message) { + try { + inCompletionListenerCallback.set(true); + producerInCompletionListenerCallback.set(true); + numIncompletedAsyncSend.doDecrement(); + completionListener.onCompletion(message); + } catch (Exception e) { + // invoke onException if the exception can't be thrown in the thread that calls the send + // per Jakarta Messaging 3.1 spec section 7.3.2 + completionListener.onException(message, e); + } finally { + inCompletionListenerCallback.set(false); + producerInCompletionListenerCallback.set(false); + } + } + + @Override + public void onException(Message message, Exception e) { + try { + inCompletionListenerCallback.set(true); + completionListener.onException(message, e); + } finally { + numIncompletedAsyncSend.doDecrement(); + inCompletionListenerCallback.set(false); + } + } + }; + } + this.connection.syncSendPacket(msg, wrapperCompletionListener); + } + } + + } + } + /** * Send TransactionInfo to indicate transaction has started * @@ -2339,4 +2494,8 @@ private static void setForeignMessageDeliveryTime(final Message foreignMessage, foreignMessage.setJMSDeliveryTime(deliveryTime); } } + + private void waitForAsyncSendToFinish() { + numIncompletedAsyncSend.doWaitForZero(); + } } diff --git a/activemq-client/src/main/java/org/apache/activemq/util/CountdownLock.java b/activemq-client/src/main/java/org/apache/activemq/util/CountdownLock.java new file mode 100644 index 00000000000..d74e457a9bb --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/util/CountdownLock.java @@ -0,0 +1,39 @@ +package org.apache.activemq.util; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This concurrent data structure is used when the calling thread wants to wait until a counter gets to 0 but the counter + * can go up and down (unlike a CountDownLatch which can only count down) + */ +public class CountdownLock { + + final Object counterMonitor = new Object(); + private final AtomicInteger counter = new AtomicInteger(); + + public void doWaitForZero() { + synchronized(counterMonitor){ + try { + if (counter.get() > 0) { + counterMonitor.wait(); + } + } catch (InterruptedException e) { + return; + } + } + } + + public void doDecrement() { + synchronized(counterMonitor){ + if (counter.decrementAndGet() == 0) { + counterMonitor.notify(); + } + } + } + + public void doIncrement() { + synchronized(counterMonitor){ + counter.incrementAndGet(); + } + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AsyncSendTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AsyncSendTest.java new file mode 100644 index 00000000000..2b34ffa8960 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AsyncSendTest.java @@ -0,0 +1,528 @@ +package org.apache.activemq.jms2; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import jakarta.jms.CompletionListener; +import jakarta.jms.Destination; +import jakarta.jms.JMSContext; +import jakarta.jms.JMSException; +import jakarta.jms.JMSProducer; +import jakarta.jms.Message; +import jakarta.jms.MessageConsumer; +import jakarta.jms.Session; +import jakarta.jms.TextMessage; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ActiveMQJMS2AsyncSendTest extends ActiveMQJMS2TestBase{ + + private static final Logger log = LoggerFactory.getLogger(ActiveMQJMS2AsyncSendTest.class); + + @Test + public void testSendMessageWithSessionApi_spec_7_3_1() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#quality-of-service + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) { + latch.countDown(); + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + + messageProducer.send( + session.createQueue(methodNameDestinationName), + session.createTextMessage("Test-" + methodNameDestinationName), + completionListener); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener was not triggered within 10 seconds or threw an exception"); + } + } + + @Test + public void testSendMessageWithContextApi_spec_7_3_1() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#quality-of-service + try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + String textBody = "Test-" + methodNameDestinationName; + jmsContext.start(); + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) { + latch.countDown(); + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + jmsProducer.setAsync(completionListener); + jmsProducer.send(destination, textBody); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener was not triggered within 10 seconds or threw an exception"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testOnExceptionTriggered_spec_7_3_2() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#exceptions + try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + String textBody = "Test-" + methodNameDestinationName; + jmsContext.start(); + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) throws RuntimeException { + throw new RuntimeException("throw runtime exception"); + } + + @Override + public void onException(Message message, Exception e) { + latch.countDown(); + } + }; + + jmsProducer.setAsync(completionListener); + jmsProducer.send(destination, textBody); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener onException method was not triggered within 10 seconds"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testCorrectMessageOrder_spec7_3_3() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-order-2 + try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + ArrayList expectedOrderedMessages = new ArrayList<>(); + ArrayList actualOrderedMessages = new ArrayList<>(); + Object mutex = new Object(); + jmsContext.start(); + int num_msgs = 100; + CountDownLatch latch = new CountDownLatch(num_msgs); + CompletionListener completionListener = new CompletionListener() { + @Override + public void onCompletion(Message message) { + synchronized (mutex) { + try { + String text = ((TextMessage) message).getText(); + actualOrderedMessages.add(text); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + latch.countDown(); + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + + jmsProducer.setAsync(completionListener); + for (int i = 0; i < num_msgs; i++) { + String textBody = "Test-" + methodNameDestinationName + "-" + String.valueOf(i); + expectedOrderedMessages.add(textBody); + jmsProducer.send(destination, textBody); + } + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener was not triggered within 10 seconds or threw an exception"); + } + for (int i = 0; i < num_msgs; i++) { + String got = actualOrderedMessages.get(i); + String expected = expectedOrderedMessages.get(i); + if (!got.equals(expected)) { + fail(String.format("Message out of order. Got %s but expected %s", got, expected)); + } + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testUnableToCloseContextInCompletionListener_spec_7_3_4() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback + try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + String textBody = "Test-" + methodNameDestinationName; + jmsContext.start(); + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) { + jmsContext.close(); // This should cause a RuntimeException to throw and trigger the onException + } + + @Override + public void onException(Message message, Exception e) { + latch.countDown(); + } + }; + + jmsProducer.setAsync(completionListener); + jmsProducer.send(destination, textBody); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener onException was not triggered within 10 seconds or threw an exception"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testUnableToCloseProducerInCompletionListener_spec_7_3_4() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + @Override + public void onCompletion(Message message) { + try { + messageProducer.close(); // This should cause a RuntimeException to throw and trigger the onException + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onException(Message message, Exception e) { + latch.countDown(); + } + }; + messageProducer.send(session.createQueue(methodNameDestinationName), + session.createTextMessage("Test-" + methodNameDestinationName), completionListener); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener onException was not triggered within 10 seconds or threw an exception"); + } + } + + @Test + public void testUnableToCommitTransactionInCompletionListener_spec_7_3_4() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback + try(JMSContext jmsContext = activemqConnectionFactory.createContext( + DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.SESSION_TRANSACTED)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + String textBody = "Test-" + methodNameDestinationName; + jmsContext.start(); + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) { + jmsContext.commit(); // This should cause a RuntimeException to throw and trigger the onException + } + + @Override + public void onException(Message message, Exception e) { + latch.countDown(); + } + }; + + jmsProducer.setAsync(completionListener); + jmsProducer.send(destination, textBody); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener onException was not triggered within 10 seconds or threw an exception"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testUnableToRollbackTransactionInCompletionListener_spec_7_3_4() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback + try(JMSContext jmsContext = activemqConnectionFactory.createContext( + DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.SESSION_TRANSACTED)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + String textBody = "Test-" + methodNameDestinationName; + jmsContext.start(); + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) { + jmsContext.rollback(); // This should cause a RuntimeException to throw and trigger the onException + } + + @Override + public void onException(Message message, Exception e) { + latch.countDown(); + } + }; + + jmsProducer.setAsync(completionListener); + jmsProducer.send(destination, textBody); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener onException was not triggered within 10 seconds or threw an exception"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testCloseContextWailUntilAllIncompleteSentToFinish_spec_7_3_4() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback + try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + ArrayList expectedOrderedMessages = new ArrayList<>(); + ArrayList actualOrderedMessages = new ArrayList<>(); + Object mutex = new Object(); + jmsContext.start(); + int num_msgs = 100; + CompletionListener completionListener = new CompletionListener() { + @Override + public void onCompletion(Message message) { + synchronized (mutex) { + try { + String text = ((TextMessage) message).getText(); + actualOrderedMessages.add(text); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + + jmsProducer.setAsync(completionListener); + for (int i = 0; i < num_msgs; i++) { + String textBody = "Test-" + methodNameDestinationName + "-" + String.valueOf(i); + expectedOrderedMessages.add(textBody); + jmsProducer.send(destination, textBody); + } + jmsContext.close(); + if (expectedOrderedMessages.size() != actualOrderedMessages.size()) { + fail("jmsContext doesn't wait until all inComplete send to finish"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testCommitContextWailUntilAllIncompleteSentToFinish_spec_7_3_4() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback + try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.SESSION_TRANSACTED)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + ArrayList expectedOrderedMessages = new ArrayList<>(); + ArrayList actualOrderedMessages = new ArrayList<>(); + Object mutex = new Object(); + jmsContext.start(); + int num_msgs = 100; + CompletionListener completionListener = new CompletionListener() { + @Override + public void onCompletion(Message message) { + synchronized (mutex) { + try { + String text = ((TextMessage) message).getText(); + actualOrderedMessages.add(text); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + + jmsProducer.setAsync(completionListener); + for (int i = 0; i < num_msgs; i++) { + String textBody = "Test-" + methodNameDestinationName + "-" + String.valueOf(i); + expectedOrderedMessages.add(textBody); + jmsProducer.send(destination, textBody); + } + jmsContext.commit(); + if (expectedOrderedMessages.size() != actualOrderedMessages.size()) { + fail("jmsContext doesn't wait until all inComplete send to finish"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testAbleToAccessMessageHeaderAfterAsyncSendCompleted_spec7_3_6_spec7_3_9() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-headers + // We won't throw exception because it's optional as stated in the spec. + // "If the Jakarta Messaging provider does not throw an exception then the behaviour is undefined." + try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + String textBody = "Test-" + methodNameDestinationName; + jmsContext.start(); + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) { + try { + if (!((TextMessage) message).getText().equals(textBody)) { + log.error("messages don't match"); + throw new RuntimeException("messages don't match"); + } + } catch (JMSException e) { + throw new RuntimeException(e); + } + latch.countDown(); + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + jmsProducer.setAsync(completionListener); + TextMessage message = jmsContext.createTextMessage(); + message.setText(textBody); + jmsProducer.send(destination, message); + // Trying to get the message header + int deliveryMode = message.getJMSDeliveryMode(); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener was not triggered within 10 seconds or threw an exception"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testCompletionListenerThreadingRestriction_spec7_3_7() throws Exception { + // (https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#restrictions-on-threading) + // The session can continue to be used by current application thread. Session is used one thread at a time (CompletionListener, Application thread ... etc) + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + @Override + public void onCompletion(Message message) { + try { + // Simulate busy processing of the message for 5 seconds. + Thread.sleep(5 * 1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + latch.countDown(); + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + + messageProducer.send( + session.createQueue(methodNameDestinationName), + session.createTextMessage("Test-" + methodNameDestinationName), + completionListener); + MessageConsumer consumer = session.createConsumer(session.createQueue(methodNameDestinationName)); + Message msg = consumer.receive(2 * 1000); + if (msg == null) { + fail("session in the original thread of control was dedicated to the thread of control of CompletionListener"); + } + String gotTextBody = ((TextMessage) msg).getText(); + if (!gotTextBody.equals("Test-" + methodNameDestinationName)) { + fail("receive message is different than the one originally sent"); + } + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener was not triggered within 10 seconds or threw an exception"); + } + } + + @Test + public void testCompletionListenerInvokedInDifferentThread_spec7_3_8() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#use-of-the-completionlistener-by-the-jakarta-messaging-provider + // The CompletionListener has to be invoked in different thread + try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + String textBody = "Test-" + methodNameDestinationName; + jmsContext.start(); + String testThreadName = Thread.currentThread().getName(); + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) { + String onCompletionThreadName = Thread.currentThread().getName(); + if (!onCompletionThreadName.equals(testThreadName)) { + latch.countDown(); + } else { + log.error("onCompletion is executed in the same thread as the application thread."); + } + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + + jmsProducer.setAsync(completionListener); + jmsProducer.send(destination, textBody); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener was not triggered within 10 seconds or threw an exception"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java index cf1e0fa594c..68b153d9ddd 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java @@ -23,6 +23,8 @@ import static org.junit.Assert.fail; import java.util.Enumeration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import jakarta.jms.CompletionListener; import jakarta.jms.Destination; @@ -296,26 +298,6 @@ public void testProducerDeliveryDelaySet() throws JMSException { messageProducer.setDeliveryDelay(1000l); } - @Test(expected = UnsupportedOperationException.class) - public void testProducerSendMessageCompletionListener() throws JMSException { - messageProducer.send(session.createQueue(methodNameDestinationName), null, (CompletionListener)null); - } - - @Test(expected = UnsupportedOperationException.class) - public void testProducerSendMessageQoSParamsCompletionListener() throws JMSException { - messageProducer.send(null, 1, 4, 0l, null); - } - - @Test(expected = UnsupportedOperationException.class) - public void testProducerSendDestinationMessageCompletionListener() throws JMSException { - messageProducer.send(session.createQueue(methodNameDestinationName), null, null); - } - - @Test(expected = UnsupportedOperationException.class) - public void testProducerSendDestinationMessageQosParamsCompletionListener() throws JMSException { - messageProducer.send(session.createQueue(methodNameDestinationName), null, 1, 4, 0l, null); - } - protected static void sendMessage(JMSContext jmsContext, Destination testDestination, String textBody) { assertNotNull(jmsContext); JMSProducer jmsProducer = jmsContext.createProducer();