Skip to content

Commit

Permalink
Merge pull request #6 from dominopetter/main
Browse files Browse the repository at this point in the history
flows
  • Loading branch information
dominopetter authored Nov 13, 2024
2 parents 3cc86f2 + 0b2ceeb commit 9ae0f28
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 0 deletions.
12 changes: 12 additions & 0 deletions add.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from pathlib import Path

# Read inputs
a = Path("/workflow/inputs/first_value").read_text()
b = Path("/workflow/inputs/second_value").read_text()

# Calculate sum
sum = int(a) + int(b)
print(f"The sum of {a} + {b} is {sum}")

# Write output
Path("/workflow/outputs/sum").write_text(str(sum))
13 changes: 13 additions & 0 deletions aggregate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from pathlib import Path

# Read inputs
sum_value = Path("/workflow/inputs/sum").read_text()
difference_value = Path("/workflow/inputs/difference").read_text()
product_value = Path("/workflow/inputs/product").read_text()

# Aggregate results (this is a simple example, more complex logic can be applied)
aggregate_result = int(sum_value) + int(difference_value) + int(product_value)
print(f"The aggregated result is {aggregate_result}")

# Write output
Path("/workflow/outputs/aggregate").write_text(str(aggregate_result))
Binary file added demo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
15 changes: 15 additions & 0 deletions divide.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from pathlib import Path

# Read inputs
a = Path("/workflow/inputs/first_value").read_text()
b = Path("/workflow/inputs/second_value").read_text()

# Calculate division (ensure no division by zero)
if int(b) == 0:
division = "undefined"
else:
division = int(a) / int(b)
print(f"The division of {a} / {b} is {division}")

# Write output
Path("/workflow/outputs/division").write_text(str(division))
15 changes: 15 additions & 0 deletions modulus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from pathlib import Path

# Read inputs
a = Path("/workflow/inputs/first_value").read_text()
b = Path("/workflow/inputs/second_value").read_text()

# Calculate modulus (ensure no division by zero)
if int(b) == 0:
modulus = "undefined"
else:
modulus = int(a) % int(b)
print(f"The modulus of {a} % {b} is {modulus}")

# Write output
Path("/workflow/outputs/modulus").write_text(str(modulus))
12 changes: 12 additions & 0 deletions multiply.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from pathlib import Path

# Read inputs
a = Path("/workflow/inputs/first_value").read_text()
b = Path("/workflow/inputs/second_value").read_text()

# Calculate product
product = int(a) * int(b)
print(f"The product of {a} * {b} is {product}")

# Write output
Path("/workflow/outputs/product").write_text(str(product))
19 changes: 19 additions & 0 deletions run_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import random
import subprocess

# Generate random values for a and b
a = random.randint(1, 100)
b = random.randint(1, 100)

# Print the randomized inputs
print(f"Running workflow with a={a} and b={b}")

# Command to run the Flyte workflow
command = f"pyflyte run --remote workflow.py etl_workflow --a {a} --b {b}"

# Execute the command
result = subprocess.run(command, shell=True, check=True, capture_output=True, text=True)

# Print the output of the command
print("Workflow output:")
print(result.stdout)
11 changes: 11 additions & 0 deletions sqrt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from pathlib import Path

# Read input
value = Path("/workflow/inputs/value").read_text()

# Calculate square root
sqrt = int(value) ** 0.5
print(f"The square root of {value} is {sqrt}")

# Write output
Path("/workflow/outputs/sqrt").write_text(str(sqrt))
12 changes: 12 additions & 0 deletions subtract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from pathlib import Path

# Read inputs
a = Path("/workflow/inputs/first_value").read_text()
b = Path("/workflow/inputs/second_value").read_text()

# Calculate difference
difference = int(a) - int(b)
print(f"The difference between {a} - {b} is {difference}")

# Write output
Path("/workflow/outputs/difference").write_text(str(difference))
67 changes: 67 additions & 0 deletions workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from flytekit import workflow
from flytekitplugins.domino.task import DominoJobConfig, DominoJobTask

@workflow
def etl_workflow(a: int, b: int) -> (int, int, int, str, str, int):
# Create addition task
add_task = DominoJobTask(
name='Add Data',
domino_job_config=DominoJobConfig(Command="python add.py"),
inputs={'first_value': int, 'second_value': int},
outputs={'sum': int},
use_latest=True
)
sum_result = add_task(first_value=a, second_value=b)

# Create multiplication task
multiply_task = DominoJobTask(
name='Check numbers',
domino_job_config=DominoJobConfig(Command="python multiply.py"),
inputs={'first_value': int, 'second_value': int},
outputs={'product': int},
use_latest=True
)
product_result = multiply_task(first_value=a, second_value=b)

# Create subtraction task that depends on the addition task
subtract_task = DominoJobTask(
name='Remove data',
domino_job_config=DominoJobConfig(Command="python subtract.py"),
inputs={'first_value': int, 'second_value': int},
outputs={'difference': int},
use_latest=True
)
difference_result = subtract_task(first_value=sum_result, second_value=b)

# Create division task that runs in parallel with the subtraction task
divide_task = DominoJobTask(
name='Sort the data',
domino_job_config=DominoJobConfig(Command="python divide.py"),
inputs={'first_value': int, 'second_value': int},
outputs={'division': str},
use_latest=True
)
division_result = divide_task(first_value=a, second_value=b)

# Create modulus task that runs in parallel with the subtraction and division tasks
modulus_task = DominoJobTask(
name='Line it all up',
domino_job_config=DominoJobConfig(Command="python modulus.py"),
inputs={'first_value': int, 'second_value': int},
outputs={'modulus': str},
use_latest=True
)
modulus_result = modulus_task(first_value=a, second_value=b)

# Create an aggregation task that depends on the sum, difference, and product tasks
aggregate_task = DominoJobTask(
name='Aggregate results',
domino_job_config=DominoJobConfig(Command="python aggregate.py"),
inputs={'sum': int, 'difference': int, 'product': int},
outputs={'aggregate': int},
use_latest=True
)
aggregate_result = aggregate_task(sum=sum_result, difference=difference_result, product=product_result)

# Return the results as separate outputs
return sum_result, product_result, difference_result, division_result, modulus_result, aggregate_result

0 comments on commit 9ae0f28

Please sign in to comment.