Skip to content

Commit

Permalink
Add ImageTask/MutableState
Browse files Browse the repository at this point in the history
  • Loading branch information
kean committed May 1, 2024
1 parent e140a26 commit 5ba6166
Showing 1 changed file with 56 additions and 54 deletions.
110 changes: 56 additions & 54 deletions Sources/Nuke/ImageTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,21 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
/// Unique only within that pipeline.
public let taskId: Int64

/// The original request.
/// The original request that the task was created with.
public let request: ImageRequest

/// Updates the priority of the task. The priority can be updated dynamically
/// even if that task is already running.
/// The priority of the task. The priority can be updated dynamically even
/// for a task that is already running.
public var priority: ImageRequest.Priority {
get { sync { _priority } }
get { withState { $0.priority } }
set { setPriority(newValue) }
}
private var _priority: ImageRequest.Priority

/// Returns the current download progress. Returns zeros before the download
/// is started and the expected size of the resource is known.
public internal(set) var currentProgress: Progress {
get { sync { _currentProgress } }
set { sync { _currentProgress = newValue } }
public var currentProgress: Progress {
withState { $0.progress }
}
private var _currentProgress = Progress(completed: 0, total: 0)

/// The download progress.
public struct Progress: Hashable, Sendable {
Expand All @@ -57,14 +54,14 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send

/// Initializes progress with the given status.
public init(completed: Int64, total: Int64) {
self.completed = completed
self.total = total
(self.completed, self.total) = (completed, total)
}
}

/// The current state of the task.
public var state: State { sync { _state } }
private var _state: State = .running
public var state: State {
withState { $0.taskState }
}

/// The state of the image task.
public enum State {
Expand All @@ -85,7 +82,7 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
}
}

/// The image response.
/// Returns the image response.
public var response: ImageResponse {
get async throws {
try await withTaskCancellationHandler {
Expand Down Expand Up @@ -119,7 +116,7 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send

/// The events sent by the pipeline during the task execution.
public var events: AnyPublisher<Event, Never> {
sync { _events.eraseToAnyPublisher() }
withState { $0.makeEvents().eraseToAnyPublisher() }
}

/// An event produced during the runetime of the task.
Expand All @@ -141,7 +138,7 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
weak var pipeline: ImagePipeline?

private var task: Task<ImageResponse, Error>!
private var context = AsyncExecutionContext()
private var mutableState: MutableState
private let onEvent: ((Event, ImageTask) -> Void)?
private let lock: os_unfair_lock_t

Expand All @@ -153,7 +150,7 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
init(taskId: Int64, request: ImageRequest, isDataTask: Bool, pipeline: ImagePipeline, onEvent: ((Event, ImageTask) -> Void)?) {
self.taskId = taskId
self.request = request
self._priority = request.priority
self.mutableState = MutableState(priority: request.priority)
self.isDataTask = isDataTask
self.pipeline = pipeline
self.onEvent = onEvent
Expand All @@ -163,7 +160,7 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send

task = Task {
try await withUnsafeThrowingContinuation { continuation in
self.sync { self.context.continuation = continuation }
self.withState { $0.continuation = continuation }
pipeline.imageTaskStartCalled(self)
}
}
Expand Down Expand Up @@ -191,64 +188,57 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
// MARK: CustomStringConvertible

public var description: String {
"ImageTask(id: \(taskId), priority: \(_priority), progress: \(currentProgress.completed) / \(currentProgress.total), state: \(state))"
"ImageTask(id: \(taskId), priority: \(priority), progress: \(currentProgress.completed) / \(currentProgress.total), state: \(state))"
}

// MARK: Internals

private func setPriority(_ newValue: ImageRequest.Priority) {
let didChange: Bool = sync {
guard _priority != newValue else { return false }
_priority = newValue
return _state == .running
let didChange: Bool = withState {
guard $0.priority != newValue else { return false }
$0.priority = newValue
return $0.taskState == .running
}
guard didChange else { return }
pipeline?.imageTaskUpdatePriorityCalled(self, priority: newValue)
}

private func setState(_ state: ImageTask.State) -> Bool {
os_unfair_lock_lock(lock)
defer { os_unfair_lock_unlock(lock) }

guard _state == .running else { return false }
_state = state
return true
withState {
guard $0.taskState == .running else { return false }
$0.taskState = state
return true
}
}

func process(_ event: Event) {
switch event {
case .progress(let progress):
currentProgress = progress
withState { $0.progress = progress }
case .finished:
guard setState(.completed) else { return }
default:
break
}
process(event, in: sync { context })
process(event, in: withState { $0 })
onEvent?(event, self)
pipeline?.imageTask(self, didProcessEvent: event)
}

private func process(_ event: Event, in context: AsyncExecutionContext) {
context.events?.send(event)
private func process(_ event: Event, in state: MutableState) {
state.events?.send(event)
switch event {
case .cancelled:
context.events?.send(completion: .finished)
context.continuation?.resume(throwing: CancellationError())
state.events?.send(completion: .finished)
state.continuation?.resume(throwing: CancellationError())
case .finished(let result):
let result = result.mapError { $0 as Error }
context.events?.send(completion: .finished)
context.continuation?.resume(with: result)
state.events?.send(completion: .finished)
state.continuation?.resume(with: result)
default:
break
}
}

private func sync<T>(_ closure: () -> T) -> T {
os_unfair_lock_lock(lock)
defer { os_unfair_lock_unlock(lock) }
return closure()
}
}

@available(*, deprecated, renamed: "ImageTask", message: "Async/Await support was added directly to the existing `ImageTask` type")
Expand All @@ -271,7 +261,7 @@ extension ImageTask.Event {
}
}

// MARK: - ImageTask (Async)
// MARK: - ImageTask (Private)

extension ImageTask {
private func makeStream<T>(of closure: @escaping (Event) -> T?) -> AsyncStream<T> {
Expand Down Expand Up @@ -299,21 +289,33 @@ extension ImageTask {
}

private var _eventIfNotCompleted: PassthroughSubject<Event, Never>? {
os_unfair_lock_lock(lock)
defer { os_unfair_lock_unlock(lock) }
guard _state == .running else { return nil }
return _events
withState {
guard $0.taskState == .running else { return nil }
return $0.makeEvents()
}
}

private var _events: PassthroughSubject<Event, Never> {
if context.events == nil {
context.events = PassthroughSubject()
}
return context.events!
private func withState<T>(_ closure: (inout MutableState) -> T) -> T {
os_unfair_lock_lock(lock)
defer { os_unfair_lock_unlock(lock) }
return closure(&mutableState)
}

private struct AsyncExecutionContext {
/// Contains all the mutable task state.
///
/// - warning: Must be accessed using `withState`.
private struct MutableState {
var taskState: ImageTask.State = .running
var priority: ImageRequest.Priority
var progress = Progress(completed: 0, total: 0)
var continuation: UnsafeContinuation<ImageResponse, Error>?
var events: PassthroughSubject<Event, Never>?

mutating func makeEvents() -> PassthroughSubject<Event, Never> {
if events == nil {
events = PassthroughSubject()
}
return events!
}
}
}

0 comments on commit 5ba6166

Please sign in to comment.