Skip to content

Commit

Permalink
peerToPeer shoud be complete and usable, now to test and debug...
Browse files Browse the repository at this point in the history
  • Loading branch information
heckj committed Apr 10, 2024
1 parent 70b6d27 commit f9b05b5
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 12 deletions.
4 changes: 2 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ let package = Package(
.package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"),

// Combine replacement for OSS
// .package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
// Distributed Tracing
.package(url: "https://github.com/apple/swift-distributed-tracing", from: "1.0.0"),
// Testing Tracing
Expand All @@ -45,7 +45,7 @@ let package = Package(
.product(name: "Base58Swift", package: "Base58Swift"),

// Combine replacement for OSS
// .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),

// Distributed Tracing
.product(name: "Tracing", package: "swift-distributed-tracing"),
Expand Down
151 changes: 141 additions & 10 deletions Sources/AutomergeRepo/Networking/Providers/PeerToPeerProvider.swift
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import AsyncAlgorithms
import Foundation
import Network
import OSLog
Expand Down Expand Up @@ -37,6 +38,12 @@ public actor PeerToPeerProvider: NetworkProvider {
}
}

public struct AvailablePeer: Sendable {
let peerId: PEER_ID
let endpoint: NWEndpoint
let name: String
}

public var peeredConnections: [PeerConnection] {
connections.values.compactMap { holder in
if let peerId = holder.peerId {
Expand Down Expand Up @@ -74,6 +81,22 @@ public actor PeerToPeerProvider: NetworkProvider {
let newConnectionContinuation: AsyncStream<NWConnection>.Continuation
var newConnectionTaskHandle: Task<Void, Never>?

// browser tasks to process/react to callbacks
let browserStateStream: AsyncStream<NWBrowser.State>
let browserStateContinuation: AsyncStream<NWBrowser.State>.Continuation
var browserStateUpdateTaskHandle: Task<Void, Never>?

struct BrowserResultUpdate: Sendable {
let newResults: Set<NWBrowser.Result>
let changes: Set<NWBrowser.Result.Change>
}

let browserResultUpdateStream: AsyncStream<BrowserResultUpdate>
let browserResultUpdateContinuation: AsyncStream<BrowserResultUpdate>.Continuation
var browserResultUpdateTaskHandle: Task<Void, Never>?

public let availablePeerChannel: AsyncChannel<[AvailablePeer]>

// this allows us to create a provider, but it's not ready to go until
// its fully configured by setting a delegate on it, which initializes
// not only delegate, but also peerId and the optional peerMetadata
Expand Down Expand Up @@ -101,6 +124,14 @@ public actor PeerToPeerProvider: NetworkProvider {
// Start the connection to accept it, or cancel to reject it.
(newConnectionQueue, newConnectionContinuation) = AsyncStream<NWConnection>.makeStream()
self.newConnectionTaskHandle = nil

(browserStateStream, browserStateContinuation) = AsyncStream<NWBrowser.State>.makeStream()
self.browserStateUpdateTaskHandle = nil

(browserResultUpdateStream, browserResultUpdateContinuation) = AsyncStream<BrowserResultUpdate>.makeStream()
self.browserStateUpdateTaskHandle = nil

self.availablePeerChannel = AsyncChannel()
}

deinit {
Expand Down Expand Up @@ -185,9 +216,16 @@ public actor PeerToPeerProvider: NetworkProvider {
self.delegate = delegate
self.peerId = peerId
self.peerMetadata = metadata

// if listener = true, set up a listener...
if config.listening {
if listener == nil {
self.setupBonjourListener()
}
}
}

// extra
// MARK: Extra provider methods for listener & multi-connection

/// Cancels and removes the connection for a given peerId
/// - Parameter peerId: the peer Id to disconnect from either receiving or initiated connection
Expand All @@ -207,15 +245,6 @@ public actor PeerToPeerProvider: NetworkProvider {
}
}

public func activateListener() {
// if listener = true, set up a listener...
if config.listening {
if listener == nil {
self.setupBonjourListener()
}
}
}

// MARK: Outgoing connection functions

// Returns a new websocketTask to track (at which point, save the url as the endpoint)
Expand Down Expand Up @@ -363,6 +392,108 @@ public actor PeerToPeerProvider: NetworkProvider {

// MARK: NWBrowser

// Start browsing for services.
fileprivate func startBrowsing() {
// Create parameters, and allow browsing over a peer-to-peer link.
let browserNetworkParameters = NWParameters()
browserNetworkParameters.includePeerToPeer = true

// Browse for the Automerge sync bonjour service type.
let newNetworkBrowser = NWBrowser(
for: .bonjourWithTXTRecord(type: P2PAutomergeSyncProtocol.bonjourType, domain: nil),
using: browserNetworkParameters
)

browserStateUpdateTaskHandle = Task {
for await newState in browserStateStream {
await reactToNWBrowserStateUpdate(newState)
}
}

// connect into the existing system by yielding the value
// into the continuation that the stream provided on creation.
newNetworkBrowser.stateUpdateHandler = { newState in
self.browserStateContinuation.yield(newState)
}

browserResultUpdateTaskHandle = Task {
for await update in browserResultUpdateStream {
await handleNWBrowserUpdates(update)
}
}

newNetworkBrowser.browseResultsChangedHandler = { results, changes in
self.browserResultUpdateContinuation.yield(BrowserResultUpdate(newResults: results, changes: changes))
}

Logger.peerProtocol.info("Activating NWBrowser \(newNetworkBrowser.debugDescription, privacy: .public)")
browser = newNetworkBrowser
// Start browsing and ask for updates on the main queue.
newNetworkBrowser.start(queue: .main)
}

private func reactToNWBrowserStateUpdate(_ newState: NWBrowser.State) async {
switch newState {
case let .failed(error):
// Restart the browser if it loses its connection.
if error == NWError.dns(DNSServiceErrorType(kDNSServiceErr_DefunctConnection)) {
Logger.peerProtocol.info("Browser failed with \(error, privacy: .public), restarting")
self.browser?.cancel()
self.startBrowsing()
} else {
Logger.peerProtocol.warning("Browser failed with \(error, privacy: .public), stopping")
self.browser?.cancel()
}
case .ready:
break
case .cancelled:
break
default:
break
}
}

private func handleNWBrowserUpdates(_ update: BrowserResultUpdate) async {
Logger.peerProtocol.debug("browser update shows \(update.newResults.count, privacy: .public) result(s):")

let availablePeers = update.newResults.compactMap { browserResult in
Logger.peerProtocol
.debug(
" \(browserResult.endpoint.debugDescription, privacy: .public) \(browserResult.metadata.debugDescription, privacy: .public)"
)
if case let .bonjour(txtRecord) = browserResult.metadata,
let name = txtRecord[TXTRecordKeys.name],
let peerId = txtRecord[TXTRecordKeys.name]
{
return AvailablePeer(peerId: peerId, endpoint: browserResult.endpoint, name: name)
}
return nil
}
await availablePeerChannel.send(availablePeers)

if config.autoconnect {
for change in update.changes {
if case let .added(result) = change {
do {
try await connect(to: result.endpoint)
} catch {
Logger.peerProtocol
.warning(
"Attempted to connect to \(result.endpoint.debugDescription), but failed: \(error.localizedDescription)"
)
}
}
}
}
}

fileprivate func stopBrowsing() {
guard let browser else { return }
Logger.peerProtocol.info("Terminating NWBrowser")
browser.cancel()
self.browser = nil
}

// MARK: NWListener handlers

private func reactToNWListenerStateUpdate(_ newState: NWListener.State) async {
Expand Down

0 comments on commit f9b05b5

Please sign in to comment.