Skip to content

Commit

Permalink
MINOR Fix checkstyle failures in streams/examples module. (#13055)
Browse files Browse the repository at this point in the history
MINOR Fix checkstyle failures in streams/examples module. (#13055)
  • Loading branch information
a0x8o committed Dec 29, 2022
1 parent c34fc9d commit 54eff6a
Show file tree
Hide file tree
Showing 13 changed files with 139 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class ReplicaFetcherThread(name: String,
topicPartition, fetchOffset, log.logEndOffset))

if (logTrace)
trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
trace("Follower has replica log end offset %d for partition %s. Received %d bytes of messages and leader hw %d"
.format(log.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))

// Append the leader's messages to the log
Expand Down
22 changes: 14 additions & 8 deletions core/src/test/scala/unit/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ object TestUtils extends Logging {
config.setProperty(KafkaConfig.LogMessageFormatVersionProp, version.version)
}

def createAdminClient[B <: KafkaBroker](
def createAdminClient[B <: KafkaBroker](
brokers: Seq[B],
listenerName: ListenerName,
adminConfig: Properties = new Properties
Expand Down Expand Up @@ -424,7 +424,7 @@ object TestUtils extends Logging {
}

result.topicId(topic).get()
}
}

def createTopicWithAdmin[B <: KafkaBroker](
admin: Admin,
Expand All @@ -441,6 +441,14 @@ object TestUtils extends Logging {
replicaAssignment.size
}

def isTopicExistsAndHasSameNumPartitionsAndReplicationFactor(cause: Throwable): Boolean = {
cause != null &&
cause.isInstanceOf[TopicExistsException] &&
// wait until all partitions metadata are propagated before verifying partitions number and replication factor
!waitForAllPartitionsMetadata(brokers, topic, effectiveNumPartitions).isEmpty &&
topicHasSameNumPartitionsAndReplicationFactor(admin, topic, effectiveNumPartitions, replicationFactor)
}

try {
createTopicWithAdminRaw(
admin,
Expand All @@ -451,12 +459,10 @@ object TestUtils extends Logging {
topicConfig
)
} catch {
case e: ExecutionException => if (!(e.getCause != null &&
e.getCause.isInstanceOf[TopicExistsException] &&
topicHasSameNumPartitionsAndReplicationFactor(admin, topic,
effectiveNumPartitions, replicationFactor))) {
throw e
}
case e: ExecutionException =>
if (!isTopicExistsAndHasSameNumPartitionsAndReplicationFactor(e.getCause)) {
throw e
}
}

// wait until we've propagated all partitions metadata to all brokers
Expand Down
4 changes: 2 additions & 2 deletions docs/streams/architecture.html
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ <h3 class="anchor-heading"><a id="streams_architecture_tasks" class="anchor-link
<p>
Slightly simplified, the maximum parallelism at which your application may run is bounded by the maximum number of stream tasks, which itself is determined by
maximum number of partitions of the input topic(s) the application is reading from. For example, if your input topic has 5 partitions, then you can run up to 5
applications instances. These instances will collaboratively process the topics data. If you run a larger number of app instances than partitions of the input
topic, the excess app instances will launch but remain idle; however, if one of the busy instances goes down, one of the idle instances will resume the formers
applications instances. These instances will collaboratively process the topic's data. If you run a larger number of app instances than partitions of the input
topic, the "excess" app instances will launch but remain idle; however, if one of the busy instances goes down, one of the idle instances will resume the former's
work.
</p>

Expand Down
2 changes: 1 addition & 1 deletion docs/streams/developer-guide/dsl-api.html
Original file line number Diff line number Diff line change
Expand Up @@ -2567,7 +2567,7 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key
Function&lt;Long, Long&gt; foreignKeyExtractor = (x) -&gt; x;

// Java 8+ example, using lambda expressions
KTable&lt;String, String&gt; joined = left.join(right, foreignKeyExtractor,
KTable&lt;String, String&gt; joined = left.leftJoin(right, foreignKeyExtractor,
(leftValue, rightValue) -&gt; &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue /* ValueJoiner */
);</code></pre>
<p>Detailed behavior:</p>
Expand Down
12 changes: 12 additions & 0 deletions docs/streams/upgrade-guide.html
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ <h1>Upgrade Guide and API Changes</h1>
<li> update your code and swap old code and jar file with new code and new jar file </li>
<li> restart all new ({{fullDotVersion}}) application instances </li>
</ul>
<p>
Note: The cooperative rebalancing protocol has been the default since 2.4, but we have continued to support the
eager rebalancing protocol to provide users an upgrade path. This support will be dropped in a future release,
so any users still on the eager protocol should prepare to finish upgrading their applications to the cooperative protocol in version 3.1.
This only affects users who are still on a version older than 2.4, and users who have upgraded already but have not yet
removed the <code>upgrade.from</code> config that they set when upgrading from a version below 2.4.
Users fitting into the latter case will simply need to unset this config when upgrading beyond 3.1,
while users in the former case will need to follow a slightly different upgrade path if they attempt to upgrade from 2.3 or below to a version above 3.1.
Those applications will need to go through a bridge release, by first upgrading to a version between 2.4 - 3.1 and setting the <code>upgrade.from</code> config,
then removing that config and upgrading to the final version above 3.1. See <a href="https://issues.apache.org/jira/browse/KAFKA-8575">KAFKA-8575</a>
for more details.
</p>

<h3 class="anchor-heading"><a id="streams_notable_changes" class="anchor-link"></a><a href="#streams_notable_changes">Notable compatibility changes in past releases</a></h3>
<p>
Expand Down
10 changes: 0 additions & 10 deletions docs/upgrade.html
Original file line number Diff line number Diff line change
Expand Up @@ -238,16 +238,6 @@ <h5><a id="upgrade_310_notable" href="#upgrade_310_notable">Notable changes in 3
and <code>iotime-total</code>. Please use <code>bufferpool-wait-time-ns-total</code>, <code>io-wait-time-ns-total</code>,
and <code>io-time-ns-total</code> instead. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-773%3A+Differentiate+consistently+metric+latency+measured+in+millis+and+nanos">KIP-773</a>
for more details.</li>
<li>The cooperative rebalancing protocol has been the default since 2.4, but we have continued to support the
eager rebalancing protocol to provide users an upgrade path. This support will be dropped in a future release,
so any users still on the eager protocol should prepare to finish upgrading their applications to the cooperative protocol in version 3.1.
This only affects users who are still on a version older than 2.4, and users who have upgraded already but have not yet
removed the <code>upgrade.from</code> config that they set when upgrading from a version below 2.4.
Users fitting into the latter case will simply need to unset this config when upgrading beyond 3.1,
while users in the former case will need to follow a slightly different upgrade path if they attempt to upgrade from 2.3 or below to a version above 3.1.
Those applications will need to go through a bridge release, by first upgrading to a version between 2.4 - 3.1 and setting the <code>upgrade.from</code> config,
then removing that config and upgrading to the final version above 3.1. See <a href="https://issues.apache.org/jira/browse/KAFKA-8575">KAFKA-8575</a>
for more details.</li>
<li>IBP 3.1 introduces topic IDs to FetchRequest as a part of
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers">KIP-516</a>.</li>
</ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* <p></p>
* <pre>
* [data_frame_version header message]
* header => [api_key version]
* header =&gt; [api_key version]
*
* data_frame_version : This is the header version, current value is 0. Header includes both api_key and version.
* api_key : apiKey of {@code ApiMessageAndVersion} object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ConnectedStoreProvider;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
Expand All @@ -45,7 +42,7 @@
import java.util.concurrent.CountDownLatch;

/**
* Demonstrates, using a {@link Transformer} which combines the low-level Processor APIs with the high-level Kafka Streams DSL,
* Demonstrates, using a {@link Processor} implementing the low-level Processor APIs (replaces Transformer),
* how to implement the WordCount program that computes a simple word occurrence histogram from an input text.
* <p>
* <strong>Note: This is simplified code that only works correctly for single partition input topics.
Expand All @@ -55,10 +52,9 @@
* represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
* is an updated count of a single word.
* <p>
* This example differs from {@link WordCountProcessorDemo} in that it uses a {@link Transformer} to define the word
* count logic, and the topology is wired up through a {@link StreamsBuilder}, which more closely resembles the high-level DSL.
* Additionally, the {@link TransformerSupplier} specifies the {@link StoreBuilder} that the {@link Transformer} needs
* by implementing {@link ConnectedStoreProvider#stores()}.
* This example differs from {@link WordCountProcessorDemo} in that it uses a {@link ProcessorSupplier} to attach the Processor with the
* count logic to the Stream, and the topology is wired up through a {@link StreamsBuilder},
* which more closely resembles the high-level DSL (compared to the Topology builder approach, with Source, Processor, Sink).
* <p>
* Before running this example you must create the input topic and the output topic (e.g. via
* {@code bin/kafka-topics.sh --create ...}), and write some data to the input topic (e.g. via
Expand Down Expand Up @@ -164,4 +160,4 @@ public void run() {
}
System.exit(0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? ex
* For example, you can compute the new key as the length of the value string.
* <pre>{@code
* KTable<String, String> table = builder.table("topic");
* KTable<Integer, String> keyedStream = table.toStream(new KeyValueMapper<String, String, Integer> {
* KStream<Integer, String> keyedStream = table.toStream(new KeyValueMapper<String, String, Integer> {
* Integer apply(String key, String value) {
* return value.length();
* }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,10 @@ private void emitNonJoinedOuterRecords(
if (internalProcessorContext.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) {
return;
}
if (sharedTimeTracker.nextTimeToEmit == 0) {
sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs();
}

// Ensure `nextTimeToEmit` is synced with `currentSystemTimeMs`, if we dont set it everytime,
// they can get out of sync during a clock drift
sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs();
sharedTimeTracker.advanceNextTimeToEmit();

// reset to MAX_VALUE in case the store is empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
Expand All @@ -31,26 +32,49 @@
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSideSerde;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.GenericInMemoryKeyValueStore;
import org.junit.Test;
import org.mockito.Mockito;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static java.time.Duration.ofHours;
import static java.time.Duration.ofMillis;
Expand Down Expand Up @@ -333,6 +357,77 @@ public void shouldJoinWithCustomStoreSuppliers() {
runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows);
}

@Test
public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {
/**
* This test is testing something internal to [[KStreamKStreamJoin]], so we had to setup low-level api manually.
*/
final KStreamImplJoin.TimeTracker tracker = new KStreamImplJoin.TimeTracker();
final KStreamKStreamJoin<String, String, String, String> join = new KStreamKStreamJoin<>(
false,
"other",
new JoinWindowsInternal(JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(1000))),
(key, v1, v2) -> v1 + v2,
true,
Optional.of("outer"),
tracker);
final Processor<String, String, String, String> joinProcessor = join.get();
final MockInternalNewProcessorContext<String, String> procCtx = new MockInternalNewProcessorContext<>();
final WindowStore<String, String> otherStore = new WindowStoreBuilder<>(
new InMemoryWindowBytesStoreSupplier(
"other",
1000L,
100,
false),
Serdes.String(),
Serdes.String(),
new MockTime()).build();

final KeyValueStore<TimestampedKeyAndJoinSide<String>, LeftOrRightValue<String, String>> outerStore = Mockito.spy(
new KeyValueStoreBuilder<>(
new InMemoryKeyValueBytesStoreSupplier("outer"),
new TimestampedKeyAndJoinSideSerde<>(Serdes.String()),
new LeftOrRightValueSerde<>(Serdes.String(), Serdes.String()),
new MockTime()
).build());

final GenericInMemoryKeyValueStore<String, String> rootStore = new GenericInMemoryKeyValueStore<>("root");

otherStore.init((StateStoreContext) procCtx, rootStore);
procCtx.addStateStore(otherStore);

outerStore.init((StateStoreContext) procCtx, rootStore);
procCtx.addStateStore(outerStore);

joinProcessor.init(procCtx);

final Record<String, String> record1 = new Record<>("key1", "value1", 10000L);
final Record<String, String> record2 = new Record<>("key2", "value2", 13000L);
final Record<String, String> record3 = new Record<>("key3", "value3", 15000L);
final Record<String, String> record4 = new Record<>("key4", "value4", 17000L);

procCtx.setSystemTimeMs(1000L);
joinProcessor.process(record1);

procCtx.setSystemTimeMs(2100L);
joinProcessor.process(record2);

procCtx.setSystemTimeMs(2500L);
joinProcessor.process(record3);
// being throttled, so the older value still exists
assertEquals(2, iteratorToList(outerStore.all()).size());

procCtx.setSystemTimeMs(4000L);
joinProcessor.process(record4);
assertEquals(1, iteratorToList(outerStore.all()).size());
}

private <T> List<T> iteratorToList(final Iterator<T> iterator) {
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
.collect(Collectors.toList());
}

private void runJoin(final StreamJoined<String, Integer, Integer> streamJoined,
final JoinWindows joinWindows) {
final StreamsBuilder builder = new StreamsBuilder();
Expand Down Expand Up @@ -1808,7 +1903,7 @@ private WindowBytesStoreSupplier buildWindowBytesStoreSupplier(final String name
" <-- second-join-this-join, second-join-other-join\n" +
" Sink: KSTREAM-SINK-0000000021 (topic: out-two)\n" +
" <-- second-join-merge\n\n";

private final String expectedTopologyWithGeneratedRepartitionTopic = "Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ object Serdes {
implicit def JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
implicit def UUID: Serde[util.UUID] = JSerdes.UUID()

implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] =
new WindowedSerdes.TimeWindowedSerde[T](tSerde)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* <p>
* The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance.
* Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
* The topic will be named as "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* The topic will be named as "\${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* `StreamsConfig` via parameter `APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG`,
* "&lt;name&gt;" is either provided via `Repartitioned#as(String)` or an internally
* generated name, and "-repartition" is a fixed suffix.
Expand Down

0 comments on commit 54eff6a

Please sign in to comment.