Skip to content

Commit

Permalink
assign split shard in allocate method
Browse files Browse the repository at this point in the history
  • Loading branch information
r1walz committed Apr 23, 2024
1 parent ddb0c79 commit 5f0db3d
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -612,23 +612,6 @@ void moveShards() {
if (targetNode != null) {
checkAndAddInEligibleTargetNode(targetNode.getRoutingNode());
}
} else if (moveDecision.isDecisionTaken() && moveDecision.canSplit()) {
final BalancedShardsAllocator.ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
sourceNode.removeShard(shardRouting);
IndexMetadata indexMetadata = metadata.getIndexSafe(shardRouting.index());
Tuple<ShardRouting, List<ShardRouting>> splittingShards = routingNodes.splitShard(
shardRouting,
indexMetadata,
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
allocation.changes()
);
splittingShards.v2().forEach(sourceNode::addShard);
if (logger.isTraceEnabled()) {
logger.trace("Splitting shard [{}]", shardRouting);
}

// Verifying if this node can be considered ineligible for further iterations
checkAndAddInEligibleTargetNode(sourceNode.getRoutingNode());
} else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
}
Expand Down Expand Up @@ -795,6 +778,7 @@ void allocateUnassigned() {
logger.trace("Start allocating unassigned shards");
}
if (unassigned.isEmpty()) {
assignSplitShards();
return;
}

Expand Down Expand Up @@ -914,6 +898,63 @@ void allocateUnassigned() {
secondaryLength = 0;
} while (primaryLength > 0);
// clear everything we have either added it or moved to ignoreUnassigned

assignSplitShards();
}

private void assignSplitShards() {
for (BalancedShardsAllocator.ModelNode node : nodes.values()) {
for (ShardRouting shard : node.getRoutingNode()) {
if (shard.primary()&& shard.started() &&
allocation.metadata().getIndexSafe(shard.index()).isParentShard(shard.shardId())) {

if (logger.isTraceEnabled()) {
logger.trace("Splitting shard [{}]", shard);
}

final BalancedShardsAllocator.ModelNode sourceNode = nodes.get(shard.currentNodeId());
final IndexMetadata indexMetadata = metadata.getIndexSafe(shard.index());
final Tuple<ShardRouting, List<ShardRouting>> splittingShards = routingNodes.splitShard(
shard,
indexMetadata,
allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
allocation.changes()
);
final List<ShardRouting> assignedShards = new ArrayList<>();

boolean performCleanUp = false;

for (ShardRouting childShard : splittingShards.v2()) {
final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(childShard);

if (allocationDecision.getAllocationDecision() == AllocationDecision.YES) {
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", childShard, sourceNode.getNodeId());
}

// Allocation decision should be YES only on sourceNode due to decider
sourceNode.addShard(childShard);
assignedShards.add(childShard);
checkAndAddInEligibleTargetNode(sourceNode.getRoutingNode());
} else {
performCleanUp = true;
break;
}
}

if (performCleanUp) {
for (ShardRouting childShard : assignedShards) {
if (logger.isTraceEnabled()) {
logger.trace("Unassigned shard [{}] from [{}]", childShard, sourceNode);
}
sourceNode.removeShard(childShard);
}
} else {
sourceNode.removeShard(shard);
}
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,27 @@
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.core.index.shard.ShardId;

public class InPlaceShardSplitAllocationDecider extends AllocationDecider {

public static final String NAME = "in_place_shard_split";

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return canRemainDecision(shardRouting, node, allocation);
}
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
ShardId parentShardId = shardRouting.getSplittingShardId();

if (parentShardId == null) {
return super.canAllocate(shardRouting, node, allocation);
}

public static Decision canRemainDecision(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
// If shardRouting is a started parent shard and fact that it exists is sufficient to conclude
// that it needs to be split.
if (allocation.metadata().getIndexSafe(shardRouting.index()).isParentShard(shardRouting.shardId()) && shardRouting.started()) {
return Decision.SPLIT;
if (node != null && node.getByShardId(parentShardId) != null) {
return allocation.decision(Decision.YES, NAME,
"Found routing node [" + node.getByShardId(parentShardId) + "] for parent shard [" +
parentShardId + "] matching routing node [" + node + "]");
}
return Decision.ALWAYS;

return allocation.decision(Decision.NO, NAME,
"Parent Shard [" + parentShardId + "] is not assigned to the node [" + node + "]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,8 @@ private void setSameHost(boolean sameHost) {

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
Decision decision = InPlaceShardSplitAllocationDecider.canRemainDecision(shardRouting, node, allocation);
if (decision == Decision.SPLIT) {
return decision;
}

Iterable<ShardRouting> assignedShards = allocation.routingNodes().assignedShards(shardRouting.shardId());
decision = decideSameNode(shardRouting, node, allocation, assignedShards);
Decision decision = decideSameNode(shardRouting, node, allocation, assignedShards);
if (decision.type() == Decision.Type.NO || sameHost == false) {
// if its already a NO decision looking at the node, or we aren't configured to look at the host, return the decision
return decision;
Expand Down

0 comments on commit 5f0db3d

Please sign in to comment.