Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/query pipeline rewrite 2 #32578

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions sdk/cosmosdb/cosmos/review/cosmos.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,7 @@ export interface FeedOptions extends SharedOptions {
continuationToken?: string;
continuationTokenLimitInKB?: number;
disableNonStreamingOrderByQuery?: boolean;
enableQueryControl?: boolean;
enableScanInQuery?: boolean;
forceQueryPlan?: boolean;
maxDegreeOfParallelism?: number;
Expand Down
36 changes: 36 additions & 0 deletions sdk/cosmosdb/cosmos/src/common/Stack.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

export class Stack<T> {
private items: T[] = [];

// Push an element onto the stack
push(element: T): void {
this.items.push(element);
}

// Pop an element off the stack
pop(): T | undefined {
return this.items.pop();
}

// Peek at the top element of the stack
peek(): T | undefined {
return this.items[this.items.length - 1];
}

// Check if the stack is empty
isEmpty(): boolean {
return this.items.length === 0;
}

// Get the size of the stack
size(): number {
return this.items.length;
}

// Clear the stack
clear(): void {
this.items = [];
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

import { GlobalStatistics } from "../../request/globalStatistics";
import { Aggregator } from "./Aggregator";
import type { GlobalStatistics } from "../../request/globalStatistics";
import type { Aggregator } from "./Aggregator";

export class GlobalStatisticsAggregator implements Aggregator {
private globalStatistics: GlobalStatistics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ import { getInitialHeader, mergeHeaders } from "../headerUtils";
import { emptyGroup, extractAggregateResult } from "./emptyGroup";
import type { DiagnosticNodeInternal } from "../../diagnostics/DiagnosticNodeInternal";

interface GroupByResponse {
result: GroupByResult;
headers: CosmosHeaders;
}

interface GroupByResult {
groupByItems: any[];
payload: any;
Expand All @@ -32,52 +27,51 @@ export class GroupByEndpointComponent implements ExecutionContext {
private readonly aggregateResultArray: any[] = [];
private completed: boolean = false;

public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise<Response<any>> {
// If we have a full result set, begin returning results
if (this.aggregateResultArray.length > 0) {
return {
result: this.aggregateResultArray.pop(),
headers: getInitialHeader(),
};
}
public hasMoreResults(): boolean {
return this.executionContext.hasMoreResults();
}

public async fetchMore(diagnosticNode: DiagnosticNodeInternal): Promise<Response<any>> {
if (this.completed) {
return {
result: undefined,
headers: getInitialHeader(),
};
}

const aggregateHeaders = getInitialHeader();
const response = await this.executionContext.fetchMore(diagnosticNode);
mergeHeaders(aggregateHeaders, response.headers);

while (this.executionContext.hasMoreResults()) {
// Grab the next result
const { result, headers } = (await this.executionContext.nextItem(
diagnosticNode,
)) as GroupByResponse;
mergeHeaders(aggregateHeaders, headers);
if (response === undefined || response.result === undefined) {
// If there are any groupings, consolidate and return them
if (this.groupings.size > 0) {
return this.consolidateGroupResults(aggregateHeaders);
}
return { result: undefined, headers: aggregateHeaders };
}

for (const item of response.result as GroupByResult[]) {
// If it exists, process it via aggregators
if (result) {
const group = result.groupByItems ? await hashObject(result.groupByItems) : emptyGroup;
if (item) {
const group = item.groupByItems ? await hashObject(item.groupByItems) : emptyGroup;
const aggregators = this.groupings.get(group);
const payload = result.payload;
const payload = item.payload;
if (aggregators) {
// Iterator over all results in the payload
Object.keys(payload).map((key) => {
for (const key of Object.keys(payload)) {
// in case the value of a group is null make sure we create a dummy payload with item2==null
const effectiveGroupByValue = payload[key]
? payload[key]
: new Map().set("item2", null);
const aggregateResult = extractAggregateResult(effectiveGroupByValue);
aggregators.get(key).aggregate(aggregateResult);
});
}
} else {
// This is the first time we have seen a grouping. Setup the initial result without aggregate values
const grouping = new Map();
this.groupings.set(group, grouping);
// Iterator over all results in the payload
Object.keys(payload).map((key) => {
for (const key of Object.keys(payload)) {
const aggregateType = this.queryInfo.groupByAliasToAggregateType[key];
// Create a new aggregator for this specific aggregate field
const aggregator = createAggregator(aggregateType);
Expand All @@ -88,11 +82,22 @@ export class GroupByEndpointComponent implements ExecutionContext {
} else {
aggregator.aggregate(payload[key]);
}
});
}
}
}
}

if (this.executionContext.hasMoreResults()) {
return {
result: [],
headers: aggregateHeaders,
};
} else {
return this.consolidateGroupResults(aggregateHeaders);
}
}

private consolidateGroupResults(aggregateHeaders: CosmosHeaders): Response<any> {
for (const grouping of this.groupings.values()) {
const groupResult: any = {};
for (const [aggregateKey, aggregator] of grouping.entries()) {
Expand All @@ -101,13 +106,6 @@ export class GroupByEndpointComponent implements ExecutionContext {
this.aggregateResultArray.push(groupResult);
}
this.completed = true;
return {
result: this.aggregateResultArray.pop(),
headers: aggregateHeaders,
};
}

public hasMoreResults(): boolean {
return this.executionContext.hasMoreResults() || this.aggregateResultArray.length > 0;
return { result: this.aggregateResultArray, headers: aggregateHeaders };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ import { getInitialHeader, mergeHeaders } from "../headerUtils";
import { emptyGroup, extractAggregateResult } from "./emptyGroup";
import type { DiagnosticNodeInternal } from "../../diagnostics/DiagnosticNodeInternal";

interface GroupByResponse {
result: GroupByResult;
headers: CosmosHeaders;
}

interface GroupByResult {
groupByItems: any[];
payload: any;
Expand All @@ -36,39 +31,36 @@ export class GroupByValueEndpointComponent implements ExecutionContext {
this.aggregateType = this.queryInfo.aggregates[0];
}

public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise<Response<any>> {
// Start returning results if we have processed a full results set
if (this.aggregateResultArray.length > 0) {
return {
result: this.aggregateResultArray.pop(),
headers: getInitialHeader(),
};
}
public hasMoreResults(): boolean {
return this.executionContext.hasMoreResults();
}

public async fetchMore(diagnosticNode: DiagnosticNodeInternal): Promise<Response<any>> {
if (this.completed) {
return {
result: undefined,
headers: getInitialHeader(),
};
}

const aggregateHeaders = getInitialHeader();
const response = await this.executionContext.fetchMore(diagnosticNode);
mergeHeaders(aggregateHeaders, response.headers);

while (this.executionContext.hasMoreResults()) {
// Grab the next result
const { result, headers } = (await this.executionContext.nextItem(
diagnosticNode,
)) as GroupByResponse;
mergeHeaders(aggregateHeaders, headers);
if (response === undefined || response.result === undefined) {
if (this.aggregators.size > 0) {
return this.generateAggregateResponse(aggregateHeaders);
}
return { result: undefined, headers: aggregateHeaders };
}

// If it exists, process it via aggregators
if (result) {
for (const item of response.result as GroupByResult[]) {
if (item) {
let grouping: string = emptyGroup;
let payload: any = result;
if (result.groupByItems) {
let payload: any = item;
if (item.groupByItems) {
// If the query contains a GROUP BY clause, it will have a payload property and groupByItems
payload = result.payload;
grouping = await hashObject(result.groupByItems);
payload = item.payload;
grouping = await hashObject(item.groupByItems);
}

const aggregator = this.aggregators.get(grouping);
Expand Down Expand Up @@ -99,18 +91,26 @@ export class GroupByValueEndpointComponent implements ExecutionContext {
headers: aggregateHeaders,
};
}
// If no results are left in the underlying execution context, convert our aggregate results to an array

if (this.executionContext.hasMoreResults()) {
return { result: [], headers: aggregateHeaders };
} else {
// If no results are left in the underlying execution context, convert our aggregate results to an array
return this.generateAggregateResponse(aggregateHeaders);
}
}

private generateAggregateResponse(aggregateHeaders: CosmosHeaders): Response<any> {
for (const aggregator of this.aggregators.values()) {
this.aggregateResultArray.push(aggregator.getResult());
const result = aggregator.getResult();
if (result !== undefined) {
this.aggregateResultArray.push(result);
}
}
this.completed = true;
return {
result: this.aggregateResultArray.pop(),
result: this.aggregateResultArray,
headers: aggregateHeaders,
};
}

public hasMoreResults(): boolean {
return this.executionContext.hasMoreResults() || this.aggregateResultArray.length > 0;
}
}
Loading
Loading