Skip to content

Commit

Permalink
improve error handling, edge cases
Browse files Browse the repository at this point in the history
  • Loading branch information
SYangster committed Jan 13, 2025
1 parent 76b1774 commit d250748
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 49 deletions.
10 changes: 7 additions & 3 deletions nvflare/private/fed/client/admin_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
from nvflare.apis.fl_constant import AdminCommandNames, FLContextKey
from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import Shareable
from nvflare.fuel.utils.log_utils import dynamic_log_config
from nvflare.private.fed.client.client_status import get_status_message
from nvflare.security.logging import secure_format_exception
from nvflare.widgets.info_collector import InfoCollector
from nvflare.widgets.widget import WidgetID

Expand Down Expand Up @@ -267,9 +269,11 @@ def process(self, data: Shareable, fl_ctx: FLContext):
Returns: configure_job_log command message
"""
client_runner = fl_ctx.get_prop(FLContextKey.RUNNER)
if client_runner:
client_runner.configure_job_log(data, fl_ctx)
engine = fl_ctx.get_engine()
try:
dynamic_log_config(data, engine.get_workspace(), fl_ctx.get_job_id())
except Exception as e:
return secure_format_exception(e)


class AdminCommands(object):
Expand Down
2 changes: 1 addition & 1 deletion nvflare/private/fed/client/client_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ def get_errors(self, job_id):
return self.client_executor.get_errors(job_id)

def configure_job_log(self, job_id, config):
self.client_executor.configure_job_log(job_id, config)
return self.client_executor.configure_job_log(job_id, config)

def reset_errors(self, job_id):
self.client_executor.reset_errors(job_id)
Expand Down
18 changes: 14 additions & 4 deletions nvflare/private/fed/client/client_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,19 +284,29 @@ def configure_job_log(self, job_id, config):
job_id: the job_id
config: log config
Returns:
configure_job_log command message
"""
try:
request = new_cell_message({}, config)
self.client.cell.fire_and_forget(
targets=self._job_fqcn(job_id),
return_data = self.client.cell.send_request(
target=self._job_fqcn(job_id),
channel=CellChannel.CLIENT_COMMAND,
topic=AdminCommandNames.CONFIGURE_JOB_LOG,
message=request,
request=request,
optional=True,
timeout=self.job_query_timeout,
)
return_code = return_data.get_header(MessageHeaderKey.RETURN_CODE)
if return_code == ReturnCode.OK:
return return_data.payload
else:
return f"failed to configure_job_log with return code: {return_code}"
except Exception as e:
self.logger.error(f"configure_job_log execution exception: {secure_format_exception(e)}.")
err = f"configure_job_log execution exception: {secure_format_exception(e)}."
self.logger.error(err)
secure_log_traceback()
return err

def reset_errors(self, job_id):
"""Resets the error information.
Expand Down
7 changes: 0 additions & 7 deletions nvflare/private/fed/client/client_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from nvflare.apis.utils.reliable_message import ReliableMessage
from nvflare.apis.utils.task_utils import apply_filters
from nvflare.fuel.f3.cellnet.fqcn import FQCN
from nvflare.fuel.utils.log_utils import dynamic_log_config
from nvflare.private.defs import SpecialTaskName, TaskConstant
from nvflare.private.fed.client.client_engine_executor_spec import ClientEngineExecutorSpec, TaskAssignment
from nvflare.private.fed.tbi import TBI
Expand Down Expand Up @@ -720,9 +719,3 @@ def _handle_do_task(self, topic: str, request: Shareable, fl_ctx: FLContext) ->
task = TaskAssignment(name=task_name, task_id=task_id, data=request)
reply = self._process_task(task, fl_ctx)
return reply

def configure_job_log(self, request: Shareable, fl_ctx: FLContext) -> Shareable:
dynamic_log_config(request, self.engine.get_workspace(), self.job_id)
self.log_info(fl_ctx, f"configured job {self.job_id} server log")

return make_reply(ReturnCode.OK)
11 changes: 8 additions & 3 deletions nvflare/private/fed/client/sys_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
from nvflare.apis.fl_constant import FLContextKey, SystemComponents
from nvflare.apis.fl_context import FLContext
from nvflare.fuel.utils.log_utils import dynamic_log_config
from nvflare.private.admin_defs import Message, ok_reply
from nvflare.private.admin_defs import Message, error_reply, ok_reply
from nvflare.private.defs import SysCommandTopic
from nvflare.private.fed.client.admin import RequestProcessor
from nvflare.security.logging import secure_format_exception


class SysInfoProcessor(RequestProcessor):
Expand Down Expand Up @@ -90,8 +91,12 @@ def get_topics(self) -> List[str]:
def process(self, req: Message, app_ctx) -> Message:
engine = app_ctx
fl_ctx = engine.new_context()
site_name = fl_ctx.get_identity_name()
workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)

dynamic_log_config(req.body, workspace)
try:
dynamic_log_config(req.body, workspace)
except Exception as e:
return error_reply(secure_format_exception(e))

return ok_reply(topic=f"reply_{req.topic}", body="OK")
return ok_reply(topic=f"reply_{req.topic}", body=f"successfully configured {site_name} log")
9 changes: 7 additions & 2 deletions nvflare/private/fed/client/training_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,15 @@ def process(self, req: Message, app_ctx) -> Message:
if not isinstance(engine, ClientEngineInternalSpec):
raise TypeError("engine must be ClientEngineInternalSpec, but got {}".format(type(engine)))

fl_ctx = engine.new_context()
site_name = fl_ctx.get_identity_name()
job_id = req.get_header(RequestHeader.JOB_ID)
engine.configure_job_log(job_id, req.body)

return ok_reply(topic=f"reply_{req.topic}", body="")
err = engine.configure_job_log(job_id, req.body)
if err:
return error_reply(err)

return ok_reply(topic=f"reply_{req.topic}", body=f"successfully configured {site_name} job {job_id} log")


class ClientStatusProcessor(RequestProcessor):
Expand Down
30 changes: 23 additions & 7 deletions nvflare/private/fed/server/job_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def authorize_configure_job_log(self, conn: Connection, args: List[str]):
if len(args) < 4:
conn.append_error("syntax error: please provide job_id, target_type, and config")
return PreAuthzReturnCode.ERROR
self.authorize_job(conn, args[:-1])
return self.authorize_job(conn, args[:-1])

def _start_app_on_clients(self, conn: Connection, job_id: str) -> bool:
engine = conn.app_ctx
Expand Down Expand Up @@ -347,22 +347,38 @@ def configure_job_log(self, conn: Connection, args: List[str]):
if not isinstance(engine, ServerEngine):
raise TypeError("engine must be ServerEngine but got {}".format(type(engine)))

try:
with engine.new_context() as fl_ctx:
job_manager = engine.job_def_manager
job = job_manager.get_job(job_id, fl_ctx)
job_status = job.meta.get(JobMetaKey.STATUS)
if not job_status == RunStatus.RUNNING:
conn.append_error(f"Job {job_id} must be running but is {job_status}")
return
except Exception as e:
conn.append_error(
f"Exception occurred trying to check job status {job_id} for configure_job_log: {secure_format_exception(e)}",
meta=make_meta(MetaStatusValue.INTERNAL_ERROR, f"exception {type(e)}"),
)
return

if target_type in [self.TARGET_TYPE_SERVER, self.TARGET_TYPE_ALL]:
engine.configure_job_log(str(job_id), config)
err = engine.configure_job_log(str(job_id), config)
if err:
conn.append_error(err)
return

conn.append_string(f"successfully configured server job {job_id} log")

if target_type in [self.TARGET_TYPE_CLIENT, self.TARGET_TYPE_ALL]:
message = new_message(conn, topic=TrainingTopic.CONFIGURE_JOB_LOG, body=config, require_authz=False)
message.set_header(RequestHeader.JOB_ID, str(job_id))
replies = self.send_request_to_clients(conn, message)
self.process_replies_to_table(conn, replies)
conn.append_string(
f"successfully configured client job {job_id} logs of {conn.get_prop(self.TARGET_CLIENT_NAMES)}"
)

if target_type not in [self.TARGET_TYPE_ALL, self.TARGET_TYPE_CLIENT, self.TARGET_TYPE_SERVER]:
conn.append_string(
"invalid target type {}. Usage: configure_job_log job_id server|client <client-name>... config".format(
conn.append_error(
"invalid target type {}. Usage: configure_job_log job_id server|client <client-name>...|all config".format(
target_type
)
)
Expand Down
11 changes: 6 additions & 5 deletions nvflare/private/fed/server/server_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import Shareable, make_reply
from nvflare.apis.utils.fl_context_utils import gen_new_peer_ctx
from nvflare.fuel.utils.log_utils import get_obj_logger
from nvflare.fuel.utils.log_utils import dynamic_log_config, get_obj_logger
from nvflare.private.defs import SpecialTaskName, TaskConstant
from nvflare.security.logging import secure_format_exception, secure_format_traceback
from nvflare.widgets.widget import WidgetID
Expand Down Expand Up @@ -442,10 +442,11 @@ def process(self, data: Shareable, fl_ctx: FLContext):
fl_ctx: FLContext
"""
server_runner = fl_ctx.get_prop(FLContextKey.RUNNER)

if server_runner:
server_runner.configure_job_log(data, fl_ctx)
engine = fl_ctx.get_engine()
try:
dynamic_log_config(data, engine.get_workspace(), fl_ctx.get_job_id())
except Exception as e:
return secure_format_exception(e)


class AppCommandProcessor(CommandProcessor):
Expand Down
11 changes: 7 additions & 4 deletions nvflare/private/fed/server/server_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,17 +861,20 @@ def reset_errors(self, job_id) -> str:

return f"reset the server error stats for job: {job_id}"

def configure_job_log(self, job_id, data) -> dict:
def configure_job_log(self, job_id, data) -> str:
error = None
try:
self.send_command_to_child_runner_process(
error = self.send_command_to_child_runner_process(
job_id=job_id,
command_name=AdminCommandNames.CONFIGURE_JOB_LOG,
command_data=data,
)
except Exception as ex:
self.logger.error(f"Failed to configure_job_log for JOB: {job_id}: {secure_format_exception(ex)}")
err = f"Failed to configure_job_log for JOB: {job_id}: {secure_format_exception(ex)}"
self.logger.error(err)
return err

return f"configured log for job: {job_id}"
return error

def _send_admin_requests(self, requests, fl_ctx: FLContext, timeout_secs=10) -> List[ClientReply]:
return self.server.admin_server.send_requests(requests, fl_ctx, timeout_secs=timeout_secs)
Expand Down
7 changes: 0 additions & 7 deletions nvflare/private/fed/server/server_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from nvflare.apis.utils.fl_context_utils import add_job_audit_event
from nvflare.apis.utils.reliable_message import ReliableMessage
from nvflare.apis.utils.task_utils import apply_filters
from nvflare.fuel.utils.log_utils import dynamic_log_config
from nvflare.private.defs import SpecialTaskName, TaskConstant
from nvflare.private.fed.tbi import TBI
from nvflare.private.privacy_manager import Scope
Expand Down Expand Up @@ -555,9 +554,3 @@ def get_persist_state(self, fl_ctx: FLContext) -> dict:
def restore(self, state_data: dict, fl_ctx: FLContext):
self.job_id = state_data.get("job_id")
self.current_wf_index = int(state_data.get("current_wf_index", 0))

def configure_job_log(self, data, fl_ctx: FLContext) -> Shareable:
dynamic_log_config(data, self.engine.get_workspace(), self.job_id)
self.log_info(fl_ctx, f"configured job {self.job_id} server log")

return make_reply(ReturnCode.OK)
18 changes: 12 additions & 6 deletions nvflare/private/fed/server/sys_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import psutil

from nvflare.fuel.hci.conn import Connection
from nvflare.fuel.hci.proto import MetaKey
from nvflare.fuel.hci.proto import MetaKey, MetaStatusValue, make_meta
from nvflare.fuel.hci.reg import CommandModule, CommandModuleSpec, CommandSpec
from nvflare.fuel.hci.server.authz import PreAuthzReturnCode
from nvflare.fuel.utils.log_utils import dynamic_log_config
Expand Down Expand Up @@ -102,7 +102,7 @@ def authorize_configure_site_log(self, conn: Connection, args: List[str]):
if len(args) < 3:
conn.append_error("syntax error: please provide target_type and config")
return PreAuthzReturnCode.ERROR
self.authorize_server_operation(conn, args[:-1])
return self.authorize_server_operation(conn, args[:-1])

def sys_info(self, conn: Connection, args: [str]):
if len(args) < 2:
Expand Down Expand Up @@ -147,18 +147,24 @@ def configure_site_log(self, conn: Connection, args: [str]):
raise TypeError("engine must be ServerEngine but got {}".format(type(engine)))

workspace = engine.get_workspace()
dynamic_log_config(config, workspace)
try:
dynamic_log_config(config, workspace)
except Exception as e:
conn.append_error(
secure_format_exception(e),
meta=make_meta(MetaStatusValue.INTERNAL_ERROR, info=secure_format_exception(e)),
)
return
conn.append_string("successfully configured server site log")

if target_type in [self.TARGET_TYPE_CLIENT, self.TARGET_TYPE_ALL]:
message = new_message(conn, topic=SysCommandTopic.CONFIGURE_SITE_LOG, body=config, require_authz=True)
replies = self.send_request_to_clients(conn, message)
self.process_replies_to_table(conn, replies)
conn.append_string(f"successfully configured client site logs of {conn.get_prop(self.TARGET_CLIENT_NAMES)}")

if target_type not in [self.TARGET_TYPE_ALL, self.TARGET_TYPE_CLIENT, self.TARGET_TYPE_SERVER]:
conn.append_string(
"invalid target type {}. Usage: configure_site_log server|client <client-name>... config".format(
conn.append_error(
"invalid target type {}. Usage: configure_site_log server|client <client-name>...|all config".format(
target_type
)
)
Expand Down

0 comments on commit d250748

Please sign in to comment.