Skip to content

Commit

Permalink
improve error handling, edge cases
Browse files Browse the repository at this point in the history
  • Loading branch information
SYangster committed Jan 8, 2025
1 parent 21586dd commit 5a5bfb7
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 35 deletions.
21 changes: 18 additions & 3 deletions nvflare/private/fed/client/admin_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
from nvflare.apis.fl_constant import AdminCommandNames, FLContextKey
from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import Shareable
from nvflare.fuel.utils.log_utils import dynamic_log_config
from nvflare.private.fed.client.client_status import get_status_message
from nvflare.security.logging import secure_format_exception
from nvflare.widgets.info_collector import InfoCollector
from nvflare.widgets.widget import WidgetID

Expand Down Expand Up @@ -267,9 +269,22 @@ def process(self, data: Shareable, fl_ctx: FLContext):
Returns: configure_job_log command message
"""
client_runner = fl_ctx.get_prop(FLContextKey.RUNNER)
if client_runner:
client_runner.configure_job_log(data, fl_ctx)
engine = fl_ctx.get_engine()
#dynamic_log_config(request, self.engine.get_workspace(), self.job_id)
try:
dynamic_log_config(data, engine.get_workspace(), fl_ctx.get_job_id())
except Exception as e:
return secure_format_exception(e)
# try:
# dynamic_log_config()

# client_runner = fl_ctx.get_prop(FLContextKey.RUNNER)
# # if client_runner:
# # return client_runner.configure_job_log(data, fl_ctx)
# try:
# client_runner.configure_job_log(data, fl_ctx)
# except Exception as e:
# return secure_format_exception(e)


class AdminCommands(object):
Expand Down
2 changes: 1 addition & 1 deletion nvflare/private/fed/client/client_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ def get_errors(self, job_id):
return self.client_executor.get_errors(job_id)

def configure_job_log(self, job_id, config):
self.client_executor.configure_job_log(job_id, config)
return self.client_executor.configure_job_log(job_id, config)

def reset_errors(self, job_id):
self.client_executor.reset_errors(job_id)
Expand Down
43 changes: 39 additions & 4 deletions nvflare/private/fed/client/client_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,26 +277,61 @@ def get_errors(self, job_id):
secure_log_traceback()
return None

# def configure_job_log(self, job_id, config):
# """Configure the job log.

# Args:
# job_id: the job_id
# config: log config

# """
# try:
# request = new_cell_message({}, config)
# self.client.cell.fire_and_forget(
# targets=self._job_fqcn(job_id),
# channel=CellChannel.CLIENT_COMMAND,
# topic=AdminCommandNames.CONFIGURE_JOB_LOG,
# message=request,
# optional=True,
# )
# except Exception as e:
# self.logger.error(f"configure_job_log execution exception: {secure_format_exception(e)}.")
# secure_log_traceback()

def configure_job_log(self, job_id, config):
"""Configure the job log.
Args:
job_id: the job_id
config: log config
Returns:
configure_job_log command message
"""
try:
request = new_cell_message({}, config)
self.client.cell.fire_and_forget(
targets=self._job_fqcn(job_id),
return_data = self.client.cell.send_request(
target=self._job_fqcn(job_id),
channel=CellChannel.CLIENT_COMMAND,
topic=AdminCommandNames.CONFIGURE_JOB_LOG,
message=request,
request=request,
optional=True,
timeout=self.job_query_timeout,
)
print(f"\n\n\n\t\t {return_data=} {return_data.get_header(MessageHeaderKey.RETURN_CODE)} {return_data.payload}=\n\n\n")
if return_data.get_header(MessageHeaderKey.RETURN_CODE) != ReturnCode.OK:
return f"configure_job_log execution exception: {return_data.payload}."
# return_code = return_data.get_header(MessageHeaderKey.RETURN_CODE)
# if return_code == ReturnCode.OK:
# errors_info = return_data.payload
# return errors_info
# else:
# return None
except Exception as e:
self.logger.error(f"configure_job_log execution exception: {secure_format_exception(e)}.")
err = f"configure_job_log execution exception: {secure_format_exception(e)}."
self.logger.error(err)
secure_log_traceback()
return err

def reset_errors(self, job_id):
"""Resets the error information.
Expand Down
5 changes: 4 additions & 1 deletion nvflare/private/fed/client/client_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,10 @@ def _handle_do_task(self, topic: str, request: Shareable, fl_ctx: FLContext) ->
return reply

def configure_job_log(self, request: Shareable, fl_ctx: FLContext) -> Shareable:
#try:
dynamic_log_config(request, self.engine.get_workspace(), self.job_id)
# except Exception as e:
# return error_reply(secure_format_exception(e))
self.log_info(fl_ctx, f"configured job {self.job_id} server log")

return make_reply(ReturnCode.OK)
#return make_reply(ReturnCode.OK)
11 changes: 8 additions & 3 deletions nvflare/private/fed/client/sys_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
from nvflare.apis.fl_constant import FLContextKey, SystemComponents
from nvflare.apis.fl_context import FLContext
from nvflare.fuel.utils.log_utils import dynamic_log_config
from nvflare.private.admin_defs import Message, ok_reply
from nvflare.private.admin_defs import error_reply, Message, ok_reply
from nvflare.private.defs import SysCommandTopic
from nvflare.private.fed.client.admin import RequestProcessor
from nvflare.security.logging import secure_format_exception


class SysInfoProcessor(RequestProcessor):
Expand Down Expand Up @@ -90,8 +91,12 @@ def get_topics(self) -> List[str]:
def process(self, req: Message, app_ctx) -> Message:
engine = app_ctx
fl_ctx = engine.new_context()
site_name = fl_ctx.get_identity_name()
workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)

dynamic_log_config(req.body, workspace)
try:
dynamic_log_config(req.body, workspace)
except Exception as e:
return error_reply(secure_format_exception(e))

return ok_reply(topic=f"reply_{req.topic}", body="OK")
return ok_reply(topic=f"reply_{req.topic}", body=f"successfully configured {site_name} log")
15 changes: 12 additions & 3 deletions nvflare/private/fed/client/training_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from nvflare.private.fed.client.admin import RequestProcessor
from nvflare.private.fed.client.client_engine_internal_spec import ClientEngineInternalSpec
from nvflare.private.fed.utils.fed_utils import get_scope_info
from nvflare.security.logging import secure_format_exception


class StartAppProcessor(RequestProcessor):
Expand Down Expand Up @@ -161,11 +162,19 @@ def process(self, req: Message, app_ctx) -> Message:
engine = app_ctx
if not isinstance(engine, ClientEngineInternalSpec):
raise TypeError("engine must be ClientEngineInternalSpec, but got {}".format(type(engine)))

job_id = req.get_header(RequestHeader.JOB_ID)
engine.configure_job_log(job_id, req.body)

return ok_reply(topic=f"reply_{req.topic}", body="")
return engine.configure_job_log(job_id, req.body)

# client_name = engine.get_client_name()
# job_id = req.get_header(RequestHeader.JOB_ID)
# try:
# engine.configure_job_log(job_id, req.body)
# except Exception as e:
# return error_reply(secure_format_exception(e))

# return ok_reply(topic=f"reply_{req.topic}", body=f"successfully configured {client_name} job {job_id} log")


class ClientStatusProcessor(RequestProcessor):
Expand Down
32 changes: 25 additions & 7 deletions nvflare/private/fed/server/job_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def authorize_configure_job_log(self, conn: Connection, args: List[str]):
if len(args) < 4:
conn.append_error("syntax error: please provide job_id, target_type, and config")
return PreAuthzReturnCode.ERROR
self.authorize_job(conn, args[:-1])
return self.authorize_job(conn, args[:-1])

def _start_app_on_clients(self, conn: Connection, job_id: str) -> bool:
engine = conn.app_ctx
Expand Down Expand Up @@ -346,23 +346,41 @@ def configure_job_log(self, conn: Connection, args: List[str]):
engine = conn.app_ctx
if not isinstance(engine, ServerEngine):
raise TypeError("engine must be ServerEngine but got {}".format(type(engine)))

job_id = conn.get_prop(self.JOB_ID)
with engine.new_context() as fl_ctx:
job_manager = engine.job_def_manager
job = job_manager.get_job(job_id, fl_ctx)
job_status = job.meta.get(JobMetaKey.STATUS)
print(f"\n\n\n\t\t {job_status=}\n\n\n")
if not job_status is RunStatus.RUNNING:
conn.append_error(f"Job {job_id} is not running TEST{job_status}")
return

if target_type in [self.TARGET_TYPE_SERVER, self.TARGET_TYPE_ALL]:
engine.configure_job_log(str(job_id), config)
err = engine.configure_job_log(str(job_id), config)
if err:
conn.append_error(err)
return
# try:
# engine.configure_job_log(str(job_id), config)
# except Exception as e:
# conn.append_error(
# secure_format_exception(e),
# meta=make_meta(MetaStatusValue.INTERNAL_ERROR, info=secure_format_exception(e)),
# )
# return
conn.append_string(f"successfully configured server job {job_id} log")

if target_type in [self.TARGET_TYPE_CLIENT, self.TARGET_TYPE_ALL]:
message = new_message(conn, topic=TrainingTopic.CONFIGURE_JOB_LOG, body=config, require_authz=False)
message.set_header(RequestHeader.JOB_ID, str(job_id))
replies = self.send_request_to_clients(conn, message)
self.process_replies_to_table(conn, replies)
conn.append_string(
f"successfully configured client job {job_id} logs of {conn.get_prop(self.TARGET_CLIENT_NAMES)}"
)

if target_type not in [self.TARGET_TYPE_ALL, self.TARGET_TYPE_CLIENT, self.TARGET_TYPE_SERVER]:
conn.append_string(
"invalid target type {}. Usage: configure_job_log job_id server|client <client-name>... config".format(
conn.append_error(
"invalid target type {}. Usage: configure_job_log job_id server|client <client-name>...|all config".format(
target_type
)
)
Expand Down
16 changes: 12 additions & 4 deletions nvflare/private/fed/server/server_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import Shareable, make_reply
from nvflare.apis.utils.fl_context_utils import gen_new_peer_ctx
from nvflare.fuel.utils.log_utils import get_obj_logger
from nvflare.fuel.utils.log_utils import dynamic_log_config, get_obj_logger
from nvflare.private.defs import SpecialTaskName, TaskConstant
from nvflare.security.logging import secure_format_exception, secure_format_traceback
from nvflare.widgets.widget import WidgetID
Expand Down Expand Up @@ -442,10 +442,18 @@ def process(self, data: Shareable, fl_ctx: FLContext):
fl_ctx: FLContext
"""
server_runner = fl_ctx.get_prop(FLContextKey.RUNNER)
engine = fl_ctx.get_engine()
#dynamic_log_config(request, self.engine.get_workspace(), self.job_id)
try:
dynamic_log_config(data, engine.get_workspace(), fl_ctx.get_job_id())
except Exception as e:
return secure_format_exception(e)

if server_runner:
server_runner.configure_job_log(data, fl_ctx)

# server_runner = fl_ctx.get_prop(FLContextKey.RUNNER)

# if server_runner:
# server_runner.configure_job_log(data, fl_ctx)


class AppCommandProcessor(CommandProcessor):
Expand Down
8 changes: 5 additions & 3 deletions nvflare/private/fed/server/server_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,17 +861,19 @@ def reset_errors(self, job_id) -> str:

return f"reset the server error stats for job: {job_id}"

def configure_job_log(self, job_id, data) -> dict:
def configure_job_log(self, job_id, data) -> str:
error = None
try:
self.send_command_to_child_runner_process(
error = self.send_command_to_child_runner_process(
job_id=job_id,
command_name=AdminCommandNames.CONFIGURE_JOB_LOG,
command_data=data,
)
except Exception as ex:
self.logger.error(f"Failed to configure_job_log for JOB: {job_id}: {secure_format_exception(ex)}")
return make_reply(ReturnCode.EXECUTION_EXCEPTION)

return f"configured log for job: {job_id}"
return error

def _send_admin_requests(self, requests, fl_ctx: FLContext, timeout_secs=10) -> List[ClientReply]:
return self.server.admin_server.send_requests(requests, fl_ctx, timeout_secs=timeout_secs)
Expand Down
18 changes: 12 additions & 6 deletions nvflare/private/fed/server/sys_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import psutil

from nvflare.fuel.hci.conn import Connection
from nvflare.fuel.hci.proto import MetaKey
from nvflare.fuel.hci.proto import MetaKey, MetaStatusValue, make_meta
from nvflare.fuel.hci.reg import CommandModule, CommandModuleSpec, CommandSpec
from nvflare.fuel.hci.server.authz import PreAuthzReturnCode
from nvflare.fuel.utils.log_utils import dynamic_log_config
Expand Down Expand Up @@ -102,7 +102,7 @@ def authorize_configure_site_log(self, conn: Connection, args: List[str]):
if len(args) < 3:
conn.append_error("syntax error: please provide target_type and config")
return PreAuthzReturnCode.ERROR
self.authorize_server_operation(conn, args[:-1])
return self.authorize_server_operation(conn, args[:-1])

def sys_info(self, conn: Connection, args: [str]):
if len(args) < 2:
Expand Down Expand Up @@ -147,18 +147,24 @@ def configure_site_log(self, conn: Connection, args: [str]):
raise TypeError("engine must be ServerEngine but got {}".format(type(engine)))

workspace = engine.get_workspace()
dynamic_log_config(config, workspace)
try:
dynamic_log_config(config, workspace)
except Exception as e:
conn.append_error(
secure_format_exception(e),
meta=make_meta(MetaStatusValue.INTERNAL_ERROR, info=secure_format_exception(e)),
)
return
conn.append_string("successfully configured server site log")

if target_type in [self.TARGET_TYPE_CLIENT, self.TARGET_TYPE_ALL]:
message = new_message(conn, topic=SysCommandTopic.CONFIGURE_SITE_LOG, body=config, require_authz=True)
replies = self.send_request_to_clients(conn, message)
self.process_replies_to_table(conn, replies)
conn.append_string(f"successfully configured client site logs of {conn.get_prop(self.TARGET_CLIENT_NAMES)}")

if target_type not in [self.TARGET_TYPE_ALL, self.TARGET_TYPE_CLIENT, self.TARGET_TYPE_SERVER]:
conn.append_string(
"invalid target type {}. Usage: configure_site_log server|client <client-name>... config".format(
conn.append_error(
"invalid target type {}. Usage: configure_site_log server|client <client-name>...|all config".format(
target_type
)
)
Expand Down

0 comments on commit 5a5bfb7

Please sign in to comment.