diff --git a/nvflare/apis/fl_constant.py b/nvflare/apis/fl_constant.py index d03e456ad3..51cc894738 100644 --- a/nvflare/apis/fl_constant.py +++ b/nvflare/apis/fl_constant.py @@ -245,6 +245,7 @@ class AdminCommandNames(object): SHELL_TAIL = "tail" SHELL_GREP = "grep" APP_COMMAND = "app_command" + CONFIGURE_JOB_LOG = "configure_job_log" class ServerCommandNames(object): @@ -263,6 +264,7 @@ class ServerCommandNames(object): HANDLE_DEAD_JOB = "handle_dead_job" SERVER_STATE = "server_state" APP_COMMAND = "app_command" + CONFIGURE_JOB_LOG = "configure_job_log" class ServerCommandKey(object): diff --git a/nvflare/fuel/utils/log_utils.py b/nvflare/fuel/utils/log_utils.py index 40f87068a0..3ebd851853 100644 --- a/nvflare/fuel/utils/log_utils.py +++ b/nvflare/fuel/utils/log_utils.py @@ -221,43 +221,73 @@ def get_script_logger(): f"{package + '.' if package else ''}{os.path.splitext(os.path.basename(file))[0] if file else ''}" ) +# def update_filenames(obj, dir_path: str = "", file_prefix: str = ""): +# """Update 'filename' keys in JSON objects with dir_path and file_prefix.""" +# if "filename" in obj and isinstance(obj["filename"], str): +# filename = obj["filename"] +# if file_prefix: +# filename = os.path.join(os.path.dirname(filename), file_prefix + "_" + os.path.basename(filename)) +# obj["filename"] = os.path.join(dir_path, filename) +# return obj -def update_filenames(obj, dir_path: str = "", file_prefix: str = ""): - """Update 'filename' keys in JSON objects with dir_path and file_prefix.""" - if "filename" in obj and isinstance(obj["filename"], str): - filename = obj["filename"] - if file_prefix: - filename = os.path.join(os.path.dirname(filename), file_prefix + "_" + os.path.basename(filename)) - obj["filename"] = os.path.join(dir_path, filename) - return obj +def configure_logging(workspace: Workspace, dir_path: str = "", file_prefix: str = ""): + log_config_file_path = workspace.get_log_config_file_path() + assert os.path.isfile(log_config_file_path), f"missing log config file {log_config_file_path}" + with open(log_config_file_path, "r") as f: + dict_config = json.load(f) -def read_log_config(file, dir_path: str = "", file_prefix: str = "") -> dict: - """ - Reads JSON logging configuration file and returns config dictionary. - Updates 'filename' keys with dir_path for dynamic locations. + apply_log_config(dict_config, dir_path, file_prefix) - Args: - file (str): Path to the configuration file. - dir_path (str): Update filename keys with dir_path. - Returns: - config (dict) - """ - try: - with open(file, "r") as f: - config = json.load(f, object_hook=lambda obj: update_filenames(obj, dir_path, file_prefix)) - return config - except Exception as e: - raise ValueError(f"Unrecognized logging configuration format. Failed to parse JSON: {e}.") +def apply_log_config(dict_config, dir_path: str = "", file_prefix: str = ""): + stack = [dict_config] + while stack: + current_dict = stack.pop() + for key, value in current_dict.items(): + if isinstance(value, dict): + stack.append(value) + elif key == "filename": + if file_prefix: + value = os.path.join(os.path.dirname(value), file_prefix + "_" + os.path.basename(value)) + current_dict[key] = os.path.join(dir_path, value) + logging.config.dictConfig(dict_config) -def configure_logging(workspace: Workspace, dir_path: str = "", file_prefix: str = ""): - log_config_file_path = workspace.get_log_config_file_path() - assert os.path.isfile(log_config_file_path), f"missing log config file {log_config_file_path}" - dict_config = read_log_config(log_config_file_path, dir_path, file_prefix) - logging.config.dictConfig(dict_config) +def handle_log_config_command(config: str, workspace: Workspace, job_id: str = None): + if config is None: + config = workspace.get_log_config_file_path() + + if os.path.isfile(config): + + with open(config, "r") as f: + dict_config = json.load(f) + + if job_id: + dir_path = workspace.get_run_dir(job_id) + else: + dir_path = workspace.get_root_dir() + with open(workspace.get_log_config_file_path(), "w") as f: + f.write(json.dumps(dict_config)) + + apply_log_config(dict_config, dir_path) + + elif isinstance(config, str): + if config.isdigit(): + level = int(config) + if not (0 <= level <= 50): + raise ValueError(f"Invalid logging level: {level}") + else: + level = getattr(logging, config.upper(), None) + if level is None: + raise ValueError(f"Invalid logging level: {config}") + + logging.getLogger().setLevel(level) + else: + raise ValueError( + f"Unsupported config type. Expect config to be filepath or string level but got {type(config)}" + ) def add_log_file_handler(log_file_name): diff --git a/nvflare/job_config/api.py b/nvflare/job_config/api.py index c010a673b6..bc57d1cd47 100644 --- a/nvflare/job_config/api.py +++ b/nvflare/job_config/api.py @@ -522,7 +522,9 @@ def export_job(self, job_root: str): self._set_all_apps() self.job.generate_job_config(job_root) - def simulator_run(self, workspace: str, n_clients: int = None, threads: int = None, gpu: str = None): + def simulator_run( + self, workspace: str, n_clients: int = None, threads: int = None, gpu: str = None, log_config: str = None + ): """Run the job with the simulator with the `workspace` using `clients` and `threads`. For end users. @@ -531,6 +533,7 @@ def simulator_run(self, workspace: str, n_clients: int = None, threads: int = No n_clients: number of clients. threads: number of threads. gpu: gpu assignments for simulating clients, comma separated + log_config: log config json file Returns: @@ -556,6 +559,7 @@ def simulator_run(self, workspace: str, n_clients: int = None, threads: int = No n_clients=n_clients, threads=threads, gpu=gpu, + log_config=log_config, ) def as_id(self, obj: Any) -> str: diff --git a/nvflare/job_config/fed_job_config.py b/nvflare/job_config/fed_job_config.py index 940a773a63..c92fe890e5 100644 --- a/nvflare/job_config/fed_job_config.py +++ b/nvflare/job_config/fed_job_config.py @@ -136,7 +136,7 @@ def generate_job_config(self, job_root): self._generate_meta(job_dir) - def simulator_run(self, workspace, clients=None, n_clients=None, threads=None, gpu=None): + def simulator_run(self, workspace, clients=None, n_clients=None, threads=None, gpu=None, log_config=None): with TemporaryDirectory() as job_root: self.generate_job_config(job_root) @@ -157,6 +157,8 @@ def simulator_run(self, workspace, clients=None, n_clients=None, threads=None, g if gpu: gpu = self._trim_whitespace(gpu) command += " -gpu " + str(gpu) + if log_config: + command += " -l" + str(log_config) new_env = os.environ.copy() process = subprocess.Popen(shlex.split(command, True), preexec_fn=os.setsid, env=new_env) diff --git a/nvflare/lighter/impl/master_template.yml b/nvflare/lighter/impl/master_template.yml index cbda18f74e..a10436cf36 100644 --- a/nvflare/lighter/impl/master_template.yml +++ b/nvflare/lighter/impl/master_template.yml @@ -271,6 +271,8 @@ default_authz: | "permissions": { "project_admin": "any", "org_admin": { + "configure_site_log": "o:site", + "configure_job_log": "n:submitter", "submit_job": "none", "clone_job": "none", "manage_job": "o:submitter", @@ -281,7 +283,8 @@ default_authz: | "byoc": "none" }, "lead": { - "submit_job": "any", + "configure_job_log": "n:submitter", + "submit_job": "o:site", "clone_job": "n:submitter", "manage_job": "n:submitter", "download_job": "n:submitter", diff --git a/nvflare/private/defs.py b/nvflare/private/defs.py index bb0f98683d..d5f3f96744 100644 --- a/nvflare/private/defs.py +++ b/nvflare/private/defs.py @@ -68,6 +68,7 @@ class TrainingTopic(object): START_JOB = "train.start_job" GET_SCOPES = "train.get_scopes" NOTIFY_JOB_STATUS = "train.notify_job_status" + CONFIGURE_JOB_LOG = "train.configure_job_log" class RequestHeader(object): @@ -96,6 +97,7 @@ class SysCommandTopic(object): SHELL = "sys.shell" REPORT_RESOURCES = "resource_manager.report_resources" REPORT_ENV = "sys.report_env" + CONFIGURE_SITE_LOG = "sys.configure_site_log" class ControlCommandTopic(object): diff --git a/nvflare/private/fed/app/simulator/simulator.py b/nvflare/private/fed/app/simulator/simulator.py index 50a5a56319..235e8472af 100644 --- a/nvflare/private/fed/app/simulator/simulator.py +++ b/nvflare/private/fed/app/simulator/simulator.py @@ -29,6 +29,7 @@ def define_simulator_parser(simulator_parser): simulator_parser.add_argument("-c", "--clients", type=str, help="client names list") simulator_parser.add_argument("-t", "--threads", type=int, help="number of parallel running clients") simulator_parser.add_argument("-gpu", "--gpu", type=str, help="list of GPU Device Ids, comma separated") + simulator_parser.add_argument("-l", "--log_config", type=str, help="log config file") simulator_parser.add_argument("-m", "--max_clients", type=int, default=100, help="max number of clients") simulator_parser.add_argument( "--end_run_for_all", @@ -46,6 +47,7 @@ def run_simulator(simulator_args): n_clients=simulator_args.n_clients, threads=simulator_args.threads, gpu=simulator_args.gpu, + log_config=simulator_args.log_config, max_clients=simulator_args.max_clients, end_run_for_all=simulator_args.end_run_for_all, ) diff --git a/nvflare/private/fed/app/simulator/simulator_runner.py b/nvflare/private/fed/app/simulator/simulator_runner.py index 24363be673..f16a9495d4 100644 --- a/nvflare/private/fed/app/simulator/simulator_runner.py +++ b/nvflare/private/fed/app/simulator/simulator_runner.py @@ -13,7 +13,6 @@ # limitations under the License. import copy import json -import logging.config import os import shlex import shutil @@ -53,7 +52,7 @@ from nvflare.fuel.utils.argument_utils import parse_vars from nvflare.fuel.utils.config_service import ConfigService from nvflare.fuel.utils.gpu_utils import get_host_gpu_ids -from nvflare.fuel.utils.log_utils import read_log_config +from nvflare.fuel.utils.log_utils import apply_log_config from nvflare.fuel.utils.network_utils import get_open_ports from nvflare.fuel.utils.zip_utils import split_path, unzip_all_from_bytes, zip_directory_to_bytes from nvflare.private.defs import AppFolderConstants @@ -89,6 +88,7 @@ def __init__( n_clients=None, threads=None, gpu=None, + log_config=None, max_clients=100, end_run_for_all=False, ): @@ -100,6 +100,7 @@ def __init__( self.n_clients = n_clients self.threads = threads self.gpu = gpu + self.log_config = log_config self.max_clients = max_clients self.end_run_for_all = end_run_for_all @@ -127,7 +128,15 @@ def __init__( self.workspace = os.path.join(running_dir, self.workspace) def _generate_args( - self, job_folder: str, workspace: str, clients=None, n_clients=None, threads=None, gpu=None, max_clients=100 + self, + job_folder: str, + workspace: str, + clients=None, + n_clients=None, + threads=None, + gpu=None, + log_config=None, + max_clients=100, ): args = Namespace( job_folder=job_folder, @@ -136,6 +145,7 @@ def _generate_args( n_clients=n_clients, threads=threads, gpu=gpu, + log_config=log_config, max_clients=max_clients, ) args.set = [] @@ -143,7 +153,14 @@ def _generate_args( def setup(self): self.args = self._generate_args( - self.job_folder, self.workspace, self.clients, self.n_clients, self.threads, self.gpu, self.max_clients + self.job_folder, + self.workspace, + self.clients, + self.n_clients, + self.threads, + self.gpu, + self.log_config, + self.max_clients, ) if self.args.clients: @@ -153,12 +170,17 @@ def setup(self): for i in range(self.args.n_clients): self.client_names.append("site-" + str(i + 1)) - log_config_file_path = os.path.join(self.args.workspace, "local", WorkspaceConstants.LOGGING_CONFIG) - if not os.path.isfile(log_config_file_path): - log_config_file_path = os.path.join(os.path.dirname(__file__), WorkspaceConstants.LOGGING_CONFIG) - dict_config = read_log_config(log_config_file_path, os.path.join(self.args.workspace, SiteType.SERVER)) + if self.args.log_config: + log_config_file_path = self.args.log_config + else: + log_config_file_path = os.path.join(self.args.workspace, "local", WorkspaceConstants.LOGGING_CONFIG) + if not os.path.isfile(log_config_file_path): + log_config_file_path = os.path.join(os.path.dirname(__file__), WorkspaceConstants.LOGGING_CONFIG) + + with open(log_config_file_path, "r") as f: + dict_config = json.load(f) - self.args.log_config = None + # self.args.log_config = None self.args.config_folder = "config" self.args.job_id = SimulatorConstants.JOB_NAME self.args.client_config = os.path.join(self.args.config_folder, JobConstants.CLIENT_JOB_CONFIG) @@ -180,7 +202,7 @@ def setup(self): os.makedirs(os.path.join(self.simulator_root, SiteType.SERVER)) - logging.config.dictConfig(dict_config) + apply_log_config(dict_config, os.path.join(self.simulator_root, SiteType.SERVER)) try: data_bytes, job_name, meta = self.validate_job_data() @@ -668,9 +690,12 @@ def _pick_next_client(self): def do_one_task(self, client, num_of_threads, gpu, lock, timeout=60.0, task_name=RunnerTask.TASK_EXEC): open_port = get_open_ports(1)[0] client_workspace = os.path.join(self.args.workspace, client.client_name) - logging_config = os.path.join( - self.args.workspace, client.client_name, "local", WorkspaceConstants.LOGGING_CONFIG - ) + if self.args.log_config: + logging_config = self.args.log_config + else: + logging_config = os.path.join( + self.args.workspace, client.client_name, "local", WorkspaceConstants.LOGGING_CONFIG + ) decomposer_module = ConfigService.get_str_var( name=ConfigVarName.DECOMPOSER_MODULE, conf=SystemConfigs.RESOURCES_CONF ) diff --git a/nvflare/private/fed/app/simulator/simulator_worker.py b/nvflare/private/fed/app/simulator/simulator_worker.py index f71b61c198..f0672107dd 100644 --- a/nvflare/private/fed/app/simulator/simulator_worker.py +++ b/nvflare/private/fed/app/simulator/simulator_worker.py @@ -13,7 +13,7 @@ # limitations under the License. import argparse -import logging.config +import json import os import sys import threading @@ -30,7 +30,6 @@ from nvflare.fuel.f3.mpm import MainProcessMonitor as mpm from nvflare.fuel.hci.server.authz import AuthorizationService from nvflare.fuel.sec.audit import AuditService -from nvflare.fuel.utils.log_utils import read_log_config from nvflare.private.fed.app.deployer.base_client_deployer import BaseClientDeployer from nvflare.private.fed.app.utils import check_parent_alive, init_security_content_service from nvflare.private.fed.client.client_engine import ClientEngine @@ -234,8 +233,11 @@ def main(args): thread = threading.Thread(target=check_parent_alive, args=(parent_pid, stop_event)) thread.start() - dict_config = read_log_config(args.logging_config, args.workspace) - logging.config.dictConfig(dict_config) + with open(args.logging_config, "r") as f: + dict_config = json.load(f) + from nvflare.fuel.utils.log_utils import apply_log_config + + apply_log_config(dict_config, args.workspace) os.chdir(args.workspace) startup = os.path.join(args.workspace, WorkspaceConstants.STARTUP_FOLDER_NAME) diff --git a/nvflare/private/fed/client/admin_commands.py b/nvflare/private/fed/client/admin_commands.py index ed4488c473..8636e30456 100644 --- a/nvflare/private/fed/client/admin_commands.py +++ b/nvflare/private/fed/client/admin_commands.py @@ -160,6 +160,32 @@ def process(self, data: Shareable, fl_ctx: FLContext): return result +class ConfigureJobLogCommand(CommandProcessor): + """To implement the show_stats command.""" + + def get_command_name(self) -> str: + """To get the command name. + + Returns: AdminCommandNames.CONFIGURE_JOB_LOG + + """ + return AdminCommandNames.CONFIGURE_JOB_LOG + + def process(self, data: Shareable, fl_ctx: FLContext): + """Called to process the abort_task command. + + Args: + data: process data + fl_ctx: FLContext + + Returns: show_stats command message + + """ + client_runner = fl_ctx.get_prop(FLContextKey.RUNNER) + if client_runner: + client_runner.configure_job_log(data, fl_ctx) + + class ShowErrorsCommand(CommandProcessor): """To implement the show_errors command.""" @@ -257,6 +283,7 @@ class AdminCommands(object): ShowStatsCommand(), ShowErrorsCommand(), ResetErrorsCommand(), + ConfigureJobLogCommand(), ] @staticmethod diff --git a/nvflare/private/fed/client/client_engine.py b/nvflare/private/fed/client/client_engine.py index 4799ec91a5..8f2ce79d0d 100644 --- a/nvflare/private/fed/client/client_engine.py +++ b/nvflare/private/fed/client/client_engine.py @@ -464,6 +464,9 @@ def get_current_run_info(self, job_id) -> ClientRunInfo: def get_errors(self, job_id): return self.client_executor.get_errors(job_id) + def configure_job_log(self, job_id, config): + return self.client_executor.configure_job_log(job_id, config) + def reset_errors(self, job_id): self.client_executor.reset_errors(job_id) diff --git a/nvflare/private/fed/client/client_executor.py b/nvflare/private/fed/client/client_executor.py index 246e456cb2..9355707623 100644 --- a/nvflare/private/fed/client/client_executor.py +++ b/nvflare/private/fed/client/client_executor.py @@ -277,6 +277,29 @@ def get_errors(self, job_id): secure_log_traceback() return None + def configure_job_log(self, job_id, config): + """Get the error information. + + Args: + job_id: the job_id + + Returns: + A dict of error information. + """ + try: + request = new_cell_message({}, config) + self.client.cell.fire_and_forget( + targets=self._job_fqcn(job_id), + channel=CellChannel.CLIENT_COMMAND, + topic=AdminCommandNames.CONFIGURE_JOB_LOG, + message=request, + optional=True, + ) + except Exception as e: + self.logger.error(f"configure_job_log execution exception: {secure_format_exception(e)}.") + secure_log_traceback() + return None + def reset_errors(self, job_id): """Resets the error information. diff --git a/nvflare/private/fed/client/client_req_processors.py b/nvflare/private/fed/client/client_req_processors.py index a1aa221beb..0d39c5f5dd 100644 --- a/nvflare/private/fed/client/client_req_processors.py +++ b/nvflare/private/fed/client/client_req_processors.py @@ -15,11 +15,12 @@ from .info_coll_cmd import ClientInfoProcessor from .scheduler_cmds import CancelResourceProcessor, CheckResourceProcessor, ReportResourcesProcessor, StartJobProcessor from .shell_cmd import ShellCommandProcessor -from .sys_cmd import ReportEnvProcessor, SysInfoProcessor +from .sys_cmd import ConfigureSiteLogProcessor, ReportEnvProcessor, SysInfoProcessor from .training_cmds import ( # StartClientMGpuProcessor,; SetRunNumberProcessor, AbortAppProcessor, AbortTaskProcessor, ClientStatusProcessor, + ConfigureJobLogProcessor, DeleteRunNumberProcessor, DeployProcessor, NotifyJobStatusProcessor, @@ -40,6 +41,8 @@ class ClientRequestProcessors: DeployProcessor(), ShellCommandProcessor(), DeleteRunNumberProcessor(), + ConfigureJobLogProcessor(), + ConfigureSiteLogProcessor(), SysInfoProcessor(), RestartClientProcessor(), # StartClientMGpuProcessor(), diff --git a/nvflare/private/fed/client/client_runner.py b/nvflare/private/fed/client/client_runner.py index e258b4caf8..43d3e61607 100644 --- a/nvflare/private/fed/client/client_runner.py +++ b/nvflare/private/fed/client/client_runner.py @@ -35,6 +35,7 @@ from nvflare.apis.utils.reliable_message import ReliableMessage from nvflare.apis.utils.task_utils import apply_filters from nvflare.fuel.f3.cellnet.fqcn import FQCN +from nvflare.fuel.utils.log_utils import handle_log_config_command from nvflare.private.defs import SpecialTaskName, TaskConstant from nvflare.private.fed.client.client_engine_executor_spec import ClientEngineExecutorSpec, TaskAssignment from nvflare.private.fed.tbi import TBI @@ -719,3 +720,9 @@ def _handle_do_task(self, topic: str, request: Shareable, fl_ctx: FLContext) -> task = TaskAssignment(name=task_name, task_id=task_id, data=request) reply = self._process_task(task, fl_ctx) return reply + + def configure_job_log(self, request: Shareable, fl_ctx: FLContext) -> Shareable: + handle_log_config_command(request, self.engine.get_workspace(), self.job_id) + self.log_info(fl_ctx, f"configured job {self.job_id} server log") + + return make_reply(ReturnCode.OK) diff --git a/nvflare/private/fed/client/sys_cmd.py b/nvflare/private/fed/client/sys_cmd.py index 1bf20fd0c6..88f0d63321 100644 --- a/nvflare/private/fed/client/sys_cmd.py +++ b/nvflare/private/fed/client/sys_cmd.py @@ -24,7 +24,8 @@ from nvflare.apis.fl_constant import FLContextKey, SystemComponents from nvflare.apis.fl_context import FLContext -from nvflare.private.admin_defs import Message +from nvflare.fuel.utils.log_utils import handle_log_config_command +from nvflare.private.admin_defs import Message, ok_reply from nvflare.private.defs import SysCommandTopic from nvflare.private.fed.client.admin import RequestProcessor @@ -80,3 +81,17 @@ def process(self, req: Message, app_ctx) -> Message: } message = Message(topic="reply_" + req.topic, body=json.dumps(env)) return message + + +class ConfigureSiteLogProcessor(RequestProcessor): + def get_topics(self) -> List[str]: + return [SysCommandTopic.CONFIGURE_SITE_LOG] + + def process(self, req: Message, app_ctx) -> Message: + engine = app_ctx + fl_ctx = engine.new_context() + workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT) + + handle_log_config_command(req.body, workspace) + + return ok_reply(topic=f"reply_{req.topic}", body="OK") diff --git a/nvflare/private/fed/client/training_cmds.py b/nvflare/private/fed/client/training_cmds.py index e554568fa4..759750112c 100644 --- a/nvflare/private/fed/client/training_cmds.py +++ b/nvflare/private/fed/client/training_cmds.py @@ -153,6 +153,21 @@ def process(self, req: Message, app_ctx) -> Message: return ok_reply(topic=f"reply_{req.topic}", body=result) +class ConfigureJobLogProcessor(RequestProcessor): + def get_topics(self) -> List[str]: + return [TrainingTopic.CONFIGURE_JOB_LOG] + + def process(self, req: Message, app_ctx) -> Message: + engine = app_ctx + if not isinstance(engine, ClientEngineInternalSpec): + raise TypeError("engine must be ClientEngineInternalSpec, but got {}".format(type(engine))) + + job_id = req.get_header(RequestHeader.JOB_ID) + engine.configure_job_log(job_id, req.body) + + return ok_reply(topic=f"reply_{req.topic}", body="") + + class ClientStatusProcessor(RequestProcessor): def get_topics(self) -> List[str]: return [TrainingTopic.CHECK_STATUS] diff --git a/nvflare/private/fed/server/cmd_utils.py b/nvflare/private/fed/server/cmd_utils.py index 4cc8cc72ce..06a7eb8048 100644 --- a/nvflare/private/fed/server/cmd_utils.py +++ b/nvflare/private/fed/server/cmd_utils.py @@ -78,7 +78,8 @@ def validate_command_targets(self, conn: Connection, args: List[str]) -> str: return "" if target_type == self.TARGET_TYPE_CLIENT: - client_names = args[1:] + # client_names = args[1:] + client_names = [] # TODO fix elif target_type == self.TARGET_TYPE_ALL: client_names = [] else: diff --git a/nvflare/private/fed/server/job_cmds.py b/nvflare/private/fed/server/job_cmds.py index 74d55ed10a..7dc7a1d3fb 100644 --- a/nvflare/private/fed/server/job_cmds.py +++ b/nvflare/private/fed/server/job_cmds.py @@ -92,6 +92,13 @@ def get_spec(self): enabled=False, confirm=ConfirmMethod.AUTH, ), + CommandSpec( + name=AdminCommandNames.CONFIGURE_JOB_LOG, + description="configure logging of a job", + usage=f"{AdminCommandNames.CONFIGURE_JOB_LOG} job_id config server|client|all", + handler_func=self.configure_job_log, + authz_func=self.authorize_job, + ), CommandSpec( name=AdminCommandNames.START_APP, description="start the FL app", @@ -230,6 +237,19 @@ def authorize_job(self, conn: Connection, args: List[str]): return PreAuthzReturnCode.REQUIRE_AUTHZ + def authorize_configure_job_log(self, conn: Connection, args: List[str]): + rc = self.authorize_job_id(conn, args) + if rc == PreAuthzReturnCode.ERROR: + return rc + + if len(args) > 2: + err = self.validate_command_targets(conn, args[3:]) + if err: + conn.append_error(err, meta=make_meta(MetaStatusValue.INVALID_TARGET, err)) + return PreAuthzReturnCode.ERROR + + return PreAuthzReturnCode.REQUIRE_AUTHZ + def _start_app_on_clients(self, conn: Connection, job_id: str) -> bool: engine = conn.app_ctx client_names = conn.get_prop(self.TARGET_CLIENT_NAMES, None) @@ -321,6 +341,37 @@ def delete_job_id(self, conn: Connection, args: List[str]): conn.append_success("") + def configure_job_log(self, conn: Connection, args: List[str]): + job_id = args[1] + target_type = args[2] + + config = None + if len(args) > 3: + config = args[3] + + engine = conn.app_ctx + if not isinstance(engine, ServerEngine): + raise TypeError("engine must be ServerEngine but got {}".format(type(engine))) + + if target_type == self.TARGET_TYPE_SERVER: + engine.configure_job_log(str(job_id), config) + + elif target_type == self.TARGET_TYPE_CLIENT: + message = new_message( + conn, topic=TrainingTopic.CONFIGURE_JOB_LOG, body=config, require_authz=False + ) # TODO require_authz? + message.set_header(RequestHeader.JOB_ID, str(job_id)) + replies = self.send_request_to_clients(conn, message) + self.process_replies_to_table(conn, replies) + else: + conn.append_string( + "invalid target type {}. Usage: configure_site_log config server|client ".format( + target_type + ) + ) + + conn.append_string(f"successfully configured job log of {target_type} job {job_id}") + def list_jobs(self, conn: Connection, args: List[str]): try: parser = _create_list_job_cmd_parser() diff --git a/nvflare/private/fed/server/server_commands.py b/nvflare/private/fed/server/server_commands.py index cf1361971c..b858a1443c 100644 --- a/nvflare/private/fed/server/server_commands.py +++ b/nvflare/private/fed/server/server_commands.py @@ -299,6 +299,35 @@ def process(self, data: Shareable, fl_ctx: FLContext): return collector.get_run_stats() +class ConfigureJobLogCommand(CommandProcessor): + """To implement the show_stats command.""" + + def get_command_name(self) -> str: + """To get the command name. + + Returns: ServerCommandNames.SHOW_STATS + + """ + return ServerCommandNames.CONFIGURE_JOB_LOG + + def process(self, data: Shareable, fl_ctx: FLContext): + """Called to process the abort command. + + Args: + data: process data + fl_ctx: FLContext + + Returns: Engine run_info + + """ + server_runner = fl_ctx.get_prop(FLContextKey.RUNNER) + + if server_runner: + server_runner.configure_job_log(data, fl_ctx) + + return None + + class GetErrorsCommand(CommandProcessor): """To implement the show_errors command.""" @@ -477,6 +506,7 @@ class ServerCommands(object): HandleDeadJobCommand(), ShowStatsCommand(), GetErrorsCommand(), + ConfigureJobLogCommand(), ResetErrorsCommand(), HeartbeatCommand(), ServerStateCommand(), diff --git a/nvflare/private/fed/server/server_engine.py b/nvflare/private/fed/server/server_engine.py index f21d836322..9e798f59be 100644 --- a/nvflare/private/fed/server/server_engine.py +++ b/nvflare/private/fed/server/server_engine.py @@ -861,6 +861,21 @@ def reset_errors(self, job_id) -> str: return f"reset the server error stats for job: {job_id}" + def configure_job_log(self, job_id, data) -> dict: + result = None + try: + result = self.send_command_to_child_runner_process( + job_id=job_id, + command_name=ServerCommandNames.CONFIGURE_JOB_LOG, + command_data=data, + ) + except Exception as ex: + self.logger.error(f"Failed to configure_log for JOB: {job_id}: {secure_format_exception(ex)}") + + if result is None: + result = {} + return result + def _send_admin_requests(self, requests, fl_ctx: FLContext, timeout_secs=10) -> List[ClientReply]: return self.server.admin_server.send_requests(requests, fl_ctx, timeout_secs=timeout_secs) diff --git a/nvflare/private/fed/server/server_runner.py b/nvflare/private/fed/server/server_runner.py index 81c47f4848..aa9a940c98 100644 --- a/nvflare/private/fed/server/server_runner.py +++ b/nvflare/private/fed/server/server_runner.py @@ -26,6 +26,7 @@ from nvflare.apis.utils.fl_context_utils import add_job_audit_event from nvflare.apis.utils.reliable_message import ReliableMessage from nvflare.apis.utils.task_utils import apply_filters +from nvflare.fuel.utils.log_utils import handle_log_config_command from nvflare.private.defs import SpecialTaskName, TaskConstant from nvflare.private.fed.tbi import TBI from nvflare.private.privacy_manager import Scope @@ -554,3 +555,9 @@ def get_persist_state(self, fl_ctx: FLContext) -> dict: def restore(self, state_data: dict, fl_ctx: FLContext): self.job_id = state_data.get("job_id") self.current_wf_index = int(state_data.get("current_wf_index", 0)) + + def configure_job_log(self, data, fl_ctx: FLContext) -> Shareable: + handle_log_config_command(data, self.engine.get_workspace(), self.job_id) + self.log_info(fl_ctx, f"configured job {self.job_id} server log") + + return make_reply(ReturnCode.OK) diff --git a/nvflare/private/fed/server/sys_cmd.py b/nvflare/private/fed/server/sys_cmd.py index 6fb89266ae..9bf275e5d2 100644 --- a/nvflare/private/fed/server/sys_cmd.py +++ b/nvflare/private/fed/server/sys_cmd.py @@ -20,10 +20,12 @@ from nvflare.fuel.hci.conn import Connection from nvflare.fuel.hci.proto import MetaKey from nvflare.fuel.hci.reg import CommandModule, CommandModuleSpec, CommandSpec +from nvflare.fuel.utils.log_utils import handle_log_config_command from nvflare.private.admin_defs import MsgHeader, ReturnCode from nvflare.private.defs import SysCommandTopic from nvflare.private.fed.server.admin import new_message from nvflare.private.fed.server.cmd_utils import CommandUtil +from nvflare.private.fed.server.server_engine import ServerEngine from nvflare.security.logging import secure_format_exception @@ -60,6 +62,14 @@ def get_spec(self): authz_func=self.authorize_server_operation, visible=True, ), + CommandSpec( + name="configure_site_log", + description="configure logging of a site", + usage="configure_site_log config server|client ...", + handler_func=self.configure_site_log, + authz_func=self.authorize_server_operation, + visible=True, + ), CommandSpec( name="report_resources", description="get the resources info", @@ -116,6 +126,39 @@ def sys_info(self, conn: Connection, args: [str]): conn.append_string("invalid target type {}. Usage: sys_info server|client ".format(target_type)) + def configure_site_log(self, conn: Connection, args: [str]): + if len(args) < 2: + conn.append_error("syntax error: missing site names") + return + + target_type = args[1] + + config = None + if len(args) > 2: + config = args[2] + + if target_type == self.TARGET_TYPE_SERVER: + engine = conn.app_ctx + if not isinstance(engine, ServerEngine): + raise TypeError("engine must be ServerEngine but got {}".format(type(engine))) + + workspace = engine.get_workspace() + handle_log_config_command(config, workspace) + + elif target_type == self.TARGET_TYPE_CLIENT: + message = new_message(conn, topic=SysCommandTopic.CONFIGURE_SITE_LOG, body=config, require_authz=True) + replies = self.send_request_to_clients(conn, message) + self.process_replies_to_table(conn, replies) + + else: + conn.append_string( + "invalid target type {}. Usage: configure_site_log config server|client ".format( + target_type + ) + ) + + conn.append_string(f"successfully configured site log of {target_type}") + def _process_replies(self, conn, replies): if not replies: conn.append_error("no responses from clients")