Skip to content

Commit

Permalink
metrics: add utility to record metrics for sidecar
Browse files Browse the repository at this point in the history
Enable Prometheus metrics for the sidecar using the csi-lib-utils
metrics package. The HTTP server for metrics can be enabled by adding
the httpEndpoint arg to the sidecar. The server will record metrics
for operations like GetMetadataAllocated and GetMetadataDelta in the format
`snapshot_metadata_controller_operation_total_seconds_bucket{driver_name="",operation_name="",operation_status="",target_snapshot="",
base_snapshot="",le="0.1"}`. For GetMetadataAllocated operations the "base_snapshot"
value will be empty.

Signed-off-by: Nikhil-Ladha <[email protected]>
  • Loading branch information
Nikhil-Ladha committed Jan 17, 2025
1 parent 5dfd5b9 commit 8331a93
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 11 deletions.
8 changes: 7 additions & 1 deletion pkg/internal/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type Args struct {
TLSCertFile string
// Absolute path to the TLS key file.
TLSKeyFile string
// HttpEndpoint is the address of the metrics server
HttpEndpoint string
// MetricsPath is the HTTP path at which metrics are served
MetricsPath string
}

func (args *Args) Validate() error {
Expand Down Expand Up @@ -169,7 +173,9 @@ func (rt *Runtime) kubeConnect(kubeconfig string, kubeAPIQPS float32, kubeAPIBur
func (rt *Runtime) csiConnect(csiAddress string) error {
ctx := context.Background()

metricsManager := metrics.NewCSIMetricsManagerForSidecar("" /* driverName */)
metricsManager := metrics.NewCSIMetricsManagerWithOptions("",
metrics.WithSubsystem(SubSystem),
metrics.WithLabelNames(LabelTargetSnapshotName, LabelBaseSnapshotName))
csiConn, err := connection.Connect(
ctx,
csiAddress,
Expand Down
8 changes: 7 additions & 1 deletion pkg/internal/runtime/test_harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type TestHarness struct {
MockCSIIdentityServer *driver.MockIdentityServer
MockCSISnapshotMetadataServer *driver.MockSnapshotMetadataServer
MockCSIDriverConn *grpc.ClientConn
MetricsManager metrics.CSIMetricsManager

FakeCSIDriver *driver.CSIDriver

Expand Down Expand Up @@ -120,6 +121,8 @@ func (th *TestHarness) RuntimeArgs() Args {
GRPCPort: th.rtaPortNumber,
TLSCertFile: th.tlsCertFile,
TLSKeyFile: th.tlsKeyFile,
HttpEndpoint: "localhost:8081",
MetricsPath: "/metrics",
}
}

Expand Down Expand Up @@ -205,7 +208,6 @@ func (th *TestHarness) WithMockCSIDriver(t *testing.T) *TestHarness {
mockController := gomock.NewController(t)
identityServer := driver.NewMockIdentityServer(mockController)
snapshotMetadataServer := driver.NewMockSnapshotMetadataServer(mockController)
metricsManager := metrics.NewCSIMetricsManagerForSidecar("" /* driverName */)
drv := driver.NewMockCSIDriver(&driver.MockCSIDriverServers{
Identity: identityServer,
SnapshotMetadata: snapshotMetadataServer,
Expand All @@ -215,6 +217,9 @@ func (th *TestHarness) WithMockCSIDriver(t *testing.T) *TestHarness {

// Create a client connection to it
addr := drv.Address()
metricsManager := metrics.NewCSIMetricsManagerWithOptions("",
metrics.WithSubsystem(SubSystem),
metrics.WithLabelNames(LabelTargetSnapshotName, LabelBaseSnapshotName))
csiConn, err := connection.Connect(context.Background(), addr, metricsManager)
if err != nil {
t.Fatal("Connect", err)
Expand All @@ -226,6 +231,7 @@ func (th *TestHarness) WithMockCSIDriver(t *testing.T) *TestHarness {
th.MockCSIIdentityServer = identityServer
th.MockCSISnapshotMetadataServer = snapshotMetadataServer
th.driverName = "mock-csi-driver"
th.MetricsManager = metricsManager

return th
}
Expand Down
54 changes: 54 additions & 0 deletions pkg/internal/runtime/util_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package runtime

import (
"time"

"k8s.io/klog/v2"
)

// Metrics subsystem and the names of the additional labels attached to the
// operation metrics.
const (
	// SubSystem is the subsystem name passed to the metrics manager.
	SubSystem = "snapshot_metadata_controller"

	// LabelTargetSnapshotName is the name of the label identifying the target snapshot.
	LabelTargetSnapshotName = "target_snapshot"

	// LabelBaseSnapshotName is the name of the label identifying the base snapshot.
	LabelBaseSnapshotName = "base_snapshot"
)

// Operation names under which durations are recorded.
const (
	// MetadataAllocatedOperationName names the operation that measures how long
	// the controller takes to obtain the allocated blocks of a snapshot.
	// The measured interval is:
	// - start: the controller first sees a GetMetadataAllocated RPC call
	//   requesting the allocated blocks of metadata
	// - end: the controller sees that the RPC call has finished and the
	//   allocated blocks have been streamed back
	MetadataAllocatedOperationName = "MetadataAllocated"

	// MetadataDeltaOperationName names the operation that measures how long
	// the controller takes to obtain the changed blocks between 2 snapshots.
	// The measured interval is:
	// - start: the controller first sees a GetMetadataDelta RPC call
	//   requesting the changed blocks between 2 snapshots
	// - end: the controller sees that the RPC call has finished and the
	//   changed blocks have been streamed back
	MetadataDeltaOperationName = "MetadataDelta"
)

// RecordMetricsWithLabels is a wrapper on the csi-lib-utils RecordMetrics function.
// It attaches the given label values to the runtime's MetricsManager and then
// records the operation's outcome and duration.
//
// Parameters:
//   - opLabel: label name to label value pairs passed to WithLabelValues.
//   - opName: the operation name the sample is recorded under.
//   - startTime: when the operation started; the duration is measured from here.
//   - opErr: the error the operation finished with, or nil on success.
//
// If the metrics manager rejects the label values, the error is logged and
// no sample is recorded.
func (rt *Runtime) RecordMetricsWithLabels(opLabel map[string]string, opName string, startTime time.Time, opErr error) {
	// Take the elapsed time first so the label lookup is not counted in it.
	opDuration := time.Since(startTime)

	metricsWithLabel, err := rt.MetricsManager.WithLabelValues(opLabel)
	if err != nil {
		// ErrorS is the structured variant; klog.Error would concatenate the
		// error and the message into a single Print-style string.
		klog.ErrorS(err, "failed to add labels to metrics", "operation", opName)
		return
	}

	metricsWithLabel.RecordMetrics(opName, opErr, opDuration)
}
2 changes: 2 additions & 0 deletions pkg/internal/server/grpc/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/metrics"
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1"
fakesnapshot "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned/fake"
snapshotutils "github.com/kubernetes-csi/external-snapshotter/v8/pkg/utils"
Expand Down Expand Up @@ -112,6 +113,7 @@ func (th *testHarness) Runtime() *runtime.Runtime {
SnapshotClient: th.FakeSnapshotClient,
DriverName: th.DriverName,
CSIConn: th.mockCSIDriverConn,
MetricsManager: metrics.NewCSIMetricsManagerWithOptions(th.DriverName, metrics.WithSubsystem(runtime.SubSystem), metrics.WithLabelNames(runtime.LabelTargetSnapshotName, runtime.LabelBaseSnapshotName)),
}
}

Expand Down
15 changes: 13 additions & 2 deletions pkg/internal/server/grpc/get_metadata_allocated.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,32 @@ import (
"fmt"
"io"
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"

"github.com/kubernetes-csi/external-snapshot-metadata/pkg/api"
"github.com/kubernetes-csi/external-snapshot-metadata/pkg/internal/runtime"
)

func (s *Server) GetMetadataAllocated(req *api.GetMetadataAllocatedRequest, stream api.SnapshotMetadata_GetMetadataAllocatedServer) error {
func (s *Server) GetMetadataAllocated(req *api.GetMetadataAllocatedRequest, stream api.SnapshotMetadata_GetMetadataAllocatedServer) (err error) {
// Create a timeout context so that failure in either sending to the client or
// receiving from the CSI driver will ultimately abort the handler session.
// The context could also get canceled by the client.
ctx, cancelFn := context.WithTimeout(s.getMetadataAllocatedContextWithLogger(req, stream), s.config.MaxStreamDur)
defer cancelFn()

// Record metrics when the operation ends
defer func(startTime time.Time) {
opLabel := map[string]string{
runtime.LabelTargetSnapshotName: fmt.Sprintf("%s/%s", req.Namespace, req.SnapshotName),
}
s.config.Runtime.RecordMetricsWithLabels(opLabel, runtime.MetadataAllocatedOperationName, startTime, err)
}(time.Now())

if err := s.validateGetMetadataAllocatedRequest(req); err != nil {
klog.FromContext(ctx).Error(err, "validation failed")
return err
Expand All @@ -63,7 +73,8 @@ func (s *Server) GetMetadataAllocated(req *api.GetMetadataAllocatedRequest, stre
return err
}

return s.streamGetMetadataAllocatedResponse(ctx, stream, csiStream)
err = s.streamGetMetadataAllocatedResponse(ctx, stream, csiStream)
return err
}

// getMetadataAllocatedContextWithLogger returns the stream context with an embedded
Expand Down
25 changes: 25 additions & 0 deletions pkg/internal/server/grpc/get_metadata_allocated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,31 @@ func TestGetMetadataAllocatedViaGRPCClient(t *testing.T) {
} else if errStream != nil {
assert.ErrorIs(t, errStream, io.EOF)
}

// Validate metrics are recorded correctly
metrics, _ := grpcServer.config.Runtime.MetricsManager.GetRegistry().Gather()
statusFound := 0
snapshotFound := 0

// Validate that both gauge and controller metrics is recorded
assert.GreaterOrEqual(t, 2, len(metrics))
assert.Equal(t, *metrics[0].Name, "process_start_time_seconds")
assert.Equal(t, *metrics[1].Name, "snapshot_metadata_controller_operations_seconds")

// Validate grpc_status_code and target_snapshot name
for _, metric := range metrics[1].Metric {
for _, labels := range metric.Label {
expTargetSnapshotName := fmt.Sprintf("%s/%s", tc.req.Namespace, tc.req.SnapshotName)
if *labels.Name == "grpc_status_code" && *labels.Value == tc.expStatusCode.String() {
statusFound = 1
}
if *labels.Name == "target_snapshot" && *labels.Value == expTargetSnapshotName {
snapshotFound = 1
}
}
}
assert.Equal(t, 1, statusFound)
assert.Equal(t, 1, snapshotFound)
})
}
}
Expand Down
16 changes: 14 additions & 2 deletions pkg/internal/server/grpc/get_metadata_delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,33 @@ import (
"fmt"
"io"
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"

"github.com/kubernetes-csi/external-snapshot-metadata/pkg/api"
"github.com/kubernetes-csi/external-snapshot-metadata/pkg/internal/runtime"
)

func (s *Server) GetMetadataDelta(req *api.GetMetadataDeltaRequest, stream api.SnapshotMetadata_GetMetadataDeltaServer) error {
func (s *Server) GetMetadataDelta(req *api.GetMetadataDeltaRequest, stream api.SnapshotMetadata_GetMetadataDeltaServer) (err error) {
// Create a timeout context so that failure in either sending to the client or
// receiving from the CSI driver will ultimately abort the handler session.
// The context could also get canceled by the client.
ctx, cancelFn := context.WithTimeout(s.getMetadataDeltaContextWithLogger(req, stream), s.config.MaxStreamDur)
defer cancelFn()

// Record metrics when the operation ends. The closure reads the named result
// err, so it sees the error the handler finally returns.
defer func(startTime time.Time) {
	opLabel := map[string]string{
		runtime.LabelTargetSnapshotName: fmt.Sprintf("%s/%s", req.Namespace, req.TargetSnapshotName),
		runtime.LabelBaseSnapshotName:   fmt.Sprintf("%s/%s", req.Namespace, req.BaseSnapshotName),
	}
	// Record under the delta operation name so that these samples are not
	// reported as MetadataAllocated operations.
	s.config.Runtime.RecordMetricsWithLabels(opLabel, runtime.MetadataDeltaOperationName, startTime, err)
}(time.Now())

if err := s.validateGetMetadataDeltaRequest(req); err != nil {
klog.FromContext(ctx).Error(err, "validation failed")
return err
Expand All @@ -63,7 +74,8 @@ func (s *Server) GetMetadataDelta(req *api.GetMetadataDeltaRequest, stream api.S
return err
}

return s.streamGetMetadataDeltaResponse(ctx, stream, csiStream)
err = s.streamGetMetadataDeltaResponse(ctx, stream, csiStream)
return err
}

func (s *Server) getMetadataDeltaContextWithLogger(req *api.GetMetadataDeltaRequest, stream api.SnapshotMetadata_GetMetadataDeltaServer) context.Context {
Expand Down
31 changes: 31 additions & 0 deletions pkg/internal/server/grpc/get_metadata_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,37 @@ func TestGetMetadataDeltaViaGRPCClient(t *testing.T) {
} else if errStream != nil {
assert.ErrorIs(t, errStream, io.EOF)
}

// Validate metrics are recorded correctly
metrics, _ := grpcServer.config.Runtime.MetricsManager.GetRegistry().Gather()
statusFound := 0
targetSnapshotFound := 0
baseSnapshotFound := 0

// Validate that both gauge and controller metrics is recorded
assert.GreaterOrEqual(t, 2, len(metrics))
assert.Equal(t, *metrics[0].Name, "process_start_time_seconds")
assert.Equal(t, *metrics[1].Name, "snapshot_metadata_controller_operations_seconds")

// Validate grpc_status_code and target_snapshot name
for _, metric := range metrics[1].Metric {
for _, labels := range metric.Label {
expTargetSnapshotName := fmt.Sprintf("%s/%s", tc.req.Namespace, tc.req.TargetSnapshotName)
expBaseSnapshotName := fmt.Sprintf("%s/%s", tc.req.Namespace, tc.req.BaseSnapshotName)
if *labels.Name == "grpc_status_code" && *labels.Value == tc.expStatusCode.String() {
statusFound = 1
}
if *labels.Name == "target_snapshot" && *labels.Value == expTargetSnapshotName {
targetSnapshotFound = 1
}
if *labels.Name == "base_snapshot" && *labels.Value == expBaseSnapshotName {
baseSnapshotFound = 1
}
}
}
assert.Equal(t, 1, statusFound)
assert.Equal(t, 1, targetSnapshotFound)
assert.Equal(t, 1, baseSnapshotFound)
})
}
}
Expand Down
29 changes: 24 additions & 5 deletions pkg/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sidecar
import (
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"strconv"
Expand Down Expand Up @@ -86,16 +87,25 @@ func Run(argv []string, version string) int {

klog.Infof("CSI driver name: %q", rt.DriverName)

// TBD May need to exposed metric HTTP end point
// here because the wait for the CSI driver is open ended.

grpcServer, err := startGRPCServerAndValidateCSIDriver(s.createServerConfig(rt))
if err != nil {
klog.Error(err)
return 1
}

// TODO: Start the HTTP metrics server here.
// start listening & serving http endpoint, if set
mux := http.NewServeMux()
if *s.httpEndpoint != "" {
rt.MetricsManager.RegisterToServer(mux, *s.metricsPath)
rt.MetricsManager.SetDriverName(rt.DriverName)
go func() {
klog.Infof("ServeMux listening at %q", *s.httpEndpoint)
err := http.ListenAndServe(*s.httpEndpoint, mux)
if err != nil {
klog.Fatalf("Failed to start HTTP server at specified address (%q) and metrics path (%q): %s", *s.httpEndpoint, *s.metricsPath, err)
}
}()
}

shutdownOnTerminationSignal(grpcServer)

Expand Down Expand Up @@ -172,7 +182,6 @@ func (s *sidecarFlagSet) parseFlagsAndHandleShowVersion(args []string) (handledS
}

func (s *sidecarFlagSet) runtimeArgsFromFlags() runtime.Args {
// TODO: set the HTTP server properties.
return runtime.Args{
CSIAddress: *s.csiAddress,
CSITimeout: *s.csiTimeout,
Expand All @@ -182,6 +191,8 @@ func (s *sidecarFlagSet) runtimeArgsFromFlags() runtime.Args {
GRPCPort: *s.grpcPort,
TLSCertFile: *s.tlsCert,
TLSKeyFile: *s.tlsKey,
HttpEndpoint: *s.httpEndpoint,
MetricsPath: *s.metricsPath,
}
}

Expand Down Expand Up @@ -222,6 +233,14 @@ func (s *sidecarFlagSet) runtimeArgsToArgv(progName string, rta runtime.Args) []
argv = append(argv, "-"+flagKubeAPIQPS, strconv.FormatFloat(float64(rta.KubeAPIQPS), 'f', -1, 32))
}

if rta.HttpEndpoint != "" {
argv = append(argv, "-"+flagHTTPEndpoint, rta.HttpEndpoint)
}

if rta.MetricsPath != defaultMetricsPath {
argv = append(argv, "-"+flagMetricsPath, rta.MetricsPath)
}

return argv
}

Expand Down
Loading

0 comments on commit 8331a93

Please sign in to comment.