JS SDK changes for merge support #31354

Open · wants to merge 21 commits into main
2 changes: 2 additions & 0 deletions sdk/cosmosdb/cosmos/CHANGELOG.md
@@ -4,6 +4,8 @@

### Features Added

- Partition merge support: Adds support for the partition merge (preview) feature. Requests from the JS SDK are no longer blocked when the feature is enabled. [docs](https://learn.microsoft.com/azure/cosmos-db/merge)

### Breaking Changes

### Bugs Fixed
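For context on the changelog entry above: with partition merge enabled on an account, existing query code keeps working unchanged. A minimal usage sketch, assuming placeholder endpoint, key, and database/container names (none of these values come from this PR):

```ts
import { CosmosClient } from "@azure/cosmos";

// Placeholder credentials and names; merge handling itself needs no client-side configuration.
async function main(): Promise<void> {
  const client = new CosmosClient({
    endpoint: "https://<account>.documents.azure.com:443/",
    key: "<key>",
  });
  const container = client.database("<database>").container("<container>");

  // A cross-partition query: when partition merge is enabled, the SDK repairs its
  // partition key range state and continues the query instead of failing the request.
  const { resources } = await container.items
    .query("SELECT * FROM c", { maxDegreeOfParallelism: 4 })
    .fetchAll();
  console.log(resources.length);
}

main().catch(console.error);
```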
1 change: 1 addition & 0 deletions sdk/cosmosdb/cosmos/review/cosmos.api.md
@@ -502,6 +502,7 @@ export const Constants: {
EnableCrossPartitionQuery: string;
ParallelizeCrossPartitionQuery: string;
ResponseContinuationTokenLimitInKB: string;
SDKSupportedCapabilities: string;
PopulateQueryMetrics: string;
QueryMetrics: string;
PopulateIndexMetrics: string;
5 changes: 5 additions & 0 deletions sdk/cosmosdb/cosmos/src/common/constants.ts
@@ -98,6 +98,7 @@ export const Constants = {
EnableCrossPartitionQuery: "x-ms-documentdb-query-enablecrosspartition",
ParallelizeCrossPartitionQuery: "x-ms-documentdb-query-parallelizecrosspartitionquery",
ResponseContinuationTokenLimitInKB: "x-ms-documentdb-responsecontinuationtokenlimitinkb",
SDKSupportedCapabilities: "x-ms-cosmos-sdk-supportedcapabilities",

// QueryMetrics
// Request header to tell backend to give you query metrics.
@@ -494,3 +495,7 @@ export enum QueryFeature {
CountIf = "CountIf",
HybridSearch = "HybridSearch",
}

export enum SDKSupportedCapabilities {
PartitionMerge = 1,
}
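The capability is modeled as a bit flag (value 1) rather than a boolean so that future capabilities could share the same header value. A small sketch of how a consumer of that value would presumably test the bit; the check itself is illustrative and not part of this PR:

```ts
// Copied from the enum added above; PartitionMerge occupies the lowest bit.
enum SDKSupportedCapabilities {
  PartitionMerge = 1,
}

// Illustrative only (not code from this PR): a receiver of the header value
// would presumably test the capability bit like this.
function supportsPartitionMerge(headerValue: number): boolean {
  return (headerValue & SDKSupportedCapabilities.PartitionMerge) !== 0;
}

console.log(supportsPartitionMerge(1)); // true
console.log(supportsPartitionMerge(0)); // false
```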
@@ -33,6 +33,9 @@ export class DocumentProducer {
public generation: number = 0;
private respHeaders: CosmosHeaders;
private internalExecutionContext: DefaultQueryExecutionContext;
public startEpk: string;
public endEpk: string;
public populateEpkRangeHeaders: boolean;

/**
* Provides the Target Partition Range Query Execution Context.
@@ -49,6 +52,9 @@
targetPartitionKeyRange: PartitionKeyRange,
options: FeedOptions,
correlatedActivityId: string,
startEpk?: string,
endEpk?: string,
populateEpkRangeHeaders?: boolean,
) {
// TODO: any options
this.collectionLink = collectionLink;
@@ -68,6 +74,9 @@
this.fetchFunction,
correlatedActivityId,
);
this.startEpk = startEpk;
this.endEpk = endEpk;
this.populateEpkRangeHeaders = populateEpkRangeHeaders;
}
/**
* Synchronously gives the contiguous buffered results (stops at the first non result) if any
@@ -101,6 +110,8 @@
const path = getPathFromLink(this.collectionLink, ResourceType.item);
diagnosticNode.addData({ partitionKeyRangeId: this.targetPartitionKeyRange.id });
const id = getIdFromLink(this.collectionLink);
const startEpk = this.populateEpkRangeHeaders ? this.startEpk : undefined;
const endEpk = this.populateEpkRangeHeaders ? this.endEpk : undefined;

return this.clientContext.queryFeed({
path,
@@ -112,6 +123,8 @@
diagnosticNode,
partitionKeyRangeId: this.targetPartitionKeyRange["id"],
correlatedActivityId: correlatedActivityId,
startEpk: startEpk,
endEpk: endEpk,
});
};

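A distilled sketch of the gating above: EPK bounds are forwarded to queryFeed only when populateEpkRangeHeaders is set, which (per the execution-context changes below) happens for producers recreated after a merge. The helper and the EPK values here are illustrative only, not SDK code:

```ts
// Illustrative helper, not part of the SDK: EPK bounds are only attached when the
// producer was created for a merged parent's sub-range.
function epkHeadersFor(
  startEpk: string | undefined,
  endEpk: string | undefined,
  populateEpkRangeHeaders: boolean,
): { startEpk?: string; endEpk?: string } {
  return populateEpkRangeHeaders ? { startEpk, endEpk } : {};
}

// Arbitrary example EPK values. A producer recreated after a merge keeps querying
// only its parent's logical sub-range:
console.log(epkHeadersFor("05C1D0", "05C1E0", true)); // { startEpk: "05C1D0", endEpk: "05C1E0" }
// A producer for a whole physical partition sends no EPK bounds:
console.log(epkHeadersFor("05C1D0", "05C1E0", false)); // {}
```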
@@ -133,8 +133,15 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont
filteredPartitionKeyRanges.forEach((partitionTargetRange: any) => {
// TODO: any partitionTargetRange
// no async callback
const queryRange = QueryRange.parsePartitionKeyRange(partitionTargetRange);
targetPartitionQueryExecutionContextList.push(
this._createTargetPartitionQueryExecutionContext(partitionTargetRange),
this._createTargetPartitionQueryExecutionContext(
partitionTargetRange,
undefined,
queryRange.min,
queryRange.max,
false,
),
);
});

@@ -220,7 +227,7 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont
}

/**
* Gets the replacement ranges for a partitionkeyrange that has been split
* Gets the replacement ranges for a partition key range that has been split or merged
*/
private async _getReplacementPartitionKeyRanges(
documentProducer: DocumentProducer,
@@ -254,50 +261,76 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont
try {
const replacementPartitionKeyRanges: any[] =
await this._getReplacementPartitionKeyRanges(parentDocumentProducer);
const replacementDocumentProducers: DocumentProducer[] = [];
// Create the replacement documentProducers
replacementPartitionKeyRanges.forEach((partitionKeyRange) => {
// Create replacment document producers with the parent's continuationToken

if (replacementPartitionKeyRanges.length === 0) {
throw new Error("PartitionKeyRangeGone error but no replacement partition key ranges");
}

if (replacementPartitionKeyRanges.length === 1) {
// Partition is gone due to Merge
// Create the replacement documentProducer with the populateEpkRangeHeaders flag set to true so that startEpk and endEpk headers are sent
const replacementDocumentProducer = this._createTargetPartitionQueryExecutionContext(
partitionKeyRange,
replacementPartitionKeyRanges[0],
parentDocumentProducer.continuationToken,
parentDocumentProducer.startEpk,
parentDocumentProducer.endEpk,
true,
);
replacementDocumentProducers.push(replacementDocumentProducer);
});
// We need to check if the documentProducers even has anything left to fetch from before enqueing them
const checkAndEnqueueDocumentProducer = async (
documentProducerToCheck: DocumentProducer,
checkNextDocumentProducerCallback: any,
): Promise<void> => {
try {
const { result: afterItem } = await documentProducerToCheck.current(diagnosticNode);
if (afterItem === undefined) {
// no more results left in this document producer, so we don't enqueue it
// Enqueue the document producer and re-execute the originFunction with the corrected executionContext
this.orderByPQ.enq(replacementDocumentProducer);
return originFunction();
} else {
// Partition is gone due to Split
const replacementDocumentProducers: DocumentProducer[] = [];
// Create the replacement documentProducers with the populateEpkRangeHeaders flag set to false
replacementPartitionKeyRanges.forEach((partitionKeyRange) => {
// Create replacement document producers with the parent's continuationToken
const queryRange = QueryRange.parsePartitionKeyRange(partitionKeyRange);
const replacementDocumentProducer = this._createTargetPartitionQueryExecutionContext(
partitionKeyRange,
parentDocumentProducer.continuationToken,
queryRange.min,
queryRange.max,
false,
);
replacementDocumentProducers.push(replacementDocumentProducer);
});

// We need to check if the documentProducers even have anything left to fetch before enqueuing them
const checkAndEnqueueDocumentProducer = async (
documentProducerToCheck: DocumentProducer,
checkNextDocumentProducerCallback: any,
): Promise<void> => {
try {
const { result: afterItem } = await documentProducerToCheck.current(diagnosticNode);
if (afterItem === undefined) {
// no more results left in this document producer, so we don't enqueue it
} else {
// Safe to put document producer back in the queue
this.orderByPQ.enq(documentProducerToCheck);
}

await checkNextDocumentProducerCallback();
} catch (err: any) {
this.err = err;
return;
}
};
const checkAndEnqueueDocumentProducers = async (rdp: DocumentProducer[]): Promise<any> => {
if (rdp.length > 0) {
// We still have a replacementDocumentProducer to check
const replacementDocumentProducer = rdp.shift();
await checkAndEnqueueDocumentProducer(replacementDocumentProducer, async () => {
await checkAndEnqueueDocumentProducers(rdp);
});
} else {
// Safe to put document producer back in the queue
this.orderByPQ.enq(documentProducerToCheck);
// re-execute the originFunction with the corrected executionContext
return originFunction();
}

await checkNextDocumentProducerCallback();
} catch (err: any) {
this.err = err;
return;
}
};
const checkAndEnqueueDocumentProducers = async (rdp: DocumentProducer[]): Promise<any> => {
if (rdp.length > 0) {
// We still have a replacementDocumentProducer to check
const replacementDocumentProducer = rdp.shift();
await checkAndEnqueueDocumentProducer(replacementDocumentProducer, async () => {
await checkAndEnqueueDocumentProducers(rdp);
});
} else {
// re-execute the originFunction with the corrected executionContext
return originFunction();
}
};
// Invoke the recursive function to get the ball rolling
await checkAndEnqueueDocumentProducers(replacementDocumentProducers);
};
// Invoke the recursive function to get the ball rolling
await checkAndEnqueueDocumentProducers(replacementDocumentProducers);
}
} catch (err: any) {
this.err = err;
throw err;
@@ -324,13 +357,13 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont
elseCallback: any,
): Promise<void> {
const documentProducer = this.orderByPQ.peek();
// Check if split happened
// Check if split or merge happened
try {
await documentProducer.current(diagnosticNode);
elseCallback();
} catch (err: any) {
if (ParallelQueryExecutionContextBase._needPartitionKeyRangeCacheRefresh(err)) {
// Split has happened so we need to repair execution context before continueing
// Split or merge has happened so we need to repair the execution context before continuing
return addDignosticChild(
(childNode) => this._repairExecutionContext(childNode, ifCallback),
diagnosticNode,
@@ -509,6 +542,9 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont
private _createTargetPartitionQueryExecutionContext(
partitionKeyTargetRange: any,
continuationToken?: any,
startEpk?: string,
endEpk?: string,
populateEpkRangeHeaders?: boolean,
): DocumentProducer {
// TODO: any
// creates target partition range Query Execution Context
@@ -539,6 +575,9 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont
partitionKeyTargetRange,
options,
this.correlatedActivityId,
startEpk,
endEpk,
populateEpkRangeHeaders,
);
}
}
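The repair path above branches on how many replacement ranges the partition key range cache returns: exactly one means the parent range was merged away, so the single new producer keeps the parent's EPK sub-range and sends it as startEpk/endEpk headers; more than one means a split, so each child range gets its own producer with no EPK headers. A simplified, self-contained sketch of that decision; the types and helper are illustrative, not the SDK's actual signatures:

```ts
interface RangeInfo {
  id: string;
  minInclusive: string;
  maxExclusive: string;
}

interface ProducerSpec {
  range: RangeInfo;
  continuationToken?: string;
  startEpk?: string;
  endEpk?: string;
  populateEpkRangeHeaders: boolean;
}

// Decide how to replace a document producer whose partition key range is gone.
function planReplacements(
  parent: { startEpk?: string; endEpk?: string; continuationToken?: string },
  replacements: RangeInfo[],
): ProducerSpec[] {
  if (replacements.length === 0) {
    throw new Error("PartitionKeyRangeGone error but no replacement partition key ranges");
  }
  if (replacements.length === 1) {
    // Merge: a single wider range now owns the parent's data; keep querying only the
    // parent's EPK sub-range by sending startEpk/endEpk headers.
    return [
      {
        range: replacements[0],
        continuationToken: parent.continuationToken,
        startEpk: parent.startEpk,
        endEpk: parent.endEpk,
        populateEpkRangeHeaders: true,
      },
    ];
  }
  // Split: one producer per child range, each covering its full range, so no EPK headers.
  return replacements.map((range) => ({
    range,
    continuationToken: parent.continuationToken,
    startEpk: range.minInclusive,
    endEpk: range.maxExclusive,
    populateEpkRangeHeaders: false,
  }));
}
```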
11 changes: 10 additions & 1 deletion sdk/cosmosdb/cosmos/src/request/request.ts
@@ -1,7 +1,13 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import { setAuthorizationHeader } from "../auth";
import { Constants, HTTPMethod, jsonStringifyAndEscapeNonASCII, ResourceType } from "../common";
import {
Constants,
HTTPMethod,
jsonStringifyAndEscapeNonASCII,
ResourceType,
SDKSupportedCapabilities,
} from "../common";
import type { CosmosClientOptions } from "../CosmosClientOptions";
import type { PartitionKeyInternal } from "../documents";
import type { CosmosHeaders } from "../queryExecutionContext";
@@ -68,6 +74,9 @@ export async function getHeaders({
...defaultHeaders,
};

// Add the SDKSupportedCapabilities header to indicate that this SDK supports partition merge
headers[Constants.HttpHeaders.SDKSupportedCapabilities] = SDKSupportedCapabilities.PartitionMerge;

if (useMultipleWriteLocations) {
headers[Constants.HttpHeaders.ALLOW_MULTIPLE_WRITES] = true;
}
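The comment above captures the intent: every outgoing request now advertises merge support. A minimal sketch of the resulting header name/value pair, with the constant values copied from the constants.ts diff earlier in this PR:

```ts
// Header name and capability value copied from the constants.ts change in this PR.
const SDK_SUPPORTED_CAPABILITIES_HEADER = "x-ms-cosmos-sdk-supportedcapabilities";
const PARTITION_MERGE = 1;

// Every request built by getHeaders now carries this hint, i.e. the wire header is
// "x-ms-cosmos-sdk-supportedcapabilities: 1".
const headers: Record<string, string | number> = {
  [SDK_SUPPORTED_CAPABILITIES_HEADER]: PARTITION_MERGE,
};
console.log(headers);
```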
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import {
DocumentProducer,
ExecutionContext,
ParallelQueryExecutionContextBase,
} from "../../../../src/queryExecutionContext";

export class TestParallelQueryExecutionContext
extends ParallelQueryExecutionContextBase
implements ExecutionContext
{
public documentProducerComparator(
docProd1: DocumentProducer,
docProd2: DocumentProducer,
): number {
return docProd1.generation - docProd2.generation;
}
}