Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] Query: Fixes optimalPageSize logic for OFFSET LIMIT in ORDER BY queries #4158

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,17 @@ private static TryCatch<IQueryPipelineStage> TryCreateSpecializedDocumentQueryEx
long optimalPageSize = inputParameters.MaxItemCount;
if (queryInfo.HasOrderBy)
{
int top;
if (queryInfo.HasTop && (top = partitionedQueryExecutionInfo.QueryInfo.Top.Value) > 0)
int top = 0;
sc978345 marked this conversation as resolved.
Show resolved Hide resolved
if (queryInfo.HasTop && (partitionedQueryExecutionInfo.QueryInfo.Top.Value > 0))
{
top = partitionedQueryExecutionInfo.QueryInfo.Top.Value;
}
else if (queryInfo.HasLimit && (partitionedQueryExecutionInfo.QueryInfo.Limit.Value > 0))
{
top = (partitionedQueryExecutionInfo.QueryInfo.Offset ?? 0) + partitionedQueryExecutionInfo.QueryInfo.Limit.Value;
}

if (top > 0)
{
// All partitions should initially fetch about 1/nth of the top value.
long pageSizeWithTop = (long)Math.Min(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.QueryPlan
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
Expand Down Expand Up @@ -290,6 +291,8 @@ internal TryCatch<PartitionedQueryExecutionInfoInternal> TryGetPartitionedQueryE
MaxDepth = 64, // https://github.com/advisories/GHSA-5crp-9r3c-p9vr
});

Debug.Assert(!(queryInfoInternal.QueryInfo.HasTop && queryInfoInternal.QueryInfo.HasLimit));

return TryCatch<PartitionedQueryExecutionInfoInternal>.FromResult(queryInfoInternal);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,27 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.Pagination;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.ReadFeed.Pagination;
using Microsoft.Azure.Cosmos.Tests.Pagination;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using Newtonsoft.Json;

[TestClass]
Expand All @@ -46,7 +54,7 @@ public async Task TestMerge()
MergeTestUtil mergeTest = new MergeTestUtil();
mergeTest.DocumentContainer = await CreateDocumentContainerAsync(
documents: documents,
numPartitions: 2,
numSplits: 2,
failureConfigs: new FlakyDocumentContainer.FailureConfigs(
inject429s: false,
injectEmptyPages: false,
Expand Down Expand Up @@ -249,6 +257,160 @@ public async Task Tracing()
Assert.AreEqual(numTraces, rootTrace.Children.Count);
}

/// <summary>
/// Runs a fixed list of OFFSET/LIMIT and TOP queries (with and without ORDER BY) over 1100 documents
/// and checks, for each one, the page size limit recorded by the mock container and the number of
/// results drained from the pipeline.
/// </summary>
[TestMethod]
public async Task OffsetLimitPageSize()
{
    List<CosmosObject> documents = Enumerable
        .Range(0, 1100)
        .Select(i => CosmosObject.Parse($"{{\"pk\" : {i} }}"))
        .ToList();

    MockInMemoryContainer mockInMemoryContainer = new MockInMemoryContainer(new InMemoryContainer(partitionKeyDefinition));
    DocumentContainer documentContainer = await CreateDocumentContainerAsync(documents, mockInMemoryContainer, numSplits: 4);

    // The cases share one mock container and are executed strictly in the order listed here.
    (string Query, int PageSize, int Results)[] testCases =
    {
        // OFFSET/LIMIT with ORDER BY
        ("SELECT c.pk FROM c ORDER BY c.pk OFFSET 0 LIMIT 500", 315, 500),
        ("SELECT c.pk FROM c ORDER BY c.pk OFFSET 10000 LIMIT 5000", 1000, 0),
        ("SELECT c.pk FROM c ORDER BY c.pk OFFSET 10 LIMIT 100", 70, 100),
        ("SELECT c.pk FROM c ORDER BY c.pk OFFSET 0 LIMIT 100", 65, 100),
        ("SELECT c.pk FROM c ORDER BY c.pk OFFSET 100 LIMIT 0", 65, 0),

        // OFFSET/LIMIT without ORDER BY
        ("SELECT c.pk FROM c OFFSET 10 LIMIT 100", 1000, 100),
        ("SELECT c.pk FROM c OFFSET 0 LIMIT 100", 1000, 100),
        ("SELECT c.pk FROM c OFFSET 100 LIMIT 0", 1000, 0),

        // TOP with ORDER BY
        ("SELECT TOP 5 c.pk FROM c ORDER BY c.pk", 5, 5),
        ("SELECT TOP 100 c.pk FROM c ORDER BY c.pk", 65, 100),
        ("SELECT TOP 5000 c.pk FROM c ORDER BY c.pk", 1000, 1100),
        ("SELECT TOP 15000 c.pk FROM c ORDER BY c.pk", 1000, 1100),

        // TOP without ORDER BY
        ("SELECT TOP 5 c.pk FROM c", 1000, 5),
        ("SELECT TOP 5000 c.pk FROM c", 1000, 1100),
        ("SELECT TOP 15000 c.pk FROM c", 1000, 1100),
    };

    foreach ((string sqlText, int pageSize, int resultCount) in testCases)
    {
        await this.TestPageSizeAsync(
            query: sqlText,
            expectedPageSize: pageSize,
            expectedResults: resultCount,
            inMemoryContainer: mockInMemoryContainer,
            documentContainer: documentContainer);
    }
}

/// <summary>
/// Drains <paramref name="query"/> through a pipeline built by
/// <c>CosmosQueryExecutionContextFactory.Create</c> and then asserts on the page size limit captured by
/// <paramref name="inMemoryContainer"/> and on the total number of documents returned.
/// </summary>
/// <param name="query">SQL text of the query to execute.</param>
/// <param name="expectedPageSize">Value expected in <c>inMemoryContainer.PageSizeSpecified</c> after the drain.</param>
/// <param name="expectedResults">Total number of documents expected across all pages.</param>
/// <param name="inMemoryContainer">Mock whose recorded page size is inspected.</param>
/// <param name="documentContainer">Container the pipeline is created against.</param>
private async Task TestPageSizeAsync(string query, int expectedPageSize, int expectedResults, MockInMemoryContainer inMemoryContainer, DocumentContainer documentContainer)
{
    Tuple<CosmosQueryExecutionContextFactory.InputParameters, CosmosQueryContextCore> setup = CreateInputParamsAndQueryContext(
        query: query,
        partitionKeyDefinition: partitionKeyDefinition,
        partitionKeyValue: null,
        queryRequestOptions: new QueryRequestOptions());

    IQueryPipelineStage pipeline = CosmosQueryExecutionContextFactory.Create(
        documentContainer,
        setup.Item2,
        setup.Item1,
        NoOpTrace.Singleton);

    List<CosmosElement> drained = new List<CosmosElement>();
    while (true)
    {
        bool hasPage = await pipeline.MoveNextAsync(NoOpTrace.Singleton);
        if (!hasPage)
        {
            break;
        }

        // Surface any failed page as an exception before touching its result.
        TryCatch<QueryPage> page = pipeline.Current;
        page.ThrowIfFailed();

        drained.AddRange(page.Result.Documents);
    }

    // NOTE(review): PageSizeSpecified is not cleared before the query runs (MockInMemoryContainer.Reset()
    // is never called here), so if a query issues no MonadicQueryAsync call this assertion would observe
    // the value left by the previous query - confirm that is intended.
    Assert.AreEqual(expected: expectedPageSize, actual: inMemoryContainer.PageSizeSpecified);
    Assert.AreEqual(expected: expectedResults, actual: drained.Count);
}
sc978345 marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Builds the <see cref="CosmosQueryExecutionContextFactory.InputParameters"/> for <paramref name="query"/>
/// together with a <see cref="CosmosQueryContextCore"/> whose <see cref="CosmosQueryClient"/> is a Moq mock.
/// The mock answers partition key range lookups with a fixed list of eight ranges and answers query plan
/// requests via <c>OptimisticDirectExecutionQueryBaselineTests.TryGetPartitionedQueryExecutionInfo</c>.
/// </summary>
/// <param name="query">SQL text of the query.</param>
/// <param name="partitionKeyDefinition">Partition key definition placed in the input parameter properties.</param>
/// <param name="partitionKeyValue">Optional partition key passed through to the input parameters.</param>
/// <param name="queryRequestOptions">Options supplying concurrency, item count, buffering, ODE and test settings.</param>
/// <returns>A tuple of the input parameters (Item1) and the query context (Item2).</returns>
private static Tuple<CosmosQueryExecutionContextFactory.InputParameters, CosmosQueryContextCore> CreateInputParamsAndQueryContext(
    string query,
    PartitionKeyDefinition partitionKeyDefinition,
    Cosmos.PartitionKey? partitionKeyValue,
    QueryRequestOptions queryRequestOptions)
{
    CosmosQueryExecutionContextFactory.InputParameters inputParameters = new CosmosQueryExecutionContextFactory.InputParameters(
        sqlQuerySpec: new SqlQuerySpec(query),
        initialUserContinuationToken: null,
        initialFeedRange: null,
        maxConcurrency: queryRequestOptions.MaxConcurrency,
        maxItemCount: queryRequestOptions.MaxItemCount,
        maxBufferedItemCount: queryRequestOptions.MaxBufferedItemCount,
        partitionKey: partitionKeyValue,
        properties: new Dictionary<string, object>() { { "x-ms-query-partitionkey-definition", partitionKeyDefinition } },
        partitionedQueryExecutionInfo: null,
        executionEnvironment: null,
        returnResultsInDeterministicOrder: null,
        forcePassthrough: false,
        enableOptimisticDirectExecution: queryRequestOptions.EnableOptimisticDirectExecution,
        testInjections: queryRequestOptions.TestSettings);

    string databaseId = "db1234";
    string resourceLink = $"dbs/{databaseId}/colls";

    const string suffix = "-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF";

    // Eight back-to-back ranges running from MinimumInclusiveEffectivePartitionKey to
    // MaximumExclusiveEffectivePartitionKey; each range's MaxExclusive is the next range's MinInclusive.
    List<PartitionKeyRange> partitionKeyRanges = new List<PartitionKeyRange>
    {
        new PartitionKeyRange() { MinInclusive = Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey, MaxExclusive = "1F" + suffix },
        new PartitionKeyRange() { MinInclusive = "1F" + suffix, MaxExclusive = "3F" + suffix },
        new PartitionKeyRange() { MinInclusive = "3F" + suffix, MaxExclusive = "5F" + suffix },
        new PartitionKeyRange() { MinInclusive = "5F" + suffix, MaxExclusive = "7F" + suffix },
        new PartitionKeyRange() { MinInclusive = "7F" + suffix, MaxExclusive = "9F" + suffix },
        new PartitionKeyRange() { MinInclusive = "9F" + suffix, MaxExclusive = "BF" + suffix },
        new PartitionKeyRange() { MinInclusive = "BF" + suffix, MaxExclusive = "DF" + suffix },
        new PartitionKeyRange() { MinInclusive = "DF" + suffix, MaxExclusive = Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey },
    };

    Mock<CosmosQueryClient> mockClient = new Mock<CosmosQueryClient>();

    // Every range lookup gets the same fixed list, regardless of the arguments supplied.
    mockClient.Setup(x => x.GetTargetPartitionKeyRangesAsync(
        It.IsAny<string>(),
        It.IsAny<string>(),
        It.IsAny<IReadOnlyList<Documents.Routing.Range<string>>>(),
        It.IsAny<bool>(),
        It.IsAny<ITrace>()))
        .Returns((string requestedLink, string collectionResourceId, IReadOnlyList<Documents.Routing.Range<string>> providedRanges, bool forceRefresh, ITrace trace) => Task.FromResult(partitionKeyRanges));

    // Query plan requests are answered by serializing the incoming query spec to JSON and handing it,
    // with the partition key definition received by the mock call, to the baseline-test helper.
    mockClient.Setup(x => x.TryGetPartitionedQueryExecutionInfoAsync(
        It.IsAny<SqlQuerySpec>(),
        It.IsAny<ResourceType>(),
        It.IsAny<PartitionKeyDefinition>(),
        It.IsAny<bool>(),
        It.IsAny<bool>(),
        It.IsAny<bool>(),
        It.IsAny<bool>(),
        It.IsAny<bool>(),
        It.IsAny<bool>(),
        It.IsAny<Cosmos.GeospatialType>(),
        It.IsAny<CancellationToken>()))
        .Returns((SqlQuerySpec sqlQuerySpec, ResourceType resourceType, PartitionKeyDefinition requestedPartitionKeyDefinition, bool requireFormattableOrderByQuery, bool isContinuationExpected, bool allowNonValueAggregateQuery, bool hasLogicalPartitionKey, bool allowDCount, bool useSystemPrefix, Cosmos.GeospatialType geospatialType, CancellationToken cancellationToken) =>
        {
            CosmosSerializerCore serializerCore = new();
            using StreamReader streamReader = new(serializerCore.ToStreamSqlQuerySpec(sqlQuerySpec, Documents.ResourceType.Document));
            string sqlQuerySpecJsonString = streamReader.ReadToEnd();

            TryCatch<PartitionedQueryExecutionInfo> queryPlan = OptimisticDirectExecutionQueryBaselineTests.TryGetPartitionedQueryExecutionInfo(sqlQuerySpecJsonString, requestedPartitionKeyDefinition);

            // A failed plan is rethrown from inside the callback rather than returned as a failed TryCatch.
            PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = queryPlan.Succeeded ? queryPlan.Result : throw queryPlan.Exception;
            return Task.FromResult(TryCatch<PartitionedQueryExecutionInfo>.FromResult(partitionedQueryExecutionInfo));
        }
        );

    CosmosQueryContextCore cosmosQueryContextCore = new CosmosQueryContextCore(
        client: mockClient.Object,
        resourceTypeEnum: Documents.ResourceType.Document,
        operationType: Documents.OperationType.Query,
        resourceType: typeof(QueryResponseCore),
        resourceLink: resourceLink,
        isContinuationExpected: true,
        allowNonValueAggregateQuery: true,
        useSystemPrefix: false,
        correlatedActivityId: Guid.NewGuid());

    return Tuple.Create(inputParameters, cosmosQueryContextCore);
}

internal static async Task<List<CosmosElement>> ExecuteQueryAsync(
string query,
IReadOnlyList<CosmosObject> documents,
Expand Down Expand Up @@ -329,13 +491,13 @@ private static async Task<List<CosmosElement>> DrainWithStateAsync(string query,
return elements;
}

internal static Task<IDocumentContainer> CreateDocumentContainerAsync(
internal static Task<DocumentContainer> CreateDocumentContainerAsync(
IReadOnlyList<CosmosObject> documents,
int numPartitions = 3,
int numSplits = 3,
FlakyDocumentContainer.FailureConfigs failureConfigs = null)
{
IMonadicDocumentContainer monadicDocumentContainer = CreateMonadicDocumentContainerAsync(failureConfigs);
return CreateDocumentContainerAsync(documents, monadicDocumentContainer, numPartitions);
return CreateDocumentContainerAsync(documents, monadicDocumentContainer, numSplits);
}

internal static IMonadicDocumentContainer CreateMonadicDocumentContainerAsync(FlakyDocumentContainer.FailureConfigs failureConfigs)
Expand All @@ -349,14 +511,14 @@ internal static IMonadicDocumentContainer CreateMonadicDocumentContainerAsync(Fl
return monadicDocumentContainer;
}

internal static async Task<IDocumentContainer> CreateDocumentContainerAsync(
internal static async Task<DocumentContainer> CreateDocumentContainerAsync(
IReadOnlyList<CosmosObject> documents,
IMonadicDocumentContainer monadicDocumentContainer,
int numPartitions)
int numSplits)
{
DocumentContainer documentContainer = new DocumentContainer(monadicDocumentContainer);

for (int i = 0; i < numPartitions; i++)
for (int i = 0; i < numSplits; i++)
{
IReadOnlyList<FeedRangeInternal> ranges = await documentContainer.GetFeedRangesAsync(
trace: NoOpTrace.Singleton,
Expand Down Expand Up @@ -461,5 +623,79 @@ public async Task<Exception> ShouldReturnFailure()
}
}
}

/// <summary>
/// <see cref="IMonadicDocumentContainer"/> wrapper that forwards every call to the wrapped container and,
/// for queries, remembers the <c>PageSizeLimit</c> of the most recent <see cref="MonadicQueryAsync"/> call.
/// </summary>
private class MockInMemoryContainer : IMonadicDocumentContainer
{
    /// <summary>Wraps <paramref name="documentContainer"/>; no page size is recorded initially.</summary>
    public MockInMemoryContainer(IMonadicDocumentContainer documentContainer)
    {
        this.PageSizeSpecified = null;
        this.MonadicDocumentContainer = documentContainer;
    }

    /// <summary>Page size limit seen by the latest query call, or null if none has been recorded.</summary>
    public int? PageSizeSpecified { get; private set; }

    /// <summary>The container every call is delegated to.</summary>
    public IMonadicDocumentContainer MonadicDocumentContainer { get; }

    /// <summary>Clears the recorded page size.</summary>
    public void Reset() => this.PageSizeSpecified = null;

    /// <summary>Records the requested page size limit, then delegates the query to the wrapped container.</summary>
    public Task<TryCatch<QueryPage>> MonadicQueryAsync(SqlQuerySpec sqlQuerySpec, FeedRangeState<QueryState> feedRangeState, QueryPaginationOptions queryPaginationOptions, ITrace trace, CancellationToken cancellationToken)
    {
        this.PageSizeSpecified = queryPaginationOptions.PageSizeLimit;

        return this.MonadicDocumentContainer.MonadicQueryAsync(sqlQuerySpec, feedRangeState, queryPaginationOptions, trace, cancellationToken);
    }

    // Everything below is a straight pass-through to the wrapped container.

    public Task<TryCatch<ChangeFeedPage>> MonadicChangeFeedAsync(FeedRangeState<ChangeFeedState> feedRangeState, ChangeFeedPaginationOptions changeFeedPaginationOptions, ITrace trace, CancellationToken cancellationToken)
        => this.MonadicDocumentContainer.MonadicChangeFeedAsync(feedRangeState, changeFeedPaginationOptions, trace, cancellationToken);

    public Task<TryCatch<Record>> MonadicCreateItemAsync(CosmosObject payload, CancellationToken cancellationToken)
        => this.MonadicDocumentContainer.MonadicCreateItemAsync(payload, cancellationToken);

    public Task<TryCatch<List<FeedRangeEpk>>> MonadicGetChildRangeAsync(FeedRangeInternal feedRange, ITrace trace, CancellationToken cancellationToken)
        => this.MonadicDocumentContainer.MonadicGetChildRangeAsync(feedRange, trace, cancellationToken);

    public Task<TryCatch<List<FeedRangeEpk>>> MonadicGetFeedRangesAsync(ITrace trace, CancellationToken cancellationToken)
        => this.MonadicDocumentContainer.MonadicGetFeedRangesAsync(trace, cancellationToken);

    public Task<TryCatch<string>> MonadicGetResourceIdentifierAsync(ITrace trace, CancellationToken cancellationToken)
        => this.MonadicDocumentContainer.MonadicGetResourceIdentifierAsync(trace, cancellationToken);

    public Task<TryCatch> MonadicMergeAsync(FeedRangeInternal feedRange1, FeedRangeInternal feedRange2, CancellationToken cancellationToken)
        => this.MonadicDocumentContainer.MonadicMergeAsync(feedRange1, feedRange2, cancellationToken);

    public Task<TryCatch<ReadFeedPage>> MonadicReadFeedAsync(FeedRangeState<ReadFeedState> feedRangeState, ReadFeedPaginationOptions readFeedPaginationOptions, ITrace trace, CancellationToken cancellationToken)
        => this.MonadicDocumentContainer.MonadicReadFeedAsync(feedRangeState, readFeedPaginationOptions, trace, cancellationToken);

    public Task<TryCatch<Record>> MonadicReadItemAsync(CosmosElement partitionKey, string identifier, CancellationToken cancellationToken)
        => this.MonadicDocumentContainer.MonadicReadItemAsync(partitionKey, identifier, cancellationToken);

    public Task<TryCatch> MonadicRefreshProviderAsync(ITrace trace, CancellationToken cancellationToken)
        => this.MonadicDocumentContainer.MonadicRefreshProviderAsync(trace, cancellationToken);

    public Task<TryCatch> MonadicSplitAsync(FeedRangeInternal feedRange, CancellationToken cancellationToken)
        => this.MonadicDocumentContainer.MonadicSplitAsync(feedRange, cancellationToken);
}
}
}
Loading