Skip to content

Commit

Permalink
OfflinePartitionLeaderElectionStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
jaceklaskowski committed Mar 5, 2020
1 parent 1732400 commit 60a7d0e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 9 deletions.
6 changes: 4 additions & 2 deletions kafka-controller-ControllerContext.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,11 @@ NOTE: `putPartitionState` is used when...FIXME
----
partitionsInStates(
states: Set[PartitionState]): Set[TopicPartition]
partitionsInStates(
topic: String, states: Set[PartitionState]): Set[TopicPartition]
----

`partitionsInStates` uses the <<partitionStates, partitionStates>> internal registry to find all of the `TopicPartitions` in the given `PartitionStates`.
`partitionsInStates` uses the <<partitionStates, partitionStates>> internal registry to find all of the `TopicPartitions` (of the topic if defined) in the given `PartitionStates`.

NOTE: `partitionsInStates` is used when `PartitionStateMachine` is requested to link:kafka-controller-PartitionStateMachine.adoc#triggerOnlinePartitionStateChange[triggerOnlinePartitionStateChange].

Expand Down Expand Up @@ -344,7 +346,7 @@ The timer metric name pattern is *kafka.controller:type=ControllerStats,name=*.
a| [[topicsIneligibleForDeletion]]

| topicsToBeDeleted
a| [[topicsToBeDeleted]]
a| [[topicsToBeDeleted]][[isTopicQueuedUpForDeletion]]

| topicsWithDeletionStarted
a| [[topicsWithDeletionStarted]]
Expand Down
19 changes: 17 additions & 2 deletions kafka-controller-KafkaController.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1484,13 +1484,28 @@ NOTE: `checkAndTriggerAutoLeaderRebalance` is used exclusively when `KafkaContro

[source, scala]
----
onNewPartitionCreation(newPartitions: Set[TopicPartition]): Unit
onNewPartitionCreation(
newPartitions: Set[TopicPartition]): Unit
----

`onNewPartitionCreation`...FIXME

NOTE: `onNewPartitionCreation` is used when <<kafka-controller-ControllerEvent.adoc#TopicChange, TopicChange>> and <<kafka-controller-ControllerEvent.adoc#PartitionModifications, PartitionModifications>> controller events are processed.

=== [[onReplicaElection]] `onReplicaElection` Internal Method

[source, scala]
----
onReplicaElection(
partitions: Set[TopicPartition],
electionType: ElectionType,
electionTrigger: ElectionTrigger): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
----

`onReplicaElection`...FIXME

NOTE: `onReplicaElection` is used when...FIXME

=== [[onBrokerLogDirFailure]] Handling Log Directory Failures for Brokers -- `onBrokerLogDirFailure` Internal Method

[source, scala]
Expand Down Expand Up @@ -1643,7 +1658,7 @@ processTopicUncleanLeaderElectionEnable(topic: String): Unit

`processTopicUncleanLeaderElectionEnable`...FIXME

NOTE: `processTopicUncleanLeaderElectionEnable` is used exclusively when `KafkaController` is requested to <<process, process a TopicUncleanLeaderElectionEnable controller event>>.
NOTE: `processTopicUncleanLeaderElectionEnable` is used when `KafkaController` is requested to <<process, process a TopicUncleanLeaderElectionEnable controller event>>.

=== [[processLeaderAndIsrResponseReceived]] Processing LeaderAndIsrResponseReceived Controller Event (On controller-event-thread) -- `processLeaderAndIsrResponseReceived` Internal Method

Expand Down
21 changes: 16 additions & 5 deletions kafka-controller-PartitionStateMachine.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,15 @@ NOTE: `PartitionStateMachine` is a Scala abstract class and cannot be <<creating
a| [[ControlledShutdownPartitionLeaderElectionStrategy]]

| OfflinePartitionLeaderElectionStrategy
a| [[OfflinePartitionLeaderElectionStrategy]]
a| [[OfflinePartitionLeaderElectionStrategy]] Accepts `allowUnclean` flag

Handled by `ZkPartitionStateMachine` when requested to link:kafka-controller-ZkPartitionStateMachine.adoc#doElectLeaderForPartitions[doElectLeaderForPartitions]

Used when:

* `KafkaController` is requested to link:kafka-controller-KafkaController.adoc#onNewPartitionCreation[onNewPartitionCreation] (with the `allowUnclean` flag off), #onReplicaElection[onReplicaElection] (with the `allowUnclean` flag on for the admin client)

* `PartitionStateMachine` is requested to <<triggerOnlineStateChangeForPartitions, triggerOnlineStateChangeForPartitions>> (with the `allowUnclean` flag off)

| PreferredReplicaPartitionLeaderElectionStrategy
a| [[PreferredReplicaPartitionLeaderElectionStrategy]] `KafkaController` is requested for <<kafka-controller-KafkaController.adoc#onPreferredReplicaElection, preferred replica leader election>> that in turn triggers `ZkPartitionStateMachine` to <<kafka-controller-ZkPartitionStateMachine.adoc#leaderForPreferredReplica, leaderForPreferredReplica>>
Expand All @@ -71,18 +79,21 @@ a| [[ReassignPartitionLeaderElectionStrategy]]
----
triggerOnlinePartitionStateChange(): Unit
triggerOnlinePartitionStateChange(
topic: String): Unit
topic: String): Unit // <1>
----
<1> Uses the partitions of the given topic only

`triggerOnlinePartitionStateChange` requests the <<controllerContext, ControllerContext>> for the link:kafka-controller-ControllerContext.adoc#partitionsInStates[partitions in the states]: `OfflinePartition` and `NewPartition`.

With the topic specified, `triggerOnlinePartitionStateChange` requests the <<controllerContext, ControllerContext>> for the link:kafka-controller-ControllerContext.adoc#partitionsInStates[partitions of the topic in the states]: `OfflinePartition` and `NewPartition`.

In the end, `triggerOnlinePartitionStateChange` <<triggerOnlineStateChangeForPartitions, triggers online state change for the partitions>>.

[NOTE]
====
`triggerOnlinePartitionStateChange` is used when:
* `KafkaController` is requested to link:kafka-controller-KafkaController.adoc#onBrokerStartup[onBrokerStartup], link:kafka-controller-KafkaController.adoc#onReplicasBecomeOffline[onReplicasBecomeOffline] and link:kafka-controller-KafkaController.adoc#processUncleanLeaderElectionEnable[processUncleanLeaderElectionEnable]
* `KafkaController` is requested to link:kafka-controller-KafkaController.adoc#onBrokerStartup[onBrokerStartup], link:kafka-controller-KafkaController.adoc#onReplicasBecomeOffline[onReplicasBecomeOffline], link:kafka-controller-KafkaController.adoc#processUncleanLeaderElectionEnable[processUncleanLeaderElectionEnable], link:kafka-controller-KafkaController.adoc#processTopicUncleanLeaderElectionEnable[processTopicUncleanLeaderElectionEnable]
* `PartitionStateMachine` is requested to <<startup, start up>>
====
Expand All @@ -95,9 +106,9 @@ triggerOnlineStateChangeForPartitions(
partitions: collection.Set[TopicPartition]): Unit
----

`triggerOnlineStateChangeForPartitions`...FIXME
`triggerOnlineStateChangeForPartitions` filters out the partitions of the link:kafka-controller-ControllerContext.adoc#isTopicQueuedUpForDeletion[topics to be deleted] and tries to <<handleStateChanges, move the partitions>> to `OnlinePartition` state with <<OfflinePartitionLeaderElectionStrategy, OfflinePartitionLeaderElectionStrategy>> (with `allowUnclean` flag off).

NOTE: `triggerOnlineStateChangeForPartitions` is used when...FIXME
NOTE: `triggerOnlineStateChangeForPartitions` is used when `PartitionStateMachine` is requested to <<triggerOnlinePartitionStateChange, triggerOnlinePartitionStateChange>>.

=== [[shutdown]] Shutting Down -- `shutdown` Method

Expand Down

0 comments on commit 60a7d0e

Please sign in to comment.