object-uploader.ts
import type { Client, ObjectMetadata, UploadedObjectInfo } from "./client.ts";
import { getVersionId, sanitizeETag } from "./helpers.ts";
import { parse as parseXML } from "./xml-parser.ts";
/**
* Stream a file to S3
*
* We assume that TransformChunkSizes has been used first, so that this stream
* will always receive chunks of exactly size "partSize", except for the final
* chunk.
*
* Note that the total size of the upload doesn't have to be known in advance,
* as long as TransformChunkSizes was used first. Then this ObjectUploader
* will decide based on the size of the first chunk whether it is doing a
* single-request upload or a multi-part upload.
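*
* A minimal usage sketch follows. The `TransformChunkSizes` import path and
* constructor argument, the pre-configured `client`, and the source `readable`
* stream are assumptions for illustration, not guarantees about this module's
* surroundings:
*
* ```ts
* import { TransformChunkSizes } from "./transform-chunk-sizes.ts";
*
* // 5 MiB: S3 requires at least 5 MiB for every part except the last one.
* const partSize = 5 * 1024 * 1024;
* const uploader = new ObjectUploader({
*   client, // an already-configured Client instance (assumed)
*   bucketName: "dev-bucket",
*   objectName: "test-32m.dat",
*   partSize,
*   metadata: { "Content-Type": "application/octet-stream" },
* });
* // Re-chunk the source into partSize pieces, then let ObjectUploader decide
* // between a single PUT and a multipart upload based on the first chunk.
* await readable // a ReadableStream<Uint8Array> (assumed)
*   .pipeThrough(new TransformChunkSizes(partSize))
*   .pipeTo(uploader);
* const { etag, versionId } = uploader.getResult();
* ```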
*/
export class ObjectUploader extends WritableStream<Uint8Array> {
  public readonly getResult: () => UploadedObjectInfo;

  constructor({ client, bucketName, objectName, partSize, metadata }: {
    client: Client;
    bucketName: string;
    objectName: string;
    partSize: number;
    metadata: Record<string, string>;
  }) {
    let result: UploadedObjectInfo;
    let nextPartNumber = 1;
    let uploadId: string;
    const etags: { part: number; etag: string }[] = [];
    const partsPromises: Promise<Response>[] = []; // If doing multi-part upload, this holds a promise for each part so we can upload them in parallel
    super({
      start() {}, // required
      async write(chunk, _controller) {
        const method = "PUT";
        const partNumber = nextPartNumber++;
        try {
          // We are going to upload this file in a single part, because it's small enough
          if (partNumber == 1 && chunk.length < partSize) {
            // PUT the chunk in a single request, with an empty query.
            const response = await client.makeRequest({
              method,
              headers: new Headers({
                // Set user metadata as this is not a multipart upload
                ...metadata,
                "Content-Length": String(chunk.length),
              }),
              bucketName,
              objectName,
              payload: chunk,
            });
            result = {
              etag: sanitizeETag(response.headers.get("etag") ?? undefined),
              versionId: getVersionId(response.headers),
            };
            return;
          }

          // If we get here, this is a streaming upload in multiple parts.
          if (partNumber === 1) {
            uploadId = (await initiateNewMultipartUpload({
              client,
              bucketName,
              objectName,
              metadata,
            })).uploadId;
          }
          // Upload the next part
          const partPromise = client.makeRequest({
            method,
            query: { partNumber: partNumber.toString(), uploadId },
            headers: new Headers({ "Content-Length": String(chunk.length) }),
            bucketName: bucketName,
            objectName: objectName,
            payload: chunk,
          });
          partPromise.then((response) => {
            // In order to aggregate the parts together, we need to collect the etags.
            let etag = response.headers.get("etag") ?? "";
            if (etag) {
              etag = etag.replace(/^"/, "").replace(/"$/, "");
            }
            etags.push({ part: partNumber, etag });
          });
          partsPromises.push(partPromise);
        } catch (err) {
          // Throwing an error will make future writes to this sink fail.
          throw err;
        }
      },
      async close() {
        if (result) {
          // This was already completed, in a single upload. Nothing more to do.
        } else if (uploadId) {
          // Wait for all parts to finish uploading
          await Promise.all(partsPromises);
          // Sort the etags (required)
          etags.sort((a, b) => a.part > b.part ? 1 : -1);
          // Complete the multi-part upload
          result = await completeMultipartUpload({ client, bucketName, objectName, uploadId, etags });
        } else {
          throw new Error("Stream was closed without uploading any data.");
        }
      },
    });

    this.getResult = () => {
      if (result === undefined) {
        throw new Error("Result is not ready. await the stream first.");
      }
      return result;
    };
  }
}

/** Initiate a new multipart upload request. */
async function initiateNewMultipartUpload(
  options: {
    client: Client;
    bucketName: string;
    objectName: string;
    metadata?: ObjectMetadata;
  },
): Promise<{ uploadId: string }> {
  const method = "POST";
  const headers = new Headers(options.metadata);
  const query = "uploads";
  const response = await options.client.makeRequest({
    method,
    bucketName: options.bucketName,
    objectName: options.objectName,
    query,
    headers,
    returnBody: true,
  });
  // Response is like:
  // <InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  //   <Bucket>dev-bucket</Bucket>
  //   <Key>test-32m.dat</Key>
  //   <UploadId>422f976b-35e0-4a55-aca7-bf2d46277f93</UploadId>
  // </InitiateMultipartUploadResult>
  const responseText = await response.text();
  const root = parseXML(responseText).root;
  if (!root || root.name !== "InitiateMultipartUploadResult") {
    throw new Error(`Unexpected response: ${responseText}`);
  }
  const uploadId = root.children.find((c) => c.name === "UploadId")?.content;
  if (!uploadId) {
    throw new Error(`Unable to get UploadId from response: ${responseText}`);
  }
  return { uploadId };
}
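
/** Complete a multipart upload by sending the collected part numbers and ETags to S3. */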
async function completeMultipartUpload(
  { client, bucketName, objectName, uploadId, etags }: {
    client: Client;
    bucketName: string;
    objectName: string;
    uploadId: string;
    etags: { part: number; etag: string }[];
  },
): Promise<UploadedObjectInfo> {
  const payload = `
    <CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
      ${etags.map((et) => `  <Part><PartNumber>${et.part}</PartNumber><ETag>${et.etag}</ETag></Part>`).join("\n")}
    </CompleteMultipartUpload>
  `;
  const response = await client.makeRequest({
    method: "POST",
    bucketName,
    objectName,
    query: `uploadId=${encodeURIComponent(uploadId)}`,
    payload: new TextEncoder().encode(payload),
    returnBody: true,
  });
  const responseText = await response.text();
  // Example response:
  // <?xml version="1.0" encoding="UTF-8"?>
  // <CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  //   <Location>http://localhost:9000/dev-bucket/test-32m.dat</Location>
  //   <Bucket>dev-bucket</Bucket>
  //   <Key>test-32m.dat</Key>
  //   <ETag>"4581589392ae60eafdb031f441858c7a-7"</ETag>
  // </CompleteMultipartUploadResult>
  const root = parseXML(responseText).root;
  if (!root || root.name !== "CompleteMultipartUploadResult") {
    throw new Error(`Unexpected response: ${responseText}`);
  }
  const etagRaw = root.children.find((c) => c.name === "ETag")?.content;
  if (!etagRaw) throw new Error(`Unable to get ETag from response: ${responseText}`);
  const versionId = getVersionId(response.headers);
  return {
    etag: sanitizeETag(etagRaw),
    versionId,
  };
}