diff --git a/digdag-client/src/main/java/io/digdag/client/config/Config.java b/digdag-client/src/main/java/io/digdag/client/config/Config.java index ac388e2f14..d943a0bf3c 100644 --- a/digdag-client/src/main/java/io/digdag/client/config/Config.java +++ b/digdag-client/src/main/java/io/digdag/client/config/Config.java @@ -36,6 +36,12 @@ public class Config this.object = (ObjectNode) object; } + protected Config(Config config) + { + this.mapper = config.mapper; + this.object = config.object.deepCopy(); + } + // here uses JsonNode instead of ObjectNode for workaround of https://github.com/FasterXML/jackson-databind/issues/941 @JsonCreator public static Config deserializeFromJackson(@JacksonInject ObjectMapper mapper, JsonNode object) @@ -57,21 +63,39 @@ public Config set(String key, Object v) if (v == null) { remove(key); } else { - object.set(key, writeObject(v)); + set(key, writeObject(v)); + } + return this; + } + + public Config setIfNotSet(String key, Object v) + { + if (!has(key)) { + if (v != null) { + set(key, writeObject(v)); + } } return this; } public Config setNested(String key, Config v) { - object.set(key, v.object); + set(key, v.object); return this; } public Config setAll(Config other) { for (Map.Entry field : other.getEntries()) { - object.set(field.getKey(), field.getValue()); + set(field.getKey(), field.getValue()); + } + return this; + } + + public Config setAllIfNotSet(Config other) + { + for (Map.Entry field : other.getEntries()) { + setIfNotSet(field.getKey(), field.getValue()); } return this; } @@ -94,7 +118,7 @@ public Config remove(String key) public Config deepCopy() { - return new Config(mapper, object.deepCopy()); + return new Config(this); } //public Config merge(Config other) @@ -176,7 +200,7 @@ public E convert(Class type) public E get(String key, Class type) { - JsonNode value = object.get(key); + JsonNode value = get(key); if (value == null) { throw new ConfigException("Parameter '"+key+"' is required but not set"); } @@ -185,7 +209,7 @@ public E get(String key, Class type) public Object get(String key, JavaType type) { - JsonNode value = object.get(key); + JsonNode value = get(key); if (value == null) { throw new ConfigException("Parameter '"+key+"' is required but not set"); } @@ -200,7 +224,7 @@ public E get(String key, TypeReference type) public E get(String key, Class type, E defaultValue) { - JsonNode value = object.get(key); + JsonNode value = get(key); if (value == null) { return defaultValue; } @@ -209,7 +233,7 @@ public E get(String key, Class type, E defaultValue) public Object get(String key, JavaType type, Object defaultValue) { - JsonNode value = object.get(key); + JsonNode value = get(key); if (value == null) { return defaultValue; } @@ -254,7 +278,7 @@ public Map getMapOrEmpty(String key, Class keyType, Class val public Config getNested(String key) { - JsonNode value = object.get(key); + JsonNode value = get(key); if (value == null) { throw new ConfigException("Parameter '"+key+"' is required but not set"); } @@ -266,10 +290,10 @@ public Config getNested(String key) public Config getNestedOrSetEmpty(String key) { - JsonNode value = object.get(key); + JsonNode value = get(key); if (value == null) { - value = object.objectNode(); - object.set(key, value); + value = newObjectNode(); + set(key, value); } else if (!value.isObject()) { throw new ConfigException("Parameter '"+key+"' must be an object"); @@ -279,9 +303,9 @@ else if (!value.isObject()) { public Config getNestedOrGetEmpty(String key) { - JsonNode value = object.get(key); + JsonNode value = get(key); if (value == null) { - value = object.objectNode(); + value = newObjectNode(); } else if (!value.isObject()) { throw new ConfigException("Parameter '"+key+"' must be an object"); @@ -291,9 +315,9 @@ else if (!value.isObject()) { public Config getNestedOrderedOrGetEmpty(String key) { - JsonNode value = object.get(key); + JsonNode value = get(key); if (value == null) { - value = object.objectNode(); + value = newObjectNode(); } else if (value.isArray()) { Config config = new Config(mapper); @@ -314,6 +338,21 @@ else if (!value.isObject()) { return new Config(mapper, (ObjectNode) value); } + private ObjectNode newObjectNode() + { + return object.objectNode(); + } + + protected JsonNode get(String key) + { + return object.get(key); + } + + protected void set(String key, JsonNode value) + { + object.set(key, value); + } + private E readObject(Class type, JsonNode value, String key) { try { diff --git a/digdag-core/src/main/java/io/digdag/core/agent/CheckedConfig.java b/digdag-core/src/main/java/io/digdag/core/agent/CheckedConfig.java new file mode 100644 index 0000000000..a085efe53f --- /dev/null +++ b/digdag-core/src/main/java/io/digdag/core/agent/CheckedConfig.java @@ -0,0 +1,83 @@ +package io.digdag.core.agent; + +import java.util.List; +import java.util.Collection; +import java.util.Collections; +import java.util.ArrayList; +import java.util.Set; +import java.util.HashSet; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.digdag.client.config.Config; + +public class CheckedConfig + extends Config +{ + private final Set unusedKeys; + + public CheckedConfig(Config config, Collection shouldBeUsedKeys) + { + this(config, new HashSet<>(shouldBeUsedKeys)); + } + + private CheckedConfig(Config config, Set unusedKeys) + { + super(config); + this.unusedKeys = unusedKeys; + } + + public List getUnusedKeys() + { + List keys = new ArrayList<>(unusedKeys); + Collections.sort(keys); + return keys; + } + + @Override + public ObjectNode getInternalObjectNode() + { + unusedKeys.clear(); + return super.getInternalObjectNode(); + } + + @Override + public Config remove(String key) + { + unusedKeys.remove(key); + return super.remove(key); + } + + @Override + public List getKeys() + { + unusedKeys.clear(); + return super.getKeys(); + } + + @Override + public boolean has(String key) + { + unusedKeys.remove(key); + return super.has(key); + } + + @Override + protected JsonNode get(String key) + { + unusedKeys.remove(key); + return super.get(key); + } + + @Override + protected void set(String key, JsonNode value) + { + unusedKeys.remove(key); + super.set(key, value); + } + + @Override + public Config deepCopy() + { + return new CheckedConfig(this, unusedKeys); + } +} diff --git a/digdag-core/src/main/java/io/digdag/core/agent/OperatorManager.java b/digdag-core/src/main/java/io/digdag/core/agent/OperatorManager.java index da0d2ad3c2..8f357a50e2 100644 --- a/digdag-core/src/main/java/io/digdag/core/agent/OperatorManager.java +++ b/digdag-core/src/main/java/io/digdag/core/agent/OperatorManager.java @@ -2,6 +2,7 @@ import java.util.List; import java.util.Set; +import java.util.HashSet; import java.util.Map; import java.util.Arrays; import java.util.concurrent.ConcurrentHashMap; @@ -155,15 +156,13 @@ private void runWithArchive(Path archivePath, TaskRequest request, Config nextSt } logger.debug("evaluated config: {}", config); - TaskRequest mergedRequest = TaskRequest.builder() - .from(request) - .config(config) - .build(); + Set shouldBeUsedKeys = new HashSet<>(request.getLocalConfig().getKeys()); String type; if (config.has("_type")) { type = config.get("_type", String.class); logger.info("type: {}", type); + shouldBeUsedKeys.remove("_type"); } else { java.util.Optional operatorKey = config.getKeys() @@ -182,10 +181,20 @@ private void runWithArchive(Path archivePath, TaskRequest request, Config nextSt config.set("_type", type); config.set("_command", file); logger.info("{}>: {}", type, file); + shouldBeUsedKeys.remove(operatorKey.get()); } + CheckedConfig checkedConfig = new CheckedConfig(config, shouldBeUsedKeys); + + TaskRequest mergedRequest = TaskRequest.builder() + .from(request) + .config(checkedConfig) + .build(); + TaskResult result = callExecutor(archivePath, type, mergedRequest); + warnUnusedKeys(checkedConfig.getUnusedKeys(), request); + callback.taskSucceeded( taskId, request.getLockId(), agentId, result); @@ -212,6 +221,13 @@ private void runWithArchive(Path archivePath, TaskRequest request, Config nextSt } } + private void warnUnusedKeys(List unusedKeys, TaskRequest request) + { + if (!unusedKeys.isEmpty()) { + logger.warn("Some keys are not used at {}: {}", request.getTaskName(), unusedKeys); + } + } + protected TaskResult callExecutor(Path archivePath, String type, TaskRequest mergedRequest) { OperatorFactory factory = executorTypes.get(type); diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/EmbulkOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/EmbulkOperatorFactory.java index 5133cd3ad8..48c9234c4b 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/EmbulkOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/EmbulkOperatorFactory.java @@ -70,9 +70,8 @@ public EmbulkOperator(Path archivePath, TaskRequest request) @Override public TaskResult runTask() { - Config params = request.getConfig().getNestedOrGetEmpty("embulk") - .deepCopy() - .setAll(request.getConfig()); + Config params = request.getConfig().setAllIfNotSet( + request.getConfig().getNestedOrGetEmpty("embulk")); String tempFile; try { diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/MailOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/MailOperatorFactory.java index aa776a7b83..8a682096e6 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/MailOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/MailOperatorFactory.java @@ -76,13 +76,11 @@ public MailOperator(Path archivePath, TaskRequest request) @Override public TaskResult runTask() { - Config config = request.getConfig(); - Config params = - config.getNestedOrGetEmpty("mail").deepCopy() - .setAll(config); + Config params = request.getConfig().setAllIfNotSet( + request.getConfig().getNestedOrGetEmpty("mail")); String body = templateEngine.templateCommand(archivePath, params, "body", UTF_8); - String subject = config.get("subject", String.class); + String subject = params.get("subject", String.class); List toList; try { diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/PyOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/PyOperatorFactory.java index 0a3dcf663e..d7a6c79318 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/PyOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/PyOperatorFactory.java @@ -77,12 +77,9 @@ public PyOperator(Path archivePath, TaskRequest request) @Override public TaskResult runTask() { - Config config = request.getConfig().getNestedOrGetEmpty("py") - .deepCopy() - .setAll(request.getConfig()); - - // merge state parameters in addition to regular config - Config params = config.setAll(request.getLastStateParams()); + Config params = request.getConfig() + .setAllIfNotSet(request.getConfig().getNestedOrGetEmpty("py")) + .setAll(request.getLastStateParams()); // merge state parameters in addition to regular config Config data; try { diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/RbOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/RbOperatorFactory.java index 504d7ebfd9..101eb1042c 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/RbOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/RbOperatorFactory.java @@ -77,12 +77,9 @@ public RbOperator(Path archivePath, TaskRequest request) @Override public TaskResult runTask() { - Config config = request.getConfig().getNestedOrGetEmpty("rb") - .deepCopy() - .setAll(request.getConfig()); - - // merge state parameters in addition to regular config - Config params = config.setAll(request.getLastStateParams()); + Config params = request.getConfig() + .setAllIfNotSet(request.getConfig().getNestedOrGetEmpty("rb")) + .setAll(request.getLastStateParams()); // merge state parameters in addition to regular config Config data; try { diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/ShOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/ShOperatorFactory.java index 6cef7e8050..95e17c3309 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/ShOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/ShOperatorFactory.java @@ -58,14 +58,17 @@ public ShOperator(Path archivePath, TaskRequest request) @Override public TaskResult runTask() { - String command = request.getConfig().get("_command", String.class); + Config params = request.getConfig().setAllIfNotSet( + request.getConfig().getNestedOrGetEmpty("sh")); + + String command = params.get("_command", String.class); ProcessBuilder pb = new ProcessBuilder("/bin/sh", "-c", command); final Map env = pb.environment(); - request.getConfig().getKeys() + params.getKeys() .forEach(key -> { if (isValidEnvKey(key)) { - JsonNode value = request.getConfig().get(key, JsonNode.class); + JsonNode value = params.get(key, JsonNode.class); String string; if (value.isTextual()) { string = value.textValue(); @@ -112,7 +115,7 @@ public TaskResult runTask() throw new RuntimeException("Command failed: "+message); } - return TaskResult.empty(request.getConfig().getFactory()); + return TaskResult.empty(params.getFactory()); } } diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdDdlOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdDdlOperatorFactory.java index f04bfd049b..5e25f52657 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdDdlOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdDdlOperatorFactory.java @@ -49,9 +49,8 @@ public TdDdlOperator(Path archivePath, TaskRequest request) @Override public TaskResult runTask() { - Config params = request.getConfig().getNestedOrGetEmpty("td") - .deepCopy() - .setAll(request.getConfig()); + Config params = request.getConfig().setAllIfNotSet( + request.getConfig().getNestedOrGetEmpty("td")); List deleteList = params.getListOrEmpty("drop_table", String.class); List createList = params.getListOrEmpty("create_table", String.class); diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdLoadOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdLoadOperatorFactory.java index 2b38f1414e..af02244c4e 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdLoadOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdLoadOperatorFactory.java @@ -60,9 +60,8 @@ public TdLoadOperator(Path archivePath, TaskRequest request) @Override public TaskResult runTask() { - Config params = request.getConfig().getNestedOrGetEmpty("td") - .deepCopy() - .setAll(request.getConfig()); + Config params = request.getConfig().setAllIfNotSet( + request.getConfig().getNestedOrGetEmpty("td")); ObjectNode embulkConfig; if (params.has("_command")) { diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdOperatorFactory.java index ad9c43fdd3..ebd22ea8b9 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdOperatorFactory.java @@ -62,9 +62,8 @@ public TdOperator(Path archivePath, TaskRequest request) @Override public TaskResult runTask() { - Config params = request.getConfig().getNestedOrGetEmpty("td") - .deepCopy() - .setAll(request.getConfig()); + Config params = request.getConfig().setAllIfNotSet( + request.getConfig().getNestedOrGetEmpty("td")); String query = templateEngine.templateCommand(archivePath, params, "query", UTF_8);