Skip to content

Commit

Permalink
Log stream responses with progress.
Browse files Browse the repository at this point in the history
  • Loading branch information
carlbraganza committed Oct 9, 2024
1 parent ff25e03 commit 92990e8
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 17 deletions.
27 changes: 27 additions & 0 deletions pkg/internal/server/grpc/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package grpc

import (
"context"
"flag"
"net"
"strconv"
"strings"
"testing"

Expand All @@ -37,6 +39,7 @@ import (
apiruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
clientgotesting "k8s.io/client-go/testing"
"k8s.io/klog/v2"

smsv1alpha1 "github.com/kubernetes-csi/external-snapshot-metadata/client/apis/snapshotmetadataservice/v1alpha1"
fakecbt "github.com/kubernetes-csi/external-snapshot-metadata/client/clientset/versioned/fake"
Expand Down Expand Up @@ -407,3 +410,27 @@ func convStringByteMapToStringStringMap(inMap map[string][]byte) map[string]stri
}
return ret
}

type KlogRestoreVerbosityFunc func()

// SetKlogVerbosity sets up the default logger with the specified verbosity level.
func (th *testHarness) SetKlogVerbosity(verboseLevel int, uniquePrefix string) KlogRestoreVerbosityFunc {
klog.ClearLogger()
// Set the verbosity level using a new flag set.
// It is not possible to set a verbose klog/v2/testlogger as the background logger
// because the klog.V() performs its own checks.
var level klog.Level
level.Set(strconv.Itoa(verboseLevel))
fs := flag.NewFlagSet(uniquePrefix+"Fs1", flag.ContinueOnError)
fs.Var(&level, uniquePrefix+"V1", "test log verbosity level")
klog.InitFlags(fs)

return func() {
// restore the verbosity level using a new flag set
klog.ClearLogger()
fs := flag.NewFlagSet(uniquePrefix+"Fs2", flag.ExitOnError)
level.Set("1")
fs.Var(&level, uniquePrefix+"V2", "test log verbosity level")
klog.InitFlags(fs)
}
}
64 changes: 60 additions & 4 deletions pkg/internal/server/grpc/get_metadata_allocated.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package grpc

import (
"context"
"fmt"
"io"
"strings"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -114,22 +116,76 @@ func (s *Server) convertToCSIGetMetadataAllocatedRequest(ctx context.Context, re
}

func (s *Server) streamGetMetadataAllocatedResponse(ctx context.Context, clientStream api.SnapshotMetadata_GetMetadataAllocatedServer, csiStream csi.SnapshotMetadata_GetMetadataAllocatedClient) error {
var (
blockMetadataType api.BlockMetadataType
lastByteOffset int64
lastSize int64
logger = klog.FromContext(ctx)
numBlockMetadata int
responseNum int
volumeCapacityBytes int64
)

for {
csiResp, err := csiStream.Recv()
if err == io.EOF {
klog.FromContext(ctx).V(HandlerTraceLogLevel).Info("stream EOF")
logger.V(HandlerTraceLogLevel).WithValues(
"blockMetadataType", blockMetadataType.String(),
"lastByteOffset", lastByteOffset,
"lastSize", lastSize,
"lastResponseNum", responseNum,
"volumeCapacityBytes", volumeCapacityBytes,
).Info("stream EOF")
return nil
}

//TODO: stream logging with progress

if err != nil {
logger.WithValues(
"blockMetadataType", blockMetadataType.String(),
"lastByteOffset", lastByteOffset,
"lastSize", lastSize,
"lastResponseNum", responseNum,
"volumeCapacityBytes", volumeCapacityBytes,
).Error(err, msgInternalFailedCSIDriverResponse)
return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedCSIDriverResponseFmt, err)
}

responseNum++

clientResp := s.convertToGetMetadataAllocatedResponse(csiResp)
blockMetadataType = clientResp.BlockMetadataType
volumeCapacityBytes = clientResp.VolumeCapacityBytes
numBlockMetadata = len(clientResp.BlockMetadata) - 1
if numBlockMetadata >= 0 {
lastByteOffset = clientResp.BlockMetadata[numBlockMetadata].ByteOffset
lastSize = clientResp.BlockMetadata[numBlockMetadata].SizeBytes
}

if logger.V(HandlerTraceLogLevel).Enabled() {
var b strings.Builder
b.WriteString("[")
for _, bmd := range clientResp.BlockMetadata {
b.WriteString(fmt.Sprintf("{%d,%d}", bmd.ByteOffset, bmd.SizeBytes))
}
b.WriteString("]")
logger.WithValues(
"blockMetadataType", blockMetadataType.String(),
"responseNum", responseNum,
"volumeCapacityBytes", volumeCapacityBytes,
"blockMetadata", b.String(),
"numBlockMetadata", len(clientResp.BlockMetadata),
).Info("stream response")
}

if err := clientStream.Send(clientResp); err != nil {
return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedtoSendResponseFmt, err)
logger.WithValues(
"blockMetadataType", blockMetadataType.String(),
"lastByteOffset", lastByteOffset,
"lastSize", lastSize,
"responseNum", responseNum,
"volumeCapacityBytes", volumeCapacityBytes,
).Error(err, msgInternalFailedToSendResponse)
return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedToSendResponseFmt, err)
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions pkg/internal/server/grpc/get_metadata_allocated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,13 +482,16 @@ type mockCSIMetadataAllocatedResponse struct {
}

func TestStreamGetMetadataAllocatedResponse(t *testing.T) {
ctx := context.Background()
th := newTestHarness().WithMockCSIDriver(t).WithFakeClientAPIs()
defer th.TerminateMockCSIDriver()

grpcServer := th.StartGRPCServer(t, th.Runtime())
defer th.StopGRPCServer(t)

// Test at the trace logging level.
restoreFn := th.SetKlogVerbosity(HandlerTraceLogLevel, "Alloc")
defer restoreFn()

for _, tc := range []struct {
name string
req *api.GetMetadataAllocatedRequest
Expand Down Expand Up @@ -698,15 +701,16 @@ func TestStreamGetMetadataAllocatedResponse(t *testing.T) {
mockCSIStream.EXPECT().Recv().Return(nil, io.EOF)
}

sms := &fakeStreamServerSnapshotAllocated{err: tc.mockK8sStreamError}
ctx := grpcServer.getMetadataAllocatedContextWithLogger(tc.req, sms)

csiReq, err := grpcServer.convertToCSIGetMetadataAllocatedRequest(ctx, tc.req)
assert.NoError(t, err)

csiStream, err := csiClient.GetMetadataAllocated(ctx, csiReq)
assert.NoError(t, err)

sms := &fakeStreamServerSnapshotAllocated{err: tc.mockK8sStreamError}

errStream := grpcServer.streamGetMetadataAllocatedResponse(sms, csiStream)
errStream := grpcServer.streamGetMetadataAllocatedResponse(ctx, sms, csiStream)
if tc.expectStreamError {
assert.NoError(t, err)
st, ok := status.FromError(errStream)
Expand All @@ -729,6 +733,10 @@ type fakeStreamServerSnapshotAllocated struct {
err error
}

func (f *fakeStreamServerSnapshotAllocated) Context() context.Context {
return context.Background()
}

func (f *fakeStreamServerSnapshotAllocated) Send(m *api.GetMetadataAllocatedResponse) error {
if f.err != nil {
return f.err
Expand Down
64 changes: 60 additions & 4 deletions pkg/internal/server/grpc/get_metadata_delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package grpc

import (
"context"
"fmt"
"io"
"strings"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -135,22 +137,76 @@ func (s *Server) convertToCSIGetMetadataDeltaRequest(ctx context.Context, req *a
}

func (s *Server) streamGetMetadataDeltaResponse(ctx context.Context, clientStream api.SnapshotMetadata_GetMetadataDeltaServer, csiStream csi.SnapshotMetadata_GetMetadataDeltaClient) error {
var (
blockMetadataType api.BlockMetadataType
lastByteOffset int64
lastSize int64
logger = klog.FromContext(ctx)
numBlockMetadata int
responseNum int
volumeCapacityBytes int64
)

for {
csiResp, err := csiStream.Recv()
if err == io.EOF {
klog.FromContext(ctx).V(HandlerTraceLogLevel).Info("stream EOF")
logger.V(HandlerTraceLogLevel).WithValues(
"blockMetadataType", blockMetadataType.String(),
"lastByteOffset", lastByteOffset,
"lastSize", lastSize,
"lastResponseNum", responseNum,
"volumeCapacityBytes", volumeCapacityBytes,
).Info("stream EOF")
return nil
}

//TODO: stream logging with progress

if err != nil {
logger.WithValues(
"blockMetadataType", blockMetadataType.String(),
"lastByteOffset", lastByteOffset,
"lastSize", lastSize,
"lastResponseNum", responseNum,
"volumeCapacityBytes", volumeCapacityBytes,
).Error(err, msgInternalFailedCSIDriverResponse)
return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedCSIDriverResponseFmt, err)
}

responseNum++

clientResp := s.convertToGetMetadataDeltaResponse(csiResp)
blockMetadataType = clientResp.BlockMetadataType
volumeCapacityBytes = clientResp.VolumeCapacityBytes
numBlockMetadata = len(clientResp.BlockMetadata) - 1
if numBlockMetadata >= 0 {
lastByteOffset = clientResp.BlockMetadata[numBlockMetadata].ByteOffset
lastSize = clientResp.BlockMetadata[numBlockMetadata].SizeBytes
}

if logger.V(HandlerTraceLogLevel).Enabled() {
var b strings.Builder
b.WriteString("[")
for _, bmd := range clientResp.BlockMetadata {
b.WriteString(fmt.Sprintf("{%d,%d}", bmd.ByteOffset, bmd.SizeBytes))
}
b.WriteString("]")
logger.WithValues(
"blockMetadataType", blockMetadataType.String(),
"responseNum", responseNum,
"volumeCapacityBytes", volumeCapacityBytes,
"blockMetadata", b.String(),
"numBlockMetadata", len(clientResp.BlockMetadata),
).Info("stream response")
}

if err := clientStream.Send(clientResp); err != nil {
return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedtoSendResponseFmt, err)
logger.WithValues(
"blockMetadataType", blockMetadataType.String(),
"lastByteOffset", lastByteOffset,
"lastSize", lastSize,
"responseNum", responseNum,
"volumeCapacityBytes", volumeCapacityBytes,
).Error(err, msgInternalFailedToSendResponse)
return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedToSendResponseFmt, err)
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions pkg/internal/server/grpc/get_metadata_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,13 +712,16 @@ type mockCSIMetadataDeltaResponse struct {
}

func TestStreamGetMetadataDeltaResponse(t *testing.T) {
ctx := context.Background()
th := newTestHarness().WithMockCSIDriver(t).WithFakeClientAPIs()
defer th.TerminateMockCSIDriver()

grpcServer := th.StartGRPCServer(t, th.Runtime())
defer th.StopGRPCServer(t)

// Test at the trace logging level.
restoreFn := th.SetKlogVerbosity(HandlerTraceLogLevel, "Delta")
defer restoreFn()

for _, tc := range []struct {
name string
req *api.GetMetadataDeltaRequest
Expand Down Expand Up @@ -932,15 +935,16 @@ func TestStreamGetMetadataDeltaResponse(t *testing.T) {
mockCSIStream.EXPECT().Recv().Return(nil, io.EOF)
}

sms := &fakeStreamServerSnapshotDelta{err: tc.mockK8sStreamError}
ctx := grpcServer.getMetadataDeltaContextWithLogger(tc.req, sms)

csiReq, err := grpcServer.convertToCSIGetMetadataDeltaRequest(ctx, tc.req)
assert.NoError(t, err)

csiStream, err := csiClient.GetMetadataDelta(ctx, csiReq)
assert.NoError(t, err)

sms := &fakeStreamServerSnapshotDelta{err: tc.mockK8sStreamError}

errStream := grpcServer.streamGetMetadataDeltaResponse(sms, csiStream)
errStream := grpcServer.streamGetMetadataDeltaResponse(ctx, sms, csiStream)
if tc.expectStreamError {
assert.NoError(t, err)
st, ok := status.FromError(errStream)
Expand All @@ -963,6 +967,10 @@ type fakeStreamServerSnapshotDelta struct {
err error
}

func (f *fakeStreamServerSnapshotDelta) Context() context.Context {
return context.Background()
}

func (f *fakeStreamServerSnapshotDelta) Send(m *api.GetMetadataDeltaResponse) error {
if f.err != nil {
return f.err
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/server/grpc/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
msgInternalFailedToFindCR = "failed to find the SnapshotMetadataService CR for driver"
msgInternalFailedToFindCRFmt = msgInternalFailedToFindCR + " '%s': %v"
msgInternalFailedToSendResponse = "failed to send response"
msgInternalFailedtoSendResponseFmt = msgInternalFailedToSendResponse + ": %v"
msgInternalFailedToSendResponseFmt = msgInternalFailedToSendResponse + ": %v"

msgInvalidArgumentBaseSnapshotNameMissing = "baseSnapshotName cannot be empty"
msgInvalidArgumentNamespaceMissing = "namespace parameter cannot be empty"
Expand Down

0 comments on commit 92990e8

Please sign in to comment.