Skip to content

Commit

Permalink
Added lock in ReliableMessage (#2811)
Browse files Browse the repository at this point in the history
  • Loading branch information
nvidianz authored Aug 20, 2024
1 parent 0a1b036 commit 1ddff91
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 49 deletions.
102 changes: 53 additions & 49 deletions nvflare/apis/utils/reliable_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,63 +96,67 @@ def __init__(self, topic, request_handler_f, executor, per_msg_timeout, tx_timeo
self.tx_id = None
self.reply_time = None
self.replying = False
self.lock = threading.Lock()

def process(self, request: Shareable, fl_ctx: FLContext) -> Shareable:
if not ReliableMessage.is_available():
return make_reply(ReturnCode.SERVICE_UNAVAILABLE)

self.tx_id = request.get_header(HEADER_TX_ID)
op = request.get_header(HEADER_OP)
peer_ctx = fl_ctx.get_peer_context()
assert isinstance(peer_ctx, FLContext)
self.source = peer_ctx.get_identity_name()
if op == OP_REQUEST:
# it is possible that a new request for the same tx is received while we are processing the previous one
if not self.rcv_time:
self.rcv_time = time.time()
self.per_msg_timeout = request.get_header(HEADER_PER_MSG_TIMEOUT)
self.tx_timeout = request.get_header(HEADER_TX_TIMEOUT)

# start processing
ReliableMessage.info(fl_ctx, f"started processing request of topic {self.topic}")
try:
self.executor.submit(self._do_request, request, fl_ctx)
return _status_reply(STATUS_IN_PROCESS) # ack
except Exception as ex:
# it is possible that the RM is already closed (self.executor is shut down)
ReliableMessage.error(fl_ctx, f"failed to submit request: {secure_format_exception(ex)}")
return make_reply(ReturnCode.SERVICE_UNAVAILABLE)
elif self.result:
# we already finished processing - send the result back
ReliableMessage.info(fl_ctx, "resend result back to requester")
return self.result
else:
# we are still processing
ReliableMessage.info(fl_ctx, "got request - the request is being processed")
return _status_reply(STATUS_IN_PROCESS)
elif op == OP_QUERY:
if self.result:
if self.reply_time:
# result already sent back successfully
ReliableMessage.info(fl_ctx, "got query: we already replied successfully")
return _status_reply(STATUS_REPLIED)
elif self.replying:
# result is being sent
ReliableMessage.info(fl_ctx, "got query: reply is being sent")
return _status_reply(STATUS_IN_REPLY)
else:
# try to send the result again
ReliableMessage.info(fl_ctx, "got query: sending reply again")
with self.lock:
self.tx_id = request.get_header(HEADER_TX_ID)
op = request.get_header(HEADER_OP)
peer_ctx = fl_ctx.get_peer_context()
assert isinstance(peer_ctx, FLContext)
self.source = peer_ctx.get_identity_name()
if op == OP_REQUEST:
# it is possible that a new request for the same tx is received while we are processing the previous one
if not self.rcv_time:
self.rcv_time = time.time()
self.per_msg_timeout = request.get_header(HEADER_PER_MSG_TIMEOUT)
self.tx_timeout = request.get_header(HEADER_TX_TIMEOUT)

# start processing
ReliableMessage.info(fl_ctx, f"started processing request of topic {self.topic}")
try:
self.executor.submit(self._do_request, request, fl_ctx)
return _status_reply(STATUS_IN_PROCESS) # ack
except Exception as ex:
# it is possible that the RM is already closed (self.executor is shut down)
ReliableMessage.error(fl_ctx, f"failed to submit request: {secure_format_exception(ex)}")
return make_reply(ReturnCode.SERVICE_UNAVAILABLE)
elif self.result:
# we already finished processing - send the result back
ReliableMessage.info(fl_ctx, "resend result back to requester")
return self.result
else:
# still in process
if time.time() - self.rcv_time > self.tx_timeout:
# the process is taking too much time
ReliableMessage.error(fl_ctx, f"aborting processing since exceeded max tx time {self.tx_timeout}")
return _status_reply(STATUS_ABORTED)
else:
ReliableMessage.debug(fl_ctx, "got query: request is in-process")
# we are still processing
ReliableMessage.info(fl_ctx, "got request - the request is being processed")
return _status_reply(STATUS_IN_PROCESS)
elif op == OP_QUERY:
if self.result:
if self.reply_time:
# result already sent back successfully
ReliableMessage.info(fl_ctx, "got query: we already replied successfully")
return _status_reply(STATUS_REPLIED)
elif self.replying:
# result is being sent
ReliableMessage.info(fl_ctx, "got query: reply is being sent")
return _status_reply(STATUS_IN_REPLY)
else:
# try to send the result again
ReliableMessage.info(fl_ctx, "got query: sending reply again")
return self.result
else:
# still in process
if time.time() - self.rcv_time > self.tx_timeout:
# the process is taking too much time
ReliableMessage.error(
fl_ctx, f"aborting processing since exceeded max tx time {self.tx_timeout}"
)
return _status_reply(STATUS_ABORTED)
else:
ReliableMessage.debug(fl_ctx, "got query: request is in-process")
return _status_reply(STATUS_IN_PROCESS)

def _try_reply(self, fl_ctx: FLContext):
engine = fl_ctx.get_engine()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def all_reduce(
raise RuntimeError(f"bad result from XGB server: expect AllreduceReply but got {type(result)}")

def broadcast(self, rank: int, seq: int, root: int, send_buf: bytes, fl_ctx: FLContext) -> bytes:
self.logger.debug(f"Sending broadcast: {rank=} {seq=} {root=} {len(send_buf)=}")
result = self.internal_xgb_client.send_broadcast(seq_num=seq, rank=rank, data=send_buf, root=root)
if isinstance(result, pb2.BroadcastReply):
return result.receive_buffer
Expand Down

0 comments on commit 1ddff91

Please sign in to comment.