Skip to content

Commit

Permalink
feat: allows files bigger than 4GiB by sharding
Browse files Browse the repository at this point in the history
Signed-off-by: Bruno Calza <[email protected]>
  • Loading branch information
brunocalza committed Dec 19, 2024
1 parent 2f61fc0 commit cd0c509
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 51 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ run:

# Lint
lint:
go run github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.0 run
go run github.com/golangci/golangci-lint/cmd/golangci-lint@v1.62.2 run
.PHONY: lint

# Build
Expand Down
146 changes: 98 additions & 48 deletions uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,25 @@ import (
"github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
ucanto_car "github.com/web3-storage/go-ucanto/core/car"
"github.com/web3-storage/go-ucanto/core/delegation"
"github.com/web3-storage/go-ucanto/did"
"github.com/web3-storage/go-ucanto/principal"
"github.com/web3-storage/go-ucanto/principal/ed25519/signer"
"github.com/web3-storage/go-w3up/capability/storeadd"
"github.com/web3-storage/go-w3up/capability/uploadadd"
"github.com/web3-storage/go-w3up/car/sharding"
"github.com/web3-storage/go-w3up/client"
"github.com/web3-storage/go-w3up/cmd/util"
w3sdelegation "github.com/web3-storage/go-w3up/delegation"
)

// w3s interface to make it easier to mock w3s.
type w3s interface {
upload(cid.Cid, string) (cid.Cid, error)
upload(cid.Cid, string) (cid.Cid, []ipld.Link, error)

Check failure on line 38 in uploader.go

View workflow job for this annotation

GitHub Actions / lint

undefined: cid (typecheck)
}

// Uploader ...
Expand Down Expand Up @@ -78,14 +79,14 @@ func (u *Uploader) Upload(ctx context.Context, r io.Reader) (_ UploadResult, err
return UploadResult{}, fmt.Errorf("failed generating CAR: %s", err)
}

shard, err := u.w3s.upload(root, dest)
root, shards, err := u.w3s.upload(root, dest)
if err != nil {
return UploadResult{}, fmt.Errorf("failed archiving CAR: %s", err)
}

return UploadResult{
Root: root,
Shard: shard,
Shard: cid.MustParse(shards[0].String()),
}, nil
}

Expand Down Expand Up @@ -246,49 +247,115 @@ func newW3sclient(spaceID string, sk string, proofBytes []byte) (*w3sclient, err
}, nil
}

func (c *w3sclient) upload(root cid.Cid, dest string) (_ cid.Cid, err error) {
func (c *w3sclient) upload(root cid.Cid, dest string) (_ cid.Cid, _ []ipld.Link, err error) {
// the file is closed by the deferred function below
f, err := os.Open(fmt.Sprintf("%s.car", dest))
if err != nil {
return cid.Undef, err
return cid.Undef, []ipld.Link{}, err
}
defer func() {
// Close the file; if no earlier error occurred, return the close error instead.
if cerr := f.Close(); err == nil {
err = cerr
}
}()

stat, err := f.Stat()
if err != nil {
return cid.Undef, err
return cid.Undef, []ipld.Link{}, err
}

var shdlnks []ipld.Link

size := uint64(stat.Size())
mh, err := multihash.SumStream(f, multihash.SHA2_256, -1)
if err != nil {
return cid.Undef, err
if size < sharding.ShardSize {
link, err := storeShard(c.issuer, c.space, f, []delegation.Delegation{c.proof})
if err != nil {
return cid.Undef, []ipld.Link{}, err
}
shdlnks = append(shdlnks, link)
} else {
_, blocks, err := ucanto_car.Decode(f)
if err != nil {
return cid.Undef, []ipld.Link{}, fmt.Errorf("decoding CAR: %s", err)
}
shds, err := sharding.NewSharder([]ipld.Link{}, blocks)
if err != nil {
return cid.Undef, []ipld.Link{}, fmt.Errorf("sharding CAR: %s", err)
}

for {
shd, err := shds.Next()
if err != nil {
if err == io.EOF {
break
}
return cid.Undef, []ipld.Link{}, err
}
link, err := storeShard(c.issuer, c.space, shd, []delegation.Delegation{c.proof})
if err != nil {
return cid.Undef, []ipld.Link{}, err
}
shdlnks = append(shdlnks, link)
}
}

shardLink := cidlink.Link{Cid: cid.NewCidV1(uint64(multicodec.Car), mh)}
rcpt, err := client.StoreAdd(
rcpt2, err := client.UploadAdd(
c.issuer,
c.space,
&storeadd.Caveat{Link: shardLink, Size: size},
&uploadadd.Caveat{Root: cidlink.Link{Cid: root}, Shards: shdlnks},
client.WithConnection(util.MustGetConnection()),
client.WithProofs([]delegation.Delegation{c.proof}),
)
if err != nil {
return cid.Undef, err
return cid.Undef, []ipld.Link{}, err
}

if rcpt2.Out().Error() != nil {
return cid.Undef, []ipld.Link{}, fmt.Errorf("%s", rcpt2.Out().Error().Message)
}

return root, shdlnks, nil
}

func storeShard(
issuer principal.Signer, space did.DID, shard io.Reader, proofs []delegation.Delegation,
) (ipld.Link, error) {

Check failure on line 323 in uploader.go

View workflow job for this annotation

GitHub Actions / lint

undefined: ipld (typecheck)
buf := new(bytes.Buffer)
_, err := buf.ReadFrom(shard)
if err != nil {
return nil, fmt.Errorf("reading CAR: %s", err)
}

mh, err := multihash.Sum(buf.Bytes(), multihash.SHA2_256, -1)

Check failure on line 330 in uploader.go

View workflow job for this annotation

GitHub Actions / lint

undefined: multihash (typecheck)
if err != nil {
return nil, fmt.Errorf("hashing CAR: %s", err)
}

link := cidlink.Link{Cid: cid.NewCidV1(0x0202, mh)}

rcpt, err := client.StoreAdd(
issuer,
space,
&storeadd.Caveat{
Link: link,
Size: uint64(buf.Len()),
},
client.WithConnection(util.MustGetConnection()),
client.WithProofs(proofs),
)
if err != nil {
return nil, fmt.Errorf("store/add %s: %s", link, err)
}

if rcpt.Out().Error() != nil {
return cid.Undef, fmt.Errorf(rcpt.Out().Error().Message)
return nil, fmt.Errorf("%+v", rcpt.Out().Error())
}

if rcpt.Out().Ok().Status == "upload" {
_, err := f.Seek(0, io.SeekStart)
hr, err := http.NewRequest("PUT", *rcpt.Out().Ok().Url, bytes.NewReader(buf.Bytes()))
if err != nil {
return cid.Undef, err
}

hr, err := http.NewRequest("PUT", *rcpt.Out().Ok().Url, f)
if err != nil {
return cid.Undef, err
return nil, fmt.Errorf("creating HTTP request: %s", err)
}

hdr := map[string][]string{}
Expand All @@ -298,39 +365,22 @@ func (c *w3sclient) upload(root cid.Cid, dest string) (_ cid.Cid, err error) {
}
hdr[k] = []string{v}
}

hr.Header = hdr
hr.ContentLength = int64(size)
httpClient := http.Client{
Timeout: 0,
}
hr.ContentLength = int64(buf.Len())
httpClient := http.Client{}
res, err := httpClient.Do(hr)
if err != nil {
return cid.Undef, err
return nil, fmt.Errorf("doing HTTP request: %s", err)
}

if res.StatusCode != 200 {
return cid.Undef, fmt.Errorf("status code: %d", res.StatusCode)
return nil, fmt.Errorf("non-200 status code while uploading file: %d", res.StatusCode)
}

if err := res.Body.Close(); err != nil {
return cid.Undef, fmt.Errorf("closing request body: %s", err)
err = res.Body.Close()
if err != nil {
return nil, fmt.Errorf("closing request body: %s", err)
}
}

rcpt2, err := client.UploadAdd(
c.issuer,
c.space,
&uploadadd.Caveat{Root: cidlink.Link{Cid: root}, Shards: []datamodel.Link{shardLink}},
client.WithConnection(util.MustGetConnection()),
client.WithProofs([]delegation.Delegation{c.proof}),
)
if err != nil {
return cid.Undef, err
}

if rcpt2.Out().Error() != nil {
return cid.Undef, fmt.Errorf(rcpt2.Out().Error().Message)
}

return shardLink.Cid, nil
return link, nil
}
12 changes: 10 additions & 2 deletions uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
_ "github.com/ipfs/go-unixfsnode/file"
"github.com/ipld/go-car/v2/blockstore"
"github.com/ipld/go-car/v2/storage"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -38,7 +41,7 @@ type mockClient struct {
dest string
}

func (c *mockClient) upload(_ cid.Cid, dest string) (cid.Cid, error) {
func (c *mockClient) upload(_ cid.Cid, dest string) (cid.Cid, []ipld.Link, error) {
c.dest = dest

// check tmp file exists
Expand All @@ -54,7 +57,12 @@ func (c *mockClient) upload(_ cid.Cid, dest string) (cid.Cid, error) {
require.NoError(c.t, err)
require.Equal(c.t, "Hello", content)

return cid.Cid{}, nil
hash, err := multihash.Sum([]byte{}, multihash.SHA2_256, -1)
require.NoError(c.t, err)

cid := cid.NewCidV1(cid.Raw, hash)

return cid, []ipld.Link{cidlink.Link{Cid: cid}}, nil
}

func extract(filename string) (string, error) {
Expand Down

0 comments on commit cd0c509

Please sign in to comment.