From 276ea4e007f2a6f74dea7fae58f1de2a923e3f78 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha <34186745+vaibhav-yb@users.noreply.github.com> Date: Thu, 2 Jan 2025 10:19:06 +0530 Subject: [PATCH] [DBZ-PGYB] Add transformer `PGCompatible` to emit events with standard structure (#168) This PR adds a transformer `PGCompatible` with full path `io.debezium.connector.postgresql.transforms.yugabytedb`, this will be helpful in converting the structure of the emitted events to match the one emitted by standard Debezium connectors. **Example:** Consider the following schema for a table `test`: `(id INT PRIMARY KEY, name TEXT, age INT)` - If a record is inserted having values `(1, 'John Doe', 25)` then after using the above transformer, the `payload` of the record would look like: ```json "payload": { "id": 1, "name": "John Doe", "age": 25 } ``` - If the same record is now updated and age is changed to 30 i.e. `UPDATE test SET age = 30 WHERE id = 1;` then the `payload` would look like: ```json "payload": { "id": 1, "name": null, "age": 30 } ``` > **NOTE:** The above example assumes that the replica identity of the table is `CHANGE` and that is how the assumption was made that the `UPDATE` event will not contain the value for the fields which were not updated. For more information on replica identity, see [YugabyteDB docs](https://docs.yugabyte.com/preview/explore/change-data-capture/using-logical-replication/key-concepts/#replica-identity). --- .../transforms/yugabytedb/PGCompatible.java | 125 ++++++++++++++ .../transforms/yugabytedb/SchemaUtil.java | 21 +++ .../yugabytedb/YBExtractNewRecordState.java | 17 -- .../yugabytedb/PGCompatibleTest.java | 154 ++++++++++++++++++ 4 files changed, 300 insertions(+), 17 deletions(-) create mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/yugabytedb/PGCompatible.java create mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/yugabytedb/SchemaUtil.java create mode 100644 debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/yugabytedb/PGCompatibleTest.java diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/yugabytedb/PGCompatible.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/yugabytedb/PGCompatible.java new file mode 100644 index 00000000000..5c46dfcfc25 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/yugabytedb/PGCompatible.java @@ -0,0 +1,125 @@ +package io.debezium.connector.postgresql.transforms.yugabytedb; + +import java.util.Map; +import java.util.Objects; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Schema.Type; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.transforms.Transformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Custom extractor for YugabyteDB to be used to convert the output of yboutput/CHANGE to the + * standard structure emitted by Debezium Connector for Postgres; this will be used + * to transform records from the format {@code fieldName:{value:"someValue",set:true}} + * to {@code fieldName:"someValue"} and set the columns to null which are not updated in the + * given change event. + * @param + * + * @author Vaibhav Kushwaha (vkushwaha@yugabyte.com) + */ +public class PGCompatible> implements Transformation { + private static final Logger LOGGER = LoggerFactory.getLogger(PGCompatible.class); + + @Override + public R apply(final R record) { + if (record == null || (record.value() != null && !(record.value() instanceof Struct))) { + return record; + } + + Pair p = getUpdatedValueAndSchema(record.keySchema(), (Struct) record.key()); + Schema updatedSchemaForKey = p.getFirst(); + Struct updatedValueForKey = p.getSecond(); + + Schema updatedSchemaForValue = null; + Struct updatedValueForValue = null; + if (record.value() != null) { + Pair val = getUpdatedValueAndSchema(record.valueSchema(), (Struct) record.value()); + updatedSchemaForValue = val.getFirst(); + updatedValueForValue = val.getSecond(); + } + + return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchemaForKey, updatedValueForKey, updatedSchemaForValue, updatedValueForValue, record.timestamp()); + } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } + + @Override + public void close() { + } + + private boolean isValueSetStruct(Field field) { + return field.schema().fields().size() == 2 + && (Objects.equals(field.schema().fields().get(0).name(), "value") + && Objects.equals(field.schema().fields().get(1).name(), "set")); + } + + private Schema makeUpdatedSchema(Schema schema) { + final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); + + if (schema.isOptional()) { + builder.optional(); + } else { + builder.required(); + } + + for (Field field : schema.fields()) { + if (field.schema().type() == Type.STRUCT) { + if (isValueSetStruct(field)) { + builder.field(field.name(), field.schema().field("value").schema()); + } else { + builder.field(field.name(), makeUpdatedSchema(field.schema())); + } + } else { + builder.field(field.name(), field.schema()); + } + } + + return builder.build(); + } + + private Struct makeUpdatedValue(Schema updatedSchema, Struct value) { + final Struct updatedValue = new Struct(updatedSchema); + + for (Field field : value.schema().fields()) { + LOGGER.debug("Considering value {}", field.name()); + if (field.schema().type() == Type.STRUCT) { + LOGGER.debug("Value is a struct"); + Struct fieldValue = (Struct) value.get(field); + if (isValueSetStruct(field) && fieldValue != null) { + updatedValue.put(field.name(), fieldValue.get("value")); + } else if (fieldValue != null) { + updatedValue.put(field.name(), makeUpdatedValue(updatedSchema.field(field.name()).schema(), fieldValue)); + } + } else { + updatedValue.put(field.name(), value.get(field)); + } + } + + return updatedValue; + } + + public Pair getUpdatedValueAndSchema(Schema schema, Struct value) { + Schema updatedSchema = makeUpdatedSchema(schema); + Struct updatedValue = makeUpdatedValue(updatedSchema, value); + + LOGGER.trace("Updated schema as json: " + io.debezium.data.SchemaUtil.asString(updatedValue.schema())); + + return new Pair<>(updatedSchema, updatedValue); + } + + @Override + public void configure(Map map) { + + } +} + diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/yugabytedb/SchemaUtil.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/yugabytedb/SchemaUtil.java new file mode 100644 index 00000000000..59f3353d4b6 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/yugabytedb/SchemaUtil.java @@ -0,0 +1,21 @@ +package io.debezium.connector.postgresql.transforms.yugabytedb; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; + +import java.util.Map; + +public class SchemaUtil { + public static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder builder) { + builder.name(source.name()); + builder.version(source.version()); + builder.doc(source.doc()); + + final Map params = source.parameters(); + if (params != null) { + builder.parameters(params); + } + + return builder; + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/yugabytedb/YBExtractNewRecordState.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/yugabytedb/YBExtractNewRecordState.java index 17ed72e903b..29d21387990 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/yugabytedb/YBExtractNewRecordState.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/yugabytedb/YBExtractNewRecordState.java @@ -104,20 +104,3 @@ private Pair getUpdatedValueAndSchema(Struct obj) { } } -class SchemaUtil { - - public static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder builder) { - builder.name(source.name()); - builder.version(source.version()); - builder.doc(source.doc()); - - final Map params = source.parameters(); - if (params != null) { - builder.parameters(params); - } - - return builder; - } - -} - diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/yugabytedb/PGCompatibleTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/yugabytedb/PGCompatibleTest.java new file mode 100644 index 00000000000..30d4d3571a1 --- /dev/null +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/yugabytedb/PGCompatibleTest.java @@ -0,0 +1,154 @@ +package io.debezium.connector.postgresql.transforms.yugabytedb; + +import io.debezium.data.Envelope; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Assert; +import org.junit.Test; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.ADD_HEADERS; +import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.HANDLE_DELETES; + +/** + * Tests for {@link PGCompatible} + * + * @author Vaibhav Kushwaha (vkushwaha@yugabyte.com) + */ +public class PGCompatibleTest { + final Schema idSchema = SchemaBuilder.struct() + .field("value", Schema.INT64_SCHEMA) + .field("set", Schema.BOOLEAN_SCHEMA); + + final Schema nameSchema = SchemaBuilder.struct() + .field("value", Schema.OPTIONAL_STRING_SCHEMA) + .field("set", Schema.BOOLEAN_SCHEMA) + .optional(); + + final Schema keySchema = SchemaBuilder.struct() + .field("id", idSchema) + .build(); + + final Schema valueSchema = SchemaBuilder.struct() + .field("id", idSchema) + .field("name", nameSchema) + .field("location", nameSchema).optional() + .build(); + + final Schema sourceSchema = SchemaBuilder.struct() + .field("lsn", Schema.INT32_SCHEMA) + .field("ts_ms", Schema.OPTIONAL_INT32_SCHEMA) + .field("op", Schema.STRING_SCHEMA) + .build(); + + final Envelope envelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(valueSchema) + .withSource(sourceSchema) + .build(); + + private Struct createIdStruct() { + final Struct id = new Struct(idSchema); + id.put("value", 1L); + id.put("set", true); + + return id; + } + + private Struct createNameStruct() { + final Struct name = new Struct(nameSchema); + name.put("value", "yb"); + name.put("set", true); + return name; + } + + private Struct createLocationStruct() { + final Struct name = new Struct(nameSchema); + name.put("value", null); + name.put("set", false); + return name; + } + + private Struct createValue() { + final Struct value = new Struct(valueSchema); + value.put("id", createIdStruct()); + value.put("name", createNameStruct()); + value.put("location", createLocationStruct()); + + return value; + } + + @Test + public void testSingleLevelStruct() { + try (final PGCompatible transform = new PGCompatible<>()) { + final Pair unwrapped = transform.getUpdatedValueAndSchema(valueSchema, createValue()); + Assert.assertEquals(1, (long) unwrapped.getSecond().getInt64("id")); + Assert.assertEquals("yb", unwrapped.getSecond().getString("name")); + Assert.assertNull(unwrapped.getSecond().getString("location")); + } + } + + private Struct createPayload() { + final Struct source = new Struct(sourceSchema); + source.put("lsn", 1234); + source.put("ts_ms", 12836); + source.put("op", "c"); + return envelope.create(createValue(), source, Instant.now()); + } + + @Test + public void testPayload() { + try (final PGCompatible transform = new PGCompatible<>()) { + Struct payload = createPayload(); + final Pair unwrapped = transform.getUpdatedValueAndSchema(payload.schema(), payload); + Schema valueSchema = unwrapped.getFirst(); + + Assert.assertSame(valueSchema.type(), Schema.Type.STRUCT); + Assert.assertEquals(6, valueSchema.fields().size()); + Assert.assertSame(valueSchema.field("op").schema().type(), Schema.Type.STRING); + + Schema afterSchema = valueSchema.field("after").schema(); + Assert.assertSame(afterSchema.type(), Schema.Type.STRUCT); + Assert.assertEquals(3, afterSchema.fields().size()); + Assert.assertSame(afterSchema.field("id").schema().type(), Schema.Type.INT64); + Assert.assertSame(afterSchema.field("name").schema().type(), Schema.Type.STRING); + Assert.assertSame(afterSchema.field("location").schema().type(), Schema.Type.STRING); + + Struct after = unwrapped.getSecond().getStruct("after"); + Assert.assertEquals(1, (long) after.getInt64("id")); + Assert.assertEquals("yb", after.getString("name")); + } + } + + private SourceRecord createCreateRecord() { + final Struct key = new Struct(keySchema); + key.put("id", createIdStruct()); + + final Struct payload = createPayload(); + return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", keySchema, key, envelope.schema(), payload); + } + + @Test + public void testHandleCreateRewrite() { + try (final PGCompatible transform = new PGCompatible<>()) { + final Map props = new HashMap<>(); + props.put(HANDLE_DELETES.name(), "rewrite"); + props.put(ADD_HEADERS.name(), "op"); + transform.configure(props); + + final SourceRecord createRecord = createCreateRecord(); + final SourceRecord unwrapped = transform.apply(createRecord); + Struct after = ((Struct) unwrapped.value()).getStruct("after"); + Assert.assertEquals(1, (long) ((Struct) unwrapped.value()).getStruct("after").getInt64("id")); + Assert.assertEquals("yb", ((Struct) unwrapped.value()).getStruct("after").getString("name")); + + Assert.assertEquals("c", ((Struct) unwrapped.value()).getString("op")); + } + } +}