Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: cardyok <[email protected]>
  • Loading branch information
cardyok committed Jan 24, 2025
1 parent 84ad45c commit 684d011
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 554 deletions.
86 changes: 60 additions & 26 deletions components/accelerator/nvidia/error/xid/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,34 @@ import (
"database/sql"
"sort"
"strconv"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/leptonai/gpud/components"
nvidia_common "github.com/leptonai/gpud/components/accelerator/nvidia/common"
nvidia_component_error_xid_id "github.com/leptonai/gpud/components/accelerator/nvidia/error/xid/id"
"github.com/leptonai/gpud/components/accelerator/nvidia/error/xid/store"
nvidia_query_xid "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid"
"github.com/leptonai/gpud/components/db"
os_id "github.com/leptonai/gpud/components/os/id"
"github.com/leptonai/gpud/components/query"
"github.com/leptonai/gpud/log"
pkg_dmesg "github.com/leptonai/gpud/pkg/dmesg"
)

func New(ctx context.Context, cfg nvidia_common.Config, db *sql.DB) components.Component {
const DefaultRetentionPeriod = 3 * 24 * time.Hour
const DefaultStateUpdatePeriod = 30 * time.Second

func New(ctx context.Context, cfg nvidia_common.Config, dbRW *sql.DB, dbRO *sql.DB) *XIDComponent {

Check warning on line 30 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L30

Added line #L30 was not covered by tests
cfg.Query.SetDefaultsIfNotSet()
setDefaultPoller(cfg)

cctx, ccancel := context.WithCancel(ctx)
getDefaultPoller().Start(cctx, cfg.Query, nvidia_component_error_xid_id.Name)

setHealthyCh := make(chan struct{})
localStore, err := store.New(ctx, db, "components_accelerator_nvidia_error_xid_events")
localStore, err := db.NewStore(dbRW, dbRO, "components_accelerator_nvidia_error_xid_events", DefaultRetentionPeriod)
if err != nil {
log.Logger.Errorw("failed to create store", "error", err)
ccancel()
Expand All @@ -47,6 +51,7 @@ func New(ctx context.Context, cfg nvidia_common.Config, db *sql.DB) components.C
}

// SetHealthy logs the request and sends a signal on setHealthyCh, then returns
// nil. The select loop started in Start receives that signal, stores a
// "SetHealthy" event and recomputes the current state from the stored events.
// NOTE(review): the channel built in New is unbuffered and appears to be the
// one stored in setHealthyCh, so this send blocks until the Start goroutine
// receives it; a call made while that goroutine is not running would block
// indefinitely — confirm callers cannot hit that.
func (c *XIDComponent) SetHealthy() error {
log.Logger.Debugw("set healthy event received")
c.setHealthyCh <- struct{}{}
return nil

Check warning on line 56 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L53-L56

Added lines #L53 - L56 were not covered by tests
}
Expand All @@ -59,7 +64,8 @@ type XIDComponent struct {
poller query.Poller
currState components.State
setHealthyCh chan struct{}
store *store.Store
store db.Store
sync.RWMutex
}

// Name returns nvidia_component_error_xid_id.Name, the name of this component.
func (c *XIDComponent) Name() string { return nvidia_component_error_xid_id.Name }

Check warning on line 71 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L71

Added line #L71 was not covered by tests
Expand All @@ -71,58 +77,77 @@ func (c *XIDComponent) Start() error {
return nil

Check warning on line 77 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L76-L77

Added lines #L76 - L77 were not covered by tests
}
for {
backoffTime := 1 * time.Second
osComponent, err := components.GetComponent(os_id.Name)
if err != nil {
log.Logger.Errorw("failed to get os component", "error", err)
time.Sleep(1 * time.Second)
time.Sleep(backoffTime)

Check warning on line 84 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L79-L84

Added lines #L79 - L84 were not covered by tests
continue
}
osEvents, err := osComponent.Events(c.rootCtx, time.Now().Add(-3*24*time.Hour))
if err != nil {
log.Logger.Errorw("failed to get os states", "error", err)
time.Sleep(1 * time.Second)
time.Sleep(backoffTime)

Check warning on line 90 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L87-L90

Added lines #L87 - L90 were not covered by tests
continue
}
if len(osEvents) < 1 {
log.Logger.Debugw("no os states found")
time.Sleep(1 * time.Second)
continue
}
if !osEvents[0].Time.After(time.Now().Add(-3 * 24 * time.Hour)) {
log.Logger.Debugw("newest reboot event not caught")
time.Sleep(1 * time.Second)
time.Sleep(backoffTime)

Check warning on line 95 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L93-L95

Added lines #L93 - L95 were not covered by tests
continue
}
localEvents, err := c.store.GetAllEvents(c.rootCtx)
localEvents, err := c.store.Get(c.rootCtx, time.Time{})

Check warning on line 98 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L98

Added line #L98 was not covered by tests
if err != nil {
log.Logger.Errorw("failed to get all events", "error", err)
time.Sleep(1 * time.Second)
time.Sleep(backoffTime)
continue

Check warning on line 102 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L100-L102

Added lines #L100 - L102 were not covered by tests
}
events := mergeEvents(osEvents, localEvents)
c.Lock()
c.currState = EvolveHealthyState(events)
c.Unlock()
break

Check warning on line 108 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L104-L108

Added lines #L104 - L108 were not covered by tests
}
go func() {
ticker := time.NewTicker(DefaultStateUpdatePeriod)
defer ticker.Stop()
for {
select {
case <-c.rootCtx.Done():
return
case <-c.setHealthyCh:
count, err := c.store.CreateEvent(c.rootCtx, components.Event{Time: metav1.Time{Time: time.Now().UTC()}, Name: "SetHealthy"})
case <-ticker.C:
osComponent, err := components.GetComponent(os_id.Name)
if err != nil {
log.Logger.Errorw("failed to create event", "error", err)
log.Logger.Errorw("failed to get os component", "error", err)
}
osEvents, err := osComponent.Events(c.rootCtx, time.Now().Add(-3*24*time.Hour))
if err != nil {
log.Logger.Errorw("failed to get os states", "error", err)
}
if len(osEvents) < 1 {
log.Logger.Debugw("no os states found")
}
localEvents, err := c.store.Get(c.rootCtx, time.Time{})
if err != nil {
log.Logger.Errorw("failed to get all events", "error", err)
continue

Check warning on line 132 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L110-L132

Added lines #L110 - L132 were not covered by tests
} else if count == 0 {
log.Logger.Debugw("no new events created")
}
events := mergeEvents(osEvents, localEvents)
c.Lock()
c.currState = EvolveHealthyState(events)
c.Unlock()
case <-c.setHealthyCh:
if err = c.store.Insert(c.rootCtx, components.Event{Time: metav1.Time{Time: time.Now().UTC()}, Name: "SetHealthy"}); err != nil {
log.Logger.Errorw("failed to create event", "error", err)
continue

Check warning on line 141 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L134-L141

Added lines #L134 - L141 were not covered by tests
}
events, err := c.store.GetAllEvents(c.rootCtx)
events, err := c.store.Get(c.rootCtx, time.Time{})
if err != nil {
log.Logger.Errorw("failed to get all events", "error", err)
continue

Check warning on line 146 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L143-L146

Added lines #L143 - L146 were not covered by tests
}
c.Lock()
c.currState = EvolveHealthyState(events)
c.Unlock()
case dmesgLine := <-watcher.Watch():
log.Logger.Debugw("dmesg line", "line", dmesgLine)
ev, err := nvidia_query_xid.ParseDmesgLogLine(metav1.Time{Time: dmesgLine.Timestamp}, dmesgLine.Content)
Expand All @@ -142,33 +167,42 @@ func (c *XIDComponent) Start() error {
EventKeyDeviceUUID: ev.DeviceUUID,
},
}
count, err := c.store.CreateEvent(c.rootCtx, event)
currEvent, err := c.store.Find(c.rootCtx, event)
if err != nil {
log.Logger.Errorw("failed to create event", "error", err)
continue

Check warning on line 173 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L162-L173

Added lines #L162 - L173 were not covered by tests
} else if count == 0 {
}
if currEvent != nil {
log.Logger.Debugw("no new events created")
continue

Check warning on line 177 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L175-L177

Added lines #L175 - L177 were not covered by tests
}
events, err := c.store.GetAllEvents(c.rootCtx)
if err = c.store.Insert(c.rootCtx, event); err != nil {
log.Logger.Errorw("failed to create event", "error", err)
continue

Check warning on line 181 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L179-L181

Added lines #L179 - L181 were not covered by tests
}
events, err := c.store.Get(c.rootCtx, time.Time{})
if err != nil {
log.Logger.Errorw("failed to get all events", "error", err)
continue

Check warning on line 186 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L183-L186

Added lines #L183 - L186 were not covered by tests
}
c.Lock()
c.currState = EvolveHealthyState(events)
c.Unlock()

Check warning on line 190 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L188-L190

Added lines #L188 - L190 were not covered by tests
}
}
}()
return nil

Check warning on line 194 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L194

Added line #L194 was not covered by tests
}

// States returns the current state as a single-element slice; the error is
// always nil and the context argument is unused.
// The read lock is held while currState is read because the goroutine started
// in Start replaces currState under the write lock.
func (c *XIDComponent) States(_ context.Context) ([]components.State, error) {
c.RLock()
defer c.RUnlock()
return []components.State{c.currState}, nil

Check warning on line 200 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L197-L200

Added lines #L197 - L200 were not covered by tests
}

func (c *XIDComponent) Events(ctx context.Context, since time.Time) ([]components.Event, error) {
var ret []components.Event
events, err := c.store.GetEvents(ctx, since)
events, err := c.store.Get(ctx, since)

Check warning on line 205 in components/accelerator/nvidia/error/xid/component.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/error/xid/component.go#L203-L205

Added lines #L203 - L205 were not covered by tests
if err != nil {
return nil, err
}
Expand All @@ -192,7 +226,7 @@ func (c *XIDComponent) Close() error {
return nil
}

// mergeEvents merges two event slices and returns a time ascending sorted new slice
// mergeEvents merges two event slices and returns a time descending sorted new slice
func mergeEvents(a, b []components.Event) []components.Event {
totalLen := len(a) + len(b)
if totalLen == 0 {
Expand All @@ -203,7 +237,7 @@ func mergeEvents(a, b []components.Event) []components.Event {
result = append(result, b...)

sort.Slice(result, func(i, j int) bool {
return result[i].Time.Time.Before(result[j].Time.Time)
return result[i].Time.Time.After(result[j].Time.Time)
})

return result
Expand Down
8 changes: 4 additions & 4 deletions components/accelerator/nvidia/error/xid/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestMergeEvents(t *testing.T) {
assert.Equal(t, tt.expected, len(result))
if len(result) > 1 {
for i := 1; i < len(result); i++ {
assert.True(t, result[i-1].Time.Time.Before(result[i].Time.Time) ||
assert.True(t, result[i-1].Time.Time.After(result[i].Time.Time) ||
result[i-1].Time.Time.Equal(result[i].Time.Time),
"events should be sorted by timestamp")
}
Expand All @@ -95,10 +95,10 @@ func TestMergeEvents(t *testing.T) {
result := mergeEvents(a, b)
assert.Len(t, result, 4)
expectedTimes := []time.Time{
now.Add(-2 * time.Hour),
now.Add(-1 * time.Hour),
now,
now.Add(2 * time.Hour),
now,
now.Add(-1 * time.Hour),
now.Add(-2 * time.Hour),
}
for i, expectedTime := range expectedTimes {
assert.Equal(t, expectedTime.Unix(), result[i].Time.Unix(),
Expand Down
21 changes: 13 additions & 8 deletions components/accelerator/nvidia/error/xid/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,27 @@ const rebootThreshold = 2

// EvolveHealthyState resolves the state of the XID error component.
// note: assume events are sorted by time in descending order (newest first); they are replayed from the last element to the first, i.e. oldest to newest
func EvolveHealthyState(events []components.Event) components.State {
func EvolveHealthyState(events []components.Event) (ret components.State) {
defer func() {
log.Logger.Debugf("EvolveHealthyState: %v", ret)
}()
var lastSuggestedAction *common.SuggestedActions
var lastXidErr *XidError
lastHealth := StateHealthy
xidRebootMap := make(map[uint64]int)
for _, event := range events {
for i := len(events) - 1; i >= 0; i-- {
event := events[i]
log.Logger.Debugf("EvolveHealthyState: event: %v %v", event.Time, event.Name)
if event.Name == EventNameErroXid {
event = resolveXIDEvent(event)
resolvedEvent := resolveXIDEvent(event)
var currXidErr XidError
if err := json.Unmarshal([]byte(event.ExtraInfo[EventKeyErroXidData]), &currXidErr); err != nil {
log.Logger.Errorf("failed to unmarshal event %s %s extra info: %s", event.Name, event.Message, err)
if err := json.Unmarshal([]byte(resolvedEvent.ExtraInfo[EventKeyErroXidData]), &currXidErr); err != nil {
log.Logger.Errorf("failed to unmarshal event %s %s extra info: %s", resolvedEvent.Name, resolvedEvent.Message, err)
continue
}

currEvent := StateHealthy
switch event.Type {
switch resolvedEvent.Type {
case common.EventTypeCritical:
currEvent = StateDegraded
case common.EventTypeFatal:
Expand All @@ -58,7 +63,7 @@ func EvolveHealthyState(events []components.Event) components.State {
lastSuggestedAction = currXidErr.SuggestedActionsByGPUd
}
} else if event.Name == "reboot" {
if lastSuggestedAction != nil && len(lastSuggestedAction.RepairActions) > 0 && lastSuggestedAction.RepairActions[0] == common.RepairActionTypeRebootSystem {
if lastSuggestedAction != nil && len(lastSuggestedAction.RepairActions) > 0 && (lastSuggestedAction.RepairActions[0] == common.RepairActionTypeRebootSystem || lastSuggestedAction.RepairActions[0] == common.RepairActionTypeCheckUserAppAndGPU) {
lastHealth = StateHealthy
lastSuggestedAction = nil
lastXidErr = nil
Expand Down Expand Up @@ -114,7 +119,7 @@ func resolveXIDEvent(event components.Event) components.Event {
return ret
}
ret.Type = detail.EventType
ret.Message = detail.Description
ret.Message = fmt.Sprintf("XID %d detected on %s", currXid, event.ExtraInfo[EventKeyDeviceUUID])
ret.SuggestedActions = detail.SuggestedActionsByGPUd
raw, _ := json.Marshal(&XidError{
Time: event.Time,
Expand Down
4 changes: 2 additions & 2 deletions components/accelerator/nvidia/error/xid/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func TestStateUpdateBasedOnEvents(t *testing.T) {

t.Run("reboot recover", func(t *testing.T) {
events := []components.Event{
createXidEvent(789, common.EventTypeCritical, common.RepairActionTypeRebootSystem),
{Name: "reboot"},
createXidEvent(789, common.EventTypeCritical, common.RepairActionTypeRebootSystem),
}
state := EvolveHealthyState(events)
assert.True(t, state.Healthy)
Expand All @@ -79,8 +79,8 @@ func TestStateUpdateBasedOnEvents(t *testing.T) {

t.Run("SetHealthy", func(t *testing.T) {
events := []components.Event{
createXidEvent(789, common.EventTypeFatal, common.RepairActionTypeRebootSystem),
{Name: "SetHealthy"},
createXidEvent(789, common.EventTypeFatal, common.RepairActionTypeRebootSystem),
}
state := EvolveHealthyState(events)
assert.True(t, state.Healthy)
Expand Down
Loading

0 comments on commit 684d011

Please sign in to comment.