Skip to content

Commit

Permalink
chore: add helpers to log blocked sends and receives
Browse files Browse the repository at this point in the history
These are occasionally useful, and I currently need them to debug the
loopback network failures.
  • Loading branch information
mark-rushakoff committed Apr 23, 2024
1 parent 72b617c commit 6316c0b
Showing 1 changed file with 107 additions and 0 deletions.
107 changes: 107 additions & 0 deletions internal/gchan/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package gchan
import (
"context"
"log/slog"
"time"
)

// SendC selects between ctx.Done and sending val to out.
Expand All @@ -22,6 +23,59 @@ func SendC[T any](ctx context.Context, log *slog.Logger, out chan<- T, val T, ca
}
}

// SendCLogBlocked behaves similar to [SendC] but logs if the send is blocked longer than tolerableBlockDuration.
// In that case, it logs the total blocked duration when the send eventually completes
// or when ctx is canceled.
//
// If the send completes successfully within tolerableBlockDuration,
// nothing is logged, matching SendC's behavior.
//
// This is useful for test helpers but generally should be avoided in production code.
func SendCLogBlocked[T any](
ctx context.Context, log *slog.Logger,
out chan<- T, val T,
during string,
tolerableBlockDuration time.Duration,
) (sent bool) {
start := time.Now()

if tolerableBlockDuration <= 0 {
// Don't set up a timer if the caller wants to log immediately on blocked send.
select {
case <-ctx.Done():
log.Info("Context canceled while "+during, "cause", context.Cause(ctx))
return false
case out <- val:
return true
default:
log.Info("Blocked on initial send attempt while " + during)
}
} else {
timer := time.NewTimer(tolerableBlockDuration)
defer timer.Stop()

select {
case <-ctx.Done():
log.Info("Context canceled while "+during, "cause", context.Cause(ctx))
return false
case out <- val:
return true
case <-timer.C:
log.Info("Blocked on send attempt while "+during, "dur", tolerableBlockDuration)
}
}

// Now we block for the rest of the send.
select {
case <-ctx.Done():
log.Info("Context canceled while "+during, "cause", context.Cause(ctx), "blocked_duration", time.Since(start))
return false
case out <- val:
log.Info("Succesfully sent while "+during, "blocked_duration", time.Since(start))
return true
}
}

// RecvC selects between ctx.Done and receiving from in.
// If ctx is canceled before the receive from in completes,
// RecvC logs the message "Context canceled while " + canceledDuring,
Expand All @@ -37,6 +91,59 @@ func RecvC[T any](ctx context.Context, log *slog.Logger, in <-chan T, canceledDu
}
}

// RecvCLogBlocked behaves similar to [RecvC] but logs if the receive is blocked longer than tolerableBlockDuration.
// In that case, it logs the total blocked duration when the receive eventually completes
// or when ctx is canceled.
//
// If the receive completes successfully within tolerableBlockDuration,
// nothing is logged, matching RecvC's behavior.
//
// This is useful for test helpers but generally should be avoided in production code.
func RecvCLogBlocked[T any](
ctx context.Context, log *slog.Logger,
in <-chan T,
during string,
tolerableBlockDuration time.Duration,
) (val T, received bool) {
start := time.Now()

if tolerableBlockDuration <= 0 {
// Don't set up a timer if the caller wants to log immediately on blocked receive.
select {
case <-ctx.Done():
log.Info("Context canceled while "+during, "cause", context.Cause(ctx))
return val, false
case val := <-in:
return val, true
default:
log.Info("Blocked on initial receive attempt while " + during)
}
} else {
timer := time.NewTimer(tolerableBlockDuration)
defer timer.Stop()

select {
case <-ctx.Done():
log.Info("Context canceled while "+during, "cause", context.Cause(ctx))
return val, false
case val := <-in:
return val, true
case <-timer.C:
log.Info("Blocked on initial receive attempt while "+during, "dur", tolerableBlockDuration)
}
}

// Now we block for the rest of the receive.
select {
case <-ctx.Done():
log.Info("Context canceled while "+during, "cause", context.Cause(ctx), "blocked_duration", time.Since(start))
return val, false
case val := <-in:
log.Info("Successfully received while "+during, "blocked_duration", time.Since(start))
return val, true
}
}

// ReqResp performs a blocking send of reqValue to reqChan,
// then waits to receive a value from respChan.
// If ctx is canceled during either operation,
Expand Down

0 comments on commit 6316c0b

Please sign in to comment.