Skip to content

Commit

Permalink
AMQ-9541 Use limits on all expensive queries and at least the ones wh…
Browse files Browse the repository at this point in the history
…ere there is a setMaxRows on the statement.
  • Loading branch information
jeanouii committed Jul 25, 2024
1 parent 5bc3a1e commit 743c91c
Showing 1 changed file with 42 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
Expand Down Expand Up @@ -154,8 +152,8 @@ public void doDropTables(TransactionContext c) throws SQLException, IOException
s.execute(dropStatments[i]);
} catch (SQLException e) {
LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
+ " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
+ e.getErrorCode());
+ " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
+ e.getErrorCode());
JDBCPersistenceAdapter.log("Failure details: ", e);
}
}
Expand Down Expand Up @@ -201,7 +199,7 @@ public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throw
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(
this.statements.getFindMessageByIdStatement());
this.statements.getFindMessageByIdStatement());
s.setLong(1, storeSequenceId);
rs = s.executeQuery();
if (!rs.next()) {
Expand Down Expand Up @@ -278,7 +276,7 @@ public void doUpdateMessage(TransactionContext c, ActiveMQDestination destinatio

@Override
public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
long expirationTime, String messageRef) throws SQLException, IOException {
long expirationTime, String messageRef) throws SQLException, IOException {
PreparedStatement s = c.getAddMessageStatement();
try {
if (s == null) {
Expand Down Expand Up @@ -371,7 +369,7 @@ public void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid)
try {
if (s == null) {
s = c.getConnection().prepareStatement(xid == null ?
this.statements.getRemoveMessageStatement() : this.statements.getUpdateXidFlagStatement());
this.statements.getRemoveMessageStatement() : this.statements.getUpdateXidFlagStatement());
if (this.batchStatements) {
c.setRemovedMessageStatement(s);
}
Expand Down Expand Up @@ -399,7 +397,7 @@ public void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid)

@Override
public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
throws Exception {
throws Exception {
PreparedStatement s = null;
ResultSet rs = null;
try {
Expand Down Expand Up @@ -427,11 +425,11 @@ public void doRecover(TransactionContext c, ActiveMQDestination destination, JDB

@Override
public void doMessageIdScan(TransactionContext c, int limit,
JDBCMessageIdScanListener listener) throws SQLException, IOException {
JDBCMessageIdScanListener listener) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
s = c.getConnection().prepareStatement(this.limitQuery(this.statements.getFindAllMessageIdsStatement()));
s.setMaxRows(limit);
rs = s.executeQuery();
// jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
Expand All @@ -458,8 +456,8 @@ public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination d
try {
if (s == null) {
s = c.getConnection().prepareStatement(xid == null ?
this.statements.getUpdateDurableLastAckWithPriorityStatement() :
this.statements.getUpdateDurableLastAckWithPriorityInTxStatement());
this.statements.getUpdateDurableLastAckWithPriorityStatement() :
this.statements.getUpdateDurableLastAckWithPriorityInTxStatement());
if (this.batchStatements) {
c.setUpdateLastAckStatement(s);
}
Expand Down Expand Up @@ -495,8 +493,8 @@ public void doSetLastAck(TransactionContext c, ActiveMQDestination destination,
try {
if (s == null) {
s = c.getConnection().prepareStatement(xid == null ?
this.statements.getUpdateDurableLastAckStatement() :
this.statements.getUpdateDurableLastAckInTxStatement());
this.statements.getUpdateDurableLastAckStatement() :
this.statements.getUpdateDurableLastAckInTxStatement());
if (this.batchStatements) {
c.setUpdateLastAckStatement(s);
}
Expand All @@ -516,7 +514,7 @@ public void doSetLastAck(TransactionContext c, ActiveMQDestination destination,
s.addBatch();
} else if (s.executeUpdate() != 1) {
throw new IOException("Could not update last ack seq : "
+ seq + ", for sub: " + subscriptionName);
+ seq + ", for sub: " + subscriptionName);
}
} finally {
if (!this.batchStatements) {
Expand Down Expand Up @@ -554,7 +552,7 @@ public void doClearLastAck(TransactionContext c, ActiveMQDestination destination

@Override
public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
// dumpTables(c,
// destination.getQualifiedName(),clientId,subscriptionName);
PreparedStatement s = null;
Expand Down Expand Up @@ -586,12 +584,12 @@ public void doRecoverSubscription(TransactionContext c, ActiveMQDestination dest

@Override
public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {

PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
s = c.getConnection().prepareStatement(this.limitQuery(this.statements.getFindDurableSubMessagesStatement()));
s.setMaxRows(Math.min(maxReturned * 2, maxRows));
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
Expand Down Expand Up @@ -620,12 +618,12 @@ public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination dest

@Override
public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {

PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
s = c.getConnection().prepareStatement(this.limitQuery(this.statements.getFindDurableSubMessagesByPriorityStatement()));
s.setMaxRows(Math.min(maxReturned * 2, maxRows));
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
Expand Down Expand Up @@ -655,7 +653,7 @@ public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDest

@Override
public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
int result = 0;
Expand Down Expand Up @@ -688,7 +686,7 @@ public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDest
*/
@Override
public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages)
throws SQLException, IOException {
throws SQLException, IOException {
// dumpTables(c, destination.getQualifiedName(), clientId,
// subscriptionName);
PreparedStatement s = null;
Expand Down Expand Up @@ -734,7 +732,7 @@ public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, bo

@Override
public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
String clientId, String subscriptionName) throws SQLException, IOException {
String clientId, String subscriptionName) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
Expand All @@ -752,7 +750,7 @@ public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDesti
subscription.setSubscriptionName(subscriptionName);
subscription.setSelector(rs.getString(1));
subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
ActiveMQDestination.QUEUE_TYPE));
ActiveMQDestination.QUEUE_TYPE));
return subscription;
} finally {
close(rs);
Expand All @@ -762,7 +760,7 @@ public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDesti

@Override
public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
throws SQLException, IOException {
throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
Expand All @@ -777,7 +775,7 @@ public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDe
subscription.setSubscriptionName(rs.getString(2));
subscription.setClientId(rs.getString(3));
subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
ActiveMQDestination.QUEUE_TYPE));
ActiveMQDestination.QUEUE_TYPE));
rc.add(subscription);
}
return rc.toArray(new SubscriptionInfo[rc.size()]);
Expand All @@ -789,7 +787,7 @@ public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDe

@Override
public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
IOException {
IOException {
PreparedStatement s = null;
try {
s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
Expand All @@ -806,7 +804,7 @@ public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destin

@Override
public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName) throws SQLException, IOException {
String subscriptionName) throws SQLException, IOException {
PreparedStatement s = null;
try {
s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
Expand Down Expand Up @@ -838,7 +836,7 @@ public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOExc

@Override
public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
String clientId, String subscriberName) throws SQLException, IOException {
String clientId, String subscriberName) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
long result = -1;
Expand Down Expand Up @@ -908,7 +906,7 @@ public void setBatchStatements(boolean batchStatements) {
this.batchStatements = batchStatements;
// The next lines are deprecated and should be removed in a future release
// and is here in case someone created their own
// this.batchStatments = batchStatements;
// this.batchStatments = batchStatements;
}

// Note - remove batchStatment in future distributions. Here for backward compatibility
Expand Down Expand Up @@ -1011,8 +1009,8 @@ public void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStor
String subId = rs.getString(3);
String subName = rs.getString(4);
jdbcMemoryTransactionStore.recoverLastAck(encodedXid,
ActiveMQDestination.createDestination(destination, ActiveMQDestination.TOPIC_TYPE),
subName, subId);
ActiveMQDestination.createDestination(destination, ActiveMQDestination.TOPIC_TYPE),
subName, subId);
}
} finally {
close(rs);
Expand All @@ -1038,7 +1036,7 @@ public void doCommitAddOp(TransactionContext c, long preparedSequence, long sequ

@Override
public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
IOException {
IOException {
PreparedStatement s = null;
ResultSet rs = null;
int result = 0;
Expand All @@ -1058,7 +1056,7 @@ public int doGetMessageCount(TransactionContext c, ActiveMQDestination destinati

@Override
public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long[] lastRecoveredEntries,
long maxSeq, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
long maxSeq, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
PreparedStatement s = null;
ResultSet rs = null;
try {
Expand Down Expand Up @@ -1109,7 +1107,7 @@ public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination dest

@Override
public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
throws SQLException, IOException {
throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
Expand All @@ -1128,16 +1126,16 @@ public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
}

public static void dumpTables(Connection c, String destinationName, String clientId, String
subscriptionName) throws SQLException {
subscriptionName) throws SQLException {
printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM "
+ "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D "
+ "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
+ " ORDER BY M.ID");
s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
printQuery(s,System.out); }
+ "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D "
+ "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
+ " ORDER BY M.ID");
s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
printQuery(s,System.out); }

public static void dumpTables(java.sql.Connection c) throws SQLException {
printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_MSGS", System.out);
Expand All @@ -1149,12 +1147,12 @@ public static void dumpTables(java.sql.Connection c) throws SQLException {
}

public static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)
throws SQLException {
throws SQLException {
printQuery(c.prepareStatement(query), out);
}

public static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out)
throws SQLException {
throws SQLException {

ResultSet set = null;
try {
Expand Down

0 comments on commit 743c91c

Please sign in to comment.