Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Custom metrics implementation #165

Open
wants to merge 1 commit into
base: ybdb-debezium-2.5.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -989,12 +989,12 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
.withDefault(PostgresSourceInfoStructMaker.class.getName());

public static final Field TASK_ID = Field.create("task.id")
.withDisplayName("ID of the connector task")
.withType(Type.INT)
.withDefault(0)
.withImportance(Importance.LOW)
.withDescription("Internal use only");
// public static final Field TASK_ID = Field.create("task.id")
// .withDisplayName("ID of the connector task")
// .withType(Type.INT)
// .withDefault(0)
// .withImportance(Importance.LOW)
// .withDescription("Internal use only");

public static final Field PRIMARY_KEY_HASH_COLUMNS = Field.create("primary.key.hash.columns")
.withDisplayName("Comma separated primary key fields")
Expand Down Expand Up @@ -1135,9 +1135,9 @@ public boolean isFlushLsnOnSource() {
return flushLsnOnSource;
}

public int taskId() {
return getConfig().getInteger(TASK_ID);
}
// public int taskId() {
// return getConfig().getInteger(TASK_ID);
// }

public String primaryKeyHashColumns() {
return getConfig().getString(PRIMARY_KEY_HASH_COLUMNS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import io.debezium.connector.postgresql.metrics.YugabyteDBMetricsFactory;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -109,9 +110,10 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
final PostgresValueConverter valueConverter = valueConverterBuilder.build(typeRegistry);

schema = new PostgresSchema(connectorConfig, defaultValueConverter, topicNamingStrategy, valueConverter);
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy, connectorConfig.taskId());
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy, connectorConfig.getTaskId());
final PostgresPartition.Provider partitionProvider = new PostgresPartition.Provider(connectorConfig, config);
final Offsets<PostgresPartition, PostgresOffsetContext> previousOffsets = getPreviousOffsets(
new PostgresPartition.Provider(connectorConfig, config), new PostgresOffsetContext.Loader(connectorConfig));
partitionProvider, new PostgresOffsetContext.Loader(connectorConfig));
final Clock clock = Clock.system();
final PostgresOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();

Expand Down Expand Up @@ -249,7 +251,7 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
replicationConnection,
slotCreatedInfo,
slotInfo),
new DefaultChangeEventSourceMetricsFactory<>(),
new YugabyteDBMetricsFactory(partitionProvider.getPartitions()),
dispatcher,
schema,
snapshotter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ public class PostgresPartition extends AbstractPartition implements Partition {
private static final String SERVER_PARTITION_KEY = "server";

private final String serverName;
private final int taskId;
private final String taskId;

public PostgresPartition(String serverName, String databaseName, int taskId) {
public PostgresPartition(String serverName, String databaseName, String taskId) {
super(databaseName);
this.serverName = serverName;
this.taskId = taskId;
Expand Down Expand Up @@ -57,7 +57,7 @@ public String toString() {
}

public String getPartitionIdentificationKey() {
return String.format("%s_%d", serverName, taskId);
return String.format("%s_%s", serverName, taskId);
}

static class Provider implements Partition.Provider<PostgresPartition> {
Expand All @@ -73,7 +73,7 @@ static class Provider implements Partition.Provider<PostgresPartition> {
public Set<PostgresPartition> getPartitions() {
return Collections.singleton(new PostgresPartition(
connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()),
connectorConfig.taskId()));
connectorConfig.getTaskId()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema sch
this.schema = schema;
}

protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicNamingStrategy<TableId> topicNamingStrategy, int taskId) {
super(config.getContextName(), config.getLogicalName(), String.valueOf(taskId), config.getCustomMetricTags(), Collections::emptySet);
protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicNamingStrategy<TableId> topicNamingStrategy, String taskId) {
super(config.getContextName(), config.getLogicalName(), taskId, config.getCustomMetricTags(), Collections::emptySet);

this.config = config;
if (config.xminFetchInterval().toMillis() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected List<Map<String, String>> getConfigForParallelSnapshotConsumption(int
for (int i = 0; i < maxTasks; ++i) {
Map<String, String> taskProps = new HashMap<>(this.props);

taskProps.put(PostgresConnectorConfig.TASK_ID.name(), String.valueOf(i));
taskProps.put(PostgresConnectorConfig.TASK_ID, String.valueOf(i));

long lowerBound = i * rangeSize;
long upperBound = (i == maxTasks - 1) ? upperBoundExclusive - 1 : (lowerBound + rangeSize - 1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package io.debezium.connector.postgresql.metrics;

import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.data.Envelope;
import io.debezium.metrics.Metrics;
import io.debezium.pipeline.ConnectorEvent;
import io.debezium.pipeline.meters.CommonEventMeter;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.spi.schema.DataCollectionId;
import org.apache.kafka.connect.data.Struct;

import java.util.Map;

abstract class AbstractYugabyteDBPartitionMetrics extends YugabyteDBMetrics implements YugabyteDBPartitionMetricsMXBean {
private final CommonEventMeter commonEventMeter;

AbstractYugabyteDBPartitionMetrics(CdcSourceTaskContext taskContext, Map<String, String> tags,
EventMetadataProvider metadataProvider) {
super(taskContext, tags);
this.commonEventMeter = new CommonEventMeter(taskContext.getClock(), metadataProvider);
}

@Override
public String getLastEvent() {
return commonEventMeter.getLastEvent();
}

@Override
public long getMilliSecondsSinceLastEvent() {
return commonEventMeter.getMilliSecondsSinceLastEvent();
}

@Override
public long getTotalNumberOfEventsSeen() {
return commonEventMeter.getTotalNumberOfEventsSeen();
}

@Override
public long getTotalNumberOfCreateEventsSeen() {
return commonEventMeter.getTotalNumberOfCreateEventsSeen();
}

@Override
public long getTotalNumberOfUpdateEventsSeen() {
return commonEventMeter.getTotalNumberOfUpdateEventsSeen();
}

@Override
public long getTotalNumberOfDeleteEventsSeen() {
return commonEventMeter.getTotalNumberOfDeleteEventsSeen();
}

@Override
public long getNumberOfEventsFiltered() {
return commonEventMeter.getNumberOfEventsFiltered();
}

@Override
public long getNumberOfErroneousEvents() {
return commonEventMeter.getNumberOfErroneousEvents();
}

/**
* Invoked if an event is processed for a captured table.
*/
void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value, Envelope.Operation operation) {
commonEventMeter.onEvent(source, offset, key, value, operation);
}

/**
* Invoked for events pertaining to non-captured tables.
*/
void onFilteredEvent(String event) {
commonEventMeter.onFilteredEvent();
}

/**
* Invoked for events pertaining to non-captured tables.
*/
void onFilteredEvent(String event, Envelope.Operation operation) {
commonEventMeter.onFilteredEvent(operation);
}

/**
* Invoked for events that cannot be processed.
*/
void onErroneousEvent(String event) {
commonEventMeter.onErroneousEvent();
}

/**
* Invoked for events that cannot be processed.
*/
void onErroneousEvent(String event, Envelope.Operation operation) {
commonEventMeter.onErroneousEvent(operation);
}

/**
* Invoked for events that represent a connector event.
*/
void onConnectorEvent(ConnectorEvent event) {
}

@Override
public void reset() {
commonEventMeter.reset();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.debezium.connector.postgresql.metrics;

import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.data.Envelope;
import io.debezium.metrics.Metrics;
import io.debezium.pipeline.ConnectorEvent;
import io.debezium.pipeline.metrics.ChangeEventSourceMetrics;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
import org.apache.kafka.connect.data.Struct;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

abstract class AbstractYugabyteDBTaskMetrics<B extends AbstractYugabyteDBPartitionMetrics> extends YugabyteDBMetrics
implements ChangeEventSourceMetrics<PostgresPartition>, YugabyteDBTaskMetricsMXBean {

private final ChangeEventQueueMetrics changeEventQueueMetrics;
private final Map<PostgresPartition, B> beans = new HashMap<>();

AbstractYugabyteDBTaskMetrics(CdcSourceTaskContext taskContext,
String contextName,
ChangeEventQueueMetrics changeEventQueueMetrics,
Collection<PostgresPartition> partitions,
Function<PostgresPartition, B> beanFactory) {
super(taskContext, Collect.linkMapOf(
"server", taskContext.getConnectorName(),
"task", taskContext.getTaskId(),
"context", contextName));
this.changeEventQueueMetrics = changeEventQueueMetrics;

for (PostgresPartition partition : partitions) {
beans.put(partition, beanFactory.apply(partition));
}
}

@Override
public synchronized void register() {
super.register();
beans.values().forEach(YugabyteDBMetrics::register);
}

@Override
public synchronized void unregister() {
beans.values().forEach(YugabyteDBMetrics::unregister);
super.unregister();
}

@Override
public void reset() {
beans.values().forEach(B::reset);
}

@Override
public void onEvent(PostgresPartition partition, DataCollectionId source, OffsetContext offset, Object key,
Struct value, Envelope.Operation operation) {
onPartitionEvent(partition, bean -> bean.onEvent(source, offset, key, value, operation));
}

@Override
public void onFilteredEvent(PostgresPartition partition, String event) {
onPartitionEvent(partition, bean -> bean.onFilteredEvent(event));
}

@Override
public void onFilteredEvent(PostgresPartition partition, String event, Envelope.Operation operation) {
onPartitionEvent(partition, bean -> bean.onFilteredEvent(event, operation));
}

@Override
public void onErroneousEvent(PostgresPartition partition, String event) {
onPartitionEvent(partition, bean -> bean.onErroneousEvent(event));
}

@Override
public void onErroneousEvent(PostgresPartition partition, String event, Envelope.Operation operation) {
onPartitionEvent(partition, bean -> bean.onErroneousEvent(event, operation));
}

@Override
public void onConnectorEvent(PostgresPartition partition, ConnectorEvent event) {
onPartitionEvent(partition, bean -> bean.onConnectorEvent(event));
}

@Override
public int getQueueTotalCapacity() {
return changeEventQueueMetrics.totalCapacity();
}

@Override
public int getQueueRemainingCapacity() {
return changeEventQueueMetrics.remainingCapacity();
}

@Override
public long getMaxQueueSizeInBytes() {
return changeEventQueueMetrics.maxQueueSizeInBytes();
}

@Override
public long getCurrentQueueSizeInBytes() {
return changeEventQueueMetrics.currentQueueSizeInBytes();
}

protected void onPartitionEvent(PostgresPartition partition, Consumer<B> handler) {
B bean = beans.get(partition);
if (bean == null) {
throw new IllegalArgumentException("MBean for partition " + partition + " are not registered");
}
handler.accept(bean);
}
}
Loading