From b0cc78511b5b18a3557f24b81060be63f958c618 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Mon, 23 Dec 2024 09:14:51 +0200 Subject: [PATCH] Dynamic qualifiers (#871) --- .../scalecube/services/ServiceReference.java | 40 ++-- .../services/api/DynamicQualifier.java | 116 +++++++++++ .../services/methods/MethodInfo.java | 8 + .../services/methods/RequestContext.java | 62 ++++++ .../methods/ServiceMethodInvoker.java | 48 +++-- .../services/api/DynamicQualifierTest.java | 59 ++++++ .../methods/ServiceMethodInvokerTest.java | 115 +++++++---- .../services/methods/StubService.java | 7 +- .../services/methods/StubServiceImpl.java | 16 ++ .../services/examples/GreetingService.java | 3 + .../GreetingServiceCancelCallback.java | 5 + .../examples/GreetingServiceImpl.java | 8 +- .../gateway/http/HttpGatewayTest.java | 12 ++ .../gateway/http/HttpLocalGatewayTest.java | 12 ++ .../websocket/WebsocketGatewayTest.java | 13 ++ .../websocket/WebsocketLocalGatewayTest.java | 13 ++ .../registry/ServiceRegistryImpl.java | 156 ++++++++++----- .../services/ServiceCallLocalTest.java | 25 +-- .../scalecube/services/ServiceLocalTest.java | 33 +++- .../scalecube/services/ServiceRemoteTest.java | 15 +- .../registry/ServiceRegistryImplTest.java | 187 ++++++++++++++++++ .../services/sut/GreetingService.java | 3 + .../services/sut/GreetingServiceImpl.java | 48 +++-- 23 files changed, 837 insertions(+), 167 deletions(-) create mode 100644 services-api/src/main/java/io/scalecube/services/api/DynamicQualifier.java create mode 100644 services-api/src/main/java/io/scalecube/services/methods/RequestContext.java create mode 100644 services-api/src/test/java/io/scalecube/services/api/DynamicQualifierTest.java create mode 100644 services/src/test/java/io/scalecube/services/registry/ServiceRegistryImplTest.java diff --git a/services-api/src/main/java/io/scalecube/services/ServiceReference.java b/services-api/src/main/java/io/scalecube/services/ServiceReference.java index 1c47ddcd0..bdcbb05c1 100644 --- a/services-api/src/main/java/io/scalecube/services/ServiceReference.java +++ b/services-api/src/main/java/io/scalecube/services/ServiceReference.java @@ -1,5 +1,6 @@ package io.scalecube.services; +import io.scalecube.services.api.DynamicQualifier; import io.scalecube.services.api.Qualifier; import java.util.Collections; import java.util.HashMap; @@ -13,12 +14,13 @@ */ public class ServiceReference { - private final String qualifier; private final String endpointId; private final String namespace; + private final String action; + private final String qualifier; + private final DynamicQualifier dynamicQualifier; private final Set contentTypes; private final Map tags; - private final String action; private final Address address; private final boolean isSecured; @@ -35,18 +37,15 @@ public ServiceReference( ServiceEndpoint serviceEndpoint) { this.endpointId = serviceEndpoint.id(); this.namespace = serviceRegistration.namespace(); - this.contentTypes = Collections.unmodifiableSet(serviceEndpoint.contentTypes()); - this.tags = mergeTags(serviceMethodDefinition, serviceRegistration, serviceEndpoint); this.action = serviceMethodDefinition.action(); this.qualifier = Qualifier.asString(namespace, action); + this.dynamicQualifier = qualifier.contains(":") ? new DynamicQualifier(qualifier) : null; + this.contentTypes = Collections.unmodifiableSet(serviceEndpoint.contentTypes()); + this.tags = mergeTags(serviceMethodDefinition, serviceRegistration, serviceEndpoint); this.address = serviceEndpoint.address(); this.isSecured = serviceMethodDefinition.isSecured(); } - public String qualifier() { - return qualifier; - } - public String endpointId() { return endpointId; } @@ -55,6 +54,18 @@ public String namespace() { return namespace; } + public String action() { + return action; + } + + public String qualifier() { + return qualifier; + } + + public DynamicQualifier dynamicQualifier() { + return dynamicQualifier; + } + public Set contentTypes() { return contentTypes; } @@ -63,10 +74,6 @@ public Map tags() { return tags; } - public String action() { - return action; - } - public Address address() { return this.address; } @@ -89,11 +96,14 @@ private Map mergeTags( @Override public String toString() { return new StringJoiner(", ", ServiceReference.class.getSimpleName() + "[", "]") - .add("endpointId=" + endpointId) - .add("address=" + address) - .add("qualifier=" + qualifier) + .add("endpointId='" + endpointId + "'") + .add("namespace='" + namespace + "'") + .add("action='" + action + "'") + .add("qualifier='" + qualifier + "'") + .add("dynamicQualifier=" + dynamicQualifier) .add("contentTypes=" + contentTypes) .add("tags=" + tags) + .add("address=" + address) .add("isSecured=" + isSecured) .toString(); } diff --git a/services-api/src/main/java/io/scalecube/services/api/DynamicQualifier.java b/services-api/src/main/java/io/scalecube/services/api/DynamicQualifier.java new file mode 100644 index 000000000..75f4870c6 --- /dev/null +++ b/services-api/src/main/java/io/scalecube/services/api/DynamicQualifier.java @@ -0,0 +1,116 @@ +package io.scalecube.services.api; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.StringJoiner; +import java.util.regex.Pattern; + +public final class DynamicQualifier { + + private final String qualifier; + private final Pattern pattern; + private final List pathVariables; + private final int size; + + public DynamicQualifier(String qualifier) { + if (!qualifier.contains(":")) { + throw new IllegalArgumentException("Illegal dynamic qualifier: " + qualifier); + } + + final var pathVariables = new ArrayList(); + final var sb = new StringBuilder(); + for (var s : qualifier.split("/")) { + if (s.startsWith(":")) { + final var pathVar = s.substring(1); + sb.append("(?<").append(pathVar).append(">.*?)"); + pathVariables.add(pathVar); + } else { + sb.append(s); + } + sb.append("/"); + } + sb.setLength(sb.length() - 1); + + this.qualifier = qualifier; + this.pattern = Pattern.compile(sb.toString()); + this.pathVariables = Collections.unmodifiableList(pathVariables); + this.size = sizeOf(qualifier); + } + + public String qualifier() { + return qualifier; + } + + public Pattern pattern() { + return pattern; + } + + public List pathVariables() { + return pathVariables; + } + + public int size() { + return size; + } + + public Map matchQualifier(String input) { + if (size != sizeOf(input)) { + return null; + } + + final var matcher = pattern.matcher(input); + if (!matcher.matches()) { + return null; + } + + final var map = new LinkedHashMap(); + for (var pathVar : pathVariables) { + final var value = matcher.group(pathVar); + Objects.requireNonNull( + value, "Path variable value must not be null, path variable: " + pathVar); + map.put(pathVar, value); + } + + return map; + } + + private static int sizeOf(String value) { + int count = 0; + for (int i = 0, length = value.length(); i < length; i++) { + if (value.charAt(i) == '/') { + count++; + } + } + return count; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + return Objects.equals(qualifier, ((DynamicQualifier) o).qualifier); + } + + @Override + public int hashCode() { + return Objects.hashCode(qualifier); + } + + @Override + public String toString() { + return new StringJoiner(", ", DynamicQualifier.class.getSimpleName() + "[", "]") + .add("qualifier='" + qualifier + "'") + .add("pattern=" + pattern) + .add("pathVariables=" + pathVariables) + .add("size=" + size) + .toString(); + } +} diff --git a/services-api/src/main/java/io/scalecube/services/methods/MethodInfo.java b/services-api/src/main/java/io/scalecube/services/methods/MethodInfo.java index 51544da75..8ca26cf3f 100644 --- a/services-api/src/main/java/io/scalecube/services/methods/MethodInfo.java +++ b/services-api/src/main/java/io/scalecube/services/methods/MethodInfo.java @@ -1,6 +1,7 @@ package io.scalecube.services.methods; import io.scalecube.services.CommunicationMode; +import io.scalecube.services.api.DynamicQualifier; import io.scalecube.services.api.Qualifier; import java.lang.reflect.Type; import java.util.StringJoiner; @@ -11,6 +12,7 @@ public final class MethodInfo { private final String serviceName; private final String methodName; private final String qualifier; + private final DynamicQualifier dynamicQualifier; private final Type parameterizedReturnType; private final boolean isReturnTypeServiceMessage; private final CommunicationMode communicationMode; @@ -51,6 +53,7 @@ public MethodInfo( this.serviceName = serviceName; this.methodName = methodName; this.qualifier = Qualifier.asString(serviceName, methodName); + this.dynamicQualifier = qualifier.contains(":") ? new DynamicQualifier(qualifier) : null; this.parameterCount = parameterCount; this.requestType = requestType; this.isRequestTypeServiceMessage = isRequestTypeServiceMessage; @@ -70,6 +73,10 @@ public String qualifier() { return qualifier; } + public DynamicQualifier dynamicQualifier() { + return dynamicQualifier; + } + public Type parameterizedReturnType() { return parameterizedReturnType; } @@ -112,6 +119,7 @@ public String toString() { .add("serviceName='" + serviceName + "'") .add("methodName='" + methodName + "'") .add("qualifier='" + qualifier + "'") + .add("dynamicQualifier=" + dynamicQualifier) .add("parameterizedReturnType=" + parameterizedReturnType) .add("isReturnTypeServiceMessage=" + isReturnTypeServiceMessage) .add("communicationMode=" + communicationMode) diff --git a/services-api/src/main/java/io/scalecube/services/methods/RequestContext.java b/services-api/src/main/java/io/scalecube/services/methods/RequestContext.java new file mode 100644 index 000000000..3722eadbe --- /dev/null +++ b/services-api/src/main/java/io/scalecube/services/methods/RequestContext.java @@ -0,0 +1,62 @@ +package io.scalecube.services.methods; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.StringJoiner; +import reactor.core.publisher.Mono; + +public class RequestContext { + + private final Map headers; + private final Object principal; + private final Map pathVars; + + /** + * Constructor. + * + * @param headers message headers + * @param principal authenticated principal (optional) + * @param pathVars path variables (optional) + */ + public RequestContext( + Map headers, Object principal, Map pathVars) { + this.headers = Collections.unmodifiableMap(new HashMap<>(headers)); + this.principal = principal; + this.pathVars = pathVars != null ? Map.copyOf(pathVars) : null; + } + + public Map headers() { + return headers; + } + + public String header(String name) { + return headers.get(name); + } + + public T principal() { + //noinspection unchecked + return (T) principal; + } + + public Map pathVars() { + return pathVars; + } + + public String pathVar(String name) { + return pathVars != null ? pathVars.get(name) : null; + } + + public static Mono deferContextual() { + return Mono.deferContextual(context -> Mono.just(context.get(RequestContext.class))); + } + + @Override + public String toString() { + return new StringJoiner(", ", RequestContext.class.getSimpleName() + "[", "]") + .add("headers=" + headers) + .add("principal=" + principal) + .add("pathVars=" + pathVars) + .toString(); + } +} diff --git a/services-api/src/main/java/io/scalecube/services/methods/ServiceMethodInvoker.java b/services-api/src/main/java/io/scalecube/services/methods/ServiceMethodInvoker.java index 2f60386d9..79c2a58a5 100644 --- a/services-api/src/main/java/io/scalecube/services/methods/ServiceMethodInvoker.java +++ b/services-api/src/main/java/io/scalecube/services/methods/ServiceMethodInvoker.java @@ -15,6 +15,7 @@ import java.lang.System.Logger.Level; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.StringJoiner; @@ -76,7 +77,7 @@ private Mono invokeOne(ServiceMessage message, Object authData) { context -> { final var request = toRequest(message); final var qualifier = message.qualifier(); - return Mono.from(invoke(request)) + return Mono.from(invokeRequest(request)) .doOnSuccess( response -> { if (logger != null && logger.isLoggable(level)) { @@ -95,7 +96,8 @@ private Mono invokeOne(ServiceMessage message, Object authData) { } }); }) - .contextWrite(context -> enhanceContext(authData, context)); + .contextWrite(context -> enhanceWithRequestContext(context, message)) + .contextWrite(context -> enhanceWithAuthContext(context, authData)); } /** @@ -118,7 +120,7 @@ private Flux invokeMany(ServiceMessage message, Object authData) { context -> { final var request = toRequest(message); final var qualifier = message.qualifier(); - return Flux.from(invoke(request)) + return Flux.from(invokeRequest(request)) .doOnSubscribe( s -> { if (logger != null && logger.isLoggable(level)) { @@ -132,7 +134,8 @@ private Flux invokeMany(ServiceMessage message, Object authData) { } }); }) - .contextWrite(context -> enhanceContext(authData, context)); + .contextWrite(context -> enhanceWithRequestContext(context, message)) + .contextWrite(context -> enhanceWithAuthContext(context, authData)); } /** @@ -157,11 +160,12 @@ public Flux invokeBidirectional(Publisher publis } private Flux invokeBidirectional(Flux messages, Object authData) { - return Flux.deferContextual(context -> messages.map(this::toRequest).transform(this::invoke)) - .contextWrite(context -> enhanceContext(authData, context)); + return Flux.deferContextual( + context -> messages.map(this::toRequest).transform(this::invokeRequest)) + .contextWrite(context -> enhanceWithAuthContext(context, authData)); } - private Publisher invoke(Object request) { + private Publisher invokeRequest(Object request) { Publisher result = null; Throwable throwable = null; try { @@ -206,26 +210,37 @@ private Mono authenticate(ServiceMessage message, Context context) { return authenticator .apply(message.headers()) .switchIfEmpty(Mono.just(NULL_AUTH_CONTEXT)) - .onErrorMap(this::toUnauthorizedException); + .onErrorMap(ServiceMethodInvoker::toUnauthorizedException); } - private UnauthorizedException toUnauthorizedException(Throwable th) { - if (th instanceof ServiceException) { - ServiceException e = (ServiceException) th; + private static UnauthorizedException toUnauthorizedException(Throwable th) { + if (th instanceof ServiceException e) { return new UnauthorizedException(e.errorCode(), e.getMessage()); } else { return new UnauthorizedException(th); } } - private Context enhanceContext(Object authData, Context context) { + private Context enhanceWithAuthContext(Context context, Object authData) { if (authData == NULL_AUTH_CONTEXT || principalMapper == null) { return context.put(AUTH_CONTEXT_KEY, authData); + } else { + final var principal = principalMapper.apply(authData); + return context.put(AUTH_CONTEXT_KEY, principal != null ? principal : NULL_AUTH_CONTEXT); } + } - Object mappedData = principalMapper.apply(authData); + private Context enhanceWithRequestContext(Context context, ServiceMessage message) { + final var headers = message.headers(); + final var principal = context.get(AUTH_CONTEXT_KEY); + final var dynamicQualifier = methodInfo.dynamicQualifier(); + + Map pathVars = null; + if (dynamicQualifier != null) { + pathVars = dynamicQualifier.matchQualifier(message.qualifier()); + } - return context.put(AUTH_CONTEXT_KEY, mappedData != null ? mappedData : NULL_AUTH_CONTEXT); + return context.put(RequestContext.class, new RequestContext(headers, principal, pathVars)); } private Object toRequest(ServiceMessage message) { @@ -246,9 +261,8 @@ private Object toRequest(ServiceMessage message) { return methodInfo.isRequestTypeServiceMessage() ? request : request.data(); } - private ServiceMessage toResponse(Object response, String qualifier, String dataFormat) { - if (response instanceof ServiceMessage) { - ServiceMessage message = (ServiceMessage) response; + private static ServiceMessage toResponse(Object response, String qualifier, String dataFormat) { + if (response instanceof ServiceMessage message) { if (dataFormat != null && !dataFormat.equals(message.dataFormat())) { return ServiceMessage.from(message).qualifier(qualifier).dataFormat(dataFormat).build(); } diff --git a/services-api/src/test/java/io/scalecube/services/api/DynamicQualifierTest.java b/services-api/src/test/java/io/scalecube/services/api/DynamicQualifierTest.java new file mode 100644 index 000000000..ca3980c48 --- /dev/null +++ b/services-api/src/test/java/io/scalecube/services/api/DynamicQualifierTest.java @@ -0,0 +1,59 @@ +package io.scalecube.services.api; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class DynamicQualifierTest { + + @Test + void testIllegalArgument() { + assertThrows( + IllegalArgumentException.class, () -> new DynamicQualifier("v1/this.is.namespace/foo/bar")); + } + + @Test + void testNoMatches() { + final var qualifier = new DynamicQualifier("v1/this.is.namespace/foo/:foo/bar/:bar"); + assertNull(qualifier.matchQualifier("v1/this.is.namespace/foo/bar")); + } + + @Test + void testStrictMatching() { + final var qualifier = new DynamicQualifier("v1/this.is.namespace/foo/:foo"); + assertNotNull(qualifier.matchQualifier("v1/this.is.namespace/foo/123")); + assertNull(qualifier.matchQualifier("v1/this.is.namespace/foo/123/bar/456/baz/678")); + } + + @Test + void testEquality() { + final var qualifier1 = new DynamicQualifier("v1/this.is.namespace/foo/:foo/bar/:bar"); + final var qualifier2 = new DynamicQualifier("v1/this.is.namespace/foo/:foo/bar/:bar"); + assertEquals(qualifier1, qualifier2); + } + + @Test + void testMatchSinglePathVariable() { + final var userName = UUID.randomUUID().toString(); + final var qualifier = new DynamicQualifier("v1/this.is.namespace/foo/bar/:userName"); + final var map = qualifier.matchQualifier("v1/this.is.namespace/foo/bar/" + userName); + assertNotNull(map); + assertEquals(1, map.size()); + assertEquals(userName, map.get("userName")); + } + + @Test + void testMatchMultiplePathVariables() { + final var qualifier = new DynamicQualifier("v1/this.is.namespace/foo/:foo/bar/:bar/baz/:baz"); + final var map = qualifier.matchQualifier("v1/this.is.namespace/foo/123/bar/456/baz/678"); + assertNotNull(map); + assertEquals(3, map.size()); + assertEquals("123", map.get("foo")); + assertEquals("456", map.get("bar")); + assertEquals("678", map.get("baz")); + } +} diff --git a/services-api/src/test/java/io/scalecube/services/methods/ServiceMethodInvokerTest.java b/services-api/src/test/java/io/scalecube/services/methods/ServiceMethodInvokerTest.java index b089d4ad9..ef50a7730 100644 --- a/services-api/src/test/java/io/scalecube/services/methods/ServiceMethodInvokerTest.java +++ b/services-api/src/test/java/io/scalecube/services/methods/ServiceMethodInvokerTest.java @@ -23,7 +23,7 @@ class ServiceMethodInvokerTest { - private static final String qualifierPrefix = "io.scalecube.services.methods.StubService/"; + private static final String QUALIFIER_PREFIX = StubService.NAMESPACE + "/"; private static final boolean AUTH = true; public static final boolean IS_RETURN_TYPE_SERVICE_MESSAGE = false; @@ -50,12 +50,11 @@ class ServiceMethodInvokerTest { @DisplayName("invokeOne should return empty response when service returns null") void testInvokeOneWhenReturnNull() throws Exception { final String methodName = "returnNull"; - final Class serviceClass = stubService.getClass(); - final Method method = serviceClass.getMethod(methodName); + final Method method = stubService.getClass().getMethod(methodName); final MethodInfo methodInfo = new MethodInfo( - serviceClass.getName(), + StubService.NAMESPACE, methodName, method.getReturnType(), IS_RETURN_TYPE_SERVICE_MESSAGE, @@ -79,7 +78,7 @@ void testInvokeOneWhenReturnNull() throws Exception { null); ServiceMessage message = - ServiceMessage.builder().qualifier(qualifierPrefix + methodName).build(); + ServiceMessage.builder().qualifier(QUALIFIER_PREFIX + methodName).build(); StepVerifier.create( Mono.deferContextual(context -> serviceMethodInvoker.invokeOne(message)) @@ -91,12 +90,11 @@ void testInvokeOneWhenReturnNull() throws Exception { @DisplayName("invokeMany should return empty response when service returns null") void testInvokeManyWhenReturnNull() throws Exception { final String methodName = "returnNull2"; - final Class serviceClass = stubService.getClass(); - final Method method = serviceClass.getMethod(methodName); + final Method method = stubService.getClass().getMethod(methodName); final MethodInfo methodInfo = new MethodInfo( - serviceClass.getName(), + StubService.NAMESPACE, methodName, method.getReturnType(), IS_RETURN_TYPE_SERVICE_MESSAGE, @@ -120,7 +118,7 @@ void testInvokeManyWhenReturnNull() throws Exception { null); ServiceMessage message = - ServiceMessage.builder().qualifier(qualifierPrefix + methodName).build(); + ServiceMessage.builder().qualifier(QUALIFIER_PREFIX + methodName).build(); StepVerifier.create( Flux.deferContextual(context -> serviceMethodInvoker.invokeMany(message)) @@ -132,12 +130,11 @@ void testInvokeManyWhenReturnNull() throws Exception { @DisplayName("invokeBidirectional should return empty response when service returns null") void testInvokeBidirectionalWhenReturnNull() throws Exception { final String methodName = "returnNull3"; - final Class serviceClass = stubService.getClass(); - final Method method = serviceClass.getMethod(methodName, Flux.class); + final Method method = stubService.getClass().getMethod(methodName, Flux.class); final MethodInfo methodInfo = new MethodInfo( - serviceClass.getName(), + StubService.NAMESPACE, methodName, method.getReturnType(), IS_RETURN_TYPE_SERVICE_MESSAGE, @@ -161,7 +158,7 @@ void testInvokeBidirectionalWhenReturnNull() throws Exception { null); ServiceMessage message = - ServiceMessage.builder().qualifier(qualifierPrefix + methodName).build(); + ServiceMessage.builder().qualifier(QUALIFIER_PREFIX + methodName).build(); StepVerifier.create( Flux.deferContextual( @@ -174,12 +171,11 @@ void testInvokeBidirectionalWhenReturnNull() throws Exception { @DisplayName("invokeOne should return error response when service throws exception") void testInvokeOneWhenThrowException() throws Exception { final String methodName = "throwException"; - final Class serviceClass = stubService.getClass(); - final Method method = serviceClass.getMethod(methodName); + final Method method = stubService.getClass().getMethod(methodName); final MethodInfo methodInfo = new MethodInfo( - serviceClass.getName(), + StubService.NAMESPACE, methodName, method.getReturnType(), IS_RETURN_TYPE_SERVICE_MESSAGE, @@ -203,7 +199,7 @@ void testInvokeOneWhenThrowException() throws Exception { null); ServiceMessage message = - ServiceMessage.builder().qualifier(qualifierPrefix + methodName).build(); + ServiceMessage.builder().qualifier(QUALIFIER_PREFIX + methodName).build(); // invokeOne final Mono invokeOne = @@ -219,12 +215,11 @@ void testInvokeOneWhenThrowException() throws Exception { @DisplayName("invokeMany should return error response when service throws exception") void testInvokeManyWhenThrowException() throws Exception { final String methodName = "throwException2"; - final Class serviceClass = stubService.getClass(); - final Method method = serviceClass.getMethod(methodName); + final Method method = stubService.getClass().getMethod(methodName); final MethodInfo methodInfo = new MethodInfo( - serviceClass.getName(), + StubService.NAMESPACE, methodName, method.getReturnType(), IS_RETURN_TYPE_SERVICE_MESSAGE, @@ -248,7 +243,7 @@ void testInvokeManyWhenThrowException() throws Exception { null); ServiceMessage message = - ServiceMessage.builder().qualifier(qualifierPrefix + methodName).build(); + ServiceMessage.builder().qualifier(QUALIFIER_PREFIX + methodName).build(); final Flux invokeOne = Flux.deferContextual(context -> serviceMethodInvoker.invokeMany(message)) @@ -263,12 +258,11 @@ void testInvokeManyWhenThrowException() throws Exception { @DisplayName("invokeBidirectional should return error response when service throws exception") void testInvokeBidirectionalWhenThrowException() throws Exception { final String methodName = "throwException3"; - final Class serviceClass = stubService.getClass(); - final Method method = serviceClass.getMethod(methodName, Flux.class); + final Method method = stubService.getClass().getMethod(methodName, Flux.class); final MethodInfo methodInfo = new MethodInfo( - serviceClass.getName(), + StubService.NAMESPACE, methodName, method.getReturnType(), IS_RETURN_TYPE_SERVICE_MESSAGE, @@ -292,7 +286,7 @@ void testInvokeBidirectionalWhenThrowException() throws Exception { null); ServiceMessage message = - ServiceMessage.builder().qualifier(qualifierPrefix + methodName).build(); + ServiceMessage.builder().qualifier(QUALIFIER_PREFIX + methodName).build(); // invokeOne final Flux invokeOne = @@ -307,16 +301,15 @@ void testInvokeBidirectionalWhenThrowException() throws Exception { @Test @DisplayName( - "invocation of auth method should return error " + "invocation of secured method should return error " + "if there're no auth.context and no authenticator") void testAuthMethodWhenNoContextAndNoAuthenticator() throws Exception { final String methodName = "helloAuthContext"; - final Class serviceClass = stubService.getClass(); - final Method method = serviceClass.getMethod(methodName); + final Method method = stubService.getClass().getMethod(methodName); final MethodInfo methodInfo = new MethodInfo( - serviceClass.getName(), + StubService.NAMESPACE, methodName, method.getReturnType(), IS_RETURN_TYPE_SERVICE_MESSAGE, @@ -340,7 +333,7 @@ void testAuthMethodWhenNoContextAndNoAuthenticator() throws Exception { null); ServiceMessage message = - ServiceMessage.builder().qualifier(qualifierPrefix + methodName).build(); + ServiceMessage.builder().qualifier(QUALIFIER_PREFIX + methodName).build(); // invokeOne final Mono invokeOne = serviceMethodInvoker.invokeOne(message); @@ -352,16 +345,15 @@ void testAuthMethodWhenNoContextAndNoAuthenticator() throws Exception { @Test @DisplayName( - "invocation of auth method should return empty response " + "invocation of secured method should return successfull response " + "if auth.context exists and no authenticator") void testAuthMethodWhenThereIsContextAndNoAuthenticator() throws Exception { final String methodName = "helloAuthContext"; - final Class serviceClass = stubService.getClass(); - final Method method = serviceClass.getMethod(methodName); + final Method method = stubService.getClass().getMethod(methodName); final MethodInfo methodInfo = new MethodInfo( - serviceClass.getName(), + StubService.NAMESPACE, methodName, method.getReturnType(), IS_RETURN_TYPE_SERVICE_MESSAGE, @@ -385,7 +377,7 @@ void testAuthMethodWhenThereIsContextAndNoAuthenticator() throws Exception { null); ServiceMessage message = - ServiceMessage.builder().qualifier(qualifierPrefix + methodName).build(); + ServiceMessage.builder().qualifier(QUALIFIER_PREFIX + methodName).build(); StepVerifier.create( Mono.deferContextual(context -> serviceMethodInvoker.invokeOne(message)) @@ -395,16 +387,15 @@ void testAuthMethodWhenThereIsContextAndNoAuthenticator() throws Exception { @Test @DisplayName( - "invocation of auth method should return empty response " + "invocation of secured method should return successfull response " + "if there're no auth.context but authenticator exists") void testAuthMethodWhenNoContextButThereIsAuthenticator() throws Exception { final String methodName = "helloAuthContext"; - final Class serviceClass = stubService.getClass(); - final Method method = serviceClass.getMethod(methodName); + final Method method = stubService.getClass().getMethod(methodName); final MethodInfo methodInfo = new MethodInfo( - serviceClass.getName(), + StubService.NAMESPACE, methodName, method.getReturnType(), IS_RETURN_TYPE_SERVICE_MESSAGE, @@ -433,7 +424,51 @@ void testAuthMethodWhenNoContextButThereIsAuthenticator() throws Exception { null); ServiceMessage message = - ServiceMessage.builder().qualifier(qualifierPrefix + methodName).build(); + ServiceMessage.builder().qualifier(QUALIFIER_PREFIX + methodName).build(); + + StepVerifier.create(serviceMethodInvoker.invokeOne(message)).verifyComplete(); + } + + @Test + @DisplayName("invocation of secured method should contain RequestConext with all fields") + void testRequestContextWithDynamicQualifier() throws Exception { + final String methodName = "helloRequestContextWithDynamicQualifier"; + final String actionName = "hello/:foo/dynamic/:bar"; + final String actualActionName = "hello/foo123/dynamic/bar456"; + final Method method = stubService.getClass().getMethod(methodName); + + final MethodInfo methodInfo = + new MethodInfo( + StubService.NAMESPACE, + actionName, + method.getReturnType(), + IS_RETURN_TYPE_SERVICE_MESSAGE, + CommunicationMode.REQUEST_RESPONSE, + method.getParameterCount(), + Void.TYPE, + IS_REQUEST_TYPE_SERVICE_MESSAGE, + AUTH, + Schedulers.immediate()); + + //noinspection unchecked,rawtypes + Authenticator mockedAuthenticator = Mockito.mock(Authenticator.class); + Mockito.when(mockedAuthenticator.apply(ArgumentMatchers.anyMap())) + .thenReturn(Mono.just(AUTH_DATA)); + + serviceMethodInvoker = + new ServiceMethodInvoker( + method, + stubService, + methodInfo, + DefaultErrorMapper.INSTANCE, + dataDecoder, + authenticator, + principalMapper, + null, + null); + + ServiceMessage message = + ServiceMessage.builder().qualifier(QUALIFIER_PREFIX + actualActionName).build(); StepVerifier.create(serviceMethodInvoker.invokeOne(message)).verifyComplete(); } diff --git a/services-api/src/test/java/io/scalecube/services/methods/StubService.java b/services-api/src/test/java/io/scalecube/services/methods/StubService.java index 7acc50f09..d29d69e79 100644 --- a/services-api/src/test/java/io/scalecube/services/methods/StubService.java +++ b/services-api/src/test/java/io/scalecube/services/methods/StubService.java @@ -5,9 +5,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -@Service +@Service(StubService.NAMESPACE) public interface StubService { + String NAMESPACE = "v1/stubService"; + @ServiceMethod Mono returnNull(); @@ -28,4 +30,7 @@ public interface StubService { @ServiceMethod Mono helloAuthContext(); + + @ServiceMethod("hello/:foo/dynamic/:bar") + Mono helloRequestContextWithDynamicQualifier(); } diff --git a/services-api/src/test/java/io/scalecube/services/methods/StubServiceImpl.java b/services-api/src/test/java/io/scalecube/services/methods/StubServiceImpl.java index dab197f88..1b74ab480 100644 --- a/services-api/src/test/java/io/scalecube/services/methods/StubServiceImpl.java +++ b/services-api/src/test/java/io/scalecube/services/methods/StubServiceImpl.java @@ -1,5 +1,7 @@ package io.scalecube.services.methods; +import static org.junit.jupiter.api.Assertions.assertNotNull; + import io.scalecube.services.auth.Authenticator; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -40,4 +42,18 @@ public Flux throwException3(Flux request) { public Mono helloAuthContext() { return Authenticator.deferSecured(StubServicePrincipal.class).then(); } + + @Override + public Mono helloRequestContextWithDynamicQualifier() { + return RequestContext.deferContextual() + .doOnNext( + requestContext -> { + assertNotNull(requestContext.headers(), "requestContext.headers"); + assertNotNull(requestContext.principal(), "requestContext.principal"); + assertNotNull(requestContext.pathVars(), "requestContext.pathVars"); + assertNotNull(requestContext.pathVar("foo"), "foo"); + assertNotNull(requestContext.pathVar("bar"), "bar"); + }) + .then(); + } } diff --git a/services-examples/src/main/java/io/scalecube/services/examples/GreetingService.java b/services-examples/src/main/java/io/scalecube/services/examples/GreetingService.java index f74e8cdd5..92765754f 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/GreetingService.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/GreetingService.java @@ -60,4 +60,7 @@ public interface GreetingService { @RequestType(EmptyGreetingRequest.class) @ResponseType(EmptyGreetingResponse.class) Mono emptyGreetingMessage(ServiceMessage request); + + @ServiceMethod("hello/:someVar") + Mono helloDynamicQualifier(Long value); } diff --git a/services-examples/src/main/java/io/scalecube/services/examples/GreetingServiceCancelCallback.java b/services-examples/src/main/java/io/scalecube/services/examples/GreetingServiceCancelCallback.java index d86d2bb0d..ec452e31c 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/GreetingServiceCancelCallback.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/GreetingServiceCancelCallback.java @@ -88,4 +88,9 @@ public Mono emptyGreeting(EmptyGreetingRequest request) { public Mono emptyGreetingMessage(ServiceMessage request) { return greetingService.emptyGreetingMessage(request).doOnCancel(onCancel); } + + @Override + public Mono helloDynamicQualifier(Long value) { + return greetingService.helloDynamicQualifier(value).doOnCancel(onCancel); + } } diff --git a/services-examples/src/main/java/io/scalecube/services/examples/GreetingServiceImpl.java b/services-examples/src/main/java/io/scalecube/services/examples/GreetingServiceImpl.java index 987b250d3..a0c654518 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/GreetingServiceImpl.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/GreetingServiceImpl.java @@ -1,6 +1,7 @@ package io.scalecube.services.examples; import io.scalecube.services.api.ServiceMessage; +import io.scalecube.services.methods.RequestContext; import java.time.Duration; import java.util.Collections; import java.util.List; @@ -94,9 +95,14 @@ public Mono emptyGreeting(EmptyGreetingRequest request) { @Override public Mono emptyGreetingMessage(ServiceMessage request) { - EmptyGreetingRequest greetingRequest = request.data(); ServiceMessage response = ServiceMessage.from(request).data(new EmptyGreetingResponse()).build(); return Mono.just(response); } + + @Override + public Mono helloDynamicQualifier(Long value) { + return RequestContext.deferContextual() + .map(context -> context.pathVar("someVar") + "@" + value); + } } diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpGatewayTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpGatewayTest.java index 855547f68..61f157893 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpGatewayTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpGatewayTest.java @@ -208,4 +208,16 @@ void shouldReturnOnEmptyMessageGreeting() { void shouldReturnSomeException() { StepVerifier.create(errorService.oneError()).expectError(SomeException.class).verify(TIMEOUT); } + + @Test + void shouldWorkWithDynamicQualifier() { + final var value = "12345"; + final var data = System.currentTimeMillis(); + final var request = + ServiceMessage.builder().qualifier("greeting/hello/" + value).data(data).build(); + + StepVerifier.create(serviceCall.requestOne(request, String.class).map(ServiceMessage::data)) + .assertNext(result -> assertEquals(value + "@" + data, result)) + .verifyComplete(); + } } diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayTest.java index 7b0871642..4d186a0bf 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayTest.java @@ -181,4 +181,16 @@ void shouldReturnOnEmptyMessageGreeting() { void shouldReturnSomeException() { StepVerifier.create(errorService.oneError()).expectError(SomeException.class).verify(TIMEOUT); } + + @Test + void shouldWorkWithDynamicQualifier() { + final var value = "12345"; + final var data = System.currentTimeMillis(); + final var request = + ServiceMessage.builder().qualifier("greeting/hello/" + value).data(data).build(); + + StepVerifier.create(serviceCall.requestOne(request, String.class).map(ServiceMessage::data)) + .assertNext(result -> assertEquals(value + "@" + data, result)) + .verifyComplete(); + } } diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayTest.java index ea990fc9c..1a0306cc3 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayTest.java @@ -1,6 +1,7 @@ package io.scalecube.services.gateway.websocket; import static io.scalecube.services.gateway.GatewayErrorMapperImpl.ERROR_MAPPER; +import static org.junit.jupiter.api.Assertions.assertEquals; import io.scalecube.services.Address; import io.scalecube.services.Microservices; @@ -241,4 +242,16 @@ void shouldHeartbeat() { .assertNext(pongValue -> Assertions.assertEquals(value, pongValue)) .verifyComplete(); } + + @Test + void shouldWorkWithDynamicQualifier() { + final var value = "12345"; + final var data = System.currentTimeMillis(); + final var request = + ServiceMessage.builder().qualifier("greeting/hello/" + value).data(data).build(); + + StepVerifier.create(serviceCall.requestOne(request, String.class).map(ServiceMessage::data)) + .assertNext(result -> assertEquals(value + "@" + data, result)) + .verifyComplete(); + } } diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayTest.java index 44e60d807..2e153e9ca 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayTest.java @@ -1,6 +1,7 @@ package io.scalecube.services.gateway.websocket; import static io.scalecube.services.gateway.GatewayErrorMapperImpl.ERROR_MAPPER; +import static org.junit.jupiter.api.Assertions.assertEquals; import io.scalecube.services.Address; import io.scalecube.services.Microservices; @@ -212,4 +213,16 @@ void shouldReturnSomeExceptionOnFlux() { void shouldReturnSomeExceptionOnMono() { StepVerifier.create(errorService.oneError()).expectError(SomeException.class).verify(TIMEOUT); } + + @Test + void shouldWorkWithDynamicQualifier() { + final var value = "12345"; + final var data = System.currentTimeMillis(); + final var request = + ServiceMessage.builder().qualifier("greeting/hello/" + value).data(data).build(); + + StepVerifier.create(serviceCall.requestOne(request, String.class).map(ServiceMessage::data)) + .assertNext(result -> assertEquals(value + "@" + data, result)) + .verifyComplete(); + } } diff --git a/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java b/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java index 50ba5ae34..af0da8950 100644 --- a/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java +++ b/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java @@ -4,6 +4,7 @@ import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.ServiceInfo; import io.scalecube.services.ServiceReference; +import io.scalecube.services.api.DynamicQualifier; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.methods.MethodInfo; import io.scalecube.services.methods.ServiceMethodInvoker; @@ -16,11 +17,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.stream.Collectors; +import java.util.stream.Stream; import org.jctools.maps.NonBlockingHashMap; import reactor.core.scheduler.Scheduler; @@ -34,9 +32,13 @@ public class ServiceRegistryImpl implements ServiceRegistry { private final Map serviceEndpoints = new NonBlockingHashMap<>(); private final Map> serviceReferencesByQualifier = new NonBlockingHashMap<>(); + private final Map methodInvokerByQualifier = + new NonBlockingHashMap<>(); + private final Map> serviceReferencesByPattern = + new NonBlockingHashMap<>(); + private final Map methodInvokerByPattern = + new NonBlockingHashMap<>(); private final List serviceInfos = new CopyOnWriteArrayList<>(); - private final ConcurrentMap methodInvokers = - new ConcurrentHashMap<>(); public ServiceRegistryImpl(Map schedulers) { this.schedulers = schedulers; @@ -50,46 +52,73 @@ public List listServiceEndpoints() { @Override public List listServiceReferences() { - return serviceReferencesByQualifier.values().stream() - .flatMap(Collection::stream) - .collect(Collectors.toList()); + return Stream.concat( + serviceReferencesByQualifier.values().stream().flatMap(Collection::stream), + serviceReferencesByPattern.values().stream().flatMap(Collection::stream)) + .toList(); } @Override public List lookupService(ServiceMessage request) { - List list = serviceReferencesByQualifier.get(request.qualifier()); - if (list == null || list.isEmpty()) { - return Collections.emptyList(); + final var contentType = request.dataFormatOrDefault(); + final var qualifier = request.qualifier(); + + // Match by exact-match + + final var list = serviceReferencesByQualifier.get(qualifier); + if (list != null) { + return list.stream().filter(sr -> sr.contentTypes().contains(contentType)).toList(); } - String contentType = request.dataFormatOrDefault(); - return list.stream() - .filter(sr -> sr.contentTypes().contains(contentType)) - .collect(Collectors.toList()); + + // Match by dynamic-qualifier + + for (var entry : serviceReferencesByPattern.entrySet()) { + final var dynamicQualifier = entry.getKey(); + if (dynamicQualifier.matchQualifier(qualifier) != null) { + return entry.getValue(); + } + } + + return Collections.emptyList(); } @Override public boolean registerService(ServiceEndpoint serviceEndpoint) { - boolean success = serviceEndpoints.putIfAbsent(serviceEndpoint.id(), serviceEndpoint) == null; - if (success) { + boolean putIfAbsent = + serviceEndpoints.putIfAbsent(serviceEndpoint.id(), serviceEndpoint) == null; + if (putIfAbsent) { + serviceEndpoint.serviceReferences().forEach(this::addServiceReference); LOGGER.log(Level.DEBUG, "ServiceEndpoint registered: {0}", serviceEndpoint); - serviceEndpoint.serviceReferences().forEach(this::populateServiceReferences); } - return success; + return putIfAbsent; } @Override public ServiceEndpoint unregisterService(String endpointId) { ServiceEndpoint serviceEndpoint = serviceEndpoints.remove(endpointId); if (serviceEndpoint != null) { - LOGGER.log(Level.DEBUG, "ServiceEndpoint unregistered: {0}", serviceEndpoint); - - List serviceReferencesOfEndpoint = - serviceReferencesByQualifier.values().stream() - .flatMap(Collection::stream) - .filter(sr -> sr.endpointId().equals(endpointId)) - .toList(); + // Clean exact-match service references + + serviceReferencesByQualifier.values().stream() + .flatMap(Collection::stream) + .filter(sr -> sr.endpointId().equals(endpointId)) + .forEach( + value -> + serviceReferencesByQualifier.compute( + value.qualifier(), (key, list) -> removeServiceReference(value, list))); + + // Clean dynamic-qualifier service references + + serviceReferencesByPattern.values().stream() + .flatMap(Collection::stream) + .filter(sr -> sr.endpointId().equals(endpointId)) + .forEach( + value -> + serviceReferencesByPattern.compute( + value.dynamicQualifier(), + (key, list) -> removeServiceReference(value, list))); - serviceReferencesOfEndpoint.forEach(this::cleanServiceReferences); + LOGGER.log(Level.DEBUG, "ServiceEndpoint unregistered: {0}", serviceEndpoint); } return serviceEndpoint; } @@ -133,7 +162,7 @@ public void registerService(ServiceInfo serviceInfo) { Reflect.isSecured(method), Reflect.executeOnScheduler(serviceMethod, schedulers)); - checkMethodInvokerDoesntExist(methodInfo); + checkMethodInvokerIsNotPresent(methodInfo); ServiceMethodInvoker methodInvoker = new ServiceMethodInvoker( @@ -147,42 +176,73 @@ public void registerService(ServiceInfo serviceInfo) { serviceInfo.logger(), serviceInfo.level()); - methodInvokers.put(methodInfo.qualifier(), methodInvoker); + if (methodInfo.dynamicQualifier() == null) { + methodInvokerByQualifier.put(methodInfo.qualifier(), methodInvoker); + } else { + methodInvokerByPattern.put( + methodInfo.dynamicQualifier(), methodInvoker); + } })); } - private void checkMethodInvokerDoesntExist(MethodInfo methodInfo) { - if (methodInvokers.containsKey(methodInfo.qualifier())) { + private void checkMethodInvokerIsNotPresent(MethodInfo methodInfo) { + if (methodInvokerByQualifier.containsKey(methodInfo.qualifier())) { LOGGER.log(Level.ERROR, "MethodInvoker already exists, methodInfo: {0}", methodInfo); throw new IllegalStateException("MethodInvoker already exists"); } + if (methodInfo.dynamicQualifier() != null) { + if (methodInvokerByPattern.containsKey(methodInfo.dynamicQualifier())) { + LOGGER.log(Level.ERROR, "MethodInvoker already exists, methodInfo: {0}", methodInfo); + throw new IllegalStateException("MethodInvoker already exists"); + } + } } @Override public ServiceMethodInvoker getInvoker(String qualifier) { - return methodInvokers.get(Objects.requireNonNull(qualifier, "[getInvoker] qualifier")); + // Match by exact-match + + final var methodInvoker = methodInvokerByQualifier.get(qualifier); + if (methodInvoker != null) { + return methodInvoker; + } + + // Match by dynamic-qualifier + + for (var entry : methodInvokerByPattern.entrySet()) { + final var invoker = entry.getValue(); + final var dynamicQualifier = invoker.methodInfo().dynamicQualifier(); + if (dynamicQualifier.matchQualifier(qualifier) != null) { + return invoker; + } + } + + return null; } @Override public List listServices() { - return serviceInfos; + return new ArrayList<>(serviceInfos); } - private void populateServiceReferences(ServiceReference sr) { - serviceReferencesByQualifier - .computeIfAbsent(sr.qualifier(), key -> new CopyOnWriteArrayList<>()) - .add(sr); + private void addServiceReference(ServiceReference sr) { + if (sr.dynamicQualifier() == null) { + serviceReferencesByQualifier + .computeIfAbsent(sr.qualifier(), key -> new CopyOnWriteArrayList<>()) + .add(sr); + } else { + serviceReferencesByPattern + .computeIfAbsent(sr.dynamicQualifier(), key -> new CopyOnWriteArrayList<>()) + .add(sr); + } } - private void cleanServiceReferences(ServiceReference sr) { - serviceReferencesByQualifier.compute( - sr.qualifier(), - (key, list) -> { - if (list == null || list.isEmpty()) { - return null; - } - list.remove(sr); - return !list.isEmpty() ? list : null; - }); + private static List removeServiceReference( + ServiceReference value, List list) { + if (list == null || list.isEmpty()) { + return null; + } + list.remove(value); + return list.isEmpty() ? null : list; } } diff --git a/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java b/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java index 5f231dad3..bb2abfcb8 100644 --- a/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java @@ -46,6 +46,7 @@ public class ServiceCallLocalTest extends BaseTest { public static final int TIMEOUT = 3; + private Duration timeout = Duration.ofSeconds(TIMEOUT); private static Microservices provider; @@ -74,18 +75,6 @@ public void test_local_async_no_params() { GREETING_NO_PARAMS_REQUEST.qualifier(), message.qualifier(), "Didn't get desired response"); } - private static Microservices serviceProvider() { - return Microservices.start( - new Context() - .discovery( - serviceEndpoint -> - new ScalecubeServiceDiscovery() - .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .options(opts -> opts.metadata(serviceEndpoint))) - .transport(RSocketServiceTransport::new) - .services(new GreetingServiceImpl())); - } - @Test public void test_local_void_greeting() { // WHEN @@ -216,6 +205,18 @@ public void test_custom_error_mapper() { .verify(timeout); } + private static Microservices serviceProvider() { + return Microservices.start( + new Context() + .discovery( + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl())); + } + private static Optional route( ServiceRegistry serviceRegistry, ServiceMessage request) { return Optional.of( diff --git a/services/src/test/java/io/scalecube/services/ServiceLocalTest.java b/services/src/test/java/io/scalecube/services/ServiceLocalTest.java index d20ccf96a..f3ea06193 100644 --- a/services/src/test/java/io/scalecube/services/ServiceLocalTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceLocalTest.java @@ -23,7 +23,7 @@ public class ServiceLocalTest extends BaseTest { - private static final Duration timeout = Duration.ofSeconds(3); + private static final Duration TIMEOUT = Duration.ofSeconds(3); private Microservices microservices; @@ -47,7 +47,7 @@ public void test_local_greeting_request_completes_before_timeout() { GreetingResponse result = service .greetingRequestTimeout(new GreetingRequest("joe", Duration.ofMillis(500))) - .block(timeout.plusMillis(500)); + .block(TIMEOUT.plusMillis(500)); // print the greeting. System.out.println("2. greeting_request_completes_before_timeout : " + result.getResult()); @@ -95,7 +95,7 @@ public void test_local_void_greeting() { GreetingService service = createProxy(microservices); // call the service. - service.greetingVoid(new GreetingRequest("joe")).block(timeout); + service.greetingVoid(new GreetingRequest("joe")).block(TIMEOUT); System.out.println("test_local_void_greeting done."); } @@ -155,7 +155,7 @@ public void test_local_greeting_request_timeout_expires() { assertThrows( RuntimeException.class, () -> - Mono.from(service.greetingRequestTimeout(new GreetingRequest("joe", timeout))) + Mono.from(service.greetingRequestTimeout(new GreetingRequest("joe", TIMEOUT))) .timeout(Duration.ofMillis(500)) .block()); assertTrue( @@ -182,7 +182,7 @@ public void test_local_async_greeting_return_Message() { } @Test - void test_local_greeting_message() { + public void test_local_greeting_message() { GreetingService service = createProxy(microservices); ServiceMessage request = ServiceMessage.builder().data(new GreetingRequest("joe")).build(); @@ -199,7 +199,7 @@ void test_local_greeting_message() { assertEquals("hello to: joe", resp.getResult()); }) .expectComplete() - .verify(timeout); + .verify(TIMEOUT); StepVerifier.create(service.greetingMessage2(request)) .assertNext( @@ -208,7 +208,7 @@ void test_local_greeting_message() { assertEquals("hello to: joe", resp.getResult()); }) .expectComplete() - .verify(timeout); + .verify(TIMEOUT); // using serviceCall directly ServiceCall serviceCall = microservices.call(); @@ -227,7 +227,7 @@ void test_local_greeting_message() { assertEquals("hello to: joe", resp.getResult()); }) .expectComplete() - .verify(timeout); + .verify(TIMEOUT); StepVerifier.create( serviceCall.requestOne( @@ -243,7 +243,7 @@ void test_local_greeting_message() { assertEquals("hello to: joe", resp.getResult()); }) .expectComplete() - .verify(timeout); + .verify(TIMEOUT); } @Test @@ -379,7 +379,20 @@ public void test_local_bidi_greeting_expect_message_GreetingResponse() { .verify(Duration.ofSeconds(3)); } - private GreetingService createProxy(Microservices gateway) { + @Test + public void test_dynamic_qualifier() { + final var value = "12345"; + final var data = System.currentTimeMillis(); + final var request = + ServiceMessage.builder().qualifier("v1/greetings/hello/" + value).data(data).build(); + + StepVerifier.create( + microservices.call().requestOne(request, String.class).map(ServiceMessage::data)) + .assertNext(result -> assertEquals(value + "@" + data, result)) + .verifyComplete(); + } + + private static GreetingService createProxy(Microservices gateway) { return gateway.call().api(GreetingService.class); // create proxy for GreetingService API } } diff --git a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java index ca2433924..2be79b8b1 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java @@ -38,7 +38,6 @@ public class ServiceRemoteTest extends BaseTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); - public static final Duration TIMEOUT2 = Duration.ofSeconds(6); private static Microservices gateway; private static Address gatewayAddress; @@ -203,7 +202,7 @@ public void test_remote_async_greeting_return_Message() { } @Test - void test_remote_greeting_message() { + public void test_remote_greeting_message() { GreetingService service = createProxy(); ServiceMessage request = ServiceMessage.builder().data(new GreetingRequest("joe")).build(); @@ -561,6 +560,18 @@ public void test_many_stream_block_first() { } } + @Test + public void test_dynamic_qualifier() { + final var value = "12345"; + final var data = System.currentTimeMillis(); + final var request = + ServiceMessage.builder().qualifier("v1/greetings/hello/" + value).data(data).build(); + + StepVerifier.create(gateway.call().requestOne(request, String.class).map(ServiceMessage::data)) + .assertNext(result -> assertEquals(value + "@" + data, result)) + .verifyComplete(); + } + private GreetingService createProxy() { return gateway.call().api(GreetingService.class); // create proxy for GreetingService API } diff --git a/services/src/test/java/io/scalecube/services/registry/ServiceRegistryImplTest.java b/services/src/test/java/io/scalecube/services/registry/ServiceRegistryImplTest.java new file mode 100644 index 000000000..447ef8d63 --- /dev/null +++ b/services/src/test/java/io/scalecube/services/registry/ServiceRegistryImplTest.java @@ -0,0 +1,187 @@ +package io.scalecube.services.registry; + +import static io.scalecube.services.transport.jackson.JacksonCodec.CONTENT_TYPE; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.mock; + +import io.scalecube.services.Address; +import io.scalecube.services.ServiceEndpoint; +import io.scalecube.services.ServiceInfo; +import io.scalecube.services.ServiceMethodDefinition; +import io.scalecube.services.ServiceRegistration; +import io.scalecube.services.annotations.Service; +import io.scalecube.services.annotations.ServiceMethod; +import io.scalecube.services.api.ServiceMessage; +import io.scalecube.services.exceptions.ServiceProviderErrorMapper; +import io.scalecube.services.transport.api.ServiceMessageDataDecoder; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + +class ServiceRegistryImplTest { + + private final ServiceRegistryImpl serviceRegistry = new ServiceRegistryImpl(null); + private final ServiceProviderErrorMapper errorMapper = mock(ServiceProviderErrorMapper.class); + private final ServiceMessageDataDecoder dataDecoder = mock(ServiceMessageDataDecoder.class); + + @Test + void testRegisterService() { + serviceRegistry.registerService( + ServiceInfo.fromServiceInstance(new HelloOneImpl()) + .errorMapper(errorMapper) + .dataDecoder(dataDecoder) + .build()); + serviceRegistry.registerService( + ServiceInfo.fromServiceInstance(new HelloTwoImpl()) + .errorMapper(errorMapper) + .dataDecoder(dataDecoder) + .build()); + assertEquals(2, serviceRegistry.listServices().size()); + } + + @Test + void testRegisterServiceRepeatedlyNotAllowed() { + final var helloOne = + ServiceInfo.fromServiceInstance(new HelloOneImpl()) + .errorMapper(errorMapper) + .dataDecoder(dataDecoder) + .build(); + final var helloTwo = + ServiceInfo.fromServiceInstance(new HelloTwoImpl()) + .errorMapper(errorMapper) + .dataDecoder(dataDecoder) + .build(); + serviceRegistry.registerService(helloOne); + serviceRegistry.registerService(helloTwo); + assertEquals(2, serviceRegistry.listServices().size()); + assertThrows(IllegalStateException.class, () -> serviceRegistry.registerService(helloOne)); + assertThrows(IllegalStateException.class, () -> serviceRegistry.registerService(helloTwo)); + } + + @Test + void testRegisterThenUnregisterServiceEndpoint() { + final var n = 10; + for (int i = 0; i < n; i++) { + serviceRegistry.registerService( + ServiceEndpoint.builder() + .id("endpoint" + i) + .address(Address.create("endpoint" + i, 4848)) + .contentTypes(Set.of(CONTENT_TYPE)) + .serviceRegistrations( + List.of( + new ServiceRegistration( + "greeting", + new HashMap<>(), + List.of( + new ServiceMethodDefinition("hello"), + new ServiceMethodDefinition("hello/:pathVar"))))) + .build()); + } + + assertEquals(n, serviceRegistry.listServiceEndpoints().size()); + assertEquals(n << 1, serviceRegistry.listServiceReferences().size()); + + for (int i = 0; i < n; i++) { + assertNotNull(serviceRegistry.unregisterService("endpoint" + i)); + } + + assertEquals(0, serviceRegistry.listServiceEndpoints().size()); + assertEquals(0, serviceRegistry.listServiceReferences().size()); + } + + @Test + void testGetInvoker() { + serviceRegistry.registerService( + ServiceInfo.fromServiceInstance(new HelloOneImpl()) + .errorMapper(errorMapper) + .dataDecoder(dataDecoder) + .build()); + serviceRegistry.registerService( + ServiceInfo.fromServiceInstance(new HelloTwoImpl()) + .errorMapper(errorMapper) + .dataDecoder(dataDecoder) + .build()); + assertNotNull(serviceRegistry.getInvoker("greeting/hello")); + assertNotNull(serviceRegistry.getInvoker("greeting/hello/12345")); + assertNotNull(serviceRegistry.getInvoker("greeting/hello/67890")); + assertNull(serviceRegistry.getInvoker("greeting/hola/that/not/exist")); + } + + @Test + void testLookupService() { + final var n = 10; + for (int i = 0; i < n; i++) { + serviceRegistry.registerService( + ServiceEndpoint.builder() + .id("endpoint" + i) + .address(Address.create("endpoint" + i, 4848)) + .contentTypes(Set.of(CONTENT_TYPE)) + .serviceRegistrations( + List.of( + new ServiceRegistration( + "greeting", + new HashMap<>(), + List.of( + new ServiceMethodDefinition("hello"), + new ServiceMethodDefinition("hello/:pathVar"))))) + .build()); + } + assertEquals( + n, + serviceRegistry + .lookupService(ServiceMessage.builder().qualifier("greeting/hello").build()) + .size()); + assertEquals( + n, + serviceRegistry + .lookupService(ServiceMessage.builder().qualifier("greeting/hello/12345").build()) + .size()); + assertEquals( + n, + serviceRegistry + .lookupService(ServiceMessage.builder().qualifier("greeting/hello/67890").build()) + .size()); + assertEquals( + 0, + serviceRegistry + .lookupService( + ServiceMessage.builder().qualifier("greeting/hola/that/not/exist").build()) + .size()); + } + + @Service(HelloOne.NAMESPACE) + interface HelloOne { + + String NAMESPACE = "greeting"; + + @ServiceMethod + Mono hello(); + } + + @Service(HelloTwo.NAMESPACE) + interface HelloTwo { + + String NAMESPACE = "greeting"; + + @ServiceMethod("hello/:pathVar") + Mono helloPathVar(); + } + + static class HelloOneImpl implements HelloOne { + + @Override + public Mono hello() { + return Mono.just("" + System.currentTimeMillis()); + } + } + + static class HelloTwoImpl implements HelloTwo { + + @Override + public Mono helloPathVar() { + return Mono.just("" + System.currentTimeMillis()); + } + } +} diff --git a/services/src/test/java/io/scalecube/services/sut/GreetingService.java b/services/src/test/java/io/scalecube/services/sut/GreetingService.java index a78608dae..d519e1b70 100644 --- a/services/src/test/java/io/scalecube/services/sut/GreetingService.java +++ b/services/src/test/java/io/scalecube/services/sut/GreetingService.java @@ -100,4 +100,7 @@ Flux bidiGreetingIllegalArgumentExceptionMessage( @ServiceMethod Flux manyStream(Long cnt); + + @ServiceMethod("hello/:someVar") + Mono helloDynamicQualifier(Long value); } diff --git a/services/src/test/java/io/scalecube/services/sut/GreetingServiceImpl.java b/services/src/test/java/io/scalecube/services/sut/GreetingServiceImpl.java index b50326e30..983da87f0 100644 --- a/services/src/test/java/io/scalecube/services/sut/GreetingServiceImpl.java +++ b/services/src/test/java/io/scalecube/services/sut/GreetingServiceImpl.java @@ -4,6 +4,7 @@ import io.scalecube.services.annotations.Inject; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.exceptions.ForbiddenException; +import io.scalecube.services.methods.RequestContext; import java.util.stream.LongStream; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -16,8 +17,6 @@ public final class GreetingServiceImpl implements GreetingService { private int instanceId; - private boolean ci = System.getenv("TRAVIS") != null; - public GreetingServiceImpl() {} public GreetingServiceImpl(int id) { @@ -46,7 +45,8 @@ public Mono greetingNotAuthorized(GreetingRequest name) { @Override public Mono greetingRequestTimeout(GreetingRequest request) { - print("[greetingRequestTimeout] Hello... i am a service an just recived a message:" + request); + System.out.println( + "[greetingRequestTimeout] Hello... i am a service an just recived a message:" + request); return Mono.delay(request.getDuration()) .flatMap( i -> @@ -57,7 +57,7 @@ public Mono greetingRequestTimeout(GreetingRequest request) { @Override public Mono greetingNoParams() { - print( + System.out.println( "[greetingNoParams] Hello... i am a service an just recived " + "a call bu i dont know from who."); return Mono.just("hello unknown"); @@ -65,7 +65,7 @@ public Mono greetingNoParams() { @Override public Mono greetingRequest(GreetingRequest request) { - print( + System.out.println( instanceId + ":[greetingRequest] Hello... i am a service an just recived a message:" + request); @@ -123,7 +123,8 @@ public Flux greetingFluxEmpty(GreetingRequest request) { @Override public Mono greetingMessage(ServiceMessage request) { - print("[greetingMessage] Hello... i am a service an just recived a message:" + request); + System.out.println( + "[greetingMessage] Hello... i am a service an just recived a message:" + request); GreetingRequest data = request.data(); GreetingResponse resp = new GreetingResponse("hello to: " + data.getName(), "1"); return Mono.just(ServiceMessage.builder().data(resp).build()); @@ -131,7 +132,8 @@ public Mono greetingMessage(ServiceMessage request) { @Override public Mono greetingMessage2(ServiceMessage request) { - print("[greetingMessage] Hello... i am a service an just recived a message:" + request); + System.out.println( + "[greetingMessage] Hello... i am a service an just recived a message:" + request); GreetingRequest data = request.data(); GreetingResponse resp = new GreetingResponse("hello to: " + data.getName(), "1"); return Mono.just(resp); @@ -139,51 +141,55 @@ public Mono greetingMessage2(ServiceMessage request) { @Override public Mono greetingVoid(GreetingRequest request) { - print("[greetingVoid] Hello... i am a service an just recived a message:" + request); - print(" hello to: " + request.getName()); + System.out.println( + "[greetingVoid] Hello... i am a service an just recived a message:" + request); + System.out.println(" hello to: " + request.getName()); return Mono.empty(); } @Override public Mono failingVoid(GreetingRequest request) { - print("[failingVoid] Hello... i am a service an just recived a message:" + request); + System.out.println( + "[failingVoid] Hello... i am a service an just recived a message:" + request); return Mono.error(new IllegalArgumentException(request.toString())); } @Override public Mono throwingVoid(GreetingRequest request) { - print("[failingVoid] Hello... i am a service an just recived a message:" + request); + System.out.println( + "[failingVoid] Hello... i am a service an just recived a message:" + request); throw new IllegalArgumentException(request.toString()); } @Override public Mono failingRequest(GreetingRequest request) { - print("[failingRequest] Hello... i am a service an just recived a message:" + request); + System.out.println( + "[failingRequest] Hello... i am a service an just recived a message:" + request); return Mono.error(new IllegalArgumentException(request.toString())); } @Override public Mono exceptionRequest(GreetingRequest request) { - print("[exceptionRequest] Hello... i am a service an just recived a message:" + request); + System.out.println( + "[exceptionRequest] Hello... i am a service an just recived a message:" + request); throw new IllegalArgumentException(request.toString()); } @Override public Mono emptyGreeting(EmptyGreetingRequest request) { - print("[emptyGreeting] service received a message:" + request); + System.out.println("[emptyGreeting] service received a message:" + request); return Mono.just(new EmptyGreetingResponse()); } @Override public Mono emptyGreetingMessage(ServiceMessage request) { - print("[emptyGreetingMessage] service received a message:" + request); - EmptyGreetingRequest request1 = request.data(); + System.out.println("[emptyGreetingMessage] service received a message:" + request); return Mono.just(ServiceMessage.from(request).data(new EmptyGreetingResponse()).build()); } @Override public void notifyGreeting() { - print("[notifyGreeting] Hello... i am a service and i just notefied"); + System.out.println("[notifyGreeting] Hello... i am a service and i just notefied"); } @Override @@ -192,9 +198,9 @@ public Flux manyStream(Long cnt) { () -> Flux.fromStream(LongStream.range(0, cnt).boxed()).publishOn(Schedulers.parallel())); } - private void print(String message) { - if (!ci) { - System.out.println(message); - } + @Override + public Mono helloDynamicQualifier(Long value) { + return RequestContext.deferContextual() + .map(context -> context.pathVar("someVar") + "@" + value); } }