Skip to content

Commit

Permalink
Make consumers serializable and reduce duplicate code
Browse files Browse the repository at this point in the history
  • Loading branch information
SoerenHenning committed Jan 31, 2024
1 parent f2f5be5 commit 54d882d
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.dynatrace.research.shufflebench.consumer;

import com.dynatrace.research.shufflebench.record.TimestampedRecord;

import java.io.Serializable;
import java.util.function.Supplier;

public class SerializableStatefulConsumer implements StatefulConsumer, Serializable {

private transient StatefulConsumer consumer;

private final SerializableStatefulConsumer.SerializableSupplier consumerFactory;

public SerializableStatefulConsumer(SerializableStatefulConsumer.SerializableSupplier consumerFactory) {
this.consumerFactory = consumerFactory;
this.buildConsumerIfAbsent();
}

@Override
public ConsumerResult accept(TimestampedRecord record, State state) {
this.buildConsumerIfAbsent();
return this.consumer.accept(record, state);
}

private void buildConsumerIfAbsent() {
if (this.consumer == null) {
this.consumer = this.consumerFactory.get();
}
}

public interface SerializableSupplier extends Supplier<StatefulConsumer>, Serializable {
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.dynatrace.research.shufflebench;
package com.dynatrace.research.shufflebench.matcher;

import com.dynatrace.research.shufflebench.matcher.MatcherService;
import com.dynatrace.research.shufflebench.matcher.MatchingRule;
Expand All @@ -15,11 +15,11 @@ public class SerializableMatcherService<T extends Record> implements MatcherServ

private transient MatcherService<T> matcherService;

private final SerializableSupplier<MatcherService<T>> matcherServiceFactory;
private final SerializableSupplier<T> matcherServiceFactory;

public SerializableMatcherService(SerializableSupplier<MatcherService<T>> matcherServiceFactory) {
public SerializableMatcherService(SerializableSupplier<T> matcherServiceFactory) {
this.matcherServiceFactory = matcherServiceFactory;
this.matcherService = this.matcherServiceFactory.get();
this.buildMatcherServiceIfAbsent();
}

@Override
Expand All @@ -34,7 +34,7 @@ public boolean removeMatchingRule(String id) {

@Override
public Collection<Map.Entry<String, T>> match(T record) {
buildMatcherServiceIfAbsent();
this.buildMatcherServiceIfAbsent();
return this.matcherService.match(record);
}

Expand All @@ -44,7 +44,7 @@ private void buildMatcherServiceIfAbsent() {
}
}

public interface SerializableSupplier<T> extends Supplier<T>, Serializable {
public interface SerializableSupplier<T extends Record> extends Supplier<MatcherService<T>>, Serializable {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.dynatrace.research.shufflebench.consumer.*;
import com.dynatrace.research.shufflebench.matcher.MatcherService;
import com.dynatrace.research.shufflebench.matcher.SerializableMatcherService;
import com.dynatrace.research.shufflebench.matcher.SimpleMatcherService;
import com.dynatrace.research.shufflebench.record.*;
import io.smallrye.config.SmallRyeConfig;
Expand Down Expand Up @@ -70,7 +71,8 @@ public FlinkShuffle() {
final boolean initCountRandom = config.getValue("consumer.init.count.random", Boolean.class);
final long initCountSeed = config.getValue("consumer.init.count.seed", Long.class);

final StatefulConsumer consumer = new AdvancedStateConsumer("counter", outputRate, stateSizeBytes, initCountRandom, initCountSeed);
final StatefulConsumer consumer = new SerializableStatefulConsumer(
() -> new AdvancedStateConsumer("counter", outputRate, stateSizeBytes, initCountRandom, initCountSeed));

this.env = StreamExecutionEnvironment.getExecutionEnvironment();

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.dynatrace.research.shufflebench.consumer.*;
import com.dynatrace.research.shufflebench.matcher.MatcherService;
import com.dynatrace.research.shufflebench.matcher.SerializableMatcherService;
import com.dynatrace.research.shufflebench.matcher.SimpleMatcherService;
import com.dynatrace.research.shufflebench.record.TimestampedRecord;
import io.smallrye.config.SmallRyeConfig;
Expand Down Expand Up @@ -76,7 +77,8 @@ public static void main(String[] args) throws StreamingQueryException, TimeoutEx
final boolean initCountRandom = config.getValue("consumer.init.count.random", Boolean.class);
final long initCountSeed = config.getValue("consumer.init.count.seed", Long.class);

final StatefulConsumer consumer = new AdvancedStateConsumer("counter", outputRate, stateSizeBytes, initCountRandom, initCountSeed);
final StatefulConsumer consumer = new SerializableStatefulConsumer(
() -> new AdvancedStateConsumer("counter", outputRate, stateSizeBytes, initCountRandom, initCountSeed));

Dataset<Tuple2<String, TimestampedRecord>> streamingWithKeys = kafkaStream
.as(Encoders.tuple(Encoders.BINARY(), Encoders.TIMESTAMP()))
Expand Down

0 comments on commit 54d882d

Please sign in to comment.