Skip to content

Commit

Permalink
addressed review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vaibhav-yb committed Nov 27, 2024
1 parent cdd179e commit e23e3e6
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,6 @@ public static SchemaRefreshMode parse(String value) {
protected static final int DEFAULT_PORT = 5_433;
protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 10_240;
protected static final int DEFAULT_MAX_RETRIES = 6;
protected static final long DEFAULT_RETRIABLE_RESTART_EXPONENTIAL_DELAY_MIN_MS = 5_000;
protected static final long DEFAULT_RETRIABLE_RESTART_EXPONENTIAL_DELAY_MAX_MS = 120_000;
public static final Pattern YB_HOSTNAME_PATTERN = Pattern.compile("^[a-zA-Z0-9-_.,:]+$");

public static final Field PORT = RelationalDatabaseConnectorConfig.PORT
Expand Down Expand Up @@ -1012,19 +1010,6 @@ public static AutoCreateMode parse(String value, String defaultValue) {
return 0;
});

public static final Field RETRIABLE_RESTART_EXPONENTIAL_DELAY_MIN_MS = Field.create("retriable.restart.exponential.delay.min.ms")
.withDisplayName("Minimum delay in case of retries for exponential backoff")
.withType(Type.LONG)
.withImportance(Importance.LOW)
.withDescription("The minimum delay between retry when following exponential backoff")
.withDefault(DEFAULT_RETRIABLE_RESTART_EXPONENTIAL_DELAY_MIN_MS);
public static final Field RETRIABLE_RESTART_EXPONENTIAL_DELAY_MAX_MS = Field.create("retriable.restart.exponential.delay.max.ms")
.withDisplayName("Maximum delay in case of retries for exponential backoff")
.withType(Type.LONG)
.withImportance(Importance.LOW)
.withDescription("The maximum delay between retry when following exponential backoff")
.withDefault(DEFAULT_RETRIABLE_RESTART_EXPONENTIAL_DELAY_MAX_MS);

private final LogicalDecodingMessageFilter logicalDecodingMessageFilter;
private final HStoreHandlingMode hStoreHandlingMode;
private final IntervalHandlingMode intervalHandlingMode;
Expand Down Expand Up @@ -1158,14 +1143,6 @@ public String primaryKeyHashColumns() {
return getConfig().getString(PRIMARY_KEY_HASH_COLUMNS);
}

public long retriableRestartExponentialDelayMinMs() {
return getConfig().getLong(RETRIABLE_RESTART_EXPONENTIAL_DELAY_MIN_MS);
}

public long retriableRestartExponentialDelayMaxMs() {
return getConfig().getLong(RETRIABLE_RESTART_EXPONENTIAL_DELAY_MAX_MS);
}

@Override
public byte[] getUnavailableValuePlaceholder() {
String placeholder = getConfig().getString(UNAVAILABLE_VALUE_PLACEHOLDER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,6 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
throw new DebeziumException(e);
}

// queue = new ChangeEventQueue.Builder<DataChangeEvent>()
// .pollInterval(connectorConfig.getPollInterval())
// .maxBatchSize(connectorConfig.getMaxBatchSize())
// .maxQueueSize(connectorConfig.getMaxQueueSize())
// .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
// .loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
// .build();
//
// errorHandler = new PostgresErrorHandler(connectorConfig, queue, errorHandler);

final PostgresEventMetadataProvider metadataProvider = new PostgresEventMetadataProvider();

SignalProcessor<PostgresPartition, PostgresOffsetContext> signalProcessor = new SignalProcessor<>(
Expand Down Expand Up @@ -277,6 +267,13 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
catch (Exception exception) {
LOGGER.warn("Received exception, will be setting producer throwable", exception);
errorHandler.setProducerThrowable(new RetriableException(exception));

if (errorHandler.getRetries() == connectorConfig.getMaxRetriesOnError()) {
throw new ConnectException("Maximum number of retries attempted, manually restart "
+ "the connector after fixing the error", exception);
} else {
throw new RetriableException(exception);
}
}
}

Expand Down

0 comments on commit e23e3e6

Please sign in to comment.