diff --git a/servers/cu/src/domain/client/ao-process.js b/servers/cu/src/domain/client/ao-process.js index f5a99ec6a..a48866b8f 100644 --- a/servers/cu/src/domain/client/ao-process.js +++ b/servers/cu/src/domain/client/ao-process.js @@ -8,7 +8,7 @@ import { always, applySpec, compose, defaultTo, evolve, head, identity, map, pat import { z } from 'zod' import { LRUCache } from 'lru-cache' -import { isEarlierThan, isEqualTo, isLaterThan } from '../utils.js' +import { isEarlierThan, isEqualTo, isLaterThan, parseTags } from '../utils.js' import { processSchema } from '../model.js' import { PROCESSES_TABLE, COLLATION_SEQUENCE_MIN_CHAR } from './sqlite.js' @@ -392,13 +392,17 @@ export function findProcessMemoryBeforeWith ({ compose( map(prop('node')), map((node) => { + const tags = parseTags(node.tags) return { id: node.id, - timestamp: pluckTagValue('Timestamp', node.tags), - ordinate: pluckTagValue('Nonce', node.tags), - blockHeight: pluckTagValue('Block-Height', node.tags), - cron: pluckTagValue('Cron-Interval', node.tags), - encoding: pluckTagValue('Content-Encoding', node.tags) + timestamp: parseInt(tags.Timestamp), + epoch: parseInt(tags.Epoch), + nonce: parseInt(tags.Nonce), + ordinate: tags.Nonce, + module: tags.Module, + blockHeight: parseInt(tags['Block-Height']), + cron: tags['Cron-Interval'], + encoding: tags['Content-Encoding'] } }) ), @@ -468,11 +472,15 @@ export function findProcessMemoryBeforeWith ({ * Finally map the Checkpoint to the expected shape */ .map((Memory) => ({ + src: 'memory', Memory, + moduleId: cached.evaluation.moduleId, timestamp: cached.evaluation.timestamp, blockHeight: cached.evaluation.blockHeight, - cron: cached.evaluation.cron, - ordinate: cached.evaluation.ordinate + epoch: cached.evaluation.epoch, + nonce: cached.evaluation.nonce, + ordinate: cached.evaluation.ordinate, + cron: cached.evaluation.cron })) }) } @@ -511,11 +519,15 @@ export function findProcessMemoryBeforeWith ({ * Finally map the Checkpoint to the expected shape */ 
.map((Memory) => ({ + src: 'file', Memory, + moduleId: checkpoint.evaluation.moduleId, timestamp: checkpoint.evaluation.timestamp, + epoch: checkpoint.evaluation.epoch, + nonce: checkpoint.evaluation.nonce, + ordinate: checkpoint.evaluation.ordinate, blockHeight: checkpoint.evaluation.blockHeight, - cron: checkpoint.evaluation.cron, - ordinate: checkpoint.evaluation.ordinate + cron: checkpoint.evaluation.cron })) .bimap( (err) => { @@ -574,17 +586,23 @@ export function findProcessMemoryBeforeWith ({ }) /** * Finally map the Checkpoint to the expected shape + * + * (see determineLatestCheckpointBefore) */ .map((Memory) => ({ + src: 'arweave', Memory, + moduleId: latestCheckpoint.module, timestamp: latestCheckpoint.timestamp, - blockHeight: latestCheckpoint.blockHeight, - cron: latestCheckpoint.cron, + epoch: latestCheckpoint.epoch, + nonce: latestCheckpoint.nonce, /** * Derived from Nonce on Checkpoint * (see determineLatestCheckpointBefore) */ - ordinate: latestCheckpoint.ordinate + ordinate: latestCheckpoint.ordinate, + blockHeight: latestCheckpoint.blockHeight, + cron: latestCheckpoint.cron })) .bimap( (err) => { @@ -612,8 +630,12 @@ export function findProcessMemoryBeforeWith ({ } return Resolved({ + src: 'cold_start', Memory: null, + moduleId: undefined, timestamp: undefined, + epoch: undefined, + nonce: undefined, blockHeight: undefined, cron: undefined, /** @@ -654,7 +676,7 @@ export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EA if (cached && isLaterThan(cached, { timestamp, ordinate, cron })) return logger( - 'Saving latest memory for process "%s" with parameters "%j"', + 'Caching latest memory for process "%s" with parameters "%j"', processId, { messageId, timestamp, ordinate, cron, blockHeight } ) diff --git a/servers/cu/src/domain/client/ao-process.test.js b/servers/cu/src/domain/client/ao-process.test.js index 10a1fe1d0..04f71dca6 100644 --- a/servers/cu/src/domain/client/ao-process.test.js +++ 
b/servers/cu/src/domain/client/ao-process.test.js @@ -322,7 +322,11 @@ describe('ao-process', () => { const res = await findProcessMemoryBefore(target) assert.deepStrictEqual(res, { + src: 'memory', Memory, + moduleId: 'module-123', + epoch: cachedEval.epoch, + nonce: cachedEval.nonce, timestamp: cachedEval.timestamp, blockHeight: cachedEval.blockHeight, cron: cachedEval.cron, @@ -369,13 +373,19 @@ describe('ao-process', () => { describe('should use if in LRU In-Memory Cache cannot be used', () => { test('no checkpoint in LRU In-Memory cache', async () => { - const res = await findProcessMemoryBefore(target) - - assert.ok(res.Memory) - assert.equal(res.timestamp, cachedEval.timestamp) - assert.equal(res.blockHeight, cachedEval.blockHeight) - assert.equal(res.ordinate, cachedEval.ordinate) - assert.equal(res.cron, undefined) + const { Memory, ...res } = await findProcessMemoryBefore(target) + + assert.ok(Memory) + assert.deepStrictEqual(res, { + src: 'file', + moduleId: cachedEval.moduleId, + epoch: cachedEval.epoch, + nonce: cachedEval.nonce, + timestamp: cachedEval.timestamp, + blockHeight: cachedEval.blockHeight, + cron: cachedEval.cron, + ordinate: cachedEval.ordinate + }) }) test('later checkpoint in LRU In-Memory cache', async () => { @@ -392,13 +402,19 @@ describe('ao-process', () => { } }) - const res = await findProcessMemoryBefore(target) - - assert.ok(res.Memory) - assert.equal(res.timestamp, cachedEval.timestamp) - assert.equal(res.blockHeight, cachedEval.blockHeight) - assert.equal(res.ordinate, cachedEval.ordinate) - assert.equal(res.cron, undefined) + const { Memory, ...res } = await findProcessMemoryBefore(target) + + assert.ok(Memory) + assert.deepStrictEqual(res, { + src: 'file', + moduleId: cachedEval.moduleId, + epoch: cachedEval.epoch, + nonce: cachedEval.nonce, + timestamp: cachedEval.timestamp, + blockHeight: cachedEval.blockHeight, + cron: cachedEval.cron, + ordinate: cachedEval.ordinate + }) }) }) @@ -437,7 +453,9 @@ describe('ao-process', 
() => { node: { id: 'tx-123', tags: [ + { name: 'Module', value: `${cachedEval.moduleId}` }, { name: 'Timestamp', value: `${cachedEval.timestamp}` }, + { name: 'Epoch', value: `${cachedEval.epoch}` }, { name: 'Nonce', value: `${cachedEval.nonce}` }, { name: 'Block-Height', value: `${cachedEval.blockHeight}` }, { name: 'Content-Encoding', value: `${cachedEval.encoding}` } @@ -474,13 +492,19 @@ describe('ao-process', () => { describe('should use if the LRU In-Memory and File checkpoint cannot be used', async () => { test('no file checkpoint is found or is later than target', async () => { - const res = await findProcessMemoryBefore(target) - - assert.ok(res.Memory) - assert.equal(res.timestamp, cachedEval.timestamp) - assert.equal(res.blockHeight, cachedEval.blockHeight) - assert.equal(res.ordinate, cachedEval.ordinate) - assert.equal(res.cron, undefined) + const { Memory, ...res } = await findProcessMemoryBefore(target) + + assert.ok(Memory) + assert.deepStrictEqual(res, { + src: 'arweave', + moduleId: cachedEval.moduleId, + epoch: cachedEval.epoch, + nonce: cachedEval.nonce, + timestamp: cachedEval.timestamp, + blockHeight: cachedEval.blockHeight, + cron: cachedEval.cron, + ordinate: cachedEval.ordinate + }) }) test('file checkpoint fails to be downloaded', async () => { @@ -494,13 +518,19 @@ describe('ao-process', () => { } }) - const res = await findProcessMemoryBefore(target) - - assert.ok(res.Memory) - assert.equal(res.timestamp, cachedEval.timestamp) - assert.equal(res.blockHeight, cachedEval.blockHeight) - assert.equal(res.ordinate, cachedEval.ordinate) - assert.equal(res.cron, undefined) + const { Memory, ...res } = await findProcessMemoryBefore(target) + + assert.ok(Memory) + assert.deepStrictEqual(res, { + src: 'arweave', + moduleId: cachedEval.moduleId, + epoch: cachedEval.epoch, + nonce: cachedEval.nonce, + timestamp: cachedEval.timestamp, + blockHeight: cachedEval.blockHeight, + cron: cachedEval.cron, + ordinate: cachedEval.ordinate + }) }) }) @@ -522,7 
+552,9 @@ describe('ao-process', () => { ...edges[0].node, id: 'tx-not-encoded', tags: [ + { name: 'Module', value: `${cachedEval.moduleId}` }, { name: 'Timestamp', value: `${cachedEval.timestamp}` }, + { name: 'Epoch', value: `${cachedEval.epoch}` }, { name: 'Nonce', value: `${cachedEval.nonce}` }, { name: 'Block-Height', value: `${cachedEval.blockHeight}` }, { name: 'Not-Content-Encoding', value: `${cachedEval.encoding}` } @@ -555,7 +587,9 @@ describe('ao-process', () => { node: { ...edges[0].node, tags: [ + { name: 'Module', value: `${cachedEval.moduleId}` }, { name: 'Timestamp', value: `${cachedEval.timestamp + 1000}` }, + { name: 'Epoch', value: `${cachedEval.epoch}` }, { name: 'Nonce', value: '12' }, { name: 'Block-Height', value: `${cachedEval.blockHeight}` }, { name: 'Content-Encoding', value: `${cachedEval.encoding}` } @@ -589,7 +623,9 @@ describe('ao-process', () => { node: { ...edges[0].node, tags: [ + { name: 'Module', value: `${cachedEval.moduleId}` }, { name: 'Timestamp', value: `${cachedEval.timestamp + 1000}` }, + { name: 'Epoch', value: `${cachedEval.epoch}` }, { name: 'Nonce', value: '12' }, { name: 'Block-Height', value: `${cachedEval.blockHeight}` }, { name: 'Content-Encoding', value: `${cachedEval.encoding}` } @@ -629,7 +665,9 @@ describe('ao-process', () => { node: { ...edges[0].node, tags: [ + { name: 'Module', value: `${cachedEval.moduleId}` }, { name: 'Timestamp', value: `${cachedEval.timestamp + 1000}` }, + { name: 'Epoch', value: `${cachedEval.epoch}` }, { name: 'Nonce', value: '12' }, { name: 'Block-Height', value: `${cachedEval.blockHeight}` }, { name: 'Content-Encoding', value: `${cachedEval.encoding}` } @@ -669,8 +707,12 @@ describe('ao-process', () => { PROCESS_IGNORE_ARWEAVE_CHECKPOINTS: [] } const COLDSTART = { + src: 'cold_start', Memory: null, + moduleId: undefined, timestamp: undefined, + epoch: undefined, + nonce: undefined, blockHeight: undefined, cron: undefined, ordinate: '0' @@ -695,7 +737,9 @@ describe('ao-process', () => 
{ node: { id: 'tx-123', tags: [ + { name: 'Module', value: `${cachedEval.moduleId}` }, { name: 'Timestamp', value: `${cachedEval.timestamp + 11000}` }, + { name: 'Epoch', value: `${cachedEval.epoch}` }, { name: 'Nonce', value: `${cachedEval.nonce}` }, { name: 'Block-Height', value: `${cachedEval.blockHeight}` }, { name: 'Content-Encoding', value: `${cachedEval.encoding}` } diff --git a/servers/cu/src/domain/lib/loadProcess.js b/servers/cu/src/domain/lib/loadProcess.js index c3bec1a7a..0aafacc15 100644 --- a/servers/cu/src/domain/lib/loadProcess.js +++ b/servers/cu/src/domain/lib/loadProcess.js @@ -1,5 +1,5 @@ import { Rejected, Resolved, fromPromise, of } from 'hyper-async' -import { always, identity, isNotNil, mergeRight, pick } from 'ramda' +import { always, identity, isNotNil, mergeRight, omit, pick } from 'ramda' import { z } from 'zod' import { findEvaluationSchema, findProcessSchema, loadProcessSchema, locateProcessSchema, saveProcessSchema } from '../dal.js' @@ -177,11 +177,12 @@ function getProcessMetaWith ({ loadProcess, locateProcess, findProcess, saveProc })) } -function loadLatestEvaluationWith ({ findEvaluation, findProcessMemoryBefore, loadLatestSnapshot, logger }) { +function loadLatestEvaluationWith ({ findEvaluation, findProcessMemoryBefore, loadLatestSnapshot, saveLatestProcessMemory, logger }) { findEvaluation = fromPromise(findEvaluationSchema.implement(findEvaluation)) // TODO: wrap in zod schemas to enforce contract findProcessMemoryBefore = fromPromise(findProcessMemoryBefore) loadLatestSnapshot = fromPromise(loadLatestSnapshot) + saveLatestProcessMemory = fromPromise(saveLatestProcessMemory) function maybeExactEvaluation (ctx) { /** @@ -222,23 +223,60 @@ function loadLatestEvaluationWith ({ findEvaluation, findProcessMemoryBefore, lo processId: ctx.id, timestamp: ctx.to, ordinate: ctx.ordinate, - cron: ctx.cron + cron: ctx.cron, + omitMemory: false }) - .map((found) => { + .chain((found) => { const exact = found.timestamp === ctx.to && 
found.ordinate === ctx.ordinate && found.cron === ctx.cron - return { - result: { - Memory: found.Memory - }, - from: found.timestamp, - ordinate: found.ordinate, - fromBlockHeight: found.blockHeight, - fromCron: found.cron, - exact - } + return of() + .chain(() => { + /** + * Nothing to backfill in-memory cache with, + * so simply noop + */ + if (['memory', 'cold_start'].includes(found.src)) return Resolved(found) + + logger( + 'Seeding cache with latest checkpoint found with parameters "%j"', + omit(['Memory'], found) + ) + /** + * Immediately attempt to save the memory loaded from a checkpoint + * into the LRU In-memory cache, which will cut + * down on calls to load checkpoints from arweave (it will load from cache instead) + */ + return saveLatestProcessMemory({ + processId: ctx.id, + evalCount: 0, + /** + * map found + */ + Memory: found.Memory, + moduleId: found.moduleId, + // messageId: found.messageId, + timestamp: found.timestamp, + epoch: found.epoch, + nonce: found.nonce, + ordinate: found.ordinate, + blockHeight: found.blockHeight, + cron: found.cron + }) + .bichain(Resolved, Resolved) + .map(() => found) + }) + .map(() => ({ + result: { + Memory: found.Memory + }, + from: found.timestamp, + ordinate: found.ordinate, + fromBlockHeight: found.blockHeight, + fromCron: found.cron, + exact + })) }) } diff --git a/servers/cu/src/domain/lib/loadProcess.test.js b/servers/cu/src/domain/lib/loadProcess.test.js index 07b930851..bef2ff3a4 100644 --- a/servers/cu/src/domain/lib/loadProcess.test.js +++ b/servers/cu/src/domain/lib/loadProcess.test.js @@ -23,12 +23,17 @@ describe('loadProcess', () => { saveProcess: async () => PROCESS, findEvaluation: async () => { throw { status: 404 } }, findProcessMemoryBefore: async () => ({ + src: 'cold_start', Memory: null, + moduleId: undefined, timestamp: undefined, + epoch: undefined, + nonce: undefined, blockHeight: undefined, cron: undefined, ordinate: COLLATION_SEQUENCE_MIN_CHAR }), + saveLatestProcessMemory: async () => 
assert.fail('should not be called on cold_start'), locateProcess: async ({ processId: id }) => { assert.equal(id, PROCESS) return { url: 'https://foo.bar' } @@ -87,12 +92,17 @@ describe('loadProcess', () => { saveProcess: async () => assert.fail('should not save if found in db'), findEvaluation: async () => { throw { status: 404 } }, findProcessMemoryBefore: async () => ({ + src: 'cold_start', Memory: null, + moduleId: undefined, timestamp: undefined, + epoch: undefined, + nonce: undefined, blockHeight: undefined, cron: undefined, ordinate: COLLATION_SEQUENCE_MIN_CHAR }), + saveLatestProcessMemory: async () => assert.fail('should not be called on cold_start'), locateProcess: async ({ processId, schedulerHint }) => { assert.equal(processId, PROCESS) assert.equal(schedulerHint, 'scheduler-123') @@ -113,7 +123,7 @@ describe('loadProcess', () => { assert.equal(res.id, PROCESS) }) - test('use latest evaluation from db to set Memory, result, from, and evaluatedAt on ctx', async () => { + test('use exact match from db', async () => { const cachedEvaluation = { processId: PROCESS, messageId: 'message-123', @@ -146,9 +156,10 @@ describe('loadProcess', () => { findProcess: async () => { throw { status: 404 } }, saveProcess: async () => PROCESS, findEvaluation: async () => cachedEvaluation, - findLatestEvaluation: async ({ processId, timestamp }) => { + findProcessMemoryBefore: async ({ processId, timestamp }) => { assert.fail('should not be called when exact match is found') }, + saveLatestProcessMemory: async () => assert.fail('should not be called if exact match if found'), locateProcess: async ({ processId: id }) => ({ url: 'https://foo.bar' }), loadProcess: async (id) => ({ owner: 'woohoo', @@ -167,6 +178,101 @@ describe('loadProcess', () => { assert.equal(res.id, PROCESS) }) + test('use latest in cache', async () => { + const cached = { + src: 'memory', + Memory: Buffer.from('hello world'), + moduleId: 'module-123', + timestamp: 1697574792000, + epoch: 0, + nonce: 11, + 
blockHeight: 123, + cron: undefined, + ordinate: '11' + } + + const tags = [ + { name: 'Module', value: 'foobar' }, + { name: 'Data-Protocol', value: 'ao' }, + { name: 'Type', value: 'Process' }, + { name: 'Foo', value: 'Bar' } + ] + const loadProcess = loadProcessWith({ + findProcess: async () => { throw { status: 404 } }, + saveProcess: async () => PROCESS, + findEvaluation: async () => { throw { status: 404 } }, + findProcessMemoryBefore: async ({ processId, timestamp }) => cached, + saveLatestProcessMemory: async () => assert.fail('should not be called if memory'), + locateProcess: async ({ processId: id }) => ({ url: 'https://foo.bar' }), + loadProcess: async (id) => ({ + owner: 'woohoo', + tags, + block: { height: 123, timestamp: 1697574792000 } + }), + logger + }) + + const res = await loadProcess({ id: PROCESS, to: 1697574792000 }).toPromise() + assert.deepStrictEqual(res.from, cached.timestamp) + assert.deepStrictEqual(res.ordinate, cached.ordinate) + assert.deepStrictEqual(res.fromCron, cached.cron) + assert.deepStrictEqual(res.fromBlockHeight, cached.blockHeight) + assert.equal(res.id, PROCESS) + }) + + test('backfill cache if latest from arweave or file', async () => { + const cached = { + Memory: Buffer.from('hello world'), + moduleId: 'module-123', + timestamp: 1697574792000, + epoch: 0, + nonce: 11, + blockHeight: 123, + cron: undefined, + ordinate: '11' + } + + const tags = [ + { name: 'Module', value: 'foobar' }, + { name: 'Data-Protocol', value: 'ao' }, + { name: 'Type', value: 'Process' }, + { name: 'Foo', value: 'Bar' } + ] + const loadProcess = loadProcessWith({ + findProcess: async () => { throw { status: 404 } }, + saveProcess: async () => PROCESS, + findEvaluation: async () => { throw { status: 404 } }, + findProcessMemoryBefore: async ({ processId, timestamp }) => cached, + saveLatestProcessMemory: async (args) => { + assert.deepStrictEqual(args, { + processId: PROCESS, + evalCount: 0, + Memory: cached.Memory, + moduleId: cached.moduleId, + 
timestamp: cached.timestamp, + epoch: cached.epoch, + nonce: cached.nonce, + ordinate: cached.ordinate, + blockHeight: cached.blockHeight, + cron: cached.cron + }) + }, + locateProcess: async ({ processId: id }) => ({ url: 'https://foo.bar' }), + loadProcess: async (id) => ({ + owner: 'woohoo', + tags, + block: { height: 123, timestamp: 1697574792000 } + }), + logger + }) + + cached.src = 'file' + await loadProcess({ id: PROCESS, to: 1697574792000 }).toPromise() + + cached.src = 'arweave' + await loadProcess({ id: PROCESS, to: 1697574792000 }).toPromise() + }) + test('save process to db if fetched from chain', async () => { const tags = [ { name: 'Module', value: 'foobar' }, @@ -187,12 +293,17 @@ describe('loadProcess', () => { }, findEvaluation: async () => { throw { status: 404 } }, findProcessMemoryBefore: async () => ({ + src: 'cold_start', Memory: null, + moduleId: undefined, timestamp: undefined, + epoch: undefined, + nonce: undefined, blockHeight: undefined, cron: undefined, ordinate: COLLATION_SEQUENCE_MIN_CHAR }), + saveLatestProcessMemory: async () => assert.fail('should not be called on cold_start'), locateProcess: async ({ processId: id }) => ({ url: 'https://foo.bar' }), loadProcess: async (id) => ({ owner: 'woohoo', @@ -217,12 +328,17 @@ describe('loadProcess', () => { saveProcess: async () => { throw { status: 409 } }, findEvaluation: async () => { throw { status: 404 } }, findProcessMemoryBefore: async () => ({ + src: 'cold_start', Memory: null, + moduleId: undefined, timestamp: undefined, + epoch: undefined, + nonce: undefined, blockHeight: undefined, cron: undefined, ordinate: COLLATION_SEQUENCE_MIN_CHAR }), + saveLatestProcessMemory: async () => assert.fail('should not be called on cold_start'), locateProcess: async ({ processId: id }) => ({ url: 'https://foo.bar' }), loadProcess: async (id) => ({ owner: 'woohoo', @@ -245,12 +361,17 @@ describe('loadProcess', () => { saveProcess: async () => PROCESS, findEvaluation: async () => { throw { status: 
404 } }, findProcessMemoryBefore: async () => ({ + src: 'cold_start', Memory: null, + moduleId: undefined, timestamp: undefined, + epoch: undefined, + nonce: undefined, blockHeight: undefined, cron: undefined, ordinate: COLLATION_SEQUENCE_MIN_CHAR }), + saveLatestProcessMemory: async () => assert.fail('should not be called on cold_start'), locateProcess: async ({ processId: id }) => ({ url: 'https://foo.bar' }), loadProcess: async (id) => ({ owner: 'woohoo', @@ -275,12 +396,17 @@ describe('loadProcess', () => { saveProcess: async () => PROCESS, findEvaluation: async () => { throw { status: 404 } }, findProcessMemoryBefore: async () => ({ + src: 'cold_start', Memory: null, + moduleId: undefined, timestamp: undefined, + epoch: undefined, + nonce: undefined, blockHeight: undefined, cron: undefined, ordinate: COLLATION_SEQUENCE_MIN_CHAR }), + saveLatestProcessMemory: async () => assert.fail('should not be called on cold_start'), locateProcess: async ({ processId: id }) => ({ url: 'https://foo.bar' }), loadProcess: async (id) => ({ owner: 'woohoo', @@ -305,12 +431,17 @@ describe('loadProcess', () => { saveProcess: async () => PROCESS, findEvaluation: async () => { throw { status: 404 } }, findProcessMemoryBefore: async () => ({ + src: 'cold_start', Memory: null, + moduleId: undefined, timestamp: undefined, + epoch: undefined, + nonce: undefined, blockHeight: undefined, cron: undefined, ordinate: COLLATION_SEQUENCE_MIN_CHAR }), + saveLatestProcessMemory: async () => assert.fail('should not be called on cold_start'), locateProcess: async ({ processId: id }) => ({ url: 'https://foo.bar' }), loadProcess: async (id) => ({ owner: 'woohoo',