From 92990e89f0c2c372a780e9fa0c5f6165fc289389 Mon Sep 17 00:00:00 2001 From: Carl Braganza Date: Tue, 8 Oct 2024 02:37:11 +0000 Subject: [PATCH] Log stream responses with progress. --- pkg/internal/server/grpc/common_test.go | 27 ++++++++ .../server/grpc/get_metadata_allocated.go | 64 +++++++++++++++++-- .../grpc/get_metadata_allocated_test.go | 16 +++-- .../server/grpc/get_metadata_delta.go | 64 +++++++++++++++++-- .../server/grpc/get_metadata_delta_test.go | 16 +++-- pkg/internal/server/grpc/status.go | 2 +- 6 files changed, 172 insertions(+), 17 deletions(-) diff --git a/pkg/internal/server/grpc/common_test.go b/pkg/internal/server/grpc/common_test.go index e27d8244..9251d5f8 100644 --- a/pkg/internal/server/grpc/common_test.go +++ b/pkg/internal/server/grpc/common_test.go @@ -18,7 +18,9 @@ package grpc import ( "context" + "flag" "net" + "strconv" "strings" "testing" @@ -37,6 +39,7 @@ import ( apiruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" clientgotesting "k8s.io/client-go/testing" + "k8s.io/klog/v2" smsv1alpha1 "github.com/kubernetes-csi/external-snapshot-metadata/client/apis/snapshotmetadataservice/v1alpha1" fakecbt "github.com/kubernetes-csi/external-snapshot-metadata/client/clientset/versioned/fake" @@ -407,3 +410,27 @@ func convStringByteMapToStringStringMap(inMap map[string][]byte) map[string]stri } return ret } + +type KlogRestoreVerbosityFunc func() + +// SetKlogVerbosity sets up the default logger with the specified verbosity level. +func (th *testHarness) SetKlogVerbosity(verboseLevel int, uniquePrefix string) KlogRestoreVerbosityFunc { + klog.ClearLogger() + // Set the verbosity level using a new flag set. + // It is not possible to set a verbose klog/v2/testlogger as the background logger + // because the klog.V() performs its own checks. + var level klog.Level + level.Set(strconv.Itoa(verboseLevel)) + fs := flag.NewFlagSet(uniquePrefix+"Fs1", flag.ContinueOnError) + fs.Var(&level, uniquePrefix+"V1", "test log verbosity level") + klog.InitFlags(fs) + + return func() { + // restore the verbosity level using a new flag set + klog.ClearLogger() + fs := flag.NewFlagSet(uniquePrefix+"Fs2", flag.ExitOnError) + level.Set("1") + fs.Var(&level, uniquePrefix+"V2", "test log verbosity level") + klog.InitFlags(fs) + } +} diff --git a/pkg/internal/server/grpc/get_metadata_allocated.go b/pkg/internal/server/grpc/get_metadata_allocated.go index 23bf8e39..f8e9361e 100644 --- a/pkg/internal/server/grpc/get_metadata_allocated.go +++ b/pkg/internal/server/grpc/get_metadata_allocated.go @@ -18,7 +18,9 @@ package grpc import ( "context" + "fmt" "io" + "strings" "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" @@ -114,22 +116,76 @@ func (s *Server) convertToCSIGetMetadataAllocatedRequest(ctx context.Context, re } func (s *Server) streamGetMetadataAllocatedResponse(ctx context.Context, clientStream api.SnapshotMetadata_GetMetadataAllocatedServer, csiStream csi.SnapshotMetadata_GetMetadataAllocatedClient) error { + var ( + blockMetadataType api.BlockMetadataType + lastByteOffset int64 + lastSize int64 + logger = klog.FromContext(ctx) + numBlockMetadata int + responseNum int + volumeCapacityBytes int64 + ) + for { csiResp, err := csiStream.Recv() if err == io.EOF { - klog.FromContext(ctx).V(HandlerTraceLogLevel).Info("stream EOF") + logger.V(HandlerTraceLogLevel).WithValues( + "blockMetadataType", blockMetadataType.String(), + "lastByteOffset", lastByteOffset, + "lastSize", lastSize, + "lastResponseNum", responseNum, + "volumeCapacityBytes", volumeCapacityBytes, + ).Info("stream EOF") return nil } - //TODO: stream logging with progress - if err != nil { + logger.WithValues( + "blockMetadataType", blockMetadataType.String(), + "lastByteOffset", lastByteOffset, + "lastSize", lastSize, + "lastResponseNum", responseNum, + "volumeCapacityBytes", volumeCapacityBytes, + ).Error(err, msgInternalFailedCSIDriverResponse) return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedCSIDriverResponseFmt, err) } + responseNum++ + clientResp := s.convertToGetMetadataAllocatedResponse(csiResp) + blockMetadataType = clientResp.BlockMetadataType + volumeCapacityBytes = clientResp.VolumeCapacityBytes + numBlockMetadata = len(clientResp.BlockMetadata) - 1 + if numBlockMetadata >= 0 { + lastByteOffset = clientResp.BlockMetadata[numBlockMetadata].ByteOffset + lastSize = clientResp.BlockMetadata[numBlockMetadata].SizeBytes + } + + if logger.V(HandlerTraceLogLevel).Enabled() { + var b strings.Builder + b.WriteString("[") + for _, bmd := range clientResp.BlockMetadata { + b.WriteString(fmt.Sprintf("{%d,%d}", bmd.ByteOffset, bmd.SizeBytes)) + } + b.WriteString("]") + logger.WithValues( + "blockMetadataType", blockMetadataType.String(), + "responseNum", responseNum, + "volumeCapacityBytes", volumeCapacityBytes, + "blockMetadata", b.String(), + "numBlockMetadata", len(clientResp.BlockMetadata), + ).Info("stream response") + } + if err := clientStream.Send(clientResp); err != nil { - return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedtoSendResponseFmt, err) + logger.WithValues( + "blockMetadataType", blockMetadataType.String(), + "lastByteOffset", lastByteOffset, + "lastSize", lastSize, + "responseNum", responseNum, + "volumeCapacityBytes", volumeCapacityBytes, + ).Error(err, msgInternalFailedToSendResponse) + return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedToSendResponseFmt, err) } } } diff --git a/pkg/internal/server/grpc/get_metadata_allocated_test.go b/pkg/internal/server/grpc/get_metadata_allocated_test.go index 208920ea..df8d2fb4 100644 --- a/pkg/internal/server/grpc/get_metadata_allocated_test.go +++ b/pkg/internal/server/grpc/get_metadata_allocated_test.go @@ -482,13 +482,16 @@ type mockCSIMetadataAllocatedResponse struct { } func TestStreamGetMetadataAllocatedResponse(t *testing.T) { - ctx := context.Background() th := newTestHarness().WithMockCSIDriver(t).WithFakeClientAPIs() defer th.TerminateMockCSIDriver() grpcServer := th.StartGRPCServer(t, th.Runtime()) defer th.StopGRPCServer(t) + // Test at the trace logging level. + restoreFn := th.SetKlogVerbosity(HandlerTraceLogLevel, "Alloc") + defer restoreFn() + for _, tc := range []struct { name string req *api.GetMetadataAllocatedRequest @@ -698,15 +701,16 @@ func TestStreamGetMetadataAllocatedResponse(t *testing.T) { mockCSIStream.EXPECT().Recv().Return(nil, io.EOF) } + sms := &fakeStreamServerSnapshotAllocated{err: tc.mockK8sStreamError} + ctx := grpcServer.getMetadataAllocatedContextWithLogger(tc.req, sms) + csiReq, err := grpcServer.convertToCSIGetMetadataAllocatedRequest(ctx, tc.req) assert.NoError(t, err) csiStream, err := csiClient.GetMetadataAllocated(ctx, csiReq) assert.NoError(t, err) - sms := &fakeStreamServerSnapshotAllocated{err: tc.mockK8sStreamError} - - errStream := grpcServer.streamGetMetadataAllocatedResponse(sms, csiStream) + errStream := grpcServer.streamGetMetadataAllocatedResponse(ctx, sms, csiStream) if tc.expectStreamError { assert.NoError(t, err) st, ok := status.FromError(errStream) @@ -729,6 +733,10 @@ type fakeStreamServerSnapshotAllocated struct { err error } +func (f *fakeStreamServerSnapshotAllocated) Context() context.Context { + return context.Background() +} + func (f *fakeStreamServerSnapshotAllocated) Send(m *api.GetMetadataAllocatedResponse) error { if f.err != nil { return f.err diff --git a/pkg/internal/server/grpc/get_metadata_delta.go b/pkg/internal/server/grpc/get_metadata_delta.go index cab300cb..2b56b696 100644 --- a/pkg/internal/server/grpc/get_metadata_delta.go +++ b/pkg/internal/server/grpc/get_metadata_delta.go @@ -18,7 +18,9 @@ package grpc import ( "context" + "fmt" "io" + "strings" "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" @@ -135,22 +137,76 @@ func (s *Server) convertToCSIGetMetadataDeltaRequest(ctx context.Context, req *a } func (s *Server) streamGetMetadataDeltaResponse(ctx context.Context, clientStream api.SnapshotMetadata_GetMetadataDeltaServer, csiStream csi.SnapshotMetadata_GetMetadataDeltaClient) error { + var ( + blockMetadataType api.BlockMetadataType + lastByteOffset int64 + lastSize int64 + logger = klog.FromContext(ctx) + numBlockMetadata int + responseNum int + volumeCapacityBytes int64 + ) + for { csiResp, err := csiStream.Recv() if err == io.EOF { - klog.FromContext(ctx).V(HandlerTraceLogLevel).Info("stream EOF") + logger.V(HandlerTraceLogLevel).WithValues( + "blockMetadataType", blockMetadataType.String(), + "lastByteOffset", lastByteOffset, + "lastSize", lastSize, + "lastResponseNum", responseNum, + "volumeCapacityBytes", volumeCapacityBytes, + ).Info("stream EOF") return nil } - //TODO: stream logging with progress - if err != nil { + logger.WithValues( + "blockMetadataType", blockMetadataType.String(), + "lastByteOffset", lastByteOffset, + "lastSize", lastSize, + "lastResponseNum", responseNum, + "volumeCapacityBytes", volumeCapacityBytes, + ).Error(err, msgInternalFailedCSIDriverResponse) return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedCSIDriverResponseFmt, err) } + responseNum++ + clientResp := s.convertToGetMetadataDeltaResponse(csiResp) + blockMetadataType = clientResp.BlockMetadataType + volumeCapacityBytes = clientResp.VolumeCapacityBytes + numBlockMetadata = len(clientResp.BlockMetadata) - 1 + if numBlockMetadata >= 0 { + lastByteOffset = clientResp.BlockMetadata[numBlockMetadata].ByteOffset + lastSize = clientResp.BlockMetadata[numBlockMetadata].SizeBytes + } + + if logger.V(HandlerTraceLogLevel).Enabled() { + var b strings.Builder + b.WriteString("[") + for _, bmd := range clientResp.BlockMetadata { + b.WriteString(fmt.Sprintf("{%d,%d}", bmd.ByteOffset, bmd.SizeBytes)) + } + b.WriteString("]") + logger.WithValues( + "blockMetadataType", blockMetadataType.String(), + "responseNum", responseNum, + "volumeCapacityBytes", volumeCapacityBytes, + "blockMetadata", b.String(), + "numBlockMetadata", len(clientResp.BlockMetadata), + ).Info("stream response") + } + if err := clientStream.Send(clientResp); err != nil { - return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedtoSendResponseFmt, err) + logger.WithValues( + "blockMetadataType", blockMetadataType.String(), + "lastByteOffset", lastByteOffset, + "lastSize", lastSize, + "responseNum", responseNum, + "volumeCapacityBytes", volumeCapacityBytes, + ).Error(err, msgInternalFailedToSendResponse) + return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedToSendResponseFmt, err) } } } diff --git a/pkg/internal/server/grpc/get_metadata_delta_test.go b/pkg/internal/server/grpc/get_metadata_delta_test.go index d6d94529..e4b033c9 100644 --- a/pkg/internal/server/grpc/get_metadata_delta_test.go +++ b/pkg/internal/server/grpc/get_metadata_delta_test.go @@ -712,13 +712,16 @@ type mockCSIMetadataDeltaResponse struct { } func TestStreamGetMetadataDeltaResponse(t *testing.T) { - ctx := context.Background() th := newTestHarness().WithMockCSIDriver(t).WithFakeClientAPIs() defer th.TerminateMockCSIDriver() grpcServer := th.StartGRPCServer(t, th.Runtime()) defer th.StopGRPCServer(t) + // Test at the trace logging level. + restoreFn := th.SetKlogVerbosity(HandlerTraceLogLevel, "Delta") + defer restoreFn() + for _, tc := range []struct { name string req *api.GetMetadataDeltaRequest @@ -932,15 +935,16 @@ func TestStreamGetMetadataDeltaResponse(t *testing.T) { mockCSIStream.EXPECT().Recv().Return(nil, io.EOF) } + sms := &fakeStreamServerSnapshotDelta{err: tc.mockK8sStreamError} + ctx := grpcServer.getMetadataDeltaContextWithLogger(tc.req, sms) + csiReq, err := grpcServer.convertToCSIGetMetadataDeltaRequest(ctx, tc.req) assert.NoError(t, err) csiStream, err := csiClient.GetMetadataDelta(ctx, csiReq) assert.NoError(t, err) - sms := &fakeStreamServerSnapshotDelta{err: tc.mockK8sStreamError} - - errStream := grpcServer.streamGetMetadataDeltaResponse(sms, csiStream) + errStream := grpcServer.streamGetMetadataDeltaResponse(ctx, sms, csiStream) if tc.expectStreamError { assert.NoError(t, err) st, ok := status.FromError(errStream) @@ -963,6 +967,10 @@ type fakeStreamServerSnapshotDelta struct { err error } +func (f *fakeStreamServerSnapshotDelta) Context() context.Context { + return context.Background() +} + func (f *fakeStreamServerSnapshotDelta) Send(m *api.GetMetadataDeltaResponse) error { if f.err != nil { return f.err diff --git a/pkg/internal/server/grpc/status.go b/pkg/internal/server/grpc/status.go index 6b988d58..04a01fd1 100644 --- a/pkg/internal/server/grpc/status.go +++ b/pkg/internal/server/grpc/status.go @@ -31,7 +31,7 @@ const ( msgInternalFailedToFindCR = "failed to find the SnapshotMetadataService CR for driver" msgInternalFailedToFindCRFmt = msgInternalFailedToFindCR + " '%s': %v" msgInternalFailedToSendResponse = "failed to send response" - msgInternalFailedtoSendResponseFmt = msgInternalFailedToSendResponse + ": %v" + msgInternalFailedToSendResponseFmt = msgInternalFailedToSendResponse + ": %v" msgInvalidArgumentBaseSnapshotNameMissing = "baseSnapshotName cannot be empty" msgInvalidArgumentNamespaceMissing = "namespace parameter cannot be empty"