From 5f0db3de2a49e2635dec59eaddb9d00569e55808 Mon Sep 17 00:00:00 2001 From: Rohit Ashiwal Date: Tue, 23 Apr 2024 15:46:50 +0530 Subject: [PATCH] assign split shard in allocate method --- .../allocator/LocalShardsBalancer.java | 75 ++++++++++++++----- .../InPlaceShardSplitAllocationDecider.java | 23 +++--- .../decider/SameShardAllocationDecider.java | 7 +- 3 files changed, 73 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 2396a620092dc..2fee3f5dfdac8 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -612,23 +612,6 @@ void moveShards() { if (targetNode != null) { checkAndAddInEligibleTargetNode(targetNode.getRoutingNode()); } - } else if (moveDecision.isDecisionTaken() && moveDecision.canSplit()) { - final BalancedShardsAllocator.ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); - sourceNode.removeShard(shardRouting); - IndexMetadata indexMetadata = metadata.getIndexSafe(shardRouting.index()); - Tuple> splittingShards = routingNodes.splitShard( - shardRouting, - indexMetadata, - allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), - allocation.changes() - ); - splittingShards.v2().forEach(sourceNode::addShard); - if (logger.isTraceEnabled()) { - logger.trace("Splitting shard [{}]", shardRouting); - } - - // Verifying if this node can be considered ineligible for further iterations - checkAndAddInEligibleTargetNode(sourceNode.getRoutingNode()); } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) { logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); } @@ -795,6 +778,7 @@ void allocateUnassigned() { logger.trace("Start allocating unassigned shards"); } if (unassigned.isEmpty()) { + assignSplitShards(); return; } @@ -914,6 +898,63 @@ void allocateUnassigned() { secondaryLength = 0; } while (primaryLength > 0); // clear everything we have either added it or moved to ignoreUnassigned + + assignSplitShards(); + } + + private void assignSplitShards() { + for (BalancedShardsAllocator.ModelNode node : nodes.values()) { + for (ShardRouting shard : node.getRoutingNode()) { + if (shard.primary()&& shard.started() && + allocation.metadata().getIndexSafe(shard.index()).isParentShard(shard.shardId())) { + + if (logger.isTraceEnabled()) { + logger.trace("Splitting shard [{}]", shard); + } + + final BalancedShardsAllocator.ModelNode sourceNode = nodes.get(shard.currentNodeId()); + final IndexMetadata indexMetadata = metadata.getIndexSafe(shard.index()); + final Tuple> splittingShards = routingNodes.splitShard( + shard, + indexMetadata, + allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), + allocation.changes() + ); + final List assignedShards = new ArrayList<>(); + + boolean performCleanUp = false; + + for (ShardRouting childShard : splittingShards.v2()) { + final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(childShard); + + if (allocationDecision.getAllocationDecision() == AllocationDecision.YES) { + if (logger.isTraceEnabled()) { + logger.trace("Assigned shard [{}] to [{}]", childShard, sourceNode.getNodeId()); + } + + // Allocation decision should be YES only on sourceNode due to decider + sourceNode.addShard(childShard); + assignedShards.add(childShard); + checkAndAddInEligibleTargetNode(sourceNode.getRoutingNode()); + } else { + performCleanUp = true; + break; + } + } + + if (performCleanUp) { + for (ShardRouting childShard : assignedShards) { + if (logger.isTraceEnabled()) { + logger.trace("Unassigned shard [{}] from [{}]", childShard, sourceNode); + } + sourceNode.removeShard(childShard); + } + } else { + sourceNode.removeShard(shard); + } + } + } + } } /** diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/InPlaceShardSplitAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/InPlaceShardSplitAllocationDecider.java index b477bafac2825..0784c6f315173 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/InPlaceShardSplitAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/InPlaceShardSplitAllocationDecider.java @@ -11,22 +11,27 @@ import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.core.index.shard.ShardId; public class InPlaceShardSplitAllocationDecider extends AllocationDecider { public static final String NAME = "in_place_shard_split"; @Override - public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - return canRemainDecision(shardRouting, node, allocation); - } + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + ShardId parentShardId = shardRouting.getSplittingShardId(); + + if (parentShardId == null) { + return super.canAllocate(shardRouting, node, allocation); + } - public static Decision canRemainDecision(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - // If shardRouting is a started parent shard and fact that it exists is sufficient to conclude - // that it needs to be split. - if (allocation.metadata().getIndexSafe(shardRouting.index()).isParentShard(shardRouting.shardId()) && shardRouting.started()) { - return Decision.SPLIT; + if (node != null && node.getByShardId(parentShardId) != null) { + return allocation.decision(Decision.YES, NAME, + "Found routing node [" + node.getByShardId(parentShardId) + "] for parent shard [" + + parentShardId + "] matching routing node [" + node + "]"); } - return Decision.ALWAYS; + + return allocation.decision(Decision.NO, NAME, + "Parent Shard [" + parentShardId + "] is not assigned to the node [" + node + "]"); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java index 0b8e69f97c645..c2eccdbc6ed26 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java @@ -87,13 +87,8 @@ private void setSameHost(boolean sameHost) { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - Decision decision = InPlaceShardSplitAllocationDecider.canRemainDecision(shardRouting, node, allocation); - if (decision == Decision.SPLIT) { - return decision; - } - Iterable assignedShards = allocation.routingNodes().assignedShards(shardRouting.shardId()); - decision = decideSameNode(shardRouting, node, allocation, assignedShards); + Decision decision = decideSameNode(shardRouting, node, allocation, assignedShards); if (decision.type() == Decision.Type.NO || sameHost == false) { // if its already a NO decision looking at the node, or we aren't configured to look at the host, return the decision return decision;