diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index fdaef89cf..d96d91369 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -126,7 +126,7 @@ class ReplicaFetcherThread(name: String, topicPartition, fetchOffset, log.logEndOffset)) if (logTrace) - trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d" + trace("Follower has replica log end offset %d for partition %s. Received %d bytes of messages and leader hw %d" .format(log.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark)) // Append the leader's messages to the log diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 5d1abea0e..8718ff685 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -385,7 +385,7 @@ object TestUtils extends Logging { config.setProperty(KafkaConfig.LogMessageFormatVersionProp, version.version) } - def createAdminClient[B <: KafkaBroker]( + def createAdminClient[B <: KafkaBroker]( brokers: Seq[B], listenerName: ListenerName, adminConfig: Properties = new Properties @@ -424,7 +424,7 @@ object TestUtils extends Logging { } result.topicId(topic).get() -} + } def createTopicWithAdmin[B <: KafkaBroker]( admin: Admin, @@ -441,6 +441,14 @@ object TestUtils extends Logging { replicaAssignment.size } + def isTopicExistsAndHasSameNumPartitionsAndReplicationFactor(cause: Throwable): Boolean = { + cause != null && + cause.isInstanceOf[TopicExistsException] && + // wait until all partitions metadata are propagated before verifying partitions number and replication factor + !waitForAllPartitionsMetadata(brokers, topic, effectiveNumPartitions).isEmpty && + topicHasSameNumPartitionsAndReplicationFactor(admin, topic, effectiveNumPartitions, replicationFactor) + } + try { createTopicWithAdminRaw( admin, @@ -451,12 +459,10 @@ object TestUtils extends Logging { topicConfig ) } catch { - case e: ExecutionException => if (!(e.getCause != null && - e.getCause.isInstanceOf[TopicExistsException] && - topicHasSameNumPartitionsAndReplicationFactor(admin, topic, - effectiveNumPartitions, replicationFactor))) { - throw e - } + case e: ExecutionException => + if (!isTopicExistsAndHasSameNumPartitionsAndReplicationFactor(e.getCause)) { + throw e + } } // wait until we've propagated all partitions metadata to all brokers diff --git a/docs/streams/architecture.html b/docs/streams/architecture.html index e561231c3..f05612961 100644 --- a/docs/streams/architecture.html +++ b/docs/streams/architecture.html @@ -68,8 +68,8 @@
Detailed behavior:
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index bba7068be..39be48975 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -51,6 +51,18 @@
+ Note: The cooperative rebalancing protocol has been the default since 2.4, but we have continued to support the
+ eager rebalancing protocol to provide users an upgrade path. This support will be dropped in a future release,
+ so any users still on the eager protocol should prepare to finish upgrading their applications to the cooperative protocol in version 3.1.
+ This only affects users who are still on a version older than 2.4, and users who have upgraded already but have not yet
+ removed the upgrade.from
config that they set when upgrading from a version below 2.4.
+ Users fitting into the latter case will simply need to unset this config when upgrading beyond 3.1,
+ while users in the former case will need to follow a slightly different upgrade path if they attempt to upgrade from 2.3 or below to a version above 3.1.
+ Those applications will need to go through a bridge release, by first upgrading to a version between 2.4 - 3.1 and setting the upgrade.from
config,
+ then removing that config and upgrading to the final version above 3.1. See KAFKA-8575
+ for more details.
+
diff --git a/docs/upgrade.html b/docs/upgrade.html index d96cddbdb..1ea6be319 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -238,16 +238,6 @@
iotime-total
. Please use bufferpool-wait-time-ns-total
, io-wait-time-ns-total
,
and io-time-ns-total
instead. See KIP-773
for more details.
- upgrade.from
config that they set when upgrading from a version below 2.4.
- Users fitting into the latter case will simply need to unset this config when upgrading beyond 3.1,
- while users in the former case will need to follow a slightly different upgrade path if they attempt to upgrade from 2.3 or below to a version above 3.1.
- Those applications will need to go through a bridge release, by first upgrading to a version between 2.4 - 3.1 and setting the upgrade.from
config,
- then removing that config and upgrading to the final version above 3.1. See KAFKA-8575
- for more details.* [data_frame_version header message] - * header => [api_key version] + * header => [api_key version] * * data_frame_version : This is the header version, current value is 0. Header includes both api_key and version. * api_key : apiKey of {@code ApiMessageAndVersion} object. diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java index 90f4764be..d347bb586 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java @@ -22,9 +22,6 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.kstream.TransformerSupplier; -import org.apache.kafka.streams.processor.ConnectedStoreProvider; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -45,7 +42,7 @@ import java.util.concurrent.CountDownLatch; /** - * Demonstrates, using a {@link Transformer} which combines the low-level Processor APIs with the high-level Kafka Streams DSL, + * Demonstrates, using a {@link Processor} implementing the low-level Processor APIs (replaces Transformer), * how to implement the WordCount program that computes a simple word occurrence histogram from an input text. ** Note: This is simplified code that only works correctly for single partition input topics. @@ -55,10 +52,9 @@ * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record * is an updated count of a single word. *
- * This example differs from {@link WordCountProcessorDemo} in that it uses a {@link Transformer} to define the word - * count logic, and the topology is wired up through a {@link StreamsBuilder}, which more closely resembles the high-level DSL. - * Additionally, the {@link TransformerSupplier} specifies the {@link StoreBuilder} that the {@link Transformer} needs - * by implementing {@link ConnectedStoreProvider#stores()}. + * This example differs from {@link WordCountProcessorDemo} in that it uses a {@link ProcessorSupplier} to attach the Processor with the + * count logic to the Stream, and the topology is wired up through a {@link StreamsBuilder}, + * which more closely resembles the high-level DSL (compared to the Topology builder approach, with Source, Processor, Sink). *
* Before running this example you must create the input topic and the output topic (e.g. via * {@code bin/kafka-topics.sh --create ...}), and write some data to the input topic (e.g. via @@ -164,4 +160,4 @@ public void run() { } System.exit(0); } -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index d1c8623f2..28404eb50 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -663,7 +663,7 @@
KTable mapValues(final ValueMapperWithKey super K, ? super V, ? ex * For example, you can compute the new key as the length of the value string. * {@code * KTabletable = builder.table("topic"); - * KTable keyedStream = table.toStream(new KeyValueMapper { + * KStream keyedStream = table.toStream(new KeyValueMapper { * Integer apply(String key, String value) { * return value.length(); * } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index 397a86d24..d1cfa25e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -205,9 +205,10 @@ private void emitNonJoinedOuterRecords( if (internalProcessorContext.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) { return; } - if (sharedTimeTracker.nextTimeToEmit == 0) { - sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs(); - } + + // Ensure `nextTimeToEmit` is synced with `currentSystemTimeMs`, if we dont set it everytime, + // they can get out of sync during a clock drift + sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs(); sharedTimeTracker.advanceNextTimeToEmit(); // reset to MAX_VALUE in case the store is empty diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index 6864396d4..a5146a933 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; @@ -31,17 +32,33 @@ import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalTopicConfig; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide; +import org.apache.kafka.streams.state.internals.InMemoryKeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSideSerde; +import org.apache.kafka.streams.state.internals.WindowStoreBuilder; +import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde; +import org.apache.kafka.streams.state.internals.LeftOrRightValue; +import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier; import org.apache.kafka.test.MockApiProcessor; import org.apache.kafka.test.MockApiProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.MockInternalNewProcessorContext; import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.GenericInMemoryKeyValueStore; import org.junit.Test; +import org.mockito.Mockito; import java.time.Duration; import java.time.Instant; @@ -49,8 +66,15 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.Optional; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static java.time.Duration.ofHours; import static java.time.Duration.ofMillis; @@ -333,6 +357,77 @@ public void shouldJoinWithCustomStoreSuppliers() { runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows); } + @Test + public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() { + /** + * This test is testing something internal to [[KStreamKStreamJoin]], so we had to setup low-level api manually. + */ + final KStreamImplJoin.TimeTracker tracker = new KStreamImplJoin.TimeTracker(); + final KStreamKStreamJoin join = new KStreamKStreamJoin<>( + false, + "other", + new JoinWindowsInternal(JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(1000))), + (key, v1, v2) -> v1 + v2, + true, + Optional.of("outer"), + tracker); + final Processor joinProcessor = join.get(); + final MockInternalNewProcessorContext procCtx = new MockInternalNewProcessorContext<>(); + final WindowStore otherStore = new WindowStoreBuilder<>( + new InMemoryWindowBytesStoreSupplier( + "other", + 1000L, + 100, + false), + Serdes.String(), + Serdes.String(), + new MockTime()).build(); + + final KeyValueStore , LeftOrRightValue > outerStore = Mockito.spy( + new KeyValueStoreBuilder<>( + new InMemoryKeyValueBytesStoreSupplier("outer"), + new TimestampedKeyAndJoinSideSerde<>(Serdes.String()), + new LeftOrRightValueSerde<>(Serdes.String(), Serdes.String()), + new MockTime() + ).build()); + + final GenericInMemoryKeyValueStore rootStore = new GenericInMemoryKeyValueStore<>("root"); + + otherStore.init((StateStoreContext) procCtx, rootStore); + procCtx.addStateStore(otherStore); + + outerStore.init((StateStoreContext) procCtx, rootStore); + procCtx.addStateStore(outerStore); + + joinProcessor.init(procCtx); + + final Record record1 = new Record<>("key1", "value1", 10000L); + final Record record2 = new Record<>("key2", "value2", 13000L); + final Record record3 = new Record<>("key3", "value3", 15000L); + final Record record4 = new Record<>("key4", "value4", 17000L); + + procCtx.setSystemTimeMs(1000L); + joinProcessor.process(record1); + + procCtx.setSystemTimeMs(2100L); + joinProcessor.process(record2); + + procCtx.setSystemTimeMs(2500L); + joinProcessor.process(record3); + // being throttled, so the older value still exists + assertEquals(2, iteratorToList(outerStore.all()).size()); + + procCtx.setSystemTimeMs(4000L); + joinProcessor.process(record4); + assertEquals(1, iteratorToList(outerStore.all()).size()); + } + + private List iteratorToList(final Iterator iterator) { + return StreamSupport.stream( + Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .collect(Collectors.toList()); + } + private void runJoin(final StreamJoined streamJoined, final JoinWindows joinWindows) { final StreamsBuilder builder = new StreamsBuilder(); @@ -1808,7 +1903,7 @@ private WindowBytesStoreSupplier buildWindowBytesStoreSupplier(final String name " <-- second-join-this-join, second-join-other-join\n" + " Sink: KSTREAM-SINK-0000000021 (topic: out-two)\n" + " <-- second-join-merge\n\n"; - + private final String expectedTopologyWithGeneratedRepartitionTopic = "Topologies:\n" + " Sub-topology: 0\n" + " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" + diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala index 2e42090d1..d3cc61279 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala @@ -37,6 +37,7 @@ object Serdes { implicit def JavaDouble: Serde[java.lang.Double] = JSerdes.Double() implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]] implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer() + implicit def UUID: Serde[util.UUID] = JSerdes.UUID() implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T](tSerde) diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index 59a3f5aed..e46f8f5f4 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -404,7 +404,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * * The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance. * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams. - * The topic will be named as "${applicationId}-<name>-repartition", where "applicationId" is user-specified in + * The topic will be named as "\${applicationId}-<name>-repartition", where "applicationId" is user-specified in * `StreamsConfig` via parameter `APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG`, * "<name>" is either provided via `Repartitioned#as(String)` or an internally * generated name, and "-repartition" is a fixed suffix.