From f9b05b50290c24125a4ddf1ce32ec952eee0120c Mon Sep 17 00:00:00 2001 From: Joe Heck Date: Wed, 10 Apr 2024 16:26:13 -0700 Subject: [PATCH] peerToPeer shoud be complete and usable, now to test and debug... --- Package.swift | 4 +- .../Providers/PeerToPeerProvider.swift | 151 ++++++++++++++++-- 2 files changed, 143 insertions(+), 12 deletions(-) diff --git a/Package.swift b/Package.swift index f1193d3..6bb3c9d 100644 --- a/Package.swift +++ b/Package.swift @@ -27,7 +27,7 @@ let package = Package( .package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"), // Combine replacement for OSS - // .package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"), // Distributed Tracing .package(url: "https://github.com/apple/swift-distributed-tracing", from: "1.0.0"), // Testing Tracing @@ -45,7 +45,7 @@ let package = Package( .product(name: "Base58Swift", package: "Base58Swift"), // Combine replacement for OSS - // .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), + .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), // Distributed Tracing .product(name: "Tracing", package: "swift-distributed-tracing"), diff --git a/Sources/AutomergeRepo/Networking/Providers/PeerToPeerProvider.swift b/Sources/AutomergeRepo/Networking/Providers/PeerToPeerProvider.swift index 6aa2c84..fb261d4 100644 --- a/Sources/AutomergeRepo/Networking/Providers/PeerToPeerProvider.swift +++ b/Sources/AutomergeRepo/Networking/Providers/PeerToPeerProvider.swift @@ -1,3 +1,4 @@ +import AsyncAlgorithms import Foundation import Network import OSLog @@ -37,6 +38,12 @@ public actor PeerToPeerProvider: NetworkProvider { } } + public struct AvailablePeer: Sendable { + let peerId: PEER_ID + let endpoint: NWEndpoint + let name: String + } + public var peeredConnections: [PeerConnection] { connections.values.compactMap { holder in if let peerId = holder.peerId { @@ -74,6 +81,22 @@ public actor PeerToPeerProvider: NetworkProvider { let newConnectionContinuation: AsyncStream.Continuation var newConnectionTaskHandle: Task? + // browser tasks to process/react to callbacks + let browserStateStream: AsyncStream + let browserStateContinuation: AsyncStream.Continuation + var browserStateUpdateTaskHandle: Task? + + struct BrowserResultUpdate: Sendable { + let newResults: Set + let changes: Set + } + + let browserResultUpdateStream: AsyncStream + let browserResultUpdateContinuation: AsyncStream.Continuation + var browserResultUpdateTaskHandle: Task? + + public let availablePeerChannel: AsyncChannel<[AvailablePeer]> + // this allows us to create a provider, but it's not ready to go until // its fully configured by setting a delegate on it, which initializes // not only delegate, but also peerId and the optional peerMetadata @@ -101,6 +124,14 @@ public actor PeerToPeerProvider: NetworkProvider { // Start the connection to accept it, or cancel to reject it. (newConnectionQueue, newConnectionContinuation) = AsyncStream.makeStream() self.newConnectionTaskHandle = nil + + (browserStateStream, browserStateContinuation) = AsyncStream.makeStream() + self.browserStateUpdateTaskHandle = nil + + (browserResultUpdateStream, browserResultUpdateContinuation) = AsyncStream.makeStream() + self.browserStateUpdateTaskHandle = nil + + self.availablePeerChannel = AsyncChannel() } deinit { @@ -185,9 +216,16 @@ public actor PeerToPeerProvider: NetworkProvider { self.delegate = delegate self.peerId = peerId self.peerMetadata = metadata + + // if listener = true, set up a listener... + if config.listening { + if listener == nil { + self.setupBonjourListener() + } + } } - // extra + // MARK: Extra provider methods for listener & multi-connection /// Cancels and removes the connection for a given peerId /// - Parameter peerId: the peer Id to disconnect from either receiving or initiated connection @@ -207,15 +245,6 @@ public actor PeerToPeerProvider: NetworkProvider { } } - public func activateListener() { - // if listener = true, set up a listener... - if config.listening { - if listener == nil { - self.setupBonjourListener() - } - } - } - // MARK: Outgoing connection functions // Returns a new websocketTask to track (at which point, save the url as the endpoint) @@ -363,6 +392,108 @@ public actor PeerToPeerProvider: NetworkProvider { // MARK: NWBrowser + // Start browsing for services. + fileprivate func startBrowsing() { + // Create parameters, and allow browsing over a peer-to-peer link. + let browserNetworkParameters = NWParameters() + browserNetworkParameters.includePeerToPeer = true + + // Browse for the Automerge sync bonjour service type. + let newNetworkBrowser = NWBrowser( + for: .bonjourWithTXTRecord(type: P2PAutomergeSyncProtocol.bonjourType, domain: nil), + using: browserNetworkParameters + ) + + browserStateUpdateTaskHandle = Task { + for await newState in browserStateStream { + await reactToNWBrowserStateUpdate(newState) + } + } + + // connect into the existing system by yielding the value + // into the continuation that the stream provided on creation. + newNetworkBrowser.stateUpdateHandler = { newState in + self.browserStateContinuation.yield(newState) + } + + browserResultUpdateTaskHandle = Task { + for await update in browserResultUpdateStream { + await handleNWBrowserUpdates(update) + } + } + + newNetworkBrowser.browseResultsChangedHandler = { results, changes in + self.browserResultUpdateContinuation.yield(BrowserResultUpdate(newResults: results, changes: changes)) + } + + Logger.peerProtocol.info("Activating NWBrowser \(newNetworkBrowser.debugDescription, privacy: .public)") + browser = newNetworkBrowser + // Start browsing and ask for updates on the main queue. + newNetworkBrowser.start(queue: .main) + } + + private func reactToNWBrowserStateUpdate(_ newState: NWBrowser.State) async { + switch newState { + case let .failed(error): + // Restart the browser if it loses its connection. + if error == NWError.dns(DNSServiceErrorType(kDNSServiceErr_DefunctConnection)) { + Logger.peerProtocol.info("Browser failed with \(error, privacy: .public), restarting") + self.browser?.cancel() + self.startBrowsing() + } else { + Logger.peerProtocol.warning("Browser failed with \(error, privacy: .public), stopping") + self.browser?.cancel() + } + case .ready: + break + case .cancelled: + break + default: + break + } + } + + private func handleNWBrowserUpdates(_ update: BrowserResultUpdate) async { + Logger.peerProtocol.debug("browser update shows \(update.newResults.count, privacy: .public) result(s):") + + let availablePeers = update.newResults.compactMap { browserResult in + Logger.peerProtocol + .debug( + " \(browserResult.endpoint.debugDescription, privacy: .public) \(browserResult.metadata.debugDescription, privacy: .public)" + ) + if case let .bonjour(txtRecord) = browserResult.metadata, + let name = txtRecord[TXTRecordKeys.name], + let peerId = txtRecord[TXTRecordKeys.name] + { + return AvailablePeer(peerId: peerId, endpoint: browserResult.endpoint, name: name) + } + return nil + } + await availablePeerChannel.send(availablePeers) + + if config.autoconnect { + for change in update.changes { + if case let .added(result) = change { + do { + try await connect(to: result.endpoint) + } catch { + Logger.peerProtocol + .warning( + "Attempted to connect to \(result.endpoint.debugDescription), but failed: \(error.localizedDescription)" + ) + } + } + } + } + } + + fileprivate func stopBrowsing() { + guard let browser else { return } + Logger.peerProtocol.info("Terminating NWBrowser") + browser.cancel() + self.browser = nil + } + // MARK: NWListener handlers private func reactToNWListenerStateUpdate(_ newState: NWListener.State) async {