Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OC-925: Configure ECS task to run on a schedule #753

Merged
merged 5 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions api/serverless-config-default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -490,22 +490,13 @@ functions:
method: GET
cors: true
# Integrations
incrementalAriIngestHttp:
handler: dist/src/components/integration/routes.incrementalAriIngest
timeout: 900
triggerARIIngest:
handler: dist/src/components/integration/routes.triggerARIIngest
events:
- http:
path: ${self:custom.versions.v1}/integrations/ari/incremental
method: POST
cors: true
# Commented out - for the time being ARI ingests will just be manually triggered.
# incrementalAriIngestScheduled:
# handler: dist/src/components/integration/controller.incrementalAriIngest
# timeout: 900
# events:
# - schedule:
# rate: cron(0 5 ? * TUE *) # Every Tuesday at 5 a.m.
# enabled: ${self:custom.scheduledAriIngestEnabled.${opt:stage}, false}

package:
defaultPatterns:
Expand Down
9 changes: 1 addition & 8 deletions api/serverless-config-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,4 @@ functions:
generatePDFsFromQueue:
handler: dist/src/components/sqs/handler.generatePDFs
events:
- sqs: 'arn:aws:sqs:${aws:region}:${aws:accountId}:science-octopus-pdf-queue-${self:provider.stage}'
triggerECSTask:
handler: dist/src/components/integration/routes.triggerECSTask
events:
- http:
path: ${self:custom.versions.v1}/integrations/simple-ecs-task
method: POST
cors: true
- sqs: 'arn:aws:sqs:${aws:region}:${aws:accountId}:science-octopus-pdf-queue-${self:provider.stage}'
4 changes: 3 additions & 1 deletion api/src/components/integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ They should also always be owned by an organisational user account. That is, a u

On deployed environments, integrations are run in containers on AWS Elastic Container Service. These containers are defined in the infrastructure code (see [Dockerfile](../../../../infra/docker/ariImportRunner/Dockerfile)), so they can be built and tested locally from the `infra/docker/ariImportRunner` directory with `docker compose up` (see [compose.yml](../../../../infra/docker/ariImportRunner/compose.yml)).

These task containers may be triggered using an API key protected API endpoint that in turn triggers the task to spin up (e.g. the `triggerARIIngest` endpoint), or automatically at a specified time by an Eventbridge scheduler (see this in the [infra code](../../../../infra/modules/ecs/schedule.tf)).

They can also be run ad hoc on the local environment via npm scripts, for example (from the `api` directory):

`npm run ariImport -- dryRun=true allDepartments=true full=false`
Expand All @@ -42,7 +44,7 @@ On import, ARIs go through a handling flow:

- If no publication exists with the ARI's question ID in its `externalId` field, it is created as a new publication.
- If a publication does exist with the ARI's question ID in its `externalId` field, it is compared to the existing publication for changes.
- If changes are found, the existing publication is reversioned with those changes applied.
- If changes are found, the existing publication is updated with those changes applied. Note that this is not a reversioning - ARI publications always have only one version.
- If no changes are found, no action is taken.

#### How ARI data is mapped to octopus data
Expand Down
5 changes: 3 additions & 2 deletions api/src/components/integration/__tests__/ari.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,10 @@ describe('ARI import processes', () => {
.post('/integrations/ari/incremental')
.query({ apiKey: process.env.TRIGGER_ARI_INGEST_API_KEY });

expect(triggerImport.status).toEqual(202);
// May not seem right but the service task has technically been triggered, hence 200.
expect(triggerImport.status).toEqual(200);
expect(triggerImport.body).toMatchObject({
message: 'Cancelling ingest. Either an import is already in progress or the last import failed.'
message: 'Did not run ingest. Either an import is already in progress or the last import failed.'
});
});
});
40 changes: 3 additions & 37 deletions api/src/components/integration/controller.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,11 @@
import * as I from 'interface';
import * as ingestLogService from 'ingestLog/service';
import * as integrationService from 'integration/service';
import * as response from 'lib/response';

export const incrementalAriIngest = async (
event: I.APIRequest | I.EventBridgeEvent<'Scheduled Event', string>
export const triggerAriIngest = async (
event: I.APIRequest<undefined, I.TriggerAriIngestQueryParams, undefined>
): Promise<I.JSONResponse> => {
// Check if a process is currently running.
const lastLog = await ingestLogService.getMostRecentLog('ARI', true);
const triggeredByHttp = event && 'headers' in event;
const dryRun = triggeredByHttp ? !!event.queryStringParameters?.dryRun : false;
const dryRunMessages: string[] = [];

if (lastLog && !lastLog.end) {
if (dryRun) {
dryRunMessages.push(
'This run would have been cancelled because another run is currently in progress. However, the run has still been simulated.'
);
} else {
return response.json(202, {
message: 'Cancelling ingest. Either an import is already in progress or the last import failed.'
});
}
}

try {
const ingestResult = await integrationService.incrementalAriIngest(dryRun, 'email');

return response.json(
200,
dryRunMessages.length ? { messages: [...dryRunMessages, ingestResult] } : ingestResult
);
} catch (error) {
console.log(error);

return response.json(500, { message: 'Unknown server error.' });
}
};

export const triggerECSTask = async (): Promise<I.JSONResponse> => {
const triggerTaskOutput = await integrationService.triggerECSTask();
const triggerTaskOutput = await integrationService.triggerAriIngest(event.queryStringParameters.dryRun);

return response.json(200, { message: triggerTaskOutput });
};
8 changes: 2 additions & 6 deletions api/src/components/integration/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ import * as middleware from 'middleware';

const triggerAriIngestApiKey = Helpers.checkEnvVariable('TRIGGER_ARI_INGEST_API_KEY');

export const incrementalAriIngest = middy(integrationController.incrementalAriIngest)
export const triggerARIIngest = middy(integrationController.triggerAriIngest)
.use(middleware.doNotWaitForEmptyEventLoop({ runOnError: true, runOnBefore: true, runOnAfter: true }))
.use(middleware.authentication(false, false, triggerAriIngestApiKey))
.use(middleware.validator(integrationSchema.incrementalAriIngestHttp, 'queryStringParameters'));

export const triggerECSTask = middy(integrationController.triggerECSTask)
.use(middleware.doNotWaitForEmptyEventLoop({ runOnError: true, runOnBefore: true, runOnAfter: true }))
.use(middleware.authentication(false, false, triggerAriIngestApiKey));
.use(middleware.validator(integrationSchema.triggerAriIngest, 'queryStringParameters'));
2 changes: 1 addition & 1 deletion api/src/components/integration/schema/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export { default as incrementalAriIngestHttp } from './incrementalAriIngestHttp';
export { default as triggerAriIngest } from './triggerAriIngest';
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import * as I from 'interface';

const incrementalAriIngestHttpSchema: I.Schema = {
const incrementalAriIngestHttpSchema: I.JSONSchemaType<I.TriggerAriIngestQueryParams> = {
type: 'object',
properties: {
apiKey: {
type: 'string'
},
dryRun: {
type: 'boolean'
type: 'boolean',
nullable: true
}
},
additionalProperties: false,
Expand Down
38 changes: 29 additions & 9 deletions api/src/components/integration/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@ import * as I from 'interface';
* recent successful ingest (if this start time is available).
*/
export const incrementalAriIngest = async (dryRun: boolean, reportFormat: I.IngestReportFormat): Promise<string> => {
// Check if a process is currently running.
const lastLog = await ingestLogService.getMostRecentLog('ARI', true);

if (lastLog && !lastLog.end) {
finlay-jisc marked this conversation as resolved.
Show resolved Hide resolved
if (dryRun) {
console.log(
'This run would have been cancelled because another run is currently in progress. However, the run has still been simulated.'
);
} else {
return 'Did not run ingest. Either an import is already in progress or the last import failed.';
}
}

const start = new Date();
const MAX_UNCHANGED_STREAK = 5;
// Get most start time of last successful run to help us know when to stop.
Expand Down Expand Up @@ -155,13 +168,20 @@ export const incrementalAriIngest = async (dryRun: boolean, reportFormat: I.Inge
return `${preamble} ${writeCount} publication${writeCount !== 1 ? 's' : ''}.`;
};

export const triggerECSTask = async (): Promise<string> => {
await ecs.runFargateTask({
clusterArn: Helpers.checkEnvVariable('ECS_CLUSTER_ARN'),
securityGroups: [Helpers.checkEnvVariable('ECS_TASK_SECURITY_GROUP_ID')],
subnetIds: Helpers.checkEnvVariable('PRIVATE_SUBNET_IDS').split(','),
taskDefinitionId: Helpers.checkEnvVariable('ECS_TASK_DEFINITION_ID')
});

return 'Done';
export const triggerAriIngest = async (dryRun?: boolean): Promise<string> => {
if (process.env.STAGE !== 'local') {
// If not local, trigger task to run in ECS.
await ecs.runFargateTask({
clusterArn: Helpers.checkEnvVariable('ECS_CLUSTER_ARN'),
...(dryRun && { commandOverride: ['npm', 'run', 'ariImport', '--', 'dryRun=true', 'reportFormat=email'] }),
securityGroups: [Helpers.checkEnvVariable('ECS_TASK_SECURITY_GROUP_ID')],
subnetIds: Helpers.checkEnvVariable('PRIVATE_SUBNET_IDS').split(','),
taskDefinitionId: Helpers.checkEnvVariable('ECS_TASK_DEFINITION_ID')
});

return 'Task triggered.';
} else {
// If local, just run the ingest directly.
return await incrementalAriIngest(!!dryRun, 'file');
}
};
10 changes: 10 additions & 0 deletions api/src/lib/ecs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const client = new ECSClient();

export const runFargateTask = async (config: {
clusterArn: string;
commandOverride?: string[];
securityGroups: string[];
subnetIds: string[];
taskDefinitionId: string;
Expand All @@ -17,6 +18,15 @@ export const runFargateTask = async (config: {
subnets: config.subnetIds
}
},
...(config.commandOverride && {
overrides: {
containerOverrides: [
{
command: config.commandOverride
}
]
}
}),
taskDefinition: config.taskDefinitionId
};
const command = new RunTaskCommand(input);
Expand Down
5 changes: 5 additions & 0 deletions api/src/lib/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1079,3 +1079,8 @@ export interface HandledARI {
}

export type IngestReportFormat = 'email' | 'file';

export interface TriggerAriIngestQueryParams {
apiKey: string;
dryRun?: boolean;
}
97 changes: 97 additions & 0 deletions infra/modules/ecs/schedule.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Code adapted from https://medium.com/@igorkachmaryk/using-terraform-to-setup-aws-eventbridge-scheduler-and-a-scheduled-ecs-task-1208ae077360
resource "aws_iam_role" "scheduler" {
name = "cron-scheduler-role-${var.environment}-${var.project_name}"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Principal = {
Service = ["scheduler.amazonaws.com"]
}
Action = "sts:AssumeRole"
}
]
})
}

resource "aws_iam_role_policy_attachment" "scheduler" {
policy_arn = aws_iam_policy.scheduler.arn
role = aws_iam_role.scheduler.name
}

resource "aws_iam_policy" "scheduler" {
name = "cron-scheduler-policy-${var.environment}-${var.project_name}"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow",
Action = [
"ecs:RunTask"
]
# Replace revision number with *
Resource = ["${trimsuffix(aws_ecs_task_definition.ari-import.arn, ":${aws_ecs_task_definition.ari-import.revision}")}:*"]
},
{
Effect = "Allow",
Action = [
"iam:PassRole"
]
Resource = [aws_iam_role.ecs-task-role.arn, aws_iam_role.ecs-task-exec-role.arn]
},
{
Action = [
"sqs:SendMessage"
],
Effect = "Allow",
Resource = [aws_sqs_queue.scheduler-dlq.arn]
}
]
})
}

resource "aws_scheduler_schedule" "ari_import_cron" {
name = "ari-import-schedule-int"

flexible_time_window {
mode = "OFF"
}

schedule_expression = "cron(0 5 ? * TUE *)" # Run every Tuesday at 5AM

target {
arn = aws_ecs_cluster.ecs.arn
role_arn = aws_iam_role.scheduler.arn

# On prod, override container command to do a dry run instead of a real one.
# The output will be checked before manually triggering a real run using the API.
input = terraform.workspace == "prod" ? jsonencode({
containerOverrides = [
{
command = ["npm", "run", "ariImport", "--", "dryRun=true", "reportFormat=email"]
name = "ari-import"
}
]
}) : null

dead_letter_config {
arn = aws_sqs_queue.scheduler-dlq.arn
}

ecs_parameters {
# Trimming the revision suffix here so that schedule always uses latest revision
task_definition_arn = trimsuffix(aws_ecs_task_definition.ari-import.arn, ":${aws_ecs_task_definition.ari-import.revision}")
launch_type = "FARGATE"

network_configuration {
security_groups = [aws_security_group.ari-import-task-sg.id]
subnets = var.private_subnet_ids
}
}
}
}

resource "aws_sqs_queue" "scheduler-dlq" {
name = "scheduler-dlq-${var.environment}-${var.project_name}"
}
Loading