diff --git a/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/Service.java b/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/Service.java index 89dc214a6..6670ad7a7 100644 --- a/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/Service.java +++ b/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/Service.java @@ -35,6 +35,11 @@ import de.fraunhofer.iosb.ilt.faaast.service.model.api.paging.PagingInfo; import de.fraunhofer.iosb.ilt.faaast.service.model.exception.PersistenceException; import de.fraunhofer.iosb.ilt.faaast.service.model.exception.ResourceNotFoundException; +import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.SubscriptionId; +import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.SubscriptionInfo; +import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.event.change.ElementCreateEventMessage; +import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.event.change.ElementDeleteEventMessage; +import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.event.change.ElementUpdateEventMessage; import de.fraunhofer.iosb.ilt.faaast.service.persistence.AssetAdministrationShellSearchCriteria; import de.fraunhofer.iosb.ilt.faaast.service.persistence.ConceptDescriptionSearchCriteria; import de.fraunhofer.iosb.ilt.faaast.service.persistence.Persistence; @@ -54,6 +59,7 @@ import org.eclipse.digitaltwin.aas4j.v3.model.Environment; import org.eclipse.digitaltwin.aas4j.v3.model.Operation; import org.eclipse.digitaltwin.aas4j.v3.model.OperationVariable; +import org.eclipse.digitaltwin.aas4j.v3.model.Referable; import org.eclipse.digitaltwin.aas4j.v3.model.Reference; import org.eclipse.digitaltwin.aas4j.v3.model.Submodel; import org.eclipse.digitaltwin.aas4j.v3.model.SubmodelElement; @@ -68,6 +74,8 @@ public class Service implements ServiceContext { private static final Logger LOGGER = LoggerFactory.getLogger(Service.class); + private static final String VALUE_NULL = "value must not be null"; + private static final String ELEMENT_NULL = "element must not be null"; private final ServiceConfig config; private AssetConnectionManager assetConnectionManager; private List endpoints; @@ -78,6 +86,7 @@ public class Service implements ServiceContext { private RegistrySynchronization registrySynchronization; private RequestHandlerManager requestHandler; private List submodelTemplateProcessors; + private List subscriptions; /** * Creates a new instance of {@link Service}. @@ -92,6 +101,7 @@ public class Service implements ServiceContext { * @throws IllegalArgumentException if coreConfig is null * @throws IllegalArgumentException if persistence is null * @throws PersistenceException if storage error occurs + * @throws MessageBusException if message bus error occurs * @throws IllegalArgumentException if messageBus is null * @throws RuntimeException if creating a deep copy of aasEnvironment fails * @throws ConfigurationException the configuration the {@link AssetConnectionManager} fails @@ -103,7 +113,7 @@ public Service(CoreConfig coreConfig, MessageBus messageBus, List endpoints, List assetConnections, - List submodelTemplateProcessors) throws ConfigurationException, AssetConnectionException, PersistenceException { + List submodelTemplateProcessors) throws ConfigurationException, AssetConnectionException, PersistenceException, MessageBusException { Ensure.requireNonNull(coreConfig, "coreConfig must be non-null"); Ensure.requireNonNull(persistence, "persistence must be non-null"); Ensure.requireNonNull(messageBus, "messageBus must be non-null"); @@ -115,6 +125,7 @@ public Service(CoreConfig coreConfig, this.endpoints = endpoints; } this.submodelTemplateProcessors = submodelTemplateProcessors; + this.subscriptions = new ArrayList<>(); this.config = ServiceConfig.builder() .core(coreConfig) .build(); @@ -140,12 +151,14 @@ public Service(CoreConfig coreConfig, * @throws IllegalArgumentException if config is null * @throws ConfigurationException if invalid configuration is provided * @throws PersistenceException if storage error occurs + * @throws MessageBusException if message bus error occurs * @throws AssetConnectionException when initializing asset connections fails */ public Service(ServiceConfig config) - throws ConfigurationException, AssetConnectionException, PersistenceException { + throws ConfigurationException, AssetConnectionException, PersistenceException, MessageBusException { Ensure.requireNonNull(config, "config must be non-null"); this.config = config; + this.subscriptions = new ArrayList<>(); init(); } @@ -273,6 +286,7 @@ public void start() throws MessageBusException, EndpointException, PersistenceEx */ public void stop() { LOGGER.debug("Get command for stopping FA³ST Service"); + unsubscribeMessageBus(); messageBus.stop(); assetConnectionManager.stop(); registrySynchronization.stop(); @@ -281,7 +295,7 @@ public void stop() { } - private void init() throws ConfigurationException, PersistenceException { + private void init() throws ConfigurationException, PersistenceException, MessageBusException { Ensure.requireNonNull(config.getPersistence(), new InvalidConfigurationException("config.persistence must be non-null")); persistence = (Persistence) config.getPersistence().newInstance(config.getCore(), this); Ensure.requireNonNull(config.getFileStorage(), new InvalidConfigurationException("config.filestorage must be non-null")); @@ -322,7 +336,7 @@ private void init() throws ConfigurationException, PersistenceException { } - private void initSubmodelTemplateProcessors() throws ConfigurationException, PersistenceException { + private void initSubmodelTemplateProcessors() throws ConfigurationException, PersistenceException, MessageBusException { if (submodelTemplateProcessors == null) { submodelTemplateProcessors = new ArrayList<>(); } @@ -338,11 +352,90 @@ private void initSubmodelTemplateProcessors() throws ConfigurationException, Per } List submodels = persistence.getAllSubmodels(QueryModifier.MAXIMAL, PagingInfo.ALL).getContent(); for (var submodel: submodels) { - for (var submodelTemplateProcessor: submodelTemplateProcessors) { - if (submodelTemplateProcessor.accept(submodel) && submodelTemplateProcessor.process(submodel, assetConnectionManager)) { - persistence.save(submodel); - } + processSubmodel(submodel); + } + + subscribeMessageBus(); + } + + + private void processSubmodel(Submodel submodel) throws PersistenceException { + for (var submodelTemplateProcessor: submodelTemplateProcessors) { + if (submodelTemplateProcessor.accept(submodel) && submodelTemplateProcessor.process(submodel, assetConnectionManager)) { + persistence.save(submodel); + } + } + } + + + private void subscribeMessageBus() throws MessageBusException { + if (subscriptions == null) { + subscriptions = new ArrayList<>(); + } + SubscriptionInfo info = SubscriptionInfo.create(ElementCreateEventMessage.class, x -> { + try { + elementCreated(x.getElement(), x.getValue()); + } + catch (Exception e) { + LOGGER.error("elementCreated Exception", e); + } + }); + subscriptions.add(messageBus.subscribe(info)); + + info = SubscriptionInfo.create(ElementDeleteEventMessage.class, x -> { + try { + elementDeleted(x.getElement()); + } + catch (Exception e) { + LOGGER.error("elementDeleted Exception", e); + } + }); + subscriptions.add(messageBus.subscribe(info)); + + info = SubscriptionInfo.create(ElementUpdateEventMessage.class, x -> { + try { + elementUpdated(x.getElement(), x.getValue()); + } + catch (Exception e) { + LOGGER.error("elementUpdated Exception", e); } + }); + subscriptions.add(messageBus.subscribe(info)); + } + + + private void unsubscribeMessageBus() { + for (var subscription: subscriptions) { + try { + messageBus.unsubscribe(subscription); + } + catch (Exception ex) { + LOGGER.error("unsubscribeMessageBus Exception", ex); + } + } + subscriptions.clear(); + } + + + private void elementCreated(Reference element, Referable value) throws PersistenceException { + Ensure.requireNonNull(element, ELEMENT_NULL); + Ensure.requireNonNull(value, VALUE_NULL); + + if (value instanceof Submodel submodel) { + processSubmodel(submodel); } } + + + private void elementDeleted(Reference element) { + Ensure.requireNonNull(element, ELEMENT_NULL); + + } + + + private void elementUpdated(Reference element, Referable value) { + Ensure.requireNonNull(element, ELEMENT_NULL); + Ensure.requireNonNull(value, VALUE_NULL); + + } } diff --git a/endpoint/opcua/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/opcua/helper/TestService.java b/endpoint/opcua/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/opcua/helper/TestService.java index ff5123c66..0d2acf48c 100644 --- a/endpoint/opcua/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/opcua/helper/TestService.java +++ b/endpoint/opcua/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/opcua/helper/TestService.java @@ -21,6 +21,7 @@ import de.fraunhofer.iosb.ilt.faaast.service.endpoint.opcua.OpcUaEndpointConfig; import de.fraunhofer.iosb.ilt.faaast.service.endpoint.opcua.helper.assetconnection.TestAssetConnectionConfig; import de.fraunhofer.iosb.ilt.faaast.service.exception.ConfigurationException; +import de.fraunhofer.iosb.ilt.faaast.service.exception.MessageBusException; import de.fraunhofer.iosb.ilt.faaast.service.filestorage.memory.FileStorageInMemoryConfig; import de.fraunhofer.iosb.ilt.faaast.service.messagebus.internal.MessageBusInternalConfig; import de.fraunhofer.iosb.ilt.faaast.service.model.AASFull; @@ -37,7 +38,7 @@ public class TestService extends Service { public TestService(OpcUaEndpointConfig config, TestAssetConnectionConfig assetConnectionConfig, boolean full) - throws ConfigurationException, AssetConnectionException, PersistenceException { + throws ConfigurationException, AssetConnectionException, PersistenceException, MessageBusException { super(ServiceConfig.builder() .core(CoreConfig.builder() .requestHandlerThreadPoolSize(2)