Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] Upgrade sink connector to new API version #205

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.connector.flink.sink;

import com.alibaba.fluss.annotation.Internal;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.sink.writer.AppendSinkWriter;
import com.alibaba.fluss.connector.flink.sink.writer.FlinkSinkWriter;
import com.alibaba.fluss.connector.flink.sink.writer.UpsertSinkWriter;
import com.alibaba.fluss.metadata.TablePath;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;

/** Flink sink for Fluss. */
class FlinkSink implements Sink<RowData> {

private static final long serialVersionUID = 1L;

private final SinkWriterBuilder<? extends FlinkSinkWriter> builder;

FlinkSink(SinkWriterBuilder<? extends FlinkSinkWriter> builder) {
this.builder = builder;
}

@Deprecated
@Override
public SinkWriter<RowData> createWriter(InitContext context) throws IOException {
throw new UnsupportedOperationException(
"Not supported. Use FlinkSink#createWriter(WriterInitContext context)");
}

@Override
public SinkWriter<RowData> createWriter(WriterInitContext context) throws IOException {
FlinkSinkWriter flinkSinkWriter = builder.createWriter();
flinkSinkWriter.initialize(context);
return flinkSinkWriter;
}

@Internal
interface SinkWriterBuilder<W extends FlinkSinkWriter> extends Serializable {
W createWriter();
}

@Internal
static class AppendSinkWriterBuilder implements SinkWriterBuilder<AppendSinkWriter> {

private static final long serialVersionUID = 1L;

private final TablePath tablePath;
private final Configuration flussConfig;
private final RowType tableRowType;

public AppendSinkWriterBuilder(
TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableRowType = tableRowType;
}

@Override
public AppendSinkWriter createWriter() {
return new AppendSinkWriter(tablePath, flussConfig, tableRowType);
}
}

@Internal
static class UpsertSinkWriterBuilder implements SinkWriterBuilder<UpsertSinkWriter> {

private static final long serialVersionUID = 1L;

private final TablePath tablePath;
private final Configuration flussConfig;
private final RowType tableRowType;
private final @Nullable int[] targetColumnIndexes;

UpsertSinkWriterBuilder(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
@Nullable int[] targetColumnIndexes) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableRowType = tableRowType;
this.targetColumnIndexes = targetColumnIndexes;
}

@Override
public UpsertSinkWriter createWriter() {
return new UpsertSinkWriter(tablePath, flussConfig, tableRowType, targetColumnIndexes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alibaba.fluss.connector.flink.sink;

import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.sink.writer.FlinkSinkWriter;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.FieldEqual;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion;
Expand All @@ -28,7 +29,7 @@
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.RowLevelModificationScanContext;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
Expand Down Expand Up @@ -144,13 +145,16 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
}
}

FlinkSinkFunction sinkFunction =
primaryKeyIndexes.length > 0
? new UpsertSinkFunction(
FlinkSink.SinkWriterBuilder<? extends FlinkSinkWriter> flinkSinkWriterBuilder =
(primaryKeyIndexes.length > 0)
? new FlinkSink.UpsertSinkWriterBuilder(
tablePath, flussConfig, tableRowType, targetColumnIndexes)
: new AppendSinkFunction(tablePath, flussConfig, tableRowType);
: new FlinkSink.AppendSinkWriterBuilder(
tablePath, flussConfig, tableRowType);

return SinkFunctionProvider.of(sinkFunction);
FlinkSink flinkSink = new FlinkSink(flinkSinkWriterBuilder);

return SinkV2Provider.of(flinkSink);
}

private List<String> columns(int[] columnIndexes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,33 @@
* limitations under the License.
*/

package com.alibaba.fluss.connector.flink.sink;
package com.alibaba.fluss.connector.flink.sink.writer;

import com.alibaba.fluss.client.table.writer.AppendWriter;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.utils.FlinkRowToFlussRowConverter;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.InternalRow;

import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/** An append only sink for fluss log table. */
class AppendSinkFunction extends FlinkSinkFunction {

private static final long serialVersionUID = 1L;
/** An append only sink writer for fluss log table. */
public class AppendSinkWriter extends FlinkSinkWriter {

private transient AppendWriter appendWriter;

AppendSinkFunction(TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
public AppendSinkWriter(TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
super(tablePath, flussConfig, tableRowType);
}

@Override
public void open(org.apache.flink.configuration.Configuration config) {
super.open(config);
public void initialize(WriterInitContext context) {
super.initialize(context);
appendWriter = table.getAppendWriter();
LOG.info("Finished opening Fluss {}.", this.getClass().getSimpleName());
}
Expand All @@ -51,19 +50,14 @@ CompletableFuture<Void> writeRow(RowKind rowKind, InternalRow internalRow) {
return appendWriter.append(internalRow);
}

@Override
void flush() throws IOException {
appendWriter.flush();
checkAsyncException();
}

@Override
FlinkRowToFlussRowConverter createFlinkRowToFlussRowConverter() {
return FlinkRowToFlussRowConverter.create(tableRowType);
}

@Override
public void close() throws Exception {
super.close();
public void flush(boolean endOfInput) throws IOException {
appendWriter.flush();
checkAsyncException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.alibaba.fluss.connector.flink.sink;
package com.alibaba.fluss.connector.flink.sink.writer;

import com.alibaba.fluss.client.Connection;
import com.alibaba.fluss.client.ConnectionFactory;
Expand All @@ -30,14 +30,11 @@
import com.alibaba.fluss.metrics.MetricNames;
import com.alibaba.fluss.row.InternalRow;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
Expand All @@ -48,16 +45,13 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

/** Flink's {@link SinkFunction} implementation for Fluss. */
abstract class FlinkSinkFunction extends RichSinkFunction<RowData>
implements CheckpointedFunction, Serializable {
/** Base class for Flink {@link SinkWriter} implementations in Fluss. */
public abstract class FlinkSinkWriter implements SinkWriter<RowData> {

private static final long serialVersionUID = 1L;
protected static final Logger LOG = LoggerFactory.getLogger(FlinkSinkFunction.class);
protected static final Logger LOG = LoggerFactory.getLogger(FlinkSinkWriter.class);

private final TablePath tablePath;
private final Configuration flussConfig;
Expand All @@ -75,11 +69,11 @@ abstract class FlinkSinkFunction extends RichSinkFunction<RowData>
private transient Counter numRecordsOutErrorsCounter;
private volatile Throwable asyncWriterException;

public FlinkSinkFunction(TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
public FlinkSinkWriter(TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
this(tablePath, flussConfig, tableRowType, null);
}

public FlinkSinkFunction(
public FlinkSinkWriter(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
Expand All @@ -90,14 +84,13 @@ public FlinkSinkFunction(
this.tableRowType = tableRowType;
}

@Override
public void open(org.apache.flink.configuration.Configuration config) {
public void initialize(WriterInitContext context) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming this function initialize instead of open is intentional to avoid ambiguities with old Flink API

LOG.info(
"Opening Fluss {}, database: {} and table: {}",
this.getClass().getSimpleName(),
tablePath.getDatabaseName(),
tablePath.getTableName());
metricGroup = InternalSinkWriterMetricGroup.wrap(getRuntimeContext().getMetricGroup());
metricGroup = InternalSinkWriterMetricGroup.wrap(context.metricGroup());
flinkMetricRegistry =
new FlinkMetricRegistry(
metricGroup, Collections.singleton(MetricNames.WRITER_SEND_LATENCY_MS));
Expand All @@ -115,7 +108,7 @@ protected void initMetrics() {
}

@Override
public void invoke(RowData value, SinkFunction.Context context) throws IOException {
public void write(RowData value, Context context) throws IOException, InterruptedException {
checkAsyncException();
InternalRow internalRow = dataConverter.toInternalRow(value);
CompletableFuture<Void> writeFuture = writeRow(value.getRowKind(), internalRow);
Expand All @@ -131,28 +124,14 @@ public void invoke(RowData value, SinkFunction.Context context) throws IOExcepti
}

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws IOException {
flush();
}

@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) {}

@Override
public void finish() throws IOException {
flush();
}

abstract void flush() throws IOException;
public abstract void flush(boolean endOfInput) throws IOException, InterruptedException;

abstract FlinkRowToFlussRowConverter createFlinkRowToFlussRowConverter();

abstract CompletableFuture<Void> writeRow(RowKind rowKind, InternalRow internalRow);

@Override
public void close() throws Exception {
super.close();

try {
if (table != null) {
table.close();
Expand Down Expand Up @@ -193,7 +172,7 @@ public void close() throws Exception {

private void sanityCheck(TableDescriptor flussTableDescriptor) {
// when it's UpsertSinkFunction, it means it has primary key got from Flink's metadata
boolean hasPrimaryKey = this instanceof UpsertSinkFunction;
boolean hasPrimaryKey = this instanceof UpsertSinkWriter;
if (flussTableDescriptor.hasPrimaryKey() != hasPrimaryKey) {
throw new ValidationException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.alibaba.fluss.connector.flink.sink;
package com.alibaba.fluss.connector.flink.sink.writer;

import com.alibaba.fluss.client.table.writer.UpsertWrite;
import com.alibaba.fluss.client.table.writer.UpsertWriter;
Expand All @@ -23,6 +23,7 @@
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.InternalRow;

import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

Expand All @@ -31,14 +32,12 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/** A upsert sink for fluss primary key table. */
class UpsertSinkFunction extends FlinkSinkFunction {

private static final long serialVersionUID = 1L;
/** An upsert sink writer or fluss primary key table. */
public class UpsertSinkWriter extends FlinkSinkWriter {

private transient UpsertWriter upsertWriter;

UpsertSinkFunction(
public UpsertSinkWriter(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
Expand All @@ -47,8 +46,8 @@ class UpsertSinkFunction extends FlinkSinkFunction {
}

@Override
public void open(org.apache.flink.configuration.Configuration config) {
super.open(config);
public void initialize(WriterInitContext context) {
super.initialize(context);
UpsertWrite upsertOptions = new UpsertWrite();
if (targetColumnIndexes != null) {
upsertOptions = upsertOptions.withPartialUpdate(targetColumnIndexes);
Expand All @@ -75,7 +74,7 @@ FlinkRowToFlussRowConverter createFlinkRowToFlussRowConverter() {
}

@Override
void flush() throws IOException {
public void flush(boolean endOfInput) throws IOException, InterruptedException {
upsertWriter.flush();
checkAsyncException();
}
Expand Down
Loading
Loading