Skip to content

Commit

Permalink
Remove passThrough from the codebase
Browse files Browse the repository at this point in the history
  • Loading branch information
akotalwar committed Oct 26, 2023
1 parent 083df87 commit 809b6e8
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ internal static class CosmosQueryExecutionContextFactory
private const string InternalPartitionKeyDefinitionProperty = "x-ms-query-partitionkey-definition";
private const string QueryInspectionPattern = @"\s+(GROUP\s+BY\s+|COUNT\s*\(|MIN\s*\(|MAX\s*\(|AVG\s*\(|SUM\s*\(|DISTINCT\s+)";
private const string OptimisticDirectExecution = "OptimisticDirectExecution";
private const string Passthrough = "Passthrough";
private const string Specialized = "Specialized";
private const int PageSizeFactorForTop = 5;
private static readonly Regex QueryInspectionRegex = new Regex(QueryInspectionPattern, RegexOptions.IgnoreCase | RegexOptions.Compiled);
Expand Down Expand Up @@ -161,76 +160,12 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateCoreContextAsy
}

PartitionedQueryExecutionInfo partitionedQueryExecutionInfo;
if (inputParameters.ForcePassthrough)
{
partitionedQueryExecutionInfo = new PartitionedQueryExecutionInfo()
{
QueryInfo = new QueryInfo()
{
Aggregates = null,
DistinctType = DistinctQueryType.None,
GroupByAliases = null,
GroupByAliasToAggregateType = null,
GroupByExpressions = null,
HasSelectValue = false,
Limit = null,
Offset = null,
OrderBy = null,
OrderByExpressions = null,
RewrittenQuery = null,
Top = null,
},
QueryRanges = new List<Documents.Routing.Range<string>>(),
};
}
else if (queryPlanFromContinuationToken != null)
if (queryPlanFromContinuationToken != null)
{
partitionedQueryExecutionInfo = queryPlanFromContinuationToken;
}
else
{
// If the query would go to gateway, but we have a partition key,
// then try seeing if we can execute as a passthrough using client side only logic.
// This is to short circuit the need to go to the gateway to get the query plan.
if (cosmosQueryContext.QueryClient.BypassQueryParsing()
&& inputParameters.PartitionKey.HasValue)
{
bool parsed;
SqlQuery sqlQuery;
using (ITrace queryParseTrace = createQueryPipelineTrace.StartChild("Parse Query", TraceComponent.Query, Tracing.TraceLevel.Info))
{
parsed = SqlQueryParser.TryParse(inputParameters.SqlQuerySpec.QueryText, out sqlQuery);
}

if (parsed)
{
bool hasDistinct = sqlQuery.SelectClause.HasDistinct;
bool hasGroupBy = sqlQuery.GroupByClause != default;
bool hasAggregates = AggregateProjectionDetector.HasAggregate(sqlQuery.SelectClause.SelectSpec);
bool createPassthroughQuery = !hasAggregates && !hasDistinct && !hasGroupBy;

if (createPassthroughQuery)
{
SetTestInjectionPipelineType(inputParameters, Passthrough);

// Only thing that matters is that we target the correct range.
Documents.PartitionKeyDefinition partitionKeyDefinition = GetPartitionKeyDefinition(inputParameters, containerQueryProperties);
List<Documents.PartitionKeyRange> targetRanges = await cosmosQueryContext.QueryClient.GetTargetPartitionKeyRangesAsync(
cosmosQueryContext.ResourceLink,
containerQueryProperties.ResourceId,
containerQueryProperties.EffectiveRangesForPartitionKey,
forceRefresh: false,
createQueryPipelineTrace);

return CosmosQueryExecutionContextFactory.TryCreatePassthroughQueryExecutionContext(
documentContainer,
inputParameters,
targetRanges,
cancellationToken);
}
}
}

partitionedQueryExecutionInfo = await GetPartitionedQueryExecutionInfoAsync(
cosmosQueryContext,
inputParameters,
Expand Down Expand Up @@ -284,9 +219,6 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione
&& !partitionedQueryExecutionInfo.QueryInfo.HasTop
&& !partitionedQueryExecutionInfo.QueryInfo.HasLimit
&& !partitionedQueryExecutionInfo.QueryInfo.HasOffset;
bool streamingCrossContinuationQuery = !singleLogicalPartitionKeyQuery && clientStreamingQuery;

bool createPassthroughQuery = streamingSinglePartitionQuery || streamingCrossContinuationQuery;

TryCatch<IQueryPipelineStage> tryCreatePipelineStage;

Expand All @@ -311,20 +243,7 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione
}
else
{
if (createPassthroughQuery)
{
SetTestInjectionPipelineType(inputParameters, Passthrough);

tryCreatePipelineStage = CosmosQueryExecutionContextFactory.TryCreatePassthroughQueryExecutionContext(
documentContainer,
inputParameters,
targetRanges,
cancellationToken);
}
else
{
tryCreatePipelineStage = TryCreateSpecializedDocumentQueryExecutionContext(documentContainer, cosmosQueryContext, inputParameters, targetRanges, partitionedQueryExecutionInfo, cancellationToken);
}
tryCreatePipelineStage = TryCreateSpecializedDocumentQueryExecutionContext(documentContainer, cosmosQueryContext, inputParameters, targetRanges, partitionedQueryExecutionInfo, cancellationToken);
}

return tryCreatePipelineStage;
Expand Down Expand Up @@ -432,7 +351,6 @@ private static TryCatch<IQueryPipelineStage> TryCreateSpecializedDocumentQueryEx
inputParameters.PartitionedQueryExecutionInfo,
inputParameters.ExecutionEnvironment,
inputParameters.ReturnResultsInDeterministicOrder,
inputParameters.ForcePassthrough,
inputParameters.EnableOptimisticDirectExecution,
inputParameters.TestInjections);
}
Expand Down Expand Up @@ -509,33 +427,6 @@ private static TryCatch<IQueryPipelineStage> TryCreateOptimisticDirectExecutionC
cancellationToken: cancellationToken);
}

private static TryCatch<IQueryPipelineStage> TryCreatePassthroughQueryExecutionContext(
DocumentContainer documentContainer,
InputParameters inputParameters,
List<Documents.PartitionKeyRange> targetRanges,
CancellationToken cancellationToken)
{
// Return a parallel context, since we still want to be able to handle splits and concurrency / buffering.
return ParallelCrossPartitionQueryPipelineStage.MonadicCreate(
documentContainer: documentContainer,
sqlQuerySpec: inputParameters.SqlQuerySpec,
targetRanges: targetRanges
.Select(range => new FeedRangeEpk(
new Documents.Routing.Range<string>(
min: range.MinInclusive,
max: range.MaxExclusive,
isMinInclusive: true,
isMaxInclusive: false)))
.ToList(),
queryPaginationOptions: new QueryPaginationOptions(
pageSizeHint: inputParameters.MaxItemCount),
partitionKey: inputParameters.PartitionKey,
prefetchPolicy: PrefetchPolicy.PrefetchSinglePage,
maxConcurrency: inputParameters.MaxConcurrency,
cancellationToken: cancellationToken,
continuationToken: inputParameters.InitialUserContinuationToken);
}

private static TryCatch<IQueryPipelineStage> TryCreateSpecializedDocumentQueryExecutionContext(
DocumentContainer documentContainer,
CosmosQueryContext cosmosQueryContext,
Expand Down Expand Up @@ -729,14 +620,10 @@ private static void SetTestInjectionPipelineType(InputParameters inputParameters
{
responseStats.PipelineType = TestInjections.PipelineType.OptimisticDirectExecution;
}
else if (pipelineType == Specialized)
else
{
responseStats.PipelineType = TestInjections.PipelineType.Specialized;
}
else
{
responseStats.PipelineType = TestInjections.PipelineType.Passthrough;
}
}
}

Expand Down Expand Up @@ -852,7 +739,6 @@ public InputParameters(
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
ExecutionEnvironment? executionEnvironment,
bool? returnResultsInDeterministicOrder,
bool forcePassthrough,
bool enableOptimisticDirectExecution,
TestInjections testInjections)
{
Expand Down Expand Up @@ -886,7 +772,6 @@ public InputParameters(
this.PartitionedQueryExecutionInfo = partitionedQueryExecutionInfo;
this.ExecutionEnvironment = executionEnvironment.GetValueOrDefault(InputParameters.DefaultExecutionEnvironment);
this.ReturnResultsInDeterministicOrder = returnResultsInDeterministicOrder.GetValueOrDefault(InputParameters.DefaultReturnResultsInDeterministicOrder);
this.ForcePassthrough = forcePassthrough;
this.EnableOptimisticDirectExecution = enableOptimisticDirectExecution;
this.TestInjections = testInjections;
}
Expand All @@ -903,7 +788,6 @@ public InputParameters(
public ExecutionEnvironment ExecutionEnvironment { get; }
public bool ReturnResultsInDeterministicOrder { get; }
public TestInjections TestInjections { get; }
public bool ForcePassthrough { get; }
public bool EnableOptimisticDirectExecution { get; }

public InputParameters WithContinuationToken(CosmosElement token)
Expand All @@ -920,7 +804,6 @@ public InputParameters WithContinuationToken(CosmosElement token)
this.PartitionedQueryExecutionInfo,
this.ExecutionEnvironment,
this.ReturnResultsInDeterministicOrder,
this.ForcePassthrough,
this.EnableOptimisticDirectExecution,
this.TestInjections);
}
Expand Down
1 change: 0 additions & 1 deletion Microsoft.Azure.Cosmos/src/Query/Core/TestInjections.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ internal sealed class TestInjections
{
public enum PipelineType
{
Passthrough,
Specialized,
OptimisticDirectExecution,
}
Expand Down
2 changes: 0 additions & 2 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public static QueryIterator Create(
string resourceLink,
bool isContinuationExpected,
bool allowNonValueAggregateQuery,
bool forcePassthrough,
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
Documents.ResourceType resourceType)
{
Expand Down Expand Up @@ -145,7 +144,6 @@ public static QueryIterator Create(
partitionedQueryExecutionInfo: partitionedQueryExecutionInfo,
executionEnvironment: queryRequestOptions.ExecutionEnvironment,
returnResultsInDeterministicOrder: queryRequestOptions.ReturnResultsInDeterministicOrder,
forcePassthrough: forcePassthrough,
enableOptimisticDirectExecution: queryRequestOptions.EnableOptimisticDirectExecution,
testInjections: queryRequestOptions.TestSettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,28 +359,6 @@ public override async Task<TryExecuteQueryResult> TryExecuteQueryAsync(
throw new ArgumentNullException(nameof(requestOptions));
}

if (feedRangeInternal != null)
{
// The user has scoped down to a physical partition or logical partition.
// In either case let the query execute as a passthrough.
QueryIterator passthroughQueryIterator = QueryIterator.Create(
containerCore: this,
client: this.queryClient,
clientContext: this.ClientContext,
sqlQuerySpec: queryDefinition.ToSqlQuerySpec(),
continuationToken: continuationToken,
feedRangeInternal: feedRangeInternal,
queryRequestOptions: requestOptions,
resourceLink: this.LinkUri,
isContinuationExpected: false,
allowNonValueAggregateQuery: true,
forcePassthrough: true, // Forcing a passthrough, since we don't want to get the query plan nor try to rewrite it.
partitionedQueryExecutionInfo: null,
resourceType: ResourceType.Document);

return new QueryPlanIsSupportedResult(passthroughQueryIterator);
}

cancellationToken.ThrowIfCancellationRequested();

Documents.PartitionKeyDefinition partitionKeyDefinition;
Expand Down Expand Up @@ -438,7 +416,6 @@ public override async Task<TryExecuteQueryResult> TryExecuteQueryAsync(
resourceLink: this.LinkUri,
isContinuationExpected: false,
allowNonValueAggregateQuery: true,
forcePassthrough: false,
partitionedQueryExecutionInfo: queryPlan,
resourceType: ResourceType.Document);

Expand Down Expand Up @@ -835,7 +812,6 @@ public override FeedIteratorInternal GetItemQueryStreamIteratorInternal(
resourceLink: this.LinkUri,
isContinuationExpected: isContinuationExcpected,
allowNonValueAggregateQuery: true,
forcePassthrough: false,
partitionedQueryExecutionInfo: null,
resourceType: ResourceType.Document);
}
Expand Down Expand Up @@ -874,7 +850,6 @@ public override FeedIteratorInternal GetReadFeedIterator(
resourceLink: resourceLink,
isContinuationExpected: false,
allowNonValueAggregateQuery: true,
forcePassthrough: false,
partitionedQueryExecutionInfo: null,
resourceType: resourceType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public async Task TestPassingOptimisticDirectExecutionQueries()
partitionKey: partitionKeyValue,
enableOptimisticDirectExecution: false,
pageSizeOptions: PageSizeOptions.NonGroupByAndNoContinuationTokenPageSizeOptions,
expectedPipelineType: TestInjections.PipelineType.Passthrough),
expectedPipelineType: TestInjections.PipelineType.Specialized),

// Simple query (requiresDist = false)
CreateInput(
Expand Down Expand Up @@ -277,14 +277,14 @@ public async Task TestPassingOptimisticDirectExecutionQueries()
partitionKey: null,
enableOptimisticDirectExecution: true,
pageSizeOptions: PageSizeOptions.NonGroupByAndNoContinuationTokenPageSizeOptions,
expectedPipelineType: TestInjections.PipelineType.Passthrough),
expectedPipelineType: TestInjections.PipelineType.Specialized),
CreateInput(
query: $"SELECT VALUE r.numberField FROM r",
expectedResult: first7Integers,
partitionKey: null,
enableOptimisticDirectExecution: true,
pageSizeOptions: PageSizeOptions.NonGroupByWithContinuationTokenPageSizeOptions,
expectedPipelineType: TestInjections.PipelineType.Passthrough),
expectedPipelineType: TestInjections.PipelineType.Specialized),

// DISTINCT with ORDER BY (requiresDist = true)
CreateInput(
Expand Down Expand Up @@ -445,28 +445,28 @@ public async Task TestQueriesWithPartitionKeyNone()
partitionKey: PartitionKey.None,
enableOptimisticDirectExecution: false,
pageSizeOptions: PageSizeOptions.PageSize100,
expectedPipelineType: TestInjections.PipelineType.Passthrough),
expectedPipelineType: TestInjections.PipelineType.Specialized),
CreateInput(
query: $"SELECT VALUE r.{NumberField} FROM r ORDER BY r.{NumberField} ASC",
expectedResult: first400Integers,
partitionKey: PartitionKey.None,
enableOptimisticDirectExecution: false,
pageSizeOptions: PageSizeOptions.PageSize100,
expectedPipelineType: TestInjections.PipelineType.Passthrough),
expectedPipelineType: TestInjections.PipelineType.Specialized),
CreateInput(
query: $"SELECT VALUE r.{NumberField} FROM r ORDER BY r.{NumberField} DESC",
expectedResult: first400IntegersReversed,
partitionKey: PartitionKey.None,
enableOptimisticDirectExecution: false,
pageSizeOptions: PageSizeOptions.PageSize100,
expectedPipelineType: TestInjections.PipelineType.Passthrough),
expectedPipelineType: TestInjections.PipelineType.Specialized),
CreateInput(
query: $"SELECT VALUE r.numberField FROM r WHERE r.{NumberField} BETWEEN 0 AND {NumberOfDocuments} OFFSET 1 LIMIT 1",
expectedResult: new List<int> { 1 },
partitionKey: PartitionKey.None,
enableOptimisticDirectExecution: false,
pageSizeOptions: PageSizeOptions.PageSize100,
expectedPipelineType: TestInjections.PipelineType.Passthrough),
expectedPipelineType: TestInjections.PipelineType.Specialized),

//TODO: Change expectedPipelineType to OptimisticDirectExecution once emulator is updated to 0415
CreateInput(
Expand Down
Loading

0 comments on commit 809b6e8

Please sign in to comment.