diff --git a/servers/mu/src/config.js b/servers/mu/src/config.js index 982922206..26b0ccc7f 100644 --- a/servers/mu/src/config.js +++ b/servers/mu/src/config.js @@ -108,7 +108,7 @@ const CONFIG_ENVS = { TASK_QUEUE_RETRY_DELAY: process.env.TASK_QUEUE_RETRY_DELAY || 1000, DISABLE_TRACE: process.env.DISABLE_TRACE !== 'false', SPAWN_PUSH_ENABLED: process.env.SPAWN_PUSH_ENABLED === 'true', - ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1572103, + ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1580000, GET_RESULT_MAX_RETRIES: process.env.GET_RESULT_MAX_RETRIES || 5, GET_RESULT_RETRY_DELAY: process.env.GET_RESULT_RETRY_DELAY || 1000, MESSAGE_RECOVERY_MAX_RETRIES: process.env.MESSAGE_RECOVERY_MAX_RETRIES || 5, @@ -135,7 +135,7 @@ const CONFIG_ENVS = { TASK_QUEUE_RETRY_DELAY: process.env.TASK_QUEUE_RETRY_DELAY || 1000, DISABLE_TRACE: process.env.DISABLE_TRACE !== 'false', SPAWN_PUSH_ENABLED: process.env.SPAWN_PUSH_ENABLED === 'true', - ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1572103, + ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1580000, GET_RESULT_MAX_RETRIES: process.env.GET_RESULT_MAX_RETRIES || 5, GET_RESULT_RETRY_DELAY: process.env.GET_RESULT_RETRY_DELAY || 1000, MESSAGE_RECOVERY_MAX_RETRIES: process.env.MESSAGE_RECOVERY_MAX_RETRIES || 5, diff --git a/servers/mu/src/domain/api/sendDataItem.js b/servers/mu/src/domain/api/sendDataItem.js index d9be48f01..282c79ae1 100644 --- a/servers/mu/src/domain/api/sendDataItem.js +++ b/servers/mu/src/domain/api/sendDataItem.js @@ -39,6 +39,7 @@ export function sendDataItemWith ({ const pullResult = pullResultWith({ fetchResult, logger }) const writeProcess = writeProcessTxWith({ locateScheduler, writeDataItem, logger }) const getResult = getResultWith({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIES, GET_RESULT_RETRY_DELAY }) + const insertMessage = insertMessageWith({ db }) const locateProcessLocal = fromPromise(locateProcessSchema.implement(locateProcess)) @@ -72,19 +73,24 @@ export function sendDataItemWith ({ */ crank: () => { return of({ ...res, initialTxId: res.tx.id, pullResultAttempts: 0 }) - .chain(fromPromise(insertMessage)) .chain(fromPromise(getResult)) - .chain(fromPromise(deleteMessage)) - .chain((ctx) => { - const { msgs, spawns, assigns, initialTxId, messageId: parentId } = ctx - return crank({ - msgs, - spawns, - assigns, - initialTxId, - parentId + .bichain( + (error) => { + return of(error) + .map(() => logger({ log: 'Failed to get result of message, adding to message recovery database...', end: true }, res)) + .chain(() => fromPromise(insertMessage)(res)) + .chain(() => Rejected(error)) + }, + (ctx) => { + const { msgs, spawns, assigns, initialTxId, messageId: parentId } = ctx + return crank({ + msgs, + spawns, + assigns, + initialTxId, + parentId + }) }) - }) .bimap( (res) => { logger({ log: 'Failed to push messages', end: true }, ctx) @@ -100,32 +106,6 @@ export function sendDataItemWith ({ } )) - async function insertMessage (ctx) { - const query = { - sql: ` - INSERT OR IGNORE INTO ${MESSAGES_TABLE} ( - id, - timestamp, - data, - retries - ) VALUES (?, ?, ?, 0) - `, - parameters: [ctx.logId, new Date().getTime(), JSON.stringify(ctx)] - } - return await db.run(query).then(() => ctx) - } - - async function deleteMessage (ctx) { - const query = { - sql: ` - DELETE FROM ${MESSAGES_TABLE} - WHERE id = ? - `, - parameters: [ctx.logId] - } - return await db.run(query).then(() => ctx) - } - /** * If the Data Item is a Process, we push an Assignment * if the target is present. And as per aop6 Boot Loader @@ -241,10 +221,26 @@ export function sendDataItemWith ({ } } +function insertMessageWith ({ db }) { + return async (ctx) => { + const query = { + sql: ` + INSERT OR IGNORE INTO ${MESSAGES_TABLE} ( + id, + timestamp, + data, + retries + ) VALUES (?, ?, ?, 0) + `, + parameters: [ctx.logId, new Date().getTime(), JSON.stringify(ctx)] + } + return await db.run(query).then(() => ctx) + } +} + function getResultWith ({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIES, GET_RESULT_RETRY_DELAY }) { const getCuAddress = getCuAddressWith({ selectNode, logger }) const pullResult = pullResultWith({ fetchResult, logger }) - /** * Attempt to get the result of the message from the CU * If it fails, retry recursively up to GET_RESULT_MAX_RETRIES times @@ -257,6 +253,7 @@ function getResultWith ({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIE .chain(pullResult) .bichain( fromPromise(async (_err) => { + logger({ log: `Error pulling result on attempt ${attempts} / ${GET_RESULT_MAX_RETRIES}, attempting to retry...` }, ctx) if (attempts < GET_RESULT_MAX_RETRIES) { // Increment the retry count ctx.pullResultAttempts++ @@ -268,7 +265,7 @@ function getResultWith ({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIE return await getResult(ctx) } // If we've reached the max retries, throw an error - throw new Error(`GetResult ran out of retries (${GET_RESULT_MAX_RETRIES}). Bubbling error...`) + throw new Error(`GetResult ran out of retries (${GET_RESULT_MAX_RETRIES}). Bubbling error...`, { cause: ctx }) }), // If the result is successful, return it as Resolved Resolved @@ -384,22 +381,24 @@ export function startMessageRecoveryCronWith ({ selectNode, fetchResult, logger, } const logId = ctx.logId logger({ log: `Attempting to recover message, retry ${retries} of ${MESSAGE_RECOVERY_MAX_RETRIES}` }, { logId }) + + ctx.pullResultAttempts = 0 // Attempt the result of the message from the CU return getResult(ctx) .then((res) => { - const { msgs, spawns, assigns, initialTxId, messageId: parentId } = res + const { msgs, spawns, assigns, messageId: parentId } = res // Push the results of the message to the crank function return crank({ msgs, spawns, assigns, - initialTxId, + initialTxId: ctx.tx.id, parentId - }) + }).toPromise() }) .then(() => { // If the message is successfully recovered, delete it from the database - logger({ log: 'Successfully pushed message results', end: true }, { logId }) + logger({ log: 'Successfully recovered and pushed message results', end: true }, { logId }) return deleteMessage(logId) }) .then(() => {