Skip to content

Commit

Permalink
fix(mu): only add / remove message from db on success / failure
Browse files Browse the repository at this point in the history
  • Loading branch information
jfrain99 committed Jan 8, 2025
1 parent ed84031 commit 11f0094
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 45 deletions.
4 changes: 2 additions & 2 deletions servers/mu/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ const CONFIG_ENVS = {
TASK_QUEUE_RETRY_DELAY: process.env.TASK_QUEUE_RETRY_DELAY || 1000,
DISABLE_TRACE: process.env.DISABLE_TRACE !== 'false',
SPAWN_PUSH_ENABLED: process.env.SPAWN_PUSH_ENABLED === 'true',
ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1572103,
ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1580000,
GET_RESULT_MAX_RETRIES: process.env.GET_RESULT_MAX_RETRIES || 5,
GET_RESULT_RETRY_DELAY: process.env.GET_RESULT_RETRY_DELAY || 1000,
MESSAGE_RECOVERY_MAX_RETRIES: process.env.MESSAGE_RECOVERY_MAX_RETRIES || 5,
Expand All @@ -135,7 +135,7 @@ const CONFIG_ENVS = {
TASK_QUEUE_RETRY_DELAY: process.env.TASK_QUEUE_RETRY_DELAY || 1000,
DISABLE_TRACE: process.env.DISABLE_TRACE !== 'false',
SPAWN_PUSH_ENABLED: process.env.SPAWN_PUSH_ENABLED === 'true',
ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1572103,
ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1580000,
GET_RESULT_MAX_RETRIES: process.env.GET_RESULT_MAX_RETRIES || 5,
GET_RESULT_RETRY_DELAY: process.env.GET_RESULT_RETRY_DELAY || 1000,
MESSAGE_RECOVERY_MAX_RETRIES: process.env.MESSAGE_RECOVERY_MAX_RETRIES || 5,
Expand Down
85 changes: 42 additions & 43 deletions servers/mu/src/domain/api/sendDataItem.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export function sendDataItemWith ({
const pullResult = pullResultWith({ fetchResult, logger })
const writeProcess = writeProcessTxWith({ locateScheduler, writeDataItem, logger })
const getResult = getResultWith({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIES, GET_RESULT_RETRY_DELAY })
const insertMessage = insertMessageWith({ db })

const locateProcessLocal = fromPromise(locateProcessSchema.implement(locateProcess))

Expand Down Expand Up @@ -72,19 +73,24 @@ export function sendDataItemWith ({
*/
crank: () => {
return of({ ...res, initialTxId: res.tx.id, pullResultAttempts: 0 })
.chain(fromPromise(insertMessage))
.chain(fromPromise(getResult))
.chain(fromPromise(deleteMessage))
.chain((ctx) => {
const { msgs, spawns, assigns, initialTxId, messageId: parentId } = ctx
return crank({
msgs,
spawns,
assigns,
initialTxId,
parentId
.bichain(
(error) => {
return of(error)
.map(() => logger({ log: 'Failed to get result of message, adding to message recovery database...', end: true }, res))
.chain(() => fromPromise(insertMessage)(res))
.chain(() => Rejected(error))
},
(ctx) => {
const { msgs, spawns, assigns, initialTxId, messageId: parentId } = ctx
return crank({
msgs,
spawns,
assigns,
initialTxId,
parentId
})
})
})
.bimap(
(res) => {
logger({ log: 'Failed to push messages', end: true }, ctx)
Expand All @@ -100,32 +106,6 @@ export function sendDataItemWith ({
}
))

async function insertMessage (ctx) {
const query = {
sql: `
INSERT OR IGNORE INTO ${MESSAGES_TABLE} (
id,
timestamp,
data,
retries
) VALUES (?, ?, ?, 0)
`,
parameters: [ctx.logId, new Date().getTime(), JSON.stringify(ctx)]
}
return await db.run(query).then(() => ctx)
}

async function deleteMessage (ctx) {
const query = {
sql: `
DELETE FROM ${MESSAGES_TABLE}
WHERE id = ?
`,
parameters: [ctx.logId]
}
return await db.run(query).then(() => ctx)
}

/**
* If the Data Item is a Process, we push an Assignment
* if the target is present. And as per aop6 Boot Loader
Expand Down Expand Up @@ -241,10 +221,26 @@ export function sendDataItemWith ({
}
}

function insertMessageWith ({ db }) {
return async (ctx) => {
const query = {
sql: `
INSERT OR IGNORE INTO ${MESSAGES_TABLE} (
id,
timestamp,
data,
retries
) VALUES (?, ?, ?, 0)
`,
parameters: [ctx.logId, new Date().getTime(), JSON.stringify(ctx)]
}
return await db.run(query).then(() => ctx)
}
}

function getResultWith ({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIES, GET_RESULT_RETRY_DELAY }) {
const getCuAddress = getCuAddressWith({ selectNode, logger })
const pullResult = pullResultWith({ fetchResult, logger })

/**
* Attempt to get the result of the message from the CU
* If it fails, retry recursively up to GET_RESULT_MAX_RETRIES times
Expand All @@ -257,6 +253,7 @@ function getResultWith ({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIE
.chain(pullResult)
.bichain(
fromPromise(async (_err) => {
logger({ log: `Error pulling result on attempt ${attempts} / ${GET_RESULT_MAX_RETRIES}, attempting to retry...` }, ctx)
if (attempts < GET_RESULT_MAX_RETRIES) {
// Increment the retry count
ctx.pullResultAttempts++
Expand All @@ -268,7 +265,7 @@ function getResultWith ({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIE
return await getResult(ctx)
}
// If we've reached the max retries, throw an error
throw new Error(`GetResult ran out of retries (${GET_RESULT_MAX_RETRIES}). Bubbling error...`)
throw new Error(`GetResult ran out of retries (${GET_RESULT_MAX_RETRIES}). Bubbling error...`, { cause: ctx })
}),
// If the result is successful, return it as Resolved
Resolved
Expand Down Expand Up @@ -384,22 +381,24 @@ export function startMessageRecoveryCronWith ({ selectNode, fetchResult, logger,
}
const logId = ctx.logId
logger({ log: `Attempting to recover message, retry ${retries} of ${MESSAGE_RECOVERY_MAX_RETRIES}` }, { logId })

ctx.pullResultAttempts = 0
// Attempt the result of the message from the CU
return getResult(ctx)
.then((res) => {
const { msgs, spawns, assigns, initialTxId, messageId: parentId } = res
const { msgs, spawns, assigns, messageId: parentId } = res
// Push the results of the message to the crank function
return crank({
msgs,
spawns,
assigns,
initialTxId,
initialTxId: ctx.tx.id,
parentId
})
}).toPromise()
})
.then(() => {
// If the message is successfully recovered, delete it from the database
logger({ log: 'Successfully pushed message results', end: true }, { logId })
logger({ log: 'Successfully recovered and pushed message results', end: true }, { logId })
return deleteMessage(logId)
})
.then(() => {
Expand Down

0 comments on commit 11f0094

Please sign in to comment.