Skip to content

Commit

Permalink
Add exclude topics filter
Browse files Browse the repository at this point in the history
Signed-off-by: Florian Hotze <[email protected]>
  • Loading branch information
florian-h05 committed Jan 10, 2025
1 parent 636ba6f commit b7f417c
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class EventWebSocket {
public static final String WEBSOCKET_TOPIC_PREFIX = "openhab/websocket/";

private static final Type STRING_LIST_TYPE = TypeToken.getParameterized(List.class, String.class).getType();
private static final Pattern TOPIC_VALIDATE_PATTERN = Pattern.compile("^(\\w*\\*?\\/?)+$");
private static final Pattern TOPIC_VALIDATE_PATTERN = Pattern.compile("^!?(\\w*\\*?\\/?)+$");

private final Logger logger = LoggerFactory.getLogger(EventWebSocket.class);

Expand All @@ -67,7 +67,8 @@ public class EventWebSocket {

private List<String> typeFilter = List.of();
private List<String> sourceFilter = List.of();
private @Nullable TopicEventFilter topicFilter = null;
private @Nullable TopicEventFilter topicIncludeFilter = null;
private @Nullable TopicEventFilter topicExcludeFilter = null;

public EventWebSocket(Gson gson, EventWebSocketAdapter wsAdapter, ItemEventUtility itemEventUtility,
EventPublisher eventPublisher) {
Expand Down Expand Up @@ -162,11 +163,18 @@ public void onText(String message) {
"Invalid topic '" + topic + "' in topic filter WebSocketEvent");
}
}
List<String> includeTopics = topics.stream().filter(t -> !t.startsWith("!")).toList();
List<String> excludeTopics = topics.stream().filter(t -> t.startsWith("!"))
.map(t -> t.substring(1)).toList();
// convert to regex: replace any wildcard (*) with the regex pattern (.*)
topics = topics.stream().map(t -> t.trim().replace("*", ".*") + "$").toList();
includeTopics = includeTopics.stream().map(t -> t.trim().replace("*", ".*") + "$").toList();
excludeTopics = excludeTopics.stream().map(t -> t.trim().replace("*", ".*") + "$").toList();
// create topic filter if topic list not empty
if (!topics.isEmpty()) {
topicFilter = new TopicEventFilter(topics);
if (!includeTopics.isEmpty()) {
topicIncludeFilter = new TopicEventFilter(includeTopics);
}
if (!excludeTopics.isEmpty()) {
topicExcludeFilter = new TopicEventFilter(excludeTopics);
}
logger.debug("Setting topic filter for connection to {}: {}",
remoteEndpoint.getInetSocketAddress(), topics);
Expand Down Expand Up @@ -220,7 +228,8 @@ public void processEvent(Event event) {
String source = event.getSource();
if ((source == null || !sourceFilter.contains(event.getSource()))
&& (typeFilter.isEmpty() || typeFilter.contains(event.getType()))
&& (topicFilter == null || topicFilter.apply(event))) {
&& (topicIncludeFilter == null || topicIncludeFilter.apply(event))
&& (topicExcludeFilter == null || !topicExcludeFilter.apply(event))) {
sendMessage(gson.toJson(new EventDTO(event)));
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -55,6 +56,7 @@
* The {@link EventWebSocketTest} contains tests for the {@link EventWebSocket}
*
* @author Jan N. Klug - Initial contribution
* @author Florian Hotze - Add topic filter tests
*/
@NonNullByDefault
@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -241,13 +243,14 @@ public void eventFromBusFilterSource() throws IOException {
}

@Test
public void eventFromBusFilterTopic() throws IOException {
public void eventFromBusFilterIncludeTopic() throws IOException {
EventDTO eventDTO = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "filter/topic",
"[\"openhab/items/*/command\", \"openhab/items/*/statechanged\"]", null, null);
EventDTO responseEventDTO = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "filter/topic",
eventDTO.payload, null, null);
eventWebSocket.onText(gson.toJson(eventDTO));
verify(remoteEndpoint).sendString(gson.toJson(responseEventDTO));
clearInvocations(remoteEndpoint);

// subscribed topics are sent
Event event = ItemEventFactory.createCommandEvent(TEST_ITEM_NAME, DecimalType.ZERO,
Expand All @@ -262,7 +265,58 @@ public void eventFromBusFilterTopic() throws IOException {
// not subscribed event not sent
event = ItemEventFactory.createStateEvent(TEST_ITEM_NAME, DecimalType.ZERO, REMOTE_WEBSOCKET_IMPLEMENTATION);
eventWebSocket.processEvent(event);
verify(remoteEndpoint, times(3)).sendString(any());
verify(remoteEndpoint, times(2)).sendString(any());
}

@Test
public void eventFromBusFilterExcludeTopic() throws IOException {
EventDTO eventDTO = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "filter/topic",
"[\"!openhab/items/" + TEST_ITEM_NAME + "/command\"]", null, null);
EventDTO responseEventDTO = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "filter/topic",
eventDTO.payload, null, null);
eventWebSocket.onText(gson.toJson(eventDTO));
verify(remoteEndpoint).sendString(gson.toJson(responseEventDTO));
clearInvocations(remoteEndpoint);

// excluded topics are not sent
Event event = ItemEventFactory.createCommandEvent(TEST_ITEM_NAME, DecimalType.ZERO,
REMOTE_WEBSOCKET_IMPLEMENTATION);
eventWebSocket.processEvent(event);
verify(remoteEndpoint, times(0)).sendString(any());

// not excluded event sent
event = ItemEventFactory.createStateChangedEvent(TEST_ITEM_NAME, DecimalType.ZERO, DecimalType.ZERO);
eventWebSocket.processEvent(event);
verify(remoteEndpoint).sendString(gson.toJson(new EventDTO(event)));

event = ItemEventFactory.createStateEvent(TEST_ITEM_NAME, DecimalType.ZERO, REMOTE_WEBSOCKET_IMPLEMENTATION);
eventWebSocket.processEvent(event);
verify(remoteEndpoint).sendString(gson.toJson(new EventDTO(event)));
}

@Test
public void eventFromBusFilterIncludeAndExcludeTopic() throws IOException {
EventDTO eventDTO = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "filter/topic",
"[\"openhab/items/*/*\", \"!openhab/items/*/command\"]", null, null);
EventDTO responseEventDTO = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "filter/topic",
eventDTO.payload, null, null);
eventWebSocket.onText(gson.toJson(eventDTO));
verify(remoteEndpoint).sendString(gson.toJson(responseEventDTO));
clearInvocations(remoteEndpoint);

// included topics are sent
Event event = ItemEventFactory.createStateChangedEvent(TEST_ITEM_NAME, DecimalType.ZERO, DecimalType.ZERO);
eventWebSocket.processEvent(event);
verify(remoteEndpoint).sendString(gson.toJson(new EventDTO(event)));

event = ItemEventFactory.createStateEvent(TEST_ITEM_NAME, DecimalType.ZERO, REMOTE_WEBSOCKET_IMPLEMENTATION);
eventWebSocket.processEvent(event);
verify(remoteEndpoint).sendString(gson.toJson(new EventDTO(event)));

// excluded sub-topics are not sent
event = ItemEventFactory.createCommandEvent(TEST_ITEM_NAME, DecimalType.ZERO, REMOTE_WEBSOCKET_IMPLEMENTATION);
eventWebSocket.processEvent(event);
verify(remoteEndpoint, times(2)).sendString(any());
}

private void assertEventProcessing(EventDTO incoming, @Nullable Event expectedEvent,
Expand Down

0 comments on commit b7f417c

Please sign in to comment.