Add a Flyte/Domino arithmetic demo: one job script per operation, the
etl_workflow definition, and a launcher that runs it with random inputs.

diff --git a/add.py b/add.py
new file mode 100644
index 0000000..7a7ac9f
--- /dev/null
+++ b/add.py
@@ -0,0 +1,12 @@
+from pathlib import Path
+
+# Read inputs
+a = Path("/workflow/inputs/first_value").read_text()
+b = Path("/workflow/inputs/second_value").read_text()
+
+# Calculate sum (named `total` so the built-in sum() is not shadowed)
+total = int(a) + int(b)
+print(f"The sum of {a} + {b} is {total}")
+
+# Write output
+Path("/workflow/outputs/sum").write_text(str(total))
\ No newline at end of file
diff --git a/aggregate.py b/aggregate.py
new file mode 100644
index 0000000..b97405b
--- /dev/null
+++ b/aggregate.py
@@ -0,0 +1,13 @@
+from pathlib import Path
+
+# Read inputs
+sum_value = Path("/workflow/inputs/sum").read_text()
+difference_value = Path("/workflow/inputs/difference").read_text()
+product_value = Path("/workflow/inputs/product").read_text()
+
+# Aggregate results (this is a simple example, more complex logic can be applied)
+aggregate_result = int(sum_value) + int(difference_value) + int(product_value)
+print(f"The aggregated result is {aggregate_result}")
+
+# Write output
+Path("/workflow/outputs/aggregate").write_text(str(aggregate_result))
diff --git a/demo.png b/demo.png
new file mode 100644
index 0000000..b600e78
Binary files /dev/null and b/demo.png differ
diff --git a/divide.py b/divide.py
new file mode 100644
index 0000000..859d1a4
--- /dev/null
+++ b/divide.py
@@ -0,0 +1,15 @@
+from pathlib import Path
+
+# Read inputs
+a = Path("/workflow/inputs/first_value").read_text()
+b = Path("/workflow/inputs/second_value").read_text()
+
+# Calculate division (ensure no division by zero)
+if int(b) == 0:
+    division = "undefined"
+else:
+    division = int(a) / int(b)
+print(f"The division of {a} / {b} is {division}")
+
+# Write output
+Path("/workflow/outputs/division").write_text(str(division))
diff --git a/modulus.py b/modulus.py
new file mode 100644
index 0000000..74d904e
--- /dev/null
+++ b/modulus.py
@@ -0,0 +1,15 @@
+from pathlib import Path
+
+# Read inputs
+a = Path("/workflow/inputs/first_value").read_text()
+b = Path("/workflow/inputs/second_value").read_text()
+
+# Calculate modulus (ensure no division by zero)
+if int(b) == 0:
+    modulus = "undefined"
+else:
+    modulus = int(a) % int(b)
+print(f"The modulus of {a} % {b} is {modulus}")
+
+# Write output
+Path("/workflow/outputs/modulus").write_text(str(modulus))
diff --git a/multiply.py b/multiply.py
new file mode 100644
index 0000000..9887440
--- /dev/null
+++ b/multiply.py
@@ -0,0 +1,12 @@
+from pathlib import Path
+
+# Read inputs
+a = Path("/workflow/inputs/first_value").read_text()
+b = Path("/workflow/inputs/second_value").read_text()
+
+# Calculate product
+product = int(a) * int(b)
+print(f"The product of {a} * {b} is {product}")
+
+# Write output
+Path("/workflow/outputs/product").write_text(str(product))
\ No newline at end of file
diff --git a/run_workflow.py b/run_workflow.py
new file mode 100644
index 0000000..41b72dc
--- /dev/null
+++ b/run_workflow.py
@@ -0,0 +1,19 @@
+import random
+import subprocess
+
+# Generate random values for a and b
+a = random.randint(1, 100)
+b = random.randint(1, 100)
+
+# Print the randomized inputs
+print(f"Running workflow with a={a} and b={b}")
+
+# Command to run the Flyte workflow, as an argument list so it runs without a shell
+command = ["pyflyte", "run", "--remote", "workflow.py", "etl_workflow", "--a", str(a), "--b", str(b)]
+
+# Execute the command
+result = subprocess.run(command, check=True, capture_output=True, text=True)
+
+# Print the output of the command
+print("Workflow output:")
+print(result.stdout)
diff --git a/sqrt.py b/sqrt.py
new file mode 100644
index 0000000..0f859f0
--- /dev/null
+++ b/sqrt.py
@@ -0,0 +1,11 @@
+from pathlib import Path
+
+# Read input
+value = Path("/workflow/inputs/value").read_text()
+
+# Calculate square root
+sqrt = int(value) ** 0.5
+print(f"The square root of {value} is {sqrt}")
+
+# Write output
+Path("/workflow/outputs/sqrt").write_text(str(sqrt))
\ No newline at end of file
diff --git a/subtract.py b/subtract.py
new file mode 100644
index 0000000..d4ce881
--- /dev/null
+++ b/subtract.py
@@ -0,0 +1,12 @@
+from pathlib import Path
+
+# Read inputs
+a = Path("/workflow/inputs/first_value").read_text()
+b = Path("/workflow/inputs/second_value").read_text()
+
+# Calculate difference
+difference = int(a) - int(b)
+print(f"The difference between {a} - {b} is {difference}")
+
+# Write output
+Path("/workflow/outputs/difference").write_text(str(difference))
diff --git a/workflow.py b/workflow.py
new file mode 100644
index 0000000..b8d9308
--- /dev/null
+++ b/workflow.py
@@ -0,0 +1,67 @@
+from flytekit import workflow
+from flytekitplugins.domino.task import DominoJobConfig, DominoJobTask
+
+@workflow
+def etl_workflow(a: int, b: int) -> (int, int, int, str, str, int):
+    # Create addition task
+    add_task = DominoJobTask(
+        name='Add Data',
+        domino_job_config=DominoJobConfig(Command="python add.py"),
+        inputs={'first_value': int, 'second_value': int},
+        outputs={'sum': int},
+        use_latest=True
+    )
+    sum_result = add_task(first_value=a, second_value=b)
+
+    # Create multiplication task
+    multiply_task = DominoJobTask(
+        name='Check numbers',
+        domino_job_config=DominoJobConfig(Command="python multiply.py"),
+        inputs={'first_value': int, 'second_value': int},
+        outputs={'product': int},
+        use_latest=True
+    )
+    product_result = multiply_task(first_value=a, second_value=b)
+
+    # Create subtraction task that depends on the addition task
+    subtract_task = DominoJobTask(
+        name='Remove data',
+        domino_job_config=DominoJobConfig(Command="python subtract.py"),
+        inputs={'first_value': int, 'second_value': int},
+        outputs={'difference': int},
+        use_latest=True
+    )
+    difference_result = subtract_task(first_value=sum_result, second_value=b)
+
+    # Create division task that runs in parallel with the subtraction task
+    divide_task = DominoJobTask(
+        name='Sort the data',
+        domino_job_config=DominoJobConfig(Command="python divide.py"),
+        inputs={'first_value': int, 'second_value': int},
+        outputs={'division': str},
+        use_latest=True
+    )
+    division_result = divide_task(first_value=a, second_value=b)
+
+    # Create modulus task that runs in parallel with the subtraction and division tasks
+    modulus_task = DominoJobTask(
+        name='Line it all up',
+        domino_job_config=DominoJobConfig(Command="python modulus.py"),
+        inputs={'first_value': int, 'second_value': int},
+        outputs={'modulus': str},
+        use_latest=True
+    )
+    modulus_result = modulus_task(first_value=a, second_value=b)
+
+    # Create an aggregation task that depends on the sum, difference, and product tasks
+    aggregate_task = DominoJobTask(
+        name='Aggregate results',
+        domino_job_config=DominoJobConfig(Command="python aggregate.py"),
+        inputs={'sum': int, 'difference': int, 'product': int},
+        outputs={'aggregate': int},
+        use_latest=True
+    )
+    aggregate_result = aggregate_task(sum=sum_result, difference=difference_result, product=product_result)
+
+    # Return the results as separate outputs
+    return sum_result, product_result, difference_result, division_result, modulus_result, aggregate_result
\ No newline at end of file