diff --git a/agent/main.go b/agent/main.go
index 221394416..c5ed95701 100644
--- a/agent/main.go
+++ b/agent/main.go
@@ -10,6 +10,7 @@ import (
 	"net/http"
 	"os"
 	"strings"
+	"sync"
 	"time"
 
 	sshServer "github.com/gliderlabs/ssh"
@@ -24,8 +25,21 @@ import (
 var Version = "0.0.1-alpha.8"
 
 var containerCpuMap = make(map[string][2]uint64)
+// containerCpuMutex guards containerCpuMap, which concurrent getContainerStats calls read and write.
+var containerCpuMutex = &sync.Mutex{}
 
-// var containerCpuMutex = &sync.Mutex{}
+// sem is a counting semaphore; its capacity (15) caps the number of in-flight container stats requests.
+var sem = make(chan struct{}, 15)
+
+// acquireSemaphore takes one slot from sem, blocking while all slots are in use.
+func acquireSemaphore() {
+	sem <- struct{}{}
+}
+
+// releaseSemaphore returns a slot previously taken with acquireSemaphore.
+func releaseSemaphore() {
+	<-sem
+}
 
 var diskIoStats = DiskIoStats{
 	Read:  0,
@@ -48,11 +62,11 @@ var client = &http.Client{
 		Dial: func(proto, addr string) (net.Conn, error) {
 			return net.Dial("unix", "/var/run/docker.sock")
 		},
-		ForceAttemptHTTP2:  false,
-		IdleConnTimeout:    90 * time.Second,
-		DisableCompression: true,
-		MaxIdleConns:       10,
-		DisableKeepAlives:  false,
+		ForceAttemptHTTP2:   false,
+		IdleConnTimeout:     90 * time.Second,
+		DisableCompression:  true,
+		MaxIdleConnsPerHost: 50,
+		DisableKeepAlives:   false,
 	},
 }
 
@@ -156,27 +170,40 @@ func getDockerStats() ([]*ContainerStats, error) {
 
 	containerStats := make([]*ContainerStats, 0, len(containers))
 
+	// store valid ids to clean up old container ids from map
+	validIds := make(map[string]struct{}, len(containers))
+
+	var wg sync.WaitGroup
+	// guards containerStats, which the goroutines below append to concurrently
+	var statsMutex sync.Mutex
+
 	for _, ctr := range containers {
 		ctr.IdShort = ctr.ID[:12]
-		cstats, err := getContainerStats(ctr)
-		if err != nil {
-			// retry once
-			cstats, err = getContainerStats(ctr)
+		validIds[ctr.IdShort] = struct{}{}
+		wg.Add(1)
+		// pass ctr explicitly so each goroutine gets its own copy (loop variables are shared before Go 1.22)
+		go func(ctr *Container) {
+			defer wg.Done()
+			cstats, err := getContainerStats(ctr)
 			if err != nil {
-				log.Printf("Error getting container stats: %+v\n", err)
-				continue
+				// retry once
+				cstats, err = getContainerStats(ctr)
+				if err != nil {
+					log.Printf("Error getting container stats: %+v\n", err)
+					return
+				}
 			}
-		}
-		containerStats = append(containerStats, cstats)
+			statsMutex.Lock()
+			containerStats = append(containerStats, cstats)
+			statsMutex.Unlock()
+		}(ctr)
 	}
 
-	// clean up old container ids from map
-	validIds := make(map[string]struct{}, len(containers))
-	for _, ctr := range containers {
-		validIds[ctr.IdShort] = struct{}{}
-	}
+	wg.Wait()
+
 	for id := range containerCpuMap {
 		if _, exists := validIds[id]; !exists {
+			// log.Printf("Removing container cpu map entry: %+v\n", id)
 			delete(containerCpuMap, id)
 		}
 	}
@@ -185,6 +212,9 @@ func getDockerStats() ([]*ContainerStats, error) {
 }
 
 func getContainerStats(ctr *Container) (*ContainerStats, error) {
+	// use semaphore to limit concurrency
+	acquireSemaphore()
+	defer releaseSemaphore()
 	resp, err := client.Get("http://localhost/containers/" + ctr.IdShort + "/stats?stream=0&one-shot=1")
 	if err != nil {
 		return &ContainerStats{}, err
@@ -198,14 +228,18 @@ func getContainerStats(ctr *Container) (*ContainerStats, error) {
 
 	name := ctr.Names[0][1:]
 
-	// memory
-	usedMemory := statsJson.MemoryStats.Usage - statsJson.MemoryStats.Cache
+	// memory (https://docs.docker.com/reference/cli/docker/container/stats/)
+	memCache := statsJson.MemoryStats.Stats["inactive_file"]
+	if memCache == 0 {
+		memCache = statsJson.MemoryStats.Stats["cache"]
+	}
+	usedMemory := statsJson.MemoryStats.Usage - memCache
 	// pctMemory := float64(usedMemory) / float64(statsJson.MemoryStats.Limit) * 100
 
 	// cpu
 	// add default values to containerCpu if it doesn't exist
-	// containerCpuMutex.Lock()
-	// defer containerCpuMutex.Unlock()
+	containerCpuMutex.Lock()
+	defer containerCpuMutex.Unlock()
 	if _, ok := containerCpuMap[ctr.IdShort]; !ok {
 		containerCpuMap[ctr.IdShort] = [2]uint64{0, 0}
 	}