-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathcopy.go
314 lines (246 loc) · 7.3 KB
/
copy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
package sparsecat
import (
"errors"
"fmt"
"github.com/svenwiltink/sparsecat/format"
"io"
"os"
)
type onlyReader struct {
io.Reader
}
type zeroReader struct{}
func (zeroReader) Read(p []byte) (int, error) {
for index := range p {
p[index] = 0
}
return len(p), nil
}
func NewDecoder(reader io.Reader) *Decoder {
return &Decoder{reader: reader, Format: format.RbdDiffv1}
}
// Decoder decodes an incoming sparsecat stream. It is able to convert it to a 'normal'
// stream of data using the WriteTo method. An optimized path is used when the target of an io.Copy
// is an *os.File (not a pipe or socket)
type Decoder struct {
Format format.Format
DisableSparseWriting bool
DisableFileTruncate bool
reader io.Reader
fileSize int64
currentOffset int64
currentSection io.Reader
currentSectionLength int64
currentSectionRead int
done bool
}
// Read is the slow path of the decoder. It output the entire sparse file.
func (d *Decoder) Read(p []byte) (int, error) {
var err error
if d.currentSection == nil {
d.fileSize, err = d.Format.ReadFileSize(d.reader)
if err != nil {
return 0, fmt.Errorf("error determining target file size: %w", err)
}
err = d.parseSection()
if err != nil {
return 0, fmt.Errorf("error reading first section: %w", err)
}
}
read, err := d.currentSection.Read(p)
d.currentSectionRead += read
d.currentOffset += int64(read)
if err == nil {
return read, nil
}
if !errors.Is(err, io.EOF) {
return read, err
}
// current section has ended. Was it expected?
if d.currentSectionLength != int64(d.currentSectionRead) {
return read, fmt.Errorf("read size doesn't equal section size. %d vs %d. %w", d.currentSectionRead, d.currentSectionLength, io.ErrUnexpectedEOF)
}
// EOF was expected. Are there more sections?
if d.done {
return read, err
}
// there are more sections to read. Reset counter and get next section
d.currentSectionRead = 0
// get next section
err = d.parseSection()
return read, err
}
func (d *Decoder) parseSection() error {
section, err := d.Format.ReadSectionHeader(d.reader)
if errors.Is(err, io.EOF) {
d.currentSectionLength = d.fileSize - d.currentOffset
d.currentSection = io.LimitReader(zeroReader{}, d.currentSectionLength)
d.done = true
return nil
}
if err != nil {
return err
}
padding := section.Offset - d.currentOffset
d.currentSectionLength = padding + section.Length
paddingReader := io.LimitReader(zeroReader{}, padding)
dataReader := io.LimitReader(d.reader, section.Length)
d.currentSection = io.MultiReader(paddingReader, dataReader)
return nil
}
// WriteTo is the fast path optimisation of Decoder.Read. If the target of io.Copy is an *os.File that is
// capable of seeking WriteTo will be used. It preserves the sparseness of the target file and does not need
// to write the entire file. Only section of the file containing data will be written. When s.DisableSparseWriting
// has been set this falls back to io.Copy with only the s.Read function exposed. When s.DisableFileTruncate has
// been set the output file will not be truncated prior to writing to it
func (d *Decoder) WriteTo(writer io.Writer) (int64, error) {
if d.DisableSparseWriting {
return io.Copy(writer, onlyReader{d})
}
file, isFile := d.isSeekableFile(writer)
if !isFile {
return io.Copy(writer, onlyReader{d})
}
size, err := d.Format.ReadFileSize(d.reader)
if err != nil {
return 0, fmt.Errorf("error determining target file size: %w", err)
}
if !d.DisableFileTruncate {
err = SparseTruncate(file, size)
if err != nil {
return 0, fmt.Errorf("error truncating target file: %w", err)
}
}
var written int64 = 0
for {
section, err := d.Format.ReadSectionHeader(d.reader)
if errors.Is(err, io.EOF) {
return written, nil
}
if err != nil {
return written, err
}
_, err = file.Seek(section.Offset, io.SeekStart)
if err != nil {
return written, fmt.Errorf("error seeking to start of data section: %w", err)
}
copied, err := io.Copy(writer, io.LimitReader(d.reader, section.Length))
written += copied
if err != nil {
return written, fmt.Errorf("error copying data: %w", err)
}
}
}
func (d *Decoder) isSeekableFile(writer io.Writer) (*os.File, bool) {
file, isFile := writer.(*os.File)
if isFile {
// not all files are actually seekable. pipes aren't for example
_, err := file.Seek(0, io.SeekCurrent)
return file, err == nil
}
return nil, false
}
func NewEncoder(file *os.File) *Encoder {
return &Encoder{file: file, Format: format.RbdDiffv1, MaxSectionSize: 1 << 32}
}
// Encoder encodes a file to a stream of sparsecat data.
type Encoder struct {
file *os.File
Format format.Format
MaxSectionSize int64
fileSize int64
currentOffset int64
currentSection io.Reader
currentSectionLength int64
currentSectionEnd int64
currentSectionRead int
supportsHoleDetection bool
done bool
}
func (e *Encoder) Read(p []byte) (int, error) {
if e.currentSection == nil {
info, err := e.file.Stat()
if err != nil {
return 0, fmt.Errorf("error running stat: %w", err)
}
size := uint64(info.Size())
if isBlockDevice(info) {
e.supportsHoleDetection = false
bsize, err := getBlockDeviceSize(e.file)
if err != nil {
return 0, fmt.Errorf("error determining size of block device: %w", err)
}
size = uint64(bsize)
} else {
e.supportsHoleDetection = supportsSeekHole(e.file)
}
e.currentSection, e.currentSectionLength = e.Format.GetFileSizeReader(size)
}
read, err := e.currentSection.Read(p)
e.currentSectionRead += read
if err == nil {
return read, err
}
if !errors.Is(err, io.EOF) {
return read, err
}
// current section has ended. Was it expected?
if e.currentSectionLength != int64(e.currentSectionRead) {
return read, fmt.Errorf("read size doesn't equal section size. %d vs %d. %w", e.currentSectionRead, e.currentSectionLength, io.ErrUnexpectedEOF)
}
// are there more sections to come?
if e.done {
return read, io.EOF
}
e.currentOffset = e.currentSectionEnd
e.currentSectionRead = 0
err = e.parseSection()
return read, err
}
func (e *Encoder) parseSection() error {
if !e.supportsHoleDetection {
return e.slowDetectSection()
}
start, end, err := detectDataSection(e.file, e.currentOffset)
if errors.Is(err, io.EOF) {
e.currentSection, e.currentSectionLength = e.Format.GetEndTagReader()
e.done = true
return nil
}
if err != nil {
return fmt.Errorf("error detecting data section: %w", err)
}
length := end - start
if length > e.MaxSectionSize {
end = start + e.MaxSectionSize
length = e.MaxSectionSize
}
e.currentSectionEnd = end
_, err = e.file.Seek(start, io.SeekStart)
if err != nil {
return err
}
e.currentSection, e.currentSectionLength = e.Format.GetSectionReader(e.file, format.Section{
Offset: start,
Length: length,
})
return nil
}
func (e *Encoder) slowDetectSection() error {
start, end, reader, err := slowDetectDataSection(e.file, e.currentOffset)
if errors.Is(err, io.EOF) {
e.currentSection, e.currentSectionLength = e.Format.GetEndTagReader()
e.done = true
return nil
}
if err != nil {
return fmt.Errorf("error detecting data section for block device: %w", err)
}
length := end - start
e.currentSectionEnd = end
e.currentSection, e.currentSectionLength = e.Format.GetSectionReader(reader, format.Section{
Offset: start,
Length: length,
})
return nil
}