Skip to content

Commit

Permalink
KREST-8518 Add dry-run/validate-only versions of some topic APIs (con…
Browse files Browse the repository at this point in the history
…fluentinc#1090)

This change introduces an additional "validate only" parameter
to the requests for two important topic APIs:
- Create Topic
- Batch Alter Topic Config

When this parameter, which is a boolean flag, is set, the
corresponding operation would be dry-run, i.e. its execution
would be validated as if its actually run, but there would
be no side effects from the operation. E.g. if run in
that mode, a Create Topic request won't actually create a
topic.

As this requires just using simple overloads of the
corresponding Admin APIs, this change mostly focuses on
introducing the new parameters in minimally disruptive way
in exchange for some verbosity and maybe even redundancy.
- E.g. none of the existing tests had to be changed, including
  the resource test, which expect specific manager calls, and
  the manager impl tests, which expect specific Admin Client
  calls.
  • Loading branch information
dimitarndimitrov authored Dec 6, 2022
1 parent d9c6b78 commit 3f1e7c1
Show file tree
Hide file tree
Showing 11 changed files with 301 additions and 29 deletions.
41 changes: 35 additions & 6 deletions api/v3/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -854,11 +854,17 @@ paths:
[![Generally Available](https://img.shields.io/badge/Lifecycle%20Stage-Generally%20Available-%2345c6e8)](#section/Versioning/API-Lifecycle-Policy)
Creates a topic.
Also supports a dry-run mode that only validates whether the topic creation would succeed
if the ``validate_only`` request property is explicitly specified and set to true.
tags:
- Topic (v3)
requestBody:
$ref: '#/components/requestBodies/CreateTopicRequest'
responses:
# returned when dry-run mode is being used and a topic has not been created
'200':
$ref: '#/components/responses/CreateTopicResponse'
# returned in regular mode when a topic has been created
'201':
$ref: '#/components/responses/CreateTopicResponse'
'400':
Expand Down Expand Up @@ -984,11 +990,14 @@ paths:
[![Generally Available](https://img.shields.io/badge/Lifecycle%20Stage-Generally%20Available-%2345c6e8)](#section/Versioning/API-Lifecycle-Policy)
Updates or deletes a set of topic configs.
Also supports a dry-run mode that only validates whether the operation would succeed if the
``validate_only`` request property is explicitly specified and set to true.
tags:
- Configs (v3)
requestBody:
$ref: '#/components/requestBodies/AlterTopicConfigBatchRequest'
responses:
# returned in both regular and dry-run modes
'204':
description: 'No Content'
'400':
Expand Down Expand Up @@ -1610,6 +1619,8 @@ components:
- SET
- DELETE
nullable: true
validate_only:
type: boolean

AnyValue:
nullable: true
Expand Down Expand Up @@ -1819,6 +1830,8 @@ components:
value:
type: string
nullable: true
validate_only:
type: boolean

ConfigSource:
type: string
Expand Down Expand Up @@ -2487,12 +2500,22 @@ components:
application/json:
schema:
$ref: '#/components/schemas/AlterConfigBatchRequestData'
example:
data:
- name: 'cleanup.policy'
operation: 'DELETE'
- name: 'compression.type'
value: 'gzip'
examples:
batch_alter_topic_configs:
value:
data:
- name: 'cleanup.policy'
operation: 'DELETE'
- name: 'compression.type'
value: 'gzip'
validate_only_batch_alter_topic_configs:
value:
data:
- name: 'cleanup.policy'
operation: 'DELETE'
- name: 'compression.type'
value: 'gzip'
validate_only: true

CreateAclRequest:
description: 'The ACL creation request.'
Expand Down Expand Up @@ -2570,6 +2593,12 @@ components:
value: 'compact'
- name: 'compression.type'
value: 'gzip'
dry_run_create_topic:
value:
topic_name: 'topic-Z'
partitions_count: 64
replication_factor: 3
validate_only: true

ProduceRequest:
description: 'A single record to be produced to Kafka. To produce multiple records on the same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.ws.rs.NotFoundException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.common.config.ConfigResource;

Expand Down Expand Up @@ -170,6 +173,28 @@ final CompletableFuture<Void> unsafeResetConfig(
*/
final CompletableFuture<Void> safeAlterConfigs(
String clusterId, ConfigResource resourceId, B prototype, List<AlterConfigCommand> commands) {
return safeAlterConfigs(clusterId, resourceId, prototype, commands, false);
}

/**
* Atomically alter configs according to {@code commands}, checking if the configs exist first. If
* the {@code validateOnly} flag is set, the operation is only dry-ran (the configs do not get
* altered as a result).
*/
// KREST-8518 A separate overload is provided instead of changing the pre-existing
// safeAlterConfigs method in order to minimize any risks related to external usage of that method
// (as this manager can be injected in projects inheriting from kafka-rest) and to minimize the
// amount of necessary changes (e.g. by avoiding the need to heavily refactor tests).
final CompletableFuture<Void> safeAlterConfigs(
String clusterId,
ConfigResource resourceId,
B prototype,
List<AlterConfigCommand> commands,
boolean validateOnly) {
Function<? super List<T>, ? extends CompletionStage<Void>> alterConfigCall =
validateOnly
? config -> validateAlterConfigs(resourceId, commands)
: config -> alterConfigs(resourceId, commands);
return listConfigs(clusterId, resourceId, prototype)
.thenApply(
configs -> {
Expand All @@ -185,7 +210,7 @@ final CompletableFuture<Void> safeAlterConfigs(
}
return configs;
})
.thenCompose(config -> alterConfigs(resourceId, commands));
.thenCompose(alterConfigCall);
}

/**
Expand Down Expand Up @@ -213,4 +238,19 @@ private CompletableFuture<Void> alterConfigs(
.values()
.get(resourceId));
}

private CompletableFuture<Void> validateAlterConfigs(
ConfigResource resourceId, List<AlterConfigCommand> commands) {
return KafkaFutures.toCompletableFuture(
adminClient
.incrementalAlterConfigs(
singletonMap(
resourceId,
commands.stream()
.map(AlterConfigCommand::toAlterConfigOp)
.collect(Collectors.toList())),
new AlterConfigsOptions().validateOnly(true))
.values()
.get(resourceId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,16 @@ CompletableFuture<Void> updateTopicConfig(
*/
CompletableFuture<Void> alterTopicConfigs(
String clusterId, String topicName, List<AlterConfigCommand> commands);

/**
* Atomically alter configs according to {@code commands}, checking if the configs exist first. If
* the {@code validateOnly} flag is set, the operation is only dry-ran (the configs do not get
* altered as a result).
*/
// KREST-8518 A separate overload is provided instead of changing the pre-existing createTopic
// method in order to minimize any risks related to external usage of that method (as TopicManager
// can be injected in projects inheriting from kafka-rest) and to minimize the amount of necessary
// changes (e.g. by avoiding the need to heavily refactor tests).
CompletableFuture<Void> alterTopicConfigs(
String clusterId, String topicName, List<AlterConfigCommand> commands, boolean validateOnly);
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,15 @@ public CompletableFuture<Void> alterTopicConfigs(
TopicConfig.builder().setClusterId(clusterId).setTopicName(topicName),
commands);
}

@Override
public CompletableFuture<Void> alterTopicConfigs(
String clusterId, String topicName, List<AlterConfigCommand> commands, boolean validateOnly) {
return safeAlterConfigs(
clusterId,
new ConfigResource(ConfigResource.Type.TOPIC, topicName),
TopicConfig.builder().setClusterId(clusterId).setTopicName(topicName),
commands,
validateOnly);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,24 @@ CompletableFuture<Void> createTopic(
Map<Integer, List<Integer>> replicasAssignments,
Map<String, Optional<String>> configs);

/**
* Creates a new Kafka {@link Topic} with either partitions count and replication factor or
* explicitly specified partition-to-replicas assignments. If the {@code validateOnly} flag is
* set, the operation is only dry-ran (a topic does not get created as a result).
*/
// KREST-8518 A separate overload is provided instead of changing the pre-existing createTopic
// method in order to minimize any risks related to external usage of that method (as TopicManager
// can be injected in projects inheriting from kafka-rest) and to minimize the amount of necessary
// changes (e.g. by avoiding the need to heavily refactor tests).
CompletableFuture<Void> createTopic(
String clusterId,
String topicName,
Optional<Integer> partitionsCount,
Optional<Short> replicationFactor,
Map<Integer, List<Integer>> replicasAssignments,
Map<String, Optional<String>> configs,
boolean validateOnly);

/** Deletes the Kafka {@link Topic} with the given {@code topicName}. */
CompletableFuture<Void> deleteTopic(String clusterId, String topicName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import io.confluent.kafkarest.common.KafkaFutures;
import io.confluent.kafkarest.entities.Acl;
import io.confluent.kafkarest.entities.Cluster;
import io.confluent.kafkarest.entities.Partition;
import io.confluent.kafkarest.entities.PartitionReplica;
import io.confluent.kafkarest.entities.Topic;
Expand All @@ -35,9 +36,12 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
Expand Down Expand Up @@ -202,6 +206,25 @@ public CompletableFuture<Void> createTopic(
Optional<Short> replicationFactor,
Map<Integer, List<Integer>> replicasAssignments,
Map<String, Optional<String>> configs) {
return createTopic(
clusterId,
topicName,
partitionsCount,
replicationFactor,
replicasAssignments,
configs,
false);
}

@Override
public CompletableFuture<Void> createTopic(
String clusterId,
String topicName,
Optional<Integer> partitionsCount,
Optional<Short> replicationFactor,
Map<Integer, List<Integer>> replicasAssignments,
Map<String, Optional<String>> configs,
boolean validateOnly) {
requireNonNull(topicName);

Map<String, String> nullableConfigs = new HashMap<>();
Expand All @@ -214,13 +237,22 @@ public CompletableFuture<Void> createTopic(
? new NewTopic(topicName, partitionsCount, replicationFactor).configs(nullableConfigs)
: new NewTopic(topicName, replicasAssignments).configs(nullableConfigs);

Function<? super Cluster, ? extends CompletionStage<Void>> createTopicCall =
validateOnly
? cluster ->
KafkaFutures.toCompletableFuture(
adminClient
.createTopics(
singletonList(createTopicRequest),
new CreateTopicsOptions().validateOnly(true))
.all())
: cluster ->
KafkaFutures.toCompletableFuture(
adminClient.createTopics(singletonList(createTopicRequest)).all());
return clusterManager
.getCluster(clusterId)
.thenApply(cluster -> checkEntityExists(cluster, "Cluster %s cannot be found.", clusterId))
.thenCompose(
cluster ->
KafkaFutures.toCompletableFuture(
adminClient.createTopics(singletonList(createTopicRequest)).all()));
.thenCompose(createTopicCall);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,26 @@ public abstract class AlterConfigBatchRequestData {
@JsonProperty("data")
public abstract ImmutableList<AlterEntry> getData();

// KREST-8518 This option is currently recognized solely by the Batch Alter Topic Config API.
// Nevertheless, it makes sense to add it in this generic Batch Alter Config entity as it
// should eventually be supported by all the other similar APIs.
@JsonProperty("validate_only")
public abstract Optional<Boolean> getValidateOnly();

public static AlterConfigBatchRequestData create(List<AlterEntry> data) {
return new AutoValue_AlterConfigBatchRequestData(ImmutableList.copyOf(data));
return new AutoValue_AlterConfigBatchRequestData(ImmutableList.copyOf(data), Optional.empty());
}

public static AlterConfigBatchRequestData create(
List<AlterEntry> data, Optional<Boolean> validateOnly) {
return new AutoValue_AlterConfigBatchRequestData(ImmutableList.copyOf(data), validateOnly);
}

@JsonCreator
static AlterConfigBatchRequestData fromJson(@JsonProperty("data") List<AlterEntry> data) {
return create(data);
static AlterConfigBatchRequestData fromJson(
@JsonProperty("data") List<AlterEntry> data,
@JsonProperty("validate_only") Optional<Boolean> validateOnly) {
return create(data, validateOnly);
}

public final List<AlterConfigCommand> toAlterConfigCommands() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public abstract class CreateTopicRequest {
@JsonProperty("configs")
public abstract ImmutableList<ConfigEntry> getConfigs();

@JsonProperty("validate_only")
public abstract Optional<Boolean> getValidateOnly();

public static Builder builder() {
return new AutoValue_CreateTopicRequest.Builder()
.setReplicasAssignments(Collections.emptyMap());
Expand All @@ -57,14 +60,16 @@ static CreateTopicRequest fromJson(
@JsonProperty("replication_factor") @Nullable Short replicationFactor,
@JsonProperty("replicas_assignments") @Nullable
Map<Integer, List<Integer>> replicasAssignments,
@JsonProperty("configs") @Nullable List<ConfigEntry> configs) {
@JsonProperty("configs") @Nullable List<ConfigEntry> configs,
@JsonProperty("validate_only") @Nullable Boolean validateOnly) {
return builder()
.setTopicName(topicName)
.setPartitionsCount(partitionsCount)
.setReplicationFactor(replicationFactor)
.setReplicasAssignments(
replicasAssignments != null ? replicasAssignments : Collections.emptyMap())
.setConfigs(configs != null ? configs : ImmutableList.of())
.setValidateOnly(validateOnly)
.build();
}

Expand All @@ -83,6 +88,8 @@ public abstract static class Builder {

public abstract Builder setConfigs(List<ConfigEntry> configs);

public abstract Builder setValidateOnly(@Nullable Boolean validateOnly);

public abstract CreateTopicRequest build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,17 @@ public void alterTopicConfigBatch(
@PathParam("clusterId") String clusterId,
@PathParam("topicName") String topicName,
@Valid AlterTopicConfigBatchRequest request) {
boolean validateOnly = request.getValue().getValidateOnly().orElse(false);
CompletableFuture<Void> response =
topicConfigManager
.get()
.alterTopicConfigs(clusterId, topicName, request.getValue().toAlterConfigCommands());
validateOnly
? topicConfigManager
.get()
.alterTopicConfigs(
clusterId, topicName, request.getValue().toAlterConfigCommands(), true)
: topicConfigManager
.get()
.alterTopicConfigs(
clusterId, topicName, request.getValue().toAlterConfigCommands());

AsyncResponseBuilder.from(Response.status(Status.NO_CONTENT))
.entity(response)
Expand Down
Loading

0 comments on commit 3f1e7c1

Please sign in to comment.