Skip to content

Commit

Permalink
fix(mu): adds comments and readme
Browse files Browse the repository at this point in the history
  • Loading branch information
jfrain99 committed Jan 6, 2025
1 parent b22737b commit ed84031
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 5 deletions.
4 changes: 4 additions & 0 deletions servers/mu/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ There are a few environment variables that you can set:
- `TASK_QUEUE_RETRY_DELAY`: The retry in between each attempt to process a message in the task queue.
- `DISABLE_TRACE`: Whether or not the log tracer should be enabled. Set to any value to disable log tracing. (You must explicitly enable log tracing by setting - `DISABLE_TRACE` to `'false'`)
- `SPAWN_PUSH_ENABLED`: If enabled, this will make the MU attempt to push messages for a spawn as per AOP 6 Boot loader https://github.com/permaweb/ao/issues/730
- `GET_RESULT_MAX_RETRIES`: The amount of attempts for the MU to get the result of a message from the CU.
- `GET_RESULT_RETRY_DELAY`: The retry delay in between each attempt to get the result of a message from the CU.
- `MESSAGE_RECOVERY_MAX_RETRIES`: The amount of attempts for the MU to recover a message from the database.
- `MESSAGE_RECOVERY_RETRY_DELAY`: The retry delay in between each attempt to recover a message from the database.

> You can also use a `.env` file to set environment variables when running in
> development mode, See the `.env.example` for an example `.env`
Expand Down
61 changes: 57 additions & 4 deletions servers/mu/src/domain/api/sendDataItem.js
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ function getResultWith ({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIE
const getCuAddress = getCuAddressWith({ selectNode, logger })
const pullResult = pullResultWith({ fetchResult, logger })

/**
* Attempt to get the result of the message from the CU
* If it fails, retry recursively up to GET_RESULT_MAX_RETRIES times
* with a delay of GET_RESULT_RETRY_DELAY * (2 ** attempts)
*/
return async function getResult (ctx) {
const attempts = ctx.pullResultAttempts
return of(ctx)
Expand All @@ -253,19 +258,32 @@ function getResultWith ({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIE
.bichain(
fromPromise(async (_err) => {
if (attempts < GET_RESULT_MAX_RETRIES) {
// Increment the retry count
ctx.pullResultAttempts++

// Delay the retry by GET_RESULT_RETRY_DELAY * (2 ** attempts)
await new Promise(resolve => setTimeout(resolve, GET_RESULT_RETRY_DELAY * (2 ** attempts)))

// Recursively retry the getResult function
return await getResult(ctx)
}
// If we've reached the max retries, throw an error
throw new Error(`GetResult ran out of retries (${GET_RESULT_MAX_RETRIES}). Bubbling error...`)
}),
// If the result is successful, return it as Resolved
Resolved
)
.toPromise()
}
}

function selectMessageWith ({ db }) {
/**
* selectMessage
* Selects the oldest message from the database
*
* @returns The oldest message from the database
*/
return async () => {
const timestamp = new Date().getTime()
const query = {
Expand All @@ -277,6 +295,13 @@ function selectMessageWith ({ db }) {
}

function deleteMessageWith ({ db }) {
/**
* deleteMessage
* Deletes a message from the database
*
* @param logId - The logId of the message to delete
* @returns The logId of the deleted message
*/
return async (logId) => {
const query = {
sql: `
Expand All @@ -291,6 +316,17 @@ function deleteMessageWith ({ db }) {
}

function updateMessageTimestampWith ({ db, logger, MESSAGE_RECOVERY_MAX_RETRIES, MESSAGE_RECOVERY_RETRY_DELAY }) {
/**
* updateMessageTimestamp
* If a message is out of retries, deletes it.
* Else, updates the timestamp and retries of a message in the database.
* The timestamp is updated to the current time plus the retry delay.
* This stops it from being selected again until the delay is over.
*
* @param logId - The logId of the message to update
* @param retries - The number of retries the message has had
* @returns The logId of the updated message
*/
return async (logId, retries) => {
const deleteQuery = {
sql: `DELETE FROM ${MESSAGES_TABLE} WHERE id = ?`,
Expand All @@ -301,16 +337,15 @@ function updateMessageTimestampWith ({ db, logger, MESSAGE_RECOVERY_MAX_RETRIES,
sql: `UPDATE OR IGNORE ${MESSAGES_TABLE} SET timestamp = ?, retries = retries + 1 WHERE id = ?`,
parameters: [new Date().getTime() + updateOffset, logId]
}
// Set the query to the update query
let query = updateQuery
// If the message has ran out of retries, run the delete query
if (retries > MESSAGE_RECOVERY_MAX_RETRIES) {
query = deleteQuery
logger({ log: `Message with logId ${logId} has ran out of retries and been deleted`, end: true }, { logId })
}

return await db.run(query).catch((e) => {
console.log('error', e)
return e
}).then(() => logId)
return await db.run(query).then(() => logId)
}
}

Expand All @@ -319,25 +354,41 @@ export function startMessageRecoveryCronWith ({ selectNode, fetchResult, logger,
const selectMessage = selectMessageWith({ db })
const deleteMessage = deleteMessageWith({ db })
const updateMessageTimestamp = updateMessageTimestampWith({ db, logger, MESSAGE_RECOVERY_MAX_RETRIES, MESSAGE_RECOVERY_RETRY_DELAY })

/**
* startMessageRecoveryCron
* Starts the message recovery cron job.
* Every 10 seconds, it will attempt to recover a message.
* If it fails, it will retry the message up to MESSAGE_RECOVERY_MAX_RETRIES times
* with a delay of MESSAGE_RECOVERY_RETRY_DELAY * (2 ** attempts).
*
* If the message is successfully recovered, it will be deleted from the database.
*/
return async () => {
let ct = null
let isJobRunning = false
ct = cron.schedule('*/10 * * * * *', async () => {
if (!isJobRunning) {
isJobRunning = true
ct.stop() // pause cron while recovering messages

// Select the oldest message from the database
await selectMessage()
// Parse the message from the database
.then((res) => ({ ctx: compose(JSON.parse, propOr('{}', 'data'), head)(res), retries: compose(prop('retries'), head)(res) }))
.then(({ ctx, retries }) => {
// If the message is empty (there is no message in the db), return
if (isEmpty(ctx)) {
isJobRunning = false
return
}
const logId = ctx.logId
logger({ log: `Attempting to recover message, retry ${retries} of ${MESSAGE_RECOVERY_MAX_RETRIES}` }, { logId })
// Attempt the result of the message from the CU
return getResult(ctx)
.then((res) => {
const { msgs, spawns, assigns, initialTxId, messageId: parentId } = res
// Push the results of the message to the crank function
return crank({
msgs,
spawns,
Expand All @@ -347,13 +398,15 @@ export function startMessageRecoveryCronWith ({ selectNode, fetchResult, logger,
})
})
.then(() => {
// If the message is successfully recovered, delete it from the database
logger({ log: 'Successfully pushed message results', end: true }, { logId })
return deleteMessage(logId)
})
.then(() => {
isJobRunning = false
})
.catch((e) => {
// If the message is not successfully recovered, update the message timestamp and retry
const delay = MESSAGE_RECOVERY_RETRY_DELAY * (2 ** retries)
logger({ log: `Error recovering message - getResult, retrying in ${delay}ms: ${e}` }, ctx)
updateMessageTimestamp(logId, retries)
Expand Down
2 changes: 1 addition & 1 deletion servers/mu/src/domain/clients/cu.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ function resultWith ({ fetch, histogram, CU_URL, logger }) {
).then(okRes),
{
maxRetries: 5,
delay: 1000,
delay: 500,
log: logger,
logId,
name: `fetchResult(${JSON.stringify({
Expand Down

0 comments on commit ed84031

Please sign in to comment.