diff --git a/sdk/cosmosdb/cosmos/review/cosmos.api.md b/sdk/cosmosdb/cosmos/review/cosmos.api.md index 454f36a290fc..0628fb896ed5 100644 --- a/sdk/cosmosdb/cosmos/review/cosmos.api.md +++ b/sdk/cosmosdb/cosmos/review/cosmos.api.md @@ -1054,6 +1054,7 @@ export interface FeedOptions extends SharedOptions { continuationToken?: string; continuationTokenLimitInKB?: number; disableNonStreamingOrderByQuery?: boolean; + enableQueryControl?: boolean; enableScanInQuery?: boolean; forceQueryPlan?: boolean; maxDegreeOfParallelism?: number; diff --git a/sdk/cosmosdb/cosmos/src/common/Stack.ts b/sdk/cosmosdb/cosmos/src/common/Stack.ts new file mode 100644 index 000000000000..cf0e14b7bf8a --- /dev/null +++ b/sdk/cosmosdb/cosmos/src/common/Stack.ts @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +export class Stack { + private items: T[] = []; + + // Push an element onto the stack + push(element: T): void { + this.items.push(element); + } + + // Pop an element off the stack + pop(): T | undefined { + return this.items.pop(); + } + + // Peek at the top element of the stack + peek(): T | undefined { + return this.items[this.items.length - 1]; + } + + // Check if the stack is empty + isEmpty(): boolean { + return this.items.length === 0; + } + + // Get the size of the stack + size(): number { + return this.items.length; + } + + // Clear the stack + clear(): void { + this.items = []; + } +} diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/Aggregators/GlobalStatisticsAggregator.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/Aggregators/GlobalStatisticsAggregator.ts index 8b548cd72b32..84e3dd93831b 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/Aggregators/GlobalStatisticsAggregator.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/Aggregators/GlobalStatisticsAggregator.ts @@ -1,8 +1,8 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -import { GlobalStatistics } from "../../request/globalStatistics"; -import { Aggregator } from "./Aggregator"; +import type { GlobalStatistics } from "../../request/globalStatistics"; +import type { Aggregator } from "./Aggregator"; export class GlobalStatisticsAggregator implements Aggregator { private globalStatistics: GlobalStatistics; diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/GroupByEndpointComponent.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/GroupByEndpointComponent.ts index 7a8d1205ab2a..7c310f075f98 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/GroupByEndpointComponent.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/GroupByEndpointComponent.ts @@ -11,11 +11,6 @@ import { getInitialHeader, mergeHeaders } from "../headerUtils"; import { emptyGroup, extractAggregateResult } from "./emptyGroup"; import type { DiagnosticNodeInternal } from "../../diagnostics/DiagnosticNodeInternal"; -interface GroupByResponse { - result: GroupByResult; - headers: CosmosHeaders; -} - interface GroupByResult { groupByItems: any[]; payload: any; @@ -32,52 +27,51 @@ export class GroupByEndpointComponent implements ExecutionContext { private readonly aggregateResultArray: any[] = []; private completed: boolean = false; - public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise> { - // If we have a full result set, begin returning results - if (this.aggregateResultArray.length > 0) { - return { - result: this.aggregateResultArray.pop(), - headers: getInitialHeader(), - }; - } + public hasMoreResults(): boolean { + return this.executionContext.hasMoreResults(); + } + public async fetchMore(diagnosticNode: DiagnosticNodeInternal): Promise> { if (this.completed) { return { result: undefined, headers: getInitialHeader(), }; } - const aggregateHeaders = getInitialHeader(); + const response = await this.executionContext.fetchMore(diagnosticNode); + mergeHeaders(aggregateHeaders, response.headers); - while (this.executionContext.hasMoreResults()) { - // Grab the next result - const { result, headers } = (await this.executionContext.nextItem( - diagnosticNode, - )) as GroupByResponse; - mergeHeaders(aggregateHeaders, headers); + if (response === undefined || response.result === undefined) { + // If there are any groupings, consolidate and return them + if (this.groupings.size > 0) { + return this.consolidateGroupResults(aggregateHeaders); + } + return { result: undefined, headers: aggregateHeaders }; + } + for (const item of response.result as GroupByResult[]) { // If it exists, process it via aggregators - if (result) { - const group = result.groupByItems ? await hashObject(result.groupByItems) : emptyGroup; + if (item) { + const group = item.groupByItems ? await hashObject(item.groupByItems) : emptyGroup; const aggregators = this.groupings.get(group); - const payload = result.payload; + const payload = item.payload; if (aggregators) { // Iterator over all results in the payload - Object.keys(payload).map((key) => { + for (const key of Object.keys(payload)) { // in case the value of a group is null make sure we create a dummy payload with item2==null const effectiveGroupByValue = payload[key] ? payload[key] : new Map().set("item2", null); const aggregateResult = extractAggregateResult(effectiveGroupByValue); aggregators.get(key).aggregate(aggregateResult); - }); + } } else { // This is the first time we have seen a grouping. Setup the initial result without aggregate values const grouping = new Map(); this.groupings.set(group, grouping); // Iterator over all results in the payload - Object.keys(payload).map((key) => { + for (const key of Object.keys(payload)) { const aggregateType = this.queryInfo.groupByAliasToAggregateType[key]; // Create a new aggregator for this specific aggregate field const aggregator = createAggregator(aggregateType); @@ -88,11 +82,22 @@ export class GroupByEndpointComponent implements ExecutionContext { } else { aggregator.aggregate(payload[key]); } - }); + } } } } + if (this.executionContext.hasMoreResults()) { + return { + result: [], + headers: aggregateHeaders, + }; + } else { + return this.consolidateGroupResults(aggregateHeaders); + } + } + + private consolidateGroupResults(aggregateHeaders: CosmosHeaders): Response { for (const grouping of this.groupings.values()) { const groupResult: any = {}; for (const [aggregateKey, aggregator] of grouping.entries()) { @@ -101,13 +106,6 @@ export class GroupByEndpointComponent implements ExecutionContext { this.aggregateResultArray.push(groupResult); } this.completed = true; - return { - result: this.aggregateResultArray.pop(), - headers: aggregateHeaders, - }; - } - - public hasMoreResults(): boolean { - return this.executionContext.hasMoreResults() || this.aggregateResultArray.length > 0; + return { result: this.aggregateResultArray, headers: aggregateHeaders }; } } diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/GroupByValueEndpointComponent.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/GroupByValueEndpointComponent.ts index a1ff1c109638..eee102351225 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/GroupByValueEndpointComponent.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/GroupByValueEndpointComponent.ts @@ -11,11 +11,6 @@ import { getInitialHeader, mergeHeaders } from "../headerUtils"; import { emptyGroup, extractAggregateResult } from "./emptyGroup"; import type { DiagnosticNodeInternal } from "../../diagnostics/DiagnosticNodeInternal"; -interface GroupByResponse { - result: GroupByResult; - headers: CosmosHeaders; -} - interface GroupByResult { groupByItems: any[]; payload: any; @@ -36,39 +31,36 @@ export class GroupByValueEndpointComponent implements ExecutionContext { this.aggregateType = this.queryInfo.aggregates[0]; } - public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise> { - // Start returning results if we have processed a full results set - if (this.aggregateResultArray.length > 0) { - return { - result: this.aggregateResultArray.pop(), - headers: getInitialHeader(), - }; - } + public hasMoreResults(): boolean { + return this.executionContext.hasMoreResults(); + } + public async fetchMore(diagnosticNode: DiagnosticNodeInternal): Promise> { if (this.completed) { return { result: undefined, headers: getInitialHeader(), }; } - const aggregateHeaders = getInitialHeader(); + const response = await this.executionContext.fetchMore(diagnosticNode); + mergeHeaders(aggregateHeaders, response.headers); - while (this.executionContext.hasMoreResults()) { - // Grab the next result - const { result, headers } = (await this.executionContext.nextItem( - diagnosticNode, - )) as GroupByResponse; - mergeHeaders(aggregateHeaders, headers); + if (response === undefined || response.result === undefined) { + if (this.aggregators.size > 0) { + return this.generateAggregateResponse(aggregateHeaders); + } + return { result: undefined, headers: aggregateHeaders }; + } - // If it exists, process it via aggregators - if (result) { + for (const item of response.result as GroupByResult[]) { + if (item) { let grouping: string = emptyGroup; - let payload: any = result; - if (result.groupByItems) { + let payload: any = item; + if (item.groupByItems) { // If the query contains a GROUP BY clause, it will have a payload property and groupByItems - payload = result.payload; - grouping = await hashObject(result.groupByItems); + payload = item.payload; + grouping = await hashObject(item.groupByItems); } const aggregator = this.aggregators.get(grouping); @@ -99,18 +91,26 @@ export class GroupByValueEndpointComponent implements ExecutionContext { headers: aggregateHeaders, }; } - // If no results are left in the underlying execution context, convert our aggregate results to an array + + if (this.executionContext.hasMoreResults()) { + return { result: [], headers: aggregateHeaders }; + } else { + // If no results are left in the underlying execution context, convert our aggregate results to an array + return this.generateAggregateResponse(aggregateHeaders); + } + } + + private generateAggregateResponse(aggregateHeaders: CosmosHeaders): Response { for (const aggregator of this.aggregators.values()) { - this.aggregateResultArray.push(aggregator.getResult()); + const result = aggregator.getResult(); + if (result !== undefined) { + this.aggregateResultArray.push(result); + } } this.completed = true; return { - result: this.aggregateResultArray.pop(), + result: this.aggregateResultArray, headers: aggregateHeaders, }; } - - public hasMoreResults(): boolean { - return this.executionContext.hasMoreResults() || this.aggregateResultArray.length > 0; - } } diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/NonStreamingOrderByDistinctEndpointComponent.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/NonStreamingOrderByDistinctEndpointComponent.ts index b70612c0520a..cb2f216020d6 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/NonStreamingOrderByDistinctEndpointComponent.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/NonStreamingOrderByDistinctEndpointComponent.ts @@ -6,7 +6,6 @@ import { getInitialHeader } from "../headerUtils"; import type { DiagnosticNodeInternal } from "../../diagnostics/DiagnosticNodeInternal"; import { hashObject } from "../../utils/hashObject"; import type { NonStreamingOrderByResult } from "../nonStreamingOrderByResult"; -import type { NonStreamingOrderByResponse } from "../nonStreamingOrderByResponse"; import { FixedSizePriorityQueue } from "../../utils/fixedSizePriorityQueue"; import { NonStreamingOrderByMap } from "../../utils/nonStreamingOrderByMap"; import { OrderByComparator } from "../orderByComparator"; @@ -56,58 +55,6 @@ export class NonStreamingOrderByDistinctEndpointComponent implements ExecutionCo ); } - public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise> { - let resHeaders = getInitialHeader(); - // if size is 0, just return undefined to signal to more results. Valid if query is TOP 0 or LIMIT 0 - if (this.priorityQueueBufferSize <= 0) { - return { - result: undefined, - headers: resHeaders, - }; - } - - // If there are more results in backend, keep filling map. - if (this.executionContext.hasMoreResults()) { - // Grab the next result - const { result, headers } = (await this.executionContext.nextItem( - diagnosticNode, - )) as NonStreamingOrderByResponse; - resHeaders = headers; - if (result) { - // make hash of result object and update the map if required. - const key = await hashObject(result?.payload); - this.aggregateMap.set(key, result); - } - - // return {} to signal that there are more results to fetch. - if (this.executionContext.hasMoreResults()) { - return { - result: {}, - headers: resHeaders, - }; - } - } - - // If all results are fetched from backend, prepare final results - if (!this.executionContext.hasMoreResults() && !this.isCompleted) { - this.isCompleted = true; - await this.buildFinalResultArray(); - } - - // Return results from final array. - if (this.finalResultArray.length > 0) { - return { - result: this.finalResultArray.shift(), - headers: resHeaders, - }; - } - // Signal that there are no more results. - return { - result: undefined, - headers: resHeaders, - }; - } - /** * Build final sorted result array from which responses will be served. */ @@ -140,6 +87,63 @@ export class NonStreamingOrderByDistinctEndpointComponent implements ExecutionCo public hasMoreResults(): boolean { if (this.priorityQueueBufferSize === 0) return false; - return this.executionContext.hasMoreResults() || this.finalResultArray.length > 0; + return this.executionContext.hasMoreResults(); + } + + public async fetchMore(diagnosticNode?: DiagnosticNodeInternal): Promise> { + if (this.isCompleted) { + return { + result: undefined, + headers: getInitialHeader(), + }; + } + let resHeaders = getInitialHeader(); + + // If there are more results in backend, keep filling map. + if (this.executionContext.hasMoreResults()) { + // Grab the next result + const response = await this.executionContext.fetchMore(diagnosticNode); + if (response === undefined || response.result === undefined) { + this.isCompleted = true; + if (this.aggregateMap.size() > 0) { + await this.buildFinalResultArray(); + return { + result: this.finalResultArray, + headers: response.headers, + }; + } + return { result: undefined, headers: response.headers }; + } + resHeaders = response.headers; + for (const item of response.result) { + if (item) { + const key = await hashObject(item?.payload); + this.aggregateMap.set(key, item); + } + } + + // return [] to signal that there are more results to fetch. + if (this.executionContext.hasMoreResults()) { + return { + result: [], + headers: resHeaders, + }; + } + } + + // If all results are fetched from backend, prepare final results + if (!this.executionContext.hasMoreResults() && !this.isCompleted) { + this.isCompleted = true; + await this.buildFinalResultArray(); + return { + result: this.finalResultArray, + headers: resHeaders, + }; + } + // Signal that there are no more results. + return { + result: undefined, + headers: resHeaders, + }; } } diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/NonStreamingOrderByEndpointComponent.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/NonStreamingOrderByEndpointComponent.ts index 05b2213309cc..852d346aae54 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/NonStreamingOrderByEndpointComponent.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/NonStreamingOrderByEndpointComponent.ts @@ -6,6 +6,7 @@ import type { ExecutionContext } from "../ExecutionContext"; import { OrderByComparator } from "../orderByComparator"; import type { NonStreamingOrderByResult } from "../nonStreamingOrderByResult"; import { FixedSizePriorityQueue } from "../../utils/fixedSizePriorityQueue"; +import type { CosmosHeaders } from "../headerUtils"; import { getInitialHeader } from "../headerUtils"; /** @@ -44,7 +45,25 @@ export class NonStreamingOrderByEndpointComponent implements ExecutionContext { ); } - public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise> { + /** + * Determine if there are still remaining resources to processs. + * @returns true if there is other elements to process in the NonStreamingOrderByEndpointComponent. + */ + public hasMoreResults(): boolean { + return this.priorityQueueBufferSize > 0 && this.executionContext.hasMoreResults(); + } + + /** + * Fetches the next batch of the result from the target container. + * @param diagnosticNode - The diagnostic information for the request. + */ + public async fetchMore(diagnosticNode?: DiagnosticNodeInternal): Promise> { + if (this.isCompleted) { + return { + result: undefined, + headers: getInitialHeader(), + }; + } let resHeaders = getInitialHeader(); // if size is 0, just return undefined to signal to more results. Valid if query is TOP 0 or LIMIT 0 if (this.priorityQueueBufferSize <= 0) { @@ -53,68 +72,78 @@ export class NonStreamingOrderByEndpointComponent implements ExecutionContext { headers: resHeaders, }; } - // If there are more results in backend, keep filling pq. if (this.executionContext.hasMoreResults()) { - const { result: item, headers } = await this.executionContext.nextItem(diagnosticNode); - resHeaders = headers; - if (item !== undefined) { - this.nonStreamingOrderByPQ.enqueue(item); + const response = await this.executionContext.fetchMore(diagnosticNode); + resHeaders = response.headers; + if (response === undefined || response.result === undefined) { + this.isCompleted = true; + if (!this.nonStreamingOrderByPQ.isEmpty()) { + return this.buildFinalResultArray(resHeaders); + } + return { result: undefined, headers: resHeaders }; } - // If the backend has more results to fetch, return {} to signal that there are more results to fetch. - if (this.executionContext.hasMoreResults()) { - return { - result: {}, - headers: resHeaders, - }; + for (const item of response.result) { + if (item !== undefined) { + this.nonStreamingOrderByPQ.enqueue(item); + } } } + + // If the backend has more results to fetch, return [] to signal that there are more results to fetch. + if (this.executionContext.hasMoreResults()) { + return { + result: [], + headers: resHeaders, + }; + } + // If all results are fetched from backend, prepare final results if (!this.executionContext.hasMoreResults() && !this.isCompleted) { - // Set isCompleted to true. this.isCompleted = true; - // Reverse the priority queue to get the results in the correct order - this.nonStreamingOrderByPQ = this.nonStreamingOrderByPQ.reverse(); - // For offset limit case we set the size of priority queue to offset + limit - // and we drain offset number of items from the priority queue - while ( - this.offset < this.priorityQueueBufferSize && - this.offset > 0 && - !this.nonStreamingOrderByPQ.isEmpty() - ) { - this.nonStreamingOrderByPQ.dequeue(); - this.offset--; - } + return this.buildFinalResultArray(resHeaders); } + + // If pq is empty, return undefined to signal that there are no more results. + return { + result: undefined, + headers: resHeaders, + }; + } + + private async buildFinalResultArray(resHeaders: CosmosHeaders): Promise> { + // Set isCompleted to true. + this.isCompleted = true; + // Reverse the priority queue to get the results in the correct order + this.nonStreamingOrderByPQ = this.nonStreamingOrderByPQ.reverse(); + // For offset limit case we set the size of priority queue to offset + limit + // and we drain offset number of items from the priority queue + while ( + this.offset < this.priorityQueueBufferSize && + this.offset > 0 && + !this.nonStreamingOrderByPQ.isEmpty() + ) { + this.nonStreamingOrderByPQ.dequeue(); + this.offset--; + } + // If pq is not empty, return the result from pq. if (!this.nonStreamingOrderByPQ.isEmpty()) { - let item; + const buffer: any[] = []; if (this.emitRawOrderByPayload) { - item = this.nonStreamingOrderByPQ.dequeue(); + while (!this.nonStreamingOrderByPQ.isEmpty()) { + buffer.push(this.nonStreamingOrderByPQ.dequeue()); + } } else { - item = this.nonStreamingOrderByPQ.dequeue()?.payload; + while (!this.nonStreamingOrderByPQ.isEmpty()) { + buffer.push(this.nonStreamingOrderByPQ.dequeue()?.payload); + } } return { - result: item, + result: buffer, headers: resHeaders, }; } - // If pq is empty, return undefined to signal that there are no more results. - return { - result: undefined, - headers: resHeaders, - }; - } - - /** - * Determine if there are still remaining resources to processs. - * @returns true if there is other elements to process in the NonStreamingOrderByEndpointComponent. - */ - public hasMoreResults(): boolean { - return ( - this.priorityQueueBufferSize > 0 && - (this.executionContext.hasMoreResults() || !this.nonStreamingOrderByPQ.isEmpty()) - ); } } diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/OffsetLimitEndpointComponent.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/OffsetLimitEndpointComponent.ts index c5a69636852a..65ca1b740c93 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/OffsetLimitEndpointComponent.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/OffsetLimitEndpointComponent.ts @@ -13,28 +13,27 @@ export class OffsetLimitEndpointComponent implements ExecutionContext { private limit: number, ) {} - public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise> { + public hasMoreResults(): boolean { + return (this.offset > 0 || this.limit > 0) && this.executionContext.hasMoreResults(); + } + + public async fetchMore(diagnosticNode?: DiagnosticNodeInternal): Promise> { const aggregateHeaders = getInitialHeader(); - while (this.offset > 0) { - // Grab next item but ignore the result. We only need the headers - const { headers } = await this.executionContext.nextItem(diagnosticNode); - this.offset--; - mergeHeaders(aggregateHeaders, headers); - } - if (this.limit > 0) { - const { result, headers } = await this.executionContext.nextItem(diagnosticNode); - this.limit--; - mergeHeaders(aggregateHeaders, headers); - return { result, headers: aggregateHeaders }; + const buffer: any[] = []; + const response = await this.executionContext.fetchMore(diagnosticNode); + mergeHeaders(aggregateHeaders, response.headers); + if (response === undefined || response.result === undefined) { + return { result: undefined, headers: response.headers }; } - // If both limit and offset are 0, return nothing - return { - result: undefined, - headers: getInitialHeader(), - }; - } - public hasMoreResults(): boolean { - return (this.offset > 0 || this.limit > 0) && this.executionContext.hasMoreResults(); + for (const item of response.result) { + if (this.offset > 0) { + this.offset--; + } else if (this.limit > 0) { + buffer.push(item); + this.limit--; + } + } + return { result: buffer, headers: aggregateHeaders }; } } diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/OrderByEndpointComponent.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/OrderByEndpointComponent.ts index 3588dd752e8b..d4ec4fa18ba6 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/OrderByEndpointComponent.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/OrderByEndpointComponent.ts @@ -17,24 +17,6 @@ export class OrderByEndpointComponent implements ExecutionContext { private executionContext: ExecutionContext, private emitRawOrderByPayload: boolean = false, ) {} - /** - * Execute a provided function on the next element in the OrderByEndpointComponent. - */ - public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise> { - const { result: item, headers } = await this.executionContext.nextItem(diagnosticNode); - if (this.emitRawOrderByPayload) { - return { - result: item !== undefined ? item : undefined, - headers, - }; - } else { - return { - result: item !== undefined ? item.payload : undefined, - headers, - }; - } - } - /** * Determine if there are still remaining resources to processs. * @returns true if there is other elements to process in the OrderByEndpointComponent. @@ -42,4 +24,21 @@ export class OrderByEndpointComponent implements ExecutionContext { public hasMoreResults(): boolean { return this.executionContext.hasMoreResults(); } + + public async fetchMore(diagnosticNode?: DiagnosticNodeInternal): Promise> { + const buffer: any[] = []; + const response = await this.executionContext.fetchMore(diagnosticNode); + if (response === undefined || response.result === undefined) { + return { result: undefined, headers: response.headers }; + } + for (const item of response.result) { + if (this.emitRawOrderByPayload) { + buffer.push(item); + } else { + buffer.push(item.payload); + } + } + + return { result: buffer, headers: response.headers }; + } } diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/OrderedDistinctEndpointComponent.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/OrderedDistinctEndpointComponent.ts index 9046a1023fe7..7ff7ffc5bc22 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/OrderedDistinctEndpointComponent.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/OrderedDistinctEndpointComponent.ts @@ -10,19 +10,25 @@ export class OrderedDistinctEndpointComponent implements ExecutionContext { private hashedLastResult: string; constructor(private executionContext: ExecutionContext) {} - public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise> { - const { headers, result } = await this.executionContext.nextItem(diagnosticNode); - if (result) { - const hashedResult = await hashObject(result); - if (hashedResult === this.hashedLastResult) { - return { result: undefined, headers }; - } - this.hashedLastResult = hashedResult; - } - return { result, headers }; - } - public hasMoreResults(): boolean { return this.executionContext.hasMoreResults(); } + + public async fetchMore(diagnosticNode?: DiagnosticNodeInternal): Promise> { + const buffer: any[] = []; + const response = await this.executionContext.fetchMore(diagnosticNode); + if (response === undefined || response.result === undefined) { + return { result: undefined, headers: response.headers }; + } + for (const item of response.result) { + if (item) { + const hashedResult = await hashObject(item); + if (hashedResult !== this.hashedLastResult) { + buffer.push(item); + this.hashedLastResult = hashedResult; + } + } + } + return { result: buffer, headers: response.headers }; + } } diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/UnorderedDistinctEndpointComponent.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/UnorderedDistinctEndpointComponent.ts index 2a75f011c650..ba72a745fca1 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/UnorderedDistinctEndpointComponent.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/EndpointComponent/UnorderedDistinctEndpointComponent.ts @@ -12,19 +12,25 @@ export class UnorderedDistinctEndpointComponent implements ExecutionContext { this.hashedResults = new Set(); } - public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise> { - const { headers, result } = await this.executionContext.nextItem(diagnosticNode); - if (result) { - const hashedResult = await hashObject(result); - if (this.hashedResults.has(hashedResult)) { - return { result: undefined, headers }; - } - this.hashedResults.add(hashedResult); - } - return { result, headers }; - } - public hasMoreResults(): boolean { return this.executionContext.hasMoreResults(); } + + public async fetchMore(diagnosticNode?: DiagnosticNodeInternal): Promise> { + const buffer: any[] = []; + const response = await this.executionContext.fetchMore(diagnosticNode); + if (response === undefined || response.result === undefined) { + return { result: undefined, headers: response.headers }; + } + for (const item of response.result) { + if (item) { + const hashedResult = await hashObject(item); + if (!this.hashedResults.has(hashedResult)) { + buffer.push(item); + this.hashedResults.add(hashedResult); + } + } + } + return { result: buffer, headers: response.headers }; + } } diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/ExecutionContext.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/ExecutionContext.ts index 353c84bf2a7a..9de0b815f878 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/ExecutionContext.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/ExecutionContext.ts @@ -5,7 +5,7 @@ import type { Response } from "../request"; /** @hidden */ export interface ExecutionContext { - nextItem: (diagnosticNode: DiagnosticNodeInternal) => Promise>; + nextItem?: (diagnosticNode: DiagnosticNodeInternal) => Promise>; hasMoreResults: () => boolean; - fetchMore?: (diagnosticNode: DiagnosticNodeInternal) => Promise>; // TODO: code smell + fetchMore?: (diagnosticNode: DiagnosticNodeInternal) => Promise>; } diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/FetchResult.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/FetchResult.ts index e6c516e440a5..9fa522fd7c1b 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/FetchResult.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/FetchResult.ts @@ -10,6 +10,7 @@ export enum FetchResultType { /** @hidden */ export class FetchResult { public feedResponse: any; + public headers: any; public fetchResultType: FetchResultType; public error: any; /** @@ -20,10 +21,11 @@ export class FetchResult { * @param error - The exception meant to be buffered on an unsuccessful fetch * @hidden */ - constructor(feedResponse: unknown, error: unknown) { + constructor(feedResponse: unknown, error: unknown, headers?: unknown) { // TODO: feedResponse/error if (feedResponse !== undefined) { this.feedResponse = feedResponse; + this.headers = headers; this.fetchResultType = FetchResultType.Result; } else { this.error = error; diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/documentProducer2.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/documentProducer2.ts new file mode 100644 index 000000000000..a90c2685cc0a --- /dev/null +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/documentProducer2.ts @@ -0,0 +1,343 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +import type { PartitionKeyRange, Resource } from "../client"; +import type { ClientContext } from "../ClientContext"; +import { + Constants, + getIdFromLink, + getPathFromLink, + ResourceType, + StatusCodes, + SubStatusCodes, +} from "../common"; +import type { DiagnosticNodeInternal } from "../diagnostics/DiagnosticNodeInternal"; +import type { FeedOptions } from "../request"; +import type { Response } from "../request"; +import { DefaultQueryExecutionContext } from "./defaultQueryExecutionContext"; +import type { FetchFunctionCallback } from "./defaultQueryExecutionContext"; +import { FetchResult, FetchResultType } from "./FetchResult"; +import { getInitialHeader, mergeHeaders } from "./headerUtils"; +import type { CosmosHeaders } from "./headerUtils"; +import type { SqlQuerySpec } from "./index"; + +/** @hidden */ +export class DocumentProducer { + private collectionLink: string; + private query: string | SqlQuerySpec; + public targetPartitionKeyRange: PartitionKeyRange; + public fetchResults: FetchResult[]; + public allFetched: boolean; + private err: Error; + public previousContinuationToken: string; + public continuationToken: string; + public generation: number = 0; + private respHeaders: CosmosHeaders; + private internalExecutionContext: DefaultQueryExecutionContext; + public startEpk: string; + public endEpk: string; + public populateEpkRangeHeaders: boolean; + + /** + * Provides the Target Partition Range Query Execution Context. + * @param clientContext - The service endpoint to use to create the client. + * @param collectionLink - Represents collection link + * @param query - A SQL query. + * @param targetPartitionKeyRange - Query Target Partition key Range + * @hidden + */ + constructor( + private clientContext: ClientContext, + collectionLink: string, + query: SqlQuerySpec, + targetPartitionKeyRange: PartitionKeyRange, + options: FeedOptions, + correlatedActivityId: string, + startEpk?: string, + endEpk?: string, + populateEpkRangeHeaders?: boolean, + ) { + // TODO: any options + this.collectionLink = collectionLink; + this.query = query; + this.targetPartitionKeyRange = targetPartitionKeyRange; + this.fetchResults = []; + + this.allFetched = false; + this.err = undefined; + + this.previousContinuationToken = undefined; + this.continuationToken = undefined; + this.respHeaders = getInitialHeader(); + + this.internalExecutionContext = new DefaultQueryExecutionContext( + options, + this.fetchFunction, + correlatedActivityId, + ); + this.startEpk = startEpk; + this.endEpk = endEpk; + this.populateEpkRangeHeaders = populateEpkRangeHeaders; + } + public peekBufferedItems(): any[] { + const bufferedResults = []; + for (let i = 0, done = false; i < this.fetchResults.length && !done; i++) { + const fetchResult = this.fetchResults[i]; + switch (fetchResult.fetchResultType) { + case FetchResultType.Done: + done = true; + break; + case FetchResultType.Exception: + done = true; + break; + case FetchResultType.Result: + bufferedResults.push(fetchResult.feedResponse); + break; + } + } + return bufferedResults; + } + + public fetchFunction: FetchFunctionCallback = async ( + diagnosticNode: DiagnosticNodeInternal, + options: FeedOptions, + correlatedActivityId: string, + ): Promise> => { + const path = getPathFromLink(this.collectionLink, ResourceType.item); + diagnosticNode.addData({ partitionKeyRangeId: this.targetPartitionKeyRange.id }); + const id = getIdFromLink(this.collectionLink); + const startEpk = this.populateEpkRangeHeaders ? this.startEpk : undefined; + const endEpk = this.populateEpkRangeHeaders ? this.endEpk : undefined; + + return this.clientContext.queryFeed({ + path, + resourceType: ResourceType.item, + resourceId: id, + resultFn: (result: any) => result.Documents, + query: this.query, + options, + diagnosticNode, + partitionKeyRangeId: this.targetPartitionKeyRange["id"], + correlatedActivityId: correlatedActivityId, + startEpk: startEpk, + endEpk: endEpk, + }); + }; + + public hasMoreResults(): boolean { + return this.internalExecutionContext.hasMoreResults() || this.fetchResults.length !== 0; + } + + public gotSplit(): boolean { + if (this.fetchResults.length !== 0) { + const fetchResult = this.fetchResults[0]; + if (fetchResult.fetchResultType === FetchResultType.Exception) { + if (DocumentProducer._needPartitionKeyRangeCacheRefresh(fetchResult.error)) { + return true; + } + } + } + return false; + } + + private _getAndResetActiveResponseHeaders(): CosmosHeaders { + const ret = this.respHeaders; + this.respHeaders = getInitialHeader(); + return ret; + } + + private _updateStates(err: any, allFetched: boolean): void { + if (err) { + this.err = err; + return; + } + if (allFetched) { + this.allFetched = true; + } + if (this.internalExecutionContext.continuationToken === this.continuationToken) { + // nothing changed + return; + } + this.previousContinuationToken = this.continuationToken; + this.continuationToken = this.internalExecutionContext.continuationToken; + } + + private static _needPartitionKeyRangeCacheRefresh(error: any): boolean { + // TODO: error + return ( + error.code === StatusCodes.Gone && + "substatus" in error && + error["substatus"] === SubStatusCodes.PartitionKeyRangeGone + ); + } + + /** + * Fetches and bufferes the next page of results in internal buffer + */ + public async bufferMore(diagnosticNode: DiagnosticNodeInternal): Promise { + if (this.err) { + throw this.err; + } + + try { + const { result: resources, headers: headerResponse } = + await this.internalExecutionContext.fetchMore(diagnosticNode); + ++this.generation; + this._updateStates(undefined, resources === undefined); + if (resources !== undefined) { + // add fetched header to the 1st element in the buffer + let addHeaderToFetchResult = true; + resources.forEach((element: any) => { + this.fetchResults.push( + new FetchResult( + element, + undefined, + addHeaderToFetchResult ? headerResponse : getInitialHeader(), + ), + ); + addHeaderToFetchResult = false; + }); + } + + // need to modify the header response so that the query metrics are per partition + if (headerResponse != null && Constants.HttpHeaders.QueryMetrics in headerResponse) { + // "0" is the default partition before one is actually assigned. + const queryMetrics = headerResponse[Constants.HttpHeaders.QueryMetrics]["0"]; + + // Wraping query metrics in a object where the keys are the partition key range. + headerResponse[Constants.HttpHeaders.QueryMetrics] = {}; + headerResponse[Constants.HttpHeaders.QueryMetrics][this.targetPartitionKeyRange.id] = + queryMetrics; + } + mergeHeaders(this.respHeaders, headerResponse); + } catch (err: any) { + if (DocumentProducer._needPartitionKeyRangeCacheRefresh(err)) { + // Split just happend + // Buffer the error so the execution context can still get the feedResponses in the itemBuffer + const bufferedError = new FetchResult(undefined, err); + this.fetchResults.push(bufferedError); + mergeHeaders(this.respHeaders, err.headers); + } else { + this._updateStates(err, err.resources === undefined); + throw err; + } + } + } + + public getTargetParitionKeyRange(): PartitionKeyRange { + return this.targetPartitionKeyRange; + } + /** + * Peak the next item in the buffer + */ + public peakNextItem(): any { + if (this.err) { + throw this.err; + } + if (this.allFetched || this.fetchResults.length === 0) { + return undefined; + } + const fetchResult = this.fetchResults[0]; + + switch (fetchResult.fetchResultType) { + case FetchResultType.Done: + return undefined; + + case FetchResultType.Exception: // do not throw this error + return undefined; + + case FetchResultType.Result: + return fetchResult.feedResponse; + } + } + + /** + * Returns the first item in the buffered results if any, or [] otherwise. + */ + public async fetchNextItem(): Promise> { + if (this.err) { + this._updateStates(this.err, undefined); + throw this.err; + } + if (this.allFetched) { + return { result: undefined, headers: this._getAndResetActiveResponseHeaders() }; + } + try { + const { result, headers } = await this.current(); + this._updateStates(undefined, result === undefined); + if (result === undefined || result.length === 0) { + return { result: undefined, headers }; + } + return { result, headers }; // + } catch (err: any) { + this._updateStates(err, err.item === undefined); + throw err; + } + } + /** + * Fetches all the buffered results + */ + public async fetchBufferedItems(): Promise> { + if (this.err) { + this._updateStates(this.err, undefined); + throw this.err; + } + if (this.allFetched) { + return { result: undefined, headers: this._getAndResetActiveResponseHeaders() }; + } + const resources: any[] = []; + const resHeaders: CosmosHeaders = getInitialHeader(); + try { + while (this.fetchResults.length > 0) { + const { result, headers } = await this.current(); + this._updateStates(undefined, result === undefined); + mergeHeaders(resHeaders, headers); + if (result === undefined) { + return { result: resources.length > 0 ? resources : undefined, headers: resHeaders }; + } else { + resources.push(result); + } + } + return { result: resources, headers: resHeaders }; + } catch (err: any) { + this._updateStates(err, err.item === undefined); + throw err; + } + } + + /** + * Retrieve the current element on the DocumentProducer. + */ + private async current(): Promise> { + // If something is buffered just give that + if (this.fetchResults.length > 0) { + const fetchResult = this.fetchResults.shift(); + // Need to unwrap fetch results + switch (fetchResult.fetchResultType) { + case FetchResultType.Done: + return { + result: undefined, + headers: this._getAndResetActiveResponseHeaders(), + }; + case FetchResultType.Exception: + fetchResult.error.headers = this._getAndResetActiveResponseHeaders(); + throw fetchResult.error; + case FetchResultType.Result: + return { + result: fetchResult.feedResponse, + headers: this._getAndResetActiveResponseHeaders(), + }; + } + } + + // If there isn't anymore items left to fetch then let the user know. + if (this.allFetched) { + return { + result: undefined, + headers: this._getAndResetActiveResponseHeaders(), + }; + } + + // If the internal buffer is empty, return empty result + return { result: [], headers: this._getAndResetActiveResponseHeaders() }; + } +} diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/hybridQueryExecutionContext.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/hybridQueryExecutionContext.ts index 8c76358f2822..174c8ddc90ed 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/hybridQueryExecutionContext.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/hybridQueryExecutionContext.ts @@ -1,10 +1,11 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -import { AzureLogger, createClientLogger } from "@azure/logger"; -import { ClientContext } from "../ClientContext"; -import { DiagnosticNodeInternal } from "../diagnostics/DiagnosticNodeInternal"; -import { +import type { AzureLogger } from "@azure/logger"; +import { createClientLogger } from "@azure/logger"; +import type { ClientContext } from "../ClientContext"; +import type { DiagnosticNodeInternal } from "../diagnostics/DiagnosticNodeInternal"; +import type { FeedOptions, GlobalStatistics, PartitionedQueryExecutionInfo, @@ -14,11 +15,12 @@ import { } from "../request"; import { HybridSearchQueryResult } from "../request/hybridSearchQueryResult"; import { GlobalStatisticsAggregator } from "./Aggregators/GlobalStatisticsAggregator"; -import { CosmosHeaders } from "./CosmosHeaders"; -import { ExecutionContext } from "./ExecutionContext"; +import type { CosmosHeaders } from "./CosmosHeaders"; +import type { ExecutionContext } from "./ExecutionContext"; import { getInitialHeader, mergeHeaders } from "./headerUtils"; import { ParallelQueryExecutionContext } from "./parallelQueryExecutionContext"; import { PipelinedQueryExecutionContext } from "./pipelinedQueryExecutionContext"; +import { Stack } from "../common/Stack"; /** @hidden */ export enum HybridQueryExecutionContextBaseStates { @@ -42,6 +44,9 @@ export class HybridQueryExecutionContext implements ExecutionContext { "documentdb-formattablehybridsearchquery-totaldocumentcount"; private RRF_CONSTANT = 60; // Constant for RRF score calculation private logger: AzureLogger = createClientLogger("HybridQueryExecutionContext"); + private hybridSearchResult: HybridSearchQueryResult[] = []; + private componentExecutionContextStack: Stack; + private uniqueItems = new Map(); constructor( private clientContext: ClientContext, @@ -86,6 +91,7 @@ export class HybridQueryExecutionContext implements ExecutionContext { this.createComponentExecutionContexts(); this.state = HybridQueryExecutionContextBaseStates.initialized; } + this.componentExecutionContextStack = new Stack(); } public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise> { const nextItemRespHeaders = getInitialHeader(); @@ -119,7 +125,7 @@ export class HybridQueryExecutionContext implements ExecutionContext { } } - public async fetchMore(diagnosticNode: DiagnosticNodeInternal): Promise> { + public async fetchMore(diagnosticNode?: DiagnosticNodeInternal): Promise> { const fetchMoreRespHeaders = getInitialHeader(); return this.fetchMoreInternal(diagnosticNode, fetchMoreRespHeaders); } @@ -157,12 +163,16 @@ export class HybridQueryExecutionContext implements ExecutionContext { ): Promise { try { while (this.globalStatisticsExecutionContext.hasMoreResults()) { - const result = await this.globalStatisticsExecutionContext.nextItem(diagnosticNode); - const globalStatistics: GlobalStatistics = result.result; + const result = await this.globalStatisticsExecutionContext.fetchMore(diagnosticNode); mergeHeaders(fetchMoreRespHeaders, result.headers); - if (globalStatistics) { - // iterate over the components update placeholders from globalStatistics - this.globalStatisticsAggregator.aggregate(globalStatistics); + if (result.result) { + for (const item of result.result) { + const globalStatistics: GlobalStatistics = item; + if (globalStatistics) { + // iterate over the components update placeholders from globalStatistics + this.globalStatisticsAggregator.aggregate(globalStatistics); + } + } } } } catch (error) { @@ -184,45 +194,72 @@ export class HybridQueryExecutionContext implements ExecutionContext { return; } try { - const hybridSearchResult: HybridSearchQueryResult[] = []; - const uniqueItems = new Map(); - - for (const componentExecutionContext of this.componentsExecutionContext) { - while (componentExecutionContext.hasMoreResults()) { - const result = await componentExecutionContext.fetchMore(diagnosticNode); - const response = result.result; - mergeHeaders(fetchMoreRespHeaders, result.headers); - if (response) { - response.forEach((item: any) => { - const hybridItem = HybridSearchQueryResult.create(item); - if (!uniqueItems.has(hybridItem.rid)) { - uniqueItems.set(hybridItem.rid, hybridItem); - } - }); + if (this.options.enableQueryControl) { + // keep track of componentExecutionContexts that have more results and maek call to them in LIFO order + if (!this.componentExecutionContextStack.isEmpty()) { + const componentExecutionContext = this.componentExecutionContextStack.pop(); + if (componentExecutionContext.hasMoreResults()) { + const result = await componentExecutionContext.fetchMore(diagnosticNode); + const response = result.result; + mergeHeaders(fetchMoreRespHeaders, result.headers); + if (response) { + response.forEach((item: any) => { + const hybridItem = HybridSearchQueryResult.create(item); + if (!this.uniqueItems.has(hybridItem.rid)) { + this.uniqueItems.set(hybridItem.rid, hybridItem); + } + }); + } + if (componentExecutionContext.hasMoreResults()) { + this.componentExecutionContextStack.push(componentExecutionContext); + } } } + if (this.componentExecutionContextStack.isEmpty()) { + this.processUniqueItems(); + } + } else { + for (const componentExecutionContext of this.componentsExecutionContext) { + while (componentExecutionContext.hasMoreResults()) { + const result = await componentExecutionContext.fetchMore(diagnosticNode); + const response = result.result; + mergeHeaders(fetchMoreRespHeaders, result.headers); + if (response) { + response.forEach((item: any) => { + const hybridItem = HybridSearchQueryResult.create(item); + if (!this.uniqueItems.has(hybridItem.rid)) { + this.uniqueItems.set(hybridItem.rid, hybridItem); + } + }); + } + } + } + this.processUniqueItems(); } - uniqueItems.forEach((item) => hybridSearchResult.push(item)); - if (hybridSearchResult.length === 0 || hybridSearchResult.length === 1) { - // return the result as no or one element is present - hybridSearchResult.forEach((item) => this.buffer.push(item.data)); - this.state = HybridQueryExecutionContextBaseStates.draining; - return; - } - - // Initialize an array to hold ranks for each document - const sortedHybridSearchResult = this.sortHybridSearchResultByRRFScore(hybridSearchResult); - // store the result to buffer - // add only data from the sortedHybridSearchResult in the buffer - sortedHybridSearchResult.forEach((item) => this.buffer.push(item.data)); - this.applySkipAndTakeToBuffer(); - this.state = HybridQueryExecutionContextBaseStates.draining; } catch (error) { this.state = HybridQueryExecutionContextBaseStates.done; throw error; } } + private processUniqueItems(): void { + this.uniqueItems.forEach((item) => this.hybridSearchResult.push(item)); + if (this.hybridSearchResult.length === 0 || this.hybridSearchResult.length === 1) { + // return the result as no or one element is present + this.hybridSearchResult.forEach((item) => this.buffer.push(item.data)); + this.state = HybridQueryExecutionContextBaseStates.draining; + return; + } + + // Initialize an array to hold ranks for each document + const sortedHybridSearchResult = this.sortHybridSearchResultByRRFScore(this.hybridSearchResult); + // store the result to buffer + // add only data from the sortedHybridSearchResult in the buffer + sortedHybridSearchResult.forEach((item) => this.buffer.push(item.data)); + this.applySkipAndTakeToBuffer(); + this.state = HybridQueryExecutionContextBaseStates.draining; + } + private applySkipAndTakeToBuffer(): void { const { skip, take } = this.partitionedQueryExecutionInfo.hybridSearchQueryInfo; if (skip) { @@ -342,21 +379,43 @@ export class HybridQueryExecutionContext implements ExecutionContext { return; } try { - const componentExecutionContext = this.componentsExecutionContext[0]; - const hybridSearchResult: HybridSearchQueryResult[] = []; - while (componentExecutionContext.hasMoreResults()) { - const result = await componentExecutionContext.fetchMore(diagNode); - const response = result.result; - mergeHeaders(fetchMoreRespHeaders, result.headers); - if (response) { - response.forEach((item: any) => { - hybridSearchResult.push(HybridSearchQueryResult.create(item)); - }); + if (this.options.enableQueryControl) { + const componentExecutionContext = this.componentsExecutionContext[0]; + if (componentExecutionContext.hasMoreResults()) { + const result = await componentExecutionContext.fetchMore(diagNode); + const response = result.result; + mergeHeaders(fetchMoreRespHeaders, result.headers); + if (response) { + response.forEach((item: any) => { + this.hybridSearchResult.push(HybridSearchQueryResult.create(item).data); + }); + } } + if (!componentExecutionContext.hasMoreResults()) { + this.state = HybridQueryExecutionContextBaseStates.draining; + this.hybridSearchResult.forEach((item) => this.buffer.push(item)); + this.applySkipAndTakeToBuffer(); + this.state = HybridQueryExecutionContextBaseStates.draining; + } + return; + } else { + const componentExecutionContext = this.componentsExecutionContext[0]; + const hybridSearchResult: HybridSearchQueryResult[] = []; + // add check for enable query control + while (componentExecutionContext.hasMoreResults()) { + const result = await componentExecutionContext.fetchMore(diagNode); + const response = result.result; + mergeHeaders(fetchMoreRespHeaders, result.headers); + if (response) { + response.forEach((item: any) => { + hybridSearchResult.push(HybridSearchQueryResult.create(item)); + }); + } + } + hybridSearchResult.forEach((item) => this.buffer.push(item.data)); + this.applySkipAndTakeToBuffer(); + this.state = HybridQueryExecutionContextBaseStates.draining; } - hybridSearchResult.forEach((item) => this.buffer.push(item.data)); - this.applySkipAndTakeToBuffer(); - this.state = HybridQueryExecutionContextBaseStates.draining; } catch (error) { this.state = HybridQueryExecutionContextBaseStates.done; throw error; @@ -380,17 +439,17 @@ export class HybridQueryExecutionContext implements ExecutionContext { queryInfo: componentQueryInfo, queryRanges: this.partitionedQueryExecutionInfo.queryRanges, }; - this.componentsExecutionContext.push( - new PipelinedQueryExecutionContext( - this.clientContext, - this.collectionLink, - componentQueryInfo.rewrittenQuery, - this.options, - componentPartitionExecutionInfo, - this.correlatedActivityId, - this.emitRawOrderByPayload, - ), + const executionContext = new PipelinedQueryExecutionContext( + this.clientContext, + this.collectionLink, + componentQueryInfo.rewrittenQuery, + this.options, + componentPartitionExecutionInfo, + this.correlatedActivityId, + this.emitRawOrderByPayload, ); + this.componentsExecutionContext.push(executionContext); + this.componentExecutionContextStack.push(executionContext); } } private processComponentQueries( diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/index.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/index.ts index 503e4d2c43a8..3a3fabe75999 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/index.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/index.ts @@ -4,7 +4,7 @@ export * from "./headerUtils"; export * from "./SqlQuerySpec"; export * from "./defaultQueryExecutionContext"; export * from "./Aggregators"; -export * from "./documentProducer"; +// export * from "./documentProducer"; export * from "./FetchResult"; export * from "./orderByDocumentProducerComparator"; export * from "./ExecutionContext"; @@ -13,3 +13,4 @@ export * from "./parallelQueryExecutionContext"; export * from "./orderByQueryExecutionContext"; export * from "./pipelinedQueryExecutionContext"; export * from "./orderByComparator"; +export * from "./documentProducer2"; diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/orderByQueryExecutionContext.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/orderByQueryExecutionContext.ts index e98a9353e4cb..8654e46d5dc1 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/orderByQueryExecutionContext.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/orderByQueryExecutionContext.ts @@ -3,8 +3,9 @@ import type { ClientContext } from "../ClientContext"; import type { PartitionedQueryExecutionInfo } from "../request/ErrorResponse"; import type { FeedOptions } from "../request/FeedOptions"; -import type { DocumentProducer } from "./documentProducer"; +import type { DocumentProducer } from "./documentProducer2"; import type { ExecutionContext } from "./ExecutionContext"; +import { DiagnosticNodeInternal } from "../diagnostics/DiagnosticNodeInternal"; import { OrderByDocumentProducerComparator } from "./orderByDocumentProducerComparator"; import { ParallelQueryExecutionContextBase } from "./parallelQueryExecutionContextBase"; import type { SqlQuerySpec } from "./SqlQuerySpec"; @@ -58,4 +59,14 @@ export class OrderByQueryExecutionContext public documentProducerComparator(docProd1: DocumentProducer, docProd2: DocumentProducer): any { return this.orderByComparator.compare(docProd1, docProd2); } + + private async bufferMore(diagnosticNode?: DiagnosticNodeInternal): Promise { + await this.bufferDocumentProducers(diagnosticNode); + await this.fillBufferFromBufferQueue(true); + } + + public async fetchMore(diagnosticNode?: DiagnosticNodeInternal): Promise { + await this.bufferMore(diagnosticNode); + return this.drainBufferedItems(); + } } diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/parallelQueryExecutionContext.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/parallelQueryExecutionContext.ts index 3eb70eda4570..95a424959308 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/parallelQueryExecutionContext.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/parallelQueryExecutionContext.ts @@ -1,9 +1,11 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -import type { DocumentProducer } from "./documentProducer"; +import type { DocumentProducer } from "./documentProducer2"; import type { ExecutionContext } from "./ExecutionContext"; import { ParallelQueryExecutionContextBase } from "./parallelQueryExecutionContextBase"; +import { Response } from "../request"; +import { DiagnosticNodeInternal } from "../diagnostics/DiagnosticNodeInternal"; /** * Provides the ParallelQueryExecutionContext. @@ -28,4 +30,15 @@ export class ParallelQueryExecutionContext ): number { return docProd1.generation - docProd2.generation; } + + private async bufferMore(diagnosticNode?: DiagnosticNodeInternal): Promise { + // TODO: need to upadte headers from here, so make sure it returns it + await this.bufferDocumentProducers(diagnosticNode); + await this.fillBufferFromBufferQueue(); + } + + public async fetchMore(diagnosticNode?: DiagnosticNodeInternal): Promise> { + await this.bufferMore(diagnosticNode); + return this.drainBufferedItems(); + } } diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/parallelQueryExecutionContextBase.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/parallelQueryExecutionContextBase.ts index 2ef38260fdc3..4003534d55a6 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/parallelQueryExecutionContextBase.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/parallelQueryExecutionContextBase.ts @@ -11,14 +11,11 @@ import type { PartitionedQueryExecutionInfo } from "../request/ErrorResponse"; import { QueryRange } from "../routing/QueryRange"; import { SmartRoutingMapProvider } from "../routing/smartRoutingMapProvider"; import type { CosmosHeaders } from "./CosmosHeaders"; -import { DocumentProducer } from "./documentProducer"; +import { DocumentProducer } from "./documentProducer2"; import type { ExecutionContext } from "./ExecutionContext"; import { getInitialHeader, mergeHeaders } from "./headerUtils"; import type { SqlQuerySpec } from "./SqlQuerySpec"; import { DiagnosticNodeInternal, DiagnosticNodeType } from "../diagnostics/DiagnosticNodeInternal"; -import { addDignosticChild } from "../utils/diagnostics"; -import { MetadataLookUpType } from "../CosmosDiagnostics"; -import { CosmosDbDiagnosticLevel } from "../diagnostics/CosmosDbDiagnosticLevel"; /** @hidden */ const logger: AzureLogger = createClientLogger("parallelQueryExecutionContextBase"); @@ -39,9 +36,11 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont protected sortOrders: any; private requestContinuation: any; private respHeaders: CosmosHeaders; - private orderByPQ: PriorityQueue; + private unfilledDocumentProducersQueue: PriorityQueue; + private bufferedDocumentProducersQueue: PriorityQueue; + // TODO: update type of buffer from any + private buffer: any[]; private sem: any; - private waitingForInternalExecutionContexts: number; private diagnosticNodeWrapper: { consumed: boolean; diagnosticNode: DiagnosticNodeInternal; @@ -86,14 +85,18 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont this.state = ParallelQueryExecutionContextBase.STATES.started; this.routingProvider = new SmartRoutingMapProvider(this.clientContext); this.sortOrders = this.partitionedQueryExecutionInfo.queryInfo.orderBy; + this.buffer = []; this.requestContinuation = options ? options.continuationToken || options.continuation : null; // response headers of undergoing operation this.respHeaders = getInitialHeader(); + this.unfilledDocumentProducersQueue = new PriorityQueue( + (a: DocumentProducer, b: DocumentProducer) => a.generation - b.generation, + ); // Make priority queue for documentProducers // The comparator is supplied by the derived class - this.orderByPQ = new PriorityQueue( + this.bufferedDocumentProducersQueue = new PriorityQueue( (a: DocumentProducer, b: DocumentProducer) => this.documentProducerComparator(b, a), ); // Creating the documentProducers @@ -104,7 +107,6 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont // ensure the lock is released after finishing up try { const targetPartitionRanges = await this._onTargetPartitionRanges(); - this.waitingForInternalExecutionContexts = targetPartitionRanges.length; const maxDegreeOfParallelism = options.maxDegreeOfParallelism === undefined || options.maxDegreeOfParallelism < 1 @@ -118,7 +120,6 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont maxDegreeOfParallelism, ); - const parallelismSem = semaphore(maxDegreeOfParallelism); let filteredPartitionKeyRanges = []; // The document producers generated from filteredPartitionKeyRanges const targetPartitionQueryExecutionContextList: DocumentProducer[] = []; @@ -128,45 +129,33 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont } else { filteredPartitionKeyRanges = targetPartitionRanges; } - // Create one documentProducer for each partitionTargetRange filteredPartitionKeyRanges.forEach((partitionTargetRange: any) => { // TODO: any partitionTargetRange // no async callback + const queryRange = QueryRange.parsePartitionKeyRange(partitionTargetRange); targetPartitionQueryExecutionContextList.push( - this._createTargetPartitionQueryExecutionContext(partitionTargetRange), + this._createTargetPartitionQueryExecutionContext( + partitionTargetRange, + undefined, + queryRange.min, + queryRange.max, + false, + ), ); }); // Fill up our priority queue with documentProducers targetPartitionQueryExecutionContextList.forEach((documentProducer): void => { // has async callback - const throttledFunc = async (): Promise => { - try { - const { result: document, headers } = await documentProducer.current( - this.getDiagnosticNode(), - ); - this._mergeWithActiveResponseHeaders(headers); - if (document === undefined) { - // no results on this one - return; - } - // if there are matching results in the target ex range add it to the priority queue - try { - this.orderByPQ.enq(documentProducer); - } catch (e: any) { - this.err = e; - } - } catch (err: any) { - this._mergeWithActiveResponseHeaders(err.headers); - this.err = err; - } finally { - parallelismSem.leave(); - this._decrementInitiationLock(); - } - }; - parallelismSem.take(throttledFunc); + try { + this.unfilledDocumentProducersQueue.enq(documentProducer); + } catch (e: any) { + this.err = e; + } }); + + this.sem.leave(); } catch (err: any) { this.err = err; // release the lock @@ -182,18 +171,6 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont dp2: DocumentProducer, ): number; - private _decrementInitiationLock(): void { - // decrements waitingForInternalExecutionContexts - // if waitingForInternalExecutionContexts reaches 0 releases the semaphore and changes the state - this.waitingForInternalExecutionContexts = this.waitingForInternalExecutionContexts - 1; - if (this.waitingForInternalExecutionContexts === 0) { - this.sem.leave(); - if (this.orderByPQ.size() === 0) { - this.state = ParallelQueryExecutionContextBase.STATES.inProgress; - } - } - } - private _mergeWithActiveResponseHeaders(headers: CosmosHeaders): void { mergeHeaders(this.respHeaders, headers); } @@ -224,6 +201,7 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont */ private async _getReplacementPartitionKeyRanges( documentProducer: DocumentProducer, + diagnosticNode: DiagnosticNodeInternal, ): Promise { const partitionKeyRange = documentProducer.targetPartitionKeyRange; // Download the new routing map @@ -233,74 +211,56 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont return this.routingProvider.getOverlappingRanges( this.collectionLink, [queryRange], - this.getDiagnosticNode(), + diagnosticNode, ); } - // TODO: P0 Code smell - can barely tell what this is doing - /** - * Removes the current document producer from the priqueue, - * replaces that document producer with child document producers, - * then reexecutes the originFunction with the corrrected executionContext - */ - private async _repairExecutionContext( + private async _enqueueReplacementDocumentProducers( diagnosticNode: DiagnosticNodeInternal, - originFunction: any, + documentProducer: DocumentProducer, ): Promise { - // TODO: any // Get the replacement ranges - // Removing the invalid documentProducer from the orderByPQ - const parentDocumentProducer = this.orderByPQ.deq(); - try { - const replacementPartitionKeyRanges: any[] = - await this._getReplacementPartitionKeyRanges(parentDocumentProducer); - const replacementDocumentProducers: DocumentProducer[] = []; + const replacementPartitionKeyRanges = await this._getReplacementPartitionKeyRanges( + documentProducer, + diagnosticNode, + ); + + if (replacementPartitionKeyRanges.length === 0) { + throw new Error("PartitionKeyRangeGone error but no replacement partition key ranges"); + } else if (replacementPartitionKeyRanges.length === 1) { + // Partition is gone due to Merge + // Create the replacement documentProducer with populateEpkRangeHeaders Flag set to true to set startEpk and endEpk headers + const replacementDocumentProducer = this._createTargetPartitionQueryExecutionContext( + replacementPartitionKeyRanges[0], + documentProducer.continuationToken, + documentProducer.startEpk, + documentProducer.endEpk, + true, + ); + + this.unfilledDocumentProducersQueue.enq(replacementDocumentProducer); + } else { // Create the replacement documentProducers + const replacementDocumentProducers: DocumentProducer[] = []; replacementPartitionKeyRanges.forEach((partitionKeyRange) => { + const queryRange = QueryRange.parsePartitionKeyRange(partitionKeyRange); // Create replacment document producers with the parent's continuationToken const replacementDocumentProducer = this._createTargetPartitionQueryExecutionContext( partitionKeyRange, - parentDocumentProducer.continuationToken, + documentProducer.continuationToken, + queryRange.min, + queryRange.max, + false, ); replacementDocumentProducers.push(replacementDocumentProducer); }); - // We need to check if the documentProducers even has anything left to fetch from before enqueing them - const checkAndEnqueueDocumentProducer = async ( - documentProducerToCheck: DocumentProducer, - checkNextDocumentProducerCallback: any, - ): Promise => { - try { - const { result: afterItem } = await documentProducerToCheck.current(diagnosticNode); - if (afterItem === undefined) { - // no more results left in this document producer, so we don't enqueue it - } else { - // Safe to put document producer back in the queue - this.orderByPQ.enq(documentProducerToCheck); - } - await checkNextDocumentProducerCallback(); - } catch (err: any) { - this.err = err; - return; + // add document producers to the queue + replacementDocumentProducers.forEach((replacementDocumentProducer) => { + if (replacementDocumentProducer.hasMoreResults()) { + this.unfilledDocumentProducersQueue.enq(replacementDocumentProducer); } - }; - const checkAndEnqueueDocumentProducers = async (rdp: DocumentProducer[]): Promise => { - if (rdp.length > 0) { - // We still have a replacementDocumentProducer to check - const replacementDocumentProducer = rdp.shift(); - await checkAndEnqueueDocumentProducer(replacementDocumentProducer, async () => { - await checkAndEnqueueDocumentProducers(rdp); - }); - } else { - // reexecutes the originFunction with the corrrected executionContext - return originFunction(); - } - }; - // Invoke the recursive function to get the ball rolling - await checkAndEnqueueDocumentProducers(replacementDocumentProducers); - } catch (err: any) { - this.err = err; - throw err; + }); } } @@ -313,193 +273,15 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont ); } - /** - * Checks to see if the executionContext needs to be repaired. - * if so it repairs the execution context and executes the ifCallback, - * else it continues with the current execution context and executes the elseCallback - */ - private async _repairExecutionContextIfNeeded( - diagnosticNode: DiagnosticNodeInternal, - ifCallback: any, - elseCallback: any, - ): Promise { - const documentProducer = this.orderByPQ.peek(); - // Check if split happened - try { - await documentProducer.current(diagnosticNode); - elseCallback(); - } catch (err: any) { - if (ParallelQueryExecutionContextBase._needPartitionKeyRangeCacheRefresh(err)) { - // Split has happened so we need to repair execution context before continueing - return addDignosticChild( - (childNode) => this._repairExecutionContext(childNode, ifCallback), - diagnosticNode, - DiagnosticNodeType.QUERY_REPAIR_NODE, - ); - } else { - // Something actually bad happened ... - this.err = err; - throw err; - } - } - } - - /** - * Fetches the next element in the ParallelQueryExecutionContextBase. - */ - public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise> { - if (this.err) { - // if there is a prior error return error - throw this.err; - } - return new Promise>((resolve, reject) => { - this.sem.take(() => { - if (!this.diagnosticNodeWrapper.consumed) { - diagnosticNode.addChildNode( - this.diagnosticNodeWrapper.diagnosticNode, - CosmosDbDiagnosticLevel.debug, - MetadataLookUpType.QueryPlanLookUp, - ); - this.diagnosticNodeWrapper.diagnosticNode = undefined; - this.diagnosticNodeWrapper.consumed = true; - } else { - this.diagnosticNodeWrapper.diagnosticNode = diagnosticNode; - } - - // NOTE: lock must be released before invoking quitting - if (this.err) { - // release the lock before invoking callback - this.sem.leave(); - // if there is a prior error return error - this.err.headers = this._getAndResetActiveResponseHeaders(); - reject(this.err); - return; - } - - if (this.orderByPQ.size() === 0) { - // there is no more results - this.state = ParallelQueryExecutionContextBase.STATES.ended; - // release the lock before invoking callback - this.sem.leave(); - return resolve({ - result: undefined, - headers: this._getAndResetActiveResponseHeaders(), - }); - } - - const ifCallback = (): void => { - // Release the semaphore to avoid deadlock - this.sem.leave(); - // Reexcute the function - return resolve(this.nextItem(diagnosticNode)); - }; - const elseCallback = async (): Promise => { - let documentProducer: DocumentProducer; - try { - documentProducer = this.orderByPQ.deq(); - } catch (e: any) { - // if comparing elements of the priority queue throws exception - // set that error and return error - this.err = e; - // release the lock before invoking callback - this.sem.leave(); - this.err.headers = this._getAndResetActiveResponseHeaders(); - reject(this.err); - return; - } - - let item: any; - let headers: CosmosHeaders; - try { - const response = await documentProducer.nextItem(diagnosticNode); - item = response.result; - headers = response.headers; - this._mergeWithActiveResponseHeaders(headers); - if (item === undefined) { - // this should never happen - // because the documentProducer already has buffered an item - // assert item !== undefined - this.err = new Error( - `Extracted DocumentProducer from the priority queue \ - doesn't have any buffered item!`, - ); - // release the lock before invoking callback - this.sem.leave(); - return resolve({ - result: undefined, - headers: this._getAndResetActiveResponseHeaders(), - }); - } - } catch (err: any) { - this.err = new Error( - `Extracted DocumentProducer from the priority queue fails to get the \ - buffered item. Due to ${JSON.stringify(err)}`, - ); - this.err.headers = this._getAndResetActiveResponseHeaders(); - // release the lock before invoking callback - this.sem.leave(); - reject(this.err); - return; - } - - // we need to put back the document producer to the queue if it has more elements. - // the lock will be released after we know document producer must be put back in the queue or not - try { - const { result: afterItem, headers: otherHeaders } = - await documentProducer.current(diagnosticNode); - this._mergeWithActiveResponseHeaders(otherHeaders); - if (afterItem === undefined) { - // no more results is left in this document producer - } else { - try { - const headItem = documentProducer.fetchResults[0]; - if (typeof headItem === "undefined") { - throw new Error( - "Extracted DocumentProducer from PQ is invalid state with no result!", - ); - } - this.orderByPQ.enq(documentProducer); - } catch (e: any) { - // if comparing elements in priority queue throws exception - // set error - this.err = e; - } - } - } catch (err: any) { - if (ParallelQueryExecutionContextBase._needPartitionKeyRangeCacheRefresh(err)) { - // We want the document producer enqueued - // So that later parts of the code can repair the execution context - this.orderByPQ.enq(documentProducer); - } else { - // Something actually bad happened - this.err = err; - reject(this.err); - } - } finally { - // release the lock before returning - this.sem.leave(); - } - // invoke the callback on the item - return resolve({ - result: item, - headers: this._getAndResetActiveResponseHeaders(), - }); - }; - this._repairExecutionContextIfNeeded(diagnosticNode, ifCallback, elseCallback).catch( - reject, - ); - }); - }); - } - /** * Determine if there are still remaining resources to processs based on the value of the continuation * token or the elements remaining on the current batch in the QueryIterator. * @returns true if there is other elements to process in the ParallelQueryExecutionContextBase. */ public hasMoreResults(): boolean { - return !( - this.state === ParallelQueryExecutionContextBase.STATES.ended || this.err !== undefined + return ( + !this.err && + (this.buffer.length > 0 || this.state !== ParallelQueryExecutionContextBase.STATES.ended) ); } @@ -509,6 +291,9 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont private _createTargetPartitionQueryExecutionContext( partitionKeyTargetRange: any, continuationToken?: any, + startEpk?: string, + endEpk?: string, + populateEpkRangeHeaders?: boolean, ): DocumentProducer { // TODO: any // creates target partition range Query Execution Context @@ -539,6 +324,255 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont partitionKeyTargetRange, options, this.correlatedActivityId, + startEpk, + endEpk, + populateEpkRangeHeaders, ); } + protected async drainBufferedItems(): Promise> { + return new Promise>((resolve, reject) => { + this.sem.take(() => { + if (this.err) { + // if there is a prior error return error + this.sem.leave(); + this.err.headers = this._getAndResetActiveResponseHeaders(); + reject(this.err); + return; + } + + // return undefined if there is no more results + if (this.buffer.length === 0) { + this.sem.leave(); + return resolve({ + result: this.state === ParallelQueryExecutionContextBase.STATES.ended ? undefined : [], + headers: this._getAndResetActiveResponseHeaders(), + }); + } + // draing the entire buffer object and return that in result of return object + const bufferedResults = this.buffer; + this.buffer = []; + + // release the lock before returning + this.sem.leave(); + // invoke the callback on the item + return resolve({ + result: bufferedResults, + headers: this._getAndResetActiveResponseHeaders(), + }); + }); + }); + } + + /** + * Buffers document producers based on the maximum degree of parallelism. + * Moves document producers from the unfilled queue to the buffered queue. + * @param diagnosticNode - The diagnostic node for logging and tracing. + * @returns A promise that resolves when buffering is complete. + */ + protected async bufferDocumentProducers(diagnosticNode?: DiagnosticNodeInternal): Promise { + return new Promise((resolve, reject) => { + this.sem.take(async () => { + if (this.err) { + this.sem.leave(); + reject(this.err); + return; + } + this.updateStates(this.err); + + if (this.state === ParallelQueryExecutionContextBase.STATES.ended) { + this.sem.leave(); + resolve(); + return; + } + + if (this.unfilledDocumentProducersQueue.size() === 0) { + this.sem.leave(); + resolve(); + return; + } + + try { + const maxDegreeOfParallelism = + this.options.maxDegreeOfParallelism === undefined || + this.options.maxDegreeOfParallelism < 1 + ? this.unfilledDocumentProducersQueue.size() + : Math.min( + this.options.maxDegreeOfParallelism, + this.unfilledDocumentProducersQueue.size(), + ); + + const documentProducers: DocumentProducer[] = []; + while ( + documentProducers.length < maxDegreeOfParallelism && + this.unfilledDocumentProducersQueue.size() > 0 + ) { + let documentProducer: DocumentProducer; + try { + documentProducer = this.unfilledDocumentProducersQueue.deq(); + } catch (e: any) { + this.err = e; + this.sem.leave(); + this.err.headers = this._getAndResetActiveResponseHeaders(); + reject(this.err); + return; + } + documentProducers.push(documentProducer); + } + + // const ifCallback = (): void => { + // this.sem.leave(); + // resolve(this.bufferDocumentProducers(diagnosticNode)); // Retry the method if repair is required + // }; + + // const elseCallback = async (): Promise => { + const bufferDocumentProducer = async ( + documentProducer: DocumentProducer, + ): Promise => { + try { + await documentProducer.bufferMore(diagnosticNode); + // if buffer of document producer is filled, add it to the buffered document producers queue + const nextItem = documentProducer.peakNextItem(); + if (nextItem !== undefined) { + this.bufferedDocumentProducersQueue.enq(documentProducer); + } else if (documentProducer.hasMoreResults()) { + this.unfilledDocumentProducersQueue.enq(documentProducer); + } + } catch (err) { + if (ParallelQueryExecutionContextBase._needPartitionKeyRangeCacheRefresh(err)) { + // We want the document producer enqueued + // So that later parts of the code can repair the execution context + // refresh the partition key ranges and ctreate new document producers and add it to the queue + + await this._enqueueReplacementDocumentProducers(diagnosticNode, documentProducer); + resolve(); + } else { + this.err = err; + this.sem.leave(); + this.err.headers = this._getAndResetActiveResponseHeaders(); + reject(err); + } + } + }; + + try { + await Promise.all( + documentProducers.map((producer) => bufferDocumentProducer(producer)), + ); + } catch (err) { + this.err = err; + this.err.headers = this._getAndResetActiveResponseHeaders(); + reject(err); + return; + } finally { + this.sem.leave(); + } + resolve(); + // }; + // this._repairExecutionContextIfNeeded( + // this.getDiagnosticNode(), + // ifCallback, + // elseCallback, + // ).catch(reject); + } catch (err) { + this.sem.leave(); + this.err = err; + this.err.headers = this._getAndResetActiveResponseHeaders(); + reject(err); + } + }); + }); + } + /** + * Drains the buffer of filled document producers and appends their items to the main buffer. + * @param isOrderBy - Indicates if the query is an order by query. + * @returns A promise that resolves when the buffer is filled. + */ + protected async fillBufferFromBufferQueue(isOrderBy: boolean = false): Promise { + return new Promise((resolve, reject) => { + this.sem.take(async () => { + if (this.err) { + // if there is a prior error return error + this.sem.leave(); + this.err.headers = this._getAndResetActiveResponseHeaders(); + reject(this.err); + return; + } + + if ( + this.state === ParallelQueryExecutionContextBase.STATES.ended || + this.bufferedDocumentProducersQueue.size() === 0 + ) { + this.sem.leave(); + resolve(); + return; + } + + try { + if (isOrderBy) { + while ( + this.unfilledDocumentProducersQueue.isEmpty() && + this.bufferedDocumentProducersQueue.size() > 0 + ) { + const documentProducer = this.bufferedDocumentProducersQueue.deq(); + const { result, headers } = await documentProducer.fetchNextItem(); + this._mergeWithActiveResponseHeaders(headers); + if (result) { + this.buffer.push(result); + } + if (documentProducer.peakNextItem() !== undefined) { + this.bufferedDocumentProducersQueue.enq(documentProducer); + } else if (documentProducer.hasMoreResults()) { + this.unfilledDocumentProducersQueue.enq(documentProducer); + } else { + // no more results in document producer + } + } + } else { + while (this.bufferedDocumentProducersQueue.size() > 0) { + const documentProducer = this.bufferedDocumentProducersQueue.deq(); + const { result, headers } = await documentProducer.fetchBufferedItems(); + this._mergeWithActiveResponseHeaders(headers); + if (result) { + this.buffer.push(...result); + } + if (documentProducer.hasMoreResults()) { + this.unfilledDocumentProducersQueue.enq(documentProducer); + } + } + } + this.updateStates(this.err); + } catch (err) { + this.err = err; + this.err.headers = this._getAndResetActiveResponseHeaders(); + reject(this.err); + return; + } finally { + // release the lock before returning + this.sem.leave(); + } + resolve(); + return; + }); + }); + } + + private updateStates(error: any): void { + if (error) { + this.err = error; + this.state = ParallelQueryExecutionContextBase.STATES.ended; + return; + } + + if (this.state === ParallelQueryExecutionContextBase.STATES.started) { + this.state = ParallelQueryExecutionContextBase.STATES.inProgress; + } + + const hasNoActiveProducers = + this.unfilledDocumentProducersQueue.size() === 0 && + this.bufferedDocumentProducersQueue.size() === 0; + + if (hasNoActiveProducers) { + this.state = ParallelQueryExecutionContextBase.STATES.ended; + } + } } diff --git a/sdk/cosmosdb/cosmos/src/queryExecutionContext/pipelinedQueryExecutionContext.ts b/sdk/cosmosdb/cosmos/src/queryExecutionContext/pipelinedQueryExecutionContext.ts index fac35e33cdd7..3cc86aaf6a16 100644 --- a/sdk/cosmosdb/cosmos/src/queryExecutionContext/pipelinedQueryExecutionContext.ts +++ b/sdk/cosmosdb/cosmos/src/queryExecutionContext/pipelinedQueryExecutionContext.ts @@ -162,106 +162,53 @@ export class PipelinedQueryExecutionContext implements ExecutionContext { this.endpoint = new UnorderedDistinctEndpointComponent(this.endpoint); } } - } - - public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise> { - return this.endpoint.nextItem(diagnosticNode); + this.fetchBuffer = []; } // Removed callback here beacuse it wouldn't have ever worked... public hasMoreResults(): boolean { - return this.endpoint.hasMoreResults(); + return this.fetchBuffer.length !== 0 || this.endpoint.hasMoreResults(); } public async fetchMore(diagnosticNode: DiagnosticNodeInternal): Promise> { - // if the wrapped endpoint has different implementation for fetchMore use that - // otherwise use the default implementation - if (typeof this.endpoint.fetchMore === "function") { - return this.endpoint.fetchMore(diagnosticNode); - } else { - this.fetchBuffer = []; - this.fetchMoreRespHeaders = getInitialHeader(); - return this.nonStreamingOrderBy - ? this._nonStreamingFetchMoreImplementation(diagnosticNode) - : this._fetchMoreImplementation(diagnosticNode); - } + this.fetchMoreRespHeaders = getInitialHeader(); + return this._fetchMoreImplementation(diagnosticNode); } private async _fetchMoreImplementation( diagnosticNode: DiagnosticNodeInternal, ): Promise> { try { - const { result: item, headers } = await this.endpoint.nextItem(diagnosticNode); - mergeHeaders(this.fetchMoreRespHeaders, headers); - if (item === undefined) { - // no more results - if (this.fetchBuffer.length === 0) { - return { - result: undefined, - headers: this.fetchMoreRespHeaders, - }; - } else { - // Just give what we have - const temp = this.fetchBuffer; - this.fetchBuffer = []; - return { result: temp, headers: this.fetchMoreRespHeaders }; - } + if (this.fetchBuffer.length >= this.pageSize) { + const temp = this.fetchBuffer.slice(0, this.pageSize); + this.fetchBuffer = this.fetchBuffer.slice(this.pageSize); + return { result: temp, headers: this.fetchMoreRespHeaders }; } else { - this.fetchBuffer.push(item); - if (this.fetchBuffer.length >= this.pageSize) { - // fetched enough results - const temp = this.fetchBuffer.slice(0, this.pageSize); - this.fetchBuffer = this.fetchBuffer.splice(this.pageSize); - return { result: temp, headers: this.fetchMoreRespHeaders }; - } else { - // recursively fetch more - // TODO: is recursion a good idea? - return this._fetchMoreImplementation(diagnosticNode); + const response = await this.endpoint.fetchMore(diagnosticNode); + mergeHeaders(this.fetchMoreRespHeaders, response.headers); + if (response === undefined || response.result === undefined) { + if (this.fetchBuffer.length > 0) { + const temp = this.fetchBuffer; + this.fetchBuffer = []; + return { result: temp, headers: this.fetchMoreRespHeaders }; + } else { + return { result: undefined, headers: this.fetchMoreRespHeaders }; + } } - } - } catch (err: any) { - mergeHeaders(this.fetchMoreRespHeaders, err.headers); - err.headers = this.fetchMoreRespHeaders; - if (err) { - throw err; - } - } - } - - private async _nonStreamingFetchMoreImplementation( - diagnosticNode: DiagnosticNodeInternal, - ): Promise> { - try { - const { result: item, headers } = await this.endpoint.nextItem(diagnosticNode); - mergeHeaders(this.fetchMoreRespHeaders, headers); - if (item === undefined) { - // no more results - if (this.fetchBuffer.length === 0) { - return { - result: undefined, - headers: this.fetchMoreRespHeaders, - }; - } else { - // Just give what we have - const temp = this.fetchBuffer; - this.fetchBuffer = []; - return { result: temp, headers: this.fetchMoreRespHeaders }; - } - } else { - // append the result - if (typeof item !== "object") { - this.fetchBuffer.push(item); - } else if (Object.keys(item).length !== 0) { - this.fetchBuffer.push(item); - } - if (this.fetchBuffer.length >= this.pageSize) { - // fetched enough results - const temp = this.fetchBuffer.slice(0, this.pageSize); - this.fetchBuffer = this.fetchBuffer.splice(this.pageSize); - return { result: temp, headers: this.fetchMoreRespHeaders }; - } else { - return this._nonStreamingFetchMoreImplementation(diagnosticNode); + this.fetchBuffer.push(...response.result); + + if (this.options.enableQueryControl) { + if (this.fetchBuffer.length >= this.pageSize) { + const temp = this.fetchBuffer.slice(0, this.pageSize); + this.fetchBuffer = this.fetchBuffer.slice(this.pageSize); + return { result: temp, headers: this.fetchMoreRespHeaders }; + } else { + const temp = this.fetchBuffer; + this.fetchBuffer = []; + return { result: temp, headers: this.fetchMoreRespHeaders }; + } } + return this._fetchMoreImplementation(diagnosticNode); } } catch (err: any) { mergeHeaders(this.fetchMoreRespHeaders, err.headers); diff --git a/sdk/cosmosdb/cosmos/src/queryIterator.ts b/sdk/cosmosdb/cosmos/src/queryIterator.ts index 937438f2ef1a..81a448955ca9 100644 --- a/sdk/cosmosdb/cosmos/src/queryIterator.ts +++ b/sdk/cosmosdb/cosmos/src/queryIterator.ts @@ -47,7 +47,6 @@ export class QueryIterator { private queryPlanPromise: Promise>; private isInitialized: boolean; private correlatedActivityId: string; - private nonStreamingOrderBy: boolean = false; private partitionKeyRangeCache: PartitionKeyRangeCache; /** @@ -202,6 +201,7 @@ export class QueryIterator { throw error; } } + return new FeedResponse( response.result, response.headers, @@ -244,11 +244,11 @@ export class QueryIterator { while (this.queryExecutionContext.hasMoreResults()) { let response: Response; try { - response = await this.queryExecutionContext.nextItem(diagnosticNode); + response = await this.queryExecutionContext.fetchMore(diagnosticNode); } catch (error: any) { if (this.needsQueryPlan(error)) { await this.createExecutionContext(diagnosticNode); - response = await this.queryExecutionContext.nextItem(diagnosticNode); + response = await this.queryExecutionContext.fetchMore(diagnosticNode); } else { throw error; } @@ -256,16 +256,8 @@ export class QueryIterator { const { result, headers } = response; // concatenate the results and fetch more mergeHeaders(this.fetchAllLastResHeaders, headers); - if (result !== undefined) { - if ( - this.nonStreamingOrderBy && - typeof result === "object" && - Object.keys(result).length === 0 - ) { - // ignore empty results from NonStreamingOrderBy Endpoint components. - } else { - this.fetchAllTempResources.push(result); - } + if (result) { + this.fetchAllTempResources.push(...result); } } return new FeedResponse( @@ -324,7 +316,6 @@ export class QueryIterator { queryPlan: PartitionedQueryExecutionInfo, ): Promise { const queryInfo = queryPlan.queryInfo; - this.nonStreamingOrderBy = queryInfo.hasNonStreamingOrderBy ? true : false; if (queryInfo.aggregates.length > 0 && queryInfo.hasSelectValue === false) { throw new Error("Aggregate queries must use the VALUE keyword"); } diff --git a/sdk/cosmosdb/cosmos/src/request/FeedOptions.ts b/sdk/cosmosdb/cosmos/src/request/FeedOptions.ts index a5c76a89d194..906c28886473 100644 --- a/sdk/cosmosdb/cosmos/src/request/FeedOptions.ts +++ b/sdk/cosmosdb/cosmos/src/request/FeedOptions.ts @@ -128,4 +128,10 @@ export interface FeedOptions extends SharedOptions { * Default: false; When set to true, it allows queries to bypass the default behavior that blocks nonStreaming queries without top or limit clauses. */ allowUnboundedNonStreamingQueries?: boolean; + + /** + * Enable query control for the query. + * Would give empty results if the results is not ready to served. + */ + enableQueryControl?: boolean; } diff --git a/sdk/cosmosdb/cosmos/test/internal/unit/common/TestParallelQueryExecutionContext.ts b/sdk/cosmosdb/cosmos/test/internal/unit/common/TestParallelQueryExecutionContext.ts new file mode 100644 index 000000000000..1b620f903bef --- /dev/null +++ b/sdk/cosmosdb/cosmos/test/internal/unit/common/TestParallelQueryExecutionContext.ts @@ -0,0 +1,33 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import { + DocumentProducer, + ExecutionContext, + ParallelQueryExecutionContextBase, +} from "../../../../src/queryExecutionContext"; +import { Response } from "../../../../src/request"; +import { DiagnosticNodeInternal } from "../../../../src/diagnostics/DiagnosticNodeInternal"; + +export class TestParallelQueryExecutionContext + extends ParallelQueryExecutionContextBase + implements ExecutionContext +{ + public documentProducerComparator( + docProd1: DocumentProducer, + docProd2: DocumentProducer, + ): number { + return docProd1.generation - docProd2.generation; + } + + private async bufferMore(diagnosticNode?: DiagnosticNodeInternal): Promise { + // TODO: need to upadte headers from here, so make sure it returns it + await this.bufferDocumentProducers(diagnosticNode); + await this.fillBufferFromBufferQueue(); + } + + public async fetchMore(diagnosticNode?: DiagnosticNodeInternal): Promise> { + await this.bufferMore(diagnosticNode); + return this.drainBufferedItems(); + } +} diff --git a/sdk/cosmosdb/cosmos/test/internal/unit/documentProducer.spec.ts b/sdk/cosmosdb/cosmos/test/internal/unit/documentProducer.spec.ts new file mode 100644 index 000000000000..1668d8db458e --- /dev/null +++ b/sdk/cosmosdb/cosmos/test/internal/unit/documentProducer.spec.ts @@ -0,0 +1,99 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +// import assert from "assert"; +import { DocumentProducer, FetchResult, SqlQuerySpec } from "../../../src/queryExecutionContext"; +import { MockedClientContext } from "../../public/common/MockClientContext"; +import { ClientContext, PartitionKeyRange } from "../../../src"; +import assert from "assert"; + +describe("11Test DocumentProducer", function () { + const mockedClientContext: ClientContext = new MockedClientContext({}) as any; + const sqlQuerySpec: SqlQuerySpec = { query: "SELECT c.id from c" }; + const pkRange: PartitionKeyRange = { + id: "0", + minInclusive: "", + maxExclusive: "FF", + ridPrefix: 0, + throughputFraction: 0.1, + status: "online", + parents: [], + }; + + it("fetchBufferedItems should return first item", async function () { + const documentProducer = new DocumentProducer( + mockedClientContext, + "mockCollectionLink", + sqlQuerySpec, + pkRange, + {}, + "mockCorrelatedActivityId", + ); + documentProducer.fetchResults = [ + new FetchResult({ id: "1" }, undefined), + new FetchResult({ id: "2" }, undefined), + new FetchResult({ id: "3" }, undefined), + ]; + + let item = await documentProducer.fetchBufferedItems(); + assert.strictEqual(item.result.length, 3); + + item = await documentProducer.fetchBufferedItems(); + assert.strictEqual(item.result.length, 0); + documentProducer.allFetched = true; + + item = await documentProducer.fetchBufferedItems(); + assert.strictEqual(item.result, undefined); + }); + + it("fetchNextItem should return first item", async function () { + const documentProducer = new DocumentProducer( + mockedClientContext, + "mockCollectionLink", + sqlQuerySpec, + pkRange, + {}, + "mockCorrelatedActivityId", + ); + documentProducer.fetchResults = [ + new FetchResult({ id: "1" }, undefined), + new FetchResult({ id: "2" }, undefined), + ]; + + let item = await documentProducer.fetchNextItem(); + assert.strictEqual(item.result.id, "1"); + + item = await documentProducer.fetchNextItem(); + assert.strictEqual(item.result.id, "2"); + + item = await documentProducer.fetchNextItem(); + assert.strictEqual(item.result, undefined); + + documentProducer.allFetched = true; + item = await documentProducer.fetchNextItem(); + assert.strictEqual(item.result, undefined); + }); + + it("peak item should return first item", async function () { + const documentProducer = new DocumentProducer( + mockedClientContext, + "mockCollectionLink", + sqlQuerySpec, + pkRange, + {}, + "mockCorrelatedActivityId", + ); + documentProducer.fetchResults = [new FetchResult({ id: "1" }, undefined)]; + + let item = await documentProducer.peakNextItem(); + assert.strictEqual(item.id, "1"); + + await documentProducer.fetchNextItem(); + + item = await documentProducer.peakNextItem(); + assert.strictEqual(item, undefined); + + documentProducer.allFetched = true; + item = await documentProducer.peakNextItem(); + assert.strictEqual(item, undefined); + }); +}); diff --git a/sdk/cosmosdb/cosmos/test/internal/unit/hybridExecutionContext.spec.ts b/sdk/cosmosdb/cosmos/test/internal/unit/hybridExecutionContext.spec.ts index cc4e06813a05..7ac3fe83743f 100644 --- a/sdk/cosmosdb/cosmos/test/internal/unit/hybridExecutionContext.spec.ts +++ b/sdk/cosmosdb/cosmos/test/internal/unit/hybridExecutionContext.spec.ts @@ -1,21 +1,15 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -import { - ClientContext, - CosmosDbDiagnosticLevel, - DiagnosticNodeInternal, - FeedOptions, - QueryInfo, - DiagnosticNodeType, -} from "../../../src"; +import { CosmosDbDiagnosticLevel, DiagnosticNodeInternal, DiagnosticNodeType } from "../../../src"; +import type { ClientContext, FeedOptions, QueryInfo } from "../../../src"; import { HybridQueryExecutionContext, HybridQueryExecutionContextBaseStates, } from "../../../src/queryExecutionContext/hybridQueryExecutionContext"; -import { HybridSearchQueryInfo } from "../../../src/request/ErrorResponse"; -import { GlobalStatistics } from "../../../src/request/globalStatistics"; +import type { HybridSearchQueryInfo } from "../../../src/request/ErrorResponse"; +import type { GlobalStatistics } from "../../../src/request/globalStatistics"; import assert from "assert"; -import { HybridSearchQueryResult } from "../../../src/request/hybridSearchQueryResult"; +import type { HybridSearchQueryResult } from "../../../src/request/hybridSearchQueryResult"; import sinon from "sinon"; import { MockedClientContext } from "../../public/common/MockClientContext"; @@ -91,11 +85,13 @@ describe("hybridQueryExecutionContext", function () { .onCall(1) .returns(false); // Second call returns false - sinon.stub(context["globalStatisticsExecutionContext"], "nextItem").resolves({ - result: { - documentCount: 2, - fullTextStatistics: [{ totalWordCount: 100, hitCounts: [1, 2, 3] }], - }, + sinon.stub(context["globalStatisticsExecutionContext"], "fetchMore").resolves({ + result: [ + { + documentCount: 2, + fullTextStatistics: [{ totalWordCount: 100, hitCounts: [1, 2, 3] }], + }, + ], headers: {}, code: 200, substatus: 0, diff --git a/sdk/cosmosdb/cosmos/test/internal/unit/partitionMerge.spec.ts b/sdk/cosmosdb/cosmos/test/internal/unit/partitionMerge.spec.ts new file mode 100644 index 000000000000..41321400a671 --- /dev/null +++ b/sdk/cosmosdb/cosmos/test/internal/unit/partitionMerge.spec.ts @@ -0,0 +1,266 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +import { + ClientConfigDiagnostic, + ClientContext, + ConsistencyLevel, + Constants, + CosmosClientOptions, + CosmosDbDiagnosticLevel, + DiagnosticNodeInternal, + FeedOptions, + GlobalEndpointManager, + QueryInfo, + RequestOptions, + QueryIterator, + PartitionKeyRange, + Resource, + StatusCodes, +} from "../../../src"; +import { expect, assert } from "chai"; +import { TestParallelQueryExecutionContext } from "./common/TestParallelQueryExecutionContext"; +import sinon from "sinon"; +import { SubStatusCodes } from "../../../src/common"; + +const createMockPartitionKeyRange = (id: string, minInclusive: string, maxExclusive: string) => ({ + id, // Range ID + _rid: "range-rid", // Resource ID of the partition key range + minInclusive, // Minimum value of the partition key range + maxExclusive, // Maximum value of the partition key range + _etag: "sample-etag", // ETag for concurrency control + _self: `/dbs/sample-db/colls/sample-collection/pkranges/${id}`, // Self-link + throughputFraction: 1.0, // Throughput assigned to this partition + status: "Online", // Status of the partition +}); + +const createMockDocument = (id: string, name: string, value: string) => ({ + id, + _rid: "sample-rid-2", + _ts: Date.now(), + _self: "/dbs/sample-db/colls/sample-collection/docs/sample-id-2", + _etag: "sample-etag-2", + name: name, + value: value, +}); + +function createTestClientContext( + options: Partial, + diagnosticLevel: CosmosDbDiagnosticLevel, +) { + const clientOps: CosmosClientOptions = { + endpoint: "", + connectionPolicy: { + enableEndpointDiscovery: false, + preferredLocations: ["https://localhhost"], + }, + ...options, + }; + const globalEndpointManager = new GlobalEndpointManager( + clientOps, + async (diagnosticNode: DiagnosticNodeInternal, opts: RequestOptions) => { + expect(opts).to.exist; // eslint-disable-line no-unused-expressions + const dummyAccount: any = diagnosticNode; + return dummyAccount; + }, + ); + const clientConfig: ClientConfigDiagnostic = { + endpoint: "", + resourceTokensConfigured: true, + tokenProviderConfigured: true, + aadCredentialsConfigured: true, + connectionPolicyConfigured: true, + consistencyLevel: ConsistencyLevel.BoundedStaleness, + defaultHeaders: {}, + agentConfigured: true, + userAgentSuffix: "", + pluginsConfigured: true, + sDKVersion: Constants.SDKVersion, + ...options, + }; + const clientContext = new ClientContext( + clientOps, + globalEndpointManager, + clientConfig, + diagnosticLevel, + ); + return clientContext; +} + +const collectionLink = "/dbs/testDb/colls/testCollection"; // Sample collection link +const query = "SELECT * FROM c"; // Example query string or SqlQuerySpec object +const options: FeedOptions = { maxItemCount: 2, maxDegreeOfParallelism: 1 }; +const queryInfo: QueryInfo = { + orderBy: ["Ascending"], + rewrittenQuery: "SELECT * FROM c", +} as QueryInfo; +const partitionedQueryExecutionInfo = { + queryRanges: [ + { + min: "", + max: "1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", + isMinInclusive: true, // Whether the minimum value is inclusive + isMaxInclusive: false, + }, + { + min: "1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", + max: "FF", + isMinInclusive: true, // Whether the minimum value is inclusive + isMaxInclusive: false, + }, + ], + queryInfo: queryInfo, + partitionedQueryExecutionInfoVersion: 1, +}; +const cosmosClientOptions = { + endpoint: "https://your-cosmos-db.documents.azure.com:443/", + key: "your-cosmos-db-key", + userAgentSuffix: "MockClient", +}; +const correlatedActivityId = "sample-activity-id"; // Example correlated activity ID + +const diagnosticLevel = CosmosDbDiagnosticLevel.info; + +describe("Partition-Merge", function () { + const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); // Mock ClientContext instance + const mockPartitionKeyRange1 = createMockPartitionKeyRange( + "parent1", + "", + "1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", + ); + const mockPartitionKeyRange2 = createMockPartitionKeyRange( + "parent2", + "1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", + "FF", + ); + + const fetchAllInternalStub = sinon.stub().resolves({ + resources: [mockPartitionKeyRange1, mockPartitionKeyRange2], + headers: { "x-ms-request-charge": "1.23" }, + code: 200, + }); + sinon.stub(clientContext, "queryPartitionKeyRanges").returns({ + fetchAllInternal: fetchAllInternalStub, // Add fetchAllInternal to mimic expected structure + } as unknown as QueryIterator); + + const mockDocument1 = createMockDocument( + "sample-id-1", + "Sample Document 1", + "This is the first sample document", + ); + const mockDocument2 = createMockDocument( + "sample-id-2", + "Sample Document 2", + "This is the second sample document", + ); + + // Define a stub for queryFeed in clientContext + sinon.stub(clientContext, "queryFeed").resolves({ + result: [mockDocument1, mockDocument2] as unknown as Resource, // Add result to mimic expected structure + headers: { + "x-ms-request-charge": "3.5", // Example RU charge + "x-ms-continuation": "token-for-next-page", // Continuation token for pagination + }, + code: 200, // Optional status code + }); + + // Create a new instance of TestParallelQueryExecutionContext + const context = new TestParallelQueryExecutionContext( + clientContext, + collectionLink, + query, + options, + partitionedQueryExecutionInfo, + correlatedActivityId, + ); + context["options"] = options; + + it("there should be 2 document producers in the unfilledDocumentProducersQueue as there are two partition key ranges", async function () { + // Assert that the priority queue has 2 document producers + assert.equal(context["unfilledDocumentProducersQueue"].size(), 2); + + // Assert that the document producers have the correct start and end EPKs and populateEpkRangeHeaders is false + context["unfilledDocumentProducersQueue"].forEach((docProd) => { + if (docProd.targetPartitionKeyRange.id === mockPartitionKeyRange1.id) { + assert.equal(docProd.startEpk, mockPartitionKeyRange1.minInclusive); + assert.equal(docProd.endEpk, mockPartitionKeyRange1.maxExclusive); + } else if (docProd.targetPartitionKeyRange.id === mockPartitionKeyRange2.id) { + assert.equal(docProd.startEpk, mockPartitionKeyRange2.minInclusive); + assert.equal(docProd.endEpk, mockPartitionKeyRange2.maxExclusive); + } + assert.equal(docProd.populateEpkRangeHeaders, false); + }); + }); + + it("Correct parent epk ranges are picked up in the newly created child document producers and _enqueueReplacementDocumentProducers function should be called if partition is gone due to merge", async function () { + const parentDocProd1 = context["unfilledDocumentProducersQueue"].peek(); + + // Stub the bufferMore method of the document producers to throw a Gone error + context["unfilledDocumentProducersQueue"].forEach((docProd) => { + sinon.stub(docProd, "bufferMore").rejects({ + code: StatusCodes.Gone, + substatus: SubStatusCodes.PartitionKeyRangeGone, + message: "Partition key range is gone", + }); + }); + const parentDocumentProducer1StartEpk = parentDocProd1.startEpk; + const parentDocumentProducer1EndEpk = parentDocProd1.endEpk; + + // Mocking the _getReplacementPartitionKeyRanges function to return a single partition key range + const getReplacementPartitionKeyRangesStub = sinon + .stub(context as any, "_getReplacementPartitionKeyRanges") + .resolves([createMockPartitionKeyRange("child1", "", "1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF")]); + + // Creating a spy on the _enqueueReplacementDocumentProducers function + const enqueueSpy = sinon.spy(context as any, "_enqueueReplacementDocumentProducers"); + + try { + // The query fails because the fetchMore method of the first document producer throws a Gone error + await context.fetchMore(context["diagnosticNodeWrapper"]["diagnosticNode"]); + assert.fail("Expected query to fail"); + } catch (err) { + assert(err); + } + + // Assert that the _enqueueReplacementDocumentProducers function was called once + assert(enqueueSpy.calledOnce); + enqueueSpy.restore(); + + // Assert that the priority queue has 2 document producers. One parent and one newly created child + assert.equal(context["unfilledDocumentProducersQueue"].size(), 2); + + // Assert that the newly created document producer has the correct start and end EPKs from Parent and populateEpkRangeHeaders is true + context["unfilledDocumentProducersQueue"].forEach((docProd) => { + if (docProd.targetPartitionKeyRange.id === "child1") { + assert.equal(docProd.startEpk, parentDocumentProducer1StartEpk); + assert.equal(docProd.endEpk, parentDocumentProducer1EndEpk); + assert.equal(docProd.populateEpkRangeHeaders, true); + } + }); + + // Removing the child document producer from the priority queue + context["unfilledDocumentProducersQueue"].deq(); + + // Assert that the priority queue has 1 document producer + assert.equal(context["unfilledDocumentProducersQueue"].size(), 1); + + const parentDocProd2 = context["unfilledDocumentProducersQueue"].peek(); + + const parentDocumentProducer2StartEpk = parentDocProd2.startEpk; + const parentDocumentProducer2EndEpk = parentDocProd2.endEpk; + + // Restoring and mocking again the _getReplacementPartitionKeyRanges function + getReplacementPartitionKeyRangesStub.restore(); + sinon + .stub(context as any, "_getReplacementPartitionKeyRanges") + .resolves([createMockPartitionKeyRange("child2", "1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", "FF")]); + + // Assert that the newly created document producer has the correct start and end EPKs from Parent and populateEpkRangeHeaders is true + context["unfilledDocumentProducersQueue"].forEach((docProd) => { + if (docProd.targetPartitionKeyRange.id === "child2") { + assert.equal(docProd.startEpk, parentDocumentProducer2StartEpk); + assert.equal(docProd.endEpk, parentDocumentProducer2EndEpk); + assert.equal(docProd.populateEpkRangeHeaders, true); + } + }); + }); +}); diff --git a/sdk/cosmosdb/cosmos/test/internal/unit/defaultQueryExecutionContext.spec.ts b/sdk/cosmosdb/cosmos/test/internal/unit/query/defaultQueryExecutionContext.spec.ts similarity index 85% rename from sdk/cosmosdb/cosmos/test/internal/unit/defaultQueryExecutionContext.spec.ts rename to sdk/cosmosdb/cosmos/test/internal/unit/query/defaultQueryExecutionContext.spec.ts index afebf8c10211..a40d53f54dba 100644 --- a/sdk/cosmosdb/cosmos/test/internal/unit/defaultQueryExecutionContext.spec.ts +++ b/sdk/cosmosdb/cosmos/test/internal/unit/query/defaultQueryExecutionContext.spec.ts @@ -1,12 +1,12 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -import type { FetchFunctionCallback } from "../../../src/queryExecutionContext"; -import { DefaultQueryExecutionContext } from "../../../src/queryExecutionContext"; -import type { FeedOptions } from "../../../src"; +import type { FetchFunctionCallback } from "../../../../src/queryExecutionContext"; +import { DefaultQueryExecutionContext } from "../../../../src/queryExecutionContext/defaultQueryExecutionContext"; +import type { FeedOptions } from "../../../../src"; import assert from "assert"; -import { sleep } from "../../../src/common"; -import { createDummyDiagnosticNode } from "../../public/common/TestHelpers"; -import { getEmptyCosmosDiagnostics } from "../../../src/utils/diagnostics"; +import { sleep } from "../../../../src/common"; +import { createDummyDiagnosticNode } from "../../../public/common/TestHelpers"; +import { getEmptyCosmosDiagnostics } from "../../../../src/utils/diagnostics"; describe("defaultQueryExecutionContext", function () { it("should not buffer items if bufferItems is false", async function () { diff --git a/sdk/cosmosdb/cosmos/test/internal/unit/query/orderByQueryExecutionContext.spec.ts b/sdk/cosmosdb/cosmos/test/internal/unit/query/orderByQueryExecutionContext.spec.ts new file mode 100644 index 000000000000..8fbd9aa99269 --- /dev/null +++ b/sdk/cosmosdb/cosmos/test/internal/unit/query/orderByQueryExecutionContext.spec.ts @@ -0,0 +1,387 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import sinon from "sinon"; +import { CosmosDbDiagnosticLevel } from "../../../../src/diagnostics/CosmosDbDiagnosticLevel"; +import type { QueryInfo } from "../../../../src/request/ErrorResponse"; +import { createTestClientContext } from "./parallelQueryExecutionContextBase.spec"; +import type { QueryIterator } from "../../../../src/queryIterator"; +import type { PartitionKeyRange } from "../../../../src/client/Container/PartitionKeyRange"; +import type { Resource } from "../../../../src/client/Resource"; +import { OrderByQueryExecutionContext } from "../../../../src/queryExecutionContext/orderByQueryExecutionContext"; +import type { FeedOptions } from "../../../../src/request/FeedOptions"; +import assert from "assert"; +import { createDummyDiagnosticNode } from "../../../public/common/TestHelpers"; + +describe("OrderByQueryExecutionContext", function () { + const collectionLink = "/dbs/testDb/colls/testCollection"; // Sample collection link + const query = "SELECT * FROM c order by c.id"; // Example query string or SqlQuerySpec object + const queryInfo: QueryInfo = { + orderBy: ["Ascending"], + orderByExpressions: [ + { + expression: "c.id", // Replace `propertyName` with the field you are ordering by + type: "PropertyRef", // Type of the expression + }, + ], + groupByAliasToAggregateType: {}, // No group by in this example + distinctType: "None", // Indicates no DISTINCT in the query + hasSelectValue: false, // Assuming no SELECT VALUE + hasNonStreamingOrderBy: false, // Set to true if using non-streaming ORDER BY + }; + + const partitionedQueryExecutionInfo = { + queryRanges: [ + { + min: "00", + max: "AA", + isMinInclusive: true, // Whether the minimum value is inclusive + isMaxInclusive: false, + }, + { + min: "AA", + max: "BB", + isMinInclusive: true, // Whether the minimum value is inclusive + isMaxInclusive: false, + }, + { + min: "BB", + max: "FF", + isMinInclusive: true, // Whether the minimum value is inclusive + isMaxInclusive: false, + }, + ], + queryInfo: queryInfo, + partitionedQueryExecutionInfoVersion: 1, + }; + const correlatedActivityId = "sample-activity-id"; // Example correlated activity ID + // Mock dependencies for ClientContext + const cosmosClientOptions = { + endpoint: "https://your-cosmos-db.documents.azure.com:443/", + key: "your-cosmos-db-key", + userAgentSuffix: "MockClient", + }; + + const diagnosticLevel = CosmosDbDiagnosticLevel.info; + const createMockPartitionKeyRange = (id: string, minInclusive: string, maxExclusive: string) => ({ + id, // Range ID + _rid: "range-rid", // Resource ID of the partition key range + minInclusive, // Minimum value of the partition key range + maxExclusive, // Maximum value of the partition key range + _etag: "sample-etag", // ETag for concurrency control + _self: `/dbs/sample-db/colls/sample-collection/pkranges/${id}`, // Self-link + throughputFraction: 1.0, // Throughput assigned to this partition + status: "Online", // Status of the partition + }); + + const createMockDocument = (id: string, name: string, value: string) => ({ + orderByItems: [ + { + item: id, // Value of the property used in ORDER BY (e.g., timestamp or other sortable field) + }, + ], + payload: { + id: id, // Unique identifier for the document + name: name, // Property used in ORDER BY + otherProperty: 42, // Other properties in the document + value: value, // Value of the document + }, + rid: "2d3g45", // Resource ID of the document + ts: 1692968400, // Timestamp of the document + _etag: '"0x8D9F8B2B2C1A9F0"', // ETag for concurrency control + }); + + // it("should buffer empty result when maxdegree of parallism 2", async function () { + // const options: FeedOptions = { maxItemCount: 10, maxDegreeOfParallelism: 2 }; + + // const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); // Mock ClientContext instance + // const mockPartitionKeyRange1 = createMockPartitionKeyRange("0", "", "AA"); + // const mockPartitionKeyRange2 = createMockPartitionKeyRange("1", "AA", "BB"); + // const mockPartitionKeyRange3 = createMockPartitionKeyRange("2", "BB", "FF"); + + // const fetchAllInternalStub = sinon.stub().resolves({ + // resources: [mockPartitionKeyRange1, mockPartitionKeyRange2, mockPartitionKeyRange3], + // headers: { "x-ms-request-charge": "1.23" }, + // code: 200, + // }); + + // sinon.stub(clientContext, "queryPartitionKeyRanges").returns({ + // fetchAllInternal: fetchAllInternalStub, // Add fetchAllInternal to mimic expected structure + // } as unknown as QueryIterator); + + // // Define a mock document (resource) returned from queryFeed + // const mockDocument1 = createMockDocument( + // "1", + // "Sample Document 1", + // "This is the first sample document", + // ); + // const mockDocument2 = createMockDocument( + // "2", + // "Sample Document 2", + // "This is the second sample document", + // ); + // // Define a stub for queryFeed in clientContext + // sinon.stub(clientContext, "queryFeed").resolves({ + // result: [mockDocument1, mockDocument2] as unknown as Resource, // Add result to mimic expected structure + // headers: { + // "x-ms-request-charge": "3.5", // Example RU charge + // "x-ms-continuation": "token-for-next-page", // Continuation token for pagination + // }, + // code: 200, // Optional status code + // }); + // const context = new OrderByQueryExecutionContext( + // clientContext, + // collectionLink, + // query, + // {}, + // partitionedQueryExecutionInfo, + // correlatedActivityId, + // ); + // context["options"] = options; + + // await context.bufferMore(); + + // assert.equal(context["buffer"].length, 0); + // }); + + // it("should buffer result when buffermore called twice", async function () { + // const options: FeedOptions = { maxItemCount: 10, maxDegreeOfParallelism: 2 }; + + // const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); // Mock ClientContext instance + // const mockPartitionKeyRange1 = createMockPartitionKeyRange("0", "", "AA"); + // const mockPartitionKeyRange2 = createMockPartitionKeyRange("1", "AA", "BB"); + // const mockPartitionKeyRange3 = createMockPartitionKeyRange("2", "BB", "FF"); + + // const fetchAllInternalStub = sinon.stub().resolves({ + // resources: [mockPartitionKeyRange1, mockPartitionKeyRange2, mockPartitionKeyRange3], + // headers: { "x-ms-request-charge": "1.23" }, + // code: 200, + // }); + + // sinon.stub(clientContext, "queryPartitionKeyRanges").returns({ + // fetchAllInternal: fetchAllInternalStub, // Add fetchAllInternal to mimic expected structure + // } as unknown as QueryIterator); + + // // Define a mock document (resource) returned from queryFeed + // const mockDocument1 = createMockDocument( + // "1", + // "Sample Document 1", + // "This is the first sample document", + // ); + // const mockDocument2 = createMockDocument( + // "2", + // "Sample Document 2", + // "This is the second sample document", + // ); + // // Define a stub for queryFeed in clientContext + // sinon.stub(clientContext, "queryFeed").resolves({ + // result: [mockDocument1, mockDocument2] as unknown as Resource, // Add result to mimic expected structure + // headers: { + // "x-ms-request-charge": "3.5", // Example RU charge + // "x-ms-continuation": "token-for-next-page", // Continuation token for pagination + // }, + // code: 200, // Optional status code + // }); + // const context = new OrderByQueryExecutionContext( + // clientContext, + // collectionLink, + // query, + // {}, + // partitionedQueryExecutionInfo, + // correlatedActivityId, + // ); + // context["options"] = options; + + // await context.bufferMore(); + // await context.bufferMore(); + + // assert.equal(context["buffer"].length, 4); + // }); + + // TODO: figure out how to simulate hasmore results as false + it("should return result when fetchMore called", async function () { + const options: FeedOptions = { maxItemCount: 10, maxDegreeOfParallelism: 2 }; + + const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); // Mock ClientContext instance + const mockPartitionKeyRange1 = createMockPartitionKeyRange("0", "", "AA"); + const mockPartitionKeyRange2 = createMockPartitionKeyRange("1", "AA", "BB"); + const mockPartitionKeyRange3 = createMockPartitionKeyRange("2", "BB", "FF"); + + const fetchAllInternalStub = sinon.stub().resolves({ + resources: [mockPartitionKeyRange1, mockPartitionKeyRange2, mockPartitionKeyRange3], + headers: { "x-ms-request-charge": "1.23" }, + code: 200, + }); + + sinon.stub(clientContext, "queryPartitionKeyRanges").returns({ + fetchAllInternal: fetchAllInternalStub, // Add fetchAllInternal to mimic expected structure + } as unknown as QueryIterator); + + // Define a mock document (resource) returned from queryFeed + const mockDocument1 = createMockDocument( + "1", + "Sample Document 1", + "This is the first sample document", + ); + const mockDocument2 = createMockDocument( + "2", + "Sample Document 2", + "This is the second sample document", + ); + const mockDocument3 = createMockDocument( + "3", + "Sample Document 3", + "This is the third sample document", + ); + + const mockDocumentList = [mockDocument1, mockDocument2, mockDocument3]; + let i = 0; + // Define a stub for queryFeed in clientContext + sinon.stub(clientContext, "queryFeed").callsFake(async () => { + return { + result: [mockDocumentList[i++]] as unknown as Resource, // Add result to mimic expected structure + headers: { + "x-ms-request-charge": "3.5", // Example RU charge + }, + code: 200, // Optional status code + }; + }); + + const context = new OrderByQueryExecutionContext( + clientContext, + collectionLink, + query, + {}, + partitionedQueryExecutionInfo, + correlatedActivityId, + ); + + context["options"] = options; + const result = []; + let count = 0; + while (context.hasMoreResults()) { + const response = await context.fetchMore(createDummyDiagnosticNode()); + if (response && response.result) { + result.push(...response.result); + } + count++; + } + assert.equal(result.length, 3); + // check ordering of the result 1,2,3 + assert.equal(result[0].payload.id, "1"); + assert.equal(result[1].payload.id, "2"); + assert.equal(result[2].payload.id, "3"); + }); + + it("fetchMore should handle different distribution of data across document producers", async function () { + const options: FeedOptions = { maxItemCount: 10, maxDegreeOfParallelism: 2 }; + + const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); // Mock ClientContext instance + const mockPartitionKeyRange1 = createMockPartitionKeyRange("0", "", "AA"); + const mockPartitionKeyRange2 = createMockPartitionKeyRange("1", "AA", "BB"); + const mockPartitionKeyRange3 = createMockPartitionKeyRange("2", "BB", "FF"); + + const fetchAllInternalStub = sinon.stub().resolves({ + resources: [mockPartitionKeyRange1, mockPartitionKeyRange2, mockPartitionKeyRange3], + headers: { "x-ms-request-charge": "1.23" }, + code: 200, + }); + + sinon.stub(clientContext, "queryPartitionKeyRanges").returns({ + fetchAllInternal: fetchAllInternalStub, // Add fetchAllInternal to mimic expected structure + } as unknown as QueryIterator); + + // Define a mock document (resource) returned from queryFeed + const mockDocument1 = createMockDocument( + "1", + "Sample Document 1", + "This is the first sample document", + ); + const mockDocument2 = createMockDocument( + "2", + "Sample Document 2", + "This is the second sample document", + ); + const mockDocument3 = createMockDocument( + "3", + "Sample Document 3", + "This is the third sample document", + ); + const mockDocument4 = createMockDocument( + "4", + "Sample Document 4", + "This is the fourth sample document", + ); + + let i = -1; + // Define a stub for queryFeed in clientContext + // Define a stub for queryFeed in clientContext + sinon.stub(clientContext, "queryFeed").callsFake(async () => { + i++; + if (i === 0) { + return { + result: [mockDocument1] as unknown as Resource, // Add result to mimic expected structure + headers: { + "x-ms-request-charge": "3.5", // Example RU charge + "x-ms-continuation": "token-for-next", // Continuation token for pagination + }, + code: 200, // Optional status code + }; + } else if (i === 1) { + return { + result: [mockDocument2, mockDocument3] as unknown as Resource, // Add result to mimic expected structure + headers: { + "x-ms-request-charge": "3.5", // Example RU charge + "x-ms-continuation": "token-for-next", // Continuation token for pagination + }, + code: 200, // Optional status code + }; + } else if (i === 2) { + return { + result: [mockDocument1, mockDocument2, mockDocument3] as unknown as Resource, // Add result to mimic expected structure + headers: { + "x-ms-request-charge": "3.5", // Example RU charge + "x-ms-continuation": "token-for-next", // Continuation token for pagination + }, + code: 200, // Optional status code + }; + } else { + return { + result: [mockDocument4] as unknown as Resource, // Add result to mimic expected structure + headers: { + "x-ms-request-charge": "3.5", // Example RU charge + }, + code: 200, // Optional status code + }; + } + }); + + const context = new OrderByQueryExecutionContext( + clientContext, + collectionLink, + query, + {}, + partitionedQueryExecutionInfo, + correlatedActivityId, + ); + + context["options"] = options; + + const responses = []; + for (let j = 0; j < 5; j++) { + const response = await context.fetchMore(createDummyDiagnosticNode()); + responses.push(response); + } + + assert.equal(responses[0].result.length, 0); + assert.equal(responses[1].result.length, 1); + assert.equal(responses[2].result.length, 4); + assert.equal(responses[3].result.length, 1); + assert.equal(responses[4].result.length, 3); + + await context.fetchMore(); + assert.equal(context.hasMoreResults(), false); + }); + + // TODO: add tests for one of document producer becoming empty +}); diff --git a/sdk/cosmosdb/cosmos/test/internal/unit/query/parallelQueryExecutionContextBase.spec.ts b/sdk/cosmosdb/cosmos/test/internal/unit/query/parallelQueryExecutionContextBase.spec.ts new file mode 100644 index 000000000000..b785d0d4d170 --- /dev/null +++ b/sdk/cosmosdb/cosmos/test/internal/unit/query/parallelQueryExecutionContextBase.spec.ts @@ -0,0 +1,516 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import sinon from "sinon"; +import { + ClientConfigDiagnostic, + ClientContext, + ConsistencyLevel, + Constants, + CosmosClientOptions, + CosmosDbDiagnosticLevel, + DiagnosticNodeInternal, + FeedOptions, + GlobalEndpointManager, + PartitionKeyRange, + QueryInfo, + QueryIterator, + RequestOptions, + Resource, + StatusCodes, +} from "../../../../src"; +import { TestParallelQueryExecutionContext } from "../common/TestParallelQueryExecutionContext"; +import { expect } from "chai"; +import { assert } from "chai"; +import { createDummyDiagnosticNode } from "../../../public/common/TestHelpers"; +describe("parallelQueryExecutionContextBase", function () { + const collectionLink = "/dbs/testDb/colls/testCollection"; // Sample collection link + const query = "SELECT * FROM c"; // Example query string or SqlQuerySpec object + const queryInfo: QueryInfo = { + orderBy: ["Ascending"], + rewrittenQuery: "SELECT * FROM c", + } as QueryInfo; + const partitionedQueryExecutionInfo = { + queryRanges: [ + { + min: "00", + max: "AA", + isMinInclusive: true, // Whether the minimum value is inclusive + isMaxInclusive: false, + }, + { + min: "AA", + max: "BB", + isMinInclusive: true, // Whether the minimum value is inclusive + isMaxInclusive: false, + }, + { + min: "BB", + max: "FF", + isMinInclusive: true, // Whether the minimum value is inclusive + isMaxInclusive: false, + }, + ], + queryInfo: queryInfo, + partitionedQueryExecutionInfoVersion: 1, + }; + const correlatedActivityId = "sample-activity-id"; // Example correlated activity ID + // Mock dependencies for ClientContext + const cosmosClientOptions = { + endpoint: "https://your-cosmos-db.documents.azure.com:443/", + key: "your-cosmos-db-key", + userAgentSuffix: "MockClient", + }; + + const diagnosticLevel = CosmosDbDiagnosticLevel.info; + const createMockPartitionKeyRange = (id: string, minInclusive: string, maxExclusive: string) => ({ + id, // Range ID + _rid: "range-rid", // Resource ID of the partition key range + minInclusive, // Minimum value of the partition key range + maxExclusive, // Maximum value of the partition key range + _etag: "sample-etag", // ETag for concurrency control + _self: `/dbs/sample-db/colls/sample-collection/pkranges/${id}`, // Self-link + throughputFraction: 1.0, // Throughput assigned to this partition + status: "Online", // Status of the partition + }); + + const createMockDocument = (id: string, name: string, value: string) => ({ + id, + _rid: "sample-rid-2", + _ts: Date.now(), + _self: "/dbs/sample-db/colls/sample-collection/docs/sample-id-2", + _etag: "sample-etag-2", + name: name, + value: value, + }); + describe("bufferDocumentProducers", function () { + beforeEach(function () {}); + + it("should add 2 document producers to bufferedDocumentProducersQueue from unfilledDocumentProducersQueue when maxDegreeOfParallism = 2", async function () { + const options: FeedOptions = { maxItemCount: 10, maxDegreeOfParallelism: 2 }; + const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); // Mock ClientContext instance + const mockPartitionKeyRange1 = createMockPartitionKeyRange("0", "", "AA"); + const mockPartitionKeyRange2 = createMockPartitionKeyRange("1", "AA", "BB"); + const mockPartitionKeyRange3 = createMockPartitionKeyRange("2", "BB", "FF"); + + const fetchAllInternalStub = sinon.stub().resolves({ + resources: [mockPartitionKeyRange1, mockPartitionKeyRange2, mockPartitionKeyRange3], + headers: { "x-ms-request-charge": "1.23" }, + code: 200, + }); + sinon.stub(clientContext, "queryPartitionKeyRanges").returns({ + fetchAllInternal: fetchAllInternalStub, // Add fetchAllInternal to mimic expected structure + } as unknown as QueryIterator); + + // Define a mock document (resource) returned from queryFeed + const mockDocument1 = createMockDocument( + "sample-id-1", + "Sample Document 1", + "This is the first sample document", + ); + const mockDocument2 = createMockDocument( + "sample-id-2", + "Sample Document 2", + "This is the second sample document", + ); + // Define a stub for queryFeed in clientContext + sinon.stub(clientContext, "queryFeed").resolves({ + result: [mockDocument1, mockDocument2] as unknown as Resource, // Add result to mimic expected structure + headers: { + "x-ms-request-charge": "3.5", // Example RU charge + "x-ms-continuation": "token-for-next-page", // Continuation token for pagination + }, + code: 200, // Optional status code + }); + + // Create mock instance of TestParallelQueryExecutionContext + const context = new TestParallelQueryExecutionContext( + clientContext, + collectionLink, + query, + options, + partitionedQueryExecutionInfo, + correlatedActivityId, + ); + context["options"] = options; + + // Call bufferDocumentProducers + await (context as any).bufferDocumentProducers(createDummyDiagnosticNode()); + + assert.equal(context["bufferedDocumentProducersQueue"].size(), 2); + assert.equal( + context["bufferedDocumentProducersQueue"].peek().targetPartitionKeyRange.id, + "0", + ); + assert.equal( + (await context["bufferedDocumentProducersQueue"].peek().fetchNextItem()).result, + mockDocument1, + ); + assert.equal(context["unfilledDocumentProducersQueue"].size(), 1); + assert.equal( + context["unfilledDocumentProducersQueue"].peek().targetPartitionKeyRange.id, + "2", + ); + assert.equal( + (await context["unfilledDocumentProducersQueue"].peek().fetchNextItem()).result, + undefined, + ); + }); + // TODO: Failing fix it + it("should release the semaphore if an error occurs", async function () { + const options: FeedOptions = { maxItemCount: 10, maxDegreeOfParallelism: 2 }; + const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); // Mock ClientContext instance + + const fetchAllInternalStub = sinon.stub().rejects({ + code: 404, + body: { + message: "Partition key range not found", + }, + headers: { "x-ms-request-charge": "0" }, + }); + sinon.stub(clientContext, "queryPartitionKeyRanges").returns({ + fetchAllInternal: fetchAllInternalStub, // Add fetchAllInternal to mimic expected structure + } as unknown as QueryIterator); + + const context = new TestParallelQueryExecutionContext( + clientContext, + collectionLink, + query, + options, + partitionedQueryExecutionInfo, + correlatedActivityId, + ); + context["options"] = options; + + // Create a spy for semaphore.release + const releaseSpy = sinon.spy(context["sem"], "leave"); + try { + // Call bufferDocumentProducers + await (context as any).bufferDocumentProducers(createDummyDiagnosticNode()); + } catch (err) { + assert.equal(context["err"].code, 404); + assert.equal(releaseSpy.callCount, 2); + assert.equal(context["bufferedDocumentProducersQueue"].size(), 0); + assert.equal(context["unfilledDocumentProducersQueue"].size(), 0); + } + }); + + // TODO: FIX + it.skip("should propagate an existing error if this.err is already set", async function () { + const options: FeedOptions = { maxItemCount: 10, maxDegreeOfParallelism: 2 }; + const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); // Mock ClientContext instance + + const mockPartitionKeyRange1 = createMockPartitionKeyRange("0", "", "AA"); + + const fetchAllInternalStub = sinon.stub().resolves({ + resources: [mockPartitionKeyRange1], + headers: { "x-ms-request-charge": "1.23" }, + code: 200, + }); + sinon.stub(clientContext, "queryPartitionKeyRanges").returns({ + fetchAllInternal: fetchAllInternalStub, // Add fetchAllInternal to mimic expected structure + } as unknown as QueryIterator); + + const context = new TestParallelQueryExecutionContext( + clientContext, + collectionLink, + query, + options, + partitionedQueryExecutionInfo, + correlatedActivityId, + ); + context["options"] = options; + context["err"] = { + code: 404, + body: { + message: "Partition key range not found", + }, + headers: { "x-ms-request-charge": "0" }, + }; + + // Create a spy for semaphore.release + const releaseSpy = sinon.spy(context["sem"], "leave"); + try { + // Call bufferDocumentProducers + await (context as any).bufferDocumentProducers(createDummyDiagnosticNode()); + } catch (err) { + console.log("error thrown from should propagate:", err); + assert.equal(err.code, 404); + assert.equal(releaseSpy.callCount, 2); + assert.equal(context["bufferedDocumentProducersQueue"].size(), 0); + assert.equal(context["unfilledDocumentProducersQueue"].size(), 0); + } + }); + + // TODO: FIX + it.skip("should invoke _repairExecutionContext when a split error occurs and retry after repair", async function () { + const options: FeedOptions = { maxItemCount: 10, maxDegreeOfParallelism: 2 }; + const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); // Mock ClientContext instance + + let callCount = 0; + const fetchAllInternalStub = sinon.stub().callsFake(() => { + callCount++; + if (callCount === 1) { + return { + code: StatusCodes.Gone, + body: { + message: "Partition key range split", + }, + headers: { "x-ms-request-charge": "0" }, + }; + } else { + return { + resources: [ + createMockPartitionKeyRange("0", "", "AA"), + createMockPartitionKeyRange("1", "AA", "BB"), + createMockPartitionKeyRange("2", "BB", "FF"), + ], + headers: { "x-ms-request-charge": "1.23" }, + code: 200, + }; + } + }); + sinon.stub(clientContext, "queryPartitionKeyRanges").returns({ + fetchAllInternal: fetchAllInternalStub, // Add fetchAllInternal to mimic expected structure + } as unknown as QueryIterator); + + const context = new TestParallelQueryExecutionContext( + clientContext, + collectionLink, + query, + options, + partitionedQueryExecutionInfo, + correlatedActivityId, + ); + context["options"] = options; + + // Create a spy for _repairExecutionContext + const repairSpy = sinon.spy(context as any, "_repairExecutionContext"); + + // Call bufferDocumentProducers + await (context as any).bufferDocumentProducers(createDummyDiagnosticNode()); + + assert.equal(repairSpy.callCount, 1); + }); + }); + + describe("fillBufferFromBufferQueue", function () { + // TODO: failing --> timeout + it("should fill internal buffer from buffer queue for parallel query", async function () { + const options: FeedOptions = { maxItemCount: 10, maxDegreeOfParallelism: 1 }; + const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); // Mock ClientContext instance + + const mockPartitionKeyRange1 = createMockPartitionKeyRange("0", "", "AA"); + const mockPartitionKeyRange2 = createMockPartitionKeyRange("1", "AA", "BB"); + const mockPartitionKeyRange3 = createMockPartitionKeyRange("2", "BB", "FF"); + + const fetchAllInternalStub = sinon.stub().resolves({ + resources: [mockPartitionKeyRange1, mockPartitionKeyRange2, mockPartitionKeyRange3], + headers: { "x-ms-request-charge": "1.23" }, + code: 200, + }); + sinon.stub(clientContext, "queryPartitionKeyRanges").returns({ + fetchAllInternal: fetchAllInternalStub, // Add fetchAllInternal to mimic expected structure + } as unknown as QueryIterator); + + // Define a mock document (resource) returned from queryFeed + const mockDocument1 = createMockDocument( + "sample-id-1", + "Sample Document 1", + "This is the first sample document", + ); + const mockDocument2 = createMockDocument( + "sample-id-2", + "Sample Document 2", + "This is the second sample document", + ); + // Define a stub for queryFeed in clientContext + sinon.stub(clientContext, "queryFeed").resolves({ + result: [mockDocument1, mockDocument2] as unknown as Resource, // Add result to mimic expected structure + headers: { + "x-ms-request-charge": "3.5", // Example RU charge + "x-ms-continuation": "token-for-next-page", // Continuation token for pagination + }, + code: 200, // Optional status code + }); + + const context = new TestParallelQueryExecutionContext( + clientContext, + collectionLink, + query, + options, + partitionedQueryExecutionInfo, + correlatedActivityId, + ); + await (context as any).bufferDocumentProducers(createDummyDiagnosticNode()); + + // Call fillBufferFromBufferQueue + await (context as any).fillBufferFromBufferQueue(); + + assert.equal(context["buffer"].length, 2); + }); + }); + + describe("drainBufferedItems", function () { + let options: FeedOptions; + let clientContext: ClientContext; + let context: TestParallelQueryExecutionContext; + + beforeEach(function () { + options = { maxItemCount: 10, maxDegreeOfParallelism: 2 }; + clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); + initializeMockPartitionKeyRanges(createMockPartitionKeyRange, clientContext, [ + ["", "AA"], + ["AA", "BB"], + ["BB", "FF"], + ]); + context = new TestParallelQueryExecutionContext( + clientContext, + collectionLink, + query, + options, + partitionedQueryExecutionInfo, + correlatedActivityId, + ); + context["options"] = options; + }); + + it("should return an empty array if buffer is empty", async function () { + const result = await (context as any).drainBufferedItems(); + assert.deepEqual(result.result, []); + assert.exists(result.headers); + }); + + it("should return buffered items and clear the buffer", async function () { + const mockDocument1 = createMockDocument( + "sample-id-1", + "Sample Document 1", + "This is the first sample document", + ); + const mockDocument2 = createMockDocument( + "sample-id-2", + "Sample Document 2", + "This is the second sample document", + ); + context["buffer"] = [mockDocument1, mockDocument2]; + + const result = await (context as any).drainBufferedItems(); + + assert.deepEqual(result.result, [mockDocument1, mockDocument2]); + assert.exists(result.headers); + assert.equal(context["buffer"].length, 0); + }); + + it("should propagate an existing error if this.err is already set", async function () { + context["err"] = { + code: 404, + body: { + message: "Partition key range not found", + }, + headers: { "x-ms-request-charge": "0" }, + }; + + try { + await (context as any).drainBufferedItems(); + } catch (err) { + assert.equal(context["err"].code, 404); + assert.equal(context["buffer"].length, 0); + } + }); + + it("should release the semaphore if an error occurs", async function () { + context["err"] = { + code: 404, + body: { + message: "Partition key range not found", + }, + headers: { "x-ms-request-charge": "0" }, + }; + + const releaseSpy = sinon.spy(context["sem"], "leave"); + + try { + await (context as any).drainBufferedItems(); + } catch (err) { + assert.equal(context["err"].code, 404); + assert.equal(releaseSpy.callCount, 2); + assert.equal(context["buffer"].length, 0); + } + }); + }); +}); + +export function initializeMockPartitionKeyRanges( + createMockPartitionKeyRange: ( + id: string, + minInclusive: string, + maxExclusive: string, + ) => { + id: string; // Range ID + _rid: string; // Resource ID of the partition key range + minInclusive: string; // Minimum value of the partition key range + maxExclusive: string; // Maximum value of the partition key range + _etag: string; // ETag for concurrency control + _self: string; // Self-link + throughputFraction: number; // Throughput assigned to this partition + status: string; + }, + clientContext: ClientContext, + ranges: [string, string][], +): void { + const partitionKeyRanges = ranges.map((range, index) => + createMockPartitionKeyRange(index.toString(), range[0], range[1]), + ); + + const fetchAllInternalStub = sinon.stub().resolves({ + resources: partitionKeyRanges, + headers: { "x-ms-request-charge": "1.23" }, + code: 200, + }); + sinon.stub(clientContext, "queryPartitionKeyRanges").returns({ + fetchAllInternal: fetchAllInternalStub, // Add fetchAllInternal to mimic expected structure + } as unknown as QueryIterator); +} + +export function createTestClientContext( + options: Partial, + diagnosticLevel: CosmosDbDiagnosticLevel, +): ClientContext { + const clientOps: CosmosClientOptions = { + endpoint: "", + connectionPolicy: { + enableEndpointDiscovery: false, + preferredLocations: ["https://localhhost"], + }, + ...options, + }; + const globalEndpointManager = new GlobalEndpointManager( + clientOps, + async (diagnosticNode: DiagnosticNodeInternal, opts: RequestOptions) => { + expect(opts).to.exist; // eslint-disable-line no-unused-expressions + const dummyAccount: any = diagnosticNode; + return dummyAccount; + }, + ); + const clientConfig: ClientConfigDiagnostic = { + endpoint: "", + resourceTokensConfigured: true, + tokenProviderConfigured: true, + aadCredentialsConfigured: true, + connectionPolicyConfigured: true, + consistencyLevel: ConsistencyLevel.BoundedStaleness, + defaultHeaders: {}, + agentConfigured: true, + userAgentSuffix: "", + pluginsConfigured: true, + sDKVersion: Constants.SDKVersion, + ...options, + }; + const clientContext = new ClientContext( + clientOps, + globalEndpointManager, + clientConfig, + diagnosticLevel, + ); + return clientContext; +} diff --git a/sdk/cosmosdb/cosmos/test/internal/unit/query/pipelinedQueryExecutionContext.spec.ts b/sdk/cosmosdb/cosmos/test/internal/unit/query/pipelinedQueryExecutionContext.spec.ts new file mode 100644 index 000000000000..168e33d7da64 --- /dev/null +++ b/sdk/cosmosdb/cosmos/test/internal/unit/query/pipelinedQueryExecutionContext.spec.ts @@ -0,0 +1,305 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import assert from "assert"; +import { CosmosDbDiagnosticLevel } from "../../../../src/diagnostics/CosmosDbDiagnosticLevel"; +import { PipelinedQueryExecutionContext } from "../../../../src/queryExecutionContext/pipelinedQueryExecutionContext"; +import { QueryInfo } from "../../../../src/request/ErrorResponse"; +import { getEmptyCosmosDiagnostics } from "../../../../src/utils/diagnostics"; +import { createDummyDiagnosticNode } from "../../../public/common/TestHelpers"; +import { createTestClientContext } from "./parallelQueryExecutionContextBase.spec"; + +describe("PipelineQueryExecutionContext", function () { + describe("fetchMore", function () { + const collectionLink = "/dbs/testDb/colls/testCollection"; // Sample collection link + const query = "SELECT * FROM c"; // Example query string or SqlQuerySpec object + const queryInfo: QueryInfo = { + distinctType: "None", + top: null, + offset: null, + limit: null, + orderBy: ["Ascending"], + rewrittenQuery: "SELECT * FROM c", + groupByExpressions: [], + aggregates: [], + groupByAliasToAggregateType: {}, + hasNonStreamingOrderBy: false, + hasSelectValue: false, + }; + const partitionedQueryExecutionInfo = { + queryRanges: [ + { + min: "00", + max: "AA", + isMinInclusive: true, // Whether the minimum value is inclusive + isMaxInclusive: false, + }, + { + min: "AA", + max: "BB", + isMinInclusive: true, // Whether the minimum value is inclusive + isMaxInclusive: false, + }, + { + min: "BB", + max: "FF", + isMinInclusive: true, // Whether the minimum value is inclusive + isMaxInclusive: false, + }, + ], + queryInfo: queryInfo, + partitionedQueryExecutionInfoVersion: 1, + }; + const correlatedActivityId = "sample-activity-id"; // Example correlated activity ID + // Mock dependencies for ClientContext + const cosmosClientOptions = { + endpoint: "https://your-cosmos-db.documents.azure.com:443/", + key: "your-cosmos-db-key", + userAgentSuffix: "MockClient", + }; + + const diagnosticLevel = CosmosDbDiagnosticLevel.info; + + const createMockDocument = (id: string, name: string, value: string) => ({ + id, + _rid: "sample-rid-2", + _ts: Date.now(), + _self: "/dbs/sample-db/colls/sample-collection/docs/sample-id-2", + _etag: "sample-etag-2", + name: name, + value: value, + }); + + it("should fetch more", async function () { + const options = { maxItemCount: 10, maxDegreeOfParallelism: 1 }; + const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); + const context = new PipelinedQueryExecutionContext( + clientContext, + collectionLink, + query, + options, + partitionedQueryExecutionInfo, + correlatedActivityId, + false, + ); + // Mock the endpoint's fetchMore method to return 3 documents in every call + context["endpoint"] = { + fetchMore: async () => { + return { + result: [ + createMockDocument("1", "doc1", "value1"), + createMockDocument("2", "doc2", "value2"), + createMockDocument("3", "doc3", "value3"), + ], + headers: {}, + diagnostics: getEmptyCosmosDiagnostics(), + }; + }, + nextItem: async () => null, + hasMoreResults: () => false, + }; + + const response = await context.fetchMore(createDummyDiagnosticNode()); + const result = response.result; + + // Verify the result + assert.strictEqual(result.length, 10); + assert.strictEqual(result[0].id, "1"); + assert.strictEqual(result[1].id, "2"); + assert.strictEqual(result[2].id, "3"); + }); + + it("should fetch more when empty resutls in begining", async function () { + const options = { maxItemCount: 10, maxDegreeOfParallelism: 1 }; + const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); + const context = new PipelinedQueryExecutionContext( + clientContext, + collectionLink, + query, + options, + partitionedQueryExecutionInfo, + correlatedActivityId, + false, + ); + let i = 0; + context["endpoint"] = { + fetchMore: async () => { + if (i < 3) { + i++; + return { + result: [], + headers: {}, + diagnostics: getEmptyCosmosDiagnostics(), + }; + } + return { + result: [ + createMockDocument("1", "doc1", "value1"), + createMockDocument("2", "doc2", "value2"), + createMockDocument("3", "doc3", "value3"), + ], + headers: {}, + diagnostics: getEmptyCosmosDiagnostics(), + }; + }, + nextItem: async () => null, + hasMoreResults: () => true, + }; + + const response = await context.fetchMore(createDummyDiagnosticNode()); + + const result = response.result; + assert.strictEqual(result.length, 10); + }); + + it("should return 3 response when backend returns undefined after that", async function () { + const options = { maxItemCount: 10, maxDegreeOfParallelism: 1 }; + const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); + const context = new PipelinedQueryExecutionContext( + clientContext, + collectionLink, + query, + options, + partitionedQueryExecutionInfo, + correlatedActivityId, + false, + ); + let i = 0; + context["endpoint"] = { + fetchMore: async () => { + if (i < 1) { + i++; + return { + result: [ + createMockDocument("1", "doc1", "value1"), + createMockDocument("2", "doc2", "value2"), + createMockDocument("3", "doc3", "value3"), + ], + headers: {}, + diagnostics: getEmptyCosmosDiagnostics(), + }; + } + return { + result: undefined, + headers: {}, + diagnostics: getEmptyCosmosDiagnostics(), + }; + }, + nextItem: async () => null, + hasMoreResults: () => true, + }; + + const response = await context.fetchMore(createDummyDiagnosticNode()); + const result = response.result; + assert.strictEqual(result.length, 3); + }); + + it("should return undefined when backend returns undefined", async function () { + const options = { maxItemCount: 10, maxDegreeOfParallelism: 1 }; + const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); + const context = new PipelinedQueryExecutionContext( + clientContext, + collectionLink, + query, + options, + partitionedQueryExecutionInfo, + correlatedActivityId, + false, + ); + context["endpoint"] = { + fetchMore: async () => { + return { + result: undefined, + headers: {}, + diagnostics: getEmptyCosmosDiagnostics(), + }; + }, + nextItem: async () => null, + hasMoreResults: () => false, + }; + + const response = await context.fetchMore(createDummyDiagnosticNode()); + const result = response.result; + assert.strictEqual(result, undefined); + }); + + it("should stop on empty array when backend returns empty array and enableQueryControl is true", async function () { + const options = { maxItemCount: 10, maxDegreeOfParallelism: 1, enableQueryControl: true }; + const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); + const context = new PipelinedQueryExecutionContext( + clientContext, + collectionLink, + query, + options, + partitionedQueryExecutionInfo, + correlatedActivityId, + true, + ); + let i = 0; + context["endpoint"] = { + fetchMore: async () => { + if (i < 1) { + i++; + return { + result: [], + headers: {}, + diagnostics: getEmptyCosmosDiagnostics(), + }; + } + return { + result: [ + createMockDocument("1", "doc1", "value1"), + createMockDocument("2", "doc2", "value2"), + createMockDocument("3", "doc3", "value3"), + ], + headers: {}, + diagnostics: getEmptyCosmosDiagnostics(), + }; + }, + nextItem: async () => null, + hasMoreResults: () => true, + }; + + const response = await context.fetchMore(createDummyDiagnosticNode()); + const result = response.result; + assert.strictEqual(result.length, 0); + + const response2 = await context.fetchMore(createDummyDiagnosticNode()); + const result2 = response2.result; + assert.strictEqual(result2.length, 3); + }); + + it("enableQueryCOntrol is true and returned data is greater than maxItemCount", async function () { + const options = { maxItemCount: 2, maxDegreeOfParallelism: 1, enableQueryControl: true }; + const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); + const context = new PipelinedQueryExecutionContext( + clientContext, + collectionLink, + query, + options, + partitionedQueryExecutionInfo, + correlatedActivityId, + true, + ); + context["endpoint"] = { + fetchMore: async () => { + return { + result: [ + createMockDocument("1", "doc1", "value1"), + createMockDocument("2", "doc2", "value2"), + createMockDocument("3", "doc3", "value3"), + ], + headers: {}, + diagnostics: getEmptyCosmosDiagnostics(), + }; + }, + nextItem: async () => null, + hasMoreResults: () => true, + }; + + const response = await context.fetchMore(createDummyDiagnosticNode()); + const result = response.result; + assert.strictEqual(result.length, 2); + }); + }); +}); diff --git a/sdk/cosmosdb/cosmos/test/public/functional/NonStreamingQueryPolicy.spec.ts b/sdk/cosmosdb/cosmos/test/public/functional/NonStreamingQueryPolicy.spec.ts index 9670dfddcacc..875a7bed8117 100644 --- a/sdk/cosmosdb/cosmos/test/public/functional/NonStreamingQueryPolicy.spec.ts +++ b/sdk/cosmosdb/cosmos/test/public/functional/NonStreamingQueryPolicy.spec.ts @@ -20,7 +20,7 @@ describe("Vector search feature", async () => { database = await getTestDatabase("vector embedding database"); }); - it("validate VectorEmbeddingPolicy", async function () { + it("validate-VectorEmbeddingPolicy", async function () { const indexingPolicy: IndexingPolicy = { vectorIndexes: [ { path: "/vector1", type: VectorIndexType.Flat }, diff --git a/sdk/cosmosdb/cosmos/test/public/functional/endpointComponent/NonStreamingOrderByDistinctEndpointComponent.spec.ts b/sdk/cosmosdb/cosmos/test/public/functional/endpointComponent/NonStreamingOrderByDistinctEndpointComponent.spec.ts index 7ed548541db2..0fa42f336a08 100644 --- a/sdk/cosmosdb/cosmos/test/public/functional/endpointComponent/NonStreamingOrderByDistinctEndpointComponent.spec.ts +++ b/sdk/cosmosdb/cosmos/test/public/functional/endpointComponent/NonStreamingOrderByDistinctEndpointComponent.spec.ts @@ -23,7 +23,7 @@ describe("NonStreamingOrderByDistinctEndpointComponent", () => { assert.equal(component["priorityQueueBufferSize"], bufferSize); }); - it("should handle nextItem method correctly", async () => { + it("should handle fetchMore method correctly", async () => { let id = 1; let item = 1; const mockExecutionContext: ExecutionContext = { @@ -35,14 +35,20 @@ describe("NonStreamingOrderByDistinctEndpointComponent", () => { } }, nextItem: async () => ({ - result: { - orderByItems: [ - { - item: item++, - }, - ], - payload: { id: id++ }, - }, + result: {}, + headers: {}, + }), + fetchMore: async () => ({ + result: [ + { + orderByItems: [ + { + item: item++, + }, + ], + payload: { id: id++ }, + }, + ], headers: {}, }), } as ExecutionContext; @@ -56,18 +62,15 @@ describe("NonStreamingOrderByDistinctEndpointComponent", () => { ); let count = 1; - let result_id = 1; - // call nextItem, for first 99 items it will give empty result + // call fetchMore, for first 99 items it will give empty result while (component.hasMoreResults()) { - const response = await component.nextItem({} as any); + const response = await component.fetchMore({} as any); if (count < 99) { - assert.deepStrictEqual(response.result, {}); + assert.deepStrictEqual(response.result, []); } else { - assert.deepStrictEqual(response.result, { id: result_id++ }); + assert.deepStrictEqual(response.result.length, count); } count++; } - // Final result array should be empty after all results processed - assert.equal(component["finalResultArray"].length, 0); }); }); diff --git a/sdk/cosmosdb/cosmos/test/public/functional/endpointComponent/NonStreamingOrderByEndpointComponent.spec.ts b/sdk/cosmosdb/cosmos/test/public/functional/endpointComponent/NonStreamingOrderByEndpointComponent.spec.ts index 9f074e1f4def..0f4acfc8d2df 100644 --- a/sdk/cosmosdb/cosmos/test/public/functional/endpointComponent/NonStreamingOrderByEndpointComponent.spec.ts +++ b/sdk/cosmosdb/cosmos/test/public/functional/endpointComponent/NonStreamingOrderByEndpointComponent.spec.ts @@ -20,7 +20,7 @@ describe("NonStreamingOrderByEndpointComponent", () => { assert.equal(component["priorityQueueBufferSize"], bufferSize); }); - it("should handle nextItem method correctly", async () => { + it("should handle fetchMore method correctly", async () => { let id = 1; let item = 1; const mockExecutionContext: ExecutionContext = { @@ -32,14 +32,20 @@ describe("NonStreamingOrderByEndpointComponent", () => { } }, nextItem: async () => ({ - result: { - orderByItems: [ - { - item: item++, - }, - ], - payload: { id: id++ }, - }, + result: {}, + headers: {}, + }), + fetchMore: async () => ({ + result: [ + { + orderByItems: [ + { + item: item++, + }, + ], + payload: { id: id++ }, + }, + ], headers: {}, }), } as ExecutionContext; @@ -51,18 +57,15 @@ describe("NonStreamingOrderByEndpointComponent", () => { ); let count = 1; - let result_id = 1; - // call nextItem, for first 100 items it will give empty result + // call fetchMore, for first 99 items it will give empty result while (component.hasMoreResults()) { - const response = await component.nextItem({} as any); + const response = await component.fetchMore({} as any); if (count < 99) { - assert.deepStrictEqual(response.result, {}); + assert.deepStrictEqual(response.result, []); } else { - assert.deepStrictEqual(response.result, { id: result_id++ }); + assert.deepStrictEqual(response.result.length, count); } count++; } - // Queue should be empty after dequeueing - assert.equal(component["nonStreamingOrderByPQ"].size(), 0); }); }); diff --git a/sdk/cosmosdb/cosmos/test/public/integration/aggregateQuery.spec.ts b/sdk/cosmosdb/cosmos/test/public/integration/aggregateQuery.spec.ts index e0aa7d812602..857cd4f5f677 100644 --- a/sdk/cosmosdb/cosmos/test/public/integration/aggregateQuery.spec.ts +++ b/sdk/cosmosdb/cosmos/test/public/integration/aggregateQuery.spec.ts @@ -132,6 +132,51 @@ describe("Aggregate Query", function (this: Suite) { ); }; + const validateExecuteNextAndHasMoreResultsWithEnableQueryControl = async function ( + queryIterator: QueryIterator, + options: any, + expectedResults: any[], + ): Promise { + const pageSize = options["maxItemCount"]; + let totalFetchedResults: any[] = []; + let totalExecuteNextRequestCharge = 0; + + while (totalFetchedResults.length <= expectedResults.length) { + const { resources: results, requestCharge } = await queryIterator.fetchNext(); + + if (results && results.length > 0) { + totalFetchedResults = totalFetchedResults.concat(results); + } + totalExecuteNextRequestCharge += requestCharge; + + if ( + !queryIterator.hasMoreResults() || + totalFetchedResults.length === expectedResults.length + ) { + break; + } + + if (totalFetchedResults.length < expectedResults.length) { + // there are more results + assert(queryIterator.hasMoreResults(), "hasMoreResults expects to return true"); + } else { + // no more results + assert.equal( + expectedResults.length, + totalFetchedResults.length, + "executeNext: didn't fetch all the results", + ); + assert( + results.length <= pageSize, + "executeNext: actual fetch size is more than the requested page size", + ); + } + } + // no more results + assert.deepStrictEqual(totalFetchedResults, expectedResults); + assert.equal(queryIterator.hasMoreResults(), false, "hasMoreResults: no more results is left"); + }; + const ValidateAsyncIterator = async function ( queryIterator: QueryIterator, expectedResults: any[], @@ -171,6 +216,18 @@ describe("Aggregate Query", function (this: Suite) { ); queryIterator.reset(); await ValidateAsyncIterator(queryIterator, expectedResults); + + // Adding these to test the new flag enableQueryControl in FeedOptions + options.enableQueryControl = true; + const queryIteratorWithEnableQueryControl = container.items.query(query, options); + await validateFetchAll(queryIteratorWithEnableQueryControl, expectedResults); + + queryIteratorWithEnableQueryControl.reset(); + await validateExecuteNextAndHasMoreResultsWithEnableQueryControl( + queryIteratorWithEnableQueryControl, + options, + expectedResults, + ); }; it("SELECT VALUE AVG", async function () { diff --git a/sdk/cosmosdb/cosmos/test/public/integration/aggregates/groupBy.spec.ts b/sdk/cosmosdb/cosmos/test/public/integration/aggregates/groupBy.spec.ts index dbac0aa8fbef..41633f04f67a 100644 --- a/sdk/cosmosdb/cosmos/test/public/integration/aggregates/groupBy.spec.ts +++ b/sdk/cosmosdb/cosmos/test/public/integration/aggregates/groupBy.spec.ts @@ -1,15 +1,11 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -import type { Container, ContainerDefinition } from "../../../../src"; +import type { Container, ContainerDefinition, FeedOptions } from "../../../../src"; import { bulkInsertItems, getTestContainer, removeAllDatabases } from "../../common/TestHelpers"; import assert from "assert"; import groupBySnapshot from "./groupBy.snapshot"; import type { Context } from "mocha"; -const options = { - maxItemCount: 100, -}; - const items = [ { id: "01", @@ -524,6 +520,7 @@ const items = [ address: { city: "Atlanta", state: "GA", zip: 30301 }, }, ]; +let container: Container; describe("Cross partition GROUP BY", () => { const containerDefinition: ContainerDefinition = { @@ -532,11 +529,32 @@ describe("Cross partition GROUP BY", () => { paths: ["/id"], }, }; - const containerOptions = { offerThroughput: 25100 }; - let container: Container; + before(async () => { + await removeAllDatabases(); + container = await getTestContainer( + "GROUP BY Query", + undefined, + containerDefinition, + containerOptions, + ); + await bulkInsertItems(container, items); + }); + const options: FeedOptions = { + maxItemCount: 100, + }; + runCrosspartitionGROUPBYTests(options); + + const optionsWithEnableQueryControl: FeedOptions = { + maxItemCount: 100, + enableQueryControl: true, + }; + runCrosspartitionGROUPBYTests(optionsWithEnableQueryControl); +}); + +function runCrosspartitionGROUPBYTests(options: FeedOptions): void { let currentTestTitle: string; let snapshotNumber: number; @@ -549,17 +567,6 @@ describe("Cross partition GROUP BY", () => { snapshotNumber = 1; }); - before(async () => { - await removeAllDatabases(); - container = await getTestContainer( - "GROUP BY Query", - undefined, - containerDefinition, - containerOptions, - ); - await bulkInsertItems(container, items); - }); - it("by number", async () => { const queryIterator = container.items.query("SELECT c.age FROM c GROUP BY c.age", options); const result = await queryIterator.fetchAll(); @@ -773,4 +780,4 @@ describe("Cross partition GROUP BY", () => { assert(result.resources.length === 1); assert(result.requestCharge > 0); }); -}); +} diff --git a/sdk/cosmosdb/cosmos/test/public/integration/crossPartition.spec.ts b/sdk/cosmosdb/cosmos/test/public/integration/crossPartition.spec.ts index 279bdd014879..2e6635ace069 100644 --- a/sdk/cosmosdb/cosmos/test/public/integration/crossPartition.spec.ts +++ b/sdk/cosmosdb/cosmos/test/public/integration/crossPartition.spec.ts @@ -27,10 +27,10 @@ function compare(key: string) { }; } -describe("Cross Partition", function (this: Suite) { +describe("Cross-Partition", function (this: Suite) { this.timeout(process.env.MOCHA_TIMEOUT || "30000"); - describe("Validate Query", function () { + describe("Validate-Query", function () { const documentDefinitions = generateDocuments(20); const containerDefinition: ContainerDefinition = { @@ -137,7 +137,6 @@ describe("Cross Partition", function (this: Suite) { expectedCount || (expectedOrderIds && expectedOrderIds.length) || documentDefinitions.length; - while (queryIterator.hasMoreResults()) { const { resources: results, queryMetrics, requestCharge } = await queryIterator.fetchNext(); totalIteratorCalls++; @@ -170,7 +169,6 @@ describe("Cross Partition", function (this: Suite) { if (expectedIteratorCalls) { assert.equal(totalIteratorCalls, expectedIteratorCalls); } - // no more results validateResults(totalFetchedResults, expectedOrderIds, expectedCount); assert.equal( @@ -192,6 +190,52 @@ describe("Cross Partition", function (this: Suite) { ); }; + const validateFetchNextAndHasMoreResultsWithEnableQueryControl = async function ( + queryIterator: QueryIterator, + expectedOrderIds: string[], + expectedCount: number, + ): Promise { + let totalExecuteNextRequestCharge = 0; + let totalIteratorCalls = 0; + let totalFetchedResults: any[] = []; + const expectedLength = + expectedCount || + (expectedOrderIds && expectedOrderIds.length) || + documentDefinitions.length; + while (queryIterator.hasMoreResults()) { + const { resources: results, queryMetrics, requestCharge } = await queryIterator.fetchNext(); + totalIteratorCalls++; + assert(queryMetrics, "expected response have query metrics"); + + if (totalFetchedResults.length > expectedLength) { + break; + } + if (results) { + totalFetchedResults = totalFetchedResults.concat(results); + } + totalExecuteNextRequestCharge += requestCharge; + assert(requestCharge >= 0); + + if (totalFetchedResults.length < expectedLength) { + assert(queryIterator.hasMoreResults(), "hasMoreResults expects to return true"); + } else { + // no more results + assert.equal( + expectedLength, + totalFetchedResults.length, + "executeNext: didn't fetch all the results", + ); + } + } + // no more results + validateResults(totalFetchedResults, expectedOrderIds, expectedCount); + assert.equal( + queryIterator.hasMoreResults(), + false, + "hasMoreResults: no more results is left", + ); + }; + const validateAsyncIterator = async function ( queryIterator: QueryIterator, expectedOrderIds: any[], @@ -258,6 +302,22 @@ describe("Cross Partition", function (this: Suite) { ); queryIterator.reset(); await validateAsyncIterator(queryIterator, expectedOrderIds, expectedCount); + + // Adding these to test the new flag enableQueryControl in FeedOptions + options.enableQueryControl = true; + const queryIteratorWithEnableQueryControl = container.items.query(query, options); + await validateFetchAll( + queryIteratorWithEnableQueryControl, + options, + expectedOrderIds, + expectedCount, + ); + queryIteratorWithEnableQueryControl.reset(); + await validateFetchNextAndHasMoreResultsWithEnableQueryControl( + queryIteratorWithEnableQueryControl, + expectedOrderIds, + expectedCount, + ); }; it("Validate Parallel Query As String With maxDegreeOfParallelism = 0", async function () { @@ -275,7 +335,7 @@ describe("Cross Partition", function (this: Suite) { }); }); - it("Validate Parallel Query As String With maxDegreeOfParallelism: -1", async function () { + it("Validate-Parallel-Query As String With maxDegreeOfParallelism: -1", async function () { // simple order by query in string format const query = "SELECT * FROM root r"; const options: FeedOptions = { diff --git a/sdk/cosmosdb/cosmos/test/public/integration/fullTextSearch.spec.ts b/sdk/cosmosdb/cosmos/test/public/integration/fullTextSearch.spec.ts index 3824f247d0e7..315f42109f1e 100644 --- a/sdk/cosmosdb/cosmos/test/public/integration/fullTextSearch.spec.ts +++ b/sdk/cosmosdb/cosmos/test/public/integration/fullTextSearch.spec.ts @@ -1,9 +1,9 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -import { Suite } from "mocha"; +import type { Suite } from "mocha"; import assert from "assert"; -import { ContainerDefinition, Container } from "../../../src"; +import type { ContainerDefinition, Container } from "../../../src"; import { getTestContainer, removeAllDatabases, readAndParseJSONFile } from "../common/TestHelpers"; describe.skip("Validate full text search queries", function (this: Suite) { diff --git a/sdk/cosmosdb/cosmos/test/public/integration/split.spec.ts b/sdk/cosmosdb/cosmos/test/public/integration/split.spec.ts index 5d717541f38b..1aa802547f28 100644 --- a/sdk/cosmosdb/cosmos/test/public/integration/split.spec.ts +++ b/sdk/cosmosdb/cosmos/test/public/integration/split.spec.ts @@ -90,8 +90,10 @@ describe("Partition Splits", () => { // results in duplicates by trying to read from two partitions assert(resources.length >= documentDefinitions.length); }); - - it("split errors surface as 503", async () => { + // NOTE: This test is skipped because we have updated the contracts to not throw 410s. + // Previously, 410s were thrown from the parallelQueryExecutionContextBase constructor, + // but now they are handled in the fetchMore method. Therefore, this test is skipped and will be removed after reviews. + it.skip("split errors surface as 503", async () => { const options: CosmosClientOptions = { endpoint, key: masterKey }; const plugins: PluginConfig[] = [ { diff --git a/sdk/cosmosdb/cosmos/tsconfig.strict.json b/sdk/cosmosdb/cosmos/tsconfig.strict.json index e7d76b6ba3f8..3d4ce2ff3270 100644 --- a/sdk/cosmosdb/cosmos/tsconfig.strict.json +++ b/sdk/cosmosdb/cosmos/tsconfig.strict.json @@ -69,6 +69,7 @@ "src/queryExecutionContext/EndpointComponent/UnorderedDistinctEndpointComponent.ts", "src/queryExecutionContext/defaultQueryExecutionContext.ts", "src/queryExecutionContext/documentProducer.ts", + "src/queryExecutionContext/documentProducer2.ts", "src/queryExecutionContext/ExecutionContext.ts", "src/queryExecutionContext/headerUtils.ts", "src/queryExecutionContext/orderByDocumentProducerComparator.ts", @@ -160,7 +161,6 @@ "test/public/common/TestHelpers.ts", "test/public/integration/session.spec.ts", "test/internal/unit/auth.spec.ts", - "test/internal/unit/defaultQueryExecutionContext.spec.ts", "test/internal/unit/helper.spec.ts", "test/internal/unit/inMemoryCollectionRoutingMap.spec.ts", "test/internal/unit/platform.spec.ts", @@ -182,6 +182,7 @@ "test/internal/unit/nonStreamingOrderByMap.spec.ts", "test/internal/unit/utils/supportedQueryFeaturesBuilder.spec.ts", "test/internal/unit/getHeader.spec.ts", + "test/internal/unit/partitionMerge.spec.ts", "test/public/common/BaselineTest.PathParser.ts", "test/public/common/TestData.ts", "test/public/common/setup.ts", @@ -234,6 +235,9 @@ "test/public/functional/NonStreamingQueryPolicy.spec.ts", "test/public/functional/computedProperties.spec.ts", "test/internal/unit/utils/nonStreamingOrderByPriorityQueue.spec.ts", - "test/internal/unit/globalStatisticsAggregator.spec.ts" + "test/internal/unit/globalStatisticsAggregator.spec.ts", + "test/internal/unit/common/*.ts", + "test/internal/unit/query/*.ts", + "test/internal/unit/documentProducer.spec.ts" ] }