From 810f4082f95c7f6a799ca622150b2fc868613b67 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Mon, 11 Nov 2024 18:57:03 +0000 Subject: [PATCH] Simplify compact loop error handling Rather than restarting the outer loop on unhandled error, simmply break out of the inner loop on any error. We always want to update the compact and target rev, and do post-compact operations - so the only thing we really need to check the error for is logging and metrics. Signed-off-by: Brad Davidson --- pkg/logstructured/sqllog/sql.go | 80 ++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 26 deletions(-) diff --git a/pkg/logstructured/sqllog/sql.go b/pkg/logstructured/sqllog/sql.go index afd62fbc..48e8bfaf 100644 --- a/pkg/logstructured/sqllog/sql.go +++ b/pkg/logstructured/sqllog/sql.go @@ -113,7 +113,6 @@ func (s *SQLLog) compactor(interval time.Duration) { targetCompactRev, _ := s.CurrentRevision(s.ctx) logrus.Tracef("COMPACT starting compactRev=%d targetCompactRev=%d", compactRev, targetCompactRev) -outer: for { select { case <-s.ctx.Done(): @@ -127,14 +126,20 @@ outer: // (several hundred ms) just for the database to execute the subquery to select the revisions to delete. var ( + resultLabel string iterCompactRev int64 + iterStart time.Time + iterCount int64 compactedRev int64 currentRev int64 err error ) + resultLabel = metrics.ResultSuccess iterCompactRev = compactRev compactedRev = compactRev + iterStart = time.Now() + iterCount = 0 for iterCompactRev < targetCompactRev { // Set move iteration target compactBatchSize revisions forward, or @@ -145,54 +150,74 @@ outer: iterCompactRev = targetCompactRev } - compactedRev, currentRev, err = s.compact(compactedRev, iterCompactRev) - if err != nil { - // ErrCompacted indicates that no further work is necessary - either compactRev changed since the - // last iteration because another client has compacted, or the requested revision has already been compacted. - if err == server.ErrCompacted { - break - } - logrus.Errorf("Compact failed: %v", err) - metrics.CompactTotal.WithLabelValues(metrics.ResultError).Inc() - continue outer + // only update the compacted and current revisions if they are valid, + // but break out of the inner loop on any error. + compacted, current, cerr := s.compact(compactedRev, iterCompactRev) + if compacted != 0 && current != 0 { + compactedRev = compacted + currentRev = current } + if cerr != nil { + err = cerr + break + } + iterCount++ } - if err := s.postCompact(); err != nil { - logrus.Errorf("Post-compact operations failed: %v", err) + if iterCount > 0 { + logrus.Infof("COMPACT compacted from %d to %d in %d transactions over %s", compactRev, compactedRev, iterCount, time.Now().Sub(iterStart).Round(time.Millisecond)) + + // post-compact operation errors are not critical, but should be reported + if perr := s.postCompact(); perr != nil { + logrus.Errorf("Post-compact operations failed: %v", perr) + } } - // Record the final results for the outer loop + // Store the final results for this compact interval. + // Note that one or more of the small-batch compact transactions + // may have succeeded and moved the compact revision forward, even if err is non-nil. compactRev = compactedRev targetCompactRev = currentRev - metrics.CompactTotal.WithLabelValues(metrics.ResultSuccess).Inc() + // ErrCompacted indicates that no further work is necessary - either compactRev changed since the + // last iteration because another client has compacted, or the requested revision has already been compacted. + if err != nil && err != server.ErrCompacted { + logrus.Errorf("Compact failed: %v", err) + resultLabel = metrics.ResultError + } + metrics.CompactTotal.WithLabelValues(resultLabel).Inc() } } -// compact removes deleted or replaced rows from the database. compactRev is the revision that was last compacted to. -// If this changes between compactions, we know that someone else has compacted and we don't need to do it. -// targetCompactRev is the revision that we should try to compact to. Upon success, the function returns the revision -// compacted to, and the revision that we should try to compact to next time (the current revision). -// This logic is directly cribbed from k8s.io/apiserver/pkg/storage/etcd3/compact.go +// compact removes deleted or replaced rows from the database, and updates the compact rev key. +// compactRev is the current compact revision; targetCompactRev is the revision to compact to. +// If compactRev does not match what's in the database, we know that someone else has compacted and we don't need to do it. +// Deletion of rows and update of the compact rev key is done within a single transaction. The transaction is rolled back on any error. +// +// On success, the function returns the revision compacted to, and the revision that we should try to compact to next time (the current revision). +// ErrCompacted is returned if the current revision is stale, or the target revision has already been compacted. +// In this case the compact and current revisions from the database are returned. +// On any other error, the returned compact and current revisions should not be used. +// +// This logic is cribbed from k8s.io/apiserver/pkg/storage/etcd3/compact.go func (s *SQLLog) compact(compactRev int64, targetCompactRev int64) (int64, int64, error) { ctx, cancel := context.WithTimeout(s.ctx, compactTimeout) defer cancel() t, err := s.d.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) if err != nil { - return compactRev, targetCompactRev, errors.Wrap(err, "failed to begin transaction") + return 0, 0, errors.Wrap(err, "failed to begin transaction") } defer t.MustRollback() currentRev, err := t.CurrentRevision(s.ctx) if err != nil { - return compactRev, targetCompactRev, errors.Wrap(err, "failed to get current revision") + return 0, 0, errors.Wrap(err, "failed to get current revision") } dbCompactRev, err := t.GetCompactRevision(s.ctx) if err != nil { - return compactRev, targetCompactRev, errors.Wrap(err, "failed to get compact revision") + return 0, 0, errors.Wrap(err, "failed to get compact revision") } // Check to see if another node already compacted. This is normal on a multi-server cluster. @@ -206,7 +231,7 @@ func (s *SQLLog) compact(compactRev int64, targetCompactRev int64) (int64, int64 // Don't bother compacting to a revision that has already been compacted if targetCompactRev <= compactRev { - logrus.Infof("COMPACT revision %d has already been compacted", targetCompactRev) + logrus.Tracef("COMPACT revision %d has already been compacted", targetCompactRev) return dbCompactRev, currentRev, server.ErrCompacted } @@ -215,13 +240,16 @@ func (s *SQLLog) compact(compactRev int64, targetCompactRev int64) (int64, int64 start := time.Now() deletedRows, err := t.Compact(s.ctx, targetCompactRev) if err != nil { - return compactRev, targetCompactRev, errors.Wrapf(err, "failed to compact to revision %d", targetCompactRev) + return 0, 0, errors.Wrapf(err, "failed to compact to revision %d", targetCompactRev) } if err := t.SetCompactRevision(s.ctx, targetCompactRev); err != nil { - return compactRev, targetCompactRev, errors.Wrap(err, "failed to record compact revision") + return 0, 0, errors.Wrap(err, "failed to record compact revision") } + // only commit the transaction if we make it all the way through deleting and + // updating the compact revision without any errors. The deferred rollback + // becomes a no-op if the transaction is committed. t.MustCommit() logrus.Infof("COMPACT deleted %d rows from %d revisions in %s - compacted to %d/%d", deletedRows, (targetCompactRev - compactRev), time.Since(start), targetCompactRev, currentRev)