Skip to content

Commit

Permalink
fix(redis): properly close redis connections (#109)
Browse files Browse the repository at this point in the history
Signed-off-by: Radek Ježek <[email protected]>
  • Loading branch information
jezekra1 authored Dec 9, 2024
1 parent d66dd42 commit d274908
Show file tree
Hide file tree
Showing 13 changed files with 395 additions and 282 deletions.
16 changes: 16 additions & 0 deletions migrations/Migration20241126122701.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/**
* Copyright 2024 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Migration } from '@mikro-orm/migrations-mongodb';

import { User } from '@/users/entities/user.entity';
Expand Down
16 changes: 16 additions & 0 deletions migrations/Migration20241206091921.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/**
* Copyright 2024 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Migration } from '@mikro-orm/migrations-mongodb';
import { Ref, ref } from '@mikro-orm/core';

Expand Down
34 changes: 27 additions & 7 deletions src/jobs/bullmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { globby } from 'globby';
import { DefaultJobOptions, Job, Queue, Worker, WorkerOptions } from 'bullmq';
import { isTruthy } from 'remeda';

import { createClient } from '../redis.js';
import { defaultRedisConnectionOptions } from '../redis.js';
import { getLogger } from '../logger.js';
import { gateway } from '../metrics.js';

Expand All @@ -42,10 +42,11 @@ const getQueueLogger = (queueName: string, job?: Job) =>

const logger = getLogger();

const connection = createClient({
const connectionOpts = {
...defaultRedisConnectionOptions,
// https://docs.bullmq.io/guide/going-to-production#maxretriesperrequest
maxRetriesPerRequest: null
});
};

const defaultJobOptions = {
removeOnComplete: true,
Expand Down Expand Up @@ -89,14 +90,16 @@ interface CreateQueueInput<T, U> {
jobHandler?: (job: Job<T>) => Promise<U>;
}

const Queues = new Map<QueueName, Queue>();

export function createQueue<T, U>({
name,
jobsOptions,
workerOptions,
jobHandler
}: CreateQueueInput<T, U>) {
const queue = new Queue<T, U>(name, {
connection: connection.options,
connection: connectionOpts,
defaultJobOptions: jobsOptions ? { ...defaultJobOptions, ...jobsOptions } : defaultJobOptions
});

Expand All @@ -113,12 +116,13 @@ export function createQueue<T, U>({
// We need to set autorun to false otherwise the worker might pick up stuff while ORM is not ready
autorun: false,
...workerOptions,
connection: connection.options
connection: connectionOpts
}
);
addCallbacks(worker, queue);
Workers.set(name, worker);
}
Queues.set(name, queue);

return { queue };
}
Expand All @@ -133,9 +137,25 @@ export async function runWorkers(queueNames: QueueName[]) {
logger.info({ queueNames }, `Workers started successfully`);
}

export async function closeAllQueues() {
await Promise.all(
[...Queues.values()].map(async (queue) => {
if (!(await queue.isPaused())) {
await queue.close();
}
})
);
logger.info('Queues shutdown successfully');
}

export async function closeAllWorkers() {
await Promise.all([...Workers.values()].map((worker) => worker.close()));
connection.quit();
await Promise.all(
[...Workers.values()].map(async (worker) => {
if (!worker.isPaused()) {
await worker.close();
}
})
);
logger.info('Workers shutdown successfully');
}

Expand Down
21 changes: 11 additions & 10 deletions src/rate-limit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@ import { FastifyPluginAsync, FastifyRequest } from 'fastify';
import fp from 'fastify-plugin';
import rateLimit, { errorResponseBuilderContext } from '@fastify/rate-limit';

import { createClient } from './redis.js';
import { closeClient, createRedisClient } from './redis.js';
import { AuthSecret, determineAuthType, scryptSecret } from './auth/utils.js';
import { toErrorResponseDto } from './errors/plugin.js';
import { APIError, APIErrorCode } from './errors/error.entity.js';

export const rateLimitPlugin: FastifyPluginAsync = fp.default(async (app) => {
const redis = createClient({
/**
* "The default parameters of a redis connection are not the fastest to provide a rate-limit. We suggest to customize the connectTimeout and maxRetriesPerRequest.
* Source: https://github.com/fastify/fastify-rate-limit
*/
connectTimeout: 1000, // 500 was too low, getting ETIMEDOUT
maxRetriesPerRequest: 1
});
const redis = createRedisClient({
/**
* "The default parameters of a redis connection are not the fastest to provide a rate-limit. We suggest to customize the connectTimeout and maxRetriesPerRequest.
* Source: https://github.com/fastify/fastify-rate-limit
*/
connectTimeout: 1000, // 500 was too low, getting ETIMEDOUT
maxRetriesPerRequest: 1
});

export const rateLimitPlugin: FastifyPluginAsync = fp.default(async (app) => {
await app.register(rateLimit, {
global: true,
max: 25,
Expand Down Expand Up @@ -67,4 +67,5 @@ export const rateLimitPlugin: FastifyPluginAsync = fp.default(async (app) => {
}
}
});
app.addHook('onClose', () => closeClient(redis));
});
68 changes: 47 additions & 21 deletions src/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,57 @@
*/

import { Redis, RedisOptions } from 'ioredis';
import { parseURL } from 'ioredis/built/utils';

import { REDIS_CA_CERT, REDIS_CACHE_CA_CERT, REDIS_CACHE_URL, REDIS_URL } from './config.js';

export function createClient(opts?: Partial<RedisOptions>): Redis {
const client = new Redis(REDIS_URL, {
tls:
REDIS_URL.startsWith('rediss') && REDIS_CA_CERT
? {
ca: Buffer.from(REDIS_CA_CERT)
}
: undefined,
...opts
});
return client;
export const defaultRedisConnectionOptions: RedisOptions = {
...parseURL(REDIS_URL),
tls:
REDIS_URL.startsWith('rediss') && REDIS_CA_CERT ? { ca: Buffer.from(REDIS_CA_CERT) } : undefined
};

export const defaultRedisCacheConnectionOptions: RedisOptions = {
...parseURL(REDIS_CACHE_URL),
tls:
REDIS_CACHE_URL.startsWith('rediss') && REDIS_CACHE_CA_CERT
? { ca: Buffer.from(REDIS_CACHE_CA_CERT) }
: undefined,
connectTimeout: 1000,
maxRetriesPerRequest: 1
};

export const sharedRedisClient = new Redis(defaultRedisConnectionOptions);
export const sharedRedisCacheClient = new Redis(defaultRedisCacheConnectionOptions);

const CLIENTS: Redis[] = [sharedRedisClient, sharedRedisCacheClient];

export async function withRedisClient<R>(
asyncCallback: (redis: Redis) => Promise<R>,
opts?: Partial<RedisOptions>
) {
const client = new Redis(REDIS_URL, { ...defaultRedisConnectionOptions, ...opts });
try {
return await asyncCallback(client);
} finally {
await closeClient(client);
}
}

export function createCacheClient(opts?: Partial<RedisOptions>): Redis {
const client = new Redis(REDIS_CACHE_URL, {
tls:
REDIS_URL.startsWith('rediss') && REDIS_CACHE_CA_CERT
? {
ca: Buffer.from(REDIS_CACHE_CA_CERT)
}
: undefined,
...opts
});
export function createRedisClient(opts?: Partial<RedisOptions>) {
const client = new Redis({ ...defaultRedisConnectionOptions, ...opts });
CLIENTS.push(client);
return client;
}

export async function closeClient(client: Redis) {
if (client.status !== 'end') {
await new Promise<void>((resolve) => {
client.quit(() => resolve());
});
}
}

export async function closeAllClients() {
await Promise.all(CLIENTS.map((client) => closeClient(client)));
}
94 changes: 48 additions & 46 deletions src/runs/execution/event-handlers/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { FrameworkError, Version } from 'bee-agent-framework';
import { EventMeta, Emitter, Callback } from 'bee-agent-framework/emitter/emitter';
import { Callback, Emitter, EventMeta } from 'bee-agent-framework/emitter/emitter';
import { ref } from '@mikro-orm/core';
import { Role } from 'bee-agent-framework/llms/primitives/message';
import { BeeCallbacks } from 'bee-agent-framework/agents/bee/types';
Expand Down Expand Up @@ -43,11 +43,11 @@ import { RunStatus } from '@/runs/entities/run.entity.js';
import { APIError } from '@/errors/error.entity.js';
import { jobRegistry } from '@/metrics.js';
import { EmitterEvent } from '@/run-steps/entities/emitter-event.entity';
import { createClient } from '@/redis.js';
import { createApproveChannel, toRunDto } from '@/runs/runs.service';
import { RequiredToolApprove } from '@/runs/entities/requiredToolApprove.entity';
import { ToolApprovalType } from '@/runs/entities/toolApproval.entity';
import { ToolType } from '@/tools/entities/tool/tool.entity';
import { withRedisClient } from '@/redis.js';

const agentToolExecutionTime = new Summary({
name: 'agent_tool_execution_time_seconds',
Expand Down Expand Up @@ -111,50 +111,52 @@ export function createBeeStreamingHandler(ctx: AgentContext) {
: toolCall.type)
)?.requireApproval === ToolApprovalType.ALWAYS
) {
const client = createClient();
await new Promise((resolve, reject) => {
client.subscribe(createApproveChannel(ctx.run, toolCall), async (err) => {
try {
if (err) {
reject(err);
} else {
ctx.run.requireAction(
new RequiredToolApprove({
toolCalls: [...(ctx.run.requiredAction?.toolCalls ?? []), toolCall]
})
);
await ORM.em.flush();
await ctx.publish({
event: 'thread.run.requires_action',
data: toRunDto(ctx.run)
});
await ctx.publish({
event: 'done',
data: '[DONE]'
});
}
} catch (err) {
reject(err);
}
});
client.on('message', async (_, approval) => {
try {
ctx.run.submitAction();
await ORM.em.flush();
if (approval !== 'true') {
reject(
new ToolError('User has not approved this tool to run.', [], {
isFatal: false,
isRetryable: false
})
);
}
resolve(true);
} catch (err) {
reject(err);
}
});
});
await withRedisClient(
(client) =>
new Promise((resolve, reject) => {
client.subscribe(createApproveChannel(ctx.run, toolCall), async (err) => {
try {
if (err) {
reject(err);
} else {
ctx.run.requireAction(
new RequiredToolApprove({
toolCalls: [...(ctx.run.requiredAction?.toolCalls ?? []), toolCall]
})
);
await ORM.em.flush();
await ctx.publish({
event: 'thread.run.requires_action',
data: toRunDto(ctx.run)
});
await ctx.publish({
event: 'done',
data: '[DONE]'
});
}
} catch (err) {
reject(err);
}
});
client.on('message', async (_, approval) => {
try {
ctx.run.submitAction();
await ORM.em.flush();
if (approval !== 'true') {
reject(
new ToolError('User has not approved this tool to run.', [], {
isFatal: false,
isRetryable: false
})
);
}
resolve(true);
} catch (err) {
reject(err);
}
});
})
);
}
}

Expand Down
Loading

0 comments on commit d274908

Please sign in to comment.