[DBZ-PGYB] Add transformer PGCompatible to emit events with standard structure #168

Merged
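
The transform converts change events produced in YugabyteDB's yboutput format, where every column is wrapped in a {value, set} struct, into the flat structure emitted by the standard Debezium connector for Postgres. As a sketch (field names taken from the tests below), a row image such as

{"id": {"value": 1, "set": true}, "name": {"value": "yb", "set": true}, "location": {"value": null, "set": false}}

comes out as

{"id": 1, "name": "yb", "location": null}

where a column whose set flag is false was not touched by the change event and is emitted as null.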
PGCompatible.java
@@ -0,0 +1,125 @@
package io.debezium.connector.postgresql.transforms.yugabytedb;

import java.util.Map;
import java.util.Objects;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Transformation for YugabyteDB that converts the output of the yboutput/CHANGE plugin into the
* standard structure emitted by the Debezium connector for Postgres. It rewrites records from the
* format {@code fieldName:{value:"someValue",set:true}} to {@code fieldName:"someValue"} and sets
* to null any column that is not updated in the given change event.
*
* @param <R> the subtype of {@link ConnectRecord} on which the transformation operates
*
* @author Vaibhav Kushwaha ([email protected])
*/
public class PGCompatible<R extends ConnectRecord<R>> implements Transformation<R> {
private static final Logger LOGGER = LoggerFactory.getLogger(PGCompatible.class);

@Override
public R apply(final R record) {
if (record == null || (record.value() != null && !(record.value() instanceof Struct))) {
return record;
}

Schema updatedSchemaForKey = null;
Struct updatedValueForKey = null;
if (record.key() != null) {
// A record without a key would otherwise hit a NullPointerException in makeUpdatedValue.
Pair<Schema, Struct> p = getUpdatedValueAndSchema(record.keySchema(), (Struct) record.key());
updatedSchemaForKey = p.getFirst();
updatedValueForKey = p.getSecond();
}

Schema updatedSchemaForValue = null;
Struct updatedValueForValue = null;
if (record.value() != null) {
Pair<Schema, Struct> val = getUpdatedValueAndSchema(record.valueSchema(), (Struct) record.value());
updatedSchemaForValue = val.getFirst();
updatedValueForValue = val.getSecond();
}

return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchemaForKey, updatedValueForKey, updatedSchemaForValue, updatedValueForValue, record.timestamp());
}

@Override
public ConfigDef config() {
return new ConfigDef();
}

@Override
public void close() {
}

/**
* Returns true if the given field is a yboutput wrapper, i.e. a struct having exactly the two
* fields {@code value} and {@code set}.
*/
private boolean isValueSetStruct(Field field) {
return field.schema().fields().size() == 2
&& (Objects.equals(field.schema().fields().get(0).name(), "value")
&& Objects.equals(field.schema().fields().get(1).name(), "set"));
}

/**
* Recursively rebuilds the schema, replacing every {@code {value, set}} wrapper with the schema
* of its {@code value} field.
*/
private Schema makeUpdatedSchema(Schema schema) {
final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());

if (schema.isOptional()) {
builder.optional();
} else {
builder.required();
}

for (Field field : schema.fields()) {
if (field.schema().type() == Type.STRUCT) {
if (isValueSetStruct(field)) {
builder.field(field.name(), field.schema().field("value").schema());
} else {
builder.field(field.name(), makeUpdatedSchema(field.schema()));
}
} else {
builder.field(field.name(), field.schema());
}
}

return builder.build();
}

/**
* Rebuilds the value against the updated schema, copying the inner {@code value} out of every
* {@code {value, set}} wrapper and recursing into other nested structs.
*/
private Struct makeUpdatedValue(Schema updatedSchema, Struct value) {
final Struct updatedValue = new Struct(updatedSchema);

for (Field field : value.schema().fields()) {
LOGGER.debug("Considering value {}", field.name());
if (field.schema().type() == Type.STRUCT) {
LOGGER.debug("Value is a struct");
Struct fieldValue = (Struct) value.get(field);
if (isValueSetStruct(field) && fieldValue != null) {
updatedValue.put(field.name(), fieldValue.get("value"));
} else if (fieldValue != null) {
updatedValue.put(field.name(), makeUpdatedValue(updatedSchema.field(field.name()).schema(), fieldValue));
}
} else {
updatedValue.put(field.name(), value.get(field));
}
}

return updatedValue;
}

public Pair<Schema, Struct> getUpdatedValueAndSchema(Schema schema, Struct value) {
Schema updatedSchema = makeUpdatedSchema(schema);
Struct updatedValue = makeUpdatedValue(updatedSchema, value);

LOGGER.trace("Updated schema as json: {}", io.debezium.data.SchemaUtil.asString(updatedSchema));

return new Pair<>(updatedSchema, updatedValue);
}

@Override
public void configure(Map<String, ?> map) {
// This transformation takes no configuration.
}
}
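
To enable the transform, register it as a single message transform (SMT) in the connector configuration. A minimal sketch, with the alias pgcompat chosen purely for illustration:

transforms=pgcompat
transforms.pgcompat.type=io.debezium.connector.postgresql.transforms.yugabytedb.PGCompatible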

SchemaUtil.java
@@ -0,0 +1,21 @@
package io.debezium.connector.postgresql.transforms.yugabytedb;

import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

/**
* Utility to copy the basic metadata of a {@link Schema} (name, version, doc and parameters)
* into a {@link SchemaBuilder}; fields are intentionally left for the caller to add.
*/
public class SchemaUtil {
public static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder builder) {
builder.name(source.name());
builder.version(source.version());
builder.doc(source.doc());

final Map<String, String> params = source.parameters();
if (params != null) {
builder.parameters(params);
}

return builder;
}
}
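
For illustration, copySchemaBasics deliberately carries over only the schema metadata; the caller re-adds the fields, which is how makeUpdatedSchema above swaps each {value, set} wrapper for the unwrapped value schema. A minimal standalone sketch (class and field names are illustrative; assumes the same package as SchemaUtil):

package io.debezium.connector.postgresql.transforms.yugabytedb;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class CopyBasicsExample {
    public static void main(String[] args) {
        Schema source = SchemaBuilder.struct()
                .name("dummy.Value")
                .version(1)
                .doc("example schema")
                .field("id", Schema.INT64_SCHEMA)
                .build();

        // Name, version, doc and parameters are copied; the field list is not.
        SchemaBuilder copy = SchemaUtil.copySchemaBasics(source, SchemaBuilder.struct());
        copy.field("id", Schema.OPTIONAL_INT64_SCHEMA); // the caller decides the new field shapes

        System.out.println(copy.build()); // same metadata, caller-defined fields
    }
}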
@@ -104,20 +104,3 @@ private Pair<Schema, Struct> getUpdatedValueAndSchema(Struct obj) {
}
}

-class SchemaUtil {
-
-public static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder builder) {
-builder.name(source.name());
-builder.version(source.version());
-builder.doc(source.doc());
-
-final Map<String, String> params = source.parameters();
-if (params != null) {
-builder.parameters(params);
-}
-
-return builder;
-}
-
-}

PGCompatibleTest.java
@@ -0,0 +1,154 @@
package io.debezium.connector.postgresql.transforms.yugabytedb;

import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Test;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.ADD_HEADERS;
import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.HANDLE_DELETES;

/**
* Tests for {@link PGCompatible}
*
* @author Vaibhav Kushwaha ([email protected])
*/
public class PGCompatibleTest {
final Schema idSchema = SchemaBuilder.struct()
.field("value", Schema.INT64_SCHEMA)
.field("set", Schema.BOOLEAN_SCHEMA);

final Schema nameSchema = SchemaBuilder.struct()
.field("value", Schema.OPTIONAL_STRING_SCHEMA)
.field("set", Schema.BOOLEAN_SCHEMA)
.optional();

final Schema keySchema = SchemaBuilder.struct()
.field("id", idSchema)
.build();

final Schema valueSchema = SchemaBuilder.struct()
.field("id", idSchema)
.field("name", nameSchema)
.field("location", nameSchema).optional()
.build();

final Schema sourceSchema = SchemaBuilder.struct()
.field("lsn", Schema.INT32_SCHEMA)
.field("ts_ms", Schema.OPTIONAL_INT32_SCHEMA)
.field("op", Schema.STRING_SCHEMA)
.build();

final Envelope envelope = Envelope.defineSchema()
.withName("dummy.Envelope")
.withRecord(valueSchema)
.withSource(sourceSchema)
.build();

private Struct createIdStruct() {
final Struct id = new Struct(idSchema);
id.put("value", 1L);
id.put("set", true);

return id;
}

private Struct createNameStruct() {
final Struct name = new Struct(nameSchema);
name.put("value", "yb");
name.put("set", true);
return name;
}

// Models a column that is absent from the change event: value is null and set is false.
private Struct createLocationStruct() {
final Struct location = new Struct(nameSchema);
location.put("value", null);
location.put("set", false);
return location;
}

private Struct createValue() {
final Struct value = new Struct(valueSchema);
value.put("id", createIdStruct());
value.put("name", createNameStruct());
value.put("location", createLocationStruct());

return value;
}

@Test
public void testSingleLevelStruct() {
try (final PGCompatible<SourceRecord> transform = new PGCompatible<>()) {
final Pair<Schema, Struct> unwrapped = transform.getUpdatedValueAndSchema(valueSchema, createValue());
Assert.assertEquals(1, (long) unwrapped.getSecond().getInt64("id"));
Assert.assertEquals("yb", unwrapped.getSecond().getString("name"));
Assert.assertNull(unwrapped.getSecond().getString("location"));
}
}

private Struct createPayload() {
final Struct source = new Struct(sourceSchema);
source.put("lsn", 1234);
source.put("ts_ms", 12836);
source.put("op", "c");
return envelope.create(createValue(), source, Instant.now());
}

@Test
public void testPayload() {
try (final PGCompatible<SourceRecord> transform = new PGCompatible<>()) {
Struct payload = createPayload();
final Pair<Schema, Struct> unwrapped = transform.getUpdatedValueAndSchema(payload.schema(), payload);
Schema valueSchema = unwrapped.getFirst();

Assert.assertSame(Schema.Type.STRUCT, valueSchema.type());
Assert.assertEquals(6, valueSchema.fields().size());
Assert.assertSame(Schema.Type.STRING, valueSchema.field("op").schema().type());

Schema afterSchema = valueSchema.field("after").schema();
Assert.assertSame(Schema.Type.STRUCT, afterSchema.type());
Assert.assertEquals(3, afterSchema.fields().size());
Assert.assertSame(Schema.Type.INT64, afterSchema.field("id").schema().type());
Assert.assertSame(Schema.Type.STRING, afterSchema.field("name").schema().type());
Assert.assertSame(Schema.Type.STRING, afterSchema.field("location").schema().type());

Struct after = unwrapped.getSecond().getStruct("after");
Assert.assertEquals(1, (long) after.getInt64("id"));
Assert.assertEquals("yb", after.getString("name"));
}
}

private SourceRecord createCreateRecord() {
final Struct key = new Struct(keySchema);
key.put("id", createIdStruct());

final Struct payload = createPayload();
return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", keySchema, key, envelope.schema(), payload);
}

@Test
public void testHandleCreateRewrite() {
try (final PGCompatible<SourceRecord> transform = new PGCompatible<>()) {
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_DELETES.name(), "rewrite");
props.put(ADD_HEADERS.name(), "op");
// Note: PGCompatible exposes no configuration options; these properties are accepted but ignored.
transform.configure(props);

final SourceRecord createRecord = createCreateRecord();
final SourceRecord unwrapped = transform.apply(createRecord);
final Struct value = (Struct) unwrapped.value();
final Struct after = value.getStruct("after");
Assert.assertEquals(1, (long) after.getInt64("id"));
Assert.assertEquals("yb", after.getString("name"));

Assert.assertEquals("c", value.getString("op"));
}
}
}
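
After the transform, the envelope of the create record keeps the standard Debezium shape; only the column wrappers are flattened. Roughly, and with the ts_ms value illustrative, the six envelope fields asserted above (before, after, source, op, ts_ms and, in the Debezium version under test, transaction) come out as:

{
  "before": null,
  "after": {"id": 1, "name": "yb", "location": null},
  "source": {"lsn": 1234, "ts_ms": 12836, "op": "c"},
  "op": "c",
  "ts_ms": 1700000000000,
  "transaction": null
}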