Skip to content

Commit

Permalink
Use TaskGroup to wait on network activating or retry timeout, whichev…
Browse files Browse the repository at this point in the history
…er is first (#102)

* Add NWPathMonitor logic to WebSocketProvider retries
* Add NWPathMonitor logic to WebSocketProvider retries (using group)
* Remove inadvertent comment changes
* Return proper success
* Cleanup comments
  • Loading branch information
jessegrosjean authored May 20, 2024
1 parent 39f75a8 commit c24502c
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 7 deletions.
49 changes: 42 additions & 7 deletions Sources/AutomergeRepo/Networking/Providers/WebSocketProvider.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Automerge
import Network
@preconcurrency import Combine
import OSLog

Expand Down Expand Up @@ -45,9 +46,7 @@ public final class WebSocketProvider: NetworkProvider {
ongoingReceiveMessageTask = nil
peered = false
}

// MARK: NetworkProvider Methods


/// Initiate an outgoing connection.
public func connect(to url: URL) async throws {
if peered {
Expand Down Expand Up @@ -362,7 +361,7 @@ public final class WebSocketProvider: NetworkProvider {
// reconnect attempts)
var reconnectAttempts: UInt = 0
var tryToReconnect = config.reconnectOnError

repeat {
msgFromWebSocket = nil

Expand All @@ -374,6 +373,7 @@ public final class WebSocketProvider: NetworkProvider {
tryToReconnect = false
break
}


// if we're not currently peered, attempt to reconnect
// (if we're configured to do so)
Expand All @@ -394,10 +394,44 @@ public final class WebSocketProvider: NetworkProvider {
)
}
reconnectAttempts += 1

do {
try await Task.sleep(for: .seconds(waitBeforeReconnect))
// if endpoint is nil, this returns nil
_ = try await attemptConnect(to: endpoint)
// Wait to reconnect. Wait for waitBeforeReconnect and networth path
// transitioning from not satisfied to satisfied. Whichever comes first.
let success = try await withThrowingTaskGroup(of: Void.self) { group in

// Wait for timeout
group.addTask {
try await Task.sleep(for: .seconds(waitBeforeReconnect))
}

// Wait for network becomming availible
group.addTask {
let monitor = NWPathMonitor()
var last = monitor.currentPath
for await each in monitor.paths() {
if last.status != .satisfied && each.status == .satisfied {
Logger.websocket.info("WEBSOCKET: Network path satisfied while waiting to reconnect")
return
} else {
last = each
}
}
}

// After either task succeeds then cancel group and attempt connection
for try await _ in group {
group.cancelAll()
return try await attemptConnect(to: endpoint)
}

return false
}

if success {
// On successful connection reset connection attemtps
reconnectAttempts = 0
}
} catch {
webSocketTask = nil
peered = false
Expand Down Expand Up @@ -437,6 +471,7 @@ public final class WebSocketProvider: NetworkProvider {
}
}
} while tryToReconnect

self.peered = false
webSocketTask?.cancel()
webSocketTask = nil
Expand Down
15 changes: 15 additions & 0 deletions Sources/AutomergeRepo/extensions/NWPathMonitor+paths.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import Network

extension NWPathMonitor {
func paths() -> AsyncStream<NWPath> {
AsyncStream { continuation in
pathUpdateHandler = { path in
continuation.yield(path)
}
continuation.onTermination = { [weak self] _ in
self?.cancel()
}
start(queue: DispatchQueue(label: "NSPathMonitor.paths"))
}
}
}

0 comments on commit c24502c

Please sign in to comment.