Skip to content

Commit

Permalink
feat: send builtin konsistent message
Browse files Browse the repository at this point in the history
  • Loading branch information
7sete7 committed Jan 3, 2025
1 parent d95f6cf commit 613e80f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 17 deletions.
38 changes: 23 additions & 15 deletions src/imports/data/data.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ import { getUserSafe } from '@imports/auth/getUser';
import { TRANSACTION_OPTIONS } from '@imports/consts';
import { find } from "@imports/data/api";
import { client } from '@imports/database';
import { Konsistent } from '@imports/konsistent';
import processIncomingChange from '@imports/konsistent/processIncomingChange';
import eventManager from '@imports/lib/EventManager';
import queueManager from '@imports/queue/QueueManager';
import objectsDiff from '@imports/utils/objectsDiff';
import { dateToString, stringToDate } from '../data/dateParser';
import { populateLookupsData } from '../data/populateLookupsData';
Expand Down Expand Up @@ -880,7 +882,10 @@ export async function create({ authTokenId, document, data, contextUser, upsert,
}

if (resultRecord != null) {
if (MetaObject.Namespace.plan?.useExternalKonsistent !== true) {
if (MetaObject.Namespace.plan?.useExternalKonsistent === true) {
tracingSpan?.addEvent('Sending Konsistent message');
await queueManager.sendMessage(Konsistent.queue.resource, Konsistent.queue.name, { metaName: document, operation: 'create', data: resultRecord });
} else {
try {
tracingSpan?.addEvent('Processing sync Konsistent');
await processIncomingChange(document, resultRecord, 'create', user, resultRecord, dbSession);
Expand Down Expand Up @@ -1350,24 +1355,27 @@ export async function update({ authTokenId, document, data, contextUser, tracing
await runScriptAfterSave({ script: metaObject.scriptAfterSave, data: updatedRecords, user, extraData: { original: existsRecords } });
}

if (MetaObject.Namespace.plan?.useExternalKonsistent !== true) {
try {
logger.debug('Processing Konsistent');
tracingSpan?.addEvent('Processing sync Konsistent');
logger.debug('Processing Konsistent');

for await (const record of updatedRecords) {
const original = existsRecords.find(r => r._id === record._id);
const newRecord = omit(record, ['_id', '_createdAt', '_createdBy', '_updatedAt', '_updatedBy']);
for await (const record of updatedRecords) {
const original = existsRecords.find(r => r._id === record._id);
const newRecord = omit(record, ['_id', '_createdAt', '_createdBy', '_updatedAt', '_updatedBy']);

const changedProps = objectsDiff(original, newRecord);
const changedProps = objectsDiff(original, newRecord);
if (MetaObject.Namespace.plan?.useExternalKonsistent === true) {
tracingSpan?.addEvent('Sending Konsistent message');
await queueManager.sendMessage(Konsistent.queue.resource, Konsistent.queue.name, { metaName: document, operation: 'update', data: changedProps });
} else {
try {
tracingSpan?.addEvent('Processing sync Konsistent');
await processIncomingChange(document, record, 'update', user, changedProps, dbSession);
}
} catch (e) {
logger.error(e, `Error on processIncomingChange ${document}: ${e.message}`);
tracingSpan?.addEvent('Error on Konsistent', { error: e.message });
await dbSession.abortTransaction();
} catch (e) {
logger.error(e, `Error on processIncomingChange ${document}: ${e.message}`);
tracingSpan?.addEvent('Error on Konsistent', { error: e.message });
await dbSession.abortTransaction();

return errorReturn(`[${document}] Error on Konsistent: ${e.message}`);
return errorReturn(`[${document}] Error on Konsistent: ${e.message}`);
}
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/imports/konsistent/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,9 @@ export async function setupKonsistent() {
logger.info(`Using external Konsistent? ${Boolean(MetaObject.Namespace.plan?.useExternalKonsistent)}`);
}



export const Konsistent = {
queue: {
resource: process.env.KONSISTENT_QUEUE_RESOURCE ?? 'rabbitmq_default',
name: process.env.KONSISTENT_QUEUE_NAME ?? 'konsistent',
},
};

0 comments on commit 613e80f

Please sign in to comment.