Skip to content

Commit

Permalink
Merge pull request #4424 from kobergj/AvoidConcurrentMapwrites
Browse files Browse the repository at this point in the history
Avoid concurrent mapwrites in receivedsharecache
  • Loading branch information
kobergj authored Dec 20, 2023
2 parents 6e8505c + e49b11c commit 073f9b9
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 20 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/avoid-panic-in-cache.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: fixed panic in receivedsharecache pkg

The receivedsharecache pkg would sometime run into concurrent map writes. This is fixed by using maptimesyncedcache pkg instead of a plain map.

https://github.com/cs3org/reva/pull/4424
43 changes: 23 additions & 20 deletions pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/mtimesyncedcache"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
Expand All @@ -46,7 +47,7 @@ const tracerName = "receivedsharecache"
type Cache struct {
lockMap sync.Map

ReceivedSpaces map[string]*Spaces
ReceivedSpaces mtimesyncedcache.Map[string, *Spaces]

storage metadata.Storage
ttl time.Duration
Expand Down Expand Up @@ -74,7 +75,7 @@ type State struct {
// New returns a new Cache instance
func New(s metadata.Storage, ttl time.Duration) Cache {
return Cache{
ReceivedSpaces: map[string]*Spaces{},
ReceivedSpaces: mtimesyncedcache.Map[string, *Spaces]{},
storage: s,
ttl: ttl,
lockMap: sync.Map{},
Expand All @@ -97,7 +98,7 @@ func (c *Cache) Add(ctx context.Context, userID, spaceID string, rs *collaborati
span.SetAttributes(attribute.String("cs3.userid", userID))
defer unlock()

if c.ReceivedSpaces[userID] == nil {
if _, ok := c.ReceivedSpaces.Load(userID); !ok {
err := c.syncWithLock(ctx, userID)
if err != nil {
return err
Expand All @@ -111,7 +112,8 @@ func (c *Cache) Add(ctx context.Context, userID, spaceID string, rs *collaborati
persistFunc := func() error {
c.initializeIfNeeded(userID, spaceID)

receivedSpace := c.ReceivedSpaces[userID].Spaces[spaceID]
rss, _ := c.ReceivedSpaces.Load(userID)
receivedSpace := rss.Spaces[spaceID]
if receivedSpace.States == nil {
receivedSpace.States = map[string]*State{}
}
Expand Down Expand Up @@ -171,10 +173,11 @@ func (c *Cache) Get(ctx context.Context, userID, spaceID, shareID string) (*Stat
if err != nil {
return nil, err
}
if c.ReceivedSpaces[userID] == nil || c.ReceivedSpaces[userID].Spaces[spaceID] == nil {
rss, ok := c.ReceivedSpaces.Load(userID)
if !ok || rss.Spaces[spaceID] == nil {
return nil, nil
}
return c.ReceivedSpaces[userID].Spaces[spaceID].States[shareID], nil
return rss.Spaces[spaceID].States[shareID], nil
}

// List returns a list of received shares for a given user
Expand All @@ -192,7 +195,8 @@ func (c *Cache) List(ctx context.Context, userID string) (map[string]*Space, err
}

spaces := map[string]*Space{}
for spaceID, space := range c.ReceivedSpaces[userID].Spaces {
rss, _ := c.ReceivedSpaces.Load(userID)
for spaceID, space := range rss.Spaces {
spaceCopy := &Space{
States: map[string]*State{},
}
Expand Down Expand Up @@ -220,9 +224,10 @@ func (c *Cache) syncWithLock(ctx context.Context, userID string) error {
jsonPath := userJSONPath(userID)
span.AddEvent("updating cache")
// - update cached list of created shares for the user in memory if changed
rss, _ := c.ReceivedSpaces.Load(userID)
dlres, err := c.storage.Download(ctx, metadata.DownloadRequest{
Path: jsonPath,
IfNoneMatch: []string{c.ReceivedSpaces[userID].etag},
IfNoneMatch: []string{rss.etag},
})
switch err.(type) {
case nil:
Expand All @@ -248,7 +253,7 @@ func (c *Cache) syncWithLock(ctx context.Context, userID string) error {
}
newSpaces.etag = dlres.Etag

c.ReceivedSpaces[userID] = newSpaces
c.ReceivedSpaces.Store(userID, newSpaces)
span.SetStatus(codes.Ok, "")
return nil
}
Expand All @@ -259,12 +264,13 @@ func (c *Cache) persist(ctx context.Context, userID string) error {
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userID))

if c.ReceivedSpaces[userID] == nil {
rss, ok := c.ReceivedSpaces.Load(userID)
if !ok {
span.SetStatus(codes.Ok, "no received shares")
return nil
}

createdBytes, err := json.Marshal(c.ReceivedSpaces[userID])
createdBytes, err := json.Marshal(rss)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
Expand All @@ -280,11 +286,11 @@ func (c *Cache) persist(ctx context.Context, userID string) error {
ur := metadata.UploadRequest{
Path: jsonPath,
Content: createdBytes,
IfMatchEtag: c.ReceivedSpaces[userID].etag,
IfMatchEtag: rss.etag,
}
// when there is no etag in memory make sure the file has not been created on the server, see https://www.rfc-editor.org/rfc/rfc9110#field.if-match
// > If the field value is "*", the condition is false if the origin server has a current representation for the target resource.
if c.ReceivedSpaces[userID].etag == "" {
if rss.etag == "" {
ur.IfNoneMatch = []string{"*"}
}

Expand All @@ -303,12 +309,9 @@ func userJSONPath(userID string) string {
}

func (c *Cache) initializeIfNeeded(userID, spaceID string) {
if c.ReceivedSpaces[userID] == nil {
c.ReceivedSpaces[userID] = &Spaces{
Spaces: map[string]*Space{},
}
}
if spaceID != "" && c.ReceivedSpaces[userID].Spaces[spaceID] == nil {
c.ReceivedSpaces[userID].Spaces[spaceID] = &Space{}
rss, _ := c.ReceivedSpaces.LoadOrStore(userID, &Spaces{Spaces: map[string]*Space{}})
if spaceID != "" && rss.Spaces[spaceID] == nil {
rss.Spaces[spaceID] = &Space{}
c.ReceivedSpaces.Store(userID, rss)
}
}

0 comments on commit 073f9b9

Please sign in to comment.