Skip to content

Commit

Permalink
fix(cache): sink cache hang problem (lf-edge#2452)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying authored Dec 6, 2023
1 parent 8dde92a commit cecc341
Showing 1 changed file with 27 additions and 37 deletions.
64 changes: 27 additions & 37 deletions internal/topo/node/cache/sync_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
"github.com/lf-edge/ekuiper/pkg/kv"
)

type AckResult bool

// page Rotates storage for in memory cache
// Not thread safe!
type page struct {
Expand Down Expand Up @@ -95,11 +93,10 @@ func (p *page) reset() {

type SyncCache struct {
// The input data to the cache
in <-chan []map[string]interface{}
Out chan []map[string]interface{}
Ack chan bool
cacheCtrl chan interface{} // CacheCtrl is the only place to control the cache; sync in and ack result
errorCh chan<- error
in <-chan []map[string]interface{}
Out chan []map[string]interface{}
Ack chan bool
errorCh chan<- error
// cache config
cacheConf *conf.SinkConf
maxDiskPage int
Expand Down Expand Up @@ -131,7 +128,6 @@ func NewSyncCache(ctx api.StreamContext, in <-chan []map[string]interface{}, err
in: in,
Out: make(chan []map[string]interface{}, bufferLength),
Ack: make(chan bool, 10),
cacheCtrl: make(chan interface{}, 10),
errorCh: errCh,
maxMemPage: cacheConf.MemoryCacheThreshold / cacheConf.BufferPageSize,
memCache: make([]*page, 0),
Expand Down Expand Up @@ -159,38 +155,32 @@ func (c *SyncCache) run(ctx api.StreamContext) {
for {
select {
case item := <-c.in:
ctx.GetLogger().Debugf("send to cache")
c.cacheCtrl <- item
ctx.GetLogger().Debugf("adding cache %v", item)
// hack here: nil is a signal to continue sending, so not adding nil to cache
if item != nil {
c.addCache(ctx, item)
} else {
ctx.GetLogger().Debug("nil cache, continue sending")
}
if c.sendStatus == 2 {
c.sendStatus = 0
ctx.GetLogger().Debug("send status to 0 after adding cache in error state")
}
ctx.GetLogger().Debugf("cache status %d", c.sendStatus)
if c.sendStatus == 0 {
c.send(ctx)
}
case isSuccess := <-c.Ack:
// only send the next sink after receiving an ack
ctx.GetLogger().Debugf("cache ack")
c.cacheCtrl <- AckResult(isSuccess)
case data := <-c.cacheCtrl: // The only place to manipulate cache
switch r := data.(type) {
case AckResult:
if r {
ctx.GetLogger().Debugf("deleting cache")
c.deleteCache(ctx)
c.sendStatus = 0
ctx.GetLogger().Debug("send status to 0 after true ack")
} else {
c.sendStatus = 2
ctx.GetLogger().Debug("send status to 2 after false ack")
}
case []map[string]interface{}:
ctx.GetLogger().Debugf("adding cache %v", data)
// hack here: nil is a signal to continue sending, so not adding nil to cache
if r != nil {
c.addCache(ctx, r)
} else {
ctx.GetLogger().Debug("nil cache, continue sending")
}
if c.sendStatus == 2 {
c.sendStatus = 0
ctx.GetLogger().Debug("send status to 0 after adding cache in error state")
}
default:
ctx.GetLogger().Errorf("unknown cache control command %v", data)
if isSuccess {
ctx.GetLogger().Debugf("deleting cache")
c.deleteCache(ctx)
c.sendStatus = 0
ctx.GetLogger().Debug("send status to 0 after true ack")
} else {
c.sendStatus = 2
ctx.GetLogger().Debug("send status to 2 after false ack")
}
ctx.GetLogger().Debugf("cache status %d", c.sendStatus)
if c.sendStatus == 0 {
Expand Down

0 comments on commit cecc341

Please sign in to comment.