Skip to content

Commit

Permalink
feat(cu): backfill in-mem cache at start of eval stream to reduce gat…
Browse files Browse the repository at this point in the history
…eway calls on empty cache
  • Loading branch information
TillaTheHun0 committed Apr 22, 2024
1 parent 0d6e6f2 commit 67ca472
Show file tree
Hide file tree
Showing 4 changed files with 293 additions and 58 deletions.
50 changes: 36 additions & 14 deletions servers/cu/src/domain/client/ao-process.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { always, applySpec, compose, defaultTo, evolve, head, identity, map, pat
import { z } from 'zod'
import { LRUCache } from 'lru-cache'

import { isEarlierThan, isEqualTo, isLaterThan } from '../utils.js'
import { isEarlierThan, isEqualTo, isLaterThan, parseTags } from '../utils.js'
import { processSchema } from '../model.js'
import { PROCESSES_TABLE, COLLATION_SEQUENCE_MIN_CHAR } from './sqlite.js'

Expand Down Expand Up @@ -392,13 +392,17 @@ export function findProcessMemoryBeforeWith ({
compose(
map(prop('node')),
map((node) => {
const tags = parseTags(node.tags)
return {
id: node.id,
timestamp: pluckTagValue('Timestamp', node.tags),
ordinate: pluckTagValue('Nonce', node.tags),
blockHeight: pluckTagValue('Block-Height', node.tags),
cron: pluckTagValue('Cron-Interval', node.tags),
encoding: pluckTagValue('Content-Encoding', node.tags)
timestamp: parseInt(tags.Timestamp),
epoch: parseInt(tags.Epoch),
nonce: parseInt(tags.Nonce),
ordinate: tags.Nonce,
module: tags.Module,
blockHeight: parseInt(tags['Block-Height']),
cron: tags['Cron-Interval'],
encoding: tags['Content-Encoding']
}
})
),
Expand Down Expand Up @@ -468,11 +472,15 @@ export function findProcessMemoryBeforeWith ({
* Finally map the Checkpoint to the expected shape
*/
.map((Memory) => ({
src: 'memory',
Memory,
moduleId: cached.evaluation.moduleId,
timestamp: cached.evaluation.timestamp,
blockHeight: cached.evaluation.blockHeight,
cron: cached.evaluation.cron,
ordinate: cached.evaluation.ordinate
epoch: cached.evaluation.epoch,
nonce: cached.evaluation.nonce,
ordinate: cached.evaluation.ordinate,
cron: cached.evaluation.cron
}))
})
}
Expand Down Expand Up @@ -511,11 +519,15 @@ export function findProcessMemoryBeforeWith ({
* Finally map the Checkpoint to the expected shape
*/
.map((Memory) => ({
src: 'file',
Memory,
moduleId: checkpoint.evaluation.moduleId,
timestamp: checkpoint.evaluation.timestamp,
epoch: checkpoint.evaluation.epoch,
nonce: checkpoint.evaluation.nonce,
ordinate: checkpoint.evaluation.ordinate,
blockHeight: checkpoint.evaluation.blockHeight,
cron: checkpoint.evaluation.cron,
ordinate: checkpoint.evaluation.ordinate
cron: checkpoint.evaluation.cron
}))
.bimap(
(err) => {
Expand Down Expand Up @@ -574,17 +586,23 @@ export function findProcessMemoryBeforeWith ({
})
/**
* Finally map the Checkpoint to the expected shape
*
* (see determineLatestCheckpointBefore)
*/
.map((Memory) => ({
src: 'arweave',
Memory,
moduleId: latestCheckpoint.module,
timestamp: latestCheckpoint.timestamp,
blockHeight: latestCheckpoint.blockHeight,
cron: latestCheckpoint.cron,
epoch: latestCheckpoint.epoch,
nonce: latestCheckpoint.nonce,
/**
* Derived from Nonce on Checkpoint
* (see determineLatestCheckpointBefore)
*/
ordinate: latestCheckpoint.ordinate
ordinate: latestCheckpoint.ordinate,
blockHeight: latestCheckpoint.blockHeight,
cron: latestCheckpoint.cron
}))
.bimap(
(err) => {
Expand Down Expand Up @@ -612,8 +630,12 @@ export function findProcessMemoryBeforeWith ({
}

return Resolved({
src: 'cold_start',
Memory: null,
moduleId: undefined,
timestamp: undefined,
epoch: undefined,
nonce: undefined,
blockHeight: undefined,
cron: undefined,
/**
Expand Down Expand Up @@ -654,7 +676,7 @@ export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EA
if (cached && isLaterThan(cached, { timestamp, ordinate, cron })) return

logger(
'Saving latest memory for process "%s" with parameters "%j"',
'Caching latest memory for process "%s" with parameters "%j"',
processId,
{ messageId, timestamp, ordinate, cron, blockHeight }
)
Expand Down
100 changes: 72 additions & 28 deletions servers/cu/src/domain/client/ao-process.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,11 @@ describe('ao-process', () => {
const res = await findProcessMemoryBefore(target)

assert.deepStrictEqual(res, {
src: 'memory',
Memory,
moduleId: 'module-123',
epoch: cachedEval.epoch,
nonce: cachedEval.nonce,
timestamp: cachedEval.timestamp,
blockHeight: cachedEval.blockHeight,
cron: cachedEval.cron,
Expand Down Expand Up @@ -369,13 +373,19 @@ describe('ao-process', () => {

describe('should use if in LRU In-Memory Cache cannot be used', () => {
test('no checkpoint in LRU In-Memory cache', async () => {
const res = await findProcessMemoryBefore(target)

assert.ok(res.Memory)
assert.equal(res.timestamp, cachedEval.timestamp)
assert.equal(res.blockHeight, cachedEval.blockHeight)
assert.equal(res.ordinate, cachedEval.ordinate)
assert.equal(res.cron, undefined)
const { Memory, ...res } = await findProcessMemoryBefore(target)

assert.ok(Memory)
assert.deepStrictEqual(res, {
src: 'file',
moduleId: cachedEval.moduleId,
epoch: cachedEval.epoch,
nonce: cachedEval.nonce,
timestamp: cachedEval.timestamp,
blockHeight: cachedEval.blockHeight,
cron: cachedEval.cron,
ordinate: cachedEval.ordinate
})
})

test('later checkpoint in LRU In-Memory cache', async () => {
Expand All @@ -392,13 +402,19 @@ describe('ao-process', () => {
}
})

const res = await findProcessMemoryBefore(target)

assert.ok(res.Memory)
assert.equal(res.timestamp, cachedEval.timestamp)
assert.equal(res.blockHeight, cachedEval.blockHeight)
assert.equal(res.ordinate, cachedEval.ordinate)
assert.equal(res.cron, undefined)
const { Memory, ...res } = await findProcessMemoryBefore(target)

assert.ok(Memory)
assert.deepStrictEqual(res, {
src: 'file',
moduleId: cachedEval.moduleId,
epoch: cachedEval.epoch,
nonce: cachedEval.nonce,
timestamp: cachedEval.timestamp,
blockHeight: cachedEval.blockHeight,
cron: cachedEval.cron,
ordinate: cachedEval.ordinate
})
})
})

Expand Down Expand Up @@ -437,7 +453,9 @@ describe('ao-process', () => {
node: {
id: 'tx-123',
tags: [
{ name: 'Module', value: `${cachedEval.moduleId}` },
{ name: 'Timestamp', value: `${cachedEval.timestamp}` },
{ name: 'Epoch', value: `${cachedEval.epoch}` },
{ name: 'Nonce', value: `${cachedEval.nonce}` },
{ name: 'Block-Height', value: `${cachedEval.blockHeight}` },
{ name: 'Content-Encoding', value: `${cachedEval.encoding}` }
Expand Down Expand Up @@ -474,13 +492,19 @@ describe('ao-process', () => {

describe('should use if the LRU In-Memory and File checkpoint cannot be used', async () => {
test('no file checkpoint is found or is later than target', async () => {
const res = await findProcessMemoryBefore(target)

assert.ok(res.Memory)
assert.equal(res.timestamp, cachedEval.timestamp)
assert.equal(res.blockHeight, cachedEval.blockHeight)
assert.equal(res.ordinate, cachedEval.ordinate)
assert.equal(res.cron, undefined)
const { Memory, ...res } = await findProcessMemoryBefore(target)

assert.ok(Memory)
assert.deepStrictEqual(res, {
src: 'arweave',
moduleId: cachedEval.moduleId,
epoch: cachedEval.epoch,
nonce: cachedEval.nonce,
timestamp: cachedEval.timestamp,
blockHeight: cachedEval.blockHeight,
cron: cachedEval.cron,
ordinate: cachedEval.ordinate
})
})

test('file checkpoint fails to be downloaded', async () => {
Expand All @@ -494,13 +518,19 @@ describe('ao-process', () => {
}
})

const res = await findProcessMemoryBefore(target)

assert.ok(res.Memory)
assert.equal(res.timestamp, cachedEval.timestamp)
assert.equal(res.blockHeight, cachedEval.blockHeight)
assert.equal(res.ordinate, cachedEval.ordinate)
assert.equal(res.cron, undefined)
const { Memory, ...res } = await findProcessMemoryBefore(target)

assert.ok(Memory)
assert.deepStrictEqual(res, {
src: 'arweave',
moduleId: cachedEval.moduleId,
epoch: cachedEval.epoch,
nonce: cachedEval.nonce,
timestamp: cachedEval.timestamp,
blockHeight: cachedEval.blockHeight,
cron: cachedEval.cron,
ordinate: cachedEval.ordinate
})
})
})

Expand All @@ -522,7 +552,9 @@ describe('ao-process', () => {
...edges[0].node,
id: 'tx-not-encoded',
tags: [
{ name: 'Module', value: `${cachedEval.moduleId}` },
{ name: 'Timestamp', value: `${cachedEval.timestamp}` },
{ name: 'Epoch', value: `${cachedEval.epoch}` },
{ name: 'Nonce', value: `${cachedEval.nonce}` },
{ name: 'Block-Height', value: `${cachedEval.blockHeight}` },
{ name: 'Not-Content-Encoding', value: `${cachedEval.encoding}` }
Expand Down Expand Up @@ -555,7 +587,9 @@ describe('ao-process', () => {
node: {
...edges[0].node,
tags: [
{ name: 'Module', value: `${cachedEval.moduleId}` },
{ name: 'Timestamp', value: `${cachedEval.timestamp + 1000}` },
{ name: 'Epoch', value: `${cachedEval.epoch}` },
{ name: 'Nonce', value: '12' },
{ name: 'Block-Height', value: `${cachedEval.blockHeight}` },
{ name: 'Content-Encoding', value: `${cachedEval.encoding}` }
Expand Down Expand Up @@ -589,7 +623,9 @@ describe('ao-process', () => {
node: {
...edges[0].node,
tags: [
{ name: 'Module', value: `${cachedEval.moduleId}` },
{ name: 'Timestamp', value: `${cachedEval.timestamp + 1000}` },
{ name: 'Epoch', value: `${cachedEval.epoch}` },
{ name: 'Nonce', value: '12' },
{ name: 'Block-Height', value: `${cachedEval.blockHeight}` },
{ name: 'Content-Encoding', value: `${cachedEval.encoding}` }
Expand Down Expand Up @@ -629,7 +665,9 @@ describe('ao-process', () => {
node: {
...edges[0].node,
tags: [
{ name: 'Module', value: `${cachedEval.moduleId}` },
{ name: 'Timestamp', value: `${cachedEval.timestamp + 1000}` },
{ name: 'Epoch', value: `${cachedEval.epoch}` },
{ name: 'Nonce', value: '12' },
{ name: 'Block-Height', value: `${cachedEval.blockHeight}` },
{ name: 'Content-Encoding', value: `${cachedEval.encoding}` }
Expand Down Expand Up @@ -669,8 +707,12 @@ describe('ao-process', () => {
PROCESS_IGNORE_ARWEAVE_CHECKPOINTS: []
}
const COLDSTART = {
src: 'cold_start',
Memory: null,
moduleId: undefined,
timestamp: undefined,
epoch: undefined,
nonce: undefined,
blockHeight: undefined,
cron: undefined,
ordinate: '0'
Expand All @@ -695,7 +737,9 @@ describe('ao-process', () => {
node: {
id: 'tx-123',
tags: [
{ name: 'Module', value: `${cachedEval.moduleId}` },
{ name: 'Timestamp', value: `${cachedEval.timestamp + 11000}` },
{ name: 'Epoch', value: `${cachedEval.epoch}` },
{ name: 'Nonce', value: `${cachedEval.nonce}` },
{ name: 'Block-Height', value: `${cachedEval.blockHeight}` },
{ name: 'Content-Encoding', value: `${cachedEval.encoding}` }
Expand Down
Loading

0 comments on commit 67ca472

Please sign in to comment.