Skip to content

Commit

Permalink
dynamic logging with admin commands
Browse files Browse the repository at this point in the history
  • Loading branch information
SYangster committed Jan 3, 2025
1 parent 62e2441 commit 20d3e6a
Show file tree
Hide file tree
Showing 20 changed files with 321 additions and 13 deletions.
3 changes: 3 additions & 0 deletions nvflare/apis/fl_constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ class AdminCommandNames(object):
SHELL_TAIL = "tail"
SHELL_GREP = "grep"
APP_COMMAND = "app_command"
CONFIGURE_JOB_LOG = "configure_job_log"
CONFIGURE_SITE_LOG = "configure_site_log"


class ServerCommandNames(object):
Expand All @@ -263,6 +265,7 @@ class ServerCommandNames(object):
HANDLE_DEAD_JOB = "handle_dead_job"
SERVER_STATE = "server_state"
APP_COMMAND = "app_command"
CONFIGURE_JOB_LOG = "configure_job_log"


class ServerCommandKey(object):
Expand Down
41 changes: 41 additions & 0 deletions nvflare/fuel/utils/log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from logging import Logger
from logging.handlers import RotatingFileHandler

from nvflare.apis.fl_constant import WorkspaceConstants
from nvflare.apis.workspace import Workspace


Expand Down Expand Up @@ -262,6 +263,46 @@ def apply_log_config(dict_config, dir_path: str = "", file_prefix: str = ""):
logging.config.dictConfig(dict_config)


def dynamic_log_config(config: str, workspace: Workspace, job_id: str = None):
# Dynamically configure log given a config (filepath, levelname, levelnumber, 'reload'), apply the config to the proper locations.
if not isinstance(config, str):
raise ValueError(
f"Unsupported config type. Expect config to be string filepath, levelname, levelnumber, or 'reload' but got {type(config)}"
)

if config == "reload":
config = workspace.get_log_config_file_path()

if os.path.isfile(config):
# Read confg file
with open(config, "r") as f:
dict_config = json.load(f)

if job_id:
dir_path = workspace.get_run_dir(job_id)
else:
dir_path = workspace.get_root_dir()

# overwrite log_config.json of site
with open(os.path.join(workspace.get_site_config_dir(), WorkspaceConstants.LOGGING_CONFIG), "w") as f:
f.write(json.dumps(dict_config))

apply_log_config(dict_config, dir_path)

else:
# Set level of root logger based on levelname or levelnumber
if config.isdigit():
level = int(config)
if not (0 <= level <= 50):
raise ValueError(f"Invalid logging level: {level}")
else:
level = getattr(logging, config.upper(), None)
if level is None:
raise ValueError(f"Invalid logging level: {config}")

logging.getLogger().setLevel(level)


def add_log_file_handler(log_file_name):
root_logger = logging.getLogger()
main_handler = root_logger.handlers[0]
Expand Down
6 changes: 5 additions & 1 deletion nvflare/job_config/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,9 @@ def export_job(self, job_root: str):
self._set_all_apps()
self.job.generate_job_config(job_root)

def simulator_run(self, workspace: str, n_clients: int = None, threads: int = None, gpu: str = None):
def simulator_run(
self, workspace: str, n_clients: int = None, threads: int = None, gpu: str = None, log_config: str = None
):
"""Run the job with the simulator with the `workspace` using `clients` and `threads`.
For end users.
Expand All @@ -531,6 +533,7 @@ def simulator_run(self, workspace: str, n_clients: int = None, threads: int = No
n_clients: number of clients.
threads: number of threads.
gpu: gpu assignments for simulating clients, comma separated
log_config: log config json file path
Returns:
Expand All @@ -556,6 +559,7 @@ def simulator_run(self, workspace: str, n_clients: int = None, threads: int = No
n_clients=n_clients,
threads=threads,
gpu=gpu,
log_config=log_config,
)

def as_id(self, obj: Any) -> str:
Expand Down
4 changes: 3 additions & 1 deletion nvflare/job_config/fed_job_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def generate_job_config(self, job_root):

self._generate_meta(job_dir)

def simulator_run(self, workspace, clients=None, n_clients=None, threads=None, gpu=None):
def simulator_run(self, workspace, clients=None, n_clients=None, threads=None, gpu=None, log_config=None):
with TemporaryDirectory() as job_root:
self.generate_job_config(job_root)

Expand All @@ -157,6 +157,8 @@ def simulator_run(self, workspace, clients=None, n_clients=None, threads=None, g
if gpu:
gpu = self._trim_whitespace(gpu)
command += " -gpu " + str(gpu)
if log_config:
command += " -l" + str(log_config)

new_env = os.environ.copy()
process = subprocess.Popen(shlex.split(command, True), preexec_fn=os.setsid, env=new_env)
Expand Down
2 changes: 2 additions & 0 deletions nvflare/private/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class TrainingTopic(object):
START_JOB = "train.start_job"
GET_SCOPES = "train.get_scopes"
NOTIFY_JOB_STATUS = "train.notify_job_status"
CONFIGURE_JOB_LOG = "train.configure_job_log"


class RequestHeader(object):
Expand Down Expand Up @@ -96,6 +97,7 @@ class SysCommandTopic(object):
SHELL = "sys.shell"
REPORT_RESOURCES = "resource_manager.report_resources"
REPORT_ENV = "sys.report_env"
CONFIGURE_SITE_LOG = "sys.configure_site_log"


class ControlCommandTopic(object):
Expand Down
2 changes: 2 additions & 0 deletions nvflare/private/fed/app/simulator/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def define_simulator_parser(simulator_parser):
simulator_parser.add_argument("-c", "--clients", type=str, help="client names list")
simulator_parser.add_argument("-t", "--threads", type=int, help="number of parallel running clients")
simulator_parser.add_argument("-gpu", "--gpu", type=str, help="list of GPU Device Ids, comma separated")
simulator_parser.add_argument("-l", "--log_config", type=str, help="log config file path")
simulator_parser.add_argument("-m", "--max_clients", type=int, default=100, help="max number of clients")
simulator_parser.add_argument(
"--end_run_for_all",
Expand All @@ -46,6 +47,7 @@ def run_simulator(simulator_args):
n_clients=simulator_args.n_clients,
threads=simulator_args.threads,
gpu=simulator_args.gpu,
log_config=simulator_args.log_config,
max_clients=simulator_args.max_clients,
end_run_for_all=simulator_args.end_run_for_all,
)
Expand Down
41 changes: 32 additions & 9 deletions nvflare/private/fed/app/simulator/simulator_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def __init__(
n_clients=None,
threads=None,
gpu=None,
log_config=None,
max_clients=100,
end_run_for_all=False,
):
Expand All @@ -99,6 +100,7 @@ def __init__(
self.n_clients = n_clients
self.threads = threads
self.gpu = gpu
self.log_config = log_config
self.max_clients = max_clients
self.end_run_for_all = end_run_for_all

Expand Down Expand Up @@ -126,7 +128,15 @@ def __init__(
self.workspace = os.path.join(running_dir, self.workspace)

def _generate_args(
self, job_folder: str, workspace: str, clients=None, n_clients=None, threads=None, gpu=None, max_clients=100
self,
job_folder: str,
workspace: str,
clients=None,
n_clients=None,
threads=None,
gpu=None,
log_config=None,
max_clients=100,
):
args = Namespace(
job_folder=job_folder,
Expand All @@ -135,14 +145,22 @@ def _generate_args(
n_clients=n_clients,
threads=threads,
gpu=gpu,
log_config=log_config,
max_clients=max_clients,
)
args.set = []
return args

def setup(self):
self.args = self._generate_args(
self.job_folder, self.workspace, self.clients, self.n_clients, self.threads, self.gpu, self.max_clients
self.job_folder,
self.workspace,
self.clients,
self.n_clients,
self.threads,
self.gpu,
self.log_config,
self.max_clients,
)

if self.args.clients:
Expand All @@ -152,14 +170,16 @@ def setup(self):
for i in range(self.args.n_clients):
self.client_names.append("site-" + str(i + 1))

log_config_file_path = os.path.join(self.args.workspace, "local", WorkspaceConstants.LOGGING_CONFIG)
if not os.path.isfile(log_config_file_path):
log_config_file_path = os.path.join(os.path.dirname(__file__), WorkspaceConstants.LOGGING_CONFIG)
if self.args.log_config:
log_config_file_path = self.args.log_config
else:
log_config_file_path = os.path.join(self.args.workspace, "local", WorkspaceConstants.LOGGING_CONFIG)
if not os.path.isfile(log_config_file_path):
log_config_file_path = os.path.join(os.path.dirname(__file__), WorkspaceConstants.LOGGING_CONFIG)

with open(log_config_file_path, "r") as f:
dict_config = json.load(f)

self.args.log_config = None
self.args.config_folder = "config"
self.args.job_id = SimulatorConstants.JOB_NAME
self.args.client_config = os.path.join(self.args.config_folder, JobConstants.CLIENT_JOB_CONFIG)
Expand Down Expand Up @@ -669,9 +689,12 @@ def _pick_next_client(self):
def do_one_task(self, client, num_of_threads, gpu, lock, timeout=60.0, task_name=RunnerTask.TASK_EXEC):
open_port = get_open_ports(1)[0]
client_workspace = os.path.join(self.args.workspace, client.client_name)
logging_config = os.path.join(
self.args.workspace, client.client_name, "local", WorkspaceConstants.LOGGING_CONFIG
)
if self.args.log_config:
logging_config = self.args.log_config
else:
logging_config = os.path.join(
self.args.workspace, client.client_name, "local", WorkspaceConstants.LOGGING_CONFIG
)
decomposer_module = ConfigService.get_str_var(
name=ConfigVarName.DECOMPOSER_MODULE, conf=SystemConfigs.RESOURCES_CONF
)
Expand Down
27 changes: 27 additions & 0 deletions nvflare/private/fed/client/admin_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,32 @@ def process(self, data: Shareable, fl_ctx: FLContext):
return None


class ConfigureJobLogCommand(CommandProcessor):
"""To implement the configure_job_log command."""

def get_command_name(self) -> str:
"""To get the command name.
Returns: AdminCommandNames.CONFIGURE_JOB_LOG
"""
return AdminCommandNames.CONFIGURE_JOB_LOG

def process(self, data: Shareable, fl_ctx: FLContext):
"""Called to process the configure_job_log command.
Args:
data: process data
fl_ctx: FLContext
Returns: configure_job_log command message
"""
client_runner = fl_ctx.get_prop(FLContextKey.RUNNER)
if client_runner:
client_runner.configure_job_log(data, fl_ctx)


class AdminCommands(object):
"""AdminCommands contains all the commands for processing the commands from the parent process."""

Expand All @@ -257,6 +283,7 @@ class AdminCommands(object):
ShowStatsCommand(),
ShowErrorsCommand(),
ResetErrorsCommand(),
ConfigureJobLogCommand(),
]

@staticmethod
Expand Down
3 changes: 3 additions & 0 deletions nvflare/private/fed/client/client_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,9 @@ def get_current_run_info(self, job_id) -> ClientRunInfo:
def get_errors(self, job_id):
return self.client_executor.get_errors(job_id)

def configure_job_log(self, job_id, config):
self.client_executor.configure_job_log(job_id, config)

def reset_errors(self, job_id):
self.client_executor.reset_errors(job_id)

Expand Down
21 changes: 21 additions & 0 deletions nvflare/private/fed/client/client_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,27 @@ def get_errors(self, job_id):
secure_log_traceback()
return None

def configure_job_log(self, job_id, config):
"""Configure the job log.
Args:
job_id: the job_id
config: log config
"""
try:
request = new_cell_message({}, config)
self.client.cell.fire_and_forget(
targets=self._job_fqcn(job_id),
channel=CellChannel.CLIENT_COMMAND,
topic=AdminCommandNames.CONFIGURE_JOB_LOG,
message=request,
optional=True,
)
except Exception as e:
self.logger.error(f"configure_job_log execution exception: {secure_format_exception(e)}.")
secure_log_traceback()

def reset_errors(self, job_id):
"""Resets the error information.
Expand Down
5 changes: 4 additions & 1 deletion nvflare/private/fed/client/client_req_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
from .info_coll_cmd import ClientInfoProcessor
from .scheduler_cmds import CancelResourceProcessor, CheckResourceProcessor, ReportResourcesProcessor, StartJobProcessor
from .shell_cmd import ShellCommandProcessor
from .sys_cmd import ReportEnvProcessor, SysInfoProcessor
from .sys_cmd import ConfigureSiteLogProcessor, ReportEnvProcessor, SysInfoProcessor
from .training_cmds import ( # StartClientMGpuProcessor,; SetRunNumberProcessor,
AbortAppProcessor,
AbortTaskProcessor,
ClientStatusProcessor,
ConfigureJobLogProcessor,
DeleteRunNumberProcessor,
DeployProcessor,
NotifyJobStatusProcessor,
Expand Down Expand Up @@ -52,6 +53,8 @@ class ClientRequestProcessors:
ReportResourcesProcessor(),
ReportEnvProcessor(),
NotifyJobStatusProcessor(),
ConfigureJobLogProcessor(),
ConfigureSiteLogProcessor(),
]

@staticmethod
Expand Down
7 changes: 7 additions & 0 deletions nvflare/private/fed/client/client_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from nvflare.apis.utils.reliable_message import ReliableMessage
from nvflare.apis.utils.task_utils import apply_filters
from nvflare.fuel.f3.cellnet.fqcn import FQCN
from nvflare.fuel.utils.log_utils import dynamic_log_config
from nvflare.private.defs import SpecialTaskName, TaskConstant
from nvflare.private.fed.client.client_engine_executor_spec import ClientEngineExecutorSpec, TaskAssignment
from nvflare.private.fed.tbi import TBI
Expand Down Expand Up @@ -719,3 +720,9 @@ def _handle_do_task(self, topic: str, request: Shareable, fl_ctx: FLContext) ->
task = TaskAssignment(name=task_name, task_id=task_id, data=request)
reply = self._process_task(task, fl_ctx)
return reply

def configure_job_log(self, request: Shareable, fl_ctx: FLContext) -> Shareable:
dynamic_log_config(request, self.engine.get_workspace(), self.job_id)
self.log_info(fl_ctx, f"configured job {self.job_id} server log")

return make_reply(ReturnCode.OK)
17 changes: 16 additions & 1 deletion nvflare/private/fed/client/sys_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@

from nvflare.apis.fl_constant import FLContextKey, SystemComponents
from nvflare.apis.fl_context import FLContext
from nvflare.private.admin_defs import Message
from nvflare.fuel.utils.log_utils import dynamic_log_config
from nvflare.private.admin_defs import Message, ok_reply
from nvflare.private.defs import SysCommandTopic
from nvflare.private.fed.client.admin import RequestProcessor

Expand Down Expand Up @@ -80,3 +81,17 @@ def process(self, req: Message, app_ctx) -> Message:
}
message = Message(topic="reply_" + req.topic, body=json.dumps(env))
return message


class ConfigureSiteLogProcessor(RequestProcessor):
def get_topics(self) -> List[str]:
return [SysCommandTopic.CONFIGURE_SITE_LOG]

def process(self, req: Message, app_ctx) -> Message:
engine = app_ctx
fl_ctx = engine.new_context()
workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)

dynamic_log_config(req.body, workspace)

return ok_reply(topic=f"reply_{req.topic}", body="OK")
Loading

0 comments on commit 20d3e6a

Please sign in to comment.