Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DBZ-PGYB][yugabyte/yugabyte-db#23082] Remove dependency on custom debezium-core changes #137

Merged
merged 19 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@ RUN rm -rf debezium-connector-oracle
RUN rm -rf debezium-connector-spanner
RUN rm -rf debezium-connector-sqlserver
RUN rm -rf debezium-connector-vitess
RUN rm -f debezium-connector-postgres/debezium-core-2.5.2.Final.jar
WORKDIR /

# Copy the Debezium Connector for Postgres adapted for YugabyteDB
COPY debezium-connector-postgres/target/debezium-connector-postgres-*.jar $KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-postgres
COPY debezium-core/target/debezium-core-*.jar $KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-postgres

# Set the TLS version to be used by Kafka processes
ENV KAFKA_OPTS="-Djdk.tls.client.protocols=TLSv1.2 -javaagent:/kafka/etc/jmx_prometheus_javaagent-0.17.2.jar=8080:/kafka/etc/jmx-exporter/metrics.yml"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
LOGGER.debug("Snapshot result {}", snapshotResult);

if (context.isRunning() && snapshotResult.isCompletedOrSkipped()) {
if(YugabyteDBServer.isEnabled() && !snapshotResult.isSkipped()) {
if(YugabyteDBServer.isEnabled() && !isSnapshotSkipped(snapshotResult)) {
LOGGER.info("Will wait for snapshot completion before transitioning to streaming");
waitForSnapshotCompletion = true;
while (waitForSnapshotCompletion) {
Expand All @@ -99,6 +99,10 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
}
}

protected boolean isSnapshotSkipped(SnapshotResult<PostgresOffsetContext> snapshotResult) {
return snapshotResult.getStatus() == SnapshotResult.SnapshotResultStatus.SKIPPED;
}

@Override
protected CatchUpStreamingResult executeCatchUpStreaming(ChangeEventSourceContext context,
SnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> snapshotSource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

import io.debezium.data.Envelope;
import io.debezium.heartbeat.Heartbeat;
Expand Down Expand Up @@ -532,10 +533,20 @@ public static SchemaRefreshMode parse(String value) {
protected static final int DEFAULT_PORT = 5_433;
protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 10_240;
protected static final int DEFAULT_MAX_RETRIES = 6;
public static final Pattern YB_HOSTNAME_PATTERN = Pattern.compile("^[a-zA-Z0-9-_.,:]+$");

public static final Field PORT = RelationalDatabaseConnectorConfig.PORT
.withDefault(DEFAULT_PORT);

public static final Field HOSTNAME = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.HOSTNAME)
.withDisplayName("Hostname")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 2))
.withWidth(Width.MEDIUM)
.withImportance(Importance.HIGH)
.required()
.withValidation(PostgresConnectorConfig::validateYBHostname)
.withDescription("Resolvable hostname or IP address of the database server.");

public static final Field PLUGIN_NAME = Field.create("plugin.name")
.withDisplayName("Plugin")
Expand Down Expand Up @@ -1290,7 +1301,25 @@ public Optional<String[]> parseSignallingMessage(Struct value) {
});
}

protected static int validateYBHostname(Configuration config, Field field, Field.ValidationOutput problems) {
String hostName = config.getString(field);
int problemCount = 0;

if (!Strings.isNullOrBlank(hostName)) {
if (hostName.contains(",") && !hostName.contains(":")) {
// Basic validation for cases when a user has only specified comma separated IPs which is not the correct format.
problems.accept(field, hostName, hostName + " has invalid format (specify mutiple hosts in the format ip1:port1,ip2:port2,ip3:port3)");
++problemCount;
}

if (!YB_HOSTNAME_PATTERN.asPredicate().test(hostName)) {
problems.accept(field, hostName, hostName + " has invalid format (only the underscore, hyphen, dot, comma, colon and alphanumeric characters are allowed)");
++problemCount;
}
}

return problemCount;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,57 @@ public void shouldSetReplicaAutoSetRegExValue() {

assertThat((problemCount == 0)).isTrue();
}

@Test
public void shouldValidateWithCorrectSingleHostnamePattern() {
validateCorrectHostname(false);
}

@Test
public void shouldValidateWithCorrectMultiHostnamePattern() {
validateCorrectHostname(true);
}

@Test
public void shouldFailWithInvalidCharacterInHostname() {
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.HOSTNAME, "*invalidCharacter");

int problemCount = PostgresConnectorConfig.validateYBHostname(
configBuilder.build(), PostgresConnectorConfig.HOSTNAME, (field, value, problemMessage) -> System.out.println(problemMessage));

assertThat((problemCount == 1)).isTrue();
}

@Test
public void shouldFailIfInvalidMultiHostFormatSpecified() {
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.HOSTNAME, "127.0.0.1,127.0.0.2,127.0.0.3");

int problemCount = PostgresConnectorConfig.validateYBHostname(
configBuilder.build(), PostgresConnectorConfig.HOSTNAME, (field, value, problemMessage) -> System.out.println(problemMessage));

assertThat((problemCount == 1)).isTrue();
}

@Test
public void shouldFailIfInvalidMultiHostFormatSpecifiedWithInvalidCharacter() {
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.HOSTNAME, "127.0.0.1,127.0.0.2,127.0.0.3+");

int problemCount = PostgresConnectorConfig.validateYBHostname(
configBuilder.build(), PostgresConnectorConfig.HOSTNAME, (field, value, problemMessage) -> System.out.println(problemMessage));

assertThat((problemCount == 2)).isTrue();
}

public void validateCorrectHostname(boolean multiNode) {
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.HOSTNAME, multiNode ? "127.0.0.1:5433,127.0.0.2:5433,127.0.0.3:5433" : "127.0.0.1");

int problemCount = PostgresConnectorConfig.validateYBHostname(
configBuilder.build(), PostgresConnectorConfig.HOSTNAME, (field, value, problemMessage) -> System.out.println(problemMessage));

assertThat((problemCount == 0)).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ public boolean isCompletedOrSkipped() {
return this.status == SnapshotResultStatus.SKIPPED || this.status == SnapshotResultStatus.COMPLETED;
}

public boolean isSkipped() {
return this.status == SnapshotResultStatus.SKIPPED;
}

public SnapshotResultStatus getStatus() {
return status;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,17 +825,14 @@ private static int validateMessageKeyColumnsField(Configuration config, Field fi
}

protected static int validateHostname(Configuration config, Field field, ValidationOutput problems) {
LOGGER.info("Bypassing hostname validation for YB");
String hostName = config.getString(field);
if (!Strings.isNullOrBlank(hostName)) {
if (!HOSTNAME_PATTERN.asPredicate().test(hostName)) {
problems.accept(field, hostName, hostName + " has invalid format (only the underscore, hyphen, dot and alphanumeric characters are allowed)");
return 1;
}
}
return 0;

// String hostName = config.getString(field);
// if (!Strings.isNullOrBlank(hostName)) {
// if (!HOSTNAME_PATTERN.asPredicate().test(hostName)) {
// problems.accept(field, hostName, hostName + " has invalid format (only the underscore, hyphen, dot and alphanumeric characters are allowed)");
// return 1;
// }
// }
// return 0;
}

public FieldNamer<Column> getFieldNamer() {
Expand Down