diff --git a/Makefile b/Makefile index 6243224..4ee5f6d 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,7 @@ run: # Lint lint: - go run github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.0 run + go run github.com/golangci/golangci-lint/cmd/golangci-lint@v1.62.2 run .PHONY: lint # Build diff --git a/uploader.go b/uploader.go index 837c1ac..e6a0932 100644 --- a/uploader.go +++ b/uploader.go @@ -17,16 +17,17 @@ import ( "github.com/ipld/go-car/v2" "github.com/ipld/go-car/v2/blockstore" "github.com/ipld/go-ipld-prime" - "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/multiformats/go-multicodec" "github.com/multiformats/go-multihash" + ucanto_car "github.com/web3-storage/go-ucanto/core/car" "github.com/web3-storage/go-ucanto/core/delegation" "github.com/web3-storage/go-ucanto/did" "github.com/web3-storage/go-ucanto/principal" "github.com/web3-storage/go-ucanto/principal/ed25519/signer" "github.com/web3-storage/go-w3up/capability/storeadd" "github.com/web3-storage/go-w3up/capability/uploadadd" + "github.com/web3-storage/go-w3up/car/sharding" "github.com/web3-storage/go-w3up/client" "github.com/web3-storage/go-w3up/cmd/util" w3sdelegation "github.com/web3-storage/go-w3up/delegation" @@ -34,7 +35,7 @@ import ( // w3s interface to make it easier to mock w3s. type w3s interface { - upload(cid.Cid, string) (cid.Cid, error) + upload(cid.Cid, string) (cid.Cid, []ipld.Link, error) } // Uploader ... @@ -78,14 +79,14 @@ func (u *Uploader) Upload(ctx context.Context, r io.Reader) (_ UploadResult, err return UploadResult{}, fmt.Errorf("failed generating CAR: %s", err) } - shard, err := u.w3s.upload(root, dest) + root, shards, err := u.w3s.upload(root, dest) if err != nil { return UploadResult{}, fmt.Errorf("failed archiving CAR: %s", err) } return UploadResult{ Root: root, - Shard: shard, + Shard: cid.MustParse(shards[0].String()), }, nil } @@ -246,49 +247,115 @@ func newW3sclient(spaceID string, sk string, proofBytes []byte) (*w3sclient, err }, nil } -func (c *w3sclient) upload(root cid.Cid, dest string) (_ cid.Cid, err error) { +func (c *w3sclient) upload(root cid.Cid, dest string) (_ cid.Cid, _ []ipld.Link, err error) { // no need to close the file because the http client is doing that f, err := os.Open(fmt.Sprintf("%s.car", dest)) if err != nil { - return cid.Undef, err + return cid.Undef, []ipld.Link{}, err } + defer func() { + // Close file and override return error type if it is nil. + if cerr := f.Close(); err == nil { + err = cerr + } + }() stat, err := f.Stat() if err != nil { - return cid.Undef, err + return cid.Undef, []ipld.Link{}, err } + var shdlnks []ipld.Link + size := uint64(stat.Size()) - mh, err := multihash.SumStream(f, multihash.SHA2_256, -1) - if err != nil { - return cid.Undef, err + if size < sharding.ShardSize { + link, err := storeShard(c.issuer, c.space, f, []delegation.Delegation{c.proof}) + if err != nil { + return cid.Undef, []ipld.Link{}, err + } + shdlnks = append(shdlnks, link) + } else { + _, blocks, err := ucanto_car.Decode(f) + if err != nil { + return cid.Undef, []ipld.Link{}, fmt.Errorf("decoding CAR: %s", err) + } + shds, err := sharding.NewSharder([]ipld.Link{}, blocks) + if err != nil { + return cid.Undef, []ipld.Link{}, fmt.Errorf("sharding CAR: %s", err) + } + + for { + shd, err := shds.Next() + if err != nil { + if err == io.EOF { + break + } + return cid.Undef, []ipld.Link{}, err + } + link, err := storeShard(c.issuer, c.space, shd, []delegation.Delegation{c.proof}) + if err != nil { + return cid.Undef, []ipld.Link{}, err + } + shdlnks = append(shdlnks, link) + } } - shardLink := cidlink.Link{Cid: cid.NewCidV1(uint64(multicodec.Car), mh)} - rcpt, err := client.StoreAdd( + rcpt2, err := client.UploadAdd( c.issuer, c.space, - &storeadd.Caveat{Link: shardLink, Size: size}, + &uploadadd.Caveat{Root: cidlink.Link{Cid: root}, Shards: shdlnks}, client.WithConnection(util.MustGetConnection()), client.WithProofs([]delegation.Delegation{c.proof}), ) if err != nil { - return cid.Undef, err + return cid.Undef, []ipld.Link{}, err + } + + if rcpt2.Out().Error() != nil { + return cid.Undef, []ipld.Link{}, fmt.Errorf("%s", rcpt2.Out().Error().Message) + } + + return root, shdlnks, nil +} + +func storeShard( + issuer principal.Signer, space did.DID, shard io.Reader, proofs []delegation.Delegation, +) (ipld.Link, error) { + buf := new(bytes.Buffer) + _, err := buf.ReadFrom(shard) + if err != nil { + return nil, fmt.Errorf("reading CAR: %s", err) + } + + mh, err := multihash.Sum(buf.Bytes(), multihash.SHA2_256, -1) + if err != nil { + return nil, fmt.Errorf("hashing CAR: %s", err) + } + + link := cidlink.Link{Cid: cid.NewCidV1(0x0202, mh)} + + rcpt, err := client.StoreAdd( + issuer, + space, + &storeadd.Caveat{ + Link: link, + Size: uint64(buf.Len()), + }, + client.WithConnection(util.MustGetConnection()), + client.WithProofs(proofs), + ) + if err != nil { + return nil, fmt.Errorf("store/add %s: %s", link, err) } if rcpt.Out().Error() != nil { - return cid.Undef, fmt.Errorf(rcpt.Out().Error().Message) + return nil, fmt.Errorf("%+v", rcpt.Out().Error()) } if rcpt.Out().Ok().Status == "upload" { - _, err := f.Seek(0, io.SeekStart) + hr, err := http.NewRequest("PUT", *rcpt.Out().Ok().Url, bytes.NewReader(buf.Bytes())) if err != nil { - return cid.Undef, err - } - - hr, err := http.NewRequest("PUT", *rcpt.Out().Ok().Url, f) - if err != nil { - return cid.Undef, err + return nil, fmt.Errorf("creating HTTP request: %s", err) } hdr := map[string][]string{} @@ -298,39 +365,22 @@ func (c *w3sclient) upload(root cid.Cid, dest string) (_ cid.Cid, err error) { } hdr[k] = []string{v} } + hr.Header = hdr - hr.ContentLength = int64(size) - httpClient := http.Client{ - Timeout: 0, - } + hr.ContentLength = int64(buf.Len()) + httpClient := http.Client{} res, err := httpClient.Do(hr) if err != nil { - return cid.Undef, err + return nil, fmt.Errorf("doing HTTP request: %s", err) } - if res.StatusCode != 200 { - return cid.Undef, fmt.Errorf("status code: %d", res.StatusCode) + return nil, fmt.Errorf("non-200 status code while uploading file: %d", res.StatusCode) } - - if err := res.Body.Close(); err != nil { - return cid.Undef, fmt.Errorf("closing request body: %s", err) + err = res.Body.Close() + if err != nil { + return nil, fmt.Errorf("closing request body: %s", err) } } - rcpt2, err := client.UploadAdd( - c.issuer, - c.space, - &uploadadd.Caveat{Root: cidlink.Link{Cid: root}, Shards: []datamodel.Link{shardLink}}, - client.WithConnection(util.MustGetConnection()), - client.WithProofs([]delegation.Delegation{c.proof}), - ) - if err != nil { - return cid.Undef, err - } - - if rcpt2.Out().Error() != nil { - return cid.Undef, fmt.Errorf(rcpt2.Out().Error().Message) - } - - return shardLink.Cid, nil + return link, nil } diff --git a/uploader_test.go b/uploader_test.go index 904673c..9659f50 100644 --- a/uploader_test.go +++ b/uploader_test.go @@ -11,6 +11,9 @@ import ( _ "github.com/ipfs/go-unixfsnode/file" "github.com/ipld/go-car/v2/blockstore" "github.com/ipld/go-car/v2/storage" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" ) @@ -38,7 +41,7 @@ type mockClient struct { dest string } -func (c *mockClient) upload(_ cid.Cid, dest string) (cid.Cid, error) { +func (c *mockClient) upload(_ cid.Cid, dest string) (cid.Cid, []ipld.Link, error) { c.dest = dest // check tmp file exists @@ -54,7 +57,12 @@ func (c *mockClient) upload(_ cid.Cid, dest string) (cid.Cid, error) { require.NoError(c.t, err) require.Equal(c.t, "Hello", content) - return cid.Cid{}, nil + hash, err := multihash.Sum([]byte{}, multihash.SHA2_256, -1) + require.NoError(c.t, err) + + cid := cid.NewCidV1(cid.Raw, hash) + + return cid, []ipld.Link{cidlink.Link{Cid: cid}}, nil } func extract(filename string) (string, error) {