Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run command in pyflyte-fast-execute in the same process #3029

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 40 additions & 17 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from flytekit.core import utils
from flytekit.core.base_task import IgnoreOutputs, PythonTask
from flytekit.core.checkpointer import SyncCheckpoint
from flytekit.core.constants import FLYTE_FAIL_ON_ERROR
from flytekit.core.constants import FLYTE_FAIL_ON_ERROR, FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS
from flytekit.core.context_manager import (
ExecutionParameters,
ExecutionState,
Expand Down Expand Up @@ -721,6 +721,37 @@ def execute_task_cmd(
)


def _run_cmd_in_new_process(cmd, dest_dir):
"""Run cmd in a new process."""
env = os.environ.copy()
if dest_dir is not None:
dest_dir_resolved = os.path.realpath(os.path.expanduser(dest_dir))
if "PYTHONPATH" in env:
env["PYTHONPATH"] += os.pathsep + dest_dir_resolved
else:
env["PYTHONPATH"] = dest_dir_resolved
p = subprocess.Popen(cmd, env=env)

def handle_sigterm(signum, frame):
logger.info(f"passing signum {signum} [frame={frame}] to subprocess")
p.send_signal(signum)

signal.signal(signal.SIGTERM, handle_sigterm)
returncode = p.wait()
exit(returncode)


def _run_cmd_in_current_process(command: click.Command, args: List[str], dest_dir: Optional[str]):
"""Run command with args in the same process."""

if dest_dir is not None:
dest_dir_resolved = os.path.realpath(os.path.expanduser(dest_dir))
if all(os.path.realpath(path) != dest_dir_resolved for path in sys.path):
sys.path.append(dest_dir_resolved)

command(args)


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling for Popen

Consider adding error handling around subprocess.Popen() call to catch potential OSError or ValueError exceptions that could occur during process creation.

Code suggestion
Check the AI-generated fix before applying
Suggested change
try:
p = subprocess.Popen(cmd, env=env)
except (OSError, ValueError) as e:
logger.error(f"Failed to start subprocess: {e}")
exit(1)

Code Review Run #ff52b8


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

thomasjpfan marked this conversation as resolved.
Show resolved Hide resolved
@_pass_through.command("pyflyte-fast-execute")
@click.option("--additional-distribution", required=False)
@click.option("--dest-dir", required=False)
Expand All @@ -742,24 +773,16 @@ def fast_execute_task_cmd(additional_distribution: str, dest_dir: str, task_exec
cmd.extend(["--dynamic-addl-distro", additional_distribution, "--dynamic-dest-dir", dest_dir])
cmd.append(arg)

commands_to_run_in_process = {cmd.name: cmd for cmd in [map_execute_task_cmd, execute_task_cmd]}
Copy link
Contributor

@flyte-bot flyte-bot Jan 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving static dict to module level

Consider moving the commands_to_run_in_process dictionary definition outside the function to avoid recreating it on every call. This dictionary is static and could be defined at module level.

Code suggestion
Check the AI-generated fix before applying
 @@ -1,1 +1,3 @@
 +_commands_to_run_in_process = {cmd.name: cmd for cmd in [map_execute_task_cmd, execute_task_cmd]}
 +
  def fast_execute_task_cmd(additional_distribution: str, dest_dir: str, task_execute_cmd: List[str]):
 @@ -776,1 +778,0 @@
 -    commands_to_run_in_process = {cmd.name: cmd for cmd in [map_execute_task_cmd, execute_task_cmd]}

Code Review Run #ff52b8


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


# Use the commandline to run the task execute command rather than calling it directly in python code
# since the current runtime bytecode references the older user code, rather than the downloaded distribution.
env = os.environ.copy()
if dest_dir is not None:
dest_dir_resolved = os.path.realpath(os.path.expanduser(dest_dir))
if "PYTHONPATH" in env:
env["PYTHONPATH"] += os.pathsep + dest_dir_resolved
else:
env["PYTHONPATH"] = dest_dir_resolved
p = subprocess.Popen(cmd, env=env)

def handle_sigterm(signum, frame):
logger.info(f"passing signum {signum} [frame={frame}] to subprocess")
p.send_signal(signum)

signal.signal(signal.SIGTERM, handle_sigterm)
returncode = p.wait()
exit(returncode)
if str2bool(os.getenv(FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS)) or cmd[0] not in commands_to_run_in_process:
thomasjpfan marked this conversation as resolved.
Show resolved Hide resolved
logger.debug(f"Running {cmd[0]} in a new process")
_run_cmd_in_new_process(cmd, dest_dir)
else:
logger.debug(f"Running {cmd[0]} in the same process")
_run_cmd_in_current_process(commands_to_run_in_process[cmd[0]], cmd[1:], dest_dir)


@_pass_through.command("pyflyte-map-execute")
Expand Down
3 changes: 3 additions & 0 deletions flytekit/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
# Set this environment variable to true to force the task to return non-zero exit code on failure.
FLYTE_FAIL_ON_ERROR = "FLYTE_FAIL_ON_ERROR"

# Set this environment variable to true to force pyflyte-fast-execute to run task-execute-cmd in a separate process
FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS = "FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we run a workflow in the integration tests with FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS enabled? Just want to make sure the old behavior is still working.

# Executions launched by the current eager task will be tagged with this key:current_eager_exec_name
EAGER_TAG_KEY = "eager-exec"

Expand Down
15 changes: 13 additions & 2 deletions tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,18 @@ def register():
assert out.returncode == 0


def run(file_name, wf_name, *args) -> str:
def run(file_name, wf_name, *wf_args, run_args=None) -> str:
thomasjpfan marked this conversation as resolved.
Show resolved Hide resolved
# Copy the environment and set the environment variable
if run_args is None:
run_args = []
out = subprocess.run(
[
"pyflyte",
"--verbose",
"-c",
CONFIG,
"run",
*run_args,
"--remote",
"--destination-dir",
DEST_DIR,
Expand All @@ -85,7 +88,7 @@ def run(file_name, wf_name, *args) -> str:
DOMAIN,
MODULE_PATH / file_name,
wf_name,
*args,
*wf_args,
],
capture_output=True, # Capture the output streams
text=True, # Return outputs as strings (not bytes)
Expand All @@ -110,6 +113,14 @@ def test_remote_run():
run("default_lp.py", "my_wf")


def test_remote_run_in_new_process():
# child_workflow.parent_wf asynchronously register a parent wf1 with child lp from another wf2.
run("child_workflow.py", "parent_wf", "--a", "3", run_args=["--env", "FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS=1"])

# run twice to make sure it will register a new version of the workflow.
run("default_lp.py", "my_wf", run_args=["--env", "FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS=1"])


def test_remote_eager_run():
# child_workflow.parent_wf asynchronously register a parent wf1 with child lp from another wf2.
run("eager_example.py", "simple_eager_workflow", "--x", "3")
Expand Down
Loading