From 91cd412ebeedf9fb31ac547a5a3b0f8e363d8fcd Mon Sep 17 00:00:00 2001 From: Maharaj Teertha Deb <86840917+TeerthaDeb@users.noreply.github.com> Date: Fri, 27 Sep 2024 10:53:56 -0400 Subject: [PATCH 1/8] Katana Added --- Katana/entrypoint.py | 117 ++++++++++++++++ Katana/go.mod | 3 + Katana/main.go | 313 +++++++++++++++++++++++++++++++++++++++++++ Katana/schema.go | 102 ++++++++++++++ 4 files changed, 535 insertions(+) create mode 100644 Katana/entrypoint.py create mode 100644 Katana/go.mod create mode 100644 Katana/main.go create mode 100644 Katana/schema.go diff --git a/Katana/entrypoint.py b/Katana/entrypoint.py new file mode 100644 index 00000000..9ae386d1 --- /dev/null +++ b/Katana/entrypoint.py @@ -0,0 +1,117 @@ +import os +import inspect +import json +import sys +import shutil +# from computations import compute + +def main(): + # Get the directory of the script (block folder) + script_dir = os.path.dirname(os.path.abspath(__file__)) + + # Get the history subfolder from the arguments + compute_dir = sys.argv[1] # Directory with compute logic (block folder) + history_subfolder = os.path.abspath(sys.argv[2]) + + os.chdir(script_dir) + + # Ensure the 'history' subfolder exists + os.makedirs(history_subfolder, exist_ok=True) + + # Add the block folder (compute_dir) to the Python path to import 'computations' + sys.path.insert(0, compute_dir) + + + # Store the initial list of existing files in the current block folder + initial_files = set(os.listdir(script_dir)) + + # Import 'compute' function after adding the path + from computations import compute + + # Move all files from the 'history' folder to the current block folder + for item in os.listdir(history_subfolder): + src_path = os.path.join(history_subfolder, item) + dst_path = os.path.join(script_dir, item) + if os.path.isdir(src_path): + shutil.copytree(src_path, dst_path, dirs_exist_ok=True) + else: + shutil.copy2(src_path, dst_path) + # print(f">>>Copied {item} from 'history' to {script_dir}") + + # Get block ID from the compute directory + block_id = compute_dir + + params = [] + debug_inputs = {} + + # Collect arguments passed to the script + args_dict = {} + for arg in sys.argv[3:]: + key, value = arg.split('=') + # Try to convert value to int or float, if applicable + if value.isdigit(): + args_dict[key] = int(value) + else: + try: + args_dict[key] = float(value) + except ValueError: + args_dict[key] = value # Leave as string if conversion fails + + # Ensure images parameter is a Python list (if it's passed as a string) + if 'images' in args_dict and isinstance(args_dict['images'], str): + # Convert string to a Python list + args_dict['images'] = json.loads(args_dict['images']) + + # Fetch outputs from pipeline.json and get the corresponding parameters + for key in inspect.signature(compute).parameters.keys(): + value = args_dict.get(key) + debug_inputs[key] = value + + if value is not None: + params.append(value) # Append the value directly + else: + print(f"Warning: No value found for {key} in pipeline.json") + + # print("debug|||", debug_inputs) + + # Call the compute function + outputs = compute(*params) + + # Store the final state of files in the block folder (after compute is executed) + final_files = set(os.listdir(script_dir)) + + # Identify new files created during computation (those not in the initial list) + new_files = final_files - initial_files + + # Move the newly created files back to the 'history' folder + for new_file in new_files: + src_path = os.path.join(script_dir, new_file) + dst_path = os.path.join(history_subfolder, new_file) + if os.path.exists(dst_path): + if os.path.isfile(dst_path): + os.remove(dst_path) # Remove the existing file if it exists + elif os.path.isdir(dst_path): + shutil.rmtree(dst_path) # Remove the directory if it exists + + shutil.move(src_path, dst_path) # Move new files to the history subfolder + + + # Output results to files in the 'history' folder + for key, value in outputs.items(): + result = block_id.rsplit('-', 1)[0].split('\\')[-1] + if ("background-removal" in block_id): + output_file_path = os.path.join(history_subfolder, f"{result}-output_path.txt") + elif ("openai-agent" in block_id): + output_file_path = os.path.join(history_subfolder, f"{result}-response.txt") + else: + output_file_path = os.path.join(history_subfolder, f"{result}-image_paths.txt") + + with open(output_file_path, "w") as file: + file.write(json.dumps(value)) + + # Optionally log outputs + json_outputs = json.dumps(outputs) + print(">> outputs: ", json_outputs) + +if __name__ == "__main__": + main() diff --git a/Katana/go.mod b/Katana/go.mod new file mode 100644 index 00000000..59e114fe --- /dev/null +++ b/Katana/go.mod @@ -0,0 +1,3 @@ +module katana + +go 1.21.1 diff --git a/Katana/main.go b/Katana/main.go new file mode 100644 index 00000000..4880c830 --- /dev/null +++ b/Katana/main.go @@ -0,0 +1,313 @@ +package main + +import ( + _ "embed" + "encoding/json" + "fmt" + "log" + "maps" + "os" + "os/exec" + "path/filepath" + "runtime" + "time" +) + +//go:embed entrypoint.py +var entrypoint []byte + +var TMPPATH = "/tmp/katana" + +var historySubfolder = "" + +func execCommand(cmd string, dir string, id string, args Dict, historySubfolder string) error { + var command *exec.Cmd + var commandArgs []string + + // Build the command arguments from the args + for key, value := range args { + commandArgs = append(commandArgs, key+"="+value) + } + + if runtime.GOOS == "windows" { + command = exec.Command("python", dir+"\\entrypoint.py", dir, historySubfolder) + } else { + command = exec.Command("python3", dir+"/entrypoint.py", dir, historySubfolder) + } + + // Append commandArgs to the command + command.Args = append(command.Args, commandArgs...) + + // Set environment variables + command.Env = os.Environ() + command.Env = append(command.Env, "_blockid_="+id) + + // Execute the command and capture output + output, err := command.CombinedOutput() + if err != nil { + log.Println("Error", err) + } else { + log.Println("Output: ", string(output)) + } + + return err +} + +type Dict map[string]string +type Execution func(args Dict) (Dict, error) + +type Message struct { + Dict Dict + Err error +} + +type Task struct { + Name string + Exec func(args Dict) error + MapIn Dict + MapOut Dict + In []<-chan Message + Out []chan<- Message +} + +func (t *Task) Execute(args Dict) (Dict, error) { + inputs := make(Dict) + for key, value := range t.MapIn { + inputs[value] = args[key] + } + + outputs := make(Dict) + err := t.Exec(inputs) + if err != nil { + return outputs, err + } + + for key, value := range t.MapOut { + + data, err := os.ReadFile(filepath.Join(TMPPATH, t.Name, value) + ".txt") // wants to read from the block floder + if err != nil { + var new_histrory_subfolder = historySubfolder[len(filepath.Base(TMPPATH)):] + data, err = os.ReadFile(filepath.Join(TMPPATH, new_histrory_subfolder, value) + ".txt") // wants to read from the 'history' floder + } + // fmt.Println("## Trying to readfile for the task:", t.Name, ". And the file to read: ", data, ", error while reading: ", err) + if err != nil { + return outputs, err + } + + outputs[key] = string(data) + + os.Remove(filepath.Join(TMPPATH, value) + ".txt") + } + + return outputs, nil +} + +type Param struct { + Name string + Value string + Out []chan<- Message +} + +func deployTask(pipeline *Pipeline, historySubfolder string) (Execution, func()) { + tasks := make(map[string]*Task) + inputs := make(map[string]*Param) + outputs := make(map[string]<-chan Message) + + for name, block := range pipeline.Pipeline { // basically this funciton maps key value for parameters, writes entrypoint.py and stores every task in task dictionary. + name := name + block := block + if block.Action.Container.Image != "" { + tasks[name] = &Task{Name: name, Exec: func(args Dict) error { + log.Println("Error: ", args) + return nil + }} + if block.Action.Command.Dir == "" { + os.WriteFile(filepath.Join(TMPPATH, name, "entrypoint.py"), entrypoint, 0644) + } else { + os.WriteFile(filepath.Join(block.Action.Command.Dir, "entrypoint.py"), entrypoint, 0644) + // fmt.Println("Written from 108") + } + + tasks[name] = &Task{Name: name, Exec: func(args Dict) error { // without that entrypoint won't run + return execCommand(block.Action.Command.Exec, filepath.Join(TMPPATH, name), block.Information.Id, args, historySubfolder) + }} + } else if len(block.Action.Parameters) > 0 { + for key, value := range block.Action.Parameters { + inputs[name] = &Param{Name: key, Value: value.Value} + // fmt.Println("Key", name, ", Value: ", value) + } + // fmt.Println("inputs:", inputs) + } else if block.Action.Command.Exec != "" { + if block.Action.Command.Dir == "" { + os.WriteFile("entrypoint.py", entrypoint, 0644) + } else { + os.WriteFile(filepath.Join(block.Action.Command.Dir, "entrypoint.py"), entrypoint, 0644) + } + // fmt.Println("writing entrypoint to :", name) + tasks[name] = &Task{Name: name, Exec: func(args Dict) error { + return execCommand(block.Action.Command.Exec, block.Action.Command.Dir, block.Information.Id, args, historySubfolder) + }} + } else { + log.Fatal("Unknown block") + } + } + + for name, block := range pipeline.Pipeline { // so this blocks extracts inputs and outputs for each block if they are to be executed. + if block.Action.Container.Image != "" || + block.Action.Command.Exec != "" { + task := tasks[name] + task.MapIn = make(Dict) + task.MapOut = make(Dict) + for label, input := range block.Inputs { // I am assuming it takes the input parameters + for _, connection := range input.Connections { + pipe := make(chan Message, 1) + task.In = append(task.In, pipe) + task.MapIn[connection.Block+connection.Variable] = label + if parent, ok := tasks[connection.Block]; ok { + parent.Out = append(parent.Out, pipe) + } else if parent, ok := inputs[connection.Block]; ok { + parent.Out = append(parent.Out, pipe) + } else { + log.Fatal("Unknown connection") + } + } + } + for label, output := range block.Outputs { // I am assuming it takes the output parameters + if len(output.Connections) == 0 { + pipe := make(chan Message, 1) + outputs[name] = pipe + task.Out = append(task.Out, pipe) + } else { + task.MapOut[name+label] = block.Information.Id + "-" + label + } + } + } + } + + for _, task := range tasks { + go runTask(task) + } + + execution := func(args Dict) (Dict, error) { + for name, input := range inputs { + dict := make(Dict) + dict[name+input.Name] = input.Value + for i := 0; i < len(input.Out); i++ { + input.Out[i] <- Message{Dict: dict} + } + } + + results := make(Dict) + for _, output := range outputs { + o := <-output + if o.Err != nil { + return results, o.Err + } + maps.Copy(results, o.Dict) + } + + return results, nil + } + + release := func() { + for _, input := range inputs { + for i := 0; i < len(input.Out); i++ { + close(input.Out[i]) + } + } + } + + return execution, release +} + +func runTask(task *Task) { + args := make(Dict, len(task.In)) + + for { + var executionError error // initially (0x0 , 0x0) + + for i := 0; i < len(task.In); i++ { + // fmt.Print("\nin run task for task: ", task.Name, "\tCurrent Task.in: ", task.In[i]) + arg, ok := <-task.In[i] + // fmt.Println("\t>>ARG , OK: ", arg, ok) + if !ok { + // fmt.Println("NOT OKAY for : ", task.Name) + for _, next := range task.Out { + close(next) + } + return + } + + maps.Copy(args, arg.Dict) + + if arg.Err != nil { + // fmt.Println(">>>>>>>>>ERROR found in : ", task.Name, " , and the error is: ", arg.Err) + executionError = arg.Err + } + } + + if executionError != nil { + // fmt.Println("\n\t\t\t\tERROR FOUND IN LINE: 213") + for _, next := range task.Out { + next <- Message{Err: executionError} + } + + continue + } + + dict, err := task.Execute(args) + // fmt.Println("Executed the task: ", task.Name) + if err != nil { + err = fmt.Errorf("\n\ntask %s: %w", task.Name, err) + } + + message := Message{Dict: dict, Err: err} + + for _, next := range task.Out { + next <- message + } + } +} + +func main() { + + if len(os.Args) < 2 { + log.Fatal("Pipeline name or path must be provided as an argument.") + } + + pipelineName := os.Args[1] + pipelinePath := filepath.Join(".", pipelineName) + + data, err := os.ReadFile(filepath.Join(pipelinePath, "pipeline.json")) + if err != nil { + log.Fatal(err) + } + + var pipeline Pipeline + err = json.Unmarshal(data, &pipeline) + if err != nil { + log.Fatal(err) + } + + if pipeline.Sink != "" { + TMPPATH = pipeline.Sink + } + + // Create the history subfolder using the current timestamp + historyDir := filepath.Join(pipelinePath, "history") + timestamp := time.Now().Format("2006-01-02_15-04-05") + historySubfolder = filepath.Join(historyDir, timestamp) + os.MkdirAll(historySubfolder, os.ModePerm) + + execution, release := deployTask(&pipeline, historySubfolder) + + result, err := execution(make(Dict)) + if err != nil { + // log.Println("Error from line 282:", err) + } else { + log.Println("COMPLETED.", result) + } + + release() + +} diff --git a/Katana/schema.go b/Katana/schema.go new file mode 100644 index 00000000..d0778128 --- /dev/null +++ b/Katana/schema.go @@ -0,0 +1,102 @@ +package main + +type BlockInformation struct { + Id string `json:"id"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + SystemVersions []string `json:"system_versions"` + BlockVersion string `json:"block_version"` + BlockSource string `json:"block_source"` + BlockType string `json:"block_type"` +} + +type PipelineInformation struct { + Name string `json:"name"` + Description string `json:"description"` + SystemVersions string `json:"system_versions"` + Source string `json:"source"` + Type string `json:"type"` +} + +type Connection struct { + Block string `json:"block"` + Variable string `json:"variable"` +} + +type Put struct { + Type string `json:"type"` + Connections []Connection `json:"connections"` + Relays []Connection `json:"relays,omitempty"` +} + +type Container struct { + Image string `json:"image"` + Version string `json:"version"` + CommandLine []string `json:"command_line"` +} + +type Command struct { + Exec string `json:"exec"` + Dir string `json:"dir,omitempty"` +} + +type Action struct { + Container Container `json:"container,omitempty"` + Command Command `json:"command,omitempty"` // commmand replaces docker, actual command,deer paremeter: if the command is empty, i + Pipeline map[string]Block `json:"pipeline,omitempty"` + Parameters map[string]Parameter `json:"parameters,omitempty"` +} + +type TitleBar struct { + BackgroundColor string `json:"background_color,omitempty"` +} + +type Preview struct { + Active string `json:"active,omitempty"` + Content string `json:"content,omitempty"` +} + +type Node struct { + Active string `json:"active"` + TitleBar TitleBar `json:"title_bar"` + Preview Preview `json:"preview"` + Html string `json:"html"` + PosX string `json:"pos_x"` + PosY string `json:"pos_y"` + PosZ string `json:"pos_z"` + Behavior string `json:"behavior,omitempty"` + Order map[string][]string `json:"order,omitempty"` +} + +type Views struct { + Node Node `json:"node"` + Mode string `json:"mode,omitempty"` +} + +type Parameter struct { + Value string `json:"value"` + Type string `json:"type"` +} + +type Event struct { + Inputs map[string]string `json:"inputs,omitempty"` + Outputs map[string]string `json:"outputs,omitempty"` + Log []string `json:"log,omitempty"` +} + +type Block struct { + Information BlockInformation `json:"information"` + Inputs map[string]Put `json:"inputs"` + Outputs map[string]Put `json:"outputs"` + Action Action `json:"action"` + Views Views `json:"views,omitempty"` + Events Event `json:"events"` +} + +type Pipeline struct { + Id string `json:"id"` + Name string `json:"name"` + Pipeline map[string]Block `json:"pipeline"` + Sink string `json:"sink"` // sink is being to write history + Build string `json:"build"` +} From 83e1b6f0ee362d6b5450c2a2f76d9101c379f25e Mon Sep 17 00:00:00 2001 From: Maharaj Teertha Deb <86840917+TeerthaDeb@users.noreply.github.com> Date: Wed, 2 Oct 2024 09:52:27 -0400 Subject: [PATCH 2/8] Katana V: 2 You can Run Katana as : 'go run . --mode= ' example: 'go run . --mode=docker abcd' --- Katana/entrypoint.py | 85 ++++++++++++++++----- Katana/main.go | 172 +++++++++++++++++++++++++++++++++++++++---- 2 files changed, 224 insertions(+), 33 deletions(-) diff --git a/Katana/entrypoint.py b/Katana/entrypoint.py index 9ae386d1..60401a8f 100644 --- a/Katana/entrypoint.py +++ b/Katana/entrypoint.py @@ -3,12 +3,62 @@ import json import sys import shutil -# from computations import compute +import subprocess + +def setup_uv_environment(script_dir): + # Path to the UV environment + uv_env_path = os.path.join(script_dir, ".venv") + + # Create UV environment if it does not exist + if not os.path.exists(uv_env_path): + print("Setting up UV environment...") + + try: + subprocess.run(["uv", "venv", uv_env_path], check=True) + except subprocess.CalledProcessError as e: + print(f"Failed to create UV environment: {e}") + sys.exit(1) + + # Install dependencies from requirements.txt inside the UV environment + requirements_file = os.path.join(script_dir, "requirements.txt") + if os.path.exists(requirements_file): + print("Installing dependencies inside UV environment...") + try: + subprocess.run(["uv", "pip", "install", "-r", requirements_file], check=True, cwd=script_dir) + except subprocess.CalledProcessError as e: + print(f"Failed to install dependencies: {e}") + sys.exit(1) + else: + print("No requirements.txt found. Skipping dependency installation.") + +def rerun_with_uv_environment(script_dir): + # Path to the Python interpreter in the UV environment + uv_python = os.path.join(script_dir, ".venv", "Scripts", "python.exe") + + # Check if the current Python interpreter is already the one in the UV environment + if sys.executable == uv_python: + return # Continue with the script + + if os.path.exists(uv_python): + print(f"Re-running the script with the UV environment's Python interpreter: {uv_python}") + # Re-run the current script with the UV environment's Python interpreter + subprocess.run([uv_python, __file__] + sys.argv[1:], check=True) + sys.exit(0) # Exit the original process + else: + print("Could not find the Python interpreter in the UV environment.") + sys.exit(1) def main(): + print("ARGUMENTS: " , sys.argv) # Get the directory of the script (block folder) script_dir = os.path.dirname(os.path.abspath(__file__)) - + + # Set up the UV environment if uv in argument: + if(sys.argv[3] == "uv"): + setup_uv_environment(script_dir) + #Activate the environment: + rerun_with_uv_environment(script_dir) + # Get the history subfolder from the arguments compute_dir = sys.argv[1] # Directory with compute logic (block folder) history_subfolder = os.path.abspath(sys.argv[2]) @@ -20,7 +70,6 @@ def main(): # Add the block folder (compute_dir) to the Python path to import 'computations' sys.path.insert(0, compute_dir) - # Store the initial list of existing files in the current block folder initial_files = set(os.listdir(script_dir)) @@ -36,7 +85,6 @@ def main(): shutil.copytree(src_path, dst_path, dirs_exist_ok=True) else: shutil.copy2(src_path, dst_path) - # print(f">>>Copied {item} from 'history' to {script_dir}") # Get block ID from the compute directory block_id = compute_dir @@ -46,9 +94,8 @@ def main(): # Collect arguments passed to the script args_dict = {} - for arg in sys.argv[3:]: + for arg in sys.argv[4:]: key, value = arg.split('=') - # Try to convert value to int or float, if applicable if value.isdigit(): args_dict[key] = int(value) else: @@ -57,9 +104,16 @@ def main(): except ValueError: args_dict[key] = value # Leave as string if conversion fails + if(sys.argv[3] == "docker"): + + for key, value in args_dict.items(): + if "\\" in str(value): # Check if value is an absolute path + print("CHECKED") + args_dict[key] = value.split("\\")[-1] # Extract the last part of the path (the file name) + print("DICTS: " , args_dict) + # Ensure images parameter is a Python list (if it's passed as a string) if 'images' in args_dict and isinstance(args_dict['images'], str): - # Convert string to a Python list args_dict['images'] = json.loads(args_dict['images']) # Fetch outputs from pipeline.json and get the corresponding parameters @@ -68,16 +122,16 @@ def main(): debug_inputs[key] = value if value is not None: - params.append(value) # Append the value directly + params.append(value) else: print(f"Warning: No value found for {key} in pipeline.json") - # print("debug|||", debug_inputs) - # Call the compute function + # print(">>>>>>>>>PAssing parameters: " , params) outputs = compute(*params) # Store the final state of files in the block folder (after compute is executed) + final_files = set(os.listdir(script_dir)) # Identify new files created during computation (those not in the initial list) @@ -89,12 +143,11 @@ def main(): dst_path = os.path.join(history_subfolder, new_file) if os.path.exists(dst_path): if os.path.isfile(dst_path): - os.remove(dst_path) # Remove the existing file if it exists + os.remove(dst_path) elif os.path.isdir(dst_path): - shutil.rmtree(dst_path) # Remove the directory if it exists - - shutil.move(src_path, dst_path) # Move new files to the history subfolder + shutil.rmtree(dst_path) + shutil.move(src_path, dst_path) # Output results to files in the 'history' folder for key, value in outputs.items(): @@ -109,9 +162,5 @@ def main(): with open(output_file_path, "w") as file: file.write(json.dumps(value)) - # Optionally log outputs - json_outputs = json.dumps(outputs) - print(">> outputs: ", json_outputs) - if __name__ == "__main__": main() diff --git a/Katana/main.go b/Katana/main.go index 4880c830..8b74b839 100644 --- a/Katana/main.go +++ b/Katana/main.go @@ -3,16 +3,21 @@ package main import ( _ "embed" "encoding/json" + "flag" "fmt" + "io/ioutil" "log" "maps" "os" "os/exec" "path/filepath" "runtime" + "strings" "time" ) +var mode string + //go:embed entrypoint.py var entrypoint []byte @@ -29,10 +34,32 @@ func execCommand(cmd string, dir string, id string, args Dict, historySubfolder commandArgs = append(commandArgs, key+"="+value) } - if runtime.GOOS == "windows" { - command = exec.Command("python", dir+"\\entrypoint.py", dir, historySubfolder) + // Convert the paths to absolute paths + absDir, err := filepath.Abs(dir) + if err != nil { + log.Fatalf("Failed to get absolute path for dir: %v", err) + } + + // Ensure that the historySubfolder is an absolute path, pointing to the parent directory + absHistorySubfolder, err := filepath.Abs(historySubfolder) + if err != nil { + log.Fatalf("Failed to get absolute path for history subfolder: %v", err) + } + + if mode == "docker" { + command = exec.Command("docker", "run", "--rm", + "-v", absDir+":/app", // Mount the block directory as /app in the container + "-v", absHistorySubfolder+":/app/history", // Mount the history subfolder in the container + "pipeline_image", // Docker image + "python", "/app/entrypoint.py", // Run entrypoint.py inside the container + "/app", "/app/history", mode) // Pass arguments to entrypoint.py (use /app/history in the container) } else { - command = exec.Command("python3", dir+"/entrypoint.py", dir, historySubfolder) + // Local execution + if runtime.GOOS == "windows" { + command = exec.Command("python", dir+"\\entrypoint.py", dir, historySubfolder, mode) + } else { + command = exec.Command("python3", dir+"/entrypoint.py", dir, historySubfolder, mode) + } } // Append commandArgs to the command @@ -44,12 +71,7 @@ func execCommand(cmd string, dir string, id string, args Dict, historySubfolder // Execute the command and capture output output, err := command.CombinedOutput() - if err != nil { - log.Println("Error", err) - } else { - log.Println("Output: ", string(output)) - } - + log.Println("Output: ", string(output), "Error from line 45: ", err) return err } @@ -118,14 +140,15 @@ func deployTask(pipeline *Pipeline, historySubfolder string) (Execution, func()) block := block if block.Action.Container.Image != "" { tasks[name] = &Task{Name: name, Exec: func(args Dict) error { - log.Println("Error: ", args) + log.Println("Error from 110: ", args) return nil }} if block.Action.Command.Dir == "" { os.WriteFile(filepath.Join(TMPPATH, name, "entrypoint.py"), entrypoint, 0644) + } else { os.WriteFile(filepath.Join(block.Action.Command.Dir, "entrypoint.py"), entrypoint, 0644) - // fmt.Println("Written from 108") + fmt.Println("Written from 108") } tasks[name] = &Task{Name: name, Exec: func(args Dict) error { // without that entrypoint won't run @@ -257,6 +280,7 @@ func runTask(task *Task) { dict, err := task.Execute(args) // fmt.Println("Executed the task: ", task.Name) + if err != nil { err = fmt.Errorf("\n\ntask %s: %w", task.Name, err) } @@ -271,13 +295,50 @@ func runTask(task *Task) { func main() { - if len(os.Args) < 2 { + flag.StringVar(&mode, "mode", "uv", "Execution mode: uv, no-uv, docker") + flag.Parse() + + if len(os.Args) < 3 { log.Fatal("Pipeline name or path must be provided as an argument.") } - pipelineName := os.Args[1] + pipelineName := os.Args[2] pipelinePath := filepath.Join(".", pipelineName) + if mode == "docker" { // if mood = docker + // for docker file: + mainDockerFile := filepath.Join(pipelinePath, "main-docker") + // Check if 'main-docker' already exists + if _, err := os.Stat(mainDockerFile); err == nil { + fmt.Println("main-docker already exists, using existing image...") + runExistingImage("C:\\Users\\Teertha\\Pictures\\ai-generated-colored-water-drops-on-abstract-background-water-drops-on-colorful-background-colored-wallpaper-ultra-hd-colorful-wallpaper-background-with-colored-bubbles-photo.jpg") + } + // Gather Dockerfiles from each block folder + dockerContent := gatherDockerDependencies(pipelinePath) + requirementsContent := gatherRequirements(pipelinePath) + + // Combine dockerContent and requirements.txt and requirementsContent + dockerContent += "\n# Installing Python requirements\n" + dockerContent += "COPY requirements.txt .\n" + dockerContent += "RUN pip install --no-cache-dir -r requirements.txt\n" + + // Create the main-docker file in the pipeline folder + err := os.WriteFile(mainDockerFile, []byte(dockerContent), 0644) + if err != nil { + fmt.Println("Error writing main-docker:", err) + return + } + + err = os.WriteFile(filepath.Join(pipelinePath, "requirements.txt"), []byte(requirementsContent), 0644) + if err != nil { + fmt.Println("Error writing requirements.txt:", err) + return + } + + // Build the Docker image for the entire pipeline + buildPipelineImage(pipelinePath) + } + data, err := os.ReadFile(filepath.Join(pipelinePath, "pipeline.json")) if err != nil { log.Fatal(err) @@ -303,11 +364,92 @@ func main() { result, err := execution(make(Dict)) if err != nil { - // log.Println("Error from line 282:", err) + log.Println("Error from line 282:", err) } else { - log.Println("COMPLETED.", result) + log.Println("COMPLETED:", result) } release() } + +// Function to gather Docker dependencies from block folders +func gatherDockerDependencies(pipelineDir string) string { + uniqueDockerLines := map[string]bool{} + var dockerfileContent strings.Builder + + filepath.Walk(pipelineDir, func(path string, info os.FileInfo, err error) error { + if info != nil && info.Name() == "Dockerfile" { + content, _ := os.ReadFile(path) + dockerLines := strings.Split(string(content), "\n") + for _, line := range dockerLines { + trimmed := strings.TrimSpace(line) + // Skip lines containing "computations.py" + if trimmed != "" && !uniqueDockerLines[trimmed] && !strings.Contains(trimmed, "computations.py") { + dockerfileContent.WriteString(trimmed + "\n") + uniqueDockerLines[trimmed] = true + } + } + } + return nil + }) + + return dockerfileContent.String() +} + +// Function to gather requirements.txt content from block folders +func gatherRequirements(pipelineDir string) string { + uniqueRequirements := map[string]bool{} + var requirementsContent strings.Builder + + filepath.Walk(pipelineDir, func(path string, info os.FileInfo, err error) error { + if info != nil && info.Name() == "requirements.txt" { + content, _ := ioutil.ReadFile(path) + requirementsLines := strings.Split(string(content), "\n") + for _, line := range requirementsLines { + trimmed := strings.TrimSpace(line) + if trimmed != "" && !uniqueRequirements[trimmed] { + requirementsContent.WriteString(trimmed + "\n") + uniqueRequirements[trimmed] = true + } + } + } + return nil + }) + + return requirementsContent.String() +} + +// Function to build the Docker image from main-docker +func buildPipelineImage(pipelineDir string) { + cmd := exec.Command("docker", "build", "-t", "pipeline_image", "-f", filepath.Join(pipelineDir, "main-docker"), pipelineDir) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() + if err != nil { + fmt.Println("Error building Docker image:", err) + } else { + fmt.Println("Docker image built successfully.") + } +} + +// Function to use the existing Docker image if 'main-docker' already exists +func runExistingImage(imagePath string) { + // Extract just the filename from the imagePath + imageFile := filepath.Base(imagePath) + + // Set the command to mount the image and history folder + cmd := exec.Command("docker", "run", "--rm", + "-v", imagePath+":/app/"+imageFile, // Mount the image file to /app/image.jpg inside the container + "-v", "/history", // Mount the history folder to /history inside the container + "pipeline_image", // Docker image + "bash", "-c", "ls /app && ls /history") // List the contents of /app and /history to check mounts + + // Execute the command + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() + if err != nil { + log.Println("Error running Docker image:", err) + } +} From 8323a06dc9a83f23fbdfeb3a0aa8f5dae15645c9 Mon Sep 17 00:00:00 2001 From: Maharaj Teertha Deb <86840917+TeerthaDeb@users.noreply.github.com> Date: Wed, 2 Oct 2024 17:50:21 -0400 Subject: [PATCH 3/8] Katana 3. docker enabled. uv runs fine. DOCKER VERSION IS STILL HARD CODED. --- Katana/entrypoint.py | 26 +++-- Katana/main.go | 271 ++++++++++++++++++++++++------------------- 2 files changed, 169 insertions(+), 128 deletions(-) diff --git a/Katana/entrypoint.py b/Katana/entrypoint.py index 60401a8f..e0ecb663 100644 --- a/Katana/entrypoint.py +++ b/Katana/entrypoint.py @@ -24,6 +24,7 @@ def setup_uv_environment(script_dir): if os.path.exists(requirements_file): print("Installing dependencies inside UV environment...") try: + print("HERE>>>") subprocess.run(["uv", "pip", "install", "-r", requirements_file], check=True, cwd=script_dir) except subprocess.CalledProcessError as e: print(f"Failed to install dependencies: {e}") @@ -59,6 +60,7 @@ def main(): #Activate the environment: rerun_with_uv_environment(script_dir) + # Get the history subfolder from the arguments compute_dir = sys.argv[1] # Directory with compute logic (block folder) history_subfolder = os.path.abspath(sys.argv[2]) @@ -104,21 +106,22 @@ def main(): except ValueError: args_dict[key] = value # Leave as string if conversion fails + args_dict_copy = args_dict.copy() + if(sys.argv[3] == "docker"): - - for key, value in args_dict.items(): + for key, value in args_dict_copy.items(): if "\\" in str(value): # Check if value is an absolute path print("CHECKED") - args_dict[key] = value.split("\\")[-1] # Extract the last part of the path (the file name) - print("DICTS: " , args_dict) + args_dict_copy[key] = value.split("\\")[-1] # Extract the last part of the path (the file name) + print("DICTS: " , args_dict_copy) # Ensure images parameter is a Python list (if it's passed as a string) - if 'images' in args_dict and isinstance(args_dict['images'], str): - args_dict['images'] = json.loads(args_dict['images']) + if 'images' in args_dict_copy and isinstance(args_dict_copy['images'], str): + args_dict_copy['images'] = json.loads(args_dict_copy['images']) # Fetch outputs from pipeline.json and get the corresponding parameters for key in inspect.signature(compute).parameters.keys(): - value = args_dict.get(key) + value = args_dict_copy.get(key) debug_inputs[key] = value if value is not None: @@ -150,14 +153,21 @@ def main(): shutil.move(src_path, dst_path) # Output results to files in the 'history' folder + for key, value in outputs.items(): - result = block_id.rsplit('-', 1)[0].split('\\')[-1] + if sys.argv[3] == "docker": + result = sys.argv[1] + else: + result = block_id.rsplit('-', 1)[0].split('\\')[-1] + if ("background-removal" in block_id): output_file_path = os.path.join(history_subfolder, f"{result}-output_path.txt") elif ("openai-agent" in block_id): output_file_path = os.path.join(history_subfolder, f"{result}-response.txt") else: output_file_path = os.path.join(history_subfolder, f"{result}-image_paths.txt") + + # print("output file path:" , output_file_path) with open(output_file_path, "w") as file: file.write(json.dumps(value)) diff --git a/Katana/main.go b/Katana/main.go index 8b74b839..9227f865 100644 --- a/Katana/main.go +++ b/Katana/main.go @@ -5,14 +5,13 @@ import ( "encoding/json" "flag" "fmt" - "io/ioutil" + "io" "log" "maps" "os" "os/exec" "path/filepath" "runtime" - "strings" "time" ) @@ -35,24 +34,27 @@ func execCommand(cmd string, dir string, id string, args Dict, historySubfolder } // Convert the paths to absolute paths - absDir, err := filepath.Abs(dir) + absDir, err := filepath.Abs(dir) // this should point to the block folder where computations.py is located if err != nil { log.Fatalf("Failed to get absolute path for dir: %v", err) } - - // Ensure that the historySubfolder is an absolute path, pointing to the parent directory + // Convert Windows paths to forward slashes for Docker compatibility + absDir = filepath.ToSlash(absDir) + // Ensure that the historySubfolder is an absolute path absHistorySubfolder, err := filepath.Abs(historySubfolder) if err != nil { log.Fatalf("Failed to get absolute path for history subfolder: %v", err) } + absHistorySubfolder = filepath.ToSlash(absHistorySubfolder) if mode == "docker" { + // Mount the block folder to ensure Docker has access to computations.py command = exec.Command("docker", "run", "--rm", - "-v", absDir+":/app", // Mount the block directory as /app in the container - "-v", absHistorySubfolder+":/app/history", // Mount the history subfolder in the container - "pipeline_image", // Docker image - "python", "/app/entrypoint.py", // Run entrypoint.py inside the container - "/app", "/app/history", mode) // Pass arguments to entrypoint.py (use /app/history in the container) + "-v", absDir+":/app", // Mount the block folder where computations.py is located + "-v", absHistorySubfolder+":/app/history/", // Mount the history folder + "pipeline_image_2", // Docker image -- hardcoded + "python", "/app/entrypoint.py", // Execute entrypoint.py inside Docker + id, "/app/history", mode) // Pass arguments to entrypoint.py (inside Docker paths) } else { // Local execution if runtime.GOOS == "windows" { @@ -71,7 +73,7 @@ func execCommand(cmd string, dir string, id string, args Dict, historySubfolder // Execute the command and capture output output, err := command.CombinedOutput() - log.Println("Output: ", string(output), "Error from line 45: ", err) + log.Println("Output: ", string(output)) return err } @@ -107,9 +109,15 @@ func (t *Task) Execute(args Dict) (Dict, error) { for key, value := range t.MapOut { data, err := os.ReadFile(filepath.Join(TMPPATH, t.Name, value) + ".txt") // wants to read from the block floder + if err != nil { - var new_histrory_subfolder = historySubfolder[len(filepath.Base(TMPPATH)):] - data, err = os.ReadFile(filepath.Join(TMPPATH, new_histrory_subfolder, value) + ".txt") // wants to read from the 'history' floder + if mode != "docker" { + var new_histrory_subfolder = historySubfolder[len(filepath.Base(TMPPATH)):] + data, err = os.ReadFile(filepath.Join(TMPPATH, new_histrory_subfolder, value) + ".txt") // wants to read from the 'history' floder + } else { + println("From line:199. value: ", value, ", TMPPATH:", TMPPATH) + data, err = os.ReadFile(filepath.Join(historySubfolder, value) + ".txt") + } } // fmt.Println("## Trying to readfile for the task:", t.Name, ". And the file to read: ", data, ", error while reading: ", err) if err != nil { @@ -148,7 +156,7 @@ func deployTask(pipeline *Pipeline, historySubfolder string) (Execution, func()) } else { os.WriteFile(filepath.Join(block.Action.Command.Dir, "entrypoint.py"), entrypoint, 0644) - fmt.Println("Written from 108") + // fmt.Println("Written from 108") } tasks[name] = &Task{Name: name, Exec: func(args Dict) error { // without that entrypoint won't run @@ -248,19 +256,16 @@ func runTask(task *Task) { for { var executionError error // initially (0x0 , 0x0) - for i := 0; i < len(task.In); i++ { // fmt.Print("\nin run task for task: ", task.Name, "\tCurrent Task.in: ", task.In[i]) arg, ok := <-task.In[i] // fmt.Println("\t>>ARG , OK: ", arg, ok) if !ok { - // fmt.Println("NOT OKAY for : ", task.Name) for _, next := range task.Out { close(next) } return } - maps.Copy(args, arg.Dict) if arg.Err != nil { @@ -270,7 +275,6 @@ func runTask(task *Task) { } if executionError != nil { - // fmt.Println("\n\t\t\t\tERROR FOUND IN LINE: 213") for _, next := range task.Out { next <- Message{Err: executionError} } @@ -305,40 +309,118 @@ func main() { pipelineName := os.Args[2] pipelinePath := filepath.Join(".", pipelineName) - if mode == "docker" { // if mood = docker - // for docker file: - mainDockerFile := filepath.Join(pipelinePath, "main-docker") - // Check if 'main-docker' already exists - if _, err := os.Stat(mainDockerFile); err == nil { - fmt.Println("main-docker already exists, using existing image...") - runExistingImage("C:\\Users\\Teertha\\Pictures\\ai-generated-colored-water-drops-on-abstract-background-water-drops-on-colorful-background-colored-wallpaper-ultra-hd-colorful-wallpaper-background-with-colored-bubbles-photo.jpg") - } - // Gather Dockerfiles from each block folder - dockerContent := gatherDockerDependencies(pipelinePath) - requirementsContent := gatherRequirements(pipelinePath) + if mode == "docker" { + fmt.Println("Docker mode running") + buildAndRunDockerImage(pipelinePath) + return + } else { + fmt.Println("Running in non-docker mode") + runLocalPipeline(pipelinePath) + } + +} - // Combine dockerContent and requirements.txt and requirementsContent - dockerContent += "\n# Installing Python requirements\n" - dockerContent += "COPY requirements.txt .\n" - dockerContent += "RUN pip install --no-cache-dir -r requirements.txt\n" +// TRYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY - // Create the main-docker file in the pipeline folder - err := os.WriteFile(mainDockerFile, []byte(dockerContent), 0644) - if err != nil { - fmt.Println("Error writing main-docker:", err) - return +func buildAndRunDockerImage(pipelinePath string) { + // Step 1: Check if Docker image already exists + imageName := "pipeline_image_2" + checkCmd := exec.Command("docker", "images", "-q", imageName) + checkOutput, err := checkCmd.Output() + if err != nil { + log.Fatalf("Error checking Docker image: %v", err) + } + + // If the image does not exist, build it + if len(checkOutput) == 0 { + log.Println("Docker image not found, building...") + dockerFile := filepath.Join(pipelinePath, "Dockerfile") + + // Check if Dockerfile exists + if _, err := os.Stat(dockerFile); os.IsNotExist(err) { + log.Fatalf("Dockerfile not found in %s", pipelinePath) } - err = os.WriteFile(filepath.Join(pipelinePath, "requirements.txt"), []byte(requirementsContent), 0644) + // Build the Docker image + buildCmd := exec.Command("docker", "build", "-t", imageName, "-f", dockerFile, pipelinePath) + buildCmd.Stdout = os.Stdout + buildCmd.Stderr = os.Stderr + + log.Println("Building Docker image...") + err := buildCmd.Run() if err != nil { - fmt.Println("Error writing requirements.txt:", err) - return + log.Fatalf("Error building Docker image: %v", err) } + } else { + log.Println("Using existing Docker image...") + } + + // Convert paths to absolute paths with forward slashes + absPipelinePath, err := filepath.Abs(pipelinePath) + if err != nil { + log.Fatalf("Failed to get absolute path: %v", err) + } + absPipelinePath = filepath.ToSlash(absPipelinePath) + + // Step 2: Prepare history folder + historySubfolder = prepareHistoryFolder(absPipelinePath) + + // Step 3: Copy files to history folder + copyToHistoryFolder(historySubfolder) + + // Step 4: Run the local pipeline using the Docker environment + runLocalPipelineInDocker(absPipelinePath, historySubfolder) +} + +func copyToHistoryFolder(historySubfolder string) { + // Hardcoded source file path + srcFile := "C:\\Users\\Teertha\\Pictures\\img.jpg" // HARDCODE + + // Destination path in the history folder + destFile := filepath.Join(historySubfolder, filepath.Base(srcFile)) + + // Copy the file + source, err := os.Open(srcFile) + if err != nil { + log.Fatalf("Failed to open source file: %v", err) + } + defer source.Close() + + destination, err := os.Create(destFile) + if err != nil { + log.Fatalf("Failed to create destination file: %v", err) + } + defer destination.Close() - // Build the Docker image for the entire pipeline - buildPipelineImage(pipelinePath) + _, err = io.Copy(destination, source) + if err != nil { + log.Fatalf("Failed to copy file: %v", err) } + log.Println("Copied file to", destFile) +} + +func prepareHistoryFolder(pipelinePath string) string { + // Create the history folder path + historyDir := filepath.Join(pipelinePath, "history") + + // Create the timestamped subfolder inside history folder + timestamp := time.Now().Format("2006-01-02_15-04-05") + historySubfolder := filepath.Join(historyDir, timestamp) + + // Ensure the history subfolder exists + err := os.MkdirAll(historySubfolder, os.ModePerm) + if err != nil { + log.Fatalf("Failed to create history folder: %v", err) + } + + log.Println("Prepared history folder at", historySubfolder) + + // Return the path of the history subfolder for further use + return historySubfolder +} + +func runLocalPipelineInDocker(pipelinePath string, historySubfolder string) { data, err := os.ReadFile(filepath.Join(pipelinePath, "pipeline.json")) if err != nil { log.Fatal(err) @@ -354,12 +436,9 @@ func main() { TMPPATH = pipeline.Sink } - // Create the history subfolder using the current timestamp - historyDir := filepath.Join(pipelinePath, "history") - timestamp := time.Now().Format("2006-01-02_15-04-05") - historySubfolder = filepath.Join(historyDir, timestamp) - os.MkdirAll(historySubfolder, os.ModePerm) + // Skip history subfolder creation since it is already done + // Deploy the task and execute it execution, release := deployTask(&pipeline, historySubfolder) result, err := execution(make(Dict)) @@ -370,86 +449,38 @@ func main() { } release() - } -// Function to gather Docker dependencies from block folders -func gatherDockerDependencies(pipelineDir string) string { - uniqueDockerLines := map[string]bool{} - var dockerfileContent strings.Builder - - filepath.Walk(pipelineDir, func(path string, info os.FileInfo, err error) error { - if info != nil && info.Name() == "Dockerfile" { - content, _ := os.ReadFile(path) - dockerLines := strings.Split(string(content), "\n") - for _, line := range dockerLines { - trimmed := strings.TrimSpace(line) - // Skip lines containing "computations.py" - if trimmed != "" && !uniqueDockerLines[trimmed] && !strings.Contains(trimmed, "computations.py") { - dockerfileContent.WriteString(trimmed + "\n") - uniqueDockerLines[trimmed] = true - } - } - } - return nil - }) +func runLocalPipeline(pipelinePath string) { + data, err := os.ReadFile(filepath.Join(pipelinePath, "pipeline.json")) + if err != nil { + log.Fatal(err) + } - return dockerfileContent.String() -} + var pipeline Pipeline + err = json.Unmarshal(data, &pipeline) + if err != nil { + log.Fatal(err) + } -// Function to gather requirements.txt content from block folders -func gatherRequirements(pipelineDir string) string { - uniqueRequirements := map[string]bool{} - var requirementsContent strings.Builder - - filepath.Walk(pipelineDir, func(path string, info os.FileInfo, err error) error { - if info != nil && info.Name() == "requirements.txt" { - content, _ := ioutil.ReadFile(path) - requirementsLines := strings.Split(string(content), "\n") - for _, line := range requirementsLines { - trimmed := strings.TrimSpace(line) - if trimmed != "" && !uniqueRequirements[trimmed] { - requirementsContent.WriteString(trimmed + "\n") - uniqueRequirements[trimmed] = true - } - } - } - return nil - }) + if pipeline.Sink != "" { + TMPPATH = pipeline.Sink + } - return requirementsContent.String() -} + // Create the history subfolder using the current timestamp + historyDir := filepath.Join(pipelinePath, "history") + timestamp := time.Now().Format("2006-01-02_15-04-05") + historySubfolder = filepath.Join(historyDir, timestamp) + os.MkdirAll(historySubfolder, os.ModePerm) -// Function to build the Docker image from main-docker -func buildPipelineImage(pipelineDir string) { - cmd := exec.Command("docker", "build", "-t", "pipeline_image", "-f", filepath.Join(pipelineDir, "main-docker"), pipelineDir) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - err := cmd.Run() + execution, release := deployTask(&pipeline, historySubfolder) + + result, err := execution(make(Dict)) if err != nil { - fmt.Println("Error building Docker image:", err) + log.Println("Error from line 481:", err) } else { - fmt.Println("Docker image built successfully.") + log.Println("COMPLETED:", result) } -} -// Function to use the existing Docker image if 'main-docker' already exists -func runExistingImage(imagePath string) { - // Extract just the filename from the imagePath - imageFile := filepath.Base(imagePath) - - // Set the command to mount the image and history folder - cmd := exec.Command("docker", "run", "--rm", - "-v", imagePath+":/app/"+imageFile, // Mount the image file to /app/image.jpg inside the container - "-v", "/history", // Mount the history folder to /history inside the container - "pipeline_image", // Docker image - "bash", "-c", "ls /app && ls /history") // List the contents of /app and /history to check mounts - - // Execute the command - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - err := cmd.Run() - if err != nil { - log.Println("Error running Docker image:", err) - } + release() } From 25e43368fd7d635beeffa45ffb939d23e7e6a224 Mon Sep 17 00:00:00 2001 From: Maharaj Teertha Deb <86840917+TeerthaDeb@users.noreply.github.com> Date: Fri, 4 Oct 2024 12:35:23 -0400 Subject: [PATCH 4/8] Katana Final 0.0 --- Katana/README.MD | 108 +++++++++++++++++++++++++++++++++++++++++++ Katana/entrypoint.py | 4 +- Katana/main.go | 102 ++++++++++++++++++++++------------------ Katana/schema.go | 6 +-- 4 files changed, 172 insertions(+), 48 deletions(-) create mode 100644 Katana/README.MD diff --git a/Katana/README.MD b/Katana/README.MD new file mode 100644 index 00000000..39c9bae9 --- /dev/null +++ b/Katana/README.MD @@ -0,0 +1,108 @@ +# Katana + +Katana is a Go-based tool designed to manage and execute pipelines for various computational tasks, including Docker and non-Docker modes. It handles the execution of blocks defined in pipelines and efficiently manages file movement and history logging. + +## Features + +- Execute pipelines in Docker or locally. +- Supports mounting files and managing environments. +- Tracks execution history in timestamped folders. +- Handles the movement of computation outputs to history folders. + +## Usage + +Katana can be run in two modes: `docker` and `local` (non-docker mode). It expects a pipeline directory with blocks and their corresponding computational logic. + +### Running in Docker Mode + +``` +go run . --mode=docker +``` +- PRE REQ: you must have a dockerfile , and requirements.txt(could be empty) files inside the block folder +- This will build the Docker image if it's not found and execute the pipeline using Docker. +- All necessary files from the pipeline folder will be mounted in Docker containers. + +#### Dockerfile Example + +Here’s an example of a `Dockerfile` that can be used to run the pipeline: + +```Dockerfile +FROM python:3.10-slim + +# Set working directory +WORKDIR /app + +# Copy necessary files +COPY . . + +# Install FFmpeg +RUN apt-get update && \ + apt-get install -y ffmpeg && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# Install Python dependencies +RUN pip install -r requirements.txt +``` + + +### Running in Non-Docker Mode + +``` +go run . --mode=uv +``` +or +``` +go run . --mode=no-uv +``` + +- This will execute the pipeline directly on your local machine without Docker. + +## Pipeline Structure + +A pipeline directory contains multiple blocks, each of which has its own computational logic (such as `computations.py`). Here’s an example of the directory structure: + +``` +main_dir/ + |--> pipeline1/ + | |--> block1/ + | | |--> computations.py + | | |--> requirements.txt + | |--> block2/ + | |--> pipeline.json + |--> entrypoint.py + |--> katana (Go) +``` + +Each block has: + +- `computations.py`: The script that executes the block's computation. +- `requirements.txt`: The Python dependencies required to run `computations.py`. + +### History Management + +For each pipeline execution, a timestamped folder will be created inside the `history` folder. All output files will be moved to this folder. + +## Example + +To run the pipeline named `abcd` in Docker mode: + +``` +go run . --mode=docker abcd +``` + +## Configuration + +You can define blocks and their parameters in `pipeline.json` within the pipeline directory. This file defines the computational blocks and how they connect with each other. + + +## Important Notice + +If you encounter an error such as: +`path not found: 'files/fiz/xxxxx...'` + +You may need to update the `computations.py` file. Modify the line that generates the HTML path to ensure it uses a unique identifier. For example: + +```python +html_path = f"viz_{unique_id}.html" +``` diff --git a/Katana/entrypoint.py b/Katana/entrypoint.py index e0ecb663..de6a6b83 100644 --- a/Katana/entrypoint.py +++ b/Katana/entrypoint.py @@ -164,10 +164,12 @@ def main(): output_file_path = os.path.join(history_subfolder, f"{result}-output_path.txt") elif ("openai-agent" in block_id): output_file_path = os.path.join(history_subfolder, f"{result}-response.txt") + elif ("images-to-video" in block_id): + output_file_path = os.path.join(history_subfolder, f"{result}-video_path.txt") else: output_file_path = os.path.join(history_subfolder, f"{result}-image_paths.txt") - # print("output file path:" , output_file_path) + print("output file path:" , output_file_path) with open(output_file_path, "w") as file: file.write(json.dumps(value)) diff --git a/Katana/main.go b/Katana/main.go index 9227f865..97005f63 100644 --- a/Katana/main.go +++ b/Katana/main.go @@ -24,6 +24,8 @@ var TMPPATH = "/tmp/katana" var historySubfolder = "" +var docker_image_name = "" + func execCommand(cmd string, dir string, id string, args Dict, historySubfolder string) error { var command *exec.Cmd var commandArgs []string @@ -52,7 +54,7 @@ func execCommand(cmd string, dir string, id string, args Dict, historySubfolder command = exec.Command("docker", "run", "--rm", "-v", absDir+":/app", // Mount the block folder where computations.py is located "-v", absHistorySubfolder+":/app/history/", // Mount the history folder - "pipeline_image_2", // Docker image -- hardcoded + docker_image_name, // Docker image -- hardcoded "python", "/app/entrypoint.py", // Execute entrypoint.py inside Docker id, "/app/history", mode) // Pass arguments to entrypoint.py (inside Docker paths) } else { @@ -299,6 +301,24 @@ func runTask(task *Task) { func main() { + pipelineName := os.Args[2] + pipelinePath := filepath.Join(".", pipelineName) + + data, err := os.ReadFile(filepath.Join(pipelinePath, "pipeline.json")) + if err != nil { + log.Fatal(err) + } + + var pipeline Pipeline + err = json.Unmarshal(data, &pipeline) + if err != nil { + log.Fatal(err) + } + + if pipeline.Sink != "" { + TMPPATH = pipeline.Sink + } + flag.StringVar(&mode, "mode", "uv", "Execution mode: uv, no-uv, docker") flag.Parse() @@ -306,25 +326,34 @@ func main() { log.Fatal("Pipeline name or path must be provided as an argument.") } - pipelineName := os.Args[2] - pipelinePath := filepath.Join(".", pipelineName) + var filePaths []string // extracting files for the pipelines. + for _, block := range pipeline.Pipeline { + for paramName, param := range block.Action.Parameters { + if paramName == "path" { + filePaths = append(filePaths, param.Value) // Assuming 'Value' contains the file path + } + } + } + + // fmt.Println("File Paths:", filePaths) if mode == "docker" { fmt.Println("Docker mode running") - buildAndRunDockerImage(pipelinePath) + docker_image_name = "katana-" + pipeline.Id + // fmt.Print("docker_image_name : ", docker_image_name) + buildAndRunDockerImage(pipelinePath, docker_image_name, filePaths) return } else { fmt.Println("Running in non-docker mode") - runLocalPipeline(pipelinePath) + runLocalPipeline(pipeline, pipelinePath) } } // TRYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY -func buildAndRunDockerImage(pipelinePath string) { +func buildAndRunDockerImage(pipelinePath string, imageName string, filepaths []string) { // Step 1: Check if Docker image already exists - imageName := "pipeline_image_2" checkCmd := exec.Command("docker", "images", "-q", imageName) checkOutput, err := checkCmd.Output() if err != nil { @@ -366,38 +395,37 @@ func buildAndRunDockerImage(pipelinePath string) { historySubfolder = prepareHistoryFolder(absPipelinePath) // Step 3: Copy files to history folder - copyToHistoryFolder(historySubfolder) + copyToHistoryFolder(historySubfolder, filepaths) // Step 4: Run the local pipeline using the Docker environment runLocalPipelineInDocker(absPipelinePath, historySubfolder) } -func copyToHistoryFolder(historySubfolder string) { - // Hardcoded source file path - srcFile := "C:\\Users\\Teertha\\Pictures\\img.jpg" // HARDCODE +func copyToHistoryFolder(historySubfolder string, filePaths []string) { + for _, srcFile := range filePaths { + // Destination path in the history folder + destFile := filepath.Join(historySubfolder, filepath.Base(srcFile)) - // Destination path in the history folder - destFile := filepath.Join(historySubfolder, filepath.Base(srcFile)) + // Copy the file + source, err := os.Open(srcFile) + if err != nil { + log.Fatalf("Failed to open source file: %v", err) + } + defer source.Close() - // Copy the file - source, err := os.Open(srcFile) - if err != nil { - log.Fatalf("Failed to open source file: %v", err) - } - defer source.Close() + destination, err := os.Create(destFile) + if err != nil { + log.Fatalf("Failed to create destination file: %v", err) + } + defer destination.Close() - destination, err := os.Create(destFile) - if err != nil { - log.Fatalf("Failed to create destination file: %v", err) - } - defer destination.Close() + _, err = io.Copy(destination, source) + if err != nil { + log.Fatalf("Failed to copy file: %v", err) + } - _, err = io.Copy(destination, source) - if err != nil { - log.Fatalf("Failed to copy file: %v", err) + log.Println("Copied file to", destFile) } - - log.Println("Copied file to", destFile) } func prepareHistoryFolder(pipelinePath string) string { @@ -451,21 +479,7 @@ func runLocalPipelineInDocker(pipelinePath string, historySubfolder string) { release() } -func runLocalPipeline(pipelinePath string) { - data, err := os.ReadFile(filepath.Join(pipelinePath, "pipeline.json")) - if err != nil { - log.Fatal(err) - } - - var pipeline Pipeline - err = json.Unmarshal(data, &pipeline) - if err != nil { - log.Fatal(err) - } - - if pipeline.Sink != "" { - TMPPATH = pipeline.Sink - } +func runLocalPipeline(pipeline Pipeline, pipelinePath string) { // Create the history subfolder using the current timestamp historyDir := filepath.Join(pipelinePath, "history") diff --git a/Katana/schema.go b/Katana/schema.go index d0778128..3fc75d53 100644 --- a/Katana/schema.go +++ b/Katana/schema.go @@ -36,13 +36,13 @@ type Container struct { } type Command struct { - Exec string `json:"exec"` - Dir string `json:"dir,omitempty"` + Exec string `json:"exec"` + Dir string `json:"dir,omitempty"` } type Action struct { Container Container `json:"container,omitempty"` - Command Command `json:"command,omitempty"` // commmand replaces docker, actual command,deer paremeter: if the command is empty, i + Command Command `json:"command,omitempty"` // commmand replaces docker, actual command,deer paremeter: if the command is empty, i Pipeline map[string]Block `json:"pipeline,omitempty"` Parameters map[string]Parameter `json:"parameters,omitempty"` } From 2d3c6d767bd6a89b37186522c68e90507a4c8fc9 Mon Sep 17 00:00:00 2001 From: Maharaj Teertha Deb <86840917+TeerthaDeb@users.noreply.github.com> Date: Fri, 4 Oct 2024 12:38:40 -0400 Subject: [PATCH 5/8] Update README.MD --- Katana/README.MD | 1 + 1 file changed, 1 insertion(+) diff --git a/Katana/README.MD b/Katana/README.MD index 39c9bae9..4b5ac0cd 100644 --- a/Katana/README.MD +++ b/Katana/README.MD @@ -57,6 +57,7 @@ go run . --mode=no-uv ``` - This will execute the pipeline directly on your local machine without Docker. +- UV mode will create a virtual environment inside block folders, install requirements and then run each block. All the subsequent runs will use the existing environment to run. no installataion will be there after the 1st run. ## Pipeline Structure From f386498df7409c1e9164b73d7f0c3b94749b50be Mon Sep 17 00:00:00 2001 From: Maharaj Teertha Deb <86840917+TeerthaDeb@users.noreply.github.com> Date: Fri, 4 Oct 2024 12:39:40 -0400 Subject: [PATCH 6/8] Update README.MD --- Katana/README.MD | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Katana/README.MD b/Katana/README.MD index 4b5ac0cd..762ff400 100644 --- a/Katana/README.MD +++ b/Katana/README.MD @@ -56,8 +56,8 @@ or go run . --mode=no-uv ``` -- This will execute the pipeline directly on your local machine without Docker. -- UV mode will create a virtual environment inside block folders, install requirements and then run each block. All the subsequent runs will use the existing environment to run. no installataion will be there after the 1st run. +- No-UV : This will execute the pipeline directly on your local machine without Docker. +- UV mode : This will create a virtual environment inside block folders, install requirements and then run each block. All the subsequent runs will use the existing environment to run. no installataion will be there after the 1st run. ## Pipeline Structure From b494bfdfe56effe6c08f9c4237543ad96d89df55 Mon Sep 17 00:00:00 2001 From: Maharaj Teertha Deb <86840917+TeerthaDeb@users.noreply.github.com> Date: Wed, 23 Oct 2024 14:15:34 -0400 Subject: [PATCH 7/8] Katana with multiple parameters, less prints Katana with multiple parameters, less prints and dynamic outputs. --- Katana/README.MD | 19 +++-- Katana/entrypoint.py | 79 +++++++++++-------- Katana/main.go | 182 +++++++++++++++++++++++++++---------------- 3 files changed, 173 insertions(+), 107 deletions(-) diff --git a/Katana/README.MD b/Katana/README.MD index 762ff400..886775db 100644 --- a/Katana/README.MD +++ b/Katana/README.MD @@ -16,11 +16,12 @@ Katana can be run in two modes: `docker` and `local` (non-docker mode). It expec ### Running in Docker Mode ``` -go run . --mode=docker +go run . --mode=docker ``` - PRE REQ: you must have a dockerfile , and requirements.txt(could be empty) files inside the block folder - This will build the Docker image if it's not found and execute the pipeline using Docker. - All necessary files from the pipeline folder will be mounted in Docker containers. +- are optional if you don't pass it, katana will pick from pipeline.json file. #### Dockerfile Example @@ -49,15 +50,14 @@ RUN pip install -r requirements.txt ### Running in Non-Docker Mode ``` -go run . --mode=uv +go run . --mode=uv ``` or ``` -go run . --mode=no-uv +go run . --mode=no-uv ``` -- No-UV : This will execute the pipeline directly on your local machine without Docker. -- UV mode : This will create a virtual environment inside block folders, install requirements and then run each block. All the subsequent runs will use the existing environment to run. no installataion will be there after the 1st run. +- This will execute the pipeline directly on your local machine without Docker. ## Pipeline Structure @@ -86,10 +86,15 @@ For each pipeline execution, a timestamped folder will be created inside the `hi ## Example -To run the pipeline named `abcd` in Docker mode: +To run the pipeline named `abcd` in Docker mode: (let's say abcd is a canny-edge folder) ``` -go run . --mode=docker abcd +go run . --mode=docker abcd path:"C:\Users\Teertha\Pictures\eeb3cafe-eb49-4162-9fcb-96323d07fd1c.jpg" integer:4 integer:500 integer:20 +``` + +To run the pipeline named 'efgh'(open ai agent) in uv mode: +``` +go run . --mode=docker 'efgh' password:"OPEN-AI_API_KEY" text:"Write a very short article about pandas" text:"You write articles on educational topics." ``` ## Configuration diff --git a/Katana/entrypoint.py b/Katana/entrypoint.py index de6a6b83..17e22b55 100644 --- a/Katana/entrypoint.py +++ b/Katana/entrypoint.py @@ -4,6 +4,7 @@ import sys import shutil import subprocess +import platform def setup_uv_environment(script_dir): # Path to the UV environment @@ -11,20 +12,15 @@ def setup_uv_environment(script_dir): # Create UV environment if it does not exist if not os.path.exists(uv_env_path): - print("Setting up UV environment...") - try: subprocess.run(["uv", "venv", uv_env_path], check=True) except subprocess.CalledProcessError as e: print(f"Failed to create UV environment: {e}") sys.exit(1) - # Install dependencies from requirements.txt inside the UV environment requirements_file = os.path.join(script_dir, "requirements.txt") if os.path.exists(requirements_file): - print("Installing dependencies inside UV environment...") try: - print("HERE>>>") subprocess.run(["uv", "pip", "install", "-r", requirements_file], check=True, cwd=script_dir) except subprocess.CalledProcessError as e: print(f"Failed to install dependencies: {e}") @@ -34,7 +30,10 @@ def setup_uv_environment(script_dir): def rerun_with_uv_environment(script_dir): # Path to the Python interpreter in the UV environment - uv_python = os.path.join(script_dir, ".venv", "Scripts", "python.exe") + if platform.system() == "Windows": + uv_python = os.path.join(script_dir, ".venv", "Scripts", "python.exe") + else: + uv_python = os.path.join(script_dir, ".venv", "bin", "python3.9") # -- maybe I need to change the python version accordingly. # Check if the current Python interpreter is already the one in the UV environment if sys.executable == uv_python: @@ -50,7 +49,7 @@ def rerun_with_uv_environment(script_dir): sys.exit(1) def main(): - print("ARGUMENTS: " , sys.argv) + # print("ARGUMENTS: " , sys.argv) # Get the directory of the script (block folder) script_dir = os.path.dirname(os.path.abspath(__file__)) @@ -68,7 +67,7 @@ def main(): os.chdir(script_dir) # Ensure the 'history' subfolder exists - os.makedirs(history_subfolder, exist_ok=True) + os.makedirs(history_subfolder, exist_ok=True) # This should not need to run otherwise file is missing in history subfolder. # Add the block folder (compute_dir) to the Python path to import 'computations' sys.path.insert(0, compute_dir) @@ -96,15 +95,29 @@ def main(): # Collect arguments passed to the script args_dict = {} - for arg in sys.argv[4:]: - key, value = arg.split('=') - if value.isdigit(): - args_dict[key] = int(value) - else: - try: - args_dict[key] = float(value) - except ValueError: - args_dict[key] = value # Leave as string if conversion fails + if sys.argv[3] == "docker": + for arg in sys.argv[5:]: + key, value = arg.split('=') + if value.isdigit(): + args_dict[key] = int(value) + else: + try: + args_dict[key] = float(value) + except ValueError: + args_dict[key] = value # Leave as string if conversion fails + + else: + for arg in sys.argv[4:]: + key, value = arg.split('=') + if value.isdigit(): + args_dict[key] = int(value) + else: + try: + args_dict[key] = float(value) + except ValueError: + args_dict[key] = value # Leave as string if conversion fails + + args_dict_copy = args_dict.copy() @@ -113,7 +126,6 @@ def main(): if "\\" in str(value): # Check if value is an absolute path print("CHECKED") args_dict_copy[key] = value.split("\\")[-1] # Extract the last part of the path (the file name) - print("DICTS: " , args_dict_copy) # Ensure images parameter is a Python list (if it's passed as a string) if 'images' in args_dict_copy and isinstance(args_dict_copy['images'], str): @@ -127,7 +139,7 @@ def main(): if value is not None: params.append(value) else: - print(f"Warning: No value found for {key} in pipeline.json") + print(f"Warning: No value found for {key} in pipeline.json or in argument") # Call the compute function # print(">>>>>>>>>PAssing parameters: " , params) @@ -156,21 +168,22 @@ def main(): for key, value in outputs.items(): if sys.argv[3] == "docker": - result = sys.argv[1] + result = sys.argv[4].split('\\')[-1] else: - result = block_id.rsplit('-', 1)[0].split('\\')[-1] - - if ("background-removal" in block_id): - output_file_path = os.path.join(history_subfolder, f"{result}-output_path.txt") - elif ("openai-agent" in block_id): - output_file_path = os.path.join(history_subfolder, f"{result}-response.txt") - elif ("images-to-video" in block_id): - output_file_path = os.path.join(history_subfolder, f"{result}-video_path.txt") - else: - output_file_path = os.path.join(history_subfolder, f"{result}-image_paths.txt") - - print("output file path:" , output_file_path) - + result = block_id.split('\\')[-1] + + # if ("background-removal" in block_id): + # output_file_path = os.path.join(history_subfolder, f"{result}-output_path.txt") + # elif ("openai-agent" in block_id): + # output_file_path = os.path.join(history_subfolder, f"{result}-response.txt") + # elif ("images-to-video" in block_id): + # output_file_path = os.path.join(history_subfolder, f"{result}-video_path.txt") + # else: + # output_file_path = os.path.join(history_subfolder, f"{result}-image_paths.txt") + output_file_path = os.path.join(history_subfolder, f"{result}.txt") + + print("OUTPUT FILE:" ,output_file_path) + with open(output_file_path, "w") as file: file.write(json.dumps(value)) diff --git a/Katana/main.go b/Katana/main.go index 97005f63..a4b4eed1 100644 --- a/Katana/main.go +++ b/Katana/main.go @@ -12,11 +12,14 @@ import ( "os/exec" "path/filepath" "runtime" + "strings" "time" ) var mode string +var argsKeyVal []ArgsKeyVal // store the parameters passed by arguments. + //go:embed entrypoint.py var entrypoint []byte @@ -34,7 +37,6 @@ func execCommand(cmd string, dir string, id string, args Dict, historySubfolder for key, value := range args { commandArgs = append(commandArgs, key+"="+value) } - // Convert the paths to absolute paths absDir, err := filepath.Abs(dir) // this should point to the block folder where computations.py is located if err != nil { @@ -54,9 +56,9 @@ func execCommand(cmd string, dir string, id string, args Dict, historySubfolder command = exec.Command("docker", "run", "--rm", "-v", absDir+":/app", // Mount the block folder where computations.py is located "-v", absHistorySubfolder+":/app/history/", // Mount the history folder - docker_image_name, // Docker image -- hardcoded + docker_image_name, "python", "/app/entrypoint.py", // Execute entrypoint.py inside Docker - id, "/app/history", mode) // Pass arguments to entrypoint.py (inside Docker paths) + id, "/app/history", mode, dir) // Pass arguments to entrypoint.py (inside Docker paths) } else { // Local execution if runtime.GOOS == "windows" { @@ -117,7 +119,6 @@ func (t *Task) Execute(args Dict) (Dict, error) { var new_histrory_subfolder = historySubfolder[len(filepath.Base(TMPPATH)):] data, err = os.ReadFile(filepath.Join(TMPPATH, new_histrory_subfolder, value) + ".txt") // wants to read from the 'history' floder } else { - println("From line:199. value: ", value, ", TMPPATH:", TMPPATH) data, err = os.ReadFile(filepath.Join(historySubfolder, value) + ".txt") } } @@ -164,12 +165,27 @@ func deployTask(pipeline *Pipeline, historySubfolder string) (Execution, func()) tasks[name] = &Task{Name: name, Exec: func(args Dict) error { // without that entrypoint won't run return execCommand(block.Action.Command.Exec, filepath.Join(TMPPATH, name), block.Information.Id, args, historySubfolder) }} - } else if len(block.Action.Parameters) > 0 { + } else if len(block.Action.Parameters) > 0 { // importatnt for passing parameters + // Iterate over block parameters for key, value := range block.Action.Parameters { - inputs[name] = &Param{Name: key, Value: value.Value} - // fmt.Println("Key", name, ", Value: ", value) + finalValue := value.Value // Default to the original value + + // Search for a matching key in argsKeyVal + for i := range argsKeyVal { // I implemented a bubble-sort like search algo to match key val arguments. + if argsKeyVal[i].Key == key { + // If a match is found, replace the final value with the argument value + finalValue = argsKeyVal[i].Value + + // Mark this key-value pair as used + argsKeyVal[i].Key = "-1" + argsKeyVal[i].Value = "-1" + break // Stop searching after the first match + } + } + + inputs[name] = &Param{Name: key, Value: finalValue} + // fmt.Println("Key:", key, ", Final Value:", finalValue) } - // fmt.Println("inputs:", inputs) } else if block.Action.Command.Exec != "" { if block.Action.Command.Dir == "" { os.WriteFile("entrypoint.py", entrypoint, 0644) @@ -211,7 +227,8 @@ func deployTask(pipeline *Pipeline, historySubfolder string) (Execution, func()) outputs[name] = pipe task.Out = append(task.Out, pipe) } else { - task.MapOut[name+label] = block.Information.Id + "-" + label + // task.MapOut[name+label] = block.Information.Id + "-" + label + task.MapOut[name+label] = name } } } @@ -271,7 +288,6 @@ func runTask(task *Task) { maps.Copy(args, arg.Dict) if arg.Err != nil { - // fmt.Println(">>>>>>>>>ERROR found in : ", task.Name, " , and the error is: ", arg.Err) executionError = arg.Err } } @@ -299,61 +315,7 @@ func runTask(task *Task) { } } -func main() { - - pipelineName := os.Args[2] - pipelinePath := filepath.Join(".", pipelineName) - - data, err := os.ReadFile(filepath.Join(pipelinePath, "pipeline.json")) - if err != nil { - log.Fatal(err) - } - - var pipeline Pipeline - err = json.Unmarshal(data, &pipeline) - if err != nil { - log.Fatal(err) - } - - if pipeline.Sink != "" { - TMPPATH = pipeline.Sink - } - - flag.StringVar(&mode, "mode", "uv", "Execution mode: uv, no-uv, docker") - flag.Parse() - - if len(os.Args) < 3 { - log.Fatal("Pipeline name or path must be provided as an argument.") - } - - var filePaths []string // extracting files for the pipelines. - for _, block := range pipeline.Pipeline { - for paramName, param := range block.Action.Parameters { - if paramName == "path" { - filePaths = append(filePaths, param.Value) // Assuming 'Value' contains the file path - } - } - } - - // fmt.Println("File Paths:", filePaths) - - if mode == "docker" { - fmt.Println("Docker mode running") - docker_image_name = "katana-" + pipeline.Id - // fmt.Print("docker_image_name : ", docker_image_name) - buildAndRunDockerImage(pipelinePath, docker_image_name, filePaths) - return - } else { - fmt.Println("Running in non-docker mode") - runLocalPipeline(pipeline, pipelinePath) - } - -} - -// TRYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY - func buildAndRunDockerImage(pipelinePath string, imageName string, filepaths []string) { - // Step 1: Check if Docker image already exists checkCmd := exec.Command("docker", "images", "-q", imageName) checkOutput, err := checkCmd.Output() if err != nil { @@ -391,13 +353,10 @@ func buildAndRunDockerImage(pipelinePath string, imageName string, filepaths []s } absPipelinePath = filepath.ToSlash(absPipelinePath) - // Step 2: Prepare history folder historySubfolder = prepareHistoryFolder(absPipelinePath) - // Step 3: Copy files to history folder copyToHistoryFolder(historySubfolder, filepaths) - // Step 4: Run the local pipeline using the Docker environment runLocalPipelineInDocker(absPipelinePath, historySubfolder) } @@ -487,6 +446,23 @@ func runLocalPipeline(pipeline Pipeline, pipelinePath string) { historySubfolder = filepath.Join(historyDir, timestamp) os.MkdirAll(historySubfolder, os.ModePerm) + // if there is any file in the values, copy it in history subfolder. + for _, arg := range argsKeyVal { + if arg.Key == "path" { // could be other KEY--------- CHECK LATER. + src := filepath.Clean(arg.Value) + dest := filepath.Join(historySubfolder, filepath.Base(arg.Value)) + // fmt.Println("HISTORY SUBFOLDER:", historySubfolder, " , FILE PATH: ", dest) + input, err := os.ReadFile(src) + if err != nil { + log.Fatal(err) + } + if err := os.WriteFile(dest, input, 0644); err != nil { + log.Fatal(err) + } + fmt.Println("Moved file to", dest) + } + } + execution, release := deployTask(&pipeline, historySubfolder) result, err := execution(make(Dict)) @@ -498,3 +474,75 @@ func runLocalPipeline(pipeline Pipeline, pipelinePath string) { release() } + +type ArgsKeyVal struct { + Key string + Value string +} + +func main() { + + pipelineName := os.Args[2] + pipelinePath := filepath.Join(".", pipelineName) + + data, err := os.ReadFile(filepath.Join(pipelinePath, "pipeline.json")) + if err != nil { + log.Fatal(err) + } + + var pipeline Pipeline + err = json.Unmarshal(data, &pipeline) + if err != nil { + log.Fatal(err) + } + + if pipeline.Sink != "" { + TMPPATH = pipeline.Sink + } + + flag.StringVar(&mode, "mode", "uv", "Execution mode: uv, no-uv, docker") + flag.Parse() + + if len(os.Args) < 3 { + log.Fatal("Pipeline name or path must be provided as an argument.") + } + + for _, arg := range os.Args[3:] { + parts := strings.SplitN(arg, ":", 2) + if len(parts) == 2 { + argsKeyVal = append(argsKeyVal, ArgsKeyVal{ + Key: parts[0], + Value: parts[1], + }) + } + } + + var filePaths []string // extracting files for the pipelines. + for _, block := range pipeline.Pipeline { + for paramName, param := range block.Action.Parameters { + if paramName == "path" { + filePaths = append(filePaths, param.Value) + } + } + } + + for _, arg := range argsKeyVal { + if arg.Key == "path" { + filePaths = append(filePaths, arg.Value) + } + } + + fmt.Println("PARSED ARGUMENT: ", argsKeyVal, " AND GOT ARGUMENT: ", os.Args[3:]) // print the key value arguements. + + if mode == "docker" { + fmt.Println("Docker mode running") + fmt.Println("filePaths: ", filePaths) + docker_image_name = "katana-" + pipeline.Id + // fmt.Print("docker_image_name : ", docker_image_name) + buildAndRunDockerImage(pipelinePath, docker_image_name, filePaths) + return + } else { + fmt.Println("Running in non-docker mode") + runLocalPipeline(pipeline, pipelinePath) + } +} From 62647ffbdbc2471c7114d3fdb7350fe5f7bd1f1a Mon Sep 17 00:00:00 2001 From: Maharaj Teertha Deb <86840917+TeerthaDeb@users.noreply.github.com> Date: Wed, 23 Oct 2024 14:16:10 -0400 Subject: [PATCH 8/8] Update README.MD --- Katana/README.MD | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Katana/README.MD b/Katana/README.MD index 886775db..190a8801 100644 --- a/Katana/README.MD +++ b/Katana/README.MD @@ -94,7 +94,7 @@ go run . --mode=docker abcd path:"C:\Users\Teertha\Pictures\eeb3cafe-eb49-4162-9 To run the pipeline named 'efgh'(open ai agent) in uv mode: ``` -go run . --mode=docker 'efgh' password:"OPEN-AI_API_KEY" text:"Write a very short article about pandas" text:"You write articles on educational topics." +go run . --mode=uv 'efgh' password:"OPEN-AI_API_KEY" text:"Write a very short article about pandas" text:"You write articles on educational topics." ``` ## Configuration