Skip to content

Commit

Permalink
fixup!
Browse files Browse the repository at this point in the history
Signed-off-by: Tomas Pilar <[email protected]>
  • Loading branch information
pilartomas committed Jan 16, 2025
1 parent 9ba8fe8 commit 13604c6
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 32 deletions.
10 changes: 8 additions & 2 deletions src/chat/chat.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import { StatusCodes } from 'http-status-codes';
import {
ChatCompletionCreateBody,
chatCompletionCreateBodySchema,
chatCompletionCreateResponseSchema
chatCompletionCreateResponseSchema,
chatCompletionCreateResponseStreamSchema
} from './dtos/chat-completion-create.js';
import { createChatCompletion } from './chat.service.js';

Expand All @@ -39,7 +40,12 @@ export const chatModule: FastifyPluginAsyncJsonSchemaToTs = async (app) => {
schema: {
body: chatCompletionCreateBodySchema,
response: {
[StatusCodes.OK]: chatCompletionCreateResponseSchema
[StatusCodes.OK]: {
content: {
'application/json': { schema: chatCompletionCreateResponseSchema },
'text/event-stream': { schema: chatCompletionCreateResponseStreamSchema }
}
}
},
tags: [Tag.OPENAI_API]
}
Expand Down
35 changes: 13 additions & 22 deletions src/chat/chat.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ export async function createChatCompletion({
sse.init(res);
try {
for await (const output of llm.stream(...args)) {
sse.send(
res,
JSON.stringify({
sse.send(res, {
data: {
id: chat.id,
object: 'chat.completion.chunk',
model,
Expand All @@ -98,37 +97,29 @@ export async function createChatCompletion({
index,
delta: { role: message.role, content: message.text }
}))
} as ChatCompletionChunk)
);
} as ChatCompletionChunk
});
chat.output = chat.output?.merge(output) ?? output;
}
} catch (err) {
getChatLogger().error({ err }, 'LLM generation failed');
chat.error = err.toString();
sse.send(res, JSON.stringify(chat.error ?? 'Internal server error')); // TODO
sse.send(res, { data: chat.error ?? 'Internal server error' }); // TODO
throw err;
} finally {
sse.end(res);
unsub();
await ORM.em.flush();
}
} else {
try {
chat.output = await llm.generate(...args);
return toChatDto(chat);
} catch (err) {
getChatLogger().error({ err }, 'LLM generation failed');
chat.error = err.toString();
if (err instanceof LLMError) {
throw new APIError({ code: APIErrorCode.SERVICE_ERROR, message: err.message });
}
throw err;
} finally {
await ORM.em.flush();
}
chat.output = await llm.generate(...args);
return toChatDto(chat);
}
} catch (err) {
getChatLogger().error({ err }, 'LLM generation failed');
chat.error = err.toString();
if (err instanceof LLMError) {
throw new APIError({ code: APIErrorCode.SERVICE_ERROR, message: err.message });
} else {
throw err;
}
} finally {
await ORM.em.flush();
}
Expand Down
6 changes: 6 additions & 0 deletions src/chat/dtos/chat-completion-create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { FromSchema, JSONSchema } from 'json-schema-to-ts';
import { ChatMessageRole } from '../constants';

import { chatCompletionSchema } from './chat-completion';
import { chatCompletionChunkSchema } from './chat-completion-chunk';

export const chatCompletionCreateBodySchema = {
type: 'object',
Expand Down Expand Up @@ -69,3 +70,8 @@ export type ChatCompletionCreateBody = FromSchema<typeof chatCompletionCreateBod

export const chatCompletionCreateResponseSchema = chatCompletionSchema;
export type ChatCompletionCreateResponse = FromSchema<typeof chatCompletionCreateResponseSchema>;

export const chatCompletionCreateResponseStreamSchema = chatCompletionChunkSchema;
export type ChatCompletionCreateResponseStream = FromSchema<
typeof chatCompletionCreateResponseStreamSchema
>;
2 changes: 2 additions & 0 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import { organizationUsersModule } from './administration/organization-users.mod
import { apiKeysModule } from './administration/api-keys.module.js';
import { artifactsModule } from './artifacts/artifacts.module.js';
import { chatModule } from './chat/chat.module.js';
import { embeddingsModule } from './embeddings/embeddings.module.js';

const app = fastify({
logger: fastifyLogger,
Expand Down Expand Up @@ -97,6 +98,7 @@ try {
app.register(organizationUsersModule, { prefix: '/v1' });
app.register(artifactsModule, { prefix: '/v1' });
app.register(chatModule, { prefix: '/v1' });
app.register(embeddingsModule, { prefix: '/v1' });

app.register(uiModule, { prefix: '/v1' });

Expand Down
8 changes: 2 additions & 6 deletions src/streaming/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export async function subscribeAndForward(
});
client.on('message', (_, message) => {
const event = JSON.parse(message) as Event;
sse.send(res, createMessage(event));
sse.send(res, { event: event.event, data: event.data });
if (event.event === 'done' || event.event === 'error') {
resolve();
}
Expand All @@ -86,13 +86,9 @@ export async function subscribeAndForward(
});
});
} catch (err) {
sse.send(res, createMessage({ event: 'error', data: err }));
sse.send(res, { event: 'error', data: err });
} finally {
sse.end(res);
}
});
}

function createMessage(event: Event): string {
return `event: ${event.event}\ndata: ${JSON.stringify(event.data)}\n\n`;
}
5 changes: 3 additions & 2 deletions src/streaming/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ export const init = (res: FastifyReply) => {
}
};

export const send = (res: FastifyReply, data: string) => {
res.raw.write(data);
export const send = (res: FastifyReply, { event, data }: { event?: string; data: any }) => {
if (event) res.raw.write(`event: ${event}\n`);
res.raw.write(`data: ${JSON.stringify(data)}\n\n`);
};

export const end = (res: FastifyReply) => {
Expand Down

0 comments on commit 13604c6

Please sign in to comment.