diff --git a/lib/src/socket.dart b/lib/src/socket.dart index b456458..ab794b6 100644 --- a/lib/src/socket.dart +++ b/lib/src/socket.dart @@ -253,9 +253,12 @@ class PhoenixSocket { /// Send a channel on the socket. /// - /// Used internally to send prepared message. If you need to send - /// a message on a channel, you would usually use [PhoenixChannel.push] - /// instead. + /// Used internally to send prepared message. If you need to send a message on + /// a channel, you would usually use [PhoenixChannel.push] instead. + /// + /// Returns a future that completes when the reply for the sent message is + /// received. If your flow awaits for the result of this future, add a timout + /// to it so that you are not stuck in case that the reply is never received. Future sendMessage(Message message) { if (message.ref == null) { throw ArgumentError.value( @@ -339,12 +342,9 @@ class PhoenixSocket { _cancelHeartbeat(); - _connectionManager.start(); - _logger.info('Waiting for initial heartbeat round trip'); - final initialHearbeatSucceeded = await _sendHeartbeat(force: true); - _logger.fine('Initial heartbeat result: $initialHearbeatSucceeded'); - if (initialHearbeatSucceeded) { + final initialHeartbeatSucceeded = await _sendHeartbeat(force: true); + if (initialHeartbeatSucceeded) { _logger.info('Socket open'); _stateEventStreamController.add(PhoenixSocketOpenEvent()); } @@ -373,38 +373,43 @@ class PhoenixSocket { _logger.fine('Heartbeat ${heartbeatMessage.ref} sent'); _latestHeartbeatRef = heartbeatMessage.ref; - _heartbeatTimeout = _scheduleHeartbeat(); + _heartbeatTimeout = _scheduleHeartbeat(_latestHeartbeatRef!); await _pendingMessages[_latestHeartbeatRef]!.future; + _logger.fine('Heartbeat $_latestHeartbeatRef completed'); return true; - } on WebSocketChannelException catch (error, stacktrace) { - _logger.severe( + } on WebSocketChannelException catch (error, stackTrace) { + _logger.warning( 'Heartbeat message failed: WebSocketChannelException', error, - stacktrace, + stackTrace, ); _stateEventStreamController.add( PhoenixSocketErrorEvent( error: error, - stacktrace: stacktrace, + stacktrace: stackTrace, ), ); return false; - } catch (error, stacktrace) { - _logger.warning('Heartbeat message failed', error, stacktrace); + } catch (error, stackTrace) { + _logger.warning('Heartbeat message failed', error, stackTrace); _reconnect(4001, reason: 'Heartbeat timeout'); return false; } } - Timer _scheduleHeartbeat() { + Timer _scheduleHeartbeat([String? previousHeartbeat]) { return Timer(_options.heartbeat, () { - if (_latestHeartbeatRef != null) { - final completer = _pendingMessages.remove(_latestHeartbeatRef); + if (previousHeartbeat != null) { + final completer = _pendingMessages.remove(previousHeartbeat); if (completer != null && !completer.isCompleted) { - completer - .completeError(HeartbeatFailedException(_latestHeartbeatRef!)); + completer.completeError( + TimeoutException( + 'Heartbeat $previousHeartbeat not completed before sending new one', + ), + StackTrace.current, + ); return; } } @@ -461,9 +466,12 @@ class PhoenixSocket { } completer.complete(message); } - // The connection is alive, prevent hearbeat timeout from closing - // connection. - _latestHeartbeatRef = null; + + if (message.ref != _latestHeartbeatRef) { + // The connection is alive, prevent heartbeat timeout from closing + // connection. + _latestHeartbeatRef = null; + } } if (message.topic != null && message.topic!.isNotEmpty) { @@ -527,12 +535,3 @@ class PhoenixSocket { _triggerChannelExceptions(exception); } } - -final class HeartbeatFailedException implements Exception { - HeartbeatFailedException(this.heartbeatRef); - - final String heartbeatRef; - - @override - String toString() => 'HeartbeatFailedException(ref: $heartbeatRef)'; -}