Skip to content

Commit

Permalink
Merge pull request #143 from permaweb/VinceJuliano/mu-cleanup
Browse files Browse the repository at this point in the history
fix(mu) fix mu database connection issues
  • Loading branch information
VinceJuliano authored Nov 9, 2023
2 parents 0ab5f23 + 54ceff2 commit 1a74619
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 114 deletions.
173 changes: 64 additions & 109 deletions servers/mu/src/domain/clients/dbInstance.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,146 +3,101 @@ import pgPromise from 'pg-promise';
const pgp = pgPromise();
const db = pgp(process.env.MU_DATABASE_URL);

export async function getTx(id) {
try {
const result = await db.oneOrNone('SELECT * FROM "transactions" WHERE "_id" = $1', [id]);
if (result) {
return result;
}
throw { status: 404, message: 'Transaction not found' };
} catch (error) {
const maxRetries = 5;
const retryDelay = 500;

async function withRetry(operation, args, retryCount = 0) {
try {
return await operation(...args);
} catch (error) {
if (error.message.includes('Connection terminated unexpectedly') && retryCount < maxRetries) {
const delay = retryDelay * Math.pow(2, retryCount);
console.log(`Database connection was lost, retrying in ${delay} ms...`);
await new Promise(resolve => setTimeout(resolve, delay));
return withRetry(operation, args, retryCount + 1);
} else {
throw error;
}
}
}

export async function getTx(id) {
return withRetry(db.oneOrNone, ['SELECT * FROM "transactions" WHERE "_id" = $1', [id]]);
}

export async function putTx(doc) {
try {
await db.none(
'INSERT INTO "transactions" ("_id", "data", "processId", "cachedAt") VALUES ($1, $2, $3, $4)',
[doc._id, JSON.stringify(doc.data), doc.processId, doc.cachedAt]
);
return doc;
} catch (error) {
throw error;
}
return withRetry(db.none, [
'INSERT INTO "transactions" ("_id", "data", "processId", "cachedAt") VALUES ($1, $2, $3, $4)',
[doc._id, JSON.stringify(doc.data), doc.processId, doc.cachedAt]
]);
}

export async function findTx(id) {
try {
const docs = await db.any('SELECT * FROM "transactions" WHERE "_id" = $1', [id]);
return { docs };
} catch (error) {
throw error;
}
return withRetry(db.any, ['SELECT * FROM "transactions" WHERE "_id" = $1', [id]]);
}

export async function putMsg(doc) {
try {
await db.none(
'INSERT INTO "messages" ("_id", "fromTxId", "toTxId", "msg", "cachedAt") VALUES ($1, $2, $3, $4, $5)',
[doc._id, doc.fromTxId, doc.toTxId, JSON.stringify(doc.msg), doc.cachedAt]
);
return doc;
} catch (error) {
throw error;
}
return withRetry(db.none, [
'INSERT INTO "messages" ("_id", "fromTxId", "toTxId", "msg", "cachedAt") VALUES ($1, $2, $3, $4, $5)',
[doc._id, doc.fromTxId, doc.toTxId, JSON.stringify(doc.msg), doc.cachedAt]
]);
}

export async function getMsg(id) {
try {
const result = await db.oneOrNone('SELECT * FROM "messages" WHERE "_id" = $1', [id]);
if (result) {
return result;
}
throw { status: 404, message: 'Message not found' };
} catch (error) {
throw error;
}
return withRetry(db.oneOrNone, ['SELECT * FROM "messages" WHERE "_id" = $1', [id]]);
}

export async function findMsgs(fromTxId) {
try {
const docs = await db.any('SELECT * FROM "messages" WHERE "fromTxId" = $1', [fromTxId]);
return { docs };
} catch (error) {
throw error;
}
return withRetry(db.any, ['SELECT * FROM "messages" WHERE "fromTxId" = $1', [fromTxId]]);
}


export async function putSpawn(doc) {
try {
await db.none(
'INSERT INTO "spawns" ("_id", "fromTxId", "toTxId", "spawn", "cachedAt") VALUES ($1, $2, $3, $4, $5)',
[doc._id, doc.fromTxId, doc.toTxId, JSON.stringify(doc.spawn), doc.cachedAt]
);
return doc;
} catch (error) {
throw error;
}
return withRetry(db.none, [
'INSERT INTO "spawns" ("_id", "fromTxId", "toTxId", "spawn", "cachedAt") VALUES ($1, $2, $3, $4, $5)',
[doc._id, doc.fromTxId, doc.toTxId, JSON.stringify(doc.spawn), doc.cachedAt]
]);
}

export async function findSpawns(fromTxId) {
try {
const docs = await db.any('SELECT * FROM "spawns" WHERE "fromTxId" = $1', [fromTxId]);
return { docs };
} catch (error) {
throw error;
}
return withRetry(db.any, ['SELECT * FROM "spawns" WHERE "fromTxId" = $1', [fromTxId]]);
}

export async function putMonitor(doc) {
try {
const { _id, lastFromSortKey } = doc;

const existingMonitor = await db.oneOrNone('SELECT * FROM "monitored_processes" WHERE "_id" = $1', [_id]);

if (existingMonitor) {
await db.none(
'UPDATE "monitored_processes" SET "lastFromSortKey" = $1 WHERE "_id" = $2',
[lastFromSortKey, _id]
);
return doc;
} else {
await db.none(
'INSERT INTO "monitored_processes" ("_id", "authorized", "lastFromSortKey", "interval", "block", "createdAt") VALUES ($1, $2, $3, $4, $5, $6)',
[doc._id, doc.authorized, doc.lastFromSortKey, doc.interval, JSON.stringify(doc.block), doc.createdAt]
);
return doc;
}
} catch (error) {
throw error;
}
}
const operation = async () => {
const existingMonitor = await db.oneOrNone('SELECT * FROM "monitored_processes" WHERE "_id" = $1', [doc._id]);

if (existingMonitor) {
await db.none(
'UPDATE "monitored_processes" SET "lastFromSortKey" = $1 WHERE "_id" = $2',
[doc.lastFromSortKey, doc._id]
);
} else {
await db.none(
'INSERT INTO "monitored_processes" ("_id", "authorized", "lastFromSortKey", "interval", "block", "createdAt") VALUES ($1, $2, $3, $4, $5, $6)',
[doc._id, doc.authorized, doc.lastFromSortKey, doc.interval, JSON.stringify(doc.block), doc.createdAt]
);
}

return doc;
};

return withRetry(operation, []);
}

export async function getMonitor(id) {
try {
const result = await db.oneOrNone('SELECT * FROM "monitored_processes" WHERE "_id" = $1', [id]);
if (result) {
return {
...result,
createdAt: parseInt(result.createdAt)
};
}
throw { status: 404, message: 'Monitored process not found' };
} catch (error) {
throw error;
}
return withRetry(db.oneOrNone, ['SELECT * FROM "monitored_processes" WHERE "_id" = $1', [id]]);
}

export async function findMonitors() {
try {
const docs = await db.any('SELECT * FROM "monitored_processes"');

const convertedDocs = docs.map(doc => ({
...doc,
createdAt: parseInt(doc.createdAt)
}));

return { docs: convertedDocs };
} catch (error) {
throw error;
}
return withRetry(async () => {
const docs = await db.any('SELECT * FROM "monitored_processes"');
return {
docs: docs.map(doc => ({
...doc,
createdAt: parseInt(doc.createdAt)
}))
};
}, []);
}

export default {
Expand Down
11 changes: 6 additions & 5 deletions servers/mu/src/domain/lib/monitor/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const updateMonitor = dataStoreClient.updateMonitorWith({dbInstance, logger})

parentPort.on('message', (message) => {
if(message.label === 'start') {
// setInterval(() => processMonitors(), 1000)
setInterval(() => processMonitors(), 1000)
parentPort.postMessage(`Monitor worker started`)
} else {
parentPort.postMessage(`Invalid message`)
Expand Down Expand Up @@ -84,8 +84,9 @@ async function fetchScheduled(monitor) {
let scheduled = await response.json();
return scheduled;
} catch (error) {
console.error('Error in fetchScheduled:', error);
throw error;
console.log('Error in fetchScheduled:', error);
console.log('for monitor: ')
console.log(monitor)
}
}

Expand All @@ -96,7 +97,7 @@ async function processMonitor(monitor) {
try {
let scheduled = await fetchScheduled(monitor)

if(scheduled.length < 1) return;
if(!scheduled || scheduled.length < 1) return [];

let fromTxId = `scheduled-${Math.floor(Math.random() * 1e18).toString()}`

Expand Down Expand Up @@ -134,7 +135,7 @@ async function processMonitor(monitor) {
return {status: 'ok'}
} catch(e) {
console.error('Error in processMonitor:', e);
throw error;
throw e;
}

}
Expand Down

0 comments on commit 1a74619

Please sign in to comment.