Skip to content

Commit

Permalink
fixing ITs
Browse files Browse the repository at this point in the history
  • Loading branch information
sunlnu committed Jun 21, 2024
1 parent fc1d933 commit 0821e3c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,7 @@ public Optional<Stream> update(Stream stream) throws ValidationException {
if (existing.isEmpty()) {
throw new ValidationException("Can't update " + stream.getKey() + " because it doesn't exist");
}

if (stream.getSchemaKey() == null) {
stream.setSchemaKey(existing.get().getSchemaKey());
} else if (!existing.get().getSchemaKey().equals(stream.getSchemaKey())) {
throw new ValidationException("Stream = " + stream.getKey() + " update failed, because existing schemaKey = " +
existing.get().getSchemaKey() + " is not matching with given schemaKey = " + stream.getSchemaKey());
}

validateSchemaKey(stream, existing.get());
streamValidator.validateForUpdate(stream, existing.get());
stream.setSpecification(handlerService.handleUpdate(stream, existing.get()));
return saveSpecification(stream);
Expand Down Expand Up @@ -183,4 +176,13 @@ private java.util.stream.Stream<Process> allProcessesForStream(Stream stream) {
process.getOutputs().stream().anyMatch(output -> output.getStream().equals(stream.getKey()))
);
}

private void validateSchemaKey(Stream stream, Stream existing) {
if (stream.getSchemaKey() == null) {
stream.setSchemaKey(existing.getSchemaKey());
} else if (!existing.getSchemaKey().equals(stream.getSchemaKey())) {
throw new ValidationException("Stream = " + stream.getKey() + " update failed, because existing schemaKey = " +
existing.getSchemaKey() + " is not matching with given schemaKey = " + stream.getSchemaKey());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import com.expediagroup.streamplatform.streamregistry.graphql.client.test.type.StreamKeyInput;
import com.expediagroup.streamplatform.streamregistry.graphql.client.test.type.StreamKeyQuery;
import com.expediagroup.streamplatform.streamregistry.it.helpers.AbstractTestStage;
import com.expediagroup.streamplatform.streamregistry.model.keys.SchemaKey;
import com.expediagroup.streamplatform.streamregistry.model.keys.StreamKey;

public class StreamTestStage extends AbstractTestStage {

Expand Down Expand Up @@ -72,15 +70,16 @@ public void update() {

@Override
public void upsert() {

try {
client.getOptionalData(factory.upsertStreamMutationBuilder()
.schema(null)
.build()).get();
} catch (RuntimeException ex) {
assertEquals("Schema does not exist", ex.getMessage());
}

/**
* Not throw exception, because we are over-riding schemaKey if schemaKey is null
*/
client.getOptionalData(factory.upsertStreamMutationBuilder()
.schema(null)
.build()).get();

/**
* This should throw exception if schemaKey is matches with the existing schemaKey.
*/
try {
SchemaKeyInput nonExisting = SchemaKeyInput.builder()
.domain(factory.domainName)
Expand All @@ -89,12 +88,8 @@ public void upsert() {
client.getOptionalData(factory.upsertStreamMutationBuilder()
.schema(nonExisting)
.build()).get();
} catch (RuntimeException ex) {
String message = ex.getMessage().replace("com.expediagroup.streamplatform.streamregistry.state.graphql.ApolloResponseException: Unexpected response: ", "");
StreamKey streamKey = new StreamKey(factory.domainName, factory.streamName, 1);
SchemaKey existingSchemaKey = new SchemaKey(factory.domainName, factory.schemaName);
SchemaKey nonExisting = new SchemaKey(factory.domainName, "nonExisting");
assertEquals("Stream = " + streamKey + " update failed, because existing schemaKey = " + existingSchemaKey + " is not matching with given schemaKey = " + nonExisting, message);
} catch(RuntimeException ex ) {
assertTrue(ex.getMessage().contains("update failed, because existing schemaKey"));
}

Object data = client.getOptionalData(factory.upsertStreamMutationBuilder().build()).get();
Expand Down

0 comments on commit 0821e3c

Please sign in to comment.