Skip to content

Commit

Permalink
feat: Initial email ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
johnjcsmith committed Dec 31, 2024
1 parent c896645 commit 269f321
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 4 deletions.
1 change: 1 addition & 0 deletions control-plane/docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ configs:
queues {
run-process {}
run-generate-name {}
email-ingestion {}
customer-telemetry {}
external-tool-call {}
}
Expand Down
19 changes: 19 additions & 0 deletions control-plane/scripts/sendTestEmail.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash

MESSAGE_BODY=$(cat <<EOF
{
"Type" : "Notification",
"MessageId" : "cab9e25a-0881-5266-80a0-bdb77abd99a0",
"TopicArn" : "arn:aws:sns:us-east-1:590183853800:email-ingestion-topic",
"Subject" : "Amazon SES Email Receipt Notification",
"Message" : "{\"notificationType\":\"Received\",\"mail\":{\"timestamp\":\"2024-12-31T04:15:55.348Z\",\"source\":\"[email protected]\",\"messageId\":\"io5akr05ppl6s5095kjm6p5rusbbsaode3gn6j81\",\"destination\":[\"[email protected]\"],\"headersTruncated\":false,\"headers\":[{\"name\":\"Return-Path\",\"value\":\"<[email protected]>\"},{\"name\":\"Received\",\"value\":\"from mr85p00im-ztdg06011901.me.com (mr85p00im-ztdg06011901.me.com [17.58.23.198]) by inbound-smtp.us-east-1.amazonaws.com with SMTP id io5akr05ppl6s5095kjm6p5rusbbsaode3gn6j81 for [email protected]; Tue, 31 Dec 2024 04:15:55 +0000 (UTC)\"},{\"name\":\"X-SES-Spam-Verdict\",\"value\":\"PASS\"},{\"name\":\"X-SES-Virus-Verdict\",\"value\":\"PASS\"},{\"name\":\"Received-SPF\",\"value\":\"pass (spfCheck: domain of icloud.com designates 17.58.23.198 as permitted sender) client-ip=17.58.23.198; [email protected]; helo=mr85p00im-ztdg06011901.me.com;\"},{\"name\":\"Authentication-Results\",\"value\":\"amazonses.com; spf=pass (spfCheck: domain of icloud.com designates 17.58.23.198 as permitted sender) client-ip=17.58.23.198; [email protected]; helo=mr85p00im-ztdg06011901.me.com; dkim=fail [email protected]; dmarc=none header.from=johnjcsmith.com;\"},{\"name\":\"X-SES-RECEIPT\",\"value\":\"AEFBQUFBQUFBQUFHWTBhd3Fsa3ZBdE9GNnJUbWw0ZDFEemdKSlRuNUxNMWkveGFwMGtiVTBzanVNdWE5eEhodFhHM1MrWVhkYnVSN2FDVFNGQ0NIdlQ4V1pjdDVOQVVQL2dPMU9nUHQ0a1dQbEdRdUJqZE94NWt3Ri9XTEtXWlllYkhZVW9TTkZsUHZITUcxZ0VPb255OU9WT3pWS0dsUkZ5L0I4V2ltV3JJNmxVdEJWMW1WK0pYalBHT2RyUi9nVUpEZFNaei9paWl5L3Z6eUtNMm9IZ2hDSlZvSEZGcXlqTmhKV2d4QmgyYjlhR2VkUGcvbVVlZStHT29wak9zbWg0Q0g2MDhBbXlibkFWUXlZZGdYOEVhQ1ppckY1enlPUjZUM05JU050Q3o5b3lvMnpHRGJad3c9PQ==\"},{\"name\":\"X-SES-DKIM-SIGNATURE\",\"value\":\"a=rsa-sha256; q=dns/txt; b=azfbEksYORI2UHG1unhG3Lv+PKFEbkyjxwQjM940Lb1HPrIthv4Ip3Q4tjqsadGt09/NsEz/pZ5eU9mwFPwtKQOuTyvKe52BMD2rnBcCpdpWetHqPqxv7vMgCS10eInfQSipSibMJRzhGBTQ/l/wV+1zlGywIpVue+md5ySyL0I=; c=relaxed/simple; s=ug7nbtf4gccmlpwj322ax3p6ow6yfsug; d=amazonses.com; t=1735618556; v=1; bh=frcCV1k9oG9oKj3dpUqdJg1PxRT2RSN/XKdLCPjaYaY=; h=From:To:Cc:Bcc:Subject:Date:Message-ID:MIME-Version:Content-Type:X-SES-RECEIPT;\"},{\"name\":\"DKIM-Signature\",\"value\":\"v=1; a=rsa-sha256; c=relaxed/relaxed; d=johnjcsmith.com; s=sig1; t=1735618554; bh=47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=; h=From:Content-Type:Mime-Version:Subject:Message-Id:Date:To:x-icloud-hme; b=okadMTLg+ro8E2nndzhy3joxbL5YO33ubsa7NBcQXXxQnGGjQ95gErjP5iAciJAR5/+4LdUiYrfZHg/cLfSR+ZIwZ6/CQsQ3VYZZh5X/klbTLlfNurDo2SwwAAEDwnocwIQ1BF2PCBIbL0yaqmkXKw0+7RxAsKc5pn0dp2rrHhr6d3lLN2sU+EBnKVcZgt0yv7HSe5goghjEmy9+0VcQNwIySZhko4cV7l7po93m46rUY5Wi2k2Yh/vzL777vbLIxNtPwp4eRL46ucea8J9EA91uSHgyLom1TPQyXPIqA7/DYXyolC4xJxUGSNPw45NfYWs4mZAZpmc1sOSf5xHCFA==\"},{\"name\":\"Received\",\"value\":\"from smtpclient.apple (mr38p00im-dlb-asmtp-mailmevip.me.com [17.57.152.18]) by mr85p00im-ztdg06011901.me.com (Postfix) with ESMTPSA id DC5C41349C75 for <[email protected]>; Tue, 31 Dec 2024 04:15:52 +0000 (UTC)\"},{\"name\":\"From\",\"value\":\"[email protected]\"},{\"name\":\"Content-Type\",\"value\":\"text/plain\"},{\"name\":\"Content-Transfer-Encoding\",\"value\":\"7bit\"},{\"name\":\"Mime-Version\",\"value\":\"1.0 (Mac OS X Mail 16.0 \\\\(3826.300.87.4.3\\\\))\"},{\"name\":\"Subject\",\"value\":\"This is a test\"},{\"name\":\"Message-Id\",\"value\":\"<[email protected]>\"},{\"name\":\"Date\",\"value\":\"Tue, 31 Dec 2024 14:45:38 +1030\"},{\"name\":\"To\",\"value\":\"[email protected]\"},{\"name\":\"X-Mailer\",\"value\":\"Apple Mail (2.3826.300.87.4.3)\"},{\"name\":\"X-Proofpoint-GUID\",\"value\":\"1yPlQIuJlVJxiCqK-1kin_Sns19OcPdQ\"},{\"name\":\"X-Proofpoint-ORIG-GUID\",\"value\":\"1yPlQIuJlVJxiCqK-1kin_Sns19OcPdQ\"},{\"name\":\"X-Proofpoint-Virus-Version\",\"value\":\"\"},{\"name\":\"X-Proofpoint-Spam-Details\",\"value\":\"rule=notspam policy=default score=0 phishscore=0 malwarescore=0 spamscore=0 bulkscore=0 adultscore=0 mlxlogscore=343 suspectscore=0 mlxscore=0 clxscore=1030 classifier=spam adjust=0 reason=mlx scancount=1 engine=8.19.0-2308100000 definitions=main-2412310033\"}],\"commonHeaders\":{\"returnPath\":\"[email protected]\",\"from\":[\"[email protected]\"],\"date\":\"Tue, 31 Dec 2024 14:45:38 +1030\",\"to\":[\"[email protected]\"],\"messageId\":\"<[email protected]>\",\"subject\":\"This is a test\"}},\"receipt\":{\"timestamp\":\"2024-12-31T04:15:55.348Z\",\"processingTimeMillis\":764,\"recipients\":[\"[email protected]\"],\"spamVerdict\":{\"status\":\"PASS\"},\"virusVerdict\":{\"status\":\"PASS\"},\"spfVerdict\":{\"status\":\"PASS\"},\"dkimVerdict\":{\"status\":\"FAIL\"},\"dmarcVerdict\":{\"status\":\"GRAY\"},\"action\":{\"type\":\"SNS\",\"topicArn\":\"arn:aws:sns:us-east-1:590183853800:email-ingestion-topic\",\"encoding\":\"UTF8\"}},\"content\":\"Return-Path: <[email protected]>\\r\\nReceived: from mr85p00im-ztdg06011901.me.com (mr85p00im-ztdg06011901.me.com [17.58.23.198])\\r\\n by inbound-smtp.us-east-1.amazonaws.com with SMTP id io5akr05ppl6s5095kjm6p5rusbbsaode3gn6j81\\r\\n for [email protected];\\r\\n Tue, 31 Dec 2024 04:15:55 +0000 (UTC)\\r\\nX-SES-Spam-Verdict: PASS\\r\\nX-SES-Virus-Verdict: PASS\\r\\nReceived-SPF: pass (spfCheck: domain of icloud.com designates 17.58.23.198 as permitted sender) client-ip=17.58.23.198; [email protected]; helo=mr85p00im-ztdg06011901.me.com;\\r\\nAuthentication-Results: amazonses.com;\\r\\n spf=pass (spfCheck: domain of icloud.com designates 17.58.23.198 as permitted sender) client-ip=17.58.23.198; [email protected]; helo=mr85p00im-ztdg06011901.me.com;\\r\\n dkim=fail [email protected];\\r\\n dmarc=none header.from=johnjcsmith.com;\\r\\nX-SES-RECEIPT: AEFBQUFBQUFBQUFHWTBhd3Fsa3ZBdE9GNnJUbWw0ZDFEemdKSlRuNUxNMWkveGFwMGtiVTBzanVNdWE5eEhodFhHM1MrWVhkYnVSN2FDVFNGQ0NIdlQ4V1pjdDVOQVVQL2dPMU9nUHQ0a1dQbEdRdUJqZE94NWt3Ri9XTEtXWlllYkhZVW9TTkZsUHZITUcxZ0VPb255OU9WT3pWS0dsUkZ5L0I4V2ltV3JJNmxVdEJWMW1WK0pYalBHT2RyUi9nVUpEZFNaei9paWl5L3Z6eUtNMm9IZ2hDSlZvSEZGcXlqTmhKV2d4QmgyYjlhR2VkUGcvbVVlZStHT29wak9zbWg0Q0g2MDhBbXlibkFWUXlZZGdYOEVhQ1ppckY1enlPUjZUM05JU050Q3o5b3lvMnpHRGJad3c9PQ==\\r\\nX-SES-DKIM-SIGNATURE: a=rsa-sha256; q=dns/txt; b=azfbEksYORI2UHG1unhG3Lv+PKFEbkyjxwQjM940Lb1HPrIthv4Ip3Q4tjqsadGt09/NsEz/pZ5eU9mwFPwtKQOuTyvKe52BMD2rnBcCpdpWetHqPqxv7vMgCS10eInfQSipSibMJRzhGBTQ/l/wV+1zlGywIpVue+md5ySyL0I=; c=relaxed/simple; s=ug7nbtf4gccmlpwj322ax3p6ow6yfsug; d=amazonses.com; t=1735618556; v=1; bh=frcCV1k9oG9oKj3dpUqdJg1PxRT2RSN/XKdLCPjaYaY=; h=From:To:Cc:Bcc:Subject:Date:Message-ID:MIME-Version:Content-Type:X-SES-RECEIPT;\\r\\nDKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=johnjcsmith.com;\\r\\n\\ts=sig1; t=1735618554;\\r\\n\\tbh=47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=;\\r\\n\\th=From:Content-Type:Mime-Version:Subject:Message-Id:Date:To:\\r\\n\\t x-icloud-hme;\\r\\n\\tb=okadMTLg+ro8E2nndzhy3joxbL5YO33ubsa7NBcQXXxQnGGjQ95gErjP5iAciJAR5\\r\\n\\t /+4LdUiYrfZHg/cLfSR+ZIwZ6/CQsQ3VYZZh5X/klbTLlfNurDo2SwwAAEDwnocwIQ\\r\\n\\t 1BF2PCBIbL0yaqmkXKw0+7RxAsKc5pn0dp2rrHhr6d3lLN2sU+EBnKVcZgt0yv7HSe\\r\\n\\t 5goghjEmy9+0VcQNwIySZhko4cV7l7po93m46rUY5Wi2k2Yh/vzL777vbLIxNtPwp4\\r\\n\\t eRL46ucea8J9EA91uSHgyLom1TPQyXPIqA7/DYXyolC4xJxUGSNPw45NfYWs4mZAZp\\r\\n\\t mc1sOSf5xHCFA==\\r\\nReceived: from smtpclient.apple (mr38p00im-dlb-asmtp-mailmevip.me.com [17.57.152.18])\\r\\n\\tby mr85p00im-ztdg06011901.me.com (Postfix) with ESMTPSA id DC5C41349C75\\r\\n\\tfor <[email protected]>; Tue, 31 Dec 2024 04:15:52 +0000 (UTC)\\r\\nFrom: [email protected]\\r\\nContent-Type: text/plain\\r\\nContent-Transfer-Encoding: 7bit\\r\\nMime-Version: 1.0 (Mac OS X Mail 16.0 \\\\(3826.300.87.4.3\\\\))\\r\\nSubject: This is a test\\r\\nMessage-Id: <[email protected]>\\r\\nDate: Tue, 31 Dec 2024 14:45:38 +1030\\r\\nTo: [email protected]\\r\\nX-Mailer: Apple Mail (2.3826.300.87.4.3)\\r\\nX-Proofpoint-GUID: 1yPlQIuJlVJxiCqK-1kin_Sns19OcPdQ\\r\\nX-Proofpoint-ORIG-GUID: 1yPlQIuJlVJxiCqK-1kin_Sns19OcPdQ\\r\\nX-Proofpoint-Virus-Version:\\r\\nX-Proofpoint-Spam-Details: rule=notspam policy=default score=0 phishscore=0 malwarescore=0 spamscore=0\\r\\n bulkscore=0 adultscore=0 mlxlogscore=343 suspectscore=0 mlxscore=0\\r\\n clxscore=1030 classifier=spam adjust=0 reason=mlx scancount=1\\r\\n engine=8.19.0-2308100000 definitions=main-2412310033\\r\\n\\r\\n\"}",
"Timestamp" : "2024-12-31T04:15:56.138Z",
"SignatureVersion" : "1",
"Signature" : "SOtyVndg0O6ldUwxRYsYORfRQF2mjz8HoEBGrR79bqO1Py7wLFJgSVABQFBVxEIypZZ6HGRFw57FT7V5BnSM5zbKfep0ERyYhcBaSHg9dW9bPsDYZqDHxXxcjlILOYepALZ9Fiy2Trk5Rw2bFZMsbsgIIXx/YcuFKw3zYwq7r062fVm6+y3EHgS8fK5w88cGjChEM4ktbCUQKWtzdQYdOLLlHmAnf8pScsVx7M2SpCdfZRprfNWgmiNDXvDpSHCYtcoi2iEVMtLYDxGFtE0m9ZJjbIOPTlowEkAp/m/y/6MB0/+Fvv1UbOghjpgUiIESu8CWMpcwKgZvYLcDsMNqCg==",
"SigningCertURL" : "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-9c6465fa7f48f5cacd23014631ec1136.pem",
"UnsubscribeURL" : "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:590183853800:email-ingestion-topic:1b1ec808-c78a-43fc-a08a-e19b3be0cd7d"
}
EOF
)

aws --region=us-west-2 --endpoint=http://localhost:9324 sqs send-message --queue-url http://localhost:9324/000000000000/email-ingestion --message-body "$MESSAGE_BODY"
14 changes: 10 additions & 4 deletions control-plane/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import * as redis from "./modules/redis";
import * as toolhouse from "./modules/integrations/toolhouse";
import * as externalCalls from "./modules/jobs/external";
import * as models from "./modules/models/routing";
import * as email from "./modules/email";
import { logContext, logger } from "./modules/observability/logger";
import * as workflows from "./modules/workflows/workflows";
import * as slack from "./modules/integrations/slack";
Expand Down Expand Up @@ -146,11 +147,15 @@ const startTime = Date.now();
workflows.start(),
models.start(),
redis.start(),
customerTelemetry.start(),
toolhouse.start(),
externalCalls.start(),
slack.start(app),
...(env.EE_DEPLOYMENT ? [flagsmith?.getEnvironmentFlags(), analytics.start()] : []),
...(env.EE_DEPLOYMENT ? [
customerTelemetry.start(),
toolhouse.start(),
externalCalls.start(),
email.start(),
flagsmith?.getEnvironmentFlags(),
analytics.start()
] : []),
])
.then(() => {
logger.info("Dependencies started", { latency: Date.now() - startTime });
Expand Down Expand Up @@ -190,6 +195,7 @@ process.on("SIGTERM", async () => {
customerTelemetry.stop(),
externalCalls.stop(),
slack.stop(),
email.stop(),
]);

logger.info("Shutdown complete");
Expand Down
131 changes: 131 additions & 0 deletions control-plane/src/modules/email/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import { Consumer } from "sqs-consumer";
import { env } from "../../utilities/env";
import { sqs } from "../sqs";
import { z } from "zod";
import { Message } from "@aws-sdk/client-sqs";
import { logger } from "../observability/logger";
import { safeParse } from "../../utilities/safe-parse";

const sesMessageSchema = z.object({
notificationType: z.string(),
mail: z.object({
timestamp: z.string().datetime(),
source: z.string().email(),
messageId: z.string(),
destination: z.array(z.string().email()),
headersTruncated: z.boolean(),
headers: z.array(
z.object({
name: z.string(),
value: z.string(),
})
),
commonHeaders: z.object({
returnPath: z.string().email(),
from: z.array(z.string().email()),
date: z.string(),
to: z.array(z.string().email()),
messageId: z.string(),
subject: z.string(),
}),
}),
receipt: z.object({
timestamp: z.string().datetime(),
processingTimeMillis: z.number(),
recipients: z.array(z.string().email()),
spamVerdict: z.object({ status: z.string() }),
virusVerdict: z.object({ status: z.string() }),
spfVerdict: z.object({ status: z.string() }),
dkimVerdict: z.object({ status: z.string() }),
dmarcVerdict: z.object({ status: z.string() }),
action: z.object({
type: z.string(),
topicArn: z.string(),
encoding: z.string(),
}),
}),
content: z.string(),
})


const snsNotificationSchema = z.object({
Type: z.literal("Notification"),
MessageId: z.string(),
TopicArn: z.string(),
Subject: z.string(),
Message: z.string(),
Timestamp: z.string().datetime(),
SignatureVersion: z.string(),
Signature: z.string(),
SigningCertURL: z.string().url(),
UnsubscribeURL: z.string().url(),
});

const emailIngestionConsumer = env.SQS_EMAIL_INGESTION_QUEUE_URL
? Consumer.create({
queueUrl: env.SQS_EMAIL_INGESTION_QUEUE_URL,
batchSize: 5,
visibilityTimeout: 60,
heartbeatInterval: 30,
handleMessage: handleEmailIngestion,
sqs,
})
: undefined;

export const start = async () => {
emailIngestionConsumer?.start()
};

export const stop = async () => {
emailIngestionConsumer?.stop();
};

async function handleEmailIngestion(message: Message) {
try {
const notificationJson = safeParse(message.Body);
if (!notificationJson.success) {
logger.error("SNS notification is not valid JSON", {
error: notificationJson.error,
});
return;
}

const notification = snsNotificationSchema.safeParse(notificationJson.data);
if (!notification.success) {
logger.error("Could not parse SNS notification", {
error: notification.error,
});
return;
}


const sesJson = safeParse(notification.data.Message);
if (!sesJson.success) {
logger.error("SES message is not valid JSON", {
error: sesJson.error,
});
return;
}

const sesMessage = sesMessageSchema.safeParse(sesJson.data);
if (!sesMessage.success) {
logger.error("Could not parse SES message", {
error: sesMessage.error,
});
return;
}

logger.info("Ingesting email event", {
messageId: sesMessage.data.mail.messageId,
source: sesMessage.data.mail.source,
destination: sesMessage.data.mail.destination,
subject: sesMessage.data.mail.commonHeaders.subject,
});


} catch (error) {
logger.error("Error while ingesting email event", {
error,
});
}
}
1 change: 1 addition & 0 deletions control-plane/src/utilities/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const envSchema = z
SQS_RUN_GENERATE_NAME_QUEUE_URL: z.string(),
SQS_CUSTOMER_TELEMETRY_QUEUE_URL: z.string(),
SQS_EXTERNAL_TOOL_CALL_QUEUE_URL: z.string(),
SQS_EMAIL_INGESTION_QUEUE_URL: z.string(),

SQS_BASE_QUEUE_URL: z.string().optional(),

Expand Down

0 comments on commit 269f321

Please sign in to comment.