Skip to content

Commit

Permalink
Simplify ImageTask syncronization
Browse files Browse the repository at this point in the history
  • Loading branch information
kean committed May 6, 2024
1 parent 5bfe99c commit 37a9b70
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 144 deletions.
250 changes: 129 additions & 121 deletions Sources/Nuke/ImageTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
/// The priority of the task. The priority can be updated dynamically even
/// for a task that is already running.
public var priority: ImageRequest.Priority {
get { withState { $0.priority } }
get { withLock { $0.priority } }
set { setPriority(newValue) }
}

/// Returns the current download progress. Returns zeros before the download
/// is started and the expected size of the resource is known.
public var currentProgress: Progress {
withState { $0.progress }
withLock { $0.progress }
}

/// The download progress.
Expand All @@ -60,7 +60,7 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send

/// The current state of the task.
public var state: State {
withState { $0.taskState }
withLock { $0.state }
}

/// The state of the image task.
Expand All @@ -86,7 +86,7 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
public var response: ImageResponse {
get async throws {
try await withTaskCancellationHandler {
try await task.value
try await _task.value
} onCancel: {
cancel()
}
Expand Down Expand Up @@ -132,13 +132,18 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
case finished(Result<ImageResponse, ImagePipeline.Error>)
}

let isDataTask: Bool

private weak var pipeline: ImagePipeline?
private var task: Task<ImageResponse, Error>!
private var mutableState: MutableState
private var publicState: PublicState
private let isDataTask: Bool
private let onEvent: ((Event, ImageTask) -> Void)?
private let lock: os_unfair_lock_t
private let queue: DispatchQueue
private weak var pipeline: ImagePipeline?

// State synchronized on `pipeline.queue`.
var _task: Task<ImageResponse, Error>!
var _continuation: UnsafeContinuation<ImageResponse, Error>?
var _state: State = .running
private var _events: PassthroughSubject<Event, Never>?

deinit {
lock.deinitialize(count: 1)
Expand All @@ -148,180 +153,183 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
init(taskId: Int64, request: ImageRequest, isDataTask: Bool, pipeline: ImagePipeline, onEvent: ((Event, ImageTask) -> Void)?) {
self.taskId = taskId
self.request = request
self.mutableState = MutableState(priority: request.priority)
self.publicState = PublicState(priority: request.priority)
self.isDataTask = isDataTask
self.pipeline = pipeline
self.queue = pipeline.queue
self.onEvent = onEvent

lock = .allocate(capacity: 1)
lock.initialize(to: os_unfair_lock())

// Important to call it before `imageTaskStartCalled`
if !isDataTask {
pipeline.delegate.imageTaskCreated(self, pipeline: pipeline)
}

task = Task {
try await withUnsafeThrowingContinuation { continuation in
self.withState { $0.continuation = continuation }
pipeline.imageTaskStartCalled(self)
}
}
}

/// Marks task as being cancelled.
///
/// The pipeline will immediately cancel any work associated with a task
/// unless there is an equivalent outstanding task running.
public func cancel() {
guard setState(.cancelled) else { return }
let didChange: Bool = withLock {
guard $0.state == .running else { return false }
$0.state = .cancelled
return true
}
guard didChange else { return } // Make sure it gets called once (expensive)
pipeline?.imageTaskCancelCalled(self)
}

// MARK: Hashable

public func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self).hashValue)
}

public static func == (lhs: ImageTask, rhs: ImageTask) -> Bool {
ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
}

// MARK: CustomStringConvertible

public var description: String {
"ImageTask(id: \(taskId), priority: \(priority), progress: \(currentProgress.completed) / \(currentProgress.total), state: \(state))"
}

// MARK: Internals

private func setPriority(_ newValue: ImageRequest.Priority) {
let didChange: Bool = withState {
let didChange: Bool = withLock {
guard $0.priority != newValue else { return false }
$0.priority = newValue
return $0.taskState == .running
return $0.state == .running
}
guard didChange else { return }
pipeline?.imageTaskUpdatePriorityCalled(self, priority: newValue)
}

private func setState(_ state: ImageTask.State) -> Bool {
withState {
guard $0.taskState == .running else { return false }
$0.taskState = state
return true
}
// MARK: Internals

/// Gets called when the task is cancelled either by the user or by an
/// external event such as session invalidation.
///
/// synchronized on `pipeline.queue`.
func _cancel() {
guard _setState(.cancelled) else { return }
_dispatch(.cancelled)
}

func process(_ event: Event) {
let state: MutableState? = withState {
switch event {
case .progress(let progress):
$0.progress = progress
case .finished:
guard $0.taskState == .running else { return nil }
$0.taskState = .completed
default:
break
/// Gets called when the associated task sends a new event.
///
/// synchronized on `pipeline.queue`.
func _process(_ event: AsyncTask<ImageResponse, ImagePipeline.Error>.Event) {
switch event {
case let .value(response, isCompleted):
if isCompleted {
_finish(.success(response))
} else {
_dispatch(.preview(response))
}
return $0
case let .progress(value):
withLock { $0.progress = value }
_dispatch(.progress(value))
case let .error(error):
_finish(.failure(error))
}
guard let state else { return }
}

process(event, in: state)
onEvent?(event, self)
pipeline?.imageTask(self, didProcessEvent: event)
/// Synchronized on `pipeline.queue`.
private func _finish(_ result: Result<ImageResponse, ImagePipeline.Error>) {
guard _setState(.completed) else { return }
_dispatch(.finished(result))
}

/// Synchronized on `pipeline.queue`.
private func _setState(_ state: State) -> Bool {
guard _state == .running else { return false }
_state = state
withLock { $0.state = state }
return true
}

private func process(_ event: Event, in state: MutableState) {
state.events?.send(event)
/// Dispatches the given event to the observers.
///
/// - warning: The task needs to be fully wired (`_continuation` present)
/// before it can start sending the events.
///
/// synchronized on `pipeline.queue`.
func _dispatch(_ event: Event) {
guard _continuation != nil else {
return // Task isn't fully wired yet
}
_events?.send(event)
switch event {
case .cancelled:
state.events?.send(completion: .finished)
state.continuation?.resume(throwing: CancellationError())
_events?.send(completion: .finished)
_continuation?.resume(throwing: CancellationError())
case .finished(let result):
let result = result.mapError { $0 as Error }
state.events?.send(completion: .finished)
state.continuation?.resume(with: result)
_events?.send(completion: .finished)
_continuation?.resume(with: result)
default:
break
}

onEvent?(event, self)
pipeline?.imageTask(self, didProcessEvent: event, isDataTask: isDataTask)
}
}

@available(*, deprecated, renamed: "ImageTask", message: "Async/Await support was added directly to the existing `ImageTask` type")
public typealias AsyncImageTask = ImageTask
// MARK: Hashable

extension ImageTask.Event {
init(_ event: AsyncTask<ImageResponse, ImagePipeline.Error>.Event) {
switch event {
case let .value(response, isCompleted):
if isCompleted {
self = .finished(.success(response))
} else {
self = .preview(response)
}
case let .progress(value):
self = .progress(value)
case let .error(error):
self = .finished(.failure(error))
}
public func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self).hashValue)
}

public static func == (lhs: ImageTask, rhs: ImageTask) -> Bool {
ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
}

// MARK: CustomStringConvertible

public var description: String {
"ImageTask(id: \(taskId), priority: \(priority), progress: \(currentProgress.completed) / \(currentProgress.total), state: \(state))"
}
}

@available(*, deprecated, renamed: "ImageTask", message: "Async/Await support was added directly to the existing `ImageTask` type")
public typealias AsyncImageTask = ImageTask

// MARK: - ImageTask (Private)

extension ImageTask {
private func makeStream<T>(of closure: @escaping (Event) -> T?) -> AsyncStream<T> {
AsyncStream { continuation in
let events: PassthroughSubject<Event, Never>? = withState {
guard $0.taskState == .running else { return nil }
return $0.makeEvents()
}
guard let events else {
return continuation.finish()
}
let cancellable = events.sink { _ in
continuation.finish()
} receiveValue: { event in
if let value = closure(event) {
continuation.yield(value)
self.queue.async {
guard let events = self._makeEventsSubject() else {
return continuation.finish()
}
switch event {
case .cancelled, .finished:
let cancellable = events.sink { _ in
continuation.finish()
default:
break
} receiveValue: { event in
if let value = closure(event) {
continuation.yield(value)
}
switch event {
case .cancelled, .finished:
continuation.finish()
default:
break
}
}
continuation.onTermination = { _ in
cancellable.cancel()
}
}
continuation.onTermination = { _ in
cancellable.cancel()
}
}
}

private func withState<T>(_ closure: (inout MutableState) -> T) -> T {
// Synchronized on `pipeline.queue`
private func _makeEventsSubject() -> PassthroughSubject<Event, Never>? {
guard _state == .running else {
return nil
}
if _events == nil {
_events = PassthroughSubject()
}
return _events!
}

private func withLock<T>(_ closure: (inout PublicState) -> T) -> T {
os_unfair_lock_lock(lock)
defer { os_unfair_lock_unlock(lock) }
return closure(&mutableState)
return closure(&publicState)
}

/// Contains all the mutable task state.
/// Contains the state synchronized using the internal lock.
///
/// - warning: Must be accessed using `withState`.
private struct MutableState {
var taskState: ImageTask.State = .running
/// - warning: Must be accessed using `withLock`.
private struct PublicState {
var state: ImageTask.State = .running
var priority: ImageRequest.Priority
var progress = Progress(completed: 0, total: 0)
var continuation: UnsafeContinuation<ImageResponse, Error>?
var events: PassthroughSubject<Event, Never>?

mutating func makeEvents() -> PassthroughSubject<Event, Never> {
if events == nil {
events = PassthroughSubject()
}
return events!
}
}
}
Loading

0 comments on commit 37a9b70

Please sign in to comment.