Skip to content

Commit

Permalink
AsPublisher Failure Type Alignment with RxSwift (#2)
Browse files Browse the repository at this point in the history
* Add additional asObservable tests

* Fix spelling

* Fix alignment

* Move and rename assertBridgeBufferDoesNotOverflowIfPossible

* Align asPublisher Error type with RxSwift `Error`

* Make assertBridgeBufferDoesNotOverflowIfPossible testable and add missing tests
  • Loading branch information
Jackstone92 authored Mar 1, 2021
1 parent acea143 commit a92daaa
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 26 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ let myBridgedPublisher1 = Observable.just(0).asPublisher(withBufferSize: 1, andB
let myBridgedPublisher2 = Observable.from([0, 1, 2, 3]).asPublisher(withBufferSize: 4, andBridgeBufferingStrategy: .error)
```

One difference between RxSwift and Combine is that Combine adheres to the mechanism of back pressure in order to ensure that `Publisher`s only produce as many elements that `Subscriber`s have requested. This prevents the case where elements might build up in a `Publisher`s buffer faster than they can be processed downstream by a subscriber as this could lead to out-of-memory errors and degradation in performance due to high system resource consumption. Combine applies this back pressure upstream through a contractual obligation by `Publisher`s to only emit an element when it is requested by `Subscriber`s through `Subscribers.Demand` requests.
One difference between RxSwift and Combine is that Combine adheres to the mechanism of backpressure in order to ensure that `Publisher`s only produce as many elements that `Subscriber`s have requested. This prevents the case where elements might build up in a `Publisher`s buffer faster than they can be processed downstream by a subscriber as this could lead to out-of-memory errors and degradation in performance due to high system resource consumption. Combine applies this backpressure upstream through a contractual obligation by `Publisher`s to only emit an element when it is requested by `Subscriber`s through `Subscribers.Demand` requests.

RxSwift `Observable`s differ in this regard as they rely on a source with an unbounded rate of production and therefore when bridging to a Combine `Publisher`, we must maintain a buffer or drop elements accordingly in order to satisfy the requirements of downstream subscribers.

This is the reason for the required `withBufferSize` and `andBridgeBufferingStrategy` parameters for `asPublisher(withBufferSize:andBridgeBufferingStrategy:)`. `withBufferSize` is where the buffer size should manually be set (ideally based directly on the number of expected elements in the sequence). `andBridgeBufferingStrategy` is the strategy to employ when the maximum buffer capacity is reached. Keeping in line with native Combine strategies, this can either be `error`, where any buffer overflow is treated as an error, `dropNewest` where the elements already present in the buffer are maintained and any new elements are ignored, or finally `dropOldest` where new elements are added to the buffer and replace older elements that were already present.

Additional information on Combine's use of back pressure can be found [here](https://developer.apple.com/documentation/combine/processing-published-elements-with-subscribers).
Additional information on Combine's use of backpressure can be found [here](https://developer.apple.com/documentation/combine/processing-published-elements-with-subscribers).

## Installation

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public struct BridgePublisher<U: ObservableType>: Publisher {
self.upstream = upstream
}

public func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
public func receive<S>(subscriber: S) where S: Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
subscriber.receive(subscription: BridgeSubscription(upstream: upstream, downstream: subscriber))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,22 @@ extension ObservableType {
/// In order to mitigate out-of-memory errors, it is recommended to provide a conservative value for `size` that matches
/// the expected output of the upstream `Observable` and consumption of the downstream `Subscriber`.
///
/// In order to align with RxSwift error types, the `Failure` type of the returned publisher is `Error` whereby the concrete upstream error type is preserved.
/// If it is desired to transform the publisher error type further downstream, it will be required to do this manually using the `mapError` accordingly.
///
/// - Parameter size: The size of the buffer.
/// - Parameter whenFull: The buffering strategy to use. This determines how to handle the case when maximum buffer capacity is reached.
///
/// - Returns: A Combine `Publisher` that is the bridged transformation of the given `ObservableType`.
public func asPublisher(withBufferSize size: Int,
andBridgeBufferingStrategy whenFull: BridgeBufferingStrategy) -> Publishers.Buffer<BridgePublisher<Self>> {
andBridgeBufferingStrategy whenFull: BridgeBufferingStrategy) -> Publishers.MapError<Publishers.Buffer<BridgePublisher<Self>>, Error> {
return BridgePublisher(upstream: self)
.buffer(size: size, prefetch: .byRequest, whenFull: whenFull.strategy)
}
}

extension Publisher where Failure == BridgeFailure {

/// Raises a fatal error when an upstream `BridgePublisher`'s buffer overflows.
/// In the event this does not occur, the failure type is mapped to the `Error` type of the upstream `Observable`.
///
/// This function can be used at any point following the `asCombineBridge` function if you want to ensure a buffer overflow never occurs.
///
/// - Returns: A publisher that maps any upstream error or fatal errors in the event of a `bufferOverflow`.
public func assertBridgeBufferDoesNotOverflow() -> Publishers.MapError<Self, Error> {
return mapError { error -> Error in
switch error {
case .bufferOverflow: preconditionFailure("Bridge buffer overflowed.")
case .upstreamError(let upstreamError): return upstreamError
.mapError { error -> Error in
switch error {
case BridgeFailure.upstreamError(let upstreamError): return upstreamError
default: return error
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//
// Publisher+AssertBridgeBufferDoesNotOverflowIfPossible.swift
// Copyright © 2021 Notonthehighstreet Enterprises Limited. All rights reserved.
//

import Foundation
import Combine

extension Publisher {

/// Convenience method whose default implementation causes a `preconditionFailure` when an upstream `BridgePublisher`'s buffer overflows.
/// In the event this does not occur, the failure type is mapped to the `Error` type of the upstream `Observable`.
///
/// This function can be used at any point following the `asPublisher` function if you want to ensure a buffer overflow never occurs.
///
/// - Returns: A publisher that maps any upstream error or fatal errors in the event of a `bufferOverflow`.
///
public func assertBridgeBufferDoesNotOverflowIfPossible(onBufferOverflow: @escaping () -> Error = { preconditionFailure("Bridge buffer overflowed.") }) -> Publishers.MapError<Self, Error> {
return mapError { error -> Error in
guard let bridgeFailure = error as? BridgeFailure else { return error }

switch bridgeFailure {
case .bufferOverflow: return onBufferOverflow()
case .upstreamError(let upstreamError): return upstreamError
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,93 @@ final class ObservableType_AsPublisherTests: XCTestCase {
wait(for: [expectation], timeout: 0.1)
}

func testErrorTypeCanBeChainedDownstreamIfAlsoErrorType() {

let expectation = XCTestExpectation(description: "Should complete with `.failure`")
let subject = PublishSubject<Int>()

subject
.asPublisher(withBufferSize: 1, andBridgeBufferingStrategy: .error)
.receive(on: scheduler)
.flatMap { value -> AnyPublisher<Int, Error> in
Just(value * 2)
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
.sink(
receiveCompletion: { completion in
guard case .failure(let error) = completion else {
XCTFail("Did not complete with `.failure`")
return
}

guard case TestError.generic = error else {
XCTFail()
return
}

expectation.fulfill()
},
receiveValue: { _ in XCTFail("Should not receive any values") }
)
.store(in: &cancellables)

subject.onError(TestError.generic)

scheduler.advance()

wait(for: [expectation], timeout: 0.1)
}

func testErrorTypeCanBeMappedFurtherDownstream() {

let expectation = XCTestExpectation(description: "Should complete with `.failure`")
let subject = PublishSubject<Int>()

struct DatedError: Error {
let error: Error
let date: Date

init(_ error: Error) {
self.error = error
date = Date()
}
}

subject
.asPublisher(withBufferSize: 1, andBridgeBufferingStrategy: .error)
.receive(on: scheduler)
.mapError { DatedError($0) } // Map error required to map from generic `Error` type to desired `DatedError` type
.flatMap { value -> AnyPublisher<Int, DatedError> in // Dummy flatMap to indicate a possible transform that might be required
Just(value * 2)
.setFailureType(to: DatedError.self)
.eraseToAnyPublisher()
}
.sink(
receiveCompletion: { completion in
guard case .failure(let datedError) = completion else {
XCTFail("Did not complete with `.failure`")
return
}

guard case TestError.generic = datedError.error else {
XCTFail()
return
}

expectation.fulfill()
},
receiveValue: { _ in XCTFail("Should not receive any values") }
)
.store(in: &cancellables)

subject.onError(TestError.generic)

scheduler.advance()

wait(for: [expectation], timeout: 0.1)
}

func testCanFillBufferWithEvents() {

let bufferSize = 100
Expand Down Expand Up @@ -182,14 +269,14 @@ final class ObservableType_AsPublisherTests: XCTestCase {
XCTAssertEqual(output, expected)
}

func testAssertBridgeBufferDoesNotOverflowPropogatesErrors() {
func testAssertBridgeBufferDoesNotOverflowIfPossiblePropogatesErrors() {

let expectation = XCTestExpectation(description: "Should complete with `.failure`")
let subject = PublishSubject<Int>()

subject
.asPublisher(withBufferSize: 1, andBridgeBufferingStrategy: .error)
.assertBridgeBufferDoesNotOverflow()
.assertBridgeBufferDoesNotOverflowIfPossible { XCTFail(); return TestError.other }
.receive(on: scheduler)
.sink(
receiveCompletion: { completion in
Expand All @@ -200,7 +287,7 @@ final class ObservableType_AsPublisherTests: XCTestCase {

expectation.fulfill()
},
receiveValue: { _ in }
receiveValue: { _ in XCTFail() }
)
.store(in: &cancellables)

Expand All @@ -210,4 +297,68 @@ final class ObservableType_AsPublisherTests: XCTestCase {

wait(for: [expectation], timeout: 0.1)
}

func testAssertBridgeBufferDoesNotOverflowIfPossiblePropogatesUpstreamBridgeFailureErrors() {

let expectation = XCTestExpectation(description: "Should complete with `.failure`")
let subject = PublishSubject<Int>()

subject
.asPublisher(withBufferSize: 1, andBridgeBufferingStrategy: .error)
.assertBridgeBufferDoesNotOverflowIfPossible { XCTFail(); return TestError.other }
.receive(on: scheduler)
.sink(
receiveCompletion: { completion in
guard case .failure = completion else {
XCTFail("Did not complete with `.failure`")
return
}

expectation.fulfill()
},
receiveValue: { _ in XCTFail() }
)
.store(in: &cancellables)

subject.onError(BridgeFailure.upstreamError(TestError.generic))

scheduler.advance()

wait(for: [expectation], timeout: 0.1)
}

func testAssertBridgeBufferDoesNotOverflowIfPossibleIsTriggeredOnBridgeBufferOverflow() {

let expectation = XCTestExpectation(description: "Should complete with `.failure`")
let bridgeBufferOverflowExpectation = XCTestExpectation(description: "Should trigger bridge buffer overflow")

let subject = PublishSubject<Int>()

subject
.asPublisher(withBufferSize: 1, andBridgeBufferingStrategy: .error)
.assertBridgeBufferDoesNotOverflowIfPossible { bridgeBufferOverflowExpectation.fulfill(); return BridgeFailure.bufferOverflow }
.receive(on: scheduler)
.sink(
receiveCompletion: { completion in
guard case .failure(let error) = completion else {
XCTFail("Did not complete with `.failure`")
return
}

guard case BridgeFailure.bufferOverflow = error else {
XCTFail()
return
}

expectation.fulfill()
},
receiveValue: { _ in XCTFail() })
.store(in: &cancellables)

subject.onError(BridgeFailure.bufferOverflow)

scheduler.advance()

wait(for: [expectation, bridgeBufferOverflowExpectation], timeout: 0.1)
}
}
Loading

0 comments on commit a92daaa

Please sign in to comment.