From 2b6298b8ce17d29e2726e25209296b7481a4ce8a Mon Sep 17 00:00:00 2001 From: Lakshman Date: Sun, 28 Jul 2024 23:49:09 -0600 Subject: [PATCH] Added profiling to the relay and client. Signed-off-by: L Lakshmanan --- tools/invoker/client.go | 49 +++++++++++++++++++++++++++++++++++++++-- tools/relay/server.go | 21 ++++++++++++++++-- 2 files changed, 66 insertions(+), 4 deletions(-) diff --git a/tools/invoker/client.go b/tools/invoker/client.go index 2a95b4f3..b69e5008 100644 --- a/tools/invoker/client.go +++ b/tools/invoker/client.go @@ -29,6 +29,7 @@ import ( "flag" "fmt" "os" + "strings" "strconv" "sync" "sync/atomic" @@ -53,6 +54,8 @@ const TimeseriesDBAddr = "10.96.0.84:90" var ( completed int64 latSlice LatencySlice + profSlice LatencySlice + funcDurEnableFlag *bool portFlag *int grpcTimeout time.Duration withTracing *bool @@ -64,6 +67,8 @@ func main() { rps := flag.Float64("rps", 1.0, "Target requests per second") runDuration := flag.Int("time", 5, "Run the experiment for X seconds") latencyOutputFile := flag.String("latf", "lat.csv", "CSV file for the latency measurements in microseconds") + funcDurationOutputFile := flag.String("durf", "dur.csv", "CSV file for the function duration measurements in microseconds") + funcDurEnableFlag = flag.Bool("profile", false, "Enable function duration profiling") portFlag = flag.Int("port", 80, "The port that functions listen to") withTracing = flag.Bool("trace", false, "Enable tracing in the client") zipkin := flag.String("zipkin", "http://localhost:9411/api/v2/spans", "zipkin url") @@ -107,6 +112,9 @@ func main() { realRPS := runExperiment(endpoints, *runDuration, *rps) writeLatencies(realRPS, *latencyOutputFile) + if *funcDurEnableFlag { + writeFunctionDurations(*funcDurationOutputFile) + } } func readEndpoints(path string) (endpoints []*endpoint.Endpoint, _ error) { @@ -176,7 +184,7 @@ func SayHello(address, workflowID string) { ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout) defer cancel() - _, err = c.SayHello(ctx, &pb.HelloRequest{ + response, err := c.SayHello(ctx, &pb.HelloRequest{ Name: "Invoke relay", VHiveMetadata: vhivemetadata.MakeVHiveMetadata( workflowID, @@ -187,6 +195,17 @@ func SayHello(address, workflowID string) { if err != nil { log.Warnf("Failed to invoke %v, err=%v", address, err) } else { + if *funcDurEnableFlag { + log.Debugf("Inside if\n") + words := strings.Fields(response.Message) + lastWord := words[len(words)-1] + duration, err := strconv.ParseInt(lastWord, 10, 64) + if err == nil { + profSlice.Lock() + profSlice.slice = append(profSlice.slice, duration) + profSlice.Unlock() + } + } atomic.AddInt64(&completed, 1) } } @@ -249,10 +268,36 @@ func writeLatencies(rps float64, latencyOutputFile string) { for _, lat := range latSlice.slice { _, err := datawriter.WriteString(strconv.FormatInt(lat, 10) + "\n") if err != nil { - log.Fatal("Failed to write the URLs to a file ", err) + log.Fatal("Failed to write the latencies to a file ", err) } } datawriter.Flush() file.Close() } + +func writeFunctionDurations(funcDurationOutputFile string) { + profSlice.Lock() + defer profSlice.Unlock() + + fileName := funcDurationOutputFile + log.Info("The measured function durations are saved in ", fileName) + + file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + + if err != nil { + log.Fatal("Failed creating file: ", err) + } + + datawriter := bufio.NewWriter(file) + + for _, dur := range profSlice.slice { + _, err := datawriter.WriteString(strconv.FormatInt(dur, 10) + "\n") + if err != nil { + log.Fatal("Failed to write the function durations to a file ", err) + } + } + + datawriter.Flush() + file.Close() +} \ No newline at end of file diff --git a/tools/relay/server.go b/tools/relay/server.go index 9e7f331a..05404d55 100644 --- a/tools/relay/server.go +++ b/tools/relay/server.go @@ -28,6 +28,8 @@ import ( "fmt" "net" "os" + "strconv" + "time" pb "github.com/vhive-serverless/vSwarm-proto/proto/helloworld" @@ -61,6 +63,7 @@ var ( value = flag.String("value", "helloWorld", "String input to pass to benchmark") functionMethod = flag.String("function-method", "default", "Which method of benchmark to invoke") verbose = flag.Bool("verbose", false, "Enable verbose log printing") + profileFunction = flag.Bool("profile-function", false, "Enable function profiling") // Client grpcClient grpcClients.GrpcClient @@ -155,8 +158,22 @@ func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloRe // Create new packet pkt := inputGenerator.Next() log.Debugf("Send to func: %s\n", pkt) + + startTime := time.Now() reply, err := grpcClient.Request(ctx, pkt) - log.Debugf("Recv from func: %s\n", reply) + endTime := time.Now() + elapsedTime := int64(endTime.Sub(startTime).Microseconds()) + + var finalReply string + + if *profileFunction { + log.Debugf("Invoked in %d usec\n. Recv from func: %s\n", elapsedTime, reply) + elapsedTimeStr := strconv.FormatInt(elapsedTime, 10) + finalReply = reply + "|" + elapsedTimeStr + } else { + log.Debugf("Recv from func: %s\n", reply) + finalReply = reply + } - return &pb.HelloReply{Message: reply}, err + return &pb.HelloReply{Message: finalReply}, err }