Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resource Isolation Feature: Extend CACHE SELECT to allow warming up backup replicas #23

Open
wants to merge 5 commits into
base: pinterest-integration-3.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,9 @@ public static DataCacheSelectMetrics cacheSelect(DataCacheSelectStatement statem
tmpSessionVariable.setEnableCacheSelect(true);
// Note that although setting these values in the SessionVariable is not ideal, it's way more disruptive to pipe
// this information to where it needs to be through the insertStmt.
if (statement.getNumReplicasDesired() > 1) {
// We only set this value if it is larger than the default assumption.
tmpSessionVariable.setNumDesiredDatacacheReplicas(statement.getNumReplicasDesired());
}
if (statement.getResourceIsolationGroups() != null && !statement.getResourceIsolationGroups().isEmpty()) {
// We only set this value if it is the non-default.
tmpSessionVariable.setDatacacheSelectResourceGroups(statement.getResourceIsolationGroups());
}
tmpSessionVariable.setNumDesiredDatacacheReplicas(statement.getNumReplicasDesired());
tmpSessionVariable.setNumDesiredDatacacheBackupReplicas(statement.getNumBackupReplicasDesired());
tmpSessionVariable.setDatacacheSelectResourceGroups(statement.getResourceIsolationGroups());
connectContext.setSessionVariable(tmpSessionVariable);

InsertStmt insertStmt = statement.getInsertStmt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package com.starrocks.qe;

import com.starrocks.common.DdlException;
import com.starrocks.common.ErrorCode;
import com.starrocks.common.ErrorReportException;
import com.starrocks.common.UserException;
import com.starrocks.lake.qe.scheduler.DefaultSharedDataWorkerProvider;
import com.starrocks.planner.ScanNode;
Expand All @@ -27,6 +29,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -64,24 +67,55 @@ public CacheSelectBackendSelector(ScanNode scanNode, List<TScanRangeLocations> l
private Set<Long> assignedCnByTabletId(SystemInfoService systemInfoService, Long tabletId,
String resourceIsolationGroupId) throws UserException {
TabletComputeNodeMapper mapper = systemInfoService.internalTabletMapper();
int count = Math.max(props.numReplicasDesired, props.numBackupReplicasDesired);
// skipCount is used when selecting CN nodes for backup cache replicas:
// it skips the first (primary) CN node among the selected nodes.
int skipCount = props.numBackupReplicasDesired > 0 ? 1 : 0;
List<Long> cnIdsOrderedByPreference =
mapper.computeNodesForTablet(tabletId, props.numReplicasDesired, resourceIsolationGroupId);
if (cnIdsOrderedByPreference.size() < props.numReplicasDesired) {
throw new DdlException(String.format("Requesting more replicas than we have available CN" +
" for the specified resource group. desiredReplicas: %d, resourceGroup: %s, tabletId: %d",
props.numReplicasDesired, resourceIsolationGroupId, tabletId));
mapper.computeNodesForTablet(tabletId, count, resourceIsolationGroupId, skipCount);
if (cnIdsOrderedByPreference.isEmpty()) {
throw new DdlException(
String.format("No CN nodes available for the specified resource group." +
" resourceGroup: %s, tabletId: %d",
resourceIsolationGroupId, tabletId));
}
if (cnIdsOrderedByPreference.size() < count) {
throw new DdlException(
String.format("Requesting more replicas than we have available CN" +
" for the specified resource group. desiredReplicas: %d," +
" desiredBackupReplicas: %d, resourceGroup: %s, tabletId: %d",
props.numReplicasDesired, props.numBackupReplicasDesired,
resourceIsolationGroupId, tabletId));
}
return new HashSet<>(cnIdsOrderedByPreference);
}

private Set<Long> assignedCnByBackupWorker(Long mainTargetCnId, String resourceIsolationGroupId)
throws UserException {
Set<Long> selectedCn = new HashSet<>();
DefaultSharedDataWorkerProvider workerProvider =
new DefaultSharedDataWorkerProvider.Factory().captureAvailableWorkers(warehouseId,
resourceIsolationGroupId);
DefaultSharedDataWorkerProvider workerProvider;
try {
workerProvider = new DefaultSharedDataWorkerProvider.Factory().captureAvailableWorkers(warehouseId,
resourceIsolationGroupId);
} catch (ErrorReportException ex) {
// captureAvailableWorkers() can throw an ErrorReportException (a RuntimeException) with
// error code ERR_NO_NODES_IN_WAREHOUSE. That is an expected outcome in this particular
// case, so we convert it to a checked DdlException to stay consistent with the rest of
// this class.
if (ex.getErrorCode() == ErrorCode.ERR_NO_NODES_IN_WAREHOUSE) {
throw new DdlException(
String.format("No CN nodes available for the specified resource group. resourceGroup: %s",
resourceIsolationGroupId));
} else {
throw ex;
}
}
List<Long> selectedCn = new ArrayList<>();
int count = Math.max(props.numReplicasDesired, props.numBackupReplicasDesired);
// skipCount is used when selecting CN nodes for backup cache replicas:
// it skips the first (primary) CN node among the selected nodes.
int skipCount = props.numBackupReplicasDesired > 0 ? 1 : 0;
long targetBackendId = mainTargetCnId;
while (selectedCn.size() < props.numReplicasDesired) {
while (selectedCn.size() < count + skipCount) {
if (selectedCn.contains(targetBackendId) || !workerProvider.isDataNodeAvailable(targetBackendId)) {
targetBackendId = workerProvider.selectBackupWorker(targetBackendId, Optional.empty());
if (targetBackendId < 0 || selectedCn.contains(targetBackendId)) {
Expand All @@ -93,17 +127,24 @@ private Set<Long> assignedCnByBackupWorker(Long mainTargetCnId, String resourceI
}
selectedCn.add(targetBackendId);
}
return selectedCn;
if (selectedCn.isEmpty()) {
throw new DdlException(
String.format("No CN nodes available for the specified resource group. resourceGroup: %s",
resourceIsolationGroupId));
}
return selectedCn.stream().skip(skipCount).collect(Collectors.toSet());
}

@Override
public void computeScanRangeAssignment() throws UserException {
if (props.resourceIsolationGroups == null || props.resourceIsolationGroups.isEmpty()) {
if (props.resourceIsolationGroups.isEmpty()) {
throw new UserException("Should not have constructed CacheSelectBackendSelector with no" +
" resourceIsolationGroups specified.");
}
if (props.numReplicasDesired < 1) {
throw new UserException("Num replicas desired in cache must be at least 1: " + props.numReplicasDesired);
if (props.numReplicasDesired < 1 && props.numBackupReplicasDesired < 1) {
throw new UserException(String.format(
"Num replicas or backup replicas desired in cache must be at least 1: replicas [%d] backup replicas [%d]",
props.numReplicasDesired, props.numBackupReplicasDesired));
}

SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
Expand Down Expand Up @@ -146,9 +187,8 @@ public void computeScanRangeAssignment() throws UserException {
// Note that although we're not using the provided callerWorkerProvider above, the caller assumes that we used
// it to note the selected backend ids. This is used for things like checking if the worker has died
// and cancelling queries.
for (long workerId : allSelectedWorkerIds) {
callerWorkerProvider.selectWorkerUnchecked(workerId);
}
allSelectedWorkerIds.forEach(callerWorkerProvider::selectWorkerUnchecked);

// Also, caller upstream will use the workerProvider to get ComputeNode references corresponding to the compute
// nodes chosen in this function, so we must enable getting any worker regardless of availability.
callerWorkerProvider.setAllowGetAnyWorker(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.starrocks.server.GlobalStateMgr;

import java.util.ArrayList;
import java.util.List;

// Describes how a CACHE SELECT statement should choose compute nodes to populate with the data.
Expand All @@ -26,15 +25,29 @@
public class CacheSelectComputeNodeSelectionProperties {
public List<String> resourceIsolationGroups;
public int numReplicasDesired;
public int numBackupReplicasDesired;
anatoly2 marked this conversation as resolved.
Show resolved Hide resolved

public CacheSelectComputeNodeSelectionProperties(List<String> resourceIsolationGroups, int numReplicasDesired) {
/**
* CACHE SELECT compute node properties constructor
* @param resourceIsolationGroups - list of resource isolation groups
* @param numReplicasDesired - number of cache replicas to be created including primary compute node
* (should be >= 1 if numBackupReplicasDesired is 0)
* @param numBackupReplicasDesired - number of cache backup replicas to be created excluding primary compute node
* (should be >= 1 if numReplicasDesired is 0)
* @apiNote if numReplicasDesired = 0 and numBackupReplicasDesired = 0 then the
* CacheSelectBackendSelector.computeScanRangeAssignment method will throw UserException
*/
public CacheSelectComputeNodeSelectionProperties(List<String> resourceIsolationGroups,
int numReplicasDesired,
int numBackupReplicasDesired) {
if (resourceIsolationGroups == null || resourceIsolationGroups.isEmpty()) {
this.resourceIsolationGroups = new ArrayList<>();
this.resourceIsolationGroups.add(GlobalStateMgr.getCurrentState().getNodeMgr().getMySelf()
.getResourceIsolationGroup());
this.resourceIsolationGroups = List.of(
GlobalStateMgr.getCurrentState().getNodeMgr().getMySelf().getResourceIsolationGroup()
);
} else {
this.resourceIsolationGroups = resourceIsolationGroups;
}
this.numReplicasDesired = Math.max(numReplicasDesired, 1);
this.numReplicasDesired = Math.max(numReplicasDesired, 0);
this.numBackupReplicasDesired = Math.max(numBackupReplicasDesired, 0);
}
}
14 changes: 12 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -1665,8 +1666,9 @@ public String getCatalog() {

private boolean enableCacheSelect = false;

private List<String> datacacheSelectResourceGroups = null;
private int numDesiredDatacacheReplicas = -1;
private List<String> datacacheSelectResourceGroups = Collections.emptyList();
private int numDesiredDatacacheReplicas = 0;
private int numDesiredDatacacheBackupReplicas = 0;

@VariableMgr.VarAttr(name = ENABLE_DYNAMIC_PRUNE_SCAN_RANGE)
private boolean enableDynamicPruneScanRange = true;
Expand Down Expand Up @@ -4017,10 +4019,18 @@ public int getNumDesiredDatacacheReplicas() {
return numDesiredDatacacheReplicas;
}

public int getNumDesiredDatacacheBackupReplicas() {
return numDesiredDatacacheBackupReplicas;
}

public void setNumDesiredDatacacheReplicas(int numDesiredDatacacheReplicas) {
this.numDesiredDatacacheReplicas = numDesiredDatacacheReplicas;
}

public void setNumDesiredDatacacheBackupReplicas(int numDesiredDatacacheBackupReplicas) {
this.numDesiredDatacacheBackupReplicas = numDesiredDatacacheBackupReplicas;
}

// Serialize to thrift object
// used for rest api
public TQueryOptions toThrift() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
public class Utils {
// We can only get the tablet id for an internal scan.
public static Optional<Long> getOptionalTabletId(TScanRange scanRange) {
Optional<Long> optTabletId = Optional.empty();
if (scanRange.internal_scan_range != null) {
optTabletId = Optional.of(scanRange.internal_scan_range.tablet_id);
}
return optTabletId;
return Optional.ofNullable(scanRange.internal_scan_range).map(isr -> isr.tablet_id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public static BackendSelector create(ScanNode scanNode,
FragmentScanRangeAssignment assignment = execFragment.getScanRangeAssignment();

int desiredDatacacheReplicas = sessionVariable.getNumDesiredDatacacheReplicas();
int desiredDatacacheBackupReplicas = sessionVariable.getNumDesiredDatacacheBackupReplicas();
List<String> datacacheSelectResourceGroups = sessionVariable.getDatacacheSelectResourceGroups();

if (scanNode instanceof SchemaScanNode) {
Expand All @@ -74,13 +75,19 @@ public static BackendSelector create(ScanNode scanNode,
return new HDFSBackendSelector(scanNode, locations, assignment, workerProvider,
sessionVariable.getForceScheduleLocal(),
sessionVariable.getHDFSBackendSelectorScanRangeShuffle());
} else if (desiredDatacacheReplicas > 1 || datacacheSelectResourceGroups != null) {
} else if (desiredDatacacheReplicas > 1 ||
desiredDatacacheBackupReplicas > 0 ||
!datacacheSelectResourceGroups.isEmpty()) {
// Note that a cacheSelect should never be hasReplicated (because currently shared-data mode otherwise
// doesn't support multiple replicas in cache), and it should never be hasColocate (because a cache select
// statement is for a single table).
return new CacheSelectBackendSelector(
scanNode, locations, assignment, workerProvider, new CacheSelectComputeNodeSelectionProperties(
datacacheSelectResourceGroups, desiredDatacacheReplicas), connectContext.getCurrentWarehouseId());
scanNode, locations, assignment, workerProvider,
new CacheSelectComputeNodeSelectionProperties(
datacacheSelectResourceGroups,
desiredDatacacheReplicas,
desiredDatacacheBackupReplicas
), connectContext.getCurrentWarehouseId());
} else {
boolean hasColocate = execFragment.isColocated();
boolean hasBucket = execFragment.isLocalBucketShuffleJoin();
Expand Down
Loading
Loading