From d250748b15005d0ff995797f00419a2e74e1a025 Mon Sep 17 00:00:00 2001 From: Sean Yang Date: Tue, 7 Jan 2025 17:18:45 -0800 Subject: [PATCH] improve error handling, edge cases --- nvflare/private/fed/client/admin_commands.py | 10 +++++-- nvflare/private/fed/client/client_engine.py | 2 +- nvflare/private/fed/client/client_executor.py | 18 ++++++++--- nvflare/private/fed/client/client_runner.py | 7 ----- nvflare/private/fed/client/sys_cmd.py | 11 +++++-- nvflare/private/fed/client/training_cmds.py | 9 ++++-- nvflare/private/fed/server/job_cmds.py | 30 ++++++++++++++----- nvflare/private/fed/server/server_commands.py | 11 +++---- nvflare/private/fed/server/server_engine.py | 11 ++++--- nvflare/private/fed/server/server_runner.py | 7 ----- nvflare/private/fed/server/sys_cmd.py | 18 +++++++---- 11 files changed, 85 insertions(+), 49 deletions(-) diff --git a/nvflare/private/fed/client/admin_commands.py b/nvflare/private/fed/client/admin_commands.py index 67797ac0c5..5b164d70d5 100644 --- a/nvflare/private/fed/client/admin_commands.py +++ b/nvflare/private/fed/client/admin_commands.py @@ -17,7 +17,9 @@ from nvflare.apis.fl_constant import AdminCommandNames, FLContextKey from nvflare.apis.fl_context import FLContext from nvflare.apis.shareable import Shareable +from nvflare.fuel.utils.log_utils import dynamic_log_config from nvflare.private.fed.client.client_status import get_status_message +from nvflare.security.logging import secure_format_exception from nvflare.widgets.info_collector import InfoCollector from nvflare.widgets.widget import WidgetID @@ -267,9 +269,11 @@ def process(self, data: Shareable, fl_ctx: FLContext): Returns: configure_job_log command message """ - client_runner = fl_ctx.get_prop(FLContextKey.RUNNER) - if client_runner: - client_runner.configure_job_log(data, fl_ctx) + engine = fl_ctx.get_engine() + try: + dynamic_log_config(data, engine.get_workspace(), fl_ctx.get_job_id()) + except Exception as e: + return secure_format_exception(e) class AdminCommands(object): diff --git a/nvflare/private/fed/client/client_engine.py b/nvflare/private/fed/client/client_engine.py index 0acc8cfd59..9db0994727 100644 --- a/nvflare/private/fed/client/client_engine.py +++ b/nvflare/private/fed/client/client_engine.py @@ -468,7 +468,7 @@ def get_errors(self, job_id): return self.client_executor.get_errors(job_id) def configure_job_log(self, job_id, config): - self.client_executor.configure_job_log(job_id, config) + return self.client_executor.configure_job_log(job_id, config) def reset_errors(self, job_id): self.client_executor.reset_errors(job_id) diff --git a/nvflare/private/fed/client/client_executor.py b/nvflare/private/fed/client/client_executor.py index 2057f7f601..5b502d173a 100644 --- a/nvflare/private/fed/client/client_executor.py +++ b/nvflare/private/fed/client/client_executor.py @@ -284,19 +284,29 @@ def configure_job_log(self, job_id, config): job_id: the job_id config: log config + Returns: + configure_job_log command message """ try: request = new_cell_message({}, config) - self.client.cell.fire_and_forget( - targets=self._job_fqcn(job_id), + return_data = self.client.cell.send_request( + target=self._job_fqcn(job_id), channel=CellChannel.CLIENT_COMMAND, topic=AdminCommandNames.CONFIGURE_JOB_LOG, - message=request, + request=request, optional=True, + timeout=self.job_query_timeout, ) + return_code = return_data.get_header(MessageHeaderKey.RETURN_CODE) + if return_code == ReturnCode.OK: + return return_data.payload + else: + return f"failed to configure_job_log with return code: {return_code}" except Exception as e: - self.logger.error(f"configure_job_log execution exception: {secure_format_exception(e)}.") + err = f"configure_job_log execution exception: {secure_format_exception(e)}." + self.logger.error(err) secure_log_traceback() + return err def reset_errors(self, job_id): """Resets the error information. diff --git a/nvflare/private/fed/client/client_runner.py b/nvflare/private/fed/client/client_runner.py index 33422e188a..e258b4caf8 100644 --- a/nvflare/private/fed/client/client_runner.py +++ b/nvflare/private/fed/client/client_runner.py @@ -35,7 +35,6 @@ from nvflare.apis.utils.reliable_message import ReliableMessage from nvflare.apis.utils.task_utils import apply_filters from nvflare.fuel.f3.cellnet.fqcn import FQCN -from nvflare.fuel.utils.log_utils import dynamic_log_config from nvflare.private.defs import SpecialTaskName, TaskConstant from nvflare.private.fed.client.client_engine_executor_spec import ClientEngineExecutorSpec, TaskAssignment from nvflare.private.fed.tbi import TBI @@ -720,9 +719,3 @@ def _handle_do_task(self, topic: str, request: Shareable, fl_ctx: FLContext) -> task = TaskAssignment(name=task_name, task_id=task_id, data=request) reply = self._process_task(task, fl_ctx) return reply - - def configure_job_log(self, request: Shareable, fl_ctx: FLContext) -> Shareable: - dynamic_log_config(request, self.engine.get_workspace(), self.job_id) - self.log_info(fl_ctx, f"configured job {self.job_id} server log") - - return make_reply(ReturnCode.OK) diff --git a/nvflare/private/fed/client/sys_cmd.py b/nvflare/private/fed/client/sys_cmd.py index 43a5645bc7..ed79573487 100644 --- a/nvflare/private/fed/client/sys_cmd.py +++ b/nvflare/private/fed/client/sys_cmd.py @@ -25,9 +25,10 @@ from nvflare.apis.fl_constant import FLContextKey, SystemComponents from nvflare.apis.fl_context import FLContext from nvflare.fuel.utils.log_utils import dynamic_log_config -from nvflare.private.admin_defs import Message, ok_reply +from nvflare.private.admin_defs import Message, error_reply, ok_reply from nvflare.private.defs import SysCommandTopic from nvflare.private.fed.client.admin import RequestProcessor +from nvflare.security.logging import secure_format_exception class SysInfoProcessor(RequestProcessor): @@ -90,8 +91,12 @@ def get_topics(self) -> List[str]: def process(self, req: Message, app_ctx) -> Message: engine = app_ctx fl_ctx = engine.new_context() + site_name = fl_ctx.get_identity_name() workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT) - dynamic_log_config(req.body, workspace) + try: + dynamic_log_config(req.body, workspace) + except Exception as e: + return error_reply(secure_format_exception(e)) - return ok_reply(topic=f"reply_{req.topic}", body="OK") + return ok_reply(topic=f"reply_{req.topic}", body=f"successfully configured {site_name} log") diff --git a/nvflare/private/fed/client/training_cmds.py b/nvflare/private/fed/client/training_cmds.py index 759750112c..de74527f90 100644 --- a/nvflare/private/fed/client/training_cmds.py +++ b/nvflare/private/fed/client/training_cmds.py @@ -162,10 +162,15 @@ def process(self, req: Message, app_ctx) -> Message: if not isinstance(engine, ClientEngineInternalSpec): raise TypeError("engine must be ClientEngineInternalSpec, but got {}".format(type(engine))) + fl_ctx = engine.new_context() + site_name = fl_ctx.get_identity_name() job_id = req.get_header(RequestHeader.JOB_ID) - engine.configure_job_log(job_id, req.body) - return ok_reply(topic=f"reply_{req.topic}", body="") + err = engine.configure_job_log(job_id, req.body) + if err: + return error_reply(err) + + return ok_reply(topic=f"reply_{req.topic}", body=f"successfully configured {site_name} job {job_id} log") class ClientStatusProcessor(RequestProcessor): diff --git a/nvflare/private/fed/server/job_cmds.py b/nvflare/private/fed/server/job_cmds.py index 8cb5eba012..147210fcec 100644 --- a/nvflare/private/fed/server/job_cmds.py +++ b/nvflare/private/fed/server/job_cmds.py @@ -241,7 +241,7 @@ def authorize_configure_job_log(self, conn: Connection, args: List[str]): if len(args) < 4: conn.append_error("syntax error: please provide job_id, target_type, and config") return PreAuthzReturnCode.ERROR - self.authorize_job(conn, args[:-1]) + return self.authorize_job(conn, args[:-1]) def _start_app_on_clients(self, conn: Connection, job_id: str) -> bool: engine = conn.app_ctx @@ -347,8 +347,27 @@ def configure_job_log(self, conn: Connection, args: List[str]): if not isinstance(engine, ServerEngine): raise TypeError("engine must be ServerEngine but got {}".format(type(engine))) + try: + with engine.new_context() as fl_ctx: + job_manager = engine.job_def_manager + job = job_manager.get_job(job_id, fl_ctx) + job_status = job.meta.get(JobMetaKey.STATUS) + if not job_status == RunStatus.RUNNING: + conn.append_error(f"Job {job_id} must be running but is {job_status}") + return + except Exception as e: + conn.append_error( + f"Exception occurred trying to check job status {job_id} for configure_job_log: {secure_format_exception(e)}", + meta=make_meta(MetaStatusValue.INTERNAL_ERROR, f"exception {type(e)}"), + ) + return + if target_type in [self.TARGET_TYPE_SERVER, self.TARGET_TYPE_ALL]: - engine.configure_job_log(str(job_id), config) + err = engine.configure_job_log(str(job_id), config) + if err: + conn.append_error(err) + return + conn.append_string(f"successfully configured server job {job_id} log") if target_type in [self.TARGET_TYPE_CLIENT, self.TARGET_TYPE_ALL]: @@ -356,13 +375,10 @@ def configure_job_log(self, conn: Connection, args: List[str]): message.set_header(RequestHeader.JOB_ID, str(job_id)) replies = self.send_request_to_clients(conn, message) self.process_replies_to_table(conn, replies) - conn.append_string( - f"successfully configured client job {job_id} logs of {conn.get_prop(self.TARGET_CLIENT_NAMES)}" - ) if target_type not in [self.TARGET_TYPE_ALL, self.TARGET_TYPE_CLIENT, self.TARGET_TYPE_SERVER]: - conn.append_string( - "invalid target type {}. Usage: configure_job_log job_id server|client ... config".format( + conn.append_error( + "invalid target type {}. Usage: configure_job_log job_id server|client ...|all config".format( target_type ) ) diff --git a/nvflare/private/fed/server/server_commands.py b/nvflare/private/fed/server/server_commands.py index b61cc953ae..6b45d0710e 100644 --- a/nvflare/private/fed/server/server_commands.py +++ b/nvflare/private/fed/server/server_commands.py @@ -29,7 +29,7 @@ from nvflare.apis.fl_context import FLContext from nvflare.apis.shareable import Shareable, make_reply from nvflare.apis.utils.fl_context_utils import gen_new_peer_ctx -from nvflare.fuel.utils.log_utils import get_obj_logger +from nvflare.fuel.utils.log_utils import dynamic_log_config, get_obj_logger from nvflare.private.defs import SpecialTaskName, TaskConstant from nvflare.security.logging import secure_format_exception, secure_format_traceback from nvflare.widgets.widget import WidgetID @@ -442,10 +442,11 @@ def process(self, data: Shareable, fl_ctx: FLContext): fl_ctx: FLContext """ - server_runner = fl_ctx.get_prop(FLContextKey.RUNNER) - - if server_runner: - server_runner.configure_job_log(data, fl_ctx) + engine = fl_ctx.get_engine() + try: + dynamic_log_config(data, engine.get_workspace(), fl_ctx.get_job_id()) + except Exception as e: + return secure_format_exception(e) class AppCommandProcessor(CommandProcessor): diff --git a/nvflare/private/fed/server/server_engine.py b/nvflare/private/fed/server/server_engine.py index 3118ac9bb1..e824ef8557 100644 --- a/nvflare/private/fed/server/server_engine.py +++ b/nvflare/private/fed/server/server_engine.py @@ -861,17 +861,20 @@ def reset_errors(self, job_id) -> str: return f"reset the server error stats for job: {job_id}" - def configure_job_log(self, job_id, data) -> dict: + def configure_job_log(self, job_id, data) -> str: + error = None try: - self.send_command_to_child_runner_process( + error = self.send_command_to_child_runner_process( job_id=job_id, command_name=AdminCommandNames.CONFIGURE_JOB_LOG, command_data=data, ) except Exception as ex: - self.logger.error(f"Failed to configure_job_log for JOB: {job_id}: {secure_format_exception(ex)}") + err = f"Failed to configure_job_log for JOB: {job_id}: {secure_format_exception(ex)}" + self.logger.error(err) + return err - return f"configured log for job: {job_id}" + return error def _send_admin_requests(self, requests, fl_ctx: FLContext, timeout_secs=10) -> List[ClientReply]: return self.server.admin_server.send_requests(requests, fl_ctx, timeout_secs=timeout_secs) diff --git a/nvflare/private/fed/server/server_runner.py b/nvflare/private/fed/server/server_runner.py index 97a73dfa81..81c47f4848 100644 --- a/nvflare/private/fed/server/server_runner.py +++ b/nvflare/private/fed/server/server_runner.py @@ -26,7 +26,6 @@ from nvflare.apis.utils.fl_context_utils import add_job_audit_event from nvflare.apis.utils.reliable_message import ReliableMessage from nvflare.apis.utils.task_utils import apply_filters -from nvflare.fuel.utils.log_utils import dynamic_log_config from nvflare.private.defs import SpecialTaskName, TaskConstant from nvflare.private.fed.tbi import TBI from nvflare.private.privacy_manager import Scope @@ -555,9 +554,3 @@ def get_persist_state(self, fl_ctx: FLContext) -> dict: def restore(self, state_data: dict, fl_ctx: FLContext): self.job_id = state_data.get("job_id") self.current_wf_index = int(state_data.get("current_wf_index", 0)) - - def configure_job_log(self, data, fl_ctx: FLContext) -> Shareable: - dynamic_log_config(data, self.engine.get_workspace(), self.job_id) - self.log_info(fl_ctx, f"configured job {self.job_id} server log") - - return make_reply(ReturnCode.OK) diff --git a/nvflare/private/fed/server/sys_cmd.py b/nvflare/private/fed/server/sys_cmd.py index 3313411e12..a908cc27b6 100644 --- a/nvflare/private/fed/server/sys_cmd.py +++ b/nvflare/private/fed/server/sys_cmd.py @@ -18,7 +18,7 @@ import psutil from nvflare.fuel.hci.conn import Connection -from nvflare.fuel.hci.proto import MetaKey +from nvflare.fuel.hci.proto import MetaKey, MetaStatusValue, make_meta from nvflare.fuel.hci.reg import CommandModule, CommandModuleSpec, CommandSpec from nvflare.fuel.hci.server.authz import PreAuthzReturnCode from nvflare.fuel.utils.log_utils import dynamic_log_config @@ -102,7 +102,7 @@ def authorize_configure_site_log(self, conn: Connection, args: List[str]): if len(args) < 3: conn.append_error("syntax error: please provide target_type and config") return PreAuthzReturnCode.ERROR - self.authorize_server_operation(conn, args[:-1]) + return self.authorize_server_operation(conn, args[:-1]) def sys_info(self, conn: Connection, args: [str]): if len(args) < 2: @@ -147,18 +147,24 @@ def configure_site_log(self, conn: Connection, args: [str]): raise TypeError("engine must be ServerEngine but got {}".format(type(engine))) workspace = engine.get_workspace() - dynamic_log_config(config, workspace) + try: + dynamic_log_config(config, workspace) + except Exception as e: + conn.append_error( + secure_format_exception(e), + meta=make_meta(MetaStatusValue.INTERNAL_ERROR, info=secure_format_exception(e)), + ) + return conn.append_string("successfully configured server site log") if target_type in [self.TARGET_TYPE_CLIENT, self.TARGET_TYPE_ALL]: message = new_message(conn, topic=SysCommandTopic.CONFIGURE_SITE_LOG, body=config, require_authz=True) replies = self.send_request_to_clients(conn, message) self.process_replies_to_table(conn, replies) - conn.append_string(f"successfully configured client site logs of {conn.get_prop(self.TARGET_CLIENT_NAMES)}") if target_type not in [self.TARGET_TYPE_ALL, self.TARGET_TYPE_CLIENT, self.TARGET_TYPE_SERVER]: - conn.append_string( - "invalid target type {}. Usage: configure_site_log server|client ... config".format( + conn.append_error( + "invalid target type {}. Usage: configure_site_log server|client ...|all config".format( target_type ) )