Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] add listSessions() to service bus receivers #31442

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

### Features Added

- Adds `deleteMessages` which deletes messages from the queue.
- Add `deleteMessages` which deletes messages from the queue.
- Add the experimental diagnostic feature `omitMessageBody` via `PeekMessagesOptions` under `./experimental` subpath export.
- Add `listSessions` to receivers.

### Breaking Changes

Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { MessagingError } from '@azure/core-amqp';
import type { NamedKeyCredential } from '@azure/core-auth';
import { OperationOptions } from '@azure/core-client';
import type { OperationTracingOptions } from '@azure/core-tracing';
import type { PagedAsyncIterableIterator } from '@azure/core-paging';
import { PagedAsyncIterableIterator } from '@azure/core-paging';
import type { PageSettings } from '@azure/core-paging';
import type { ProxySettings } from '@azure/core-rest-pipeline';
import { RetryMode } from '@azure/core-amqp';
Expand Down Expand Up @@ -497,6 +497,7 @@ export interface ServiceBusReceiver {
getMessageIterator(options?: GetMessageIteratorOptions): AsyncIterableIterator<ServiceBusReceivedMessage>;
identifier: string;
isClosed: boolean;
listSessions(options?: OperationOptions): PagedAsyncIterableIterator<string>;
peekMessages(maxMessageCount: number, options?: PeekMessagesOptions): Promise<ServiceBusReceivedMessage[]>;
purgeMessages(options?: PurgeMessagesOptions): Promise<number>;
receiveDeferredMessages(sequenceNumbers: Long | Long[], options?: OperationOptionsBase): Promise<ServiceBusReceivedMessage[]>;
Expand Down
19 changes: 17 additions & 2 deletions sdk/servicebus/service-bus/samples-dev/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ export async function main() {
await sendMessage(sbClient, listOfScientists[8], "session-2");
await sendMessage(sbClient, listOfScientists[9], "session-2");

await listSessions(sbClient);

await receiveMessages(sbClient, "session-1");
await receiveMessages(sbClient, "session-2");
} finally {
Expand Down Expand Up @@ -111,7 +113,7 @@ async function receiveMessages(sbClient: ServiceBusClient, sessionId: string) {
endDate = now + 20000;
}

let remainingTime: number = endDate - now;
const remainingTime: number = endDate - now;

console.log(`Waiting for ${remainingTime} milliseconds for messages to arrive.`);

Expand All @@ -123,13 +125,26 @@ async function receiveMessages(sbClient: ServiceBusClient, sessionId: string) {

await receiver.close();
break;
} catch (err: any) {
} catch {
// `err` was already logged part of `processError` above.
await receiver.close();
}
}
}

async function listSessions(sbClient: ServiceBusClient) {
const receiver = sbClient.createReceiver(queueName);
// also available on session receivers
// const receiver = await sbClient.acceptNextSession(queueName);

const sessionIterator = receiver.listSessions();
console.log(`Listing sessions:`);
for await (const id of sessionIterator) {
console.log(` ${id}`);
}
await receiver.close();
}

main().catch((err) => {
console.log("Session Sample - Error occurred: ", err);
process.exit(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
"dependencies": {
"@azure/service-bus": "latest",
"dotenv": "latest",
"@azure/identity": "^4.2.1",
"@azure/identity": "^4.4.1",
"ws": "^8.0.0",
"https-proxy-agent": "^7.0.0",
"@azure/core-auth": "^1.3.0"
"@azure/core-auth": "^1.8.0"
}
}
19 changes: 17 additions & 2 deletions sdk/servicebus/service-bus/samples/v7/javascript/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ async function main() {
await sendMessage(sbClient, listOfScientists[8], "session-2");
await sendMessage(sbClient, listOfScientists[9], "session-2");

await listSessions(sbClient);

await receiveMessages(sbClient, "session-1");
await receiveMessages(sbClient, "session-2");
} finally {
Expand Down Expand Up @@ -109,7 +111,7 @@ async function receiveMessages(sbClient, sessionId) {
endDate = now + 20000;
}

let remainingTime = endDate - now;
const remainingTime = endDate - now;

console.log(`Waiting for ${remainingTime} milliseconds for messages to arrive.`);

Expand All @@ -121,13 +123,26 @@ async function receiveMessages(sbClient, sessionId) {

await receiver.close();
break;
} catch (err) {
} catch {
// `err` was already logged part of `processError` above.
await receiver.close();
}
}
}

async function listSessions(sbClient) {
const receiver = sbClient.createReceiver(queueName);
// also available on session receivers
// const receiver = await sbClient.acceptNextSession(queueName);

const sessionIterator = receiver.listSessions();
console.log(`Listing sessions:`);
for await (const id of sessionIterator) {
console.log(` ${id}`);
}
await receiver.close();
}

main().catch((err) => {
console.log("Session Sample - Error occurred: ", err);
process.exit(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
"dependencies": {
"@azure/service-bus": "latest",
"dotenv": "latest",
"@azure/identity": "^4.2.1",
"@azure/identity": "^4.4.1",
"ws": "^8.0.0",
"https-proxy-agent": "^7.0.0",
"@azure/core-auth": "^1.3.0"
"@azure/core-auth": "^1.8.0"
},
"devDependencies": {
"@types/ws": "^7.2.4",
Expand Down
19 changes: 17 additions & 2 deletions sdk/servicebus/service-bus/samples/v7/typescript/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ export async function main() {
await sendMessage(sbClient, listOfScientists[8], "session-2");
await sendMessage(sbClient, listOfScientists[9], "session-2");

await listSessions(sbClient);

await receiveMessages(sbClient, "session-1");
await receiveMessages(sbClient, "session-2");
} finally {
Expand Down Expand Up @@ -110,7 +112,7 @@ async function receiveMessages(sbClient: ServiceBusClient, sessionId: string) {
endDate = now + 20000;
}

let remainingTime: number = endDate - now;
const remainingTime: number = endDate - now;

console.log(`Waiting for ${remainingTime} milliseconds for messages to arrive.`);

Expand All @@ -122,13 +124,26 @@ async function receiveMessages(sbClient: ServiceBusClient, sessionId: string) {

await receiver.close();
break;
} catch (err: any) {
} catch {
// `err` was already logged part of `processError` above.
await receiver.close();
}
}
}

async function listSessions(sbClient: ServiceBusClient) {
const receiver = sbClient.createReceiver(queueName);
// also available on session receivers
// const receiver = await sbClient.acceptNextSession(queueName);

const sessionIterator = receiver.listSessions();
console.log(`Listing sessions:`);
for await (const id of sessionIterator) {
console.log(` ${id}`);
}
await receiver.close();
}

main().catch((err) => {
console.log("Session Sample - Error occurred: ", err);
process.exit(1);
Expand Down
60 changes: 59 additions & 1 deletion sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ import type { ListRequestOptions } from "../serviceBusAtomManagementClient.js";
*/
export interface SendManagementRequestOptions extends SendRequestOptions {
/**
* The name of the sender or receiver link associated with the managmenet operations.
* The name of the sender or receiver link associated with the managemnet operations.
* This is used for service side optimization.
*/
associatedLinkName?: string;
Expand Down Expand Up @@ -1553,6 +1553,64 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
}
}

/**
* Get sessions.
* @returns A list of session ids.
*/
async getSessions(
options?: ListRequestOptions & OperationOptionsBase & SendManagementRequestOptions,
): Promise<string[]> {
throwErrorIfConnectionClosed(this._context);
try {
const updatedOptions = (await this.initWithUniqueReplyTo(options)) as ListRequestOptions &
OperationOptionsBase &
SendManagementRequestOptions;
const messageBody: any = {
top: updatedOptions?.maxCount
? types.wrap_int(updatedOptions.maxCount)
: types.wrap_int(max32BitNumber),
skip: updatedOptions?.skip ? types.wrap_int(updatedOptions.skip) : types.wrap_int(0),
};
messageBody["last-updated-time"] = new Date(253402300800000);
const request: RheaMessage = {
body: messageBody,
reply_to: this.replyTo,
application_properties: {
operation: Constants.operations.enumerateSessions,
},
};
request.application_properties![Constants.trackingId] = generate_uuid();

managementClientLogger.verbose(
"%s Get sessions request body: %O.",
this.logPrefix,
request.body,
);
const response = await this._makeManagementRequest(
request,
managementClientLogger,
updatedOptions,
);
// Reference: https://learn.microsoft.com/azure/service-bus-messaging/service-bus-amqp-request-response#response-8
if (
response.application_properties!.statusCode === 204 ||
!response.body ||
!Array.isArray(response.body["sessions-ids"])
) {
return [];
}

return response.body["sessions-ids"];
} catch (err: any) {
const error = translateServiceBusError(err);
managementClientLogger.logError(
error,
`${this.logPrefix} An error occurred while sending the get sessions request to $management endpoint`,
);
throw error;
}
}

protected removeLinkFromContext(): void {
delete this._context.managementClients[this.name];
}
Expand Down
54 changes: 54 additions & 0 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
deadLetterMessage,
deferMessage,
getMessageIterator,
getSessions,
} from "./receiverCommon.js";
import type Long from "long";
import type { ServiceBusMessageImpl, DeadLetterOptions } from "../serviceBusMessage.js";
Expand All @@ -45,6 +46,12 @@ import { ensureValidIdentifier } from "../util/utils.js";
import { toSpanOptions, tracingClient } from "../diagnostics/tracing.js";
import { extractSpanContextFromServiceBusMessage } from "../diagnostics/instrumentServiceBusMessage.js";
import type { TracingSpanLink } from "@azure/core-tracing";
import {
getPagedAsyncIterator,
type PagedAsyncIterableIterator,
type PagedResult,
} from "@azure/core-paging";
import type { OperationOptions } from "@azure/core-client";

/**
* The default time to wait for messages _after_ the first message
Expand Down Expand Up @@ -298,6 +305,15 @@ export interface ServiceBusReceiver {
* @throws ServiceBusError if the service returns an error while renewing message lock.
*/
renewMessageLock(message: ServiceBusReceivedMessage): Promise<Date>;

/**
* Returns an async iterable iterator to list all the sessions in a messaging entity.
*
* .byPage() returns an async iterable iterator to list the session id's in pages.
*
* @returns An asyncIterableIterator that supports paging.
*/
listSessions(options?: OperationOptions): PagedAsyncIterableIterator<string>;
}

/**
Expand Down Expand Up @@ -722,6 +738,44 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
);
}

/**
* Returns an async iterable iterator to list all the sessions in a messaging entity.
*
* .byPage() returns an async iterable iterator to list the session id's in pages.
*
* @returns An asyncIterableIterator that supports paging.
*/
listSessions(
options?: OperationOptions,
): PagedAsyncIterableIterator<string, string[], { maxPageSize?: number }> {
logger.verbose(`Performing operation - listSessions() with options: %j`, options);
const pagedResult: PagedResult<string[], { maxPageSize?: number }, number> = {
firstPageLink: 0,
getPage: async (pageLink, maxPageSize) => {
const top = maxPageSize ?? 100;
const sessions = await getSessions(
this._context,
this.entityPath,
this._getAssociatedReceiverName(),
this._retryOptions,
{
skip: pageLink,
maxCount: top,
...options,
},
);
return sessions.length
? {
page: sessions,
nextPageLink: sessions.length > 0 ? pageLink + sessions.length : undefined,
}
: undefined;
},
};

return getPagedAsyncIterator(pagedResult);
}

async close(): Promise<void> {
try {
this._isClosed = true;
Expand Down
39 changes: 39 additions & 0 deletions sdk/servicebus/service-bus/src/receivers/receiverCommon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import { delay, isDefined } from "@azure/core-util";
import type { TracingSpanLink } from "@azure/core-tracing";
import { toSpanOptions, tracingClient } from "../diagnostics/tracing.js";
import { extractSpanContextFromServiceBusMessage } from "../diagnostics/instrumentServiceBusMessage.js";
import type { ListRequestOptions } from "../serviceBusAtomManagementClient.js";
import type { OperationOptions } from "@azure/core-client";

/**
* @internal
Expand Down Expand Up @@ -323,6 +325,43 @@ export async function settleMessageOperation(
});
}

/**
* @internal
*
* Get all sessions associated with an entity.
*/
export async function getSessions(
// eslint-disable-next-line @azure/azure-sdk/ts-use-interface-parameters
context: ConnectionContext,
entityPath: string,
receiverName: string | undefined,
retryOptions: RetryOptions,
options?: ListRequestOptions & OperationOptions,
): Promise<string[]> {
return tracingClient.withSpan(
"ServiceBusReceiver.getSessions",
options ?? {},
async (updatedOptions) => {
const operationPromise = async (): Promise<string[]> => {
return context.getManagementClient(entityPath).getSessions({
...updatedOptions,
associatedLinkName: receiverName,
requestName: "getSessions",
timeoutInMs: retryOptions.timeoutInMs,
});
};
const config: RetryConfig<string[]> = {
operation: operationPromise,
connectionId: context.connectionId,
operationType: RetryOperationType.management,
retryOptions: retryOptions,
abortSignal: updatedOptions?.abortSignal,
};
return retry<string[]>(config);
},
);
}

/** @internal */
export interface RetryForeverArgs<T> {
retryConfig: RetryConfig<T>;
Expand Down
Loading
Loading