Merge pull request #4402 from butonic/dedicated-upload-session-struct
use dedicated upload session struct
butonic authored Dec 21, 2023
2 parents 44ba5ee + 165e4d0 commit 6fe91cb
Showing 12 changed files with 1,023 additions and 919 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/refactor-upload-session.md
@@ -0,0 +1,5 @@
Bugfix: refactor upload session

We refactored the upload session code to make it reusable, remove a lot of code, and save some stat requests.

https://github.com/cs3org/reva/pull/4402
24 changes: 13 additions & 11 deletions pkg/rhttp/datatx/manager/tus/tus.go
@@ -37,7 +37,6 @@ import (
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/cache"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/mitchellh/mapstructure"
)
@@ -100,22 +99,25 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
return nil, err
}

if _, ok := fs.(storage.UploadSessionLister); ok {
if usl, ok := fs.(storage.UploadSessionLister); ok {
// We can currently only send updates if the fs is decomposedfs as we read very specific keys from the storage map of the tus info
go func() {
for {
ev := <-handler.CompleteUploads
// We should be able to get the upload progress with fs.GetUploadProgress, but currently tus will erase the info files
// so we create a Progress instance here that is used to read the correct properties
up := upload.Progress{
Info: ev.Upload,
}
executant := up.Executant()
ref := up.Reference()
datatx.InvalidateCache(&executant, &ref, m.statCache)
if m.publisher != nil {
if err := datatx.EmitFileUploadedEvent(up.SpaceOwner(), &executant, &ref, m.publisher); err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event")
ups, err := usl.ListUploadSessions(context.Background(), storage.UploadSessionFilter{ID: &ev.Upload.ID})
if err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Str("session", ev.Upload.ID).Msg("failed to list upload session")
} else {
up := ups[0]
executant := up.Executant()
ref := up.Reference()
datatx.InvalidateCache(&executant, &ref, m.statCache)
if m.publisher != nil {
if err := datatx.EmitFileUploadedEvent(up.SpaceOwner(), &executant, &ref, m.publisher); err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event")
}
}
}
}
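Note: the new handler code above resolves the completed upload through the storage layer instead of rebuilding an upload.Progress from the raw tus info. Below is a minimal sketch of the interfaces this relies on, with the method set and return types inferred from the calls in the hunk; the actual storage.UploadSession and UploadSessionLister definitions in reva likely carry more methods.

```go
package storage

import (
	"context"

	userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
)

// UploadSessionFilter narrows which sessions ListUploadSessions returns.
// Only the ID filter is shown because that is what the handler uses above.
type UploadSessionFilter struct {
	ID *string
}

// UploadSession exposes the metadata the tus handler reads after an upload
// completes. Value vs. pointer returns are inferred from the handler taking
// &executant and &ref while passing SpaceOwner() through directly.
type UploadSession interface {
	Executant() userpb.UserId
	Reference() provider.Reference
	SpaceOwner() *userpb.UserId
}

// UploadSessionLister is implemented by storage backends (decomposedfs in
// this PR) that can look up upload sessions by filter.
type UploadSessionLister interface {
	ListUploadSessions(ctx context.Context, filter UploadSessionFilter) ([]UploadSession, error)
}
```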
66 changes: 43 additions & 23 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
@@ -62,6 +62,7 @@ import (
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/jellydator/ttlcache/v2"
"github.com/pkg/errors"
tusd "github.com/tus/tusd/pkg/handler"
microstore "go-micro.dev/v4/store"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
@@ -104,6 +105,26 @@ type Tree interface {
Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error)
}

// Session is the interface that OcisSession implements. By combining tus.Upload,
// storage.UploadSession and custom functions we can reuse the same struct throughout
// the whole upload lifecycle.
//
// Some functions that are only used by decomposedfs are not yet part of this interface.
// They might be added after more refactoring.
type Session interface {
tusd.Upload
storage.UploadSession
upload.Session
LockID() string
}

type SessionStore interface {
New(ctx context.Context) *upload.OcisSession
List(ctx context.Context) ([]*upload.OcisSession, error)
Get(ctx context.Context, id string) (*upload.OcisSession, error)
Cleanup(ctx context.Context, session upload.Session, failure bool, keepUpload bool)
}

// Decomposedfs provides the base for decomposed filesystem implementations
type Decomposedfs struct {
lu *lookup.Lookup
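Since the doc comment above states that OcisSession implements the combined Session interface, one way to make that checkable at build time is a set of compile-time assertions. This is a sketch of a possible addition, not part of the diff, and assumes the assertions can live in the decomposedfs package without creating import cycles.

```go
package decomposedfs

import (
	"github.com/cs3org/reva/v2/pkg/storage"
	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload"
	tusd "github.com/tus/tusd/pkg/handler"
)

// Compile-time checks that *upload.OcisSession satisfies every interface the
// Session type combines; the build fails if a method goes missing.
var (
	_ tusd.Upload           = (*upload.OcisSession)(nil)
	_ storage.UploadSession = (*upload.OcisSession)(nil)
	_ Session               = (*upload.OcisSession)(nil)
)
```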
@@ -113,6 +134,7 @@ type Decomposedfs struct {
chunkHandler *chunking.ChunkHandler
stream events.Stream
cache cache.StatCache
sessionStore SessionStore

UserCache *ttlcache.Cache
userSpaceIndex *spaceidindex.Index
@@ -211,6 +233,7 @@ func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es event
userSpaceIndex: userSpaceIndex,
groupSpaceIndex: groupSpaceIndex,
spaceTypeIndex: spaceTypeIndex,
sessionStore: upload.NewSessionStore(lu, tp, o.Root, es, o.AsyncFileUploads, o.Tokens),
}

if o.AsyncFileUploads {
@@ -245,7 +268,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
for event := range ch {
switch ev := event.Event.(type) {
case events.PostprocessingFinished:
up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
session, err := fs.sessionStore.Get(ctx, ev.UploadID)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload")
continue // NOTE: since we can't get the upload, we can't delete the blob
@@ -256,12 +279,11 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
keepUpload bool
)

n, err := node.ReadNode(ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false, nil, true)
n, err := session.Node(ctx)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node")
continue
}
up.Node = n

switch ev.Outcome {
default:
@@ -272,7 +294,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
keepUpload = true
metrics.UploadSessionsAborted.Inc()
case events.PPOutcomeContinue:
if err := up.Finalize(); err != nil {
if err := session.Finalize(); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload")
keepUpload = true // should we keep the upload when assembling failed?
failed = true
@@ -285,7 +307,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
}

getParent := func() *node.Node {
p, err := up.Node.Parent(ctx)
p, err := n.Parent(ctx)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read parent")
return nil
@@ -296,7 +318,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
now := time.Now()
if failed {
// propagate sizeDiff after failed postprocessing
if err := fs.tp.Propagate(ctx, up.Node, -up.SizeDiff); err != nil {
if err := fs.tp.Propagate(ctx, n, -session.SizeDiff()); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate tree size change")
}
} else if p := getParent(); p != nil {
@@ -307,7 +329,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
}
}

upload.Cleanup(up, failed, keepUpload)
fs.sessionStore.Cleanup(ctx, session, failed, keepUpload)

// remove cache entry in gateway
fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID})
@@ -322,11 +344,11 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
Filename: ev.Filename,
FileRef: &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: up.Info.MetaData["providerID"],
SpaceId: up.Info.Storage["SpaceRoot"],
OpaqueId: up.Info.Storage["SpaceRoot"],
StorageId: session.ProviderID(),
SpaceId: session.SpaceID(),
OpaqueId: session.SpaceID(),
},
Path: utils.MakeRelativePath(filepath.Join(up.Info.MetaData["dir"], up.Info.MetaData["filename"])),
Path: utils.MakeRelativePath(filepath.Join(session.Dir(), session.Filename())),
},
Timestamp: utils.TimeToTS(now),
SpaceOwner: n.SpaceOwnerOrManager(ctx),
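The hunk above swaps the raw tus Storage/MetaData map lookups for typed session accessors when building the UploadReady event's file reference. Below is a hypothetical helper, not part of the diff, that captures the same construction; the accessor set is written out as a local interface to keep the sketch self-contained.

```go
package decomposedfs

import (
	"path/filepath"

	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
	"github.com/cs3org/reva/v2/pkg/utils"
)

// refSource is the minimal accessor set this sketch assumes; in the diff the
// methods live on the session returned by the session store.
type refSource interface {
	ProviderID() string
	SpaceID() string
	Dir() string
	Filename() string
}

// uploadedFileRef mirrors how the FileRef above is built: the space root
// doubles as the opaque id, and the path is joined from the upload's dir and
// filename and made relative.
func uploadedFileRef(s refSource) *provider.Reference {
	return &provider.Reference{
		ResourceId: &provider.ResourceId{
			StorageId: s.ProviderID(),
			SpaceId:   s.SpaceID(),
			OpaqueId:  s.SpaceID(),
		},
		Path: utils.MakeRelativePath(filepath.Join(s.Dir(), s.Filename())),
	}
}
```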
@@ -335,17 +357,17 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event")
}
case events.RestartPostprocessing:
up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
session, err := fs.sessionStore.Get(ctx, ev.UploadID)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload")
continue
}
n, err := node.ReadNode(ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false, nil, true)
n, err := session.Node(ctx)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node")
continue
}
s, err := up.URL(up.Ctx)
s, err := session.URL(ctx)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not create url")
continue
@@ -355,13 +377,13 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {

// restart postprocessing
if err := events.Publish(ctx, fs.stream, events.BytesReceived{
UploadID: up.Info.ID,
UploadID: session.ID(),
URL: s,
SpaceOwner: n.SpaceOwnerOrManager(up.Ctx),
SpaceOwner: n.SpaceOwnerOrManager(ctx),
ExecutingUser: &user.User{Id: &user.UserId{OpaqueId: "postprocessing-restart"}}, // send nil instead?
ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID},
Filename: up.Info.Storage["NodeName"],
Filesize: uint64(up.Info.Size),
Filename: session.Filename(),
Filesize: uint64(session.Size()),
}); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish BytesReceived event")
}
@@ -460,19 +482,17 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
*/
default:
// uploadid is not empty -> this is an async upload
up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
session, err := fs.sessionStore.Get(ctx, ev.UploadID)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload")
continue
}

no, err := node.ReadNode(up.Ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false, nil, false)
n, err = session.Node(ctx)
if err != nil {
log.Error().Err(err).Interface("uploadID", ev.UploadID).Msg("Failed to get node after scan")
continue
}

n = no
}

if err := n.SetScanData(ctx, res.Description, res.Scandate); err != nil {
@@ -1031,7 +1051,7 @@ func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) (
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error getting mtime for '"+n.ID+"'")
}
currentEtag, err := node.CalculateEtag(n, mtime)
currentEtag, err := node.CalculateEtag(n.ID, mtime)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error calculating etag for '"+n.ID+"'")
}
25 changes: 16 additions & 9 deletions pkg/storage/utils/decomposedfs/node/node.go
@@ -512,14 +512,9 @@ func (n *Node) LockFilePath() string {
}

// CalculateEtag returns a hash of fileid + tmtime (or mtime)
func CalculateEtag(n *Node, tmTime time.Time) (string, error) {
return calculateEtag(n, tmTime)
}

// calculateEtag returns a hash of fileid + tmtime (or mtime)
func calculateEtag(n *Node, tmTime time.Time) (string, error) {
func CalculateEtag(id string, tmTime time.Time) (string, error) {
h := md5.New()
if _, err := io.WriteString(h, n.ID); err != nil {
if _, err := io.WriteString(h, id); err != nil {
return "", err
}
/* TODO we could strengthen the etag by adding the blobid, but then all etags would change. we would need a legacy etag check as well
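As a rough illustration of what the simplified signature enables: any caller holding just a node ID and a (tree) mtime can derive the etag without loading a full *Node. The standalone sketch below mimics the shape of the function; the md5-over-id part is taken from the visible diff, while the timestamp encoding is an assumption about the part of the function the hunk does not show.

```go
package main

import (
	"crypto/md5"
	"fmt"
	"io"
	"time"
)

// etagSketch mimics the shape of node.CalculateEtag(id, tmTime): hash the
// node id, mix in the timestamp, return a quoted hex digest.
func etagSketch(id string, tmTime time.Time) (string, error) {
	h := md5.New()
	if _, err := io.WriteString(h, id); err != nil {
		return "", err
	}
	tb, err := tmTime.UTC().MarshalBinary() // assumed encoding of the mtime
	if err != nil {
		return "", err
	}
	if _, err := h.Write(tb); err != nil {
		return "", err
	}
	return fmt.Sprintf(`"%x"`, h.Sum(nil)), nil
}

func main() {
	etag, _ := etagSketch("f1d2c3", time.Now())
	fmt.Println(etag) // prints a quoted 32-character hex digest
}
```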
@@ -562,7 +557,7 @@ func (n *Node) SetEtag(ctx context.Context, val string) (err error) {
return
}
var etag string
if etag, err = calculateEtag(n, tmTime); err != nil {
if etag, err = CalculateEtag(n.ID, tmTime); err != nil {
return
}

@@ -673,7 +668,7 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi
// use temporary etag if it is set
if b, err := n.XattrString(ctx, prefixes.TmpEtagAttr); err == nil && b != "" {
ri.Etag = fmt.Sprintf(`"%x"`, b)
} else if ri.Etag, err = calculateEtag(n, tmTime); err != nil {
} else if ri.Etag, err = CalculateEtag(n.ID, tmTime); err != nil {
sublog.Debug().Err(err).Msg("could not calculate etag")
}

@@ -1150,6 +1145,18 @@ func (n *Node) DeleteGrant(ctx context.Context, g *provider.Grant, acquireLock b
return nil
}

// Purge removes a node from disk. It does not move it to the trash
func (n *Node) Purge(ctx context.Context) error {
// remove node
if err := utils.RemoveItem(n.InternalPath()); err != nil {
return err
}

// remove child entry in parent
src := filepath.Join(n.ParentPath(), n.Name)
return os.Remove(src)
}

// ListGrants lists all grants of the current node.
func (n *Node) ListGrants(ctx context.Context) ([]*provider.Grant, error) {
grantees, err := n.ListGrantees(ctx)
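The new Purge helper above deletes a node outright, removing both the node file and the child entry in its parent, rather than moving it to the trash. A hypothetical caller, not shown in this diff, sketching how a cleanup path might use it:

```go
package decomposedfs

import (
	"context"

	"github.com/cs3org/reva/v2/pkg/appctx"
	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
)

// purgeStaleNode removes a node that should not survive a failed upload; the
// error is only logged because cleanup is best-effort.
func purgeStaleNode(ctx context.Context, n *node.Node) {
	if err := n.Purge(ctx); err != nil {
		appctx.GetLogger(ctx).Error().Err(err).Str("nodeid", n.ID).Msg("could not purge node")
	}
}
```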
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/revisions.go
@@ -91,7 +91,7 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen
appctx.GetLogger(ctx).Error().Err(err).Str("name", fi.Name()).Msg("error reading blobsize xattr, using 0")
}
rev.Size = uint64(blobSize)
etag, err := node.CalculateEtag(n, mtime)
etag, err := node.CalculateEtag(n.ID, mtime)
if err != nil {
return nil, errors.Wrapf(err, "error calculating etag")
}
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/spaces.go
@@ -964,7 +964,7 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node,
}
}

etag, err := node.CalculateEtag(n, tmtime)
etag, err := node.CalculateEtag(n.ID, tmtime)
if err != nil {
return nil, err
}
