diff --git a/Sources/Operators/ConcatMap.swift b/Sources/Operators/ConcatMap.swift index c2f31ca..196036e 100644 --- a/Sources/Operators/ConcatMap.swift +++ b/Sources/Operators/ConcatMap.swift @@ -112,13 +112,12 @@ private extension Publishers.ConcatMap { override func receive(_ input: Upstream.Output) -> Subscribers.Demand { let mapped = transform(input) - lock.lock() + defer { lock.unlock() } + if activePublisher == nil { - lock.unlock() setActivePublisher(mapped) } else { - lock.unlock() bufferedPublishers.append(mapped) } @@ -132,15 +131,15 @@ private extension Publishers.ConcatMap { publisher.sink( receiveCompletion: { completion in + self.lock.lock() + defer { self.lock.unlock() } switch completion { case .finished: - self.lock.lock() guard let next = self.bufferedPublishers.first else { - self.lock.unlock() + self.activePublisher = nil return } self.bufferedPublishers.removeFirst() - self.lock.unlock() self.setActivePublisher(next) case .failure(let error): self.receive(completion: .failure(error)) diff --git a/Tests/ConcatMapTests.swift b/Tests/ConcatMapTests.swift index d7c3259..6c7bef3 100644 --- a/Tests/ConcatMapTests.swift +++ b/Tests/ConcatMapTests.swift @@ -24,9 +24,9 @@ final class ConcatMapTests: XCTestCase { cancellables = [] } - func test_publishes_values_inOrder() { + func test_publishes_values_in_order() { var receivedValues = [Int]() - let expectedValues = [1, 2, 4, 5, 6] + let expectedValues = [1, 2, 3] let firstPublisher = P() let secondPublisher = P() @@ -43,19 +43,89 @@ final class ConcatMapTests: XCTestCase { sut.send(firstPublisher) sut.send(secondPublisher) + + firstPublisher.send(1) + firstPublisher.send(completion: .finished) + + secondPublisher.send(2) sut.send(thirdPublisher) + secondPublisher.send(completion: .finished) + + thirdPublisher.send(3) + + XCTAssertEqual(expectedValues, receivedValues) + } + + func test_ignores_values_of_subsequent_while_previous_hasNot_completed() { + var receivedValues = [Int]() + let expectedValues = [1, 3] + + let firstPublisher = P() + let secondPublisher = P() + + let sut = PassthroughSubject
() + + sut.concatMap { $0 } + .sink( + receiveCompletion: { _ in }, + receiveValue: { value in receivedValues.append(value) } + ) + .store(in: &cancellables) + + sut.send(firstPublisher) + sut.send(secondPublisher) firstPublisher.send(1) - firstPublisher.send(2) - // values sent onto the second publisher will be ignored as long as the first publisher hasn't completed + secondPublisher.send(2) + firstPublisher.send(completion: .finished) + secondPublisher.send(3) + secondPublisher.send(completion: .finished) + + XCTAssertEqual(expectedValues, receivedValues) + } + + func test_publishes_values_of_subsequent_publisher_after_emptying_publisher_queue() { + var receivedValues = [Int]() + let expectedValues = [1, 2] + + let firstPublisher = P() + let secondPublisher = P() + + let sut = PassthroughSubject
()
+
+ sut.concatMap { $0 }
+ .sink(
+ receiveCompletion: { _ in },
+ receiveValue: { value in receivedValues.append(value) }
+ )
+ .store(in: &cancellables)
+
+ sut.send(firstPublisher)
+ firstPublisher.send(1)
firstPublisher.send(completion: .finished)
- secondPublisher.send(4)
- secondPublisher.send(5)
+ sut.send(secondPublisher)
+ secondPublisher.send(2)
secondPublisher.send(completion: .finished)
- thirdPublisher.send(6)
+ XCTAssertEqual(expectedValues, receivedValues)
+ }
+
+ func test_synchronous_completion() {
+ var receivedValues = [Int]()
+ let expectedValues = [1, 2]
+ let firstPublisher = Just