Merge pull request #25 from Mohanson/resume
Resume
polym authored Oct 12, 2016
2 parents e48df17 + 169c1ea commit 7a89324
Showing 6 changed files with 275 additions and 5 deletions.
11 changes: 11 additions & 0 deletions README.md
@@ -25,6 +25,7 @@ Table of Contents
* [Get storage usage](#获取空间存储使用量)
* [Create directory](#创建目录)
* [Upload](#上传)
* [Resumable upload](#断点续传)
* [Download](#下载)
* [Delete](#删除)
* [Get file info](#获取文件信息)
@@ -116,6 +117,16 @@ func (u *UpYun) Put(key string, value io.Reader, useMD5 bool,

`key` is the storage path on UPYUN. `value` can be either a file or a `buffer`. `useMD5` controls whether to perform MD5 verification. `headers` sets custom upload parameters; besides the [upload parameters](https://docs.upyun.com/api/rest_api/#_4), you can also set `Content-Length`, which enables streaming upload. A streaming upload requires `Content-Length` to be specified; if MD5 verification is needed, also set `Content-MD5`.

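A minimal sketch of a streaming upload, assuming an initialized client `u` (imports `log`, `os`, `strconv` assumed); the local file name and remote path are hypothetical:

```go
f, err := os.Open("demo.txt")
if err != nil {
	log.Fatal(err)
}
defer f.Close()

fi, _ := f.Stat()
headers := map[string]string{
	// Content-Length enables streaming upload; add Content-MD5 as well
	// if the streamed upload should be MD5-verified.
	"Content-Length": strconv.FormatInt(fi.Size(), 10),
}
if _, err = u.Put("/demo/demo.txt", f, false, headers); err != nil {
	log.Fatal(err)
}
```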

#### Resumable upload
```go
func (u *UpYun) ResumePut(key string, value *os.File, useMD5 bool,
headers map[string]string, reporter ResumeReporter) (http.Header, error)
```

Uploads a file using resumable upload. If the upload hits a network failure, the client waits 5 seconds and then retries from the failed part, up to 3 times. The `reporter` parameter reports upload progress. The retry wait time and retry count can be customized through the global variables `ResumeWaitTime` and `ResumeRetryCount`.

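A minimal sketch, assuming an initialized client `u` (imports `log`, `os`, `time` assumed); the file name, remote path, and tuned values are illustrative:

```go
// Optionally tune the retry behaviour via the package globals.
upyun.ResumeWaitTime = 10 * time.Second // wait 10s after a network error
upyun.ResumeRetryCount = 5              // retry each part up to 5 times

f, err := os.Open("bigfile.bin")
if err != nil {
	log.Fatal(err)
}
defer f.Close()

// ResumeReporterPrintln prints the progress after each uploaded part.
if _, err = u.ResumePut("/demo/bigfile.bin", f, true, nil, upyun.ResumeReporterPrintln); err != nil {
	log.Fatal(err)
}
```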

#### Download

```go
4 changes: 2 additions & 2 deletions upyun/upyun-http-core.go
@@ -27,11 +27,11 @@ type upYunHTTPCore struct {
httpClient *http.Client
}

-func (core *upYunHTTPCore) SetTimeout(timeout int) {
+func (core *upYunHTTPCore) SetTimeout(timeout time.Duration) {
core.httpClient = &http.Client{
Transport: &http.Transport{
Dial: func(network, addr string) (c net.Conn, err error) {
-c, err = net.DialTimeout(network, addr, time.Duration(timeout)*time.Second)
+c, err = net.DialTimeout(network, addr, timeout)
if err != nil {
return nil, err
}
91 changes: 91 additions & 0 deletions upyun/upyun-rest-api.go
@@ -8,12 +8,14 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
URL "net/url"
"os"
"path"
"strconv"
"strings"
"time"
)

// UPYUN REST API Client
@@ -145,6 +147,95 @@ func (u *UpYun) Put(key string, value io.Reader, useMD5 bool,
return rtHeaders, err
}

// ResumePut uploads a file to UPYUN File System part by part,
// and automatically retries when a network error occurs
func (u *UpYun) ResumePut(key string, value *os.File, useMD5 bool,
headers map[string]string, reporter ResumeReporter) (http.Header, error) {
if headers == nil {
headers = make(map[string]string)
}

fileinfo, err := value.Stat()
if err != nil {
return nil, err
}

// If filesize < resumeFileSizeLowerLimit, use UpYun.Put() instead
if fileinfo.Size() < resumeFileSizeLowerLimit {
return u.Put(key, value, useMD5, headers)
}

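// maxPartID is the zero-based index of the last part; when the file
// size is an exact multiple of resumePartSize, the division counts one
// part too many, hence the decrement below.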
maxPartID := int(fileinfo.Size() / resumePartSize)
if fileinfo.Size()%resumePartSize == 0 {
maxPartID--
}

var resp http.Header

for part := 0; part <= maxPartID; part++ {

innerHeaders := make(map[string]string)
for k, v := range headers {
innerHeaders[k] = v
}

innerHeaders["X-Upyun-Part-Id"] = strconv.Itoa(part)
switch part {
case 0:
innerHeaders["X-Upyun-Multi-Type"] = headers["Content-Type"]
innerHeaders["X-Upyun-Multi-Length"] = strconv.FormatInt(fileinfo.Size(), 10)
innerHeaders["X-Upyun-Multi-Stage"] = "initiate,upload"
innerHeaders["Content-Length"] = strconv.Itoa(resumePartSize)
case maxPartID:
innerHeaders["X-Upyun-Multi-Stage"] = "upload,complete"
innerHeaders["Content-Length"] = fmt.Sprint(fileinfo.Size() - int64(resumePartSize)*int64(part))
if useMD5 {
value.Seek(0, 0)
hex, _, _ := md5sum(value)
innerHeaders["X-Upyun-Multi-MD5"] = hex
}
default:
innerHeaders["X-Upyun-Multi-Stage"] = "upload"
innerHeaders["Content-Length"] = strconv.Itoa(resumePartSize)
}

file, err := NewFragmentFile(value, int64(part)*int64(resumePartSize), resumePartSize)
if err != nil {
return resp, err
}
if useMD5 {
innerHeaders["Content-MD5"], _ = file.MD5()
}

// Retry when UpYun.Put() returns a network error; return immediately on any other error
for i := 0; i < ResumeRetryCount+1; i++ {
resp, err = u.Put(key, file, useMD5, innerHeaders)
if err == nil {
break
}
// Retry only on network errors
_, ok := err.(net.Error)
if !ok {
return resp, err
}
if i == ResumeRetryCount {
return resp, err
}
time.Sleep(ResumeWaitTime)
file.Seek(0, 0)
}
if reporter != nil {
reporter(part, maxPartID)
}

if part == 0 {
headers["X-Upyun-Multi-UUID"] = resp.Get("X-Upyun-Multi-Uuid")
}
}

return resp, nil
}

// Get gets the specified file in UPYUN File System
func (u *UpYun) Get(key string, value io.Writer) (int, error) {
length, _, err := u.doRESTRequest("GET", key, "", nil, value)
93 changes: 93 additions & 0 deletions upyun/upyun-resume.go
@@ -0,0 +1,93 @@
package upyun

import (
"errors"
"fmt"
"io"
"os"
"time"
)

const (
// resumePartSize is the size of each part for resume upload
resumePartSize = 1024 * 1024
// resumeFileSizeLowerLimit is the minimum file size for resume upload; smaller files fall back to Put()
resumeFileSizeLowerLimit int64 = resumePartSize * 10
)

var (
// ResumeRetryCount is the number of retries for resume upload
ResumeRetryCount = 3
// ResumeWaitTime is the time to wait before retrying after a network error
ResumeWaitTime = time.Second * 5
)

// ResumeReporter is a progress callback for resumable uploads; it is
// called with the index of the part just uploaded and the index of the
// last part
type ResumeReporter func(int, int)

// ResumeReporterPrintln is a simple ResumeReporter for testing
func ResumeReporterPrintln(partID int, maxPartID int) {
fmt.Printf("resume test reporter: %v / %v\n", partID, maxPartID)
}

// FragmentFile is like os.File, but Read() is restricted to a fragment
// of the underlying file; it returns io.EOF once the cursor reaches the
// fragment's limit.
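//
// A hypothetical usage sketch — read and checksum the second 1 MB part
// of a file:
//
//	f, _ := os.Open("data.bin")
//	part, _ := NewFragmentFile(f, 1*resumePartSize, resumePartSize)
//	hex, _ := part.MD5()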
type FragmentFile struct {
offset int64
limit int
cursor int
*os.File
}

// NewFragmentFile returns a new FragmentFile.
func NewFragmentFile(file *os.File, offset int64, limit int) (*FragmentFile, error) {
sizedfile := &FragmentFile{
offset: offset,
limit: limit,
File: file,
}
_, err := sizedfile.Seek(0, 0)
if err != nil {
return nil, err
}
return sizedfile, nil
}

// Seek is like os.File.Seek(), but only whence == 0 is supported; the
// offset is relative to the start of the fragment
func (f *FragmentFile) Seek(offset int64, whence int) (ret int64, err error) {
switch whence {
case 0:
f.cursor = int(offset)
return f.File.Seek(f.offset+offset, 0)
default:
return 0, errors.New("whence must be 0")
}
}

// Read is like os.File.Read, but returns io.EOF when it reaches the
// fragment's limit or the end of the underlying file
func (f *FragmentFile) Read(b []byte) (n int, err error) {
if f.cursor >= f.limit {
return 0, io.EOF
}
n, err = f.File.Read(b)
if f.cursor+n > f.limit {
n = f.limit - f.cursor
}
f.cursor += n
return n, err
}

// Close is a no-op; it does not close the underlying file
func (f *FragmentFile) Close() error {
return nil
}

// MD5 returns md5 of the FragmentFile.
func (f *FragmentFile) MD5() (string, error) {
cursor := f.cursor
f.Seek(0, 0)
md5, _, err := md5sum(f)
f.Seek(int64(cursor), 0)
return md5, err
}
6 changes: 3 additions & 3 deletions upyun/upyun.go
@@ -24,7 +24,7 @@ const (
// Default(Min/Max)ChunkSize: set the buffer size when doing copy operation
defaultChunkSize = 32 * 1024
// defaultConnectTimeout: connection timeout when connect to upyun endpoint
-defaultConnectTimeout = 60
+defaultConnectTimeout = time.Second * 60
)

// chunkSize: chunk size when copy
@@ -110,9 +110,9 @@ func chunkedCopy(dst io.Writer, src io.Reader) (written int64, err error) {
}

// Use for http connection timeout
-func timeoutDialer(timeout int) func(string, string) (net.Conn, error) {
+func timeoutDialer(timeout time.Duration) func(string, string) (net.Conn, error) {
return func(network, addr string) (c net.Conn, err error) {
-c, err = net.DialTimeout(network, addr, time.Duration(timeout)*time.Second)
+c, err = net.DialTimeout(network, addr, timeout)
if err != nil {
return nil, err
}
75 changes: 75 additions & 0 deletions upyun/upyun_test.go
@@ -2,7 +2,9 @@ package upyun

import (
"bytes"
"crypto/md5"
"fmt"
"io"
"os"
"strings"
"testing"
@@ -177,6 +179,79 @@ func TestGetLargeList(t *testing.T) {
}
}

func TestResumeSmallFile(t *testing.T) {
file, err := os.Open(upload)
if err != nil {
t.Fatal(err)
}
defer file.Close()

md5Hash := md5.New()
io.Copy(md5Hash, file)
md5 := fmt.Sprintf("%x", md5Hash.Sum(nil))
file.Seek(0, 0)

_, err = up.ResumePut(testPath+"/"+upload, file, true, map[string]string{"Content-Type": "text/plain"}, nil)
if err != nil {
t.Error(err)
}

_, err = up.GetInfo(testPath + "/" + upload)
if err != nil {
t.Error(err)
}

buf := bytes.NewBuffer(make([]byte, 0, 1024))
up.Get(testPath+"/"+upload, buf)

md5Hash.Reset()
io.Copy(md5Hash, buf)

if fmt.Sprintf("%x", md5Hash.Sum(nil)) != md5 {
t.Error("MD5 is inconsistent")
}
}

func TestResumeBigFile(t *testing.T) {
file, err := os.OpenFile("/tmp/bigfile", os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0666)
if err != nil {
t.Fatal(err)
}
defer os.Remove(file.Name())
defer file.Close()

for i := 0; i < 15*1024; i++ {
file.Write(bytes.Repeat([]byte("1"), 1024))
}
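// 15 MB exceeds resumeFileSizeLowerLimit (10 MB), so ResumePut takes
// the multipart path rather than falling back to Put.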
file.Seek(0, 0)

md5Hash := md5.New()
io.Copy(md5Hash, file)
md5 := fmt.Sprintf("%x", md5Hash.Sum(nil))
file.Seek(0, 0)

_, err = up.ResumePut(testPath+"/"+"bigfile", file, true, nil, ResumeReporterPrintln)
if err != nil {
t.Error(err)
}

defer up.Delete(testPath + "/" + "bigfile")

_, err = up.GetInfo(testPath + "/" + "bigfile")
if err != nil {
t.Error(err)
}

buf := bytes.NewBuffer(make([]byte, 0, 1024))
up.Get(testPath+"/"+"bigfile", buf)

md5Hash.Reset()
io.Copy(md5Hash, buf)
if fmt.Sprintf("%x", md5Hash.Sum(nil)) != md5 {
t.Error("MD5 is inconsistent")
}
}

func TestDelete(t *testing.T) {
// delete file
path := testPath + "/" + upload
