From 8586041ad089d87e21519d6762bb5f44705b079e Mon Sep 17 00:00:00 2001 From: coyotte508 <coyotte508@gmail.com> Date: Tue, 7 Nov 2023 12:04:30 +0100 Subject: [PATCH 1/2] =?UTF-8?q?=E2=9C=A8=20Add=20abortSignal=20to=20commit?= =?UTF-8?q?=20&=20uploadFiles?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/hub/src/lib/commit.ts | 23 ++++++++++++++++++- packages/hub/src/lib/upload-file.ts | 2 ++ .../hub/src/lib/upload-files-with-progress.ts | 15 ++++++++++++ packages/hub/src/lib/upload-files.ts | 2 ++ packages/hub/src/utils/sha256-node.ts | 9 +++++++- packages/hub/src/utils/sha256.ts | 13 +++++++++-- 6 files changed, 60 insertions(+), 4 deletions(-) diff --git a/packages/hub/src/lib/commit.ts b/packages/hub/src/lib/commit.ts index b9a13e8d7..0123d88cd 100644 --- a/packages/hub/src/lib/commit.ts +++ b/packages/hub/src/lib/commit.ts @@ -80,6 +80,7 @@ export interface CommitParams { * Custom fetch function to use instead of the default one, for example to use a proxy or edit headers. */ fetch?: typeof fetch; + abortSignal?: AbortSignal; } export interface CommitOutput { @@ -142,6 +143,8 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr const lazyBlob = await createBlob(operation.content, { fetch: params.fetch }); + params.abortSignal?.throwIfAborted(); + return { ...operation, content: lazyBlob, @@ -163,6 +166,8 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr ), }; + params.abortSignal?.throwIfAborted(); + const res = await (params.fetch ?? fetch)( `${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}/preupload/${encodeURIComponent( params.branch ?? "main" @@ -174,6 +179,7 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr "Content-Type": "application/json", }, body: JSON.stringify(payload), + signal: params.abortSignal, } ); @@ -202,7 +208,7 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr >((yieldCallback, returnCallback, rejectCallack) => { return promisesQueue( operations.map((op) => async () => { - const iterator = sha256(op.content, { useWebWorker: params.useWebWorkers }); + const iterator = sha256(op.content, { useWebWorker: params.useWebWorkers, abortSignal: params.abortSignal }); let res: IteratorResult<number, string>; do { res = await iterator.next(); @@ -218,6 +224,8 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr ).then(returnCallback, rejectCallack); }); + params.abortSignal?.throwIfAborted(); + const payload: ApiLfsBatchRequest = { operation: "upload", // multipart is a custom protocol for HF @@ -244,6 +252,7 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr "Content-Type": "application/vnd.git-lfs+json", }, body: JSON.stringify(payload), + signal: params.abortSignal, } ); @@ -265,6 +274,8 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr throw new InvalidApiResponseFormatError("Unrequested object ID in response"); } + params.abortSignal?.throwIfAborted(); + if (obj.error) { const errorMessage = `Error while doing LFS batch call for ${operations[shas.indexOf(obj.oid)].path}: ${ obj.error.message @@ -316,6 +327,8 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr await promisesQueueStreaming( parts.map((part) => async () => { + params.abortSignal?.throwIfAborted(); + const index = parseInt(part) - 1; const slice = content.slice(index * chunkSize, (index + 1) * chunkSize); @@ -323,6 +336,7 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr method: "PUT", /** Unfortunately, browsers don't support our inherited version of Blob in fetch calls */ body: slice instanceof WebBlob && isFrontend ? await slice.arrayBuffer() : slice, + signal: params.abortSignal, ...({ progressHint: { path: op.path, @@ -354,6 +368,8 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr MULTIPART_PARALLEL_UPLOAD ); + params.abortSignal?.throwIfAborted(); + const res = await (params.fetch ?? fetch)(completionUrl, { method: "POST", body: JSON.stringify(completeReq), @@ -361,6 +377,7 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr Accept: "application/vnd.git-lfs+json", "Content-Type": "application/vnd.git-lfs+json", }, + signal: params.abortSignal, }); if (!res.ok) { @@ -386,6 +403,7 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr }, /** Unfortunately, browsers don't support our inherited version of Blob in fetch calls */ body: content instanceof WebBlob && isFrontend ? await content.arrayBuffer() : content, + signal: params.abortSignal, ...({ progressHint: { path: op.path, @@ -421,6 +439,8 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr }); } + params.abortSignal?.throwIfAborted(); + yield { event: "phase", phase: "committing" }; return yield* eventToGenerator<CommitProgressEvent, CommitOutput>( @@ -467,6 +487,7 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr ] .map((x) => JSON.stringify(x)) .join("\n"), + signal: params.abortSignal, ...({ progressHint: { progressCallback: (progress: number) => { diff --git a/packages/hub/src/lib/upload-file.ts b/packages/hub/src/lib/upload-file.ts index 6133e2139..35755168d 100644 --- a/packages/hub/src/lib/upload-file.ts +++ b/packages/hub/src/lib/upload-file.ts @@ -14,6 +14,7 @@ export function uploadFile(params: { parentCommit?: CommitParams["parentCommit"]; fetch?: CommitParams["fetch"]; useWebWorkers?: CommitParams["useWebWorkers"]; + abortSignal?: CommitParams["abortSignal"]; }): Promise<CommitOutput> { const path = params.file instanceof URL @@ -40,5 +41,6 @@ export function uploadFile(params: { parentCommit: params.parentCommit, fetch: params.fetch, useWebWorkers: params.useWebWorkers, + abortSignal: params.abortSignal, }); } diff --git a/packages/hub/src/lib/upload-files-with-progress.ts b/packages/hub/src/lib/upload-files-with-progress.ts index 83519df9e..ac719ff71 100644 --- a/packages/hub/src/lib/upload-files-with-progress.ts +++ b/packages/hub/src/lib/upload-files-with-progress.ts @@ -25,6 +25,7 @@ export async function* uploadFilesWithProgress(params: { branch?: CommitParams["branch"]; isPullRequest?: CommitParams["isPullRequest"]; parentCommit?: CommitParams["parentCommit"]; + abortSignal?: CommitParams["abortSignal"]; /** * Set this to true in order to have progress events for hashing */ @@ -45,6 +46,7 @@ export async function* uploadFilesWithProgress(params: { isPullRequest: params.isPullRequest, parentCommit: params.parentCommit, useWebWorkers: params.useWebWorkers, + abortSignal: params.abortSignal, fetch: async (input, init) => { if (!init) { return fetch(input); @@ -105,6 +107,7 @@ export async function* uploadFilesWithProgress(params: { }); } + init.signal?.throwIfAborted(); xhr.send(init.body); return new Promise((resolve, reject) => { @@ -126,6 +129,18 @@ export async function* uploadFilesWithProgress(params: { xhr.addEventListener("error", () => { reject(new Error(xhr.statusText)); }); + + if (init.signal) { + init.signal.addEventListener("abort", () => { + xhr.abort(); + + try { + init.signal?.throwIfAborted(); + } catch (err) { + reject(err); + } + }); + } }); }, }); diff --git a/packages/hub/src/lib/upload-files.ts b/packages/hub/src/lib/upload-files.ts index 381cce521..22b93508f 100644 --- a/packages/hub/src/lib/upload-files.ts +++ b/packages/hub/src/lib/upload-files.ts @@ -13,6 +13,7 @@ export function uploadFiles(params: { parentCommit?: CommitParams["parentCommit"]; fetch?: CommitParams["fetch"]; useWebWorkers?: CommitParams["useWebWorkers"]; + abortSignal?: CommitParams["abortSignal"]; }): Promise<CommitOutput> { return commit({ credentials: params.credentials, @@ -30,5 +31,6 @@ export function uploadFiles(params: { parentCommit: params.parentCommit, fetch: params.fetch, useWebWorkers: params.useWebWorkers, + abortSignal: params.abortSignal, }); } diff --git a/packages/hub/src/utils/sha256-node.ts b/packages/hub/src/utils/sha256-node.ts index 5255136d5..b068d1a21 100644 --- a/packages/hub/src/utils/sha256-node.ts +++ b/packages/hub/src/utils/sha256-node.ts @@ -2,7 +2,12 @@ import { Readable } from "node:stream"; import type { ReadableStream } from "node:stream/web"; import { createHash } from "node:crypto"; -export async function* sha256Node(buffer: ArrayBuffer | Blob): AsyncGenerator<number, string> { +export async function* sha256Node( + buffer: ArrayBuffer | Blob, + opts?: { + abortSignal?: AbortSignal; + } +): AsyncGenerator<number, string> { const sha256Stream = createHash("sha256"); const size = buffer instanceof Blob ? buffer.size : buffer.byteLength; let done = 0; @@ -13,6 +18,8 @@ export async function* sha256Node(buffer: ArrayBuffer | Blob): AsyncGenerator<nu sha256Stream.update(buffer); done += buffer.length; yield done / size; + + opts?.abortSignal?.throwIfAborted(); } return sha256Stream.digest("hex"); diff --git a/packages/hub/src/utils/sha256.ts b/packages/hub/src/utils/sha256.ts index 800de3bd6..afd57e1cb 100644 --- a/packages/hub/src/utils/sha256.ts +++ b/packages/hub/src/utils/sha256.ts @@ -92,7 +92,7 @@ function destroyWorker(worker: Worker): void { */ export async function* sha256( buffer: Blob, - opts?: { useWebWorker?: boolean | { minSize: number; poolSize?: number } } + opts?: { useWebWorker?: boolean | { minSize: number; poolSize?: number }; abortSignal?: AbortSignal } ): AsyncGenerator<number, string> { yield 0; @@ -124,6 +124,13 @@ export async function* sha256( returnCallback(event.data.sha256); } else if (event.data.progress) { yieldCallback(event.data.progress); + + try { + opts.abortSignal?.throwIfAborted(); + } catch (err) { + destroyWorker(worker); + rejectCallack(err); + } } else { destroyWorker(worker); rejectCallack(event); @@ -160,6 +167,8 @@ export async function* sha256( sha256.update(value); bytesDone += value.length; yield bytesDone / total; + + opts?.abortSignal?.throwIfAborted(); } return sha256.digest("hex"); @@ -169,7 +178,7 @@ export async function* sha256( cryptoModule = await import("./sha256-node"); } - return yield* cryptoModule.sha256Node(buffer); + return yield* cryptoModule.sha256Node(buffer, { abortSignal: opts?.abortSignal }); } // eslint-disable-next-line @typescript-eslint/consistent-type-imports From ac6712b57550e79e25c99cebb18059f260ef8e05 Mon Sep 17 00:00:00 2001 From: coyotte508 <coyotte508@gmail.com> Date: Tue, 7 Nov 2023 12:17:13 +0100 Subject: [PATCH 2/2] =?UTF-8?q?=E2=9C=A8=20Cancel=20all=20requests=20if=20?= =?UTF-8?q?one=20fail?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/hub/src/lib/commit.ts | 713 +++++++++++++++++---------------- 1 file changed, 363 insertions(+), 350 deletions(-) diff --git a/packages/hub/src/lib/commit.ts b/packages/hub/src/lib/commit.ts index 0123d88cd..ad2365a11 100644 --- a/packages/hub/src/lib/commit.ts +++ b/packages/hub/src/lib/commit.ts @@ -130,403 +130,416 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr const lfsShas = new Map<string, string | null>(); - const allOperations = await Promise.all( - params.operations.map(async (operation) => { - if (operation.operation !== "addOrUpdate") { - return operation; - } + const abortController = new AbortController(); + const abortSignal = abortController.signal; - if (!(operation.content instanceof URL)) { - /** TS trick to enforce `content` to be a `Blob` */ - return { ...operation, content: operation.content }; - } + if (params.abortSignal) { + params.abortSignal.addEventListener("abort", () => abortController.abort()); + } - const lazyBlob = await createBlob(operation.content, { fetch: params.fetch }); + try { + const allOperations = await Promise.all( + params.operations.map(async (operation) => { + if (operation.operation !== "addOrUpdate") { + return operation; + } - params.abortSignal?.throwIfAborted(); + if (!(operation.content instanceof URL)) { + /** TS trick to enforce `content` to be a `Blob` */ + return { ...operation, content: operation.content }; + } - return { - ...operation, - content: lazyBlob, - }; - }) - ); + const lazyBlob = await createBlob(operation.content, { fetch: params.fetch }); - const gitAttributes = allOperations.filter(isFileOperation).find((op) => op.path === ".gitattributes")?.content; + abortSignal?.throwIfAborted(); - for (const operations of chunk(allOperations.filter(isFileOperation), 100)) { - const payload: ApiPreuploadRequest = { - gitAttributes: gitAttributes && (await gitAttributes.text()), - files: await Promise.all( - operations.map(async (operation) => ({ - path: operation.path, - size: operation.content.size, - sample: base64FromBytes(new Uint8Array(await operation.content.slice(0, 512).arrayBuffer())), - })) - ), - }; - - params.abortSignal?.throwIfAborted(); - - const res = await (params.fetch ?? fetch)( - `${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}/preupload/${encodeURIComponent( - params.branch ?? "main" - )}` + (params.isPullRequest ? "?create_pr=1" : ""), - { - method: "POST", - headers: { - ...(params.credentials && { Authorization: `Bearer ${params.credentials.accessToken}` }), - "Content-Type": "application/json", - }, - body: JSON.stringify(payload), - signal: params.abortSignal, - } + return { + ...operation, + content: lazyBlob, + }; + }) ); - if (!res.ok) { - throw await createApiError(res); - } + const gitAttributes = allOperations.filter(isFileOperation).find((op) => op.path === ".gitattributes")?.content; + + for (const operations of chunk(allOperations.filter(isFileOperation), 100)) { + const payload: ApiPreuploadRequest = { + gitAttributes: gitAttributes && (await gitAttributes.text()), + files: await Promise.all( + operations.map(async (operation) => ({ + path: operation.path, + size: operation.content.size, + sample: base64FromBytes(new Uint8Array(await operation.content.slice(0, 512).arrayBuffer())), + })) + ), + }; + + abortSignal?.throwIfAborted(); - const json: ApiPreuploadResponse = await res.json(); + const res = await (params.fetch ?? fetch)( + `${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}/preupload/${encodeURIComponent( + params.branch ?? "main" + )}` + (params.isPullRequest ? "?create_pr=1" : ""), + { + method: "POST", + headers: { + ...(params.credentials && { Authorization: `Bearer ${params.credentials.accessToken}` }), + "Content-Type": "application/json", + }, + body: JSON.stringify(payload), + signal: abortSignal, + } + ); + + if (!res.ok) { + throw await createApiError(res); + } - for (const file of json.files) { - if (file.uploadMode === "lfs") { - lfsShas.set(file.path, null); + const json: ApiPreuploadResponse = await res.json(); + + for (const file of json.files) { + if (file.uploadMode === "lfs") { + lfsShas.set(file.path, null); + } } } - } - yield { event: "phase", phase: "uploadingLargeFiles" }; - - for (const operations of chunk( - allOperations.filter(isFileOperation).filter((op) => lfsShas.has(op.path)), - 100 - )) { - const shas = yield* eventToGenerator< - { event: "fileProgress"; type: "hashing"; path: string; progress: number }, - string[] - >((yieldCallback, returnCallback, rejectCallack) => { - return promisesQueue( - operations.map((op) => async () => { - const iterator = sha256(op.content, { useWebWorker: params.useWebWorkers, abortSignal: params.abortSignal }); - let res: IteratorResult<number, string>; - do { - res = await iterator.next(); - if (!res.done) { - yieldCallback({ event: "fileProgress", path: op.path, progress: res.value, type: "hashing" }); - } - } while (!res.done); - const sha = res.value; - lfsShas.set(op.path, res.value); - return sha; - }), - CONCURRENT_SHAS - ).then(returnCallback, rejectCallack); - }); - - params.abortSignal?.throwIfAborted(); - - const payload: ApiLfsBatchRequest = { - operation: "upload", - // multipart is a custom protocol for HF - transfers: ["basic", "multipart"], - hash_algo: "sha_256", - ref: { - name: params.branch ?? "main", - }, - objects: operations.map((op, i) => ({ - oid: shas[i], - size: op.content.size, - })), - }; - - const res = await (params.fetch ?? fetch)( - `${params.hubUrl ?? HUB_URL}/${repoId.type === "model" ? "" : repoId.type + "s/"}${ - repoId.name - }.git/info/lfs/objects/batch`, - { - method: "POST", - headers: { - ...(params.credentials && { Authorization: `Bearer ${params.credentials.accessToken}` }), - Accept: "application/vnd.git-lfs+json", - "Content-Type": "application/vnd.git-lfs+json", + yield { event: "phase", phase: "uploadingLargeFiles" }; + + for (const operations of chunk( + allOperations.filter(isFileOperation).filter((op) => lfsShas.has(op.path)), + 100 + )) { + const shas = yield* eventToGenerator< + { event: "fileProgress"; type: "hashing"; path: string; progress: number }, + string[] + >((yieldCallback, returnCallback, rejectCallack) => { + return promisesQueue( + operations.map((op) => async () => { + const iterator = sha256(op.content, { useWebWorker: params.useWebWorkers, abortSignal: abortSignal }); + let res: IteratorResult<number, string>; + do { + res = await iterator.next(); + if (!res.done) { + yieldCallback({ event: "fileProgress", path: op.path, progress: res.value, type: "hashing" }); + } + } while (!res.done); + const sha = res.value; + lfsShas.set(op.path, res.value); + return sha; + }), + CONCURRENT_SHAS + ).then(returnCallback, rejectCallack); + }); + + abortSignal?.throwIfAborted(); + + const payload: ApiLfsBatchRequest = { + operation: "upload", + // multipart is a custom protocol for HF + transfers: ["basic", "multipart"], + hash_algo: "sha_256", + ref: { + name: params.branch ?? "main", }, - body: JSON.stringify(payload), - signal: params.abortSignal, - } - ); + objects: operations.map((op, i) => ({ + oid: shas[i], + size: op.content.size, + })), + }; - if (!res.ok) { - throw await createApiError(res); - } + const res = await (params.fetch ?? fetch)( + `${params.hubUrl ?? HUB_URL}/${repoId.type === "model" ? "" : repoId.type + "s/"}${ + repoId.name + }.git/info/lfs/objects/batch`, + { + method: "POST", + headers: { + ...(params.credentials && { Authorization: `Bearer ${params.credentials.accessToken}` }), + Accept: "application/vnd.git-lfs+json", + "Content-Type": "application/vnd.git-lfs+json", + }, + body: JSON.stringify(payload), + signal: abortSignal, + } + ); - const json: ApiLfsBatchResponse = await res.json(); - const batchRequestId = res.headers.get("X-Request-Id") || undefined; + if (!res.ok) { + throw await createApiError(res); + } - const shaToOperation = new Map(operations.map((op, i) => [shas[i], op])); + const json: ApiLfsBatchResponse = await res.json(); + const batchRequestId = res.headers.get("X-Request-Id") || undefined; - yield* eventToGenerator<CommitProgressEvent, void>((yieldCallback, returnCallback, rejectCallback) => { - return promisesQueueStreaming( - json.objects.map((obj) => async () => { - const op = shaToOperation.get(obj.oid); + const shaToOperation = new Map(operations.map((op, i) => [shas[i], op])); - if (!op) { - throw new InvalidApiResponseFormatError("Unrequested object ID in response"); - } + yield* eventToGenerator<CommitProgressEvent, void>((yieldCallback, returnCallback, rejectCallback) => { + return promisesQueueStreaming( + json.objects.map((obj) => async () => { + const op = shaToOperation.get(obj.oid); - params.abortSignal?.throwIfAborted(); + if (!op) { + throw new InvalidApiResponseFormatError("Unrequested object ID in response"); + } - if (obj.error) { - const errorMessage = `Error while doing LFS batch call for ${operations[shas.indexOf(obj.oid)].path}: ${ - obj.error.message - }${batchRequestId ? ` - Request ID: ${batchRequestId}` : ""}`; - throw new HubApiError(res.url, obj.error.code, batchRequestId, errorMessage); - } - if (!obj.actions?.upload) { - // Already uploaded + abortSignal?.throwIfAborted(); + + if (obj.error) { + const errorMessage = `Error while doing LFS batch call for ${operations[shas.indexOf(obj.oid)].path}: ${ + obj.error.message + }${batchRequestId ? ` - Request ID: ${batchRequestId}` : ""}`; + throw new HubApiError(res.url, obj.error.code, batchRequestId, errorMessage); + } + if (!obj.actions?.upload) { + // Already uploaded + yieldCallback({ + event: "fileProgress", + path: op.path, + progress: 1, + type: "uploading", + }); + return; + } yieldCallback({ event: "fileProgress", path: op.path, - progress: 1, + progress: 0, type: "uploading", }); - return; - } - yieldCallback({ - event: "fileProgress", - path: op.path, - progress: 0, - type: "uploading", - }); - const content = op.content; - const header = obj.actions.upload.header; - if (header?.chunk_size) { - const chunkSize = parseInt(header.chunk_size); - - // multipart upload - // parts are in upload.header['00001'] to upload.header['99999'] - - const completionUrl = obj.actions.upload.href; - const parts = Object.keys(header).filter((key) => /^[0-9]+$/.test(key)); - - if (parts.length !== Math.ceil(content.size / chunkSize)) { - throw new Error("Invalid server response to upload large LFS file, wrong number of parts"); - } - - const completeReq: ApiLfsCompleteMultipartRequest = { - oid: obj.oid, - parts: parts.map((part) => ({ - partNumber: +part, - etag: "", - })), - }; - - // Defined here so that it's not redefined at each iteration (and the caller can tell it's for the same file) - const progressCallback = (progress: number) => - yieldCallback({ event: "fileProgress", path: op.path, progress, type: "uploading" }); - - await promisesQueueStreaming( - parts.map((part) => async () => { - params.abortSignal?.throwIfAborted(); - - const index = parseInt(part) - 1; - const slice = content.slice(index * chunkSize, (index + 1) * chunkSize); - - const res = await (params.fetch ?? fetch)(header[part], { - method: "PUT", - /** Unfortunately, browsers don't support our inherited version of Blob in fetch calls */ - body: slice instanceof WebBlob && isFrontend ? await slice.arrayBuffer() : slice, - signal: params.abortSignal, - ...({ - progressHint: { - path: op.path, - part: index, - numParts: parts.length, - progressCallback, - }, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - } as any), - }); - - if (!res.ok) { - throw await createApiError(res, { - requestId: batchRequestId, - message: `Error while uploading part ${part} of ${ - operations[shas.indexOf(obj.oid)].path - } to LFS storage`, + const content = op.content; + const header = obj.actions.upload.header; + if (header?.chunk_size) { + const chunkSize = parseInt(header.chunk_size); + + // multipart upload + // parts are in upload.header['00001'] to upload.header['99999'] + + const completionUrl = obj.actions.upload.href; + const parts = Object.keys(header).filter((key) => /^[0-9]+$/.test(key)); + + if (parts.length !== Math.ceil(content.size / chunkSize)) { + throw new Error("Invalid server response to upload large LFS file, wrong number of parts"); + } + + const completeReq: ApiLfsCompleteMultipartRequest = { + oid: obj.oid, + parts: parts.map((part) => ({ + partNumber: +part, + etag: "", + })), + }; + + // Defined here so that it's not redefined at each iteration (and the caller can tell it's for the same file) + const progressCallback = (progress: number) => + yieldCallback({ event: "fileProgress", path: op.path, progress, type: "uploading" }); + + await promisesQueueStreaming( + parts.map((part) => async () => { + abortSignal?.throwIfAborted(); + + const index = parseInt(part) - 1; + const slice = content.slice(index * chunkSize, (index + 1) * chunkSize); + + const res = await (params.fetch ?? fetch)(header[part], { + method: "PUT", + /** Unfortunately, browsers don't support our inherited version of Blob in fetch calls */ + body: slice instanceof WebBlob && isFrontend ? await slice.arrayBuffer() : slice, + signal: abortSignal, + ...({ + progressHint: { + path: op.path, + part: index, + numParts: parts.length, + progressCallback, + }, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } as any), }); - } - const eTag = res.headers.get("ETag"); + if (!res.ok) { + throw await createApiError(res, { + requestId: batchRequestId, + message: `Error while uploading part ${part} of ${ + operations[shas.indexOf(obj.oid)].path + } to LFS storage`, + }); + } - if (!eTag) { - throw new Error("Cannot get ETag of part during multipart upload"); - } + const eTag = res.headers.get("ETag"); - completeReq.parts[Number(part) - 1].etag = eTag; - }), - MULTIPART_PARALLEL_UPLOAD - ); + if (!eTag) { + throw new Error("Cannot get ETag of part during multipart upload"); + } - params.abortSignal?.throwIfAborted(); + completeReq.parts[Number(part) - 1].etag = eTag; + }), + MULTIPART_PARALLEL_UPLOAD + ); - const res = await (params.fetch ?? fetch)(completionUrl, { - method: "POST", - body: JSON.stringify(completeReq), - headers: { - Accept: "application/vnd.git-lfs+json", - "Content-Type": "application/vnd.git-lfs+json", - }, - signal: params.abortSignal, - }); + abortSignal?.throwIfAborted(); - if (!res.ok) { - throw await createApiError(res, { - requestId: batchRequestId, - message: `Error completing multipart upload of ${ - operations[shas.indexOf(obj.oid)].path - } to LFS storage`, + const res = await (params.fetch ?? fetch)(completionUrl, { + method: "POST", + body: JSON.stringify(completeReq), + headers: { + Accept: "application/vnd.git-lfs+json", + "Content-Type": "application/vnd.git-lfs+json", + }, + signal: abortSignal, }); - } - yieldCallback({ - event: "fileProgress", - path: op.path, - progress: 1, - type: "uploading", - }); - } else { - const res = await (params.fetch ?? fetch)(obj.actions.upload.href, { - method: "PUT", - headers: { - ...(batchRequestId ? { "X-Request-Id": batchRequestId } : undefined), - }, - /** Unfortunately, browsers don't support our inherited version of Blob in fetch calls */ - body: content instanceof WebBlob && isFrontend ? await content.arrayBuffer() : content, - signal: params.abortSignal, - ...({ - progressHint: { - path: op.path, - progressCallback: (progress: number) => - yieldCallback({ - event: "fileProgress", - path: op.path, - progress, - type: "uploading", - }), - }, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - } as any), - }); + if (!res.ok) { + throw await createApiError(res, { + requestId: batchRequestId, + message: `Error completing multipart upload of ${ + operations[shas.indexOf(obj.oid)].path + } to LFS storage`, + }); + } - if (!res.ok) { - throw await createApiError(res, { - requestId: batchRequestId, - message: `Error while uploading ${operations[shas.indexOf(obj.oid)].path} to LFS storage`, + yieldCallback({ + event: "fileProgress", + path: op.path, + progress: 1, + type: "uploading", + }); + } else { + const res = await (params.fetch ?? fetch)(obj.actions.upload.href, { + method: "PUT", + headers: { + ...(batchRequestId ? { "X-Request-Id": batchRequestId } : undefined), + }, + /** Unfortunately, browsers don't support our inherited version of Blob in fetch calls */ + body: content instanceof WebBlob && isFrontend ? await content.arrayBuffer() : content, + signal: abortSignal, + ...({ + progressHint: { + path: op.path, + progressCallback: (progress: number) => + yieldCallback({ + event: "fileProgress", + path: op.path, + progress, + type: "uploading", + }), + }, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } as any), }); - } - - yieldCallback({ - event: "fileProgress", - path: op.path, - progress: 1, - type: "uploading", - }); - } - }), - CONCURRENT_LFS_UPLOADS - ).then(returnCallback, rejectCallback); - }); - } - params.abortSignal?.throwIfAborted(); + if (!res.ok) { + throw await createApiError(res, { + requestId: batchRequestId, + message: `Error while uploading ${operations[shas.indexOf(obj.oid)].path} to LFS storage`, + }); + } - yield { event: "phase", phase: "committing" }; + yieldCallback({ + event: "fileProgress", + path: op.path, + progress: 1, + type: "uploading", + }); + } + }), + CONCURRENT_LFS_UPLOADS + ).then(returnCallback, rejectCallback); + }); + } - return yield* eventToGenerator<CommitProgressEvent, CommitOutput>( - async (yieldCallback, returnCallback, rejectCallback) => - (params.fetch ?? fetch)( - `${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}/commit/${encodeURIComponent( - params.branch ?? "main" - )}` + (params.isPullRequest ? "?create_pr=1" : ""), - { - method: "POST", - headers: { - ...(params.credentials && { Authorization: `Bearer ${params.credentials.accessToken}` }), - "Content-Type": "application/x-ndjson", - }, - body: [ - { - key: "header", - value: { - summary: params.title, - description: params.description, - parentCommit: params.parentCommit, - } satisfies ApiCommitHeader, + abortSignal?.throwIfAborted(); + + yield { event: "phase", phase: "committing" }; + + return yield* eventToGenerator<CommitProgressEvent, CommitOutput>( + async (yieldCallback, returnCallback, rejectCallback) => + (params.fetch ?? fetch)( + `${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}/commit/${encodeURIComponent( + params.branch ?? "main" + )}` + (params.isPullRequest ? "?create_pr=1" : ""), + { + method: "POST", + headers: { + ...(params.credentials && { Authorization: `Bearer ${params.credentials.accessToken}` }), + "Content-Type": "application/x-ndjson", }, - ...((await Promise.all( - allOperations.map((operation) => { - if (isFileOperation(operation)) { - const sha = lfsShas.get(operation.path); - if (sha) { - return { - key: "lfsFile", - value: { - path: operation.path, - algo: "sha256", - size: operation.content.size, - oid: sha, - } satisfies ApiCommitLfsFile, - }; + body: [ + { + key: "header", + value: { + summary: params.title, + description: params.description, + parentCommit: params.parentCommit, + } satisfies ApiCommitHeader, + }, + ...((await Promise.all( + allOperations.map((operation) => { + if (isFileOperation(operation)) { + const sha = lfsShas.get(operation.path); + if (sha) { + return { + key: "lfsFile", + value: { + path: operation.path, + algo: "sha256", + size: operation.content.size, + oid: sha, + } satisfies ApiCommitLfsFile, + }; + } } - } - - return convertOperationToNdJson(operation); - }) - )) satisfies ApiCommitOperation[]), - ] - .map((x) => JSON.stringify(x)) - .join("\n"), - signal: params.abortSignal, - ...({ - progressHint: { - progressCallback: (progress: number) => { - // For now, we display equal progress for all files - // We could compute the progress based on the size of `convertOperationToNdJson` for each of the files instead - for (const op of allOperations) { - if (isFileOperation(op) && !lfsShas.has(op.path)) { - yieldCallback({ - event: "fileProgress", - path: op.path, - progress, - type: "uploading", - }); + + return convertOperationToNdJson(operation); + }) + )) satisfies ApiCommitOperation[]), + ] + .map((x) => JSON.stringify(x)) + .join("\n"), + signal: abortSignal, + ...({ + progressHint: { + progressCallback: (progress: number) => { + // For now, we display equal progress for all files + // We could compute the progress based on the size of `convertOperationToNdJson` for each of the files instead + for (const op of allOperations) { + if (isFileOperation(op) && !lfsShas.has(op.path)) { + yieldCallback({ + event: "fileProgress", + path: op.path, + progress, + type: "uploading", + }); + } } - } + }, }, - }, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - } as any), - } - ) - .then(async (res) => { - if (!res.ok) { - throw await createApiError(res); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } as any), } + ) + .then(async (res) => { + if (!res.ok) { + throw await createApiError(res); + } - const json = await res.json(); + const json = await res.json(); - returnCallback({ - pullRequestUrl: json.pullRequestUrl, - commit: { - oid: json.commitOid, - url: json.commitUrl, - }, - hookOutput: json.hookOutput, - }); - }) - .catch(rejectCallback) - ); + returnCallback({ + pullRequestUrl: json.pullRequestUrl, + commit: { + oid: json.commitOid, + url: json.commitUrl, + }, + hookOutput: json.hookOutput, + }); + }) + .catch(rejectCallback) + ); + } catch (err) { + // For parallel requests, cancel them all if one fails + abortController.abort(); + throw err; + } } export async function commit(params: CommitParams): Promise<CommitOutput> {