Skip to content

Commit

Permalink
private/multipart: method to begin upload with metadata
Browse files Browse the repository at this point in the history
Custom method to begin upload and set metadata at the same time.
It won't be part of public API for now and main consumer will be
gateway.

Fixes storj/team-metainfo#106

Change-Id: Iff88a08c6a880e84c35cd9a502f3e49b01cd6dfc
  • Loading branch information
mniewrzal committed Apr 26, 2022
1 parent 55c79e4 commit 5580401
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 7 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/vivint/infectious v0.0.0-20200605153912-25a574ae18a3
github.com/zeebo/errs v1.3.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
storj.io/common v0.0.0-20220405183405-ffdc3ab808c6
storj.io/common v0.0.0-20220414110316-a5cb7172d6bf
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck=
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
storj.io/common v0.0.0-20220405183405-ffdc3ab808c6 h1:vbGPpXHQsc3doPyTwhWWvo/2MWrmspvF9O6wX8Jw0ME=
storj.io/common v0.0.0-20220405183405-ffdc3ab808c6/go.mod h1:LBJrpAqL4MNSrhGEwc8SJ+tIVtgfCtFEZqDy6/0j67A=
storj.io/common v0.0.0-20220414110316-a5cb7172d6bf h1:D5xZTDOlTTQWdAWeKKm2pFLcz1sceH+f/pVAcYB9jL8=
storj.io/common v0.0.0-20220414110316-a5cb7172d6bf/go.mod h1:LBJrpAqL4MNSrhGEwc8SJ+tIVtgfCtFEZqDy6/0j67A=
storj.io/drpc v0.0.30 h1:jqPe4T9KEu3CDBI05A2hCMgMSHLtd/E0N0yTF9QreIE=
storj.io/drpc v0.0.30/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
8 changes: 8 additions & 0 deletions private/metaclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ type BeginObjectParams struct {
Redundancy storj.RedundancyScheme
EncryptionParameters storj.EncryptionParameters
ExpiresAt time.Time

EncryptedMetadata []byte
EncryptedMetadataEncryptedKey []byte
EncryptedMetadataNonce storj.Nonce
}

func (params *BeginObjectParams) toRequest(header *pb.RequestHeader) *pb.ObjectBeginRequest {
Expand All @@ -379,6 +383,10 @@ func (params *BeginObjectParams) toRequest(header *pb.RequestHeader) *pb.ObjectB
CipherSuite: pb.CipherSuite(params.EncryptionParameters.CipherSuite),
BlockSize: int64(params.EncryptionParameters.BlockSize),
},

EncryptedMetadata: params.EncryptedMetadata,
EncryptedMetadataEncryptedKey: params.EncryptedMetadataEncryptedKey,
EncryptedMetadataNonce: params.EncryptedMetadataNonce,
}
}

Expand Down
182 changes: 182 additions & 0 deletions private/multipart/multipart.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package multipart

import (
"context"
"crypto/rand"
"time"
_ "unsafe" // for go:linkname

"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"

"storj.io/common/base58"
"storj.io/common/encryption"
"storj.io/common/paths"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/uplink"
"storj.io/uplink/private/metaclient"
)

var mon = monkit.Package()

// UploadOptions contains additional options for uploading.
type UploadOptions struct {
// When Expires is zero, there is no expiration.
Expires time.Time

CustomMetadata uplink.CustomMetadata
}

// BeginUpload begins a new multipart upload to bucket and key.
//
// Use project.UploadPart to upload individual parts.
//
// Use project.CommitUpload to finish the upload.
//
// Use project.AbortUpload to cancel the upload at any time.
//
// UploadObject is a convenient way to upload single part objects.
func BeginUpload(ctx context.Context, project *uplink.Project, bucket, key string, options *UploadOptions) (info uplink.UploadInfo, err error) {
defer mon.Task()(&ctx)(&err)

switch {
case bucket == "":
return uplink.UploadInfo{}, convertKnownErrors(metaclient.ErrNoBucket.New(""), bucket, key)
case key == "":
return uplink.UploadInfo{}, convertKnownErrors(metaclient.ErrNoPath.New(""), bucket, key)
}

if options == nil {
options = &UploadOptions{}
}

encPath, err := encryptPath(project, bucket, key)
if err != nil {
return uplink.UploadInfo{}, convertKnownErrors(err, bucket, key)
}

metainfoClient, err := dialMetainfoClient(ctx, project)
if err != nil {
return uplink.UploadInfo{}, convertKnownErrors(err, bucket, key)
}
defer func() { err = errs.Combine(err, metainfoClient.Close()) }()

metadata, err := encryptMetadata(project, bucket, key, options.CustomMetadata)
if err != nil {
return uplink.UploadInfo{}, convertKnownErrors(err, bucket, key)
}

response, err := metainfoClient.BeginObject(ctx, metaclient.BeginObjectParams{
Bucket: []byte(bucket),
EncryptedObjectKey: []byte(encPath.Raw()),
ExpiresAt: options.Expires,
EncryptionParameters: encryptionParameters(project),

EncryptedMetadata: metadata.EncryptedContent,
EncryptedMetadataEncryptedKey: metadata.EncryptedKey,
EncryptedMetadataNonce: metadata.EncryptedKeyNonce,
})
if err != nil {
return uplink.UploadInfo{}, convertKnownErrors(err, bucket, key)
}

encodedStreamID := base58.CheckEncode(response.StreamID[:], 1)
return uplink.UploadInfo{
Key: key,
UploadID: encodedStreamID,
System: uplink.SystemMetadata{
Expires: options.Expires,
},
Custom: options.CustomMetadata,
}, nil
}

type encryptedMetadata struct {
EncryptedContent []byte
EncryptedKey []byte
EncryptedKeyNonce storj.Nonce
}

func encryptMetadata(project *uplink.Project, bucket, key string, metadata uplink.CustomMetadata) (encryptedMetadata, error) {
if len(metadata) == 0 {
return encryptedMetadata{}, nil
}

metadataBytes, err := pb.Marshal(&pb.SerializableMeta{
UserDefined: metadata.Clone(),
})
if err != nil {
return encryptedMetadata{}, errs.Wrap(err)
}

streamInfo, err := pb.Marshal(&pb.StreamInfo{
Metadata: metadataBytes,
})
if err != nil {
return encryptedMetadata{}, errs.Wrap(err)
}

derivedKey, err := deriveContentKey(project, bucket, key)
if err != nil {
return encryptedMetadata{}, errs.Wrap(err)
}

var metadataKey storj.Key
// generate random key for encrypting the segment's content
_, err = rand.Read(metadataKey[:])
if err != nil {
return encryptedMetadata{}, errs.Wrap(err)
}

var encryptedKeyNonce storj.Nonce
// generate random nonce for encrypting the metadata key
_, err = rand.Read(encryptedKeyNonce[:])
if err != nil {
return encryptedMetadata{}, errs.Wrap(err)
}

encryptionParameters := encryptionParameters(project)
encryptedKey, err := encryption.EncryptKey(&metadataKey, encryptionParameters.CipherSuite, derivedKey, &encryptedKeyNonce)
if err != nil {
return encryptedMetadata{}, errs.Wrap(err)
}

// encrypt metadata with the content encryption key and zero nonce.
encryptedStreamInfo, err := encryption.Encrypt(streamInfo, encryptionParameters.CipherSuite, &metadataKey, &storj.Nonce{})
if err != nil {
return encryptedMetadata{}, errs.Wrap(err)
}

// TODO should we commit StreamMeta or commit only encrypted StreamInfo
streamMetaBytes, err := pb.Marshal(&pb.StreamMeta{
EncryptedStreamInfo: encryptedStreamInfo,
})
if err != nil {
return encryptedMetadata{}, errs.Wrap(err)
}

return encryptedMetadata{
EncryptedContent: streamMetaBytes,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
}, nil
}

//go:linkname convertKnownErrors storj.io/uplink.convertKnownErrors
func convertKnownErrors(err error, bucket, key string) error

//go:linkname dialMetainfoClient storj.io/uplink.dialMetainfoClient
func dialMetainfoClient(ctx context.Context, project *uplink.Project) (_ *metaclient.Client, err error)

//go:linkname encryptionParameters storj.io/uplink.encryptionParameters
func encryptionParameters(project *uplink.Project) storj.EncryptionParameters

//go:linkname encryptPath storj.io/uplink.encryptPath
func encryptPath(project *uplink.Project, bucket, key string) (paths.Encrypted, error)

//go:linkname deriveContentKey storj.io/uplink.deriveContentKey
func deriveContentKey(project *uplink.Project, bucket, key string) (*storj.Key, error)
2 changes: 1 addition & 1 deletion testsuite/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
go.uber.org/zap v1.16.0
storj.io/common v0.0.0-20220414110316-a5cb7172d6bf
storj.io/drpc v0.0.30
storj.io/storj v0.12.1-0.20220423063417-0513d4cf3a8b
storj.io/storj v0.12.1-0.20220426080427-e66beb242945
storj.io/uplink v1.8.2-0.20220406151905-7305e5b6da85
)

Expand Down
5 changes: 2 additions & 3 deletions testsuite/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,6 @@ sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
storj.io/common v0.0.0-20220131120956-e74f624a3d55/go.mod h1:m0489td5+rKDdoiYOzCkh3CfGW/cLyntZiYfso+QfMs=
storj.io/common v0.0.0-20220216094301-b27f3c9d69e1/go.mod h1:xW3PPPGBo4bdMtEP9GREnmxQptmJNuDg1tEHcA4zqog=
storj.io/common v0.0.0-20220405183405-ffdc3ab808c6/go.mod h1:LBJrpAqL4MNSrhGEwc8SJ+tIVtgfCtFEZqDy6/0j67A=
storj.io/common v0.0.0-20220414110316-a5cb7172d6bf h1:D5xZTDOlTTQWdAWeKKm2pFLcz1sceH+f/pVAcYB9jL8=
storj.io/common v0.0.0-20220414110316-a5cb7172d6bf/go.mod h1:LBJrpAqL4MNSrhGEwc8SJ+tIVtgfCtFEZqDy6/0j67A=
storj.io/drpc v0.0.29/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
Expand All @@ -894,5 +893,5 @@ storj.io/monkit-jaeger v0.0.0-20220131130547-dc4cb5a0d97a h1:qads+aZlFKm5gUxobfF
storj.io/monkit-jaeger v0.0.0-20220131130547-dc4cb5a0d97a/go.mod h1:DGEycSjvzE0JqcD3+6IjwPEK6x30oOus6AApXzl7t0s=
storj.io/private v0.0.0-20220411161553-f73cd1a48d5f h1:wv4NsI9jAw6ZmM+x/saFEOL9PLVwpY/9as+sIoamZL4=
storj.io/private v0.0.0-20220411161553-f73cd1a48d5f/go.mod h1:fZ7FSXv/adIc79sF/5qm7zn0PI5+PWa5p+dbqrZQARM=
storj.io/storj v0.12.1-0.20220423063417-0513d4cf3a8b h1:7yLb7Z2mwGyyFk1gyzULnhg+kYjaz456HYMP2HO7ES4=
storj.io/storj v0.12.1-0.20220423063417-0513d4cf3a8b/go.mod h1:PcQmysolktWSXZUqUx/kcZ4n9fdCRd652UdRip/s6gk=
storj.io/storj v0.12.1-0.20220426080427-e66beb242945 h1:keaGX1eoF9GSjR6JAL0DucwqFdw91uUVyWSsiy5MzBY=
storj.io/storj v0.12.1-0.20220426080427-e66beb242945/go.mod h1:5sPcmIfAQt/nvOjpQtbI4QaAlrcdD3ia5eH5eeeSztc=
132 changes: 132 additions & 0 deletions testsuite/private/multipart/multipart_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package multipart_test

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/uplink"
"storj.io/uplink/private/multipart"
)

func TestBeginUpload(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 0,
UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
require.NoError(t, err)
defer ctx.Check(project.Close)

_, err = multipart.BeginUpload(ctx, project, "not-existing-testbucket", "multipart-object", nil)
require.Error(t, err)
require.True(t, errors.Is(err, uplink.ErrBucketNotFound))

err = planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket")
require.NoError(t, err)

// assert there is no pending multipart upload
assertUploadList(ctx, t, project, "testbucket", nil)

info, err := multipart.BeginUpload(ctx, project, "testbucket", "multipart-object", nil)
require.NoError(t, err)
require.NotNil(t, info.UploadID)

// assert there is only one pending multipart upload
assertUploadList(ctx, t, project, "testbucket", nil, "multipart-object")

// we allow to start several multipart uploads for the same key
_, err = multipart.BeginUpload(ctx, project, "testbucket", "multipart-object", nil)
require.NoError(t, err)
require.NotNil(t, info.UploadID)

info, err = multipart.BeginUpload(ctx, project, "testbucket", "multipart-object-1", nil)
require.NoError(t, err)
require.NotNil(t, info.UploadID)

// assert there are two pending multipart uploads
assertUploadList(ctx, t, project, "testbucket", nil, "multipart-object", "multipart-object-1")
})
}

func TestBeginUploadWithMetadata(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 0,
UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
require.NoError(t, err)
defer ctx.Check(project.Close)

err = planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket")
require.NoError(t, err)

expectedMetadata := map[string]uplink.CustomMetadata{
"nil": nil,
"empty": {},
"not-empty": {
"key": "value",
},
}

for name, metadata := range expectedMetadata {
t.Run(name, func(t *testing.T) {
info, err := multipart.BeginUpload(ctx, project, "testbucket", name, &multipart.UploadOptions{
CustomMetadata: metadata,
})
require.NoError(t, err)
require.NotNil(t, info.UploadID)

list := project.ListUploads(ctx, "testbucket", &uplink.ListUploadsOptions{
Prefix: name,
Custom: true,
})
require.True(t, list.Next())

if metadata == nil {
require.Empty(t, list.Item().Custom)
} else {
require.Equal(t, metadata, list.Item().Custom)
}
require.False(t, list.Next())
require.NoError(t, list.Err())
})
}
})
}

func assertUploadList(ctx context.Context, t *testing.T, project *uplink.Project, bucket string, options *uplink.ListUploadsOptions, objectKeys ...string) {
list := project.ListUploads(ctx, bucket, options)
require.NoError(t, list.Err())
require.Nil(t, list.Item())

itemKeys := make(map[string]struct{})
for list.Next() {
require.NoError(t, list.Err())
require.NotNil(t, list.Item())
require.False(t, list.Item().IsPrefix)
itemKeys[list.Item().Key] = struct{}{}
}

for _, objectKey := range objectKeys {
if assert.Contains(t, itemKeys, objectKey) {
delete(itemKeys, objectKey)
}
}

require.Empty(t, itemKeys)

require.False(t, list.Next())
require.NoError(t, list.Err())
require.Nil(t, list.Item())
}

0 comments on commit 5580401

Please sign in to comment.