From 66dfe46cce9056b410fde1f1af24b919ad087fbb Mon Sep 17 00:00:00 2001 From: Guus der Kinderen Date: Wed, 19 Oct 2022 13:36:25 +0200 Subject: [PATCH] OF-2530: Optimize MUC Message History cache usage When reading or writing cached MUC messages, the entire collection of messages is serialized between cluster nodes. This adds an unacceptable amount of overhead, for every message that is added. In this commit, the singular cached entity (a list of messages), is separated into two parts: - A cached entity that represents a list of message references per chatroom - messages, by reference The purpose of this is to optimize the scenario of adding a message (which is expected to happen more frequently than reading the history). By having to update only a list of references, instead of a list of actual objects, the amount of data that is to be operated on is reduced significantly. --- .../openfire/muc/HistoryStrategy.java | 168 ++++++------------ .../jivesoftware/util/cache/CacheFactory.java | 9 +- 2 files changed, 63 insertions(+), 114 deletions(-) diff --git a/xmppserver/src/main/java/org/jivesoftware/openfire/muc/HistoryStrategy.java b/xmppserver/src/main/java/org/jivesoftware/openfire/muc/HistoryStrategy.java index 2b8d83c1d4..6f1416a6f2 100644 --- a/xmppserver/src/main/java/org/jivesoftware/openfire/muc/HistoryStrategy.java +++ b/xmppserver/src/main/java/org/jivesoftware/openfire/muc/HistoryStrategy.java @@ -16,7 +16,6 @@ package org.jivesoftware.openfire.muc; -import org.dom4j.Element; import org.dom4j.tree.DefaultElement; import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.muc.spi.MUCPersistenceManager; @@ -30,10 +29,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; +import java.io.*; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.Lock; @@ -55,10 +51,20 @@ public class HistoryStrategy implements Externalizable { private static final Logger Log = LoggerFactory.getLogger(HistoryStrategy.class); /** - * An unlimited cache that records MUC room messages. The key of the cache is the room JID for which a list of - * messages is recorded. + * An unlimited cache that records (references to) MUC room messages. The key of the cache is the room JID for which + * a list of messages is recorded. The value is a wrapper around a set of references for messages. These references + * are keys in the {@link #MUC_HISTORY_MESSAGES_CACHE} cache. */ - private static final Cache> MUC_HISTORY_CACHE = CacheFactory.createCache("MUC History"); + private static final Cache>> MUC_HISTORY_META_CACHE = CacheFactory.createCache("MUC History Meta"); + + /** + * An unlimited cache that holds individual messages, expected to be MUC room messages. The key of the cache is the + * reference used in {@link #MUC_HISTORY_META_CACHE}. The value is the dom4j element that backs the message stanza. + * + * This cache should only be referenced while holding a lock on the key of MUC_HISTORY_META_CACHE that represents + * the address of the room in which a message was exchanged! + */ + private static final Cache MUC_HISTORY_MESSAGES_CACHE = CacheFactory.createCache("MUC History Messages"); /** * The address of the room (expected to be a bare JID) for which this instance records message history. @@ -214,22 +220,37 @@ public void addMessage(@Nonnull final Message... packets) strategyMaxNumber = maxNumber; } - final Lock lock = MUC_HISTORY_CACHE.getLock(roomJID); + final Lock lock = MUC_HISTORY_META_CACHE.getLock(roomJID); lock.lock(); try { - final CacheableOptional optional = MUC_HISTORY_CACHE.get(roomJID); - final Messages history; + final CacheableOptional> optional = MUC_HISTORY_META_CACHE.get(roomJID); + final ConcurrentLinkedQueue references; if (optional == null || optional.isAbsent()) { - history = new Messages(); + references = new ConcurrentLinkedQueue<>(); } else { - history = optional.get(); + references = optional.get(); } for (final Message message : messages) { - history.add(message, strategyType, strategyMaxNumber); + // store message according to active strategy. + if (strategyType == Type.number) { + if (references.size() >= strategyMaxNumber) { + // We have to remove messages so the new message won't exceed the max history size. + while (!references.isEmpty() && references.size() >= strategyMaxNumber) { + final UUID oldReference = references.poll(); + MUC_HISTORY_MESSAGES_CACHE.remove(oldReference); + } + } + } + + if (strategyType == Type.all || strategyType == Type.number) { + final UUID reference = UUID.randomUUID(); + references.add(reference); + MUC_HISTORY_MESSAGES_CACHE.put(reference, (DefaultElement) message.getElement()); + } } // Explicitly add back to cache (Hazelcast won't update-by-reference). - MUC_HISTORY_CACHE.put(roomJID, CacheableOptional.of(history)); + MUC_HISTORY_META_CACHE.put(roomJID, CacheableOptional.of(references)); } finally { lock.unlock(); } @@ -255,7 +276,7 @@ boolean isHistoryEnabled() { */ protected Queue getHistoryFromCache() { // Ensure room history is in cache. Doing this outside of the lock below, to reduce the likelihood of deadlocks occurring. - if (!MUC_HISTORY_CACHE.containsKey(roomJID)) { + if (!MUC_HISTORY_META_CACHE.containsKey(roomJID)) { try { final MUCRoom room = XMPPServer.getInstance().getMultiUserChatManager().getMultiUserChatService(roomJID).getChatRoom(roomJID.getNode()); MUCPersistenceManager.loadHistory(room, getMaxNumber()); @@ -265,15 +286,22 @@ protected Queue getHistoryFromCache() { } // Obtain history from cache. - final Lock lock = MUC_HISTORY_CACHE.getLock(roomJID); + final Lock lock = MUC_HISTORY_META_CACHE.getLock(roomJID); lock.lock(); try { - final CacheableOptional optional = MUC_HISTORY_CACHE.get(roomJID); - if (optional == null || optional.isAbsent()) { - return new Messages().asCollection(); - } else { - return optional.get().asCollection(); + final CacheableOptional> optional = MUC_HISTORY_META_CACHE.get(roomJID); + final ConcurrentLinkedQueue result = new ConcurrentLinkedQueue<>(); + if (optional != null && !optional.isAbsent()) { + for (final UUID reference : optional.get()) { + final DefaultElement messageElement = MUC_HISTORY_MESSAGES_CACHE.get(reference); + if (messageElement == null) { + Log.warn("Unable to retrieve message of room {} from clustered cache by reference: {}", roomJID, reference); + } else { + result.add(new Message(messageElement, true)); + } + } } + return result; } finally { lock.unlock(); } @@ -310,10 +338,15 @@ public ListIterator getReverseMessageHistory(){ */ public void purge() { - final Lock lock = MUC_HISTORY_CACHE.getLock(roomJID); + final Lock lock = MUC_HISTORY_META_CACHE.getLock(roomJID); lock.lock(); try { - MUC_HISTORY_CACHE.put(roomJID, CacheableOptional.of(null)); + final CacheableOptional> oldReferences = MUC_HISTORY_META_CACHE.put(roomJID, CacheableOptional.of(null)); + if (oldReferences != null && oldReferences.isPresent()) { + for (final UUID oldReference : oldReferences.get()) { + MUC_HISTORY_MESSAGES_CACHE.remove(oldReference); + } + } } finally { lock.unlock(); } @@ -482,93 +515,6 @@ private boolean isSubjectChangeStrict() { return JiveGlobals.getBooleanProperty("xmpp.muc.subject.change.strict", true); } - /** - * A wrapper for a collection of Message instances that is cached. - */ - public static class Messages implements Cacheable, Externalizable - { - private ConcurrentLinkedQueue history = new ConcurrentLinkedQueue<>(); - - public Messages() {} - - public void add(Message packet, Type strategyType, int strategyMaxNumber) - { - // store message according to active strategy - if (strategyType == Type.all) { - history.add(packet); - } else if (strategyType == Type.number) { - if (history.size() >= strategyMaxNumber) { - // We have to remove messages so the new message won't exceed the max history size. - while (!history.isEmpty() && history.size() >= strategyMaxNumber) { - history.poll(); - } - } - history.add(packet); - } - } - - public Queue asCollection() - { - return history; - } - - @Override - public int getCachedSize() throws CannotCalculateSizeException - { - int size = 0; - size += CacheSizes.sizeOfObject(); // overhead of object - size += CacheSizes.sizeOfObject(); // overhead of collection. - - // OF-2498: repeated calculation of the true size of each message stanza is very resource intensive. - // To avoid performance issues, this implementation uses a size of 2k per message, which has - // empirically been observed to be roughly correct. Mileage will probably vary considerably. - size += history.size() * 2048; - - return size; - } - - @Override - public String toString() - { - // Note: this value is shown in the Openfire admin console (in the 'cache values' page). Do not expose - // privacy-sensitive data, such as message content. - return "A collection of " + history.size() + " message stanza(s)."; - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Messages messages = (Messages) o; - return history.equals(messages.history); - } - - @Override - public int hashCode() - { - return Objects.hash(history); - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - ExternalizableUtil.getInstance().writeLong(out, history.size()); - for (final Message packet : history) { - ExternalizableUtil.getInstance().writeSerializable(out, (DefaultElement) packet.getElement()); - } - } - - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - history = new ConcurrentLinkedQueue<>(); - final long size = ExternalizableUtil.getInstance().readLong(in); - for (int i=0; i { @Override public int compare(Message o1, Message o2) { diff --git a/xmppserver/src/main/java/org/jivesoftware/util/cache/CacheFactory.java b/xmppserver/src/main/java/org/jivesoftware/util/cache/CacheFactory.java index f2dfcb930e..baec6055bd 100644 --- a/xmppserver/src/main/java/org/jivesoftware/util/cache/CacheFactory.java +++ b/xmppserver/src/main/java/org/jivesoftware/util/cache/CacheFactory.java @@ -147,7 +147,8 @@ public class CacheFactory { cacheNames.put("JID Domain-parts", "jidDomainprep"); cacheNames.put("JID Resource-parts", "jidResourceprep"); cacheNames.put("Sequences", "sequences"); - cacheNames.put("MUC History", "mucHistory"); + cacheNames.put("MUC History Meta", "mucHistoryMeta"); + cacheNames.put("MUC History Messages", "mucHistoryMessages"); cacheNames.put("MUC Service Pings Sent", "mucPings"); cacheProps.put(PROPERTY_PREFIX_CACHE + "dnsRecords" + PROPERTY_SUFFIX_SIZE, 128 * 1024L); @@ -234,8 +235,10 @@ public class CacheFactory { cacheProps.put(PROPERTY_PREFIX_CACHE + "publishedItems" + PROPERTY_SUFFIX_MAX_LIFE_TIME, Duration.ofMinutes(15).toMillis()); cacheProps.put(PROPERTY_PREFIX_CACHE + "sequences" + PROPERTY_SUFFIX_SIZE, -1L); cacheProps.put(PROPERTY_PREFIX_CACHE + "sequences" + PROPERTY_SUFFIX_MAX_LIFE_TIME, -1L); - cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistory" + PROPERTY_SUFFIX_SIZE, -1L); - cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistory" + PROPERTY_SUFFIX_MAX_LIFE_TIME, -1L); + cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistoryMeta" + PROPERTY_SUFFIX_SIZE, -1L); + cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistoryMeta" + PROPERTY_SUFFIX_MAX_LIFE_TIME, -1L); + cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistoryMessages" + PROPERTY_SUFFIX_SIZE, -1L); + cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistoryMessages" + PROPERTY_SUFFIX_MAX_LIFE_TIME, -1L); cacheProps.put(PROPERTY_PREFIX_CACHE + "mucPings" + PROPERTY_SUFFIX_SIZE, -1L); cacheProps.put(PROPERTY_PREFIX_CACHE + "mucPings" + PROPERTY_SUFFIX_MAX_LIFE_TIME, Duration.ofMinutes(30).toMillis());