diff --git a/control-plane/docker-compose.dev.yml b/control-plane/docker-compose.dev.yml index 52961c89..0a350d3d 100644 --- a/control-plane/docker-compose.dev.yml +++ b/control-plane/docker-compose.dev.yml @@ -45,6 +45,7 @@ configs: queues { run-process {} run-generate-name {} + email-ingestion {} customer-telemetry {} external-tool-call {} } diff --git a/control-plane/scripts/sendTestEmail.sh b/control-plane/scripts/sendTestEmail.sh new file mode 100755 index 00000000..104409b4 --- /dev/null +++ b/control-plane/scripts/sendTestEmail.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +MESSAGE_BODY=$(cat <\"},{\"name\":\"Received\",\"value\":\"from mr85p00im-ztdg06011901.me.com (mr85p00im-ztdg06011901.me.com [17.58.23.198]) by inbound-smtp.us-east-1.amazonaws.com with SMTP id io5akr05ppl6s5095kjm6p5rusbbsaode3gn6j81 for test@run.inferable.ai; Tue, 31 Dec 2024 04:15:55 +0000 (UTC)\"},{\"name\":\"X-SES-Spam-Verdict\",\"value\":\"PASS\"},{\"name\":\"X-SES-Virus-Verdict\",\"value\":\"PASS\"},{\"name\":\"Received-SPF\",\"value\":\"pass (spfCheck: domain of icloud.com designates 17.58.23.198 as permitted sender) client-ip=17.58.23.198; envelope-from=john@johnjcsmith.com; helo=mr85p00im-ztdg06011901.me.com;\"},{\"name\":\"Authentication-Results\",\"value\":\"amazonses.com; spf=pass (spfCheck: domain of icloud.com designates 17.58.23.198 as permitted sender) client-ip=17.58.23.198; envelope-from=john@johnjcsmith.com; helo=mr85p00im-ztdg06011901.me.com; dkim=fail header.i=@johnjcsmith.com; dmarc=none header.from=johnjcsmith.com;\"},{\"name\":\"X-SES-RECEIPT\",\"value\":\"AEFBQUFBQUFBQUFHWTBhd3Fsa3ZBdE9GNnJUbWw0ZDFEemdKSlRuNUxNMWkveGFwMGtiVTBzanVNdWE5eEhodFhHM1MrWVhkYnVSN2FDVFNGQ0NIdlQ4V1pjdDVOQVVQL2dPMU9nUHQ0a1dQbEdRdUJqZE94NWt3Ri9XTEtXWlllYkhZVW9TTkZsUHZITUcxZ0VPb255OU9WT3pWS0dsUkZ5L0I4V2ltV3JJNmxVdEJWMW1WK0pYalBHT2RyUi9nVUpEZFNaei9paWl5L3Z6eUtNMm9IZ2hDSlZvSEZGcXlqTmhKV2d4QmgyYjlhR2VkUGcvbVVlZStHT29wak9zbWg0Q0g2MDhBbXlibkFWUXlZZGdYOEVhQ1ppckY1enlPUjZUM05JU050Q3o5b3lvMnpHRGJad3c9PQ==\"},{\"name\":\"X-SES-DKIM-SIGNATURE\",\"value\":\"a=rsa-sha256; q=dns/txt; b=azfbEksYORI2UHG1unhG3Lv+PKFEbkyjxwQjM940Lb1HPrIthv4Ip3Q4tjqsadGt09/NsEz/pZ5eU9mwFPwtKQOuTyvKe52BMD2rnBcCpdpWetHqPqxv7vMgCS10eInfQSipSibMJRzhGBTQ/l/wV+1zlGywIpVue+md5ySyL0I=; c=relaxed/simple; s=ug7nbtf4gccmlpwj322ax3p6ow6yfsug; d=amazonses.com; t=1735618556; v=1; bh=frcCV1k9oG9oKj3dpUqdJg1PxRT2RSN/XKdLCPjaYaY=; h=From:To:Cc:Bcc:Subject:Date:Message-ID:MIME-Version:Content-Type:X-SES-RECEIPT;\"},{\"name\":\"DKIM-Signature\",\"value\":\"v=1; a=rsa-sha256; c=relaxed/relaxed; d=johnjcsmith.com; s=sig1; t=1735618554; bh=47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=; h=From:Content-Type:Mime-Version:Subject:Message-Id:Date:To:x-icloud-hme; b=okadMTLg+ro8E2nndzhy3joxbL5YO33ubsa7NBcQXXxQnGGjQ95gErjP5iAciJAR5/+4LdUiYrfZHg/cLfSR+ZIwZ6/CQsQ3VYZZh5X/klbTLlfNurDo2SwwAAEDwnocwIQ1BF2PCBIbL0yaqmkXKw0+7RxAsKc5pn0dp2rrHhr6d3lLN2sU+EBnKVcZgt0yv7HSe5goghjEmy9+0VcQNwIySZhko4cV7l7po93m46rUY5Wi2k2Yh/vzL777vbLIxNtPwp4eRL46ucea8J9EA91uSHgyLom1TPQyXPIqA7/DYXyolC4xJxUGSNPw45NfYWs4mZAZpmc1sOSf5xHCFA==\"},{\"name\":\"Received\",\"value\":\"from smtpclient.apple (mr38p00im-dlb-asmtp-mailmevip.me.com [17.57.152.18]) by mr85p00im-ztdg06011901.me.com (Postfix) with ESMTPSA id DC5C41349C75 for ; Tue, 31 Dec 2024 04:15:52 +0000 (UTC)\"},{\"name\":\"From\",\"value\":\"john@johnjcsmith.com\"},{\"name\":\"Content-Type\",\"value\":\"text/plain\"},{\"name\":\"Content-Transfer-Encoding\",\"value\":\"7bit\"},{\"name\":\"Mime-Version\",\"value\":\"1.0 (Mac OS X Mail 16.0 \\\\(3826.300.87.4.3\\\\))\"},{\"name\":\"Subject\",\"value\":\"This is a test\"},{\"name\":\"Message-Id\",\"value\":\"<93FC27CD-9054-4BB5-ADA9-C9CB425D3844@johnjcsmith.com>\"},{\"name\":\"Date\",\"value\":\"Tue, 31 Dec 2024 14:45:38 +1030\"},{\"name\":\"To\",\"value\":\"test@run.inferable.ai\"},{\"name\":\"X-Mailer\",\"value\":\"Apple Mail (2.3826.300.87.4.3)\"},{\"name\":\"X-Proofpoint-GUID\",\"value\":\"1yPlQIuJlVJxiCqK-1kin_Sns19OcPdQ\"},{\"name\":\"X-Proofpoint-ORIG-GUID\",\"value\":\"1yPlQIuJlVJxiCqK-1kin_Sns19OcPdQ\"},{\"name\":\"X-Proofpoint-Virus-Version\",\"value\":\"\"},{\"name\":\"X-Proofpoint-Spam-Details\",\"value\":\"rule=notspam policy=default score=0 phishscore=0 malwarescore=0 spamscore=0 bulkscore=0 adultscore=0 mlxlogscore=343 suspectscore=0 mlxscore=0 clxscore=1030 classifier=spam adjust=0 reason=mlx scancount=1 engine=8.19.0-2308100000 definitions=main-2412310033\"}],\"commonHeaders\":{\"returnPath\":\"john@johnjcsmith.com\",\"from\":[\"john@johnjcsmith.com\"],\"date\":\"Tue, 31 Dec 2024 14:45:38 +1030\",\"to\":[\"test@run.inferable.ai\"],\"messageId\":\"<93FC27CD-9054-4BB5-ADA9-C9CB425D3844@johnjcsmith.com>\",\"subject\":\"This is a test\"}},\"receipt\":{\"timestamp\":\"2024-12-31T04:15:55.348Z\",\"processingTimeMillis\":764,\"recipients\":[\"test@run.inferable.ai\"],\"spamVerdict\":{\"status\":\"PASS\"},\"virusVerdict\":{\"status\":\"PASS\"},\"spfVerdict\":{\"status\":\"PASS\"},\"dkimVerdict\":{\"status\":\"FAIL\"},\"dmarcVerdict\":{\"status\":\"GRAY\"},\"action\":{\"type\":\"SNS\",\"topicArn\":\"arn:aws:sns:us-east-1:590183853800:email-ingestion-topic\",\"encoding\":\"UTF8\"}},\"content\":\"Return-Path: \\r\\nReceived: from mr85p00im-ztdg06011901.me.com (mr85p00im-ztdg06011901.me.com [17.58.23.198])\\r\\n by inbound-smtp.us-east-1.amazonaws.com with SMTP id io5akr05ppl6s5095kjm6p5rusbbsaode3gn6j81\\r\\n for test@run.inferable.ai;\\r\\n Tue, 31 Dec 2024 04:15:55 +0000 (UTC)\\r\\nX-SES-Spam-Verdict: PASS\\r\\nX-SES-Virus-Verdict: PASS\\r\\nReceived-SPF: pass (spfCheck: domain of icloud.com designates 17.58.23.198 as permitted sender) client-ip=17.58.23.198; envelope-from=john@johnjcsmith.com; helo=mr85p00im-ztdg06011901.me.com;\\r\\nAuthentication-Results: amazonses.com;\\r\\n spf=pass (spfCheck: domain of icloud.com designates 17.58.23.198 as permitted sender) client-ip=17.58.23.198; envelope-from=john@johnjcsmith.com; helo=mr85p00im-ztdg06011901.me.com;\\r\\n dkim=fail header.i=@johnjcsmith.com;\\r\\n dmarc=none header.from=johnjcsmith.com;\\r\\nX-SES-RECEIPT: AEFBQUFBQUFBQUFHWTBhd3Fsa3ZBdE9GNnJUbWw0ZDFEemdKSlRuNUxNMWkveGFwMGtiVTBzanVNdWE5eEhodFhHM1MrWVhkYnVSN2FDVFNGQ0NIdlQ4V1pjdDVOQVVQL2dPMU9nUHQ0a1dQbEdRdUJqZE94NWt3Ri9XTEtXWlllYkhZVW9TTkZsUHZITUcxZ0VPb255OU9WT3pWS0dsUkZ5L0I4V2ltV3JJNmxVdEJWMW1WK0pYalBHT2RyUi9nVUpEZFNaei9paWl5L3Z6eUtNMm9IZ2hDSlZvSEZGcXlqTmhKV2d4QmgyYjlhR2VkUGcvbVVlZStHT29wak9zbWg0Q0g2MDhBbXlibkFWUXlZZGdYOEVhQ1ppckY1enlPUjZUM05JU050Q3o5b3lvMnpHRGJad3c9PQ==\\r\\nX-SES-DKIM-SIGNATURE: a=rsa-sha256; q=dns/txt; b=azfbEksYORI2UHG1unhG3Lv+PKFEbkyjxwQjM940Lb1HPrIthv4Ip3Q4tjqsadGt09/NsEz/pZ5eU9mwFPwtKQOuTyvKe52BMD2rnBcCpdpWetHqPqxv7vMgCS10eInfQSipSibMJRzhGBTQ/l/wV+1zlGywIpVue+md5ySyL0I=; c=relaxed/simple; s=ug7nbtf4gccmlpwj322ax3p6ow6yfsug; d=amazonses.com; t=1735618556; v=1; bh=frcCV1k9oG9oKj3dpUqdJg1PxRT2RSN/XKdLCPjaYaY=; h=From:To:Cc:Bcc:Subject:Date:Message-ID:MIME-Version:Content-Type:X-SES-RECEIPT;\\r\\nDKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=johnjcsmith.com;\\r\\n\\ts=sig1; t=1735618554;\\r\\n\\tbh=47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=;\\r\\n\\th=From:Content-Type:Mime-Version:Subject:Message-Id:Date:To:\\r\\n\\t x-icloud-hme;\\r\\n\\tb=okadMTLg+ro8E2nndzhy3joxbL5YO33ubsa7NBcQXXxQnGGjQ95gErjP5iAciJAR5\\r\\n\\t /+4LdUiYrfZHg/cLfSR+ZIwZ6/CQsQ3VYZZh5X/klbTLlfNurDo2SwwAAEDwnocwIQ\\r\\n\\t 1BF2PCBIbL0yaqmkXKw0+7RxAsKc5pn0dp2rrHhr6d3lLN2sU+EBnKVcZgt0yv7HSe\\r\\n\\t 5goghjEmy9+0VcQNwIySZhko4cV7l7po93m46rUY5Wi2k2Yh/vzL777vbLIxNtPwp4\\r\\n\\t eRL46ucea8J9EA91uSHgyLom1TPQyXPIqA7/DYXyolC4xJxUGSNPw45NfYWs4mZAZp\\r\\n\\t mc1sOSf5xHCFA==\\r\\nReceived: from smtpclient.apple (mr38p00im-dlb-asmtp-mailmevip.me.com [17.57.152.18])\\r\\n\\tby mr85p00im-ztdg06011901.me.com (Postfix) with ESMTPSA id DC5C41349C75\\r\\n\\tfor ; Tue, 31 Dec 2024 04:15:52 +0000 (UTC)\\r\\nFrom: john@johnjcsmith.com\\r\\nContent-Type: text/plain\\r\\nContent-Transfer-Encoding: 7bit\\r\\nMime-Version: 1.0 (Mac OS X Mail 16.0 \\\\(3826.300.87.4.3\\\\))\\r\\nSubject: This is a test\\r\\nMessage-Id: <93FC27CD-9054-4BB5-ADA9-C9CB425D3844@johnjcsmith.com>\\r\\nDate: Tue, 31 Dec 2024 14:45:38 +1030\\r\\nTo: test@run.inferable.ai\\r\\nX-Mailer: Apple Mail (2.3826.300.87.4.3)\\r\\nX-Proofpoint-GUID: 1yPlQIuJlVJxiCqK-1kin_Sns19OcPdQ\\r\\nX-Proofpoint-ORIG-GUID: 1yPlQIuJlVJxiCqK-1kin_Sns19OcPdQ\\r\\nX-Proofpoint-Virus-Version:\\r\\nX-Proofpoint-Spam-Details: rule=notspam policy=default score=0 phishscore=0 malwarescore=0 spamscore=0\\r\\n bulkscore=0 adultscore=0 mlxlogscore=343 suspectscore=0 mlxscore=0\\r\\n clxscore=1030 classifier=spam adjust=0 reason=mlx scancount=1\\r\\n engine=8.19.0-2308100000 definitions=main-2412310033\\r\\n\\r\\n\"}", + "Timestamp" : "2024-12-31T04:15:56.138Z", + "SignatureVersion" : "1", + "Signature" : "SOtyVndg0O6ldUwxRYsYORfRQF2mjz8HoEBGrR79bqO1Py7wLFJgSVABQFBVxEIypZZ6HGRFw57FT7V5BnSM5zbKfep0ERyYhcBaSHg9dW9bPsDYZqDHxXxcjlILOYepALZ9Fiy2Trk5Rw2bFZMsbsgIIXx/YcuFKw3zYwq7r062fVm6+y3EHgS8fK5w88cGjChEM4ktbCUQKWtzdQYdOLLlHmAnf8pScsVx7M2SpCdfZRprfNWgmiNDXvDpSHCYtcoi2iEVMtLYDxGFtE0m9ZJjbIOPTlowEkAp/m/y/6MB0/+Fvv1UbOghjpgUiIESu8CWMpcwKgZvYLcDsMNqCg==", + "SigningCertURL" : "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-9c6465fa7f48f5cacd23014631ec1136.pem", + "UnsubscribeURL" : "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:590183853800:email-ingestion-topic:1b1ec808-c78a-43fc-a08a-e19b3be0cd7d" +} +EOF +) + +aws --region=us-west-2 --endpoint=http://localhost:9324 sqs send-message --queue-url http://localhost:9324/000000000000/email-ingestion --message-body "$MESSAGE_BODY" diff --git a/control-plane/src/index.ts b/control-plane/src/index.ts index 92a76586..91d3c734 100644 --- a/control-plane/src/index.ts +++ b/control-plane/src/index.ts @@ -13,6 +13,7 @@ import * as redis from "./modules/redis"; import * as toolhouse from "./modules/integrations/toolhouse"; import * as externalCalls from "./modules/jobs/external"; import * as models from "./modules/models/routing"; +import * as email from "./modules/email"; import { logContext, logger } from "./modules/observability/logger"; import * as workflows from "./modules/workflows/workflows"; import * as slack from "./modules/integrations/slack"; @@ -146,11 +147,15 @@ const startTime = Date.now(); workflows.start(), models.start(), redis.start(), - customerTelemetry.start(), - toolhouse.start(), - externalCalls.start(), slack.start(app), - ...(env.EE_DEPLOYMENT ? [flagsmith?.getEnvironmentFlags(), analytics.start()] : []), + ...(env.EE_DEPLOYMENT ? [ + customerTelemetry.start(), + toolhouse.start(), + externalCalls.start(), + email.start(), + flagsmith?.getEnvironmentFlags(), + analytics.start() + ] : []), ]) .then(() => { logger.info("Dependencies started", { latency: Date.now() - startTime }); @@ -190,6 +195,7 @@ process.on("SIGTERM", async () => { customerTelemetry.stop(), externalCalls.stop(), slack.stop(), + email.stop(), ]); logger.info("Shutdown complete"); diff --git a/control-plane/src/modules/email/index.ts b/control-plane/src/modules/email/index.ts new file mode 100644 index 00000000..014a772c --- /dev/null +++ b/control-plane/src/modules/email/index.ts @@ -0,0 +1,131 @@ +import { Consumer } from "sqs-consumer"; +import { env } from "../../utilities/env"; +import { sqs } from "../sqs"; +import { z } from "zod"; +import { Message } from "@aws-sdk/client-sqs"; +import { logger } from "../observability/logger"; +import { safeParse } from "../../utilities/safe-parse"; + +const sesMessageSchema = z.object({ + notificationType: z.string(), + mail: z.object({ + timestamp: z.string().datetime(), + source: z.string().email(), + messageId: z.string(), + destination: z.array(z.string().email()), + headersTruncated: z.boolean(), + headers: z.array( + z.object({ + name: z.string(), + value: z.string(), + }) + ), + commonHeaders: z.object({ + returnPath: z.string().email(), + from: z.array(z.string().email()), + date: z.string(), + to: z.array(z.string().email()), + messageId: z.string(), + subject: z.string(), + }), + }), + receipt: z.object({ + timestamp: z.string().datetime(), + processingTimeMillis: z.number(), + recipients: z.array(z.string().email()), + spamVerdict: z.object({ status: z.string() }), + virusVerdict: z.object({ status: z.string() }), + spfVerdict: z.object({ status: z.string() }), + dkimVerdict: z.object({ status: z.string() }), + dmarcVerdict: z.object({ status: z.string() }), + action: z.object({ + type: z.string(), + topicArn: z.string(), + encoding: z.string(), + }), + }), + content: z.string(), +}) + + +const snsNotificationSchema = z.object({ + Type: z.literal("Notification"), + MessageId: z.string(), + TopicArn: z.string(), + Subject: z.string(), + Message: z.string() + Timestamp: z.string().datetime(), + SignatureVersion: z.string(), + Signature: z.string(), + SigningCertURL: z.string().url(), + UnsubscribeURL: z.string().url(), +}); + +const emailIngestionConsumer = env.SQS_EMAIL_INGESTION_QUEUE_URL + ? Consumer.create({ + queueUrl: env.SQS_EMAIL_INGESTION_QUEUE_URL, + batchSize: 5, + visibilityTimeout: 60, + heartbeatInterval: 30, + handleMessage: handleEmailIngestion, + sqs, + }) + : undefined; + +export const start = async () => { + emailIngestionConsumer?.start() +}; + +export const stop = async () => { + emailIngestionConsumer?.stop(); +}; + +async function handleEmailIngestion(message: Message) { + try { + const notificationJson = safeParse(message.Body); + if (!notificationJson.success) { + logger.error("SNS notification is not valid JSON", { + error: notificationJson.error, + }); + return; + } + + const notification = snsNotificationSchema.safeParse(notificationJson.data); + if (!notification.success) { + logger.error("Could not parse SNS notification", { + error: notification.error, + }); + return; + } + + + const sesJson = safeParse(notification.data.Message); + if (!sesJson.success) { + logger.error("SES message is not valid JSON", { + error: sesJson.error, + }); + return; + } + + const sesMessage = sesMessageSchema.safeParse(sesJson.data); + if (!sesMessage.success) { + logger.error("Could not parse SES message", { + error: sesMessage.error, + }); + return; + } + + logger.info("Ingesting email event", { + messageId: sesMessage.data.mail.messageId, + source: sesMessage.data.mail.source, + destination: sesMessage.data.mail.destination, + subject: sesMessage.data.mail.commonHeaders.subject, + }); + + + } catch (error) { + logger.error("Error while ingesting email event", { + error, + }); + } +} diff --git a/control-plane/src/utilities/env.ts b/control-plane/src/utilities/env.ts index a1d421de..cfc9840d 100644 --- a/control-plane/src/utilities/env.ts +++ b/control-plane/src/utilities/env.ts @@ -50,6 +50,7 @@ const envSchema = z SQS_RUN_GENERATE_NAME_QUEUE_URL: z.string(), SQS_CUSTOMER_TELEMETRY_QUEUE_URL: z.string(), SQS_EXTERNAL_TOOL_CALL_QUEUE_URL: z.string(), + SQS_EMAIL_INGESTION_QUEUE_URL: z.string(), SQS_BASE_QUEUE_URL: z.string().optional(),