Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for SMT AID-AIMC #957

Draft
wants to merge 30 commits into
base: feature/submodel-template-processor
Choose a base branch
from
Draft
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
8a64d79
add aimc processor project
tbischoff2 Nov 6, 2024
2073b9a
start adding http
tbischoff2 Nov 8, 2024
6bab783
add header
tbischoff2 Nov 8, 2024
fed64a2
Merge remote-tracking branch 'remotes/origin/main' into feature/aid-aimc
tbischoff2 Nov 11, 2024
cbd7db1
add seurity
tbischoff2 Nov 12, 2024
6faa158
Merge remote-tracking branch 'remotes/origin/main' into feature/aid-aimc
tbischoff2 Nov 12, 2024
502e1d6
add MqttAssetConnection
tbischoff2 Nov 15, 2024
1f9b75b
Merge branch 'feature/submodel-template-processor' into feature/aid-aimc
tbischoff2 Nov 19, 2024
4ae3a5e
refactoring
tbischoff2 Nov 19, 2024
b31d7fa
fix sonar warnings
tbischoff2 Nov 19, 2024
df653ab
prepare Submodel update
tbischoff2 Nov 21, 2024
d7dc0b7
Merge branch 'feature/submodel-template-processor' into feature/aid-aimc
tbischoff2 Nov 21, 2024
f450043
update MQTT
tbischoff2 Nov 22, 2024
31b322a
Merge remote-tracking branch 'remotes/origin/feature/submodel-templat…
tbischoff2 Nov 26, 2024
d82383b
integrate delete
tbischoff2 Nov 27, 2024
9215253
remove AssetConnection
tbischoff2 Nov 29, 2024
a25a5db
Merge branch 'feature/submodel-template-processor' into feature/aid-aimc
tbischoff2 Dec 4, 2024
ab57ceb
Merge branch 'feature/submodel-template-processor' into feature/aid-aimc
tbischoff2 Dec 10, 2024
4e3452f
Merge branch 'feature/submodel-template-processor' into feature/aid-aimc
tbischoff2 Dec 12, 2024
724bd6a
Merge branch 'feature/submodel-template-processor' into feature/aid-aimc
tbischoff2 Dec 13, 2024
c792a50
update version
tbischoff2 Dec 16, 2024
dc8e7f5
prepare AID update
tbischoff2 Dec 18, 2024
df79099
Merge branch 'feature/submodel-template-processor' into feature/aid-aimc
tbischoff2 Dec 18, 2024
b672f81
update AID
tbischoff2 Dec 19, 2024
aaed99d
Merge branch 'feature/submodel-template-processor' into feature/aid-aimc
tbischoff2 Jan 8, 2025
cdfc890
add default subscription interval
tbischoff2 Jan 10, 2025
c71dd11
add unit test
tbischoff2 Jan 14, 2025
936dc1d
Merge branch 'feature/submodel-template-processor' into feature/aid-aimc
tbischoff2 Jan 16, 2025
1ea779e
update ProcessorTest
tbischoff2 Jan 17, 2025
bd6cec2
handle SubmodelElement in model change events
tbischoff2 Jan 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -111,7 +111,8 @@ public void removeNewDataListener(NewDataListener listener) throws AssetConnecti
*
* @throws AssetConnectionException if unsubscribe fails
*/
protected abstract void unsubscribe() throws AssetConnectionException;
@Override
public abstract void unsubscribe() throws AssetConnectionException;


@Override
Original file line number Diff line number Diff line change
@@ -137,7 +137,7 @@ protected void fireNewDataReceived(byte[] value) {


@Override
protected void unsubscribe() throws AssetConnectionException {
public void unsubscribe() throws AssetConnectionException {
if (executorHandler != null) {
executorHandler.cancel(true);
}
Original file line number Diff line number Diff line change
@@ -68,7 +68,7 @@ public void subscribe() throws AssetConnectionException {


@Override
protected void unsubscribe() throws AssetConnectionException {
public void unsubscribe() throws AssetConnectionException {
multiplexer.removeListener(config.getTopic(), this::fireNewDataReceived);
}

Original file line number Diff line number Diff line change
@@ -145,4 +145,10 @@ public boolean equals(Object obj) {
&& Objects.equals(multiplexer, that.multiplexer);
}


@Override
public void unsubscribe() throws AssetConnectionException {
close();
}

}
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@
import de.fraunhofer.iosb.ilt.faaast.service.exception.MessageBusException;
import de.fraunhofer.iosb.ilt.faaast.service.filestorage.FileStorage;
import de.fraunhofer.iosb.ilt.faaast.service.messagebus.MessageBus;
import de.fraunhofer.iosb.ilt.faaast.service.model.SubmodelElementIdentifier;
import de.fraunhofer.iosb.ilt.faaast.service.model.api.InternalErrorResponse;
import de.fraunhofer.iosb.ilt.faaast.service.model.api.Request;
import de.fraunhofer.iosb.ilt.faaast.service.model.api.Response;
@@ -397,7 +398,7 @@ private void subscribeMessageBus() throws MessageBusException {
}
SubscriptionInfo info = SubscriptionInfo.create(ElementCreateEventMessage.class, x -> {
try {
elementCreated(x.getValue());
elementCreated(x.getValue(), x.getElement());
}
catch (Exception e) {
LOGGER.error("elementCreated Exception", e);
@@ -407,7 +408,7 @@ private void subscribeMessageBus() throws MessageBusException {

info = SubscriptionInfo.create(ElementDeleteEventMessage.class, x -> {
try {
elementDeleted(x.getValue());
elementDeleted(x.getValue(), x.getElement());
}
catch (Exception e) {
LOGGER.error("elementDeleted Exception", e);
@@ -417,7 +418,7 @@ private void subscribeMessageBus() throws MessageBusException {

info = SubscriptionInfo.create(ElementUpdateEventMessage.class, x -> {
try {
elementUpdated(x.getValue());
elementUpdated(x.getValue(), x.getElement());
}
catch (Exception e) {
LOGGER.error("elementUpdated Exception", e);
@@ -440,29 +441,44 @@ private void unsubscribeMessageBus() {
}


private void elementCreated(Referable value) throws PersistenceException {
private void elementCreated(Referable value, Reference reference) throws PersistenceException, ResourceNotFoundException {
Ensure.requireNonNull(value, VALUE_NULL);

if (value instanceof Submodel submodel) {
addSubmodel(submodel);
}
else if (value instanceof SubmodelElement) {
// if a SubmodelElement changed, we use updateSubodel
SubmodelElementIdentifier submodelElementIdentifier = SubmodelElementIdentifier.fromReference(reference);
updateSubmodel(getPersistence().getSubmodel(submodelElementIdentifier.getSubmodelId(), QueryModifier.DEFAULT));
}
}


private void elementDeleted(Referable value) {
private void elementDeleted(Referable value, Reference reference) throws PersistenceException, ResourceNotFoundException {
Ensure.requireNonNull(value, ELEMENT_NULL);

if (value instanceof Submodel submodel) {
deleteSubmodel(submodel);
}
else if (value instanceof SubmodelElement) {
// if a SubmodelElement changed, we use updateSubodel
SubmodelElementIdentifier submodelElementIdentifier = SubmodelElementIdentifier.fromReference(reference);
updateSubmodel(getPersistence().getSubmodel(submodelElementIdentifier.getSubmodelId(), QueryModifier.DEFAULT));
}
}


private void elementUpdated(Referable value) throws PersistenceException {
private void elementUpdated(Referable value, Reference reference) throws PersistenceException, ResourceNotFoundException {
Ensure.requireNonNull(value, VALUE_NULL);

if (value instanceof Submodel submodel) {
updateSubmodel(submodel);
}
else if (value instanceof SubmodelElement) {
// if a SubmodelElement changed, we use updateSubodel
SubmodelElementIdentifier submodelElementIdentifier = SubmodelElementIdentifier.fromReference(reference);
updateSubmodel(getPersistence().getSubmodel(submodelElementIdentifier.getSubmodelId(), QueryModifier.DEFAULT));
}
}
}
Original file line number Diff line number Diff line change
@@ -18,10 +18,12 @@
import de.fraunhofer.iosb.ilt.faaast.service.config.CoreConfig;
import de.fraunhofer.iosb.ilt.faaast.service.exception.ConfigurationInitializationException;
import de.fraunhofer.iosb.ilt.faaast.service.util.Ensure;
import de.fraunhofer.iosb.ilt.faaast.service.util.ReferenceHelper;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.eclipse.digitaltwin.aas4j.v3.model.Reference;
import org.slf4j.LoggerFactory;


/**
@@ -41,6 +43,8 @@
public abstract class AbstractAssetConnection<T extends AssetConnection<C, VC, V, OC, O, SC, S>, C extends AssetConnectionConfig<T, VC, OC, SC>, VC extends AssetValueProviderConfig, V extends AssetValueProvider, OC extends AssetOperationProviderConfig, O extends AssetOperationProvider, SC extends AssetSubscriptionProviderConfig, S extends AssetSubscriptionProvider>
implements AssetConnection<C, VC, V, OC, O, SC, S> {

private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractAssetConnection.class);

protected volatile boolean connected;
protected static final String ERROR_MSG_REFERENCE_NOT_NULL = "reference must be non-null";
protected static final String ERROR_MSG_PROVIDER_CONFIG_NOT_NULL = "providerConfig must be non-null";
@@ -49,9 +53,11 @@ public abstract class AbstractAssetConnection<T extends AssetConnection<C, VC, V
protected ServiceContext serviceContext;
protected final Map<Reference, S> subscriptionProviders;
protected final Map<Reference, V> valueProviders;
protected volatile boolean active;

protected AbstractAssetConnection() {
connected = false;
active = false;
valueProviders = new HashMap<>();
operationProviders = new HashMap<>();
subscriptionProviders = new HashMap<>();
@@ -244,7 +250,36 @@ public void unregisterOperationProvider(Reference reference) {

@Override
public void unregisterSubscriptionProvider(Reference reference) {
if (ReferenceHelper.containsSameReference(subscriptionProviders, reference)) {
var s = ReferenceHelper.getValueBySameReference(subscriptionProviders, reference);
try {
if (s != null) {
s.unsubscribe();
}
}
catch (AssetConnectionException ex) {
LOGGER.error("unregisterSubscriptionProvider error in unsubscribe");
}
}
this.subscriptionProviders.remove(reference);
}


@Override
public void stop() {
try {
disconnect();
active = false;
}
catch (AssetConnectionException ex) {
LOGGER.error("stop: error in disconnect", ex);
}
active = false;
}


@Override
public boolean isActive() {
return active;
}
}
Original file line number Diff line number Diff line change
@@ -140,4 +140,17 @@ public interface AssetConnection<T extends AssetConnectionConfig, VC extends Ass
*/
public void unregisterValueProvider(Reference reference) throws AssetConnectionException;


/**
* Stops the asset connection.
*/
public void stop();


/**
* Return whether the asset connection is active.
*
* @return True if active, false otherwise.
*/
public boolean isActive();
}
Original file line number Diff line number Diff line change
@@ -196,7 +196,7 @@ private void tryConnectingUntilSuccess(AssetConnection connection) {
coreConfig.getAssetConnectionRetryInterval(),
e);
}
while (active && !connection.isConnected()) {
while (active && connection.isActive() && !connection.isConnected()) {
try {
tryConnecting(connection);
}
@@ -276,11 +276,11 @@ public void add(AssetConnectionConfig<? extends AssetConnection, ? extends Asset
Optional<AssetConnection> connection = connections.stream().filter(x -> Objects.equals(x, newConnection)).findFirst();
if (connection.isPresent()) {
connectionConfig.getValueProviders().forEach(LambdaExceptionHelper.rethrowBiConsumer(
(k, v) -> connection.get().registerValueProvider(k, (AssetValueProviderConfig) v)));
(k, v) -> connection.get().registerValueProvider(k, v)));
connectionConfig.getSubscriptionProviders().forEach(LambdaExceptionHelper.rethrowBiConsumer(
(k, v) -> connection.get().registerSubscriptionProvider(k, (AssetSubscriptionProviderConfig) v)));
(k, v) -> connection.get().registerSubscriptionProvider(k, v)));
connectionConfig.getOperationProviders().forEach(LambdaExceptionHelper.rethrowBiConsumer(
(k, v) -> connection.get().registerOperationProvider(k, (AssetOperationProviderConfig) v)));
(k, v) -> connection.get().registerOperationProvider(k, v)));
}
else {
connections.add(newConnection);
@@ -504,4 +504,21 @@ private void validateConnections() throws ConfigurationException {
ReferenceHelper.toString(subscriptionProviders.get().getKey())));
}
}


/**
* Remove the given AssetConnection.
*
* @param connection The AssetConnection to remove.
*/
public void remove(AssetConnection connection) {
if (connections.contains(connection)) {
connection.stop();

connections.remove(connection);
}
else {
throw new IllegalArgumentException("AssetConnection not found");
}
}
}
Original file line number Diff line number Diff line change
@@ -35,4 +35,12 @@ public interface AssetSubscriptionProvider extends AssetProvider {
* @throws AssetConnectionException if removinglistener fails
*/
public void removeNewDataListener(NewDataListener listener) throws AssetConnectionException;


/**
* Unsubscribe via underlying protocol.
*
* @throws AssetConnectionException if unsubscribe fails
*/
public void unsubscribe() throws AssetConnectionException;
}
Original file line number Diff line number Diff line change
@@ -65,6 +65,12 @@ public static Builder builder() {
return new Builder();
}


@Override
public void unsubscribe() throws AssetConnectionException {
listeners.clear();
}

public static class Builder extends ExtendableBuilder<LambdaSubscriptionProvider, Builder> {

public Builder generate(Consumer<NewDataListener> value) {
Original file line number Diff line number Diff line change
@@ -157,4 +157,16 @@ public void unregisterValueProvider(Reference reference) {
throw new UnsupportedOperationException("Not supported yet.");
}


@Override
public void stop() {
throw new UnsupportedOperationException("Not supported yet.");
}


@Override
public boolean isActive() {
throw new UnsupportedOperationException("Not supported yet.");
}

}
Original file line number Diff line number Diff line change
@@ -191,4 +191,16 @@ public void init(CoreConfig coreConfig, TestAssetConnectionConfig config, Servic
registerSubscriptionProvider(provider.getKey(), provider.getValue());
}
}


@Override
public void stop() {
// nothing to do here
}


@Override
public boolean isActive() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -114,7 +114,8 @@ private void fireNewDataReceived(DataElementValue value) {
}


private void unsubscribe() throws AssetConnectionException {
@Override
public void unsubscribe() throws AssetConnectionException {
if (executorHandler != null) {
executorHandler.cancel(true);
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -51,6 +51,7 @@
<module>assetconnection/http</module>
<module>dataformat/json</module>
<module>starter</module>
<module>submodeltemplate/asset-interfaces-mapping-configuration</module>
</modules>
<scm>
<connection>scm:git:git://github.com/FraunhoferIOSB/FAAAST-Service.git</connection>
5 changes: 5 additions & 0 deletions starter/pom.xml
Original file line number Diff line number Diff line change
@@ -93,6 +93,11 @@
<artifactId>persistence-mongo</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>submodeltemplate-asset-interfaces-mapping-configuration</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
Loading