Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modifing max_workers while running example #690

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/tsenart/vegeta/v12

go 1.20
go 1.22

require (
github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b
Expand Down
34 changes: 29 additions & 5 deletions lib/attack.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Attacker struct {
maxBody int64
redirects int
chunked bool
seqmu sync.Mutex
}

const (
Expand Down Expand Up @@ -400,11 +401,14 @@ func (a *Attacker) Attack(tr Targeter, p Pacer, du time.Duration, name string) <
began: time.Now(),
}

go readMaxWorkerFromSocket("/tmp/vegeta-max-workers.sock", a)

results := make(chan *Result)
ticks := make(chan struct{})
removeWorker := make(chan struct{})
for i := uint64(0); i < workers; i++ {
wg.Add(1)
go a.attack(tr, atk, &wg, ticks, results)
go a.attack(tr, atk, &wg, ticks, results, removeWorker)
}

go func() {
Expand All @@ -413,6 +417,7 @@ func (a *Attacker) Attack(tr Targeter, p Pacer, du time.Duration, name string) <
wg.Wait()
close(results)
a.Stop()
close(removeWorker)
}()

count := uint64(0)
Expand Down Expand Up @@ -440,8 +445,16 @@ func (a *Attacker) Attack(tr Targeter, p Pacer, du time.Duration, name string) <
// all workers are blocked. start one more and try again
workers++
wg.Add(1)
go a.attack(tr, atk, &wg, ticks, results)
go a.attack(tr, atk, &wg, ticks, results, removeWorker)
}
}

if workers > a.maxWorkers {
workerChangeCount := workers - a.maxWorkers
for _ = range workerChangeCount {
removeWorker <- struct{}{}
}
workers -= workerChangeCount
}

select {
Expand All @@ -456,6 +469,12 @@ func (a *Attacker) Attack(tr Targeter, p Pacer, du time.Duration, name string) <
return results
}

func (a *Attacker) AdjustMaxWokers(maxWorkers uint64) {
a.seqmu.Lock()
a.maxWorkers = maxWorkers
a.seqmu.Unlock()
}

// Stop stops the current attack. The return value indicates whether this call
// has signalled the attack to stop (`true` for the first call) or whether it
// was a noop because it has been previously signalled to stop (`false` for any
Expand All @@ -470,10 +489,15 @@ func (a *Attacker) Stop() bool {
}
}

func (a *Attacker) attack(tr Targeter, atk *attack, workers *sync.WaitGroup, ticks <-chan struct{}, results chan<- *Result) {
func (a *Attacker) attack(tr Targeter, atk *attack, workers *sync.WaitGroup, ticks <-chan struct{}, results chan<- *Result, removeWorker <-chan struct{}) {
defer workers.Done()
for range ticks {
results <- a.hit(tr, atk)
for {
select {
case <-ticks:
results <- a.hit(tr, atk)
case <-removeWorker:
return
}
}
}

Expand Down
59 changes: 59 additions & 0 deletions lib/command_socket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package vegeta

import (
"fmt"
"net"
"os"
"strconv"
"strings"
)

// Used to adjust the max-workers while vegeta is running
func readMaxWorkerFromSocket(socketPath string, attacker *Attacker) {
// Create a Unix domain socket and listen for incoming connections.
socket, err := net.Listen("unix", socketPath)
if err != nil {
fmt.Fprintf(os.Stderr, "socket err1: %s\n", err)
return
}

// Wait for stopch to close and then shutdown socket
go func(attacker *Attacker, socket net.Listener) {
<-attacker.stopch
_ = socket.Close()
_ = os.Remove(socketPath)
}(attacker, socket)

for {
// Accept an incoming connection.
conn, err := socket.Accept()
if err != nil {
fmt.Fprintf(os.Stderr, "socket err2: %v\n", err)
return
}

// Handle the connection in a separate goroutine.
go func(conn net.Conn) {
defer conn.Close()
// Create a buffer for incoming data.
buf := make([]byte, 4096)

// Read data from the connection.
n, err := conn.Read(buf)
if err != nil {
fmt.Fprintf(os.Stderr, "socket err3: %v\n", err)
return
}

input := string(buf[:n])
input = strings.TrimSpace(input)
input = strings.Trim(input, "\n\r")
i, err := strconv.ParseUint(input, 10, 64)
if err != nil {
fmt.Fprintf(os.Stderr, "socket err4: %v\n", err)
}
attacker.AdjustMaxWokers(i)
fmt.Fprintf(os.Stderr, "worker update: %v\n", attacker.maxWorkers)
}(conn)
}
}