Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
geyslan committed Dec 2, 2024
1 parent 2223095 commit 6972f48
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 36 deletions.
81 changes: 64 additions & 17 deletions cmd/evt/cmd/stress/stress.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,24 @@ func getStressConfig(cmd *cobra.Command) (*StressConfig, error) {
}, nil
}

const coolDownTime = 10 * time.Second

func coolDownCtx(ctx context.Context, msg string, coolDownTime time.Duration) {
fmt.Printf("%s: waiting %v for cool down...\n", msg, coolDownTime)

select {
case <-time.After(coolDownTime):
return
case <-ctx.Done():
return
}
}

func coolDown(msg string, coolDownTime time.Duration) {
fmt.Printf("%s: waiting %v for cool down...\n", msg, coolDownTime)
time.Sleep(coolDownTime)
}

func stressRun(cmd *cobra.Command, args []string) error {
logger.Init(logger.NewDefaultLoggingConfig())

Expand Down Expand Up @@ -228,9 +246,7 @@ func stressRun(cmd *cobra.Command, args []string) error {
triggerPids = append(triggerPids, triggerPid)
// limit the comm length to 15 printable characters
comm := fmt.Sprintf("%s.sh", evt)
if len(comm) > 15 {
comm = comm[:15]
}
comm = comm[:min(len(comm), 15)]
triggerComms = append(triggerComms, comm)
}
}
Expand All @@ -243,24 +259,56 @@ func stressRun(cmd *cobra.Command, args []string) error {
return err
}

const coolDownTime = 10 * time.Second
fmt.Printf("tracee started: waiting %v for cool down...\n", coolDownTime)
select {
case <-time.After(coolDownTime):
case <-ctx.Done():
goto cleanup
}
coolDownCtx(ctx, "tracee started", coolDownTime)

// signal all triggers to start
err = syscall.Kill(-os.Getpid(), syscall.SIGUSR1)
if err != nil {
return fmt.Errorf("sending SIGUSR1 to all triggers: %w", err)
for _, pid := range triggerPids {
err = syscall.Kill(pid, syscall.SIGUSR1)
if err != nil {
return fmt.Errorf("sending SIGUSR1 to trigger %d: %w", pid, err)
}
}
// err = syscall.Kill(-os.Getpid(), syscall.SIGUSR1)
// if err != nil {
// return fmt.Errorf("sending SIGUSR1 to all triggers: %w", err)
// }

if cfg.Mode.Selected == StressModeOps {
wg.Add(1)
go func(cancel context.CancelFunc) {
defer wg.Done()

for {
select {
case <-time.After(1 * time.Second):
allDone := true
for _, pid := range triggerPids {
if syscall.Kill(pid, 0) == nil {
allDone = false
break
}
}

if allDone {
coolDown("triggers finished", coolDownTime)
cancel()
return
}
}
}
}(cancel)
}

// block until tracee is finished or context is done
for {
select {
case <-ctx.Done():
for _, pid := range triggerPids {
err = syscall.Kill(pid, syscall.SIGTERM)
if err != nil {
return fmt.Errorf("sending SIGTERM to trigger %d: %w", pid, err)
}
}
goto cleanup
case err := <-traceeStatus:
if err != nil {
Expand Down Expand Up @@ -289,8 +337,8 @@ func triggerEvent(ctx context.Context, wg *sync.WaitGroup, event string, ops uin

cmd := exec.CommandContext(ctx, "./dist/evt", "trigger", "-e", event, "-o", fmt.Sprintf("%d", ops))
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
Pgid: os.Getpid(),
// Setpgid: true,
// Pgid: os.Getpid(),
}
cmd.Stdin = nil
cmd.Stdout = os.Stdout
Expand All @@ -310,7 +358,7 @@ func triggerEvent(ctx context.Context, wg *sync.WaitGroup, event string, ops uin

waitErr := cmd.Wait()
if waitErr != nil {
err = fmt.Errorf("trigger finished: %w", waitErr)
fmt.Errorf("waiting for trigger: %w", waitErr)
}
}()

Expand Down Expand Up @@ -411,7 +459,6 @@ func RunTracee(
errCh <- fmt.Errorf("tracee finished: %w", err)
case <-time.After(1 * time.Second):
// give tracee some time to start
break
}
if err != nil {
return
Expand Down
39 changes: 20 additions & 19 deletions cmd/evt/cmd/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,44 +104,45 @@ func triggerRun(cmd *cobra.Command, args []string) error {
startChan := make(chan os.Signal, 1)
signal.Notify(startChan, syscall.SIGUSR1)
fmt.Printf("[trigger:%d:%s] Waiting for start signal\n", os.Getpid(), cfg.Event)

select {
case <-ctx.Done():
fmt.Printf("[trigger:%d:%s] Stopping triggering: %v\n", os.Getpid(), cfg.Event, ctx.Err())
return ctx.Err()
case <-startChan:
fmt.Printf("[trigger:%d:%s] Starting triggering %d ops with %v sleep time\n", os.Getpid(), cfg.Event, cfg.Ops, cfg.Sleep)
}

for i := uint32(0); i < cfg.Ops; i++ {
cmd := exec.CommandContext(ctx, fmt.Sprintf("./cmd/evt/cmd/trigger/triggers/%s.sh", cfg.Event))
err := cmd.Start()
if err != nil {
return fmt.Errorf("[trigger:%d:%s] failed to start command: %w", os.Getpid(), cfg.Event, err)
select {
case <-ctx.Done():
fmt.Printf("[trigger:%d:%s] Stopping triggering: %v\n", os.Getpid(), cfg.Event, ctx.Err())
return ctx.Err()
case <-time.After(cfg.Sleep):
// continue
}

err = cmd.Wait()
cmd := exec.CommandContext(ctx, fmt.Sprintf("./cmd/evt/cmd/trigger/triggers/%s.sh", cfg.Event))
err := cmd.Run()
if err != nil {
return fmt.Errorf("[trigger:%d:%s] failed to wait for command: %w", os.Getpid(), cfg.Event, err)
return fmt.Errorf("[trigger:%d:%s] failed to run command: %w", os.Getpid(), cfg.Event, err)
}

time.Sleep(cfg.Sleep)
// err := cmd.Start()
// if err != nil {
// return fmt.Errorf("[trigger:%d:%s] failed to start command: %w", os.Getpid(), cfg.Event, err)
// }

// err = cmd.Wait()
// if err != nil {
// return fmt.Errorf("[trigger:%d:%s] failed to wait for command: %w", os.Getpid(), cfg.Event, err)
// }
}

fmt.Printf("[trigger:%d:%s] Finished triggering %d ops\n", os.Getpid(), cfg.Event, cfg.Ops)

return nil
}

// func waitForCommand(cmd *exec.Cmd) <-chan error {
// done := make(chan error)

// go func() {
// done <- cmd.Wait()
// close(done)
// }()

// return done
// }

func Cmd() *cobra.Command {
return triggerCmd
}

0 comments on commit 6972f48

Please sign in to comment.