Skip to content

Commit

Permalink
fix(cu): in findresult, check for exact eval by message id
Browse files Browse the repository at this point in the history
  • Loading branch information
jfrain99 committed Jan 7, 2025
1 parent 859af1e commit b572f49
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 21 deletions.
4 changes: 1 addition & 3 deletions servers/cu/src/domain/dal.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ export const evaluatorSchema = z.function()
export const findEvaluationSchema = z.function()
.args(z.object({
processId: z.string(),
to: z.coerce.number().nullish(),
ordinate: z.coerce.string().nullish(),
cron: z.string().nullish()
messageId: z.string()
}))
.returns(z.promise(evaluationSchema))

Expand Down
4 changes: 1 addition & 3 deletions servers/cu/src/domain/lib/chainEvaluation.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ function loadLatestEvaluationWith ({ findEvaluation, findLatestProcessMemory, lo

return findEvaluation({
processId: ctx.id,
to: ctx.to,
ordinate: ctx.ordinate,
cron: ctx.cron
messageId: ctx.messageId
})
.map((evaluation) => {
logger(
Expand Down
4 changes: 1 addition & 3 deletions servers/cu/src/domain/lib/loadProcess.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ function loadLatestEvaluationWith ({ findEvaluation, findLatestProcessMemory, sa

return findEvaluation({
processId: ctx.id,
to: ctx.to,
ordinate: ctx.ordinate,
cron: ctx.cron
messageId: ctx.messageId
})
.map((evaluation) => {
logger(
Expand Down
19 changes: 14 additions & 5 deletions servers/cu/src/effects/ao-evaluation.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,23 +70,32 @@ const fromEvaluationDoc = pipe(
toEvaluation
)

// TODO: Change the query to use process Id, message Id, order by id
export function findEvaluationWith ({ db }) {
function createQuery ({ processId, timestamp, ordinate, cron }) {
function createQuery ({ processId, messageId }) {
return {
sql: `
SELECT
id, "processId", "messageId", "deepHash", nonce, epoch, timestamp,
ordinate, "blockHeight", cron, "evaluatedAt", output
FROM ${EVALUATIONS_TABLE}
WHERE
id = ?;
id > ? AND id <= ?
messageId = ?
ORDER BY
id ASC
LIMIT 1;
`,
parameters: [createEvaluationId({ processId, timestamp, ordinate, cron })]
parameters: [
createEvaluationId({ processId, timestamp: '' }),
createEvaluationId({ processId, timestamp: COLLATION_SEQUENCE_MAX_CHAR }),
messageId
]
}
}

return ({ processId, to, ordinate, cron }) => {
return of({ processId, timestamp: to, ordinate, cron })
return ({ processId, messageId }) => {
return of({ processId, messageId })
.chain(fromPromise((params) => db.query(createQuery(params))))
.map(defaultTo([]))
.map(head)
Expand Down
10 changes: 3 additions & 7 deletions servers/cu/src/effects/ao-evaluation.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ describe('ao-evaluation', () => {
findEvaluationWith({
db: {
query: async ({ parameters }) => {
assert.deepStrictEqual(parameters, ['process-123,1702677252111,1'])
assert.deepStrictEqual(parameters, ['process-123,', 'process-123,􏿿', 'message-123'])

return [{
id: 'process-123,1702677252111,1',
Expand All @@ -41,9 +41,7 @@ describe('ao-evaluation', () => {

const res = await findEvaluation({
processId: 'process-123',
to: 1702677252111,
ordinate: '1',
cron: undefined
messageId: 'message-123'
})

assert.deepStrictEqual(res, {
Expand Down Expand Up @@ -73,9 +71,7 @@ describe('ao-evaluation', () => {

const res = await findEvaluation({
processId: 'process-123',
to: 1702677252111,
ordinate: '1',
cron: undefined
messageId: 'message-123'
})
.catch(err => {
assert.equal(err.status, 404)
Expand Down
7 changes: 7 additions & 0 deletions servers/cu/src/effects/sqlite.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ const createCheckpointFilesIndexes = async (db) => db.prepare(
(processId, timestamp);`
).run()

const createEvaluationsIndexes = async (db) => db.prepare(
`CREATE INDEX IF NOT EXISTS idx_${EVALUATIONS_TABLE}_id_messageId
ON ${EVALUATIONS_TABLE}
(id, messageId);`
).run()

let internalSqliteDb
export async function createSqliteClient ({ url, bootstrap = false, walLimit = bytes.parse('100mb') }) {
if (internalSqliteDb) return internalSqliteDb
Expand Down Expand Up @@ -138,6 +144,7 @@ export async function createSqliteClient ({ url, bootstrap = false, walLimit = b
.then(() => createMessagesIndexes(db))
.then(() => createCheckpointsIndexes(db))
.then(() => createCheckpointFilesIndexes(db))
.then(() => createEvaluationsIndexes(db))
}

return {
Expand Down

0 comments on commit b572f49

Please sign in to comment.