From 4cb68353dc26eb5dde953c7478a03557d9c257b3 Mon Sep 17 00:00:00 2001
From: James Zuccon
Date: Sat, 28 Dec 2024 19:56:58 +1100
Subject: [PATCH] feat(DB): Chunk SQL Inserts

Signed-off-by: James Zuccon
---
 defaults.env  |   5 +
 src/config.ts |  12 ++
 src/db.ts     | 453 +++++++++++++++++++++++++++++---------------------
 3 files changed, 282 insertions(+), 188 deletions(-)

diff --git a/defaults.env b/defaults.env
index f6f87fc..e3fbcde 100644
--- a/defaults.env
+++ b/defaults.env
@@ -12,6 +12,11 @@ CHAINGRAPH_POSTGRES_MAX_CONNECTIONS=
 # In real-world testing, this usually reduces the speed of Chaingraph's initial sync, so Chaingraph leaves "synchronous_commit = on" by default.
 CHAINGRAPH_POSTGRES_SYNCHRONOUS_COMMIT=true

+# The target size (in MB) for each INSERT query chunk sent to Postgres.
+# During initial sync, the transactions of each block will be inserted into the DB. This value specifies the target size of each INSERT query chunk to prevent excessively large queries on large blocks.
+# Setting this to a low value (e.g. 1 MB) allows Chaingraph to perform the initial sync on memory-constrained devices/servers.
+CHAINGRAPH_POSTGRES_INSERT_CHUNK_SIZE_MB=32
+
 # The target size (in MB) of the buffer which holds downloaded blocks waiting to be saved to the database. This primarily affects memory usage during the initial chain sync.
 # For best performance, this should be around `CHAINGRAPH_POSTGRES_MAX_CONNECTIONS * maximum block size`, while leaving enough memory available to the host machine. If not set, Chaingraph will measure free memory at startup and attempt to select a reasonable value.
 CHAINGRAPH_BLOCK_BUFFER_TARGET_SIZE_MB=
diff --git a/src/config.ts b/src/config.ts
index 46f51ff..66b9288 100644
--- a/src/config.ts
+++ b/src/config.ts
@@ -44,6 +44,7 @@ const expectedOptions = [
   'CHAINGRAPH_POSTGRES_CONNECTION_STRING',
   'CHAINGRAPH_POSTGRES_MAX_CONNECTIONS',
   'CHAINGRAPH_POSTGRES_SYNCHRONOUS_COMMIT',
+  'CHAINGRAPH_POSTGRES_INSERT_CHUNK_SIZE_MB',
   'CHAINGRAPH_TRUSTED_NODES',
   'CHAINGRAPH_USER_AGENT',
   'NODE_ENV',
@@ -362,6 +363,17 @@ const chaingraphUserAgent =
 const postgresSynchronousCommit =
   configuration.CHAINGRAPH_POSTGRES_SYNCHRONOUS_COMMIT !== 'false';

+/**
+ * Set via the `CHAINGRAPH_POSTGRES_INSERT_CHUNK_SIZE_MB` environment variable.
+ */
+const postgresInsertChunkSizeMbValue = Number(
+  configuration.CHAINGRAPH_POSTGRES_INSERT_CHUNK_SIZE_MB
+);
+if (isNaN(postgresInsertChunkSizeMbValue)) {
+  // eslint-disable-next-line functional/no-throw-statement
+  throw new Error('CHAINGRAPH_POSTGRES_INSERT_CHUNK_SIZE_MB must be a number.');
+}
+
 /**
  * `true` if the `NODE_ENV` environment variable is `production`.
  */
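Note: the value parsed above becomes the byte budget for the batching logic added to src/db.ts below. A quick sketch of the conversion and of why the NaN guard matters (illustrative only, not part of the patch; `Number` coercion is standard JavaScript):

    // Number(undefined) is NaN, so an unset variable fails the guard above;
    // Number('') is 0, which passes the guard but would make every
    // transaction its own batch.
    const chunkSizeMb = Number(process.env.CHAINGRAPH_POSTGRES_INSERT_CHUNK_SIZE_MB);
    const targetBytes = chunkSizeMb * 1024 * 1024; // 32 MB -> 33,554,432 bytes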
diff --git a/src/db.ts b/src/db.ts
index 8c62d18..896c453 100644
--- a/src/db.ts
+++ b/src/db.ts
@@ -7,6 +7,7 @@ import {
 } from './components/db-utils.js';
 import {
   postgresConnectionString,
+  postgresInsertChunkSizeMbValue,
   postgresMaxConnections,
   postgresSynchronousCommit,
 } from './config.js';
@@ -266,48 +267,73 @@ export const recordNodeValidation = async (
 };

 /**
- * Save a block to the database, inserting all transactions which aren't already
- * known to exist in the database. (This method should only be used for blocks
- * which are not already saved to the database.)
- *
- * Note: this method trusts its input, and data is not sanitized. (Because all
- * inserted data is of type `number`, `boolean`, or `Uint8Array`, we assume SQL
- * injections are not a concern.)
+ * Estimates the size (in bytes) of a transaction for batching purposes.
  */
-export const saveBlock = async ({
-  block,
-  nodeAcceptances,
-  transactionCache,
-}: {
-  block: ChaingraphBlock;
-  nodeAcceptances: {
-    nodeInternalId: number;
-    acceptedAt: Date | null;
-    nodeName: string;
-  }[];
-  transactionCache: Agent['transactionCache'];
-}) => {
-  const blockTransactions = block.transactions.reduce<{
-    /**
-     * Transactions known to be successfully saved to the database.
-     */
-    alreadySaved: ChaingraphTransaction[];
-    /**
-     * Transactions in the block which aren't yet known to be saved to the
-     * database. These must be saved before the block can be saved.
-     */
-    unknown: ChaingraphTransaction[];
-  }>(
-    (transactions, transaction) => {
-      // eslint-disable-next-line @typescript-eslint/no-unused-expressions
-      transactionCache.has(transaction.hash)
-        ? transactions.alreadySaved.push(transaction)
-        : transactions.unknown.push(transaction);
-      return transactions;
-    },
-    { alreadySaved: [], unknown: [] }
-  );
+const estimateTransactionSize = (
+  transaction: ChaingraphTransaction
+): number => {
+  // Base size for transaction fields (hash, version, locktime, etc.)
+  let size = 100;
+
+  // Add estimated size for each input
+  size += transaction.inputs.reduce((total, input) => {
+    return total + input.unlockingBytecode.length + 100; // account for other input fields
+  }, 0);
+
+  // Add estimated size for each output
+  size += transaction.outputs.reduce((total, output) => {
+    return (
+      total +
+      output.lockingBytecode.length +
+      (output.tokenCategory?.length || 0) +
+      (output.nonfungibleTokenCommitment?.length || 0) +
+      100
+    ); // account for other output fields
+  }, 0);
+
+  return size;
+};
+
+/**
+ * Splits transactions into batches, each targeting the configured
+ * `CHAINGRAPH_POSTGRES_INSERT_CHUNK_SIZE_MB` in estimated size.
+ */
+const createTransactionBatches = (
+  transactions: ChaingraphTransaction[]
+): ChaingraphTransaction[][] => {
+  // Convert the configured chunk size from MB to bytes.
+  const TARGET_BATCH_SIZE = postgresInsertChunkSizeMbValue * 1024 * 1024;
+  const batches: ChaingraphTransaction[][] = [];
+  let currentBatch: ChaingraphTransaction[] = [];
+  let currentBatchSize = 0;
+
+  for (const transaction of transactions) {
+    const transactionSize = estimateTransactionSize(transaction);
+
+    if (
+      currentBatchSize + transactionSize > TARGET_BATCH_SIZE &&
+      currentBatch.length > 0
+    ) {
+      batches.push(currentBatch);
+      currentBatch = [];
+      currentBatchSize = 0;
+    }
+
+    currentBatch.push(transaction);
+    currentBatchSize += transactionSize;
+  }
+  if (currentBatch.length > 0) {
+    batches.push(currentBatch);
+  }
+
+  return batches;
+};
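A worked example of how the estimate above drives batch sizing (illustrative figures, not part of the patch):

    // A 1-input, 2-output transaction with a 100-byte unlocking bytecode and
    // 25-byte locking bytecodes (no token fields) is estimated at:
    //   100 (base) + (100 + 100) (input) + 2 * (25 + 100) (outputs) = 550 bytes
    // With the default 32 MB target (33,554,432 bytes), createTransactionBatches
    // would group roughly 61,000 such transactions into each INSERT batch.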
+/**
+ * Creates the SQL for saving a batch of transactions.
+ */
+const createBatchTransactionsSql = (
+  transactions: ChaingraphTransaction[]
+): string => {
   const inputs: {
     inputIndex: number;
     transactionHash: string;
     content: ChaingraphTransaction['inputs'][number];
   }[] = [];
@@ -319,7 +345,7 @@ export const saveBlock = async ({
     content: ChaingraphTransaction['outputs'][number];
   }[] = [];

-  blockTransactions.unknown.forEach((transaction) => {
+  transactions.forEach((transaction) => {
     inputs.push(
       ...transaction.inputs.map((content, inputIndex) => ({
         content,
         inputIndex,
@@ -336,156 +362,207 @@ export const saveBlock = async ({
     );
   });

-  const addAllTransactions = /* sql */ `
-WITH unknown_transaction_values (hash, version, locktime, size_bytes, is_coinbase) AS (
-  VALUES ${blockTransactions.unknown
-    .map(
-      (transaction) =>
-        `('${hexToByteaString(transaction.hash)}'::bytea, ${
-          transaction.version
-        }::bigint, ${transaction.locktime}::bigint, ${
-          transaction.sizeBytes
-        }::bigint, ${transaction.isCoinbase.toString()}::boolean)`
+  return /* sql */ `
+  WITH unknown_transaction_values (hash, version, locktime, size_bytes, is_coinbase) AS (
+    VALUES ${transactions
+      .map(
+        (transaction) =>
+          `('${hexToByteaString(transaction.hash)}'::bytea, ${
+            transaction.version
+          }::bigint, ${transaction.locktime}::bigint, ${
+            transaction.sizeBytes
+          }::bigint, ${transaction.isCoinbase.toString()}::boolean)`
+      )
+      .join(',')}
+  ),
+  unknown_input_values (transaction_hash, input_index, outpoint_index, sequence_number, outpoint_transaction_hash, unlocking_bytecode) AS (
+    VALUES ${inputs
+      .map(
+        (input) =>
+          `('${hexToByteaString(input.transactionHash)}'::bytea, ${
+            input.inputIndex
+          }::bigint, ${input.content.outpointIndex}::bigint, ${
+            input.content.sequenceNumber
+          }::bigint, '${hexToByteaString(
+            input.content.outpointTransactionHash
+          )}'::bytea, '${hexToByteaString(
+            input.content.unlockingBytecode
+          )}'::bytea)`
+      )
+      .join(',')}
+  ),
+  unknown_output_values (transaction_hash, output_index, value_satoshis, locking_bytecode, token_category, fungible_token_amount, nonfungible_token_capability, nonfungible_token_commitment) AS (
+    VALUES ${outputs
+      .map(
+        (output) =>
+          `('${hexToByteaString(output.transactionHash)}'::bytea, ${
+            output.outputIndex
+          }::bigint, ${output.content.valueSatoshis.toString()}::bigint, '${hexToByteaString(
+            output.content.lockingBytecode
+          )}'::bytea, ${
+            output.content.tokenCategory === undefined
+              ? 'NULL::bytea'
+              : `'${hexToByteaString(output.content.tokenCategory)}'::bytea`
+          }, ${
+            output.content.fungibleTokenAmount === undefined
+              ? 'NULL::bigint'
+              : `${output.content.fungibleTokenAmount.toString()}::bigint`
+          }, ${
+            output.content.nonfungibleTokenCapability === undefined
+              ? 'NULL::enum_nonfungible_token_capability'
+              : `'${output.content.nonfungibleTokenCapability}'::enum_nonfungible_token_capability`
+          }, ${
+            output.content.nonfungibleTokenCommitment === undefined
+              ? 'NULL::bytea'
+              : `'${hexToByteaString(
+                  output.content.nonfungibleTokenCommitment
+                )}'::bytea`
+          })`
+      )
+      .join(',')}
+  ),
+  newly_saved_transactions (hash, internal_id) AS (
+    INSERT INTO transaction (hash, version, locktime, size_bytes, is_coinbase)
+    SELECT hash, version, locktime, size_bytes, is_coinbase FROM unknown_transaction_values
+    ON CONFLICT ON CONSTRAINT "transaction_hash_key" DO NOTHING
+    RETURNING hash, internal_id
+  ),
+  newly_saved_outputs AS (
+    INSERT INTO output (transaction_hash, output_index, value_satoshis, locking_bytecode, token_category, fungible_token_amount, nonfungible_token_capability, nonfungible_token_commitment)
+    SELECT transaction_hash, output_index, value_satoshis, locking_bytecode, token_category::bytea, fungible_token_amount::bigint, nonfungible_token_capability::enum_nonfungible_token_capability, nonfungible_token_commitment::bytea FROM unknown_output_values
+    WHERE transaction_hash IN (SELECT hash FROM newly_saved_transactions)
+  ),
+  newly_saved_inputs AS (
+    INSERT INTO input (transaction_internal_id, input_index, outpoint_index, sequence_number, outpoint_transaction_hash, unlocking_bytecode)
+    SELECT internal_id, input_index, outpoint_index, sequence_number, outpoint_transaction_hash, unlocking_bytecode
+    FROM unknown_input_values val INNER JOIN newly_saved_transactions txs ON val.transaction_hash = txs.hash
+  )
-    )
-    .join(',')}
-),
-unknown_input_values (transaction_hash, input_index, outpoint_index, sequence_number, outpoint_transaction_hash, unlocking_bytecode) AS (
-  VALUES ${inputs
-    .map(
-      (input) =>
-        `('${hexToByteaString(input.transactionHash)}'::bytea, ${
-          input.inputIndex
-        }::bigint, ${input.content.outpointIndex}::bigint, ${
-          input.content.sequenceNumber
-        }::bigint, '${hexToByteaString(
-          input.content.outpointTransactionHash
-        )}'::bytea, '${hexToByteaString(
-          input.content.unlockingBytecode
-        )}'::bytea)`
-    )
-    .join(',')}
-),
-unknown_output_values (transaction_hash, output_index, value_satoshis, locking_bytecode, token_category, fungible_token_amount, nonfungible_token_capability, nonfungible_token_commitment) AS (
-  VALUES ${outputs
-    .map(
-      (output) =>
-        `('${hexToByteaString(output.transactionHash)}'::bytea, ${
-          output.outputIndex
-        }::bigint, ${output.content.valueSatoshis.toString()}::bigint, '${hexToByteaString(
-          output.content.lockingBytecode
-        )}'::bytea, ${
-          output.content.tokenCategory === undefined
-            ? 'NULL::bytea'
-            : `'${hexToByteaString(output.content.tokenCategory)}'::bytea`
-        }, ${
-          output.content.fungibleTokenAmount === undefined
-            ? 'NULL::bigint'
-            : `${output.content.fungibleTokenAmount.toString()}::bigint`
-        }, ${
-          output.content.nonfungibleTokenCapability === undefined
-            ? 'NULL::enum_nonfungible_token_capability'
-            : `'${output.content.nonfungibleTokenCapability}'::enum_nonfungible_token_capability`
-        }, ${
-          output.content.nonfungibleTokenCommitment === undefined
-            ? 'NULL::bytea'
-            : `'${hexToByteaString(
-                output.content.nonfungibleTokenCommitment
-              )}'::bytea`
-        })`
-    )
-    .join(',')}
-),
-newly_saved_transactions (hash, internal_id) AS (
-  INSERT INTO transaction (hash, version, locktime, size_bytes, is_coinbase)
-  SELECT hash, version, locktime, size_bytes, is_coinbase FROM unknown_transaction_values
-  ON CONFLICT ON CONSTRAINT "transaction_hash_key" DO NOTHING
-  RETURNING hash, internal_id
-),
-newly_saved_outputs AS (
-  INSERT INTO output (transaction_hash, output_index, value_satoshis, locking_bytecode, token_category, fungible_token_amount, nonfungible_token_capability, nonfungible_token_commitment)
-  SELECT transaction_hash, output_index, value_satoshis, locking_bytecode, token_category::bytea, fungible_token_amount::bigint, nonfungible_token_capability::enum_nonfungible_token_capability, nonfungible_token_commitment::bytea FROM unknown_output_values
-  WHERE transaction_hash IN (SELECT hash FROM newly_saved_transactions)
-),
-newly_saved_inputs AS (
-  INSERT INTO input (transaction_internal_id, input_index, outpoint_index, sequence_number, outpoint_transaction_hash, unlocking_bytecode)
-  SELECT internal_id, input_index, outpoint_index, sequence_number, outpoint_transaction_hash, unlocking_bytecode
-  FROM unknown_input_values val INNER JOIN newly_saved_transactions txs ON val.transaction_hash = txs.hash
-)
-SELECT COUNT(*) FROM newly_saved_transactions;`;
+  SELECT COUNT(*) FROM newly_saved_transactions;
+  `;
+};
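Each chunk reports how many transactions it actually inserted: newly_saved_transactions RETURNs only the rows that survive ON CONFLICT ... DO NOTHING, and the dependent CTEs insert outputs and inputs only for those rows, so re-sent transactions are never duplicated. A stripped-down sketch of that counting pattern (hypothetical standalone query, not part of the patch):

    const countNewlyInserted = /* sql */ `
    WITH inserted (hash) AS (
      INSERT INTO transaction (hash, version, locktime, size_bytes, is_coinbase)
      SELECT * FROM unknown_transaction_values
      ON CONFLICT ON CONSTRAINT "transaction_hash_key" DO NOTHING
      RETURNING hash
    )
    SELECT COUNT(*) FROM inserted;`;
    // rows[0].count = transactions newly written by this chunk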

-  /**
-   * TODO: perf – consider baking this into `addAllTransactions` to avoid re-sending the list of transaction hashes?
-   * TODO: perf – consider batching blocks during initial sync (targeting 100KB to 1MB queries)
-   * TODO: perf – use prepared statements
-   */
-  const addBlockQuery = /* sql */ `
-WITH transactions_in_block (hash, transaction_index) AS (
-  VALUES ${block.transactions
-    .map(
-      (transaction, index) =>
-        `('${hexToByteaString(transaction.hash)}'::bytea, ${index}::bigint)`
-    )
-    .join(',')}
-),
-accepting_nodes (node_internal_id, accepted_at) AS (
-  VALUES ${nodeAcceptances
-    .map(
-      (acceptance) =>
-        `(${acceptance.nodeInternalId}, ${
-          acceptance.acceptedAt === null
-            ? 'NULL::timestamp'
-            : dateToTimestampWithoutTimezone(acceptance.acceptedAt)
-        })`
-    )
-    .join(',')}
-),
-joined_transactions (internal_id, transaction_index) AS (
-  SELECT db.internal_id, val.transaction_index
-  FROM transaction db INNER JOIN transactions_in_block val ON val.hash = db.hash
-),
-inserted_block (internal_id) AS (
-  INSERT INTO block (height, version, timestamp, hash, previous_block_hash, merkle_root, bits, nonce, size_bytes)
-  VALUES (${block.height}, ${block.version}, ${block.timestamp},
-    '${hexToByteaString(block.hash)}'::bytea,
-    '${hexToByteaString(block.previousBlockHash)}'::bytea,
-    '${hexToByteaString(block.merkleRoot)}'::bytea,
-    ${block.bits}::bigint, ${block.nonce}::bigint, ${block.sizeBytes}::bigint)
-  ON CONFLICT ON CONSTRAINT "block_hash_key" DO NOTHING
-  RETURNING internal_id
-),
-inserted_block_transactions AS (
-  INSERT INTO block_transaction (block_internal_id, transaction_internal_id, transaction_index)
-  SELECT blk.internal_id, tx.internal_id, tx.transaction_index
-  FROM inserted_block blk CROSS JOIN joined_transactions tx
-),
-new_or_existing_block (internal_id) AS (
-  SELECT COALESCE (
-    (SELECT internal_id FROM inserted_block),
-    (SELECT internal_id FROM block WHERE block.hash = '${hexToByteaString(
-      block.hash
-    )}'::bytea)
-  )
-)
-INSERT INTO node_block (node_internal_id, block_internal_id, accepted_at)
-  SELECT node.node_internal_id, blk.internal_id, node.accepted_at
-  FROM new_or_existing_block blk CROSS JOIN accepting_nodes node
-  ON CONFLICT ON CONSTRAINT "node_block_pkey" DO NOTHING`;
-  const client = await pool.connect();
-  await client.query('BEGIN;');
-  const saveTransactionsResult = await client.query<{ count: string }>(
-    addAllTransactions
+/**
+ * Save a block to the database, inserting all transactions which aren't
+ * already known to exist in the database. Transactions are inserted in
+ * batches sized by `CHAINGRAPH_POSTGRES_INSERT_CHUNK_SIZE_MB`. (This method
+ * should only be used for blocks which are not already saved to the
+ * database.)
+ *
+ * Note: this method trusts its input, and data is not sanitized. (Because all
+ * inserted data is of type `number`, `boolean`, or `Uint8Array`, we assume SQL
+ * injections are not a concern.)
+ */
+export const saveBlock = async ({
+  block,
+  nodeAcceptances,
+  transactionCache,
+}: {
+  block: ChaingraphBlock;
+  nodeAcceptances: {
+    nodeInternalId: number;
+    acceptedAt: Date | null;
+    nodeName: string;
+  }[];
+  transactionCache: Agent['transactionCache'];
+}) => {
+  const blockTransactions = block.transactions.reduce<{
+    alreadySaved: ChaingraphTransaction[];
+    unknown: ChaingraphTransaction[];
+  }>(
+    (transactions, transaction) => {
+      // eslint-disable-next-line @typescript-eslint/no-unused-expressions
+      transactionCache.has(transaction.hash)
+        ? transactions.alreadySaved.push(transaction)
+        : transactions.unknown.push(transaction);
+      return transactions;
+    },
+    { alreadySaved: [], unknown: [] }
   );
-  const attemptedSavedTransactions = blockTransactions.unknown;
-  const savedTransactionCount = Number(saveTransactionsResult.rows[0]!.count);
-  const transactionCacheMisses =
-    attemptedSavedTransactions.length - savedTransactionCount;
-  await client.query(addBlockQuery);
-  await client.query('COMMIT;');
-  client.release();
-  return {
-    attemptedSavedTransactions,
-    transactionCacheMisses,
-  };
+
+  // Split unknown transactions into batches targeting the configured
+  // INSERT chunk size
+  const transactionBatches = createTransactionBatches(
+    blockTransactions.unknown
+  );
+
+  const client = await pool.connect();
+  try {
+    await client.query('BEGIN;');
+
+    let totalSavedTransactions = 0;
+    // Process each batch of transactions
+    for (const batch of transactionBatches) {
+      const batchQuery = createBatchTransactionsSql(batch);
+      const batchResult = await client.query<{ count: string }>(batchQuery);
+      totalSavedTransactions += Number(batchResult.rows[0]!.count);
+    }
+
+    // Save the block and link it with its transactions
+    const addBlockQuery = /* sql */ `
+    WITH transactions_in_block (hash, transaction_index) AS (
+      VALUES ${block.transactions
+        .map(
+          (transaction, index) =>
+            `('${hexToByteaString(
+              transaction.hash
+            )}'::bytea, ${index}::bigint)`
+        )
+        .join(',')}
+    ),
+    accepting_nodes (node_internal_id, accepted_at) AS (
+      VALUES ${nodeAcceptances
+        .map(
+          (acceptance) =>
+            `(${acceptance.nodeInternalId}, ${
+              acceptance.acceptedAt === null
+                ? 'NULL::timestamp'
+                : dateToTimestampWithoutTimezone(acceptance.acceptedAt)
+            })`
+        )
+        .join(',')}
+    ),
+    joined_transactions (internal_id, transaction_index) AS (
+      SELECT db.internal_id, val.transaction_index
+      FROM transaction db INNER JOIN transactions_in_block val ON val.hash = db.hash
+    ),
+    inserted_block (internal_id) AS (
+      INSERT INTO block (height, version, timestamp, hash, previous_block_hash, merkle_root, bits, nonce, size_bytes)
+      VALUES (${block.height}, ${block.version}, ${block.timestamp},
+        '${hexToByteaString(block.hash)}'::bytea,
+        '${hexToByteaString(block.previousBlockHash)}'::bytea,
+        '${hexToByteaString(block.merkleRoot)}'::bytea,
+        ${block.bits}::bigint, ${block.nonce}::bigint, ${
+      block.sizeBytes
+    }::bigint)
+      ON CONFLICT ON CONSTRAINT "block_hash_key" DO NOTHING
+      RETURNING internal_id
+    ),
+    inserted_block_transactions AS (
+      INSERT INTO block_transaction (block_internal_id, transaction_internal_id, transaction_index)
+      SELECT blk.internal_id, tx.internal_id, tx.transaction_index
+      FROM inserted_block blk CROSS JOIN joined_transactions tx
+    ),
+    new_or_existing_block (internal_id) AS (
+      SELECT COALESCE (
+        (SELECT internal_id FROM inserted_block),
+        (SELECT internal_id FROM block WHERE block.hash = '${hexToByteaString(
+          block.hash
+        )}'::bytea)
+      )
+    )
+    INSERT INTO node_block (node_internal_id, block_internal_id, accepted_at)
+      SELECT node.node_internal_id, blk.internal_id, node.accepted_at
+      FROM new_or_existing_block blk CROSS JOIN accepting_nodes node
+      ON CONFLICT ON CONSTRAINT "node_block_pkey" DO NOTHING`;
+
+    await client.query(addBlockQuery);
+    await client.query('COMMIT;');
+
+    const attemptedSavedTransactions = blockTransactions.unknown;
+    const transactionCacheMisses =
+      attemptedSavedTransactions.length - totalSavedTransactions;
+
+    return {
+      attemptedSavedTransactions,
+      transactionCacheMisses,
+    };
+  } catch (error) {
+    await client.query('ROLLBACK;');
+    throw error;
+  } finally {
+    client.release();
+  }
 };
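Note that every chunk runs inside the single BEGIN/COMMIT above, so chunking bounds the size of each query rather than weakening atomicity: a failure in any chunk rolls back the whole block. A caller-side sketch of the returned accounting (mirrors the return shape above; not part of the patch):

    const { attemptedSavedTransactions, transactionCacheMisses } =
      await saveBlock({ block, nodeAcceptances, transactionCache });
    // attemptedSavedTransactions: transactions absent from the in-memory cache;
    // transactionCacheMisses: how many of those the database already contained.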

 /**