From 1d5a5c9b2086780930d2099f3d439c0dd5918ba2 Mon Sep 17 00:00:00 2001 From: John Smith Date: Mon, 30 Dec 2024 15:45:32 +1030 Subject: [PATCH] feat: Slack approvals (#422) * feat: Slack approvals * chore: Cleanup Slack action handling --- control-plane/src/modules/calls/router.ts | 4 +- .../src/modules/integrations/slack/index.ts | 193 ++++++++++++++++-- control-plane/src/modules/jobs/jobs.test.ts | 14 +- control-plane/src/modules/jobs/jobs.ts | 35 +++- control-plane/src/modules/workflows/notify.ts | 18 ++ 5 files changed, 232 insertions(+), 32 deletions(-) diff --git a/control-plane/src/modules/calls/router.ts b/control-plane/src/modules/calls/router.ts index df197908..7030f9b7 100644 --- a/control-plane/src/modules/calls/router.ts +++ b/control-plane/src/modules/calls/router.ts @@ -102,7 +102,7 @@ export const callsRouter = initServer().router( }); await jobs.requestApproval({ - jobId: callId, + callId: callId, clusterId, }); @@ -322,7 +322,7 @@ export const callsRouter = initServer().router( } await jobs.submitApproval({ - call, + callId, clusterId, approved: request.body.approved, }); diff --git a/control-plane/src/modules/integrations/slack/index.ts b/control-plane/src/modules/integrations/slack/index.ts index 25dc6c8c..70a948d2 100644 --- a/control-plane/src/modules/integrations/slack/index.ts +++ b/control-plane/src/modules/integrations/slack/index.ts @@ -1,4 +1,4 @@ -import { App, KnownEventFromType, webApi } from "@slack/bolt"; +import { App, BlockAction, KnownEventFromType, SlackAction, webApi } from "@slack/bolt"; import { FastifySlackReceiver } from "./receiver"; import { env } from "../../../utilities/env"; import { FastifyInstance } from "fastify"; @@ -14,12 +14,16 @@ import { InstallableIntegration } from "../types"; import { integrationSchema } from "../schema"; import { z } from "zod"; import { getUserForCluster } from "../../clerk"; - -let app: App | undefined; +import { submitApproval } from "../../jobs/jobs"; const THREAD_META_KEY = "slackThreadTs"; const CHANNEL_META_KEY = "slackChannel"; +const CALL_APPROVE_ACTION_ID = "call_approve"; +const CALL_DENY_ACTION_ID = "call_deny"; + +let app: App | undefined; + type MessageEvent = { event: KnownEventFromType<"message">; client: webApi.WebClient; @@ -128,6 +132,77 @@ export const handleNewRunMessage = async ({ } }; +export const handleApprovalRequest = async ({ + callId, + runId, + clusterId, + service, + targetFn, + metadata, +}: { + callId: string; + runId: string; + clusterId: string; + service: string; + targetFn: string; + metadata?: Record; +}) => { + if (!metadata?.[THREAD_META_KEY] || !metadata?.[CHANNEL_META_KEY]) { + return; + } + + const integration = await integrationByCluster(clusterId); + if (!integration || !integration.slack) { + throw new Error(`Could not find Slack integration for cluster: ${clusterId}`); + } + + const token = await getAccessToken(integration.slack.nangoConnectionId); + if (!token) { + throw new Error(`Could not fetch access token for Slack integration: ${integration.slack.nangoConnectionId}`); + } + + const client = new webApi.WebClient(token) + + client?.chat.postMessage({ + thread_ts: metadata[THREAD_META_KEY], + channel: metadata[CHANNEL_META_KEY], + blocks: [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": `I need your approval to call \`${service}.${targetFn}\` on run <${env.APP_ORIGIN}/clusters/${clusterId}/runs/${runId}|${runId}>` + } + }, + { + "type": "actions", + "elements": [ + { + "type": "button", + "text": { + "type": "plain_text", + "text": "Approve", + "emoji": true + }, + "value": callId, + "action_id": CALL_APPROVE_ACTION_ID + }, + { + "type": "button", + "text": { + "type": "plain_text", + "text": "Deny", + "emoji": true + }, + "value": callId, + "action_id": CALL_DENY_ACTION_ID + } + ] + } + ] + }); +}; + export const start = async (fastify: FastifyInstance) => { const SLACK_SIGNING_SECRET = env.SLACK_SIGNING_SECRET; @@ -170,6 +245,9 @@ export const start = async (fastify: FastifyInstance) => { }), }); + app.action(CALL_APPROVE_ACTION_ID, async (params) => handleCallApprovalAction({ ...params, actionId: CALL_APPROVE_ACTION_ID })); + app.action(CALL_DENY_ACTION_ID, async (params) => handleCallApprovalAction({ ...params, actionId: CALL_DENY_ACTION_ID })); + // Event listener for mentions app.event("app_mention", async ({ event, client }) => { logger.info("Received mention event. Responding.", event); @@ -186,6 +264,11 @@ export const start = async (fastify: FastifyInstance) => { app.event("message", async ({ event, client, context }) => { logger.info("Received message event. Responding.", event); + if (event.subtype === "message_changed") { + logger.info("Received message change event. Ignoring.", event); + return; + } + if (isBotMessage(event)) { logger.info("Received message from bot. Ignoring.", event); return; @@ -212,7 +295,12 @@ export const start = async (fastify: FastifyInstance) => { } try { - const user = await authenticateUser(event, client, integration); + if (!hasUser(event)) { + logger.warn("Slack event has no user.", { event }); + throw new AuthenticationError("Slack event has no user"); + } + + const user = await authenticateUser(event.user, client, integration); if (hasThread(event)) { await handleExistingThread({ @@ -258,8 +346,16 @@ const isDirectMessage = (e: KnownEventFromType<"message">): boolean => { }; const hasUser = (e: any): e is { user: string } => { - return typeof e?.user === "string"; - }; + return typeof e?.user === "string"; +}; + +const isBlockAction = (e: SlackAction): e is BlockAction => { + return typeof e?.type === "string" && e.type === "block_actions"; +} + +const hasValue = (e: any): e is { value: string } => { + return 'value' in e && typeof e?.value === "string"; +} // eslint-disable-next-line @typescript-eslint/no-explicit-any const isBotMessage = (e: any): boolean => { @@ -430,22 +526,21 @@ const handleExistingThread = async ({ event, client, clusterId, userId }: Messag throw new Error("Event had no text"); }; -const authenticateUser = async (event: KnownEventFromType<"message">, client: webApi.WebClient, integration: { cluster_id: string }) => { +const authenticateUser = async (userId: string, client: webApi.WebClient, integration: { cluster_id: string }) => { if (!env.CLERK_SECRET_KEY) { logger.info("Missing CLERK_SECRET_KEY. Skipping Slack user authentication."); return } - if (!hasUser(event)) { - logger.warn("Slack event has no user."); - throw new AuthenticationError("Slack event has no user"); - } - const slackUser = await client.users.info({ - user: event.user, + user: userId, token: client.token, }); + logger.info("Authenticating Slack user", { + slackUser + }) + const confirmed = slackUser.user?.is_email_confirmed; const email = slackUser.user?.profile?.email; @@ -469,3 +564,75 @@ const authenticateUser = async (event: KnownEventFromType<"message">, client: we return clerkUser; }; + +const handleCallApprovalAction = async ({ + ack, + body, + client, + context, + actionId +}: { + ack: () => Promise, + body: SlackAction, + client: webApi.WebClient, + context: { teamId?: string }, + actionId: typeof CALL_APPROVE_ACTION_ID | typeof CALL_DENY_ACTION_ID + }) => { + await ack(); + + if (!isBlockAction(body)) { + throw new Error("Slack Action was unexpected type"); + } + + const approved = actionId === CALL_APPROVE_ACTION_ID; + const teamId = context.teamId; + const channelId = body.channel?.id; + const messageTs = body.message?.ts; + const action = body.actions.find(a => a.action_id === actionId); + + if (!teamId || !channelId || !messageTs || !action || !hasValue(action)) { + throw new Error("Slack action does not conform to expected structure"); + } + + const integration = await integrationByTeam(teamId); + + if (!integration || !integration.cluster_id) { + throw new Error("Could not find Slack integration for teamId"); + } + + const user = await authenticateUser(body.user.id, client, integration); + + if (!user) { + logger.warn("Slack user could not be authenticated."); + throw new AuthenticationError("Slack user could not be authenticated."); + } + + await submitApproval({ + approved, + callId: action.value, + clusterId: integration.cluster_id + }); + + logger.info("Call approval received via Slack", { + approved, + channelId, + messageTs, + callId: action.value, + }); + + const blockMessage = `${approved ? "✅" : "❌"} Call \`${action.value}\` was ${approved ? "approved" : "denied"}`; + + await client.chat.update({ + channel: channelId, + ts: messageTs, + blocks: [ + { + type: 'section', + text: { + type: 'mrkdwn', + text: blockMessage + }, + }, + ], + }); +}; diff --git a/control-plane/src/modules/jobs/jobs.test.ts b/control-plane/src/modules/jobs/jobs.test.ts index 0a54d7be..7787dea0 100644 --- a/control-plane/src/modules/jobs/jobs.test.ts +++ b/control-plane/src/modules/jobs/jobs.test.ts @@ -150,7 +150,7 @@ describe("selfHealJobs", () => { expect(createJobResult.created).toBe(true); await requestApproval({ - jobId: createJobResult.id, + callId: createJobResult.id, clusterId: owner.clusterId, }); @@ -463,7 +463,7 @@ describe("submitApproval", () => { await requestApproval({ clusterId: owner.clusterId, - jobId: result.id, + callId: result.id, }); const retreivedJob1 = await getJob({ @@ -475,7 +475,7 @@ describe("submitApproval", () => { await submitApproval({ clusterId: owner.clusterId, - call: retreivedJob1!, + callId: retreivedJob1!.id, approved: true, }); @@ -491,7 +491,7 @@ describe("submitApproval", () => { // Re-submitting approval should be a no-op await submitApproval({ clusterId: owner.clusterId, - call: retreivedJob1!, + callId: retreivedJob1!.id, approved: false, }); @@ -519,7 +519,7 @@ describe("submitApproval", () => { await requestApproval({ clusterId: owner.clusterId, - jobId: result.id, + callId: result.id, }); const retreivedJob1 = await getJob({ @@ -531,7 +531,7 @@ describe("submitApproval", () => { await submitApproval({ clusterId: owner.clusterId, - call: retreivedJob1!, + callId: retreivedJob1!.id, approved: false, }); @@ -547,7 +547,7 @@ describe("submitApproval", () => { // Re-submitting approval should be a no-op await submitApproval({ clusterId: owner.clusterId, - call: retreivedJob1!, + callId: retreivedJob1!.id, approved: true, }); diff --git a/control-plane/src/modules/jobs/jobs.ts b/control-plane/src/modules/jobs/jobs.ts index 8800b42d..d9639189 100644 --- a/control-plane/src/modules/jobs/jobs.ts +++ b/control-plane/src/modules/jobs/jobs.ts @@ -8,6 +8,7 @@ import * as events from "../observability/events"; import { packer } from "../packer"; import { resumeRun } from "../workflows/workflows"; import { selfHealJobs as selfHealCalls } from "./persist-result"; +import { notifyApprovalRequest } from "../workflows/notify"; export { createJob } from "./create-job"; export { acknowledgeJob, persistJobResult } from "./persist-result"; @@ -369,21 +370,32 @@ export const pollJobs = async ({ return jobs; }; -export async function requestApproval({ jobId, clusterId }: { jobId: string; clusterId: string }) { - await data.db +export async function requestApproval({ callId, clusterId }: { callId: string; clusterId: string }) { + const [updated] = await data.db .update(data.jobs) .set({ approval_requested: true, }) - .where(and(eq(data.jobs.id, jobId), eq(data.jobs.cluster_id, clusterId))); + .returning({ + callId: data.jobs.id, + clusterId: data.jobs.cluster_id, + runId: data.jobs.workflow_id, + service: data.jobs.service, + targetFn: data.jobs.target_fn, + }) + .where(and(eq(data.jobs.id, callId), eq(data.jobs.cluster_id, clusterId))); + + if (updated.runId) { + await notifyApprovalRequest(updated); + } } export async function submitApproval({ - call, + callId, clusterId, approved, }: { - call: NonNullable>>; + callId: string; clusterId: string; approved: boolean; }) { @@ -399,7 +411,7 @@ export async function submitApproval({ }) .where( and( - eq(data.jobs.id, call.id), + eq(data.jobs.id, callId), eq(data.jobs.cluster_id, clusterId), // Do not allow denying a job that has already been approved isNull(data.jobs.approved), @@ -407,7 +419,7 @@ export async function submitApproval({ ) ); } else { - await data.db + const [updated] = await data.db .update(data.jobs) .set({ approved: false, @@ -417,9 +429,12 @@ export async function submitApproval({ message: "This call was denied by the user.", }), }) + .returning({ + runId: data.jobs.workflow_id, + }) .where( and( - eq(data.jobs.id, call.id), + eq(data.jobs.id, callId), eq(data.jobs.cluster_id, clusterId), // Do not allow denying a job that has already been approved isNull(data.jobs.approved), @@ -427,10 +442,10 @@ export async function submitApproval({ ) ); - if (call.runId) { + if (updated?.runId) { await resumeRun({ clusterId, - id: call.runId, + id: updated.runId, }); } } diff --git a/control-plane/src/modules/workflows/notify.ts b/control-plane/src/modules/workflows/notify.ts index a176e9b7..d50c7f49 100644 --- a/control-plane/src/modules/workflows/notify.ts +++ b/control-plane/src/modules/workflows/notify.ts @@ -7,6 +7,24 @@ import { getClusterBackgroundRun, Run } from "./workflows"; import { workflowMessages } from "../data"; import * as slack from "../integrations/slack"; + +export const notifyApprovalRequest = async ({ + callId, + clusterId, + runId, + service, + targetFn, +}: { + callId: string; + clusterId: string; + runId: string; + service: string; + targetFn: string; +}) => { + const metadata = await getRunMetadata({ clusterId, runId }); + await slack.handleApprovalRequest({ callId, clusterId, runId, service, targetFn, metadata }); +}; + export const notifyNewMessage = async ({ message, metadata,