From 37a9b70386ff6e6361d6d198e0b66e6cac025f1d Mon Sep 17 00:00:00 2001 From: kean Date: Sun, 5 May 2024 15:09:53 -0400 Subject: [PATCH] Simplify ImageTask syncronization --- Sources/Nuke/ImageTask.swift | 250 +++++++++++----------- Sources/Nuke/Pipeline/ImagePipeline.swift | 55 +++-- 2 files changed, 161 insertions(+), 144 deletions(-) diff --git a/Sources/Nuke/ImageTask.swift b/Sources/Nuke/ImageTask.swift index dd4f39b4e..abf55a498 100644 --- a/Sources/Nuke/ImageTask.swift +++ b/Sources/Nuke/ImageTask.swift @@ -29,14 +29,14 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send /// The priority of the task. The priority can be updated dynamically even /// for a task that is already running. public var priority: ImageRequest.Priority { - get { withState { $0.priority } } + get { withLock { $0.priority } } set { setPriority(newValue) } } /// Returns the current download progress. Returns zeros before the download /// is started and the expected size of the resource is known. public var currentProgress: Progress { - withState { $0.progress } + withLock { $0.progress } } /// The download progress. @@ -60,7 +60,7 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send /// The current state of the task. public var state: State { - withState { $0.taskState } + withLock { $0.state } } /// The state of the image task. @@ -86,7 +86,7 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send public var response: ImageResponse { get async throws { try await withTaskCancellationHandler { - try await task.value + try await _task.value } onCancel: { cancel() } @@ -132,13 +132,18 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send case finished(Result) } - let isDataTask: Bool - - private weak var pipeline: ImagePipeline? - private var task: Task! - private var mutableState: MutableState + private var publicState: PublicState + private let isDataTask: Bool private let onEvent: ((Event, ImageTask) -> Void)? private let lock: os_unfair_lock_t + private let queue: DispatchQueue + private weak var pipeline: ImagePipeline? + + // State synchronized on `pipeline.queue`. + var _task: Task! + var _continuation: UnsafeContinuation? + var _state: State = .running + private var _events: PassthroughSubject? deinit { lock.deinitialize(count: 1) @@ -148,25 +153,14 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send init(taskId: Int64, request: ImageRequest, isDataTask: Bool, pipeline: ImagePipeline, onEvent: ((Event, ImageTask) -> Void)?) { self.taskId = taskId self.request = request - self.mutableState = MutableState(priority: request.priority) + self.publicState = PublicState(priority: request.priority) self.isDataTask = isDataTask self.pipeline = pipeline + self.queue = pipeline.queue self.onEvent = onEvent lock = .allocate(capacity: 1) lock.initialize(to: os_unfair_lock()) - - // Important to call it before `imageTaskStartCalled` - if !isDataTask { - pipeline.delegate.imageTaskCreated(self, pipeline: pipeline) - } - - task = Task { - try await withUnsafeThrowingContinuation { continuation in - self.withState { $0.continuation = continuation } - pipeline.imageTaskStartCalled(self) - } - } } /// Marks task as being cancelled. @@ -174,154 +168,168 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send /// The pipeline will immediately cancel any work associated with a task /// unless there is an equivalent outstanding task running. public func cancel() { - guard setState(.cancelled) else { return } + let didChange: Bool = withLock { + guard $0.state == .running else { return false } + $0.state = .cancelled + return true + } + guard didChange else { return } // Make sure it gets called once (expensive) pipeline?.imageTaskCancelCalled(self) } - // MARK: Hashable - - public func hash(into hasher: inout Hasher) { - hasher.combine(ObjectIdentifier(self).hashValue) - } - - public static func == (lhs: ImageTask, rhs: ImageTask) -> Bool { - ObjectIdentifier(lhs) == ObjectIdentifier(rhs) - } - - // MARK: CustomStringConvertible - - public var description: String { - "ImageTask(id: \(taskId), priority: \(priority), progress: \(currentProgress.completed) / \(currentProgress.total), state: \(state))" - } - - // MARK: Internals - private func setPriority(_ newValue: ImageRequest.Priority) { - let didChange: Bool = withState { + let didChange: Bool = withLock { guard $0.priority != newValue else { return false } $0.priority = newValue - return $0.taskState == .running + return $0.state == .running } guard didChange else { return } pipeline?.imageTaskUpdatePriorityCalled(self, priority: newValue) } - private func setState(_ state: ImageTask.State) -> Bool { - withState { - guard $0.taskState == .running else { return false } - $0.taskState = state - return true - } + // MARK: Internals + + /// Gets called when the task is cancelled either by the user or by an + /// external event such as session invalidation. + /// + /// synchronized on `pipeline.queue`. + func _cancel() { + guard _setState(.cancelled) else { return } + _dispatch(.cancelled) } - func process(_ event: Event) { - let state: MutableState? = withState { - switch event { - case .progress(let progress): - $0.progress = progress - case .finished: - guard $0.taskState == .running else { return nil } - $0.taskState = .completed - default: - break + /// Gets called when the associated task sends a new event. + /// + /// synchronized on `pipeline.queue`. + func _process(_ event: AsyncTask.Event) { + switch event { + case let .value(response, isCompleted): + if isCompleted { + _finish(.success(response)) + } else { + _dispatch(.preview(response)) } - return $0 + case let .progress(value): + withLock { $0.progress = value } + _dispatch(.progress(value)) + case let .error(error): + _finish(.failure(error)) } - guard let state else { return } + } - process(event, in: state) - onEvent?(event, self) - pipeline?.imageTask(self, didProcessEvent: event) + /// Synchronized on `pipeline.queue`. + private func _finish(_ result: Result) { + guard _setState(.completed) else { return } + _dispatch(.finished(result)) + } + + /// Synchronized on `pipeline.queue`. + private func _setState(_ state: State) -> Bool { + guard _state == .running else { return false } + _state = state + withLock { $0.state = state } + return true } - private func process(_ event: Event, in state: MutableState) { - state.events?.send(event) + /// Dispatches the given event to the observers. + /// + /// - warning: The task needs to be fully wired (`_continuation` present) + /// before it can start sending the events. + /// + /// synchronized on `pipeline.queue`. + func _dispatch(_ event: Event) { + guard _continuation != nil else { + return // Task isn't fully wired yet + } + _events?.send(event) switch event { case .cancelled: - state.events?.send(completion: .finished) - state.continuation?.resume(throwing: CancellationError()) + _events?.send(completion: .finished) + _continuation?.resume(throwing: CancellationError()) case .finished(let result): let result = result.mapError { $0 as Error } - state.events?.send(completion: .finished) - state.continuation?.resume(with: result) + _events?.send(completion: .finished) + _continuation?.resume(with: result) default: break } + + onEvent?(event, self) + pipeline?.imageTask(self, didProcessEvent: event, isDataTask: isDataTask) } -} -@available(*, deprecated, renamed: "ImageTask", message: "Async/Await support was added directly to the existing `ImageTask` type") -public typealias AsyncImageTask = ImageTask + // MARK: Hashable -extension ImageTask.Event { - init(_ event: AsyncTask.Event) { - switch event { - case let .value(response, isCompleted): - if isCompleted { - self = .finished(.success(response)) - } else { - self = .preview(response) - } - case let .progress(value): - self = .progress(value) - case let .error(error): - self = .finished(.failure(error)) - } + public func hash(into hasher: inout Hasher) { + hasher.combine(ObjectIdentifier(self).hashValue) + } + + public static func == (lhs: ImageTask, rhs: ImageTask) -> Bool { + ObjectIdentifier(lhs) == ObjectIdentifier(rhs) + } + + // MARK: CustomStringConvertible + + public var description: String { + "ImageTask(id: \(taskId), priority: \(priority), progress: \(currentProgress.completed) / \(currentProgress.total), state: \(state))" } } +@available(*, deprecated, renamed: "ImageTask", message: "Async/Await support was added directly to the existing `ImageTask` type") +public typealias AsyncImageTask = ImageTask + // MARK: - ImageTask (Private) extension ImageTask { private func makeStream(of closure: @escaping (Event) -> T?) -> AsyncStream { AsyncStream { continuation in - let events: PassthroughSubject? = withState { - guard $0.taskState == .running else { return nil } - return $0.makeEvents() - } - guard let events else { - return continuation.finish() - } - let cancellable = events.sink { _ in - continuation.finish() - } receiveValue: { event in - if let value = closure(event) { - continuation.yield(value) + self.queue.async { + guard let events = self._makeEventsSubject() else { + return continuation.finish() } - switch event { - case .cancelled, .finished: + let cancellable = events.sink { _ in continuation.finish() - default: - break + } receiveValue: { event in + if let value = closure(event) { + continuation.yield(value) + } + switch event { + case .cancelled, .finished: + continuation.finish() + default: + break + } + } + continuation.onTermination = { _ in + cancellable.cancel() } - } - continuation.onTermination = { _ in - cancellable.cancel() } } } - private func withState(_ closure: (inout MutableState) -> T) -> T { + // Synchronized on `pipeline.queue` + private func _makeEventsSubject() -> PassthroughSubject? { + guard _state == .running else { + return nil + } + if _events == nil { + _events = PassthroughSubject() + } + return _events! + } + + private func withLock(_ closure: (inout PublicState) -> T) -> T { os_unfair_lock_lock(lock) defer { os_unfair_lock_unlock(lock) } - return closure(&mutableState) + return closure(&publicState) } - /// Contains all the mutable task state. + /// Contains the state synchronized using the internal lock. /// - /// - warning: Must be accessed using `withState`. - private struct MutableState { - var taskState: ImageTask.State = .running + /// - warning: Must be accessed using `withLock`. + private struct PublicState { + var state: ImageTask.State = .running var priority: ImageRequest.Priority var progress = Progress(completed: 0, total: 0) - var continuation: UnsafeContinuation? - var events: PassthroughSubject? - - mutating func makeEvents() -> PassthroughSubject { - if events == nil { - events = PassthroughSubject() - } - return events! - } } } diff --git a/Sources/Nuke/Pipeline/ImagePipeline.swift b/Sources/Nuke/Pipeline/ImagePipeline.swift index 37c1392d5..32b152502 100644 --- a/Sources/Nuke/Pipeline/ImagePipeline.swift +++ b/Sources/Nuke/Pipeline/ImagePipeline.swift @@ -111,7 +111,7 @@ public final class ImagePipeline: @unchecked Sendable { queue.async { guard !self.isInvalidated else { return } self.isInvalidated = true - self.tasks.keys.forEach(self.cancel) + self.tasks.keys.forEach(self.cancelImageTask) } } @@ -284,41 +284,50 @@ public final class ImagePipeline: @unchecked Sendable { // MARK: - ImageTask (Internal) private func makeStartedImageTask(with request: ImageRequest, isDataTask: Bool = false, onEvent: ((ImageTask.Event, ImageTask) -> Void)? = nil) -> ImageTask { - ImageTask(taskId: nextTaskId, request: request, isDataTask: isDataTask, pipeline: self, onEvent: onEvent) - } - - private func cancel(_ task: ImageTask) { - guard let subscription = tasks.removeValue(forKey: task) else { return } - task.process(.cancelled) - subscription.unsubscribe() + let task = ImageTask(taskId: nextTaskId, request: request, isDataTask: isDataTask, pipeline: self, onEvent: onEvent) + // Important to call it before `imageTaskStartCalled` + if !isDataTask { + delegate.imageTaskCreated(task, pipeline: self) + } + task._task = Task { + try await withUnsafeThrowingContinuation { continuation in + self.queue.async { + task._continuation = continuation + self.startImageTask(task, isDataTask: isDataTask) + } + } + } + return task } - private func startImageTask(_ task: ImageTask) { - guard !isInvalidated else { - return task.process(.finished(.failure(.pipelineInvalidated))) - } - guard task.state != .cancelled else { + // By this time, the task has `continuation` set and is fully wired. + private func startImageTask(_ task: ImageTask, isDataTask: Bool) { + guard task._state != .cancelled else { // The task gets started asynchronously in a `Task` and cancellation // can happen before the pipeline reached `startImageTask`. In that // case, the `cancel` method do no send the task event. - return task.process(.cancelled) + return task._dispatch(.cancelled) + } + guard !isInvalidated else { + return task._process(.error(.pipelineInvalidated)) } - let worker = task.isDataTask ? makeTaskLoadData(for: task.request) : makeTaskLoadImage(for: task.request) + let worker = isDataTask ? makeTaskLoadData(for: task.request) : makeTaskLoadImage(for: task.request) tasks[task] = worker.subscribe(priority: task.priority.taskPriority, subscriber: task) { [weak task] in - task?.process(.init($0)) + task?._process($0) } delegate.imageTaskDidStart(task, pipeline: self) onTaskStarted?(task) } + private func cancelImageTask(_ task: ImageTask) { + tasks.removeValue(forKey: task)?.unsubscribe() + task._cancel() + } + // MARK: - Image Task Events func imageTaskCancelCalled(_ task: ImageTask) { - queue.async { self.cancel(task) } - } - - func imageTaskStartCalled(_ task: ImageTask) { - queue.async { self.startImageTask(task) } + queue.async { self.cancelImageTask(task) } } func imageTaskUpdatePriorityCalled(_ task: ImageTask, priority: ImageRequest.Priority) { @@ -327,14 +336,14 @@ public final class ImagePipeline: @unchecked Sendable { } } - func imageTask(_ task: ImageTask, didProcessEvent event: ImageTask.Event) { + func imageTask(_ task: ImageTask, didProcessEvent event: ImageTask.Event, isDataTask: Bool) { switch event { case .cancelled, .finished: tasks[task] = nil default: break } - if !task.isDataTask { + if !isDataTask { delegate.imageTask(task, didReceiveEvent: event, pipeline: self) switch event { case .progress(let progress):