Skip to content

Commit

Permalink
use semaphore to limit concurrency in agent
Browse files Browse the repository at this point in the history
subtract mem cache from container stats
  • Loading branch information
henrygd committed Jul 24, 2024
1 parent 76cfaaa commit 5e255f8
Showing 1 changed file with 48 additions and 23 deletions.
71 changes: 48 additions & 23 deletions agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"os"
"strings"
"sync"
"time"

sshServer "github.com/gliderlabs/ssh"
Expand All @@ -24,8 +25,17 @@ import (
// Version is the version string of the agent.
var Version = "0.0.1-alpha.8"

// Shared CPU bookkeeping for containers.
var (
	// containerCpuMap stores a pair of uint64 counters per container, keyed
	// by the container's short id. New entries start out as {0, 0}.
	// NOTE(review): the meaning of the two counters is not visible from
	// here - confirm against the CPU calculation that consumes them.
	containerCpuMap = make(map[string][2]uint64)

	// containerCpuMutex is held by getContainerStats while it reads and
	// updates containerCpuMap.
	containerCpuMutex = &sync.Mutex{}
)

// var containerCpuMutex = &sync.Mutex{}
// maxConcurrentStatsRequests is the capacity of sem, i.e. the number of
// slots that can be held at the same time.
const maxConcurrentStatsRequests = 15

// sem is a buffered channel used as a counting semaphore: every value
// sitting in the channel represents one slot currently in use.
var sem = make(chan struct{}, maxConcurrentStatsRequests)

// acquireSemaphore takes one slot in sem. It blocks while every slot is
// already in use.
func acquireSemaphore() {
	var token struct{}
	sem <- token
}

// releaseSemaphore gives one slot back to sem. It must be paired with an
// earlier acquireSemaphore call; with no slot held it blocks.
func releaseSemaphore() {
	<-sem
}

var diskIoStats = DiskIoStats{
Read: 0,
Expand All @@ -48,11 +58,11 @@ var client = &http.Client{
Dial: func(proto, addr string) (net.Conn, error) {
return net.Dial("unix", "/var/run/docker.sock")
},
ForceAttemptHTTP2: false,
IdleConnTimeout: 90 * time.Second,
DisableCompression: true,
MaxIdleConns: 10,
DisableKeepAlives: false,
ForceAttemptHTTP2: false,
IdleConnTimeout: 90 * time.Second,
DisableCompression: true,
MaxIdleConnsPerHost: 50,
DisableKeepAlives: false,
},
}

Expand Down Expand Up @@ -156,27 +166,35 @@ func getDockerStats() ([]*ContainerStats, error) {

containerStats := make([]*ContainerStats, 0, len(containers))

// store valid ids to clean up old container ids from map
validIds := make(map[string]struct{}, len(containers))

var wg sync.WaitGroup

for _, ctr := range containers {
ctr.IdShort = ctr.ID[:12]
cstats, err := getContainerStats(ctr)
if err != nil {
// retry once
cstats, err = getContainerStats(ctr)
validIds[ctr.IdShort] = struct{}{}
wg.Add(1)
go func() {
defer wg.Done()
cstats, err := getContainerStats(ctr)
if err != nil {
log.Printf("Error getting container stats: %+v\n", err)
continue
// retry once
cstats, err = getContainerStats(ctr)
if err != nil {
log.Printf("Error getting container stats: %+v\n", err)
return
}
}
}
containerStats = append(containerStats, cstats)
containerStats = append(containerStats, cstats)
}()
}

// clean up old container ids from map
validIds := make(map[string]struct{}, len(containers))
for _, ctr := range containers {
validIds[ctr.IdShort] = struct{}{}
}
wg.Wait()

for id := range containerCpuMap {
if _, exists := validIds[id]; !exists {
// log.Printf("Removing container cpu map entry: %+v\n", id)
delete(containerCpuMap, id)
}
}
Expand All @@ -185,6 +203,9 @@ func getDockerStats() ([]*ContainerStats, error) {
}

func getContainerStats(ctr *Container) (*ContainerStats, error) {
// use semaphore to limit concurrency
acquireSemaphore()
defer releaseSemaphore()
resp, err := client.Get("http://localhost/containers/" + ctr.IdShort + "/stats?stream=0&one-shot=1")
if err != nil {
return &ContainerStats{}, err
Expand All @@ -198,14 +219,18 @@ func getContainerStats(ctr *Container) (*ContainerStats, error) {

name := ctr.Names[0][1:]

// memory
usedMemory := statsJson.MemoryStats.Usage - statsJson.MemoryStats.Cache
// memory (https://docs.docker.com/reference/cli/docker/container/stats/)
memCache := statsJson.MemoryStats.Stats["inactive_file"]
if memCache == 0 {
memCache = statsJson.MemoryStats.Stats["cache"]
}
usedMemory := statsJson.MemoryStats.Usage - memCache
// pctMemory := float64(usedMemory) / float64(statsJson.MemoryStats.Limit) * 100

// cpu
// add default values to containerCpu if it doesn't exist
// containerCpuMutex.Lock()
// defer containerCpuMutex.Unlock()
containerCpuMutex.Lock()
defer containerCpuMutex.Unlock()
if _, ok := containerCpuMap[ctr.IdShort]; !ok {
containerCpuMap[ctr.IdShort] = [2]uint64{0, 0}
}
Expand Down

0 comments on commit 5e255f8

Please sign in to comment.