From a92daaa4bad7b0437897e4ecc79fb3149cf50cca Mon Sep 17 00:00:00 2001 From: Jack Stone Date: Mon, 1 Mar 2021 10:49:02 +0000 Subject: [PATCH] AsPublisher Failure Type Alignment with RxSwift (#2) * Add additional asObservable tests * Fix spelling * Fix alignment * Move and rename assertBridgeBufferDoesNotOverflowIfPossible * Align asPublisher Error type with RxSwift `Error` * Make assertBridgeBufferDoesNotOverflowIfPossible testable and add missing tests --- README.md | 4 +- .../RxSwift to Combine/BridgePublisher.swift | 2 +- .../ObservableType+AsPublisher.swift | 27 +-- ...ridgeBufferDoesNotOverflowIfPossible.swift | 28 ++++ .../ObservableType+AsPublisherTests.swift | 157 +++++++++++++++++- .../Publisher+AsObservableTests.swift | 62 ++++++- Tests/CombineRxTests/TestError.swift | 1 + 7 files changed, 255 insertions(+), 26 deletions(-) create mode 100644 Sources/CombineRx/Interoperability/RxSwift to Combine/Publisher+AssertBridgeBufferDoesNotOverflowIfPossible.swift diff --git a/README.md b/README.md index 6573410..3dae0c2 100644 --- a/README.md +++ b/README.md @@ -30,13 +30,13 @@ let myBridgedPublisher1 = Observable.just(0).asPublisher(withBufferSize: 1, andB let myBridgedPublisher2 = Observable.from([0, 1, 2, 3]).asPublisher(withBufferSize: 4, andBridgeBufferingStrategy: .error) ``` -One difference between RxSwift and Combine is that Combine adheres to the mechanism of back pressure in order to ensure that `Publisher`s only produce as many elements that `Subscriber`s have requested. This prevents the case where elements might build up in a `Publisher`s buffer faster than they can be processed downstream by a subscriber as this could lead to out-of-memory errors and degradation in performance due to high system resource consumption. Combine applies this back pressure upstream through a contractual obligation by `Publisher`s to only emit an element when it is requested by `Subscriber`s through `Subscribers.Demand` requests. +One difference between RxSwift and Combine is that Combine adheres to the mechanism of backpressure in order to ensure that `Publisher`s only produce as many elements that `Subscriber`s have requested. This prevents the case where elements might build up in a `Publisher`s buffer faster than they can be processed downstream by a subscriber as this could lead to out-of-memory errors and degradation in performance due to high system resource consumption. Combine applies this backpressure upstream through a contractual obligation by `Publisher`s to only emit an element when it is requested by `Subscriber`s through `Subscribers.Demand` requests. RxSwift `Observable`s differ in this regard as they rely on a source with an unbounded rate of production and therefore when bridging to a Combine `Publisher`, we must maintain a buffer or drop elements accordingly in order to satisfy the requirements of downstream subscribers. This is the reason for the required `withBufferSize` and `andBridgeBufferingStrategy` parameters for `asPublisher(withBufferSize:andBridgeBufferingStrategy:)`. `withBufferSize` is where the buffer size should manually be set (ideally based directly on the number of expected elements in the sequence). `andBridgeBufferingStrategy` is the strategy to employ when the maximum buffer capacity is reached. Keeping in line with native Combine strategies, this can either be `error`, where any buffer overflow is treated as an error, `dropNewest` where the elements already present in the buffer are maintained and any new elements are ignored, or finally `dropOldest` where new elements are added to the buffer and replace older elements that were already present. -Additional information on Combine's use of back pressure can be found [here](https://developer.apple.com/documentation/combine/processing-published-elements-with-subscribers). +Additional information on Combine's use of backpressure can be found [here](https://developer.apple.com/documentation/combine/processing-published-elements-with-subscribers). ## Installation diff --git a/Sources/CombineRx/Interoperability/RxSwift to Combine/BridgePublisher.swift b/Sources/CombineRx/Interoperability/RxSwift to Combine/BridgePublisher.swift index 3a75e19..a2772f7 100644 --- a/Sources/CombineRx/Interoperability/RxSwift to Combine/BridgePublisher.swift +++ b/Sources/CombineRx/Interoperability/RxSwift to Combine/BridgePublisher.swift @@ -17,7 +17,7 @@ public struct BridgePublisher: Publisher { self.upstream = upstream } - public func receive(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input { + public func receive(subscriber: S) where S: Subscriber, Self.Failure == S.Failure, Self.Output == S.Input { subscriber.receive(subscription: BridgeSubscription(upstream: upstream, downstream: subscriber)) } } diff --git a/Sources/CombineRx/Interoperability/RxSwift to Combine/ObservableType+AsPublisher.swift b/Sources/CombineRx/Interoperability/RxSwift to Combine/ObservableType+AsPublisher.swift index cd6e485..6c579e2 100644 --- a/Sources/CombineRx/Interoperability/RxSwift to Combine/ObservableType+AsPublisher.swift +++ b/Sources/CombineRx/Interoperability/RxSwift to Combine/ObservableType+AsPublisher.swift @@ -17,31 +17,22 @@ extension ObservableType { /// In order to mitigate out-of-memory errors, it is recommended to provide a conservative value for `size` that matches /// the expected output of the upstream `Observable` and consumption of the downstream `Subscriber`. /// + /// In order to align with RxSwift error types, the `Failure` type of the returned publisher is `Error` whereby the concrete upstream error type is preserved. + /// If it is desired to transform the publisher error type further downstream, it will be required to do this manually using the `mapError` accordingly. + /// /// - Parameter size: The size of the buffer. /// - Parameter whenFull: The buffering strategy to use. This determines how to handle the case when maximum buffer capacity is reached. /// /// - Returns: A Combine `Publisher` that is the bridged transformation of the given `ObservableType`. public func asPublisher(withBufferSize size: Int, - andBridgeBufferingStrategy whenFull: BridgeBufferingStrategy) -> Publishers.Buffer> { + andBridgeBufferingStrategy whenFull: BridgeBufferingStrategy) -> Publishers.MapError>, Error> { return BridgePublisher(upstream: self) .buffer(size: size, prefetch: .byRequest, whenFull: whenFull.strategy) - } -} - -extension Publisher where Failure == BridgeFailure { - - /// Raises a fatal error when an upstream `BridgePublisher`'s buffer overflows. - /// In the event this does not occur, the failure type is mapped to the `Error` type of the upstream `Observable`. - /// - /// This function can be used at any point following the `asCombineBridge` function if you want to ensure a buffer overflow never occurs. - /// - /// - Returns: A publisher that maps any upstream error or fatal errors in the event of a `bufferOverflow`. - public func assertBridgeBufferDoesNotOverflow() -> Publishers.MapError { - return mapError { error -> Error in - switch error { - case .bufferOverflow: preconditionFailure("Bridge buffer overflowed.") - case .upstreamError(let upstreamError): return upstreamError + .mapError { error -> Error in + switch error { + case BridgeFailure.upstreamError(let upstreamError): return upstreamError + default: return error + } } - } } } diff --git a/Sources/CombineRx/Interoperability/RxSwift to Combine/Publisher+AssertBridgeBufferDoesNotOverflowIfPossible.swift b/Sources/CombineRx/Interoperability/RxSwift to Combine/Publisher+AssertBridgeBufferDoesNotOverflowIfPossible.swift new file mode 100644 index 0000000..fec83ff --- /dev/null +++ b/Sources/CombineRx/Interoperability/RxSwift to Combine/Publisher+AssertBridgeBufferDoesNotOverflowIfPossible.swift @@ -0,0 +1,28 @@ +// +// Publisher+AssertBridgeBufferDoesNotOverflowIfPossible.swift +// Copyright © 2021 Notonthehighstreet Enterprises Limited. All rights reserved. +// + +import Foundation +import Combine + +extension Publisher { + + /// Convenience method whose default implementation causes a `preconditionFailure` when an upstream `BridgePublisher`'s buffer overflows. + /// In the event this does not occur, the failure type is mapped to the `Error` type of the upstream `Observable`. + /// + /// This function can be used at any point following the `asPublisher` function if you want to ensure a buffer overflow never occurs. + /// + /// - Returns: A publisher that maps any upstream error or fatal errors in the event of a `bufferOverflow`. + /// + public func assertBridgeBufferDoesNotOverflowIfPossible(onBufferOverflow: @escaping () -> Error = { preconditionFailure("Bridge buffer overflowed.") }) -> Publishers.MapError { + return mapError { error -> Error in + guard let bridgeFailure = error as? BridgeFailure else { return error } + + switch bridgeFailure { + case .bufferOverflow: return onBufferOverflow() + case .upstreamError(let upstreamError): return upstreamError + } + } + } +} diff --git a/Tests/CombineRxTests/Interoperability/ObservableType+AsPublisherTests.swift b/Tests/CombineRxTests/Interoperability/ObservableType+AsPublisherTests.swift index 3f53f74..245aab4 100644 --- a/Tests/CombineRxTests/Interoperability/ObservableType+AsPublisherTests.swift +++ b/Tests/CombineRxTests/Interoperability/ObservableType+AsPublisherTests.swift @@ -96,6 +96,93 @@ final class ObservableType_AsPublisherTests: XCTestCase { wait(for: [expectation], timeout: 0.1) } + func testErrorTypeCanBeChainedDownstreamIfAlsoErrorType() { + + let expectation = XCTestExpectation(description: "Should complete with `.failure`") + let subject = PublishSubject() + + subject + .asPublisher(withBufferSize: 1, andBridgeBufferingStrategy: .error) + .receive(on: scheduler) + .flatMap { value -> AnyPublisher in + Just(value * 2) + .setFailureType(to: Error.self) + .eraseToAnyPublisher() + } + .sink( + receiveCompletion: { completion in + guard case .failure(let error) = completion else { + XCTFail("Did not complete with `.failure`") + return + } + + guard case TestError.generic = error else { + XCTFail() + return + } + + expectation.fulfill() + }, + receiveValue: { _ in XCTFail("Should not receive any values") } + ) + .store(in: &cancellables) + + subject.onError(TestError.generic) + + scheduler.advance() + + wait(for: [expectation], timeout: 0.1) + } + + func testErrorTypeCanBeMappedFurtherDownstream() { + + let expectation = XCTestExpectation(description: "Should complete with `.failure`") + let subject = PublishSubject() + + struct DatedError: Error { + let error: Error + let date: Date + + init(_ error: Error) { + self.error = error + date = Date() + } + } + + subject + .asPublisher(withBufferSize: 1, andBridgeBufferingStrategy: .error) + .receive(on: scheduler) + .mapError { DatedError($0) } // Map error required to map from generic `Error` type to desired `DatedError` type + .flatMap { value -> AnyPublisher in // Dummy flatMap to indicate a possible transform that might be required + Just(value * 2) + .setFailureType(to: DatedError.self) + .eraseToAnyPublisher() + } + .sink( + receiveCompletion: { completion in + guard case .failure(let datedError) = completion else { + XCTFail("Did not complete with `.failure`") + return + } + + guard case TestError.generic = datedError.error else { + XCTFail() + return + } + + expectation.fulfill() + }, + receiveValue: { _ in XCTFail("Should not receive any values") } + ) + .store(in: &cancellables) + + subject.onError(TestError.generic) + + scheduler.advance() + + wait(for: [expectation], timeout: 0.1) + } + func testCanFillBufferWithEvents() { let bufferSize = 100 @@ -182,14 +269,14 @@ final class ObservableType_AsPublisherTests: XCTestCase { XCTAssertEqual(output, expected) } - func testAssertBridgeBufferDoesNotOverflowPropogatesErrors() { + func testAssertBridgeBufferDoesNotOverflowIfPossiblePropogatesErrors() { let expectation = XCTestExpectation(description: "Should complete with `.failure`") let subject = PublishSubject() subject .asPublisher(withBufferSize: 1, andBridgeBufferingStrategy: .error) - .assertBridgeBufferDoesNotOverflow() + .assertBridgeBufferDoesNotOverflowIfPossible { XCTFail(); return TestError.other } .receive(on: scheduler) .sink( receiveCompletion: { completion in @@ -200,7 +287,7 @@ final class ObservableType_AsPublisherTests: XCTestCase { expectation.fulfill() }, - receiveValue: { _ in } + receiveValue: { _ in XCTFail() } ) .store(in: &cancellables) @@ -210,4 +297,68 @@ final class ObservableType_AsPublisherTests: XCTestCase { wait(for: [expectation], timeout: 0.1) } + + func testAssertBridgeBufferDoesNotOverflowIfPossiblePropogatesUpstreamBridgeFailureErrors() { + + let expectation = XCTestExpectation(description: "Should complete with `.failure`") + let subject = PublishSubject() + + subject + .asPublisher(withBufferSize: 1, andBridgeBufferingStrategy: .error) + .assertBridgeBufferDoesNotOverflowIfPossible { XCTFail(); return TestError.other } + .receive(on: scheduler) + .sink( + receiveCompletion: { completion in + guard case .failure = completion else { + XCTFail("Did not complete with `.failure`") + return + } + + expectation.fulfill() + }, + receiveValue: { _ in XCTFail() } + ) + .store(in: &cancellables) + + subject.onError(BridgeFailure.upstreamError(TestError.generic)) + + scheduler.advance() + + wait(for: [expectation], timeout: 0.1) + } + + func testAssertBridgeBufferDoesNotOverflowIfPossibleIsTriggeredOnBridgeBufferOverflow() { + + let expectation = XCTestExpectation(description: "Should complete with `.failure`") + let bridgeBufferOverflowExpectation = XCTestExpectation(description: "Should trigger bridge buffer overflow") + + let subject = PublishSubject() + + subject + .asPublisher(withBufferSize: 1, andBridgeBufferingStrategy: .error) + .assertBridgeBufferDoesNotOverflowIfPossible { bridgeBufferOverflowExpectation.fulfill(); return BridgeFailure.bufferOverflow } + .receive(on: scheduler) + .sink( + receiveCompletion: { completion in + guard case .failure(let error) = completion else { + XCTFail("Did not complete with `.failure`") + return + } + + guard case BridgeFailure.bufferOverflow = error else { + XCTFail() + return + } + + expectation.fulfill() + }, + receiveValue: { _ in XCTFail() }) + .store(in: &cancellables) + + subject.onError(BridgeFailure.bufferOverflow) + + scheduler.advance() + + wait(for: [expectation, bridgeBufferOverflowExpectation], timeout: 0.1) + } } diff --git a/Tests/CombineRxTests/Interoperability/Publisher+AsObservableTests.swift b/Tests/CombineRxTests/Interoperability/Publisher+AsObservableTests.swift index 06ebebc..c027802 100644 --- a/Tests/CombineRxTests/Interoperability/Publisher+AsObservableTests.swift +++ b/Tests/CombineRxTests/Interoperability/Publisher+AsObservableTests.swift @@ -11,6 +11,9 @@ import RxSwift final class Publisher_AsObservableTests: XCTestCase { + /// A test scheduler that uses virtual time. This means that we don't need to waste + /// test time waiting for any `XTCExpectation`s. For example, using this test scheduler, it is + /// easy to test operators like delay/debounce etc without having to actually wait `n` seconds or milliseconds. private var scheduler: TestScheduler! override func setUp() { @@ -18,12 +21,17 @@ final class Publisher_AsObservableTests: XCTestCase { scheduler = TestScheduler(initialClock: 0) } + /// Tests that we can convert observables being emitted from a Combine `PassthroughSubject` + /// into an RxSwift `Observable` and that emitted elements are propogated correctly. func testPassthroughSubjectSendEventsAreBridged() { let disposeBag = DisposeBag() + + // Register an observer with the test scheduler let output = scheduler.createObserver(Int.self) let subject = PassthroughSubject() + // Set up a subscription to the Combine subject and use bridging method to convert to an RxSwift `Observable` scheduler.scheduleAt(100) { subject .asObservable() @@ -31,21 +39,70 @@ final class Publisher_AsObservableTests: XCTestCase { .disposed(by: disposeBag) } + // Specify some test events that will be emitted from our Combine subject at various test time intervals scheduler.scheduleAt(200) { subject.send(0) } scheduler.scheduleAt(300) { subject.send(1) } scheduler.scheduleAt(400) { subject.send(2) } scheduler.scheduleAt(500) { subject.send(3) } + // Specify our expected events that will have been recorded by our test scheduler at a given test time and + // with the expected received element let expectedEvents = [Recorded.next(200, 0), Recorded.next(300, 1), Recorded.next(400, 2), Recorded.next(500, 3)] + // Run the test scheduler and assert that the recorded events are the same as what we were expecting + scheduler.start() + + XCTAssertEqual(output.events, expectedEvents) + } + + /// Tests that publishers like `Just` with single elements are propogated into the RxSwift world. + func testPublisherSendEventsAreBridged() { + + let disposeBag = DisposeBag() + + let publisher = Just(1) + let output = scheduler.createObserver(Int.self) + + scheduler.scheduleAt(100) { + publisher + .asObservable() + .subscribe(output) + .disposed(by: disposeBag) + } + + let expectedEvents = [Recorded.next(100, 1), Recorded.completed(100)] + + scheduler.start() + + XCTAssertEqual(output.events, expectedEvents) + } + + /// Tests that publishers like `Just<[Int]>` with a sequence of elements are propogated into the RxSwift world. + func testSequencePublisherEventsAreBridged() { + + let disposeBag = DisposeBag() + + let publisher = Just<[Int]>([1, 2, 3, 4, 5]) + let output = scheduler.createObserver([Int].self) + + scheduler.scheduleAt(100) { + publisher + .asObservable() + .subscribe(output) + .disposed(by: disposeBag) + } + + let expectedEvents = [Recorded.next(100, [1, 2, 3, 4, 5]), Recorded.completed(100)] + scheduler.start() XCTAssertEqual(output.events, expectedEvents) } + /// Tests that when a Combine publisher emits a completion, this is propogated into the RxSwift world. func testCompleteEventsArePropogatedDownstream() { let disposeBag = DisposeBag() @@ -70,11 +127,12 @@ final class Publisher_AsObservableTests: XCTestCase { XCTAssertEqual(output.events, expectedEvents) } + /// Tests that when a Combine publisher emits an error, this is propogated into the RxSwift world. func testErrorsArePropogatedDownstream() { let disposeBag = DisposeBag() let output = scheduler.createObserver(Int.self) - let subject = PassthroughSubject() + let subject = PassthroughSubject() scheduler.scheduleAt(100) { subject @@ -83,7 +141,7 @@ final class Publisher_AsObservableTests: XCTestCase { .disposed(by: disposeBag) } - let error = BridgeFailure.upstreamError(TestError.generic) + let error = TestError.generic scheduler.scheduleAt(200) { subject.send(0) } scheduler.scheduleAt(300) { subject.send(completion: .failure(error)) } diff --git a/Tests/CombineRxTests/TestError.swift b/Tests/CombineRxTests/TestError.swift index aff8ae3..a7e5ce7 100644 --- a/Tests/CombineRxTests/TestError.swift +++ b/Tests/CombineRxTests/TestError.swift @@ -7,4 +7,5 @@ import Foundation enum TestError: Error { case generic + case other }