Skip to content

Commit

Permalink
add more upload session filters
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Dec 11, 2023
1 parent 45774e2 commit 4e264b1
Showing 1 changed file with 143 additions and 2 deletions.
145 changes: 143 additions & 2 deletions services/storage-users/pkg/command/uploads.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package command

import (
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"time"

"github.com/urfave/cli/v2"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
Expand All @@ -22,6 +26,7 @@ func Uploads(cfg *config.Config) *cli.Command {
Usage: "manage unfinished uploads",
Subcommands: []*cli.Command{
ListUploads(cfg),
ListUploadSessions(cfg),
PurgeExpiredUploads(cfg),
},
}
Expand All @@ -31,7 +36,7 @@ func Uploads(cfg *config.Config) *cli.Command {
func ListUploads(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "list",
Usage: "Print a list of all incomplete uploads",
Usage: "Print a list of all incomplete uploads (deprecated, use sessions)",
Before: func(c *cli.Context) error {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Expand All @@ -50,7 +55,7 @@ func ListUploads(cfg *config.Config) *cli.Command {

managingFS, ok := fs.(storage.UploadSessionLister)
if !ok {
fmt.Fprintf(os.Stderr, "'%s' storage does not support listing expired uploads\n", cfg.Driver)
fmt.Fprintf(os.Stderr, "'%s' storage does not support listing upload sessions\n", cfg.Driver)
os.Exit(1)
}
falseValue := false
Expand All @@ -69,6 +74,142 @@ func ListUploads(cfg *config.Config) *cli.Command {
}
}

// ListUploadSessions prints a list of upload sessiens
func ListUploadSessions(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "sessions",
Usage: "Print a list of upload sessions",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "id",
Usage: "filter sessions by upload session id",
},
&cli.BoolFlag{
Name: "processing",
DisableDefaultText: true,
Usage: "filter sessions by processing status",
},
&cli.BoolFlag{
Name: "expired",
DisableDefaultText: true,
Usage: "filter sessions by expired status",
},
&cli.StringFlag{
Name: "output",
Usage: "output format to use (can be 'plain' or 'json', experimental)",
Value: "plain",
DefaultText: "plain",
},
},
Before: func(c *cli.Context) error {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
f, ok := registry.NewFuncs[cfg.Driver]
if !ok {
fmt.Fprintf(os.Stderr, "Unknown filesystem driver '%s'\n", cfg.Driver)
os.Exit(1)
}
drivers := revaconfig.StorageProviderDrivers(cfg)
fs, err := f(drivers[cfg.Driver].(map[string]interface{}), nil)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to initialize filesystem driver '%s'\n", cfg.Driver)
return err
}

managingFS, ok := fs.(storage.UploadSessionLister)
if !ok {
fmt.Fprintf(os.Stderr, "'%s' storage does not support listing upload sessions\n", cfg.Driver)
os.Exit(1)
}
var b strings.Builder
filter := storage.UploadSessionFilter{}
if c.IsSet("processing") {
processingValue := c.Bool("processing")
filter.Processing = &processingValue
if !processingValue {
b.WriteString("Not ")
}
if b.Len() == 0 {
b.WriteString("Processing ")
} else {
b.WriteString("processing ")
}
}
if c.IsSet("expired") {
expiredValue := c.Bool("expired")
filter.Expired = &expiredValue
if !expiredValue {
if b.Len() == 0 {
b.WriteString("Not ")
} else {
b.WriteString(", not ")
}
}
if b.Len() == 0 {
b.WriteString("Expired ")
} else {
b.WriteString("expired ")
}
}
if b.Len() == 0 {
b.WriteString("Sessions")
} else {
b.WriteString("sessions")
}
if c.IsSet("id") {
idValue := c.String("id")
filter.ID = &idValue
b.WriteString(" with id '" + idValue + "'")
}
b.WriteString(":")
uploads, err := managingFS.ListUploadSessions(c.Context, filter)
if err != nil {
return err
}

asJson := c.String("output") == "json"
if !asJson {
fmt.Println(b.String())
}
for _, u := range uploads {
ref := u.Reference()
if asJson {
s := struct {
ID string `json:"id"`
Space string `json:"space"`
Filename string `json:"filename"`
Offset int64 `json:"offset"`
Size int64 `json:"size"`
Executant userpb.UserId `json:"executant"`
SpaceOwner *userpb.UserId `json:"spaceowner,omitempty"`
Expires time.Time `json:"expires"`
Processing bool `json:"processing"`
}{
Space: ref.GetResourceId().GetSpaceId(),
ID: u.ID(),
Filename: u.Filename(),
Offset: u.Offset(),
Size: u.Size(),
Executant: u.Executant(),
SpaceOwner: u.SpaceOwner(),
Expires: u.Expires(),
Processing: u.IsProcessing(),
}
j, err := json.Marshal(s)
if err != nil {
fmt.Println(err)
}
fmt.Println(string(j))
} else {
fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing())
}
}
return nil
},
}
}

// PurgeExpiredUploads is the entry point for the server command.
func PurgeExpiredUploads(cfg *config.Config) *cli.Command {
return &cli.Command{
Expand Down

0 comments on commit 4e264b1

Please sign in to comment.