-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdelete.go
69 lines (59 loc) · 1.69 KB
/
delete.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package klevdb
import (
"context"
"time"
"golang.org/x/exp/maps"
)
// DeleteMultiBackoff is called on each iteration of
// DeleteMulti to give applications an opportunity to not overload
// the target log with deletes. It receives the context passed to
// DeleteMulti; if it returns a non-nil error, DeleteMulti stops and
// returns that error to its caller.
type DeleteMultiBackoff func(context.Context) error
// DeleteMultiWithWait builds a DeleteMultiBackoff that pauses for d on
// every invocation. The pause ends early if the supplied context is
// done, in which case the context's error is returned; otherwise the
// backoff returns nil once d has elapsed.
func DeleteMultiWithWait(d time.Duration) DeleteMultiBackoff {
	wait := func(ctx context.Context) error {
		// A fresh timer per call, released on return so it never
		// outlives the wait.
		timer := time.NewTimer(d)
		defer timer.Stop()

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			return nil
		}
	}
	return wait
}
// DeleteMulti tries to delete all messages with the given offsets from
// the log, calling l.Delete repeatedly until every offset has been
// deleted or a call deletes nothing.
//
// It returns the set of offsets that were deleted and the total size
// the log reported for them. If an error is encountered, the offsets
// and size deleted so far are returned together with the error.
//
// The offsets map is modified in place: every deleted offset is removed
// from it, so on return it holds the offsets that were not deleted.
//
// backoff is called between consecutive delete calls to give others a
// chance to work with the log while it is being deleted. It is not
// called once there is nothing left to delete, and a nil backoff means
// no waiting between calls. If backoff returns an error, DeleteMulti
// stops and returns that error.
func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) {
	deletedOffsets := make(map[int64]struct{}, len(offsets))
	var deletedSize int64

	for len(offsets) > 0 {
		deleted, size, err := l.Delete(offsets)
		switch {
		case err != nil:
			return deletedOffsets, deletedSize, err
		case len(deleted) == 0:
			// No progress was made; stop instead of spinning on
			// offsets the log did not delete.
			return deletedOffsets, deletedSize, nil
		}

		maps.Copy(deletedOffsets, deleted)
		deletedSize += size
		for k := range deleted {
			delete(offsets, k)
		}

		// Backing off only makes sense when another delete call will
		// follow; waiting after the final batch would just delay the
		// result and could report a cancellation for a completed job.
		if len(offsets) == 0 || backoff == nil {
			continue
		}
		if err := backoff(ctx); err != nil {
			return deletedOffsets, deletedSize, err
		}
	}

	return deletedOffsets, deletedSize, nil
}