Skip to content

Commit

Permalink
adding synchronous get
Browse files Browse the repository at this point in the history
  • Loading branch information
so-sahu committed Feb 14, 2024
1 parent cf813ce commit 95cf614
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 22 deletions.
7 changes: 3 additions & 4 deletions pkg/host/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ func (s *Store[E]) Watch(ctx context.Context) (store.Watch[E], error) {
}

func (s *Store[E]) get(id string) (E, error) {
s.idMu.RLock(id)
defer s.idMu.RUnlock(id)

file, err := os.ReadFile(filepath.Join(s.dir, id))
if err != nil {
if !os.IsNotExist(err) {
Expand All @@ -221,10 +224,6 @@ func (s *Store[E]) get(id string) (E, error) {
}

obj := s.newFunc()
if len(file) == 0 {
return obj, err
}

if err := json.Unmarshal(file, &obj); err != nil {
return utils.Zero[E](), fmt.Errorf("failed to unmarshal object: %w", err)
}
Expand Down
62 changes: 45 additions & 17 deletions pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,41 +25,69 @@ func NewMutexMap[K comparable]() *MutexMap[K] {
}

type mutexMapEntry struct {
mu sync.Mutex
count int
mu sync.Mutex
readerCnt int
writerCnt int
readerDone *sync.Cond
}

// Lock locks the given key.
// Lock locks the given key for writing.
func (m *MutexMap[K]) Lock(key K) {
m.mu.Lock()
entry := m.entries[key]
if entry == nil {
entry = &mutexMapEntry{}
m.entries[key] = entry
}
entry.count++
entry := m.getOrCreateEntry(key)
entry.writerCnt++
m.mu.Unlock()

entry.mu.Lock()
for entry.readerCnt > 0 {
entry.readerDone.Wait()
}
}

// Unlock unlocks the given key.
// Unlock unlocks the given key for writing.
func (m *MutexMap[K]) Unlock(key K) {
m.mu.Lock()
defer m.mu.Unlock()
entry := m.entries[key]
if entry == nil {
m.mu.Unlock()
panic(fmt.Errorf("unlock: key %v not found", key))
}

entry.count--
if entry.count == 0 {
entry.writerCnt--
if entry.writerCnt == 0 {
delete(m.entries, key)
}
m.mu.Unlock()
entry.mu.Unlock()
}

func (m *MutexMap[K]) RLock(key K) {
m.mu.Lock()
entry := m.getOrCreateEntry(key)
entry.readerCnt++
m.mu.Unlock()
}

func (m *MutexMap[K]) RUnlock(key K) {
m.mu.Lock()
defer m.mu.Unlock()
entry := m.entries[key]
if entry == nil {
return
}
entry.readerCnt--
if entry.readerCnt == 0 && entry.readerDone != nil {
entry.readerDone.Signal()
}
}

func (m *MutexMap[K]) getOrCreateEntry(key K) *mutexMapEntry {
entry, ok := m.entries[key]
if !ok {
entry = &mutexMapEntry{}
entry.readerDone = sync.NewCond(&entry.mu)
m.entries[key] = entry
}
return entry
}

type mutexMapLocker[K comparable] struct {
m *MutexMap[K]
key K
Expand Down Expand Up @@ -98,5 +126,5 @@ func (m *MutexMap[K]) Count(key K) int {
if entry == nil {
return 0
}
return entry.count
return entry.readerCnt + entry.writerCnt
}
2 changes: 1 addition & 1 deletion provider/server/server_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ var _ = BeforeSuite(func() {
Qcow2Type: "exec",
},
NicPlugin: pluginOpts,
ResyncIntervalMachineState: 10 * time.Second,
ResyncIntervalMachineState: 20 * time.Second,
GCVMGracefulShutdownTimeout: 10 * time.Second,
ResyncIntervalGarbageCollector: 5 * time.Second,
}
Expand Down

0 comments on commit 95cf614

Please sign in to comment.