Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move the results download to the electron app #168

Merged
merged 12 commits into from
Jul 30, 2024
Merged
57 changes: 0 additions & 57 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,56 +102,6 @@ func downloadFile(ctx context.Context, key string, filename string, cfg Config)
return nil
}

func downloadFiles(ctx context.Context, sink string, prefix string, cfg Config) error {
client, err := s3Client(ctx, cfg)
if err != nil {
return err
}

res, err := client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(cfg.BucketName),
Prefix: aws.String(prefix),
})

if err != nil {
return err
}

for _, content := range res.Contents {
pathWithoutPrefix := strings.TrimPrefix(*content.Key, prefix)
dirPath, filename := filepath.Split(pathWithoutPrefix)

// note this is temporary, as this code will be moved to the frontend
location := filepath.Join(sink, "files", dirPath)
if err := os.MkdirAll(location, 0755); err != nil {
return err
}
result, err := client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(cfg.BucketName),
Key: content.Key,
})

if err != nil {
return err
}

file, err := os.OpenFile(filepath.Join(location, filename), os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
result.Body.Close()
return err
}

if _, err := io.Copy(file, result.Body); err != nil {
result.Body.Close()
return err
}

result.Body.Close()
}

return nil
}

func deleteFiles(ctx context.Context, prefix string, extraFiles []string, cfg Config) {
client, err := s3Client(ctx, cfg)
if err != nil {
Expand Down Expand Up @@ -834,13 +784,6 @@ func cleanupRun(ctx context.Context, db *sql.DB, executionId int64, executionUui
if err := results(ctx, db, executionId, pipeline, workflow); err != nil {
log.Printf("failed to save results; err=%v", err)
}

if cfg.IsLocal {
sink := filepath.Join(pipeline.Sink, "history", executionUuid)
if err := downloadFiles(ctx, sink, s3Key, cfg); err != nil {
log.Printf("failed to download files; err=%v", err)
}
}
}

func cloudExecute(pipeline *zjson.Pipeline, executionId int64, executionUuid string, organization string, build bool, cfg Config, db *sql.DB, hub *Hub) {
Expand Down
176 changes: 62 additions & 114 deletions frontend/package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"pino-caller": "^3.4.0",
"pino-pretty": "^11.2.1",
"rfdc": "^1.3.1",
"s3-sync-client": "^4.3.1",
"source-map-support": "^0.5.21",
"use-immer": "^0.9.0",
"uuidv7": "^0.6.3"
Expand Down
14 changes: 14 additions & 0 deletions frontend/server/execution.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import path from "path";
import { syncS3ToLocalDirectory } from "./s3";

export async function syncExecutionResults(
buffer,
pipelineUuid,
executionUuid,
anvilConfiguration,
) {
const s3Prefix = `${pipelineUuid}/${executionUuid}`;
const localPath = path.join(buffer, "history", executionUuid, "files");

await syncS3ToLocalDirectory(s3Prefix, localPath, anvilConfiguration);
}
40 changes: 24 additions & 16 deletions frontend/server/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import {
import { publicProcedure, router } from "./trpc";
import { errorHandling } from "./middleware";
import { logger } from "./logger.js";
import { anvilConfigurationSchema } from "./schema";
import { syncExecutionResults } from "./execution";

export const appRouter = router({
getBlocks: publicProcedure.use(errorHandling).query(async () => {
Expand Down Expand Up @@ -224,22 +226,7 @@ export const appRouter = router({
buffer: z.string(),
name: z.string(),
rebuild: z.boolean(),
anvilConfiguration: z.object({
name: z.string(),
anvil: z.object({
host: z.string(),
port: z.string(),
token: z.string(),
}),
s3: z.object({
host: z.string(),
port: z.string(),
region: z.string(),
bucket: z.string(),
accessKeyId: z.string(),
secretAccessKey: z.string(),
}),
}),
anvilConfiguration: anvilConfigurationSchema,
}),
)
.mutation(async (opts) => {
Expand Down Expand Up @@ -293,6 +280,27 @@ export const appRouter = router({

return await saveBlockSpecs(blockPath, blockSpecs);
}),
downloadExecutionResults: publicProcedure
.use(errorHandling)
.input(
z.object({
buffer: z.string(),
pipelineUuid: z.string(),
executionUuid: z.string(),
anvilConfiguration: anvilConfigurationSchema,
}),
)
.mutation(async (opts) => {
const { input } = opts;
const { buffer, pipelineUuid, executionUuid, anvilConfiguration } = input;

await syncExecutionResults(
buffer,
pipelineUuid,
executionUuid,
anvilConfiguration,
);
}),
runTest: publicProcedure
.use(errorHandling)
.input(
Expand Down
19 changes: 16 additions & 3 deletions frontend/server/s3.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import {
CopyObjectCommand,
GetObjectCommand,
HeadObjectCommand,
PutObjectCommand,
GetObjectCommand,
CopyObjectCommand,
S3Client,
} from "@aws-sdk/client-s3";
import fs from "fs/promises";
import path from "path";
import { S3SyncClient } from "s3-sync-client";
import config from "../config";
import { getDirectoryFilesRecursive } from "./fileSystem";
import path from "path";
import { logger } from "./logger";

function getClient(configuration) {
Expand Down Expand Up @@ -183,3 +184,15 @@ export async function getFile(key, destinationPath, anvilConfiguration) {
throw new Error(message);
}
}

export async function syncS3ToLocalDirectory(
s3Prefix,
localPath,
anvilConfiguration,
) {
const client = getClient(anvilConfiguration);
const { sync } = new S3SyncClient({ client: client });

const s3Path = `s3://${anvilConfiguration.s3.bucket}/${s3Prefix}`;
await sync(s3Path, localPath);
}
17 changes: 17 additions & 0 deletions frontend/server/schema.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { z } from "zod";

export const anvilConfigurationSchema = z.object({
name: z.string(),
anvil: z.object({
host: z.string(),
port: z.string(),
}),
s3: z.object({
host: z.string(),
port: z.string(),
region: z.string(),
bucket: z.string(),
accessKeyId: z.string(),
secretAccessKey: z.string(),
}),
});
5 changes: 4 additions & 1 deletion frontend/src/components/ui/ExecutionDataGrid.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,22 @@ import {
import { PipelineStopButton } from "./PipelineStopButton";
import { useState, useEffect } from "react";
import { activeConfigurationAtom } from "@/atoms/anvilConfigurationsAtom";
import { useSyncExecutionResults } from "@/hooks/useExecutionResults";

export const ExecutionDataGrid = ({ executions, closeModal }) => {
const [workspace, setWorkspace] = useImmerAtom(workspaceAtom);
const [pipelineList, setPipelineList] = useState([]);
const [configuration] = useAtom(activeConfigurationAtom);
const syncResults = useSyncExecutionResults();

const selectPipeline = (pipeline) => {
const selectPipeline = async (pipeline) => {
const key = pipeline.id + "." + pipeline.record.Execution;

setWorkspace((draft) => {
draft.tabs[key] = {};
draft.active = key;
});
await syncResults(key);

closeModal();
};
Expand Down
13 changes: 10 additions & 3 deletions frontend/src/components/ui/SocketFetcher.jsx
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { useAtom } from "jotai";
import { parseLogLine } from "@/atoms/logsAtom";
import { pipelineAtom } from "@/atoms/pipelineAtom";
import { useEffect, useCallback } from "react";
import { useSyncExecutionResults } from "@/hooks/useExecutionResults";
import { useStableWebSocket } from "@/hooks/useStableWebsocket";
import { useUnifiedLogs } from "@/hooks/useUnifiedLogs";
import { parseLogLine } from "@/atoms/logsAtom";
import { enableMapSet } from "immer";
import { useAtom } from "jotai";
import { useCallback, useEffect } from "react";

enableMapSet();

Expand All @@ -13,6 +14,7 @@ export default function SocketFetcher() {
const { lastMessage, readyState, wsError } = useStableWebSocket(
pipeline?.socketUrl,
);
const syncResults = useSyncExecutionResults();

const { updateLogs } = useUnifiedLogs();

Expand All @@ -32,6 +34,11 @@ export default function SocketFetcher() {
}
}
});

if (parsedLogEntry?.event?.tag === "outputs") {
const key = `${pipeline.record.Uuid}.${pipeline.record.Execution}`;
syncResults(key);
}
});

useEffect(() => {
Expand Down
23 changes: 23 additions & 0 deletions frontend/src/hooks/useExecutionResults.jsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { workspaceAtom } from "@/atoms/pipelineAtom";
import { activeConfigurationAtom } from "@/atoms/anvilConfigurationsAtom";
import { trpc } from "@/utils/trpc";
import { useAtom } from "jotai";

export const useSyncExecutionResults = () => {
const [workspace] = useAtom(workspaceAtom);
const [configuration] = useAtom(activeConfigurationAtom);
const downloadExecutionResults = trpc.downloadExecutionResults.useMutation();

const syncExecutionResults = async (key) => {
console.log("syncin");
const pipeline = workspace.pipelines[key];
await downloadExecutionResults.mutateAsync({
buffer: pipeline.buffer,
pipelineUuid: pipeline.record.Uuid,
executionUuid: pipeline.record.Execution,
anvilConfiguration: configuration,
});
};

return syncExecutionResults;
};
Loading