Skip to content

Commit

Permalink
Even more improvements!
Browse files Browse the repository at this point in the history
  • Loading branch information
kpsroka committed Aug 8, 2024
1 parent 26b0ba7 commit dd00743
Showing 1 changed file with 31 additions and 32 deletions.
63 changes: 31 additions & 32 deletions lib/src/socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,12 @@ class PhoenixSocket {
/// Send a channel on the socket.
///
/// Used internally to send prepared message. If you need to send
/// a message on a channel, you would usually use [PhoenixChannel.push]
/// instead.
/// Used internally to send prepared message. If you need to send a message on
/// a channel, you would usually use [PhoenixChannel.push] instead.
///
/// Returns a future that completes when the reply for the sent message is
/// received. If your flow awaits for the result of this future, add a timout
/// to it so that you are not stuck in case that the reply is never received.
Future<Message> sendMessage(Message message) {
if (message.ref == null) {
throw ArgumentError.value(
Expand Down Expand Up @@ -339,12 +342,9 @@ class PhoenixSocket {

_cancelHeartbeat();

_connectionManager.start();

_logger.info('Waiting for initial heartbeat round trip');
final initialHearbeatSucceeded = await _sendHeartbeat(force: true);
_logger.fine('Initial heartbeat result: $initialHearbeatSucceeded');
if (initialHearbeatSucceeded) {
final initialHeartbeatSucceeded = await _sendHeartbeat(force: true);
if (initialHeartbeatSucceeded) {
_logger.info('Socket open');
_stateEventStreamController.add(PhoenixSocketOpenEvent());
}
Expand Down Expand Up @@ -373,38 +373,43 @@ class PhoenixSocket {
_logger.fine('Heartbeat ${heartbeatMessage.ref} sent');
_latestHeartbeatRef = heartbeatMessage.ref;

_heartbeatTimeout = _scheduleHeartbeat();
_heartbeatTimeout = _scheduleHeartbeat(_latestHeartbeatRef!);

await _pendingMessages[_latestHeartbeatRef]!.future;
_logger.fine('Heartbeat $_latestHeartbeatRef completed');
return true;
} on WebSocketChannelException catch (error, stacktrace) {
_logger.severe(
} on WebSocketChannelException catch (error, stackTrace) {
_logger.warning(
'Heartbeat message failed: WebSocketChannelException',
error,
stacktrace,
stackTrace,
);
_stateEventStreamController.add(
PhoenixSocketErrorEvent(
error: error,
stacktrace: stacktrace,
stacktrace: stackTrace,
),
);

return false;
} catch (error, stacktrace) {
_logger.warning('Heartbeat message failed', error, stacktrace);
} catch (error, stackTrace) {
_logger.warning('Heartbeat message failed', error, stackTrace);
_reconnect(4001, reason: 'Heartbeat timeout');
return false;
}
}

Timer _scheduleHeartbeat() {
Timer _scheduleHeartbeat([String? previousHeartbeat]) {
return Timer(_options.heartbeat, () {
if (_latestHeartbeatRef != null) {
final completer = _pendingMessages.remove(_latestHeartbeatRef);
if (previousHeartbeat != null) {
final completer = _pendingMessages.remove(previousHeartbeat);
if (completer != null && !completer.isCompleted) {
completer
.completeError(HeartbeatFailedException(_latestHeartbeatRef!));
completer.completeError(
TimeoutException(
'Heartbeat $previousHeartbeat not completed before sending new one',
),
StackTrace.current,
);
return;
}
}
Expand Down Expand Up @@ -461,9 +466,12 @@ class PhoenixSocket {
}
completer.complete(message);
}
// The connection is alive, prevent hearbeat timeout from closing
// connection.
_latestHeartbeatRef = null;

if (message.ref != _latestHeartbeatRef) {
// The connection is alive, prevent heartbeat timeout from closing
// connection.
_latestHeartbeatRef = null;
}
}

if (message.topic != null && message.topic!.isNotEmpty) {
Expand Down Expand Up @@ -527,12 +535,3 @@ class PhoenixSocket {
_triggerChannelExceptions(exception);
}
}

final class HeartbeatFailedException implements Exception {
HeartbeatFailedException(this.heartbeatRef);

final String heartbeatRef;

@override
String toString() => 'HeartbeatFailedException(ref: $heartbeatRef)';
}

0 comments on commit dd00743

Please sign in to comment.