Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Katana Added #216

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions Katana/README.MD
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Katana

Katana is a Go-based tool designed to manage and execute pipelines for various computational tasks, including Docker and non-Docker modes. It handles the execution of blocks defined in pipelines and efficiently manages file movement and history logging.

## Features

- Execute pipelines in Docker or locally.
- Supports mounting files and managing environments.
- Tracks execution history in timestamped folders.
- Handles the movement of computation outputs to history folders.

## Usage

Katana can be run in two modes: `docker` and `local` (non-docker mode). It expects a pipeline directory with blocks and their corresponding computational logic.

### Running in Docker Mode

```
go run . --mode=docker <pipeline-name> <your-parameters>
```
- PRE REQ: you must have a dockerfile , and requirements.txt(could be empty) files inside the block folder
- This will build the Docker image if it's not found and execute the pipeline using Docker.
- All necessary files from the pipeline folder will be mounted in Docker containers.
- <your-parameters> are optional if you don't pass it, katana will pick from pipeline.json file.

#### Dockerfile Example

Here’s an example of a `Dockerfile` that can be used to run the pipeline:

```Dockerfile
FROM python:3.10-slim

# Set working directory
WORKDIR /app

# Copy necessary files
COPY . .

# Install FFmpeg
RUN apt-get update && \
apt-get install -y ffmpeg && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

# Install Python dependencies
RUN pip install -r requirements.txt
```


### Running in Non-Docker Mode

```
go run . --mode=uv <pipeline-name> <your inputs>
```
or
```
go run . --mode=no-uv <pipeline-name> <your inputs>
```

- This will execute the pipeline directly on your local machine without Docker.

## Pipeline Structure

A pipeline directory contains multiple blocks, each of which has its own computational logic (such as `computations.py`). Here’s an example of the directory structure:

```
main_dir/
|--> pipeline1/
| |--> block1/
| | |--> computations.py
| | |--> requirements.txt
| |--> block2/
| |--> pipeline.json
|--> entrypoint.py
|--> katana (Go)
```

Each block has:

- `computations.py`: The script that executes the block's computation.
- `requirements.txt`: The Python dependencies required to run `computations.py`.

### History Management

For each pipeline execution, a timestamped folder will be created inside the `history` folder. All output files will be moved to this folder.

## Example

To run the pipeline named `abcd` in Docker mode: (let's say abcd is a canny-edge folder)

```
go run . --mode=docker abcd path:"C:\Users\Teertha\Pictures\eeb3cafe-eb49-4162-9fcb-96323d07fd1c.jpg" integer:4 integer:500 integer:20
```

To run the pipeline named 'efgh'(open ai agent) in uv mode:
```
go run . --mode=uv 'efgh' password:"OPEN-AI_API_KEY" text:"Write a very short article about pandas" text:"You write articles on educational topics."
```

## Configuration

You can define blocks and their parameters in `pipeline.json` within the pipeline directory. This file defines the computational blocks and how they connect with each other.


## Important Notice

If you encounter an error such as:
`path not found: 'files/fiz/xxxxx...'`

You may need to update the `computations.py` file. Modify the line that generates the HTML path to ensure it uses a unique identifier. For example:

```python
html_path = f"viz_{unique_id}.html"
```
191 changes: 191 additions & 0 deletions Katana/entrypoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import os
import inspect
import json
import sys
import shutil
import subprocess
import platform

def setup_uv_environment(script_dir):
# Path to the UV environment
uv_env_path = os.path.join(script_dir, ".venv")

# Create UV environment if it does not exist
if not os.path.exists(uv_env_path):
try:
subprocess.run(["uv", "venv", uv_env_path], check=True)
except subprocess.CalledProcessError as e:
print(f"Failed to create UV environment: {e}")
sys.exit(1)

requirements_file = os.path.join(script_dir, "requirements.txt")
if os.path.exists(requirements_file):
try:
subprocess.run(["uv", "pip", "install", "-r", requirements_file], check=True, cwd=script_dir)
except subprocess.CalledProcessError as e:
print(f"Failed to install dependencies: {e}")
sys.exit(1)
else:
print("No requirements.txt found. Skipping dependency installation.")

def rerun_with_uv_environment(script_dir):
# Path to the Python interpreter in the UV environment
if platform.system() == "Windows":
uv_python = os.path.join(script_dir, ".venv", "Scripts", "python.exe")
else:
uv_python = os.path.join(script_dir, ".venv", "bin", "python3.9") # -- maybe I need to change the python version accordingly.

# Check if the current Python interpreter is already the one in the UV environment
if sys.executable == uv_python:
return # Continue with the script

if os.path.exists(uv_python):
print(f"Re-running the script with the UV environment's Python interpreter: {uv_python}")
# Re-run the current script with the UV environment's Python interpreter
subprocess.run([uv_python, __file__] + sys.argv[1:], check=True)
sys.exit(0) # Exit the original process
else:
print("Could not find the Python interpreter in the UV environment.")
sys.exit(1)

def main():
# print("ARGUMENTS: " , sys.argv)
# Get the directory of the script (block folder)
script_dir = os.path.dirname(os.path.abspath(__file__))

# Set up the UV environment if uv in argument:
if(sys.argv[3] == "uv"):
setup_uv_environment(script_dir)
#Activate the environment:
rerun_with_uv_environment(script_dir)


# Get the history subfolder from the arguments
compute_dir = sys.argv[1] # Directory with compute logic (block folder)
history_subfolder = os.path.abspath(sys.argv[2])

os.chdir(script_dir)

# Ensure the 'history' subfolder exists
os.makedirs(history_subfolder, exist_ok=True) # This should not need to run otherwise file is missing in history subfolder.

# Add the block folder (compute_dir) to the Python path to import 'computations'
sys.path.insert(0, compute_dir)

# Store the initial list of existing files in the current block folder
initial_files = set(os.listdir(script_dir))

# Import 'compute' function after adding the path
from computations import compute

# Move all files from the 'history' folder to the current block folder
for item in os.listdir(history_subfolder):
src_path = os.path.join(history_subfolder, item)
dst_path = os.path.join(script_dir, item)
if os.path.isdir(src_path):
shutil.copytree(src_path, dst_path, dirs_exist_ok=True)
else:
shutil.copy2(src_path, dst_path)

# Get block ID from the compute directory
block_id = compute_dir

params = []
debug_inputs = {}

# Collect arguments passed to the script
args_dict = {}
if sys.argv[3] == "docker":
for arg in sys.argv[5:]:
key, value = arg.split('=')
if value.isdigit():
args_dict[key] = int(value)
else:
try:
args_dict[key] = float(value)
except ValueError:
args_dict[key] = value # Leave as string if conversion fails

else:
for arg in sys.argv[4:]:
key, value = arg.split('=')
if value.isdigit():
args_dict[key] = int(value)
else:
try:
args_dict[key] = float(value)
except ValueError:
args_dict[key] = value # Leave as string if conversion fails



args_dict_copy = args_dict.copy()

if(sys.argv[3] == "docker"):
for key, value in args_dict_copy.items():
if "\\" in str(value): # Check if value is an absolute path
print("CHECKED")
args_dict_copy[key] = value.split("\\")[-1] # Extract the last part of the path (the file name)

# Ensure images parameter is a Python list (if it's passed as a string)
if 'images' in args_dict_copy and isinstance(args_dict_copy['images'], str):
args_dict_copy['images'] = json.loads(args_dict_copy['images'])

# Fetch outputs from pipeline.json and get the corresponding parameters
for key in inspect.signature(compute).parameters.keys():
value = args_dict_copy.get(key)
debug_inputs[key] = value

if value is not None:
params.append(value)
else:
print(f"Warning: No value found for {key} in pipeline.json or in argument")

# Call the compute function
# print(">>>>>>>>>PAssing parameters: " , params)
outputs = compute(*params)

# Store the final state of files in the block folder (after compute is executed)

final_files = set(os.listdir(script_dir))

# Identify new files created during computation (those not in the initial list)
new_files = final_files - initial_files

# Move the newly created files back to the 'history' folder
for new_file in new_files:
src_path = os.path.join(script_dir, new_file)
dst_path = os.path.join(history_subfolder, new_file)
if os.path.exists(dst_path):
if os.path.isfile(dst_path):
os.remove(dst_path)
elif os.path.isdir(dst_path):
shutil.rmtree(dst_path)

shutil.move(src_path, dst_path)

# Output results to files in the 'history' folder

for key, value in outputs.items():
if sys.argv[3] == "docker":
result = sys.argv[4].split('\\')[-1]
else:
result = block_id.split('\\')[-1]

# if ("background-removal" in block_id):
# output_file_path = os.path.join(history_subfolder, f"{result}-output_path.txt")
# elif ("openai-agent" in block_id):
# output_file_path = os.path.join(history_subfolder, f"{result}-response.txt")
# elif ("images-to-video" in block_id):
# output_file_path = os.path.join(history_subfolder, f"{result}-video_path.txt")
# else:
# output_file_path = os.path.join(history_subfolder, f"{result}-image_paths.txt")
output_file_path = os.path.join(history_subfolder, f"{result}.txt")

print("OUTPUT FILE:" ,output_file_path)

with open(output_file_path, "w") as file:
file.write(json.dumps(value))

if __name__ == "__main__":
main()
3 changes: 3 additions & 0 deletions Katana/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module katana

go 1.21.1
Loading
Loading