Skip to content

Commit

Permalink
feat(observability): propagate current instance to async scope
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomas2D committed Sep 9, 2024
1 parent bf985d1 commit 085d69a
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 29 deletions.
7 changes: 4 additions & 3 deletions src/agents/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import { FrameworkError } from "@/errors.js";
import { AgentMeta } from "@/agents/types.js";
import { Serializable } from "@/internals/serializable.js";
import { GetRunContext, GetRunInstance, Run, RunContext } from "@/context.js";
import { GetRunContext, RunContext } from "@/context.js";
import { Emitter } from "@/emitter/emitter.js";
import { BaseMemory } from "@/memory/base.js";

Expand All @@ -40,13 +40,14 @@ export abstract class BaseAgent<
...[input, options]: Partial<TOptions> extends TOptions
? [input: TInput, options?: TOptions]
: [input: TInput, options: TOptions]
): Run<TOutput, GetRunInstance<typeof this>, [input: TInput, options?: TOptions]> {
) {
if (this.isRunning) {
throw new AgentError("Agent is already running!");
}

return RunContext.enter(
{ self: this, signal: options?.signal, params: [input, options] as const },
this,
{ signal: options?.signal, params: [input, options] as const },
async (context) => {
try {
// @ts-expect-error
Expand Down
35 changes: 18 additions & 17 deletions src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,21 @@ export interface RunContextCallbacks {
finish: Callback<null>;
}

export type GetRunContext<T> = T extends RunInstance<infer P> ? RunContext<P> : never;
export type GetRunContext<T, P = any> = T extends RunInstance ? RunContext<T, P> : never;
export type GetRunInstance<T> = T extends RunInstance<infer P> ? P : never;

export class Run<T, C, P = any> extends LazyPromise<T> {
export class Run<R, I extends RunInstance, P = any> extends LazyPromise<R> {
constructor(
handler: () => Promise<T>,
protected readonly runContext: RunContext<C, P>,
handler: () => Promise<R>,
protected readonly runContext: GetRunContext<I, P>,
) {
super(handler);
}

readonly [Symbol.toStringTag] = "Promise";

observe(fn: (emitter: Emitter<C>) => void) {
fn(this.runContext.emitter as any);
observe(fn: (emitter: Emitter<GetRunInstance<I>>) => void) {
fn(this.runContext.emitter);
return this;
}

Expand All @@ -60,19 +60,18 @@ export class Run<T, C, P = any> extends LazyPromise<T> {
return this;
}

middleware(fn: (context: RunContext<C, P>) => void) {
middleware(fn: (context: GetRunContext<I, P>) => void) {
fn(this.runContext);
return this;
}
}

export interface RunContextInput<A, P> {
self: RunInstance<A>;
export interface RunContextInput<P> {
params: P;
signal?: AbortSignal;
}

export class RunContext<I, P = any> extends Serializable {
export class RunContext<T extends RunInstance, P = any> extends Serializable {
static #storage = new AsyncLocalStorage<RunContext<any>>();

protected readonly controller: AbortController;
Expand All @@ -92,7 +91,8 @@ export class RunContext<I, P = any> extends Serializable {
}

constructor(
protected readonly input: RunContextInput<I, P>,
public readonly instance: T,
protected readonly input: RunContextInput<P>,
parent?: RunContext<any>,
) {
super();
Expand All @@ -107,7 +107,7 @@ export class RunContext<I, P = any> extends Serializable {
this.controller = new AbortController();
registerSignals(this.controller, [input.signal, parent?.signal]);

this.emitter = input.self.emitter.child<I>({
this.emitter = instance.emitter.child<GetRunInstance<T>>({
context: this.context,
trace: {
id: this.groupId,
Expand All @@ -125,14 +125,15 @@ export class RunContext<I, P = any> extends Serializable {
this.controller.abort(new FrameworkError("Context destroyed."));
}

static enter<I2, R2, P2>(
input: RunContextInput<I2, P2>,
fn: (context: RunContext<I2, P2>) => Promise<R2>,
static enter<C2 extends RunInstance, R2, P2>(
instance: C2,
input: RunContextInput<P2>,
fn: (context: GetRunContext<C2, P2>) => Promise<R2>,
) {
const parent = RunContext.#storage.getStore();
const runContext = new RunContext(input, parent);
const runContext = new RunContext(instance, input, parent) as GetRunContext<C2, P2>;

return new Run<R2, I2, P2>(async () => {
return new Run(async () => {
const emitter = runContext.emitter.child<RunContextCallbacks>({
namespace: ["run"],
creator: runContext,
Expand Down
8 changes: 4 additions & 4 deletions src/llms/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ export abstract class BaseLLM<

generate(input: TInput, options?: TGenerateOptions) {
return RunContext.enter(
{ self: this, params: [input, options] as const, signal: options?.signal },
this,
{ params: [input, options] as const, signal: options?.signal },
async (run) => {
try {
await run.emitter.emit("start", { input, options });
Expand All @@ -147,7 +148,6 @@ export abstract class BaseLLM<
...options,
signal: controller.signal,
},
// @ts-expect-error wrong types
run,
)) {
chunks.push(chunk);
Expand Down Expand Up @@ -190,9 +190,9 @@ export abstract class BaseLLM<
async *stream(input: TInput, options?: StreamGenerateOptions): AsyncStream<TOutput> {
return yield* emitterToGenerator(async ({ emit }) => {
return RunContext.enter(
{ self: this, params: [input, options] as const, signal: options?.signal },
this,
{ params: [input, options] as const, signal: options?.signal },
async (run) => {
// @ts-expect-error wrong types
for await (const token of this._stream(input, options ?? {}, run)) {
emit(token);
}
Expand Down
9 changes: 4 additions & 5 deletions src/tools/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ export abstract class Tool<

run(input: ToolInputRaw<this>, options?: TRunOptions): Promise<TOutput> {
return RunContext.enter(
{ self: this, signal: options?.signal, params: [input, options] as const },
this,
{ signal: options?.signal, params: [input, options] as const },
async (run) => {
const meta = { input, options };
let errorPropagated = false;
Expand All @@ -217,10 +218,8 @@ export abstract class Tool<
errorPropagated = false;
await run.emitter.emit("start", { ...meta });
return this.cache.enabled
? // @ts-expect-error wrong types
await this._runCached(input, options, run)
: // @ts-expect-error wrong types
await this._run(input, options, run);
? await this._runCached(input, options, run)
: await this._run(input, options, run);
},
onError: async (error) => {
errorPropagated = true;
Expand Down

0 comments on commit 085d69a

Please sign in to comment.