Skip to content

Commit

Permalink
integrate MessageBus
Browse files Browse the repository at this point in the history
  • Loading branch information
tbischoff2 committed Nov 6, 2024
1 parent 9a95a01 commit 95f4631
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 9 deletions.
109 changes: 101 additions & 8 deletions core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
import de.fraunhofer.iosb.ilt.faaast.service.model.api.paging.PagingInfo;
import de.fraunhofer.iosb.ilt.faaast.service.model.exception.PersistenceException;
import de.fraunhofer.iosb.ilt.faaast.service.model.exception.ResourceNotFoundException;
import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.SubscriptionId;
import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.SubscriptionInfo;
import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.event.change.ElementCreateEventMessage;
import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.event.change.ElementDeleteEventMessage;
import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.event.change.ElementUpdateEventMessage;
import de.fraunhofer.iosb.ilt.faaast.service.persistence.AssetAdministrationShellSearchCriteria;
import de.fraunhofer.iosb.ilt.faaast.service.persistence.ConceptDescriptionSearchCriteria;
import de.fraunhofer.iosb.ilt.faaast.service.persistence.Persistence;
Expand All @@ -54,6 +59,7 @@
import org.eclipse.digitaltwin.aas4j.v3.model.Environment;
import org.eclipse.digitaltwin.aas4j.v3.model.Operation;
import org.eclipse.digitaltwin.aas4j.v3.model.OperationVariable;
import org.eclipse.digitaltwin.aas4j.v3.model.Referable;
import org.eclipse.digitaltwin.aas4j.v3.model.Reference;
import org.eclipse.digitaltwin.aas4j.v3.model.Submodel;
import org.eclipse.digitaltwin.aas4j.v3.model.SubmodelElement;
Expand All @@ -68,6 +74,8 @@
public class Service implements ServiceContext {

private static final Logger LOGGER = LoggerFactory.getLogger(Service.class);
private static final String VALUE_NULL = "value must not be null";
private static final String ELEMENT_NULL = "element must not be null";
private final ServiceConfig config;
private AssetConnectionManager assetConnectionManager;
private List<Endpoint> endpoints;
Expand All @@ -78,6 +86,7 @@ public class Service implements ServiceContext {
private RegistrySynchronization registrySynchronization;
private RequestHandlerManager requestHandler;
private List<SubmodelTemplateProcessor> submodelTemplateProcessors;
private List<SubscriptionId> subscriptions;

/**
* Creates a new instance of {@link Service}.
Expand All @@ -92,6 +101,7 @@ public class Service implements ServiceContext {
* @throws IllegalArgumentException if coreConfig is null
* @throws IllegalArgumentException if persistence is null
* @throws PersistenceException if storage error occurs
* @throws MessageBusException if message bus error occurs
* @throws IllegalArgumentException if messageBus is null
* @throws RuntimeException if creating a deep copy of aasEnvironment fails
* @throws ConfigurationException the configuration the {@link AssetConnectionManager} fails
Expand All @@ -103,7 +113,7 @@ public Service(CoreConfig coreConfig,
MessageBus messageBus,
List<Endpoint> endpoints,
List<AssetConnection> assetConnections,
List<SubmodelTemplateProcessor> submodelTemplateProcessors) throws ConfigurationException, AssetConnectionException, PersistenceException {
List<SubmodelTemplateProcessor> submodelTemplateProcessors) throws ConfigurationException, AssetConnectionException, PersistenceException, MessageBusException {
Ensure.requireNonNull(coreConfig, "coreConfig must be non-null");
Ensure.requireNonNull(persistence, "persistence must be non-null");
Ensure.requireNonNull(messageBus, "messageBus must be non-null");
Expand All @@ -115,6 +125,7 @@ public Service(CoreConfig coreConfig,
this.endpoints = endpoints;
}
this.submodelTemplateProcessors = submodelTemplateProcessors;
this.subscriptions = new ArrayList<>();
this.config = ServiceConfig.builder()
.core(coreConfig)
.build();
Expand All @@ -140,12 +151,14 @@ public Service(CoreConfig coreConfig,
* @throws IllegalArgumentException if config is null
* @throws ConfigurationException if invalid configuration is provided
* @throws PersistenceException if storage error occurs
* @throws MessageBusException if message bus error occurs
* @throws AssetConnectionException when initializing asset connections fails
*/
public Service(ServiceConfig config)
throws ConfigurationException, AssetConnectionException, PersistenceException {
throws ConfigurationException, AssetConnectionException, PersistenceException, MessageBusException {
Ensure.requireNonNull(config, "config must be non-null");
this.config = config;
this.subscriptions = new ArrayList<>();
init();
}

Expand Down Expand Up @@ -273,6 +286,7 @@ public void start() throws MessageBusException, EndpointException, PersistenceEx
*/
public void stop() {
LOGGER.debug("Get command for stopping FA³ST Service");
unsubscribeMessageBus();
messageBus.stop();
assetConnectionManager.stop();
registrySynchronization.stop();
Expand All @@ -281,7 +295,7 @@ public void stop() {
}


private void init() throws ConfigurationException, PersistenceException {
private void init() throws ConfigurationException, PersistenceException, MessageBusException {
Ensure.requireNonNull(config.getPersistence(), new InvalidConfigurationException("config.persistence must be non-null"));
persistence = (Persistence) config.getPersistence().newInstance(config.getCore(), this);
Ensure.requireNonNull(config.getFileStorage(), new InvalidConfigurationException("config.filestorage must be non-null"));
Expand Down Expand Up @@ -322,7 +336,7 @@ private void init() throws ConfigurationException, PersistenceException {
}


private void initSubmodelTemplateProcessors() throws ConfigurationException, PersistenceException {
private void initSubmodelTemplateProcessors() throws ConfigurationException, PersistenceException, MessageBusException {
if (submodelTemplateProcessors == null) {
submodelTemplateProcessors = new ArrayList<>();
}
Expand All @@ -338,11 +352,90 @@ private void initSubmodelTemplateProcessors() throws ConfigurationException, Per
}
List<Submodel> submodels = persistence.getAllSubmodels(QueryModifier.MAXIMAL, PagingInfo.ALL).getContent();
for (var submodel: submodels) {
for (var submodelTemplateProcessor: submodelTemplateProcessors) {
if (submodelTemplateProcessor.accept(submodel) && submodelTemplateProcessor.process(submodel, assetConnectionManager)) {
persistence.save(submodel);
}
processSubmodel(submodel);
}

subscribeMessageBus();
}


private void processSubmodel(Submodel submodel) throws PersistenceException {
for (var submodelTemplateProcessor: submodelTemplateProcessors) {
if (submodelTemplateProcessor.accept(submodel) && submodelTemplateProcessor.process(submodel, assetConnectionManager)) {
persistence.save(submodel);
}
}
}


private void subscribeMessageBus() throws MessageBusException {
if (subscriptions == null) {
subscriptions = new ArrayList<>();
}
SubscriptionInfo info = SubscriptionInfo.create(ElementCreateEventMessage.class, x -> {
try {
elementCreated(x.getElement(), x.getValue());
}
catch (Exception e) {
LOGGER.error("elementCreated Exception", e);
}
});
subscriptions.add(messageBus.subscribe(info));

info = SubscriptionInfo.create(ElementDeleteEventMessage.class, x -> {
try {
elementDeleted(x.getElement());
}
catch (Exception e) {
LOGGER.error("elementDeleted Exception", e);
}
});
subscriptions.add(messageBus.subscribe(info));

info = SubscriptionInfo.create(ElementUpdateEventMessage.class, x -> {
try {
elementUpdated(x.getElement(), x.getValue());
}
catch (Exception e) {
LOGGER.error("elementUpdated Exception", e);
}
});
subscriptions.add(messageBus.subscribe(info));
}


private void unsubscribeMessageBus() {
for (var subscription: subscriptions) {
try {
messageBus.unsubscribe(subscription);
}
catch (Exception ex) {
LOGGER.error("unsubscribeMessageBus Exception", ex);
}
}
subscriptions.clear();
}


private void elementCreated(Reference element, Referable value) throws PersistenceException {
Ensure.requireNonNull(element, ELEMENT_NULL);
Ensure.requireNonNull(value, VALUE_NULL);

if (value instanceof Submodel submodel) {
processSubmodel(submodel);
}
}


private void elementDeleted(Reference element) {
Ensure.requireNonNull(element, ELEMENT_NULL);

}


private void elementUpdated(Reference element, Referable value) {
Ensure.requireNonNull(element, ELEMENT_NULL);
Ensure.requireNonNull(value, VALUE_NULL);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import de.fraunhofer.iosb.ilt.faaast.service.endpoint.opcua.OpcUaEndpointConfig;
import de.fraunhofer.iosb.ilt.faaast.service.endpoint.opcua.helper.assetconnection.TestAssetConnectionConfig;
import de.fraunhofer.iosb.ilt.faaast.service.exception.ConfigurationException;
import de.fraunhofer.iosb.ilt.faaast.service.exception.MessageBusException;
import de.fraunhofer.iosb.ilt.faaast.service.filestorage.memory.FileStorageInMemoryConfig;
import de.fraunhofer.iosb.ilt.faaast.service.messagebus.internal.MessageBusInternalConfig;
import de.fraunhofer.iosb.ilt.faaast.service.model.AASFull;
Expand All @@ -37,7 +38,7 @@
public class TestService extends Service {

public TestService(OpcUaEndpointConfig config, TestAssetConnectionConfig assetConnectionConfig, boolean full)
throws ConfigurationException, AssetConnectionException, PersistenceException {
throws ConfigurationException, AssetConnectionException, PersistenceException, MessageBusException {
super(ServiceConfig.builder()
.core(CoreConfig.builder()
.requestHandlerThreadPoolSize(2)
Expand Down

0 comments on commit 95f4631

Please sign in to comment.