From 92d3de2b2dd15d18f82b89b82f07766c554e0139 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 10 Jan 2025 10:34:17 +0100 Subject: [PATCH] refactor(core): Validate job data on worker (#12548) --- packages/cli/src/scaling/scaling.service.ts | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index 536e835c72331..f20d0764c6aa9 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -1,6 +1,6 @@ import { GlobalConfig } from '@n8n/config'; import { Container, Service } from '@n8n/di'; -import { ErrorReporter, InstanceSettings, Logger } from 'n8n-core'; +import { ErrorReporter, InstanceSettings, isObjectLiteral, Logger } from 'n8n-core'; import { ApplicationError, BINARY_ENCODING, @@ -93,6 +93,12 @@ export class ScalingService { void this.queue.process(JOB_TYPE_NAME, concurrency, async (job: Job) => { try { + if (!this.hasValidJobData(job)) { + throw new ApplicationError('Worker received invalid job', { + extra: { jobData: jsonStringify(job, { replaceCircularRefs: true }) }, + }); + } + await this.jobProcessor.processJob(job); } catch (error) { await this.reportJobProcessingError(ensureError(error), job); @@ -503,5 +509,9 @@ export class ScalingService { : jsonStringify(error, { replaceCircularRefs: true }); } + private hasValidJobData(job: Job) { + return isObjectLiteral(job.data) && 'executionId' in job.data && 'loadStaticData' in job.data; + } + // #endregion }