Skip to content

Commit

Permalink
fixup! feat(log): add basic tracking
Browse files Browse the repository at this point in the history
Signed-off-by: Lukáš Janeček <[email protected]>
  • Loading branch information
Lukáš Janeček committed Dec 11, 2024
1 parent 158969b commit 45fbcc5
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 59 deletions.
38 changes: 21 additions & 17 deletions src/common/log.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@
* limitations under the License.
*/

import { ChangeSetType, Entity, ManyToOne, PrimaryKey, Property, ref, Ref } from '@mikro-orm/core';
import { User } from '@zilliz/milvus2-sdk-node';
import { requestContext } from '@fastify/request-context';
import { ChangeSetType, Entity, PrimaryKey, Property } from '@mikro-orm/core';
import { requestContext, RequestContextData } from '@fastify/request-context';

import { generatePrefixedObjectId } from '@/utils/id.js';
import { ProjectPrincipal } from '@/administration/entities/project-principal.entity';
import { jobLocalStorage } from '@/context';

@Entity()
export class Log {
Expand All @@ -29,11 +28,8 @@ export class Log {
@Property()
createdAt: Date = new Date();

@ManyToOne()
projectPrincipal?: Ref<ProjectPrincipal>;

@ManyToOne()
user?: Ref<User>;
@Property()
requestContext: any = {};

@Property()
entity?: string;
Expand All @@ -48,16 +44,24 @@ export class Log {
change?: any;

@Property()
additionalData: any;
additionalData?: any;

constructor({ entity, entityId, type, change, additionalData }: LogInput) {
const user = requestContext.get('user');
if (user) {
this.user = ref(user);
}
const projectPrincipal = requestContext.get('projectPrincipal');
if (projectPrincipal) {
this.projectPrincipal = ref(projectPrincipal);
const contextData: (keyof RequestContextData)[] = [
'user',
'organizationUser',
'apiKey',
'projectPrincipal',
'artifact'
];

contextData.forEach((key: keyof RequestContextData) => {
const value = requestContext.get(key);
if (value) Object.assign(this.requestContext, { [key]: value.id });
});
const job = jobLocalStorage.getStore()?.job.name;
if (job) {
Object.assign(this.requestContext, { job });
}
this.entity = entity;
this.entityId = entityId;
Expand Down
94 changes: 52 additions & 42 deletions src/common/log.subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,82 +4,92 @@ import { Log } from './log.entity';
import { BaseEntity } from './base.entity';

import { inJob, inSeeder } from '@/context';
import { getLogger } from '@/logger';

const loggedEntities: { [key: string]: { types?: ChangeSetType[]; entities: string[] } } = {
Assistant: {
assistant: {
entities: ['project', 'agent']
},
Artifact: {
artifact: {
entities: ['project', 'thread', 'name']
},
Chat: {
chat: {
types: [ChangeSetType.CREATE],
entities: ['artifact']
entities: []
},
Message: {
message: {
entities: ['project']
},
Thread: {
thread: {
entities: ['project']
},
Tool: {
tool: {
entities: ['project', 'name']
},
VectorStore: {
'vector-store': {
entities: ['project', 'name']
},
VectorStoreFile: {
'vector-store-file': {
entities: ['project', 'file']
},
File: {
file: {
entities: ['project', 'filename']
},
Run: {
run: {
entities: ['project', 'assistant', 'status']
},
User: {
user: {
entities: ['email']
},
Organization: {
organization: {
entities: ['name']
},
Project: {
project: {
entities: ['name', 'organization']
},
ApiKey: {
'project-api-key': {
entities: ['project']
}
};

export class LogSubscriber implements EventSubscriber {
onFlush(args: FlushEventArgs): void | Promise<void> {
args.uow.getChangeSets().forEach((cs) => {
if (
loggedEntities[cs.name] &&
(loggedEntities[cs.name].types?.includes(cs.type) ?? true) &&
inSeeder() === false
) {
if (cs.type === ChangeSetType.DELETE && inJob()) return;
try {
args.uow.getChangeSets().forEach((cs) => {
if (
loggedEntities[cs.collection] &&
(loggedEntities[cs.collection].types?.includes(cs.type) ?? true) &&
inSeeder() === false
) {
// Do not log cleanup deletes from jobs
if (cs.type === ChangeSetType.DELETE && inJob()) return;

const log = new Log({
entity: cs.name,
entityId: cs.entity?.id,
type: cs.type,
change: cs.type === ChangeSetType.UPDATE ? cs.payload : undefined,
additionalData:
loggedEntities[cs.name].entities.reduce(
(acc, name) => ({
...acc,
[name]:
cs.entity[name].entity instanceof BaseEntity
? cs.entity[name].id
: cs.entity[name]
}),
{}
) ?? undefined
});
args.uow.computeChangeSet(log);
}
});
const log = new Log({
entity: cs.name,
entityId: cs.entity?.id,
type: cs.type,
// save changes only for update operation
change: cs.type === ChangeSetType.UPDATE ? cs.payload : undefined,
...(loggedEntities[cs.collection].entities.length > 0
? {
additionalData: loggedEntities[cs.collection].entities.reduce(
(acc, name) => ({
...acc,
[name]:
cs.entity[name]?.entity instanceof BaseEntity
? cs.entity[name].id
: cs.entity[name]
}),
{}
)
}
: {})
});
args.uow.computeChangeSet(log);
}
});
} catch (e) {
getLogger().warn(e, 'Error during database logging.');
}
}
}

0 comments on commit 45fbcc5

Please sign in to comment.