Skip to content

Commit

Permalink
[shell-operator] refactoring (#689)
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Okhlopkov <[email protected]>
Co-authored-by: Pavel Okhlopkov <[email protected]>
  • Loading branch information
ldmonster and Pavel Okhlopkov authored Nov 19, 2024
1 parent 4b4561d commit 0b4a0ef
Show file tree
Hide file tree
Showing 63 changed files with 836 additions and 704 deletions.
5 changes: 3 additions & 2 deletions cmd/shell-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/flant/kube-client/klogtolog"
"github.com/flant/shell-operator/pkg/app"
"github.com/flant/shell-operator/pkg/debug"
"github.com/flant/shell-operator/pkg/jq"
"github.com/flant/shell-operator/pkg/filter/jq"
shell_operator "github.com/flant/shell-operator/pkg/shell-operator"
utils_signal "github.com/flant/shell-operator/pkg/utils/signal"
)
Expand All @@ -33,7 +33,8 @@ func main() {
// print version
kpApp.Command("version", "Show version.").Action(func(_ *kingpin.ParseContext) error {
fmt.Printf("%s %s\n", app.AppName, app.Version)
fmt.Println(jq.FilterInfo())
fl := jq.NewFilter(app.JqLibraryPath)
fmt.Println(fl.FilterInfo())
return nil
})

Expand Down
13 changes: 10 additions & 3 deletions pkg/app/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ var DebugUnixSocket = "/var/run/shell-operator/debug.socket"

var DebugHttpServerAddr = ""

var DebugKeepTmpFiles = "no"
var (
DebugKeepTmpFilesVar = "no"
DebugKeepTmpFiles = false
)

var DebugKubernetesAPI = false

Expand All @@ -27,8 +30,12 @@ func DefineDebugFlags(kpApp *kingpin.Application, cmd *kingpin.CmdClause) {
cmd.Flag("debug-keep-tmp-files", "set to yes to disable cleanup of temporary files").
Envar("DEBUG_KEEP_TMP_FILES").
Hidden().
Default(DebugKeepTmpFiles).
StringVar(&DebugKeepTmpFiles)
Default(DebugKeepTmpFilesVar).Action(func(_ *kingpin.ParseContext) error {
DebugKeepTmpFiles = DebugKeepTmpFilesVar == "yes"

return nil
}).
StringVar(&DebugKeepTmpFilesVar)

cmd.Flag("debug-kubernetes-api", "enable client-go debug messages").
Envar("DEBUG_KUBERNETES_API").
Expand Down
116 changes: 80 additions & 36 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,9 @@ import (

"github.com/deckhouse/deckhouse/pkg/log"

"github.com/flant/shell-operator/pkg/app"
utils "github.com/flant/shell-operator/pkg/utils/labels"
)

type CmdUsage struct {
Sys time.Duration
User time.Duration
MaxRss int64
}

func Run(cmd *exec.Cmd) error {
// TODO context: hook name, hook phase, hook binding
// TODO observability
Expand All @@ -33,21 +26,86 @@ func Run(cmd *exec.Cmd) error {
return cmd.Run()
}

func RunAndLogLines(cmd *exec.Cmd, logLabels map[string]string, logger *log.Logger) (*CmdUsage, error) {
// TODO observability
type Executor struct {
cmd *exec.Cmd
logProxyHookJSON bool
proxyJsonKey string
logger *log.Logger
}

func (e *Executor) WithLogProxyHookJSON(logProxyHookJSON bool) *Executor {
e.logProxyHookJSON = logProxyHookJSON

return e
}

func (e *Executor) WithLogProxyHookJSONKey(logProxyHookJSONKey string) *Executor {
if logProxyHookJSONKey == "" {
return e
}

e.proxyJsonKey = logProxyHookJSONKey

return e
}

func (e *Executor) WithLogger(logger *log.Logger) *Executor {
e.logger = logger

return e
}

func (e *Executor) WithCMDStdout(w io.Writer) *Executor {
e.cmd.Stdout = w

return e
}

func (e *Executor) WithCMDStderr(w io.Writer) *Executor {
e.cmd.Stderr = w

return e
}

func NewExecutor(dir string, entrypoint string, args []string, envs []string) *Executor {
cmd := exec.Command(entrypoint, args...)
cmd.Env = append(cmd.Env, envs...)
cmd.Dir = dir

ex := &Executor{
cmd: cmd,
proxyJsonKey: "proxyJsonLog",
logger: log.NewLogger(log.Options{}).Named("auto-executor"),
}

return ex
}

func (e *Executor) Output() ([]byte, error) {
e.logger.Debugf("Executing command '%s' in '%s' dir", strings.Join(e.cmd.Args, " "), e.cmd.Dir)
return e.cmd.Output()
}

type CmdUsage struct {
Sys time.Duration
User time.Duration
MaxRss int64
}

func (e *Executor) RunAndLogLines(logLabels map[string]string) (*CmdUsage, error) {
stdErr := bytes.NewBuffer(nil)
logEntry := utils.EnrichLoggerWithLabels(logger, logLabels)
logEntry := utils.EnrichLoggerWithLabels(e.logger, logLabels)
stdoutLogEntry := logEntry.With("output", "stdout")
stderrLogEntry := logEntry.With("output", "stderr")

logEntry.Debugf("Executing command '%s' in '%s' dir", strings.Join(cmd.Args, " "), cmd.Dir)
logEntry.Debugf("Executing command '%s' in '%s' dir", strings.Join(e.cmd.Args, " "), e.cmd.Dir)

plo := &proxyLogger{app.LogProxyHookJSON, stdoutLogEntry, make([]byte, 0)}
ple := &proxyLogger{app.LogProxyHookJSON, stderrLogEntry, make([]byte, 0)}
cmd.Stdout = plo
cmd.Stderr = io.MultiWriter(ple, stdErr)
plo := &proxyLogger{e.logProxyHookJSON, e.proxyJsonKey, stdoutLogEntry, make([]byte, 0)}
ple := &proxyLogger{e.logProxyHookJSON, e.proxyJsonKey, stderrLogEntry, make([]byte, 0)}
e.cmd.Stdout = plo
e.cmd.Stderr = io.MultiWriter(ple, stdErr)

err := cmd.Run()
err := e.cmd.Run()
if err != nil {
if len(stdErr.Bytes()) > 0 {
return nil, fmt.Errorf("%s", stdErr.String())
Expand All @@ -57,14 +115,14 @@ func RunAndLogLines(cmd *exec.Cmd, logLabels map[string]string, logger *log.Logg
}

var usage *CmdUsage
if cmd.ProcessState != nil {
if e.cmd.ProcessState != nil {
usage = &CmdUsage{
Sys: cmd.ProcessState.SystemTime(),
User: cmd.ProcessState.UserTime(),
Sys: e.cmd.ProcessState.SystemTime(),
User: e.cmd.ProcessState.UserTime(),
}

// FIXME Maxrss is Unix specific.
sysUsage := cmd.ProcessState.SysUsage()
sysUsage := e.cmd.ProcessState.SysUsage()
if v, ok := sysUsage.(*syscall.Rusage); ok {
// v.Maxrss is int32 on arm/v7
usage.MaxRss = int64(v.Maxrss) //nolint:unconvert
Expand All @@ -76,6 +134,7 @@ func RunAndLogLines(cmd *exec.Cmd, logLabels map[string]string, logger *log.Logg

type proxyLogger struct {
logProxyHookJSON bool
proxyJsonLogKey string

logger *log.Logger

Expand Down Expand Up @@ -116,7 +175,7 @@ func (pl *proxyLogger) Write(p []byte) (int, error) {
return len(p), err
}

logger := pl.logger.With(app.ProxyJsonLogKey, true)
logger := pl.logger.With(pl.proxyJsonLogKey, true)

logLineRaw, _ := json.Marshal(logMap)
logLine := string(logLineRaw)
Expand Down Expand Up @@ -176,18 +235,3 @@ func (pl *proxyLogger) writerScanner(p []byte) {
pl.logger.Error("reading from scanner", slog.String("error", err.Error()))
}
}

func Output(cmd *exec.Cmd) (output []byte, err error) {
// TODO context: hook name, hook phase, hook binding
// TODO observability
log.Debugf("Executing command '%s' in '%s' dir", strings.Join(cmd.Args, " "), cmd.Dir)
output, err = cmd.Output()
return
}

func MakeCommand(dir string, entrypoint string, args []string, envs []string) *exec.Cmd {
cmd := exec.Command(entrypoint, args...)
cmd.Env = append(cmd.Env, envs...)
cmd.Dir = dir
return cmd
}
78 changes: 44 additions & 34 deletions pkg/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@ import (
"io"
"math/rand/v2"
"os"
"os/exec"
"regexp"
"testing"
"time"

"github.com/deckhouse/deckhouse/pkg/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/flant/shell-operator/pkg/app"
)

func TestRunAndLogLines(t *testing.T) {
Expand All @@ -35,11 +32,11 @@ func TestRunAndLogLines(t *testing.T) {
logger.SetOutput(&buf)

t.Run("simple log", func(t *testing.T) {
app.LogProxyHookJSON = true

cmd := exec.Command("echo", `{"foo": "baz"}`)
ex := NewExecutor("", "echo", []string{`{"foo": "baz"}`}, []string{}).
WithLogProxyHookJSON(true).
WithLogger(logger)

_, err := RunAndLogLines(cmd, map[string]string{"a": "b"}, logger)
_, err := ex.RunAndLogLines(map[string]string{"a": "b"})
assert.NoError(t, err)

assert.Equal(t, buf.String(), `{"level":"fatal","msg":"hook result","a":"b","hook":{"foo":"baz"},"output":"stdout","proxyJsonLog":true,"time":"2006-01-02T15:04:05Z"}`+"\n")
Expand All @@ -48,10 +45,10 @@ func TestRunAndLogLines(t *testing.T) {
})

t.Run("not json log", func(t *testing.T) {
app.LogProxyHookJSON = false
cmd := exec.Command("echo", `foobar`)
ex := NewExecutor("", "echo", []string{"foobar"}, []string{}).
WithLogger(logger)

_, err := RunAndLogLines(cmd, map[string]string{"a": "b"}, logger)
_, err := ex.RunAndLogLines(map[string]string{"a": "b"})
assert.NoError(t, err)

assert.Equal(t, buf.String(), `{"level":"info","msg":"foobar","a":"b","output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n")
Expand All @@ -67,10 +64,11 @@ func TestRunAndLogLines(t *testing.T) {

_, _ = io.WriteString(f, `{"foo": "`+randStringRunes(1024*1024)+`"}`)

app.LogProxyHookJSON = true
cmd := exec.Command("cat", f.Name())
ex := NewExecutor("", "cat", []string{f.Name()}, []string{}).
WithLogProxyHookJSON(true).
WithLogger(logger)

_, err = RunAndLogLines(cmd, map[string]string{"a": "b"}, logger)
_, err = ex.RunAndLogLines(map[string]string{"a": "b"})
assert.NoError(t, err)

reg := regexp.MustCompile(`{"level":"fatal","msg":"hook result","a":"b","hook":{"truncated":".*:truncated"},"output":"stdout","proxyJsonLog":true,"time":"2006-01-02T15:04:05Z"`)
Expand All @@ -87,10 +85,10 @@ func TestRunAndLogLines(t *testing.T) {

_, _ = io.WriteString(f, `result `+randStringRunes(1024*1024))

app.LogProxyHookJSON = false
cmd := exec.Command("cat", f.Name())
ex := NewExecutor("", "cat", []string{f.Name()}, []string{}).
WithLogger(logger)

_, err = RunAndLogLines(cmd, map[string]string{"a": "b"}, logger)
_, err = ex.RunAndLogLines(map[string]string{"a": "b"})
assert.NoError(t, err)

reg := regexp.MustCompile(`{"level":"info","msg":"result .*:truncated","a":"b","output":"stdout","time":"2006-01-02T15:04:05Z"`)
Expand All @@ -101,38 +99,47 @@ func TestRunAndLogLines(t *testing.T) {

t.Run("invalid json structure", func(t *testing.T) {
logger.SetLevel(log.LevelDebug)
app.LogProxyHookJSON = true
cmd := exec.Command("echo", `["a","b","c"]`)
_, err := RunAndLogLines(cmd, map[string]string{"a": "b"}, logger)

ex := NewExecutor("", "echo", []string{`["a","b","c"]`}, []string{}).
WithLogProxyHookJSON(true).
WithLogger(logger)

_, err := ex.RunAndLogLines(map[string]string{"a": "b"})
assert.NoError(t, err)
assert.Equal(t, buf.String(), `{"level":"debug","msg":"Executing command 'echo [\"a\",\"b\",\"c\"]' in '' dir","source":"executor/executor.go:43","a":"b","time":"2006-01-02T15:04:05Z"}`+"\n"+
`{"level":"debug","msg":"json log line not map[string]interface{}","source":"executor/executor.go:111","a":"b","line":["a","b","c"],"output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n"+
`{"level":"info","msg":"[\"a\",\"b\",\"c\"]\n","source":"executor/executor.go:114","a":"b","output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n")

assert.Equal(t, buf.String(), `{"level":"debug","msg":"Executing command 'echo [\"a\",\"b\",\"c\"]' in '' dir","source":"executor/executor.go:101","a":"b","time":"2006-01-02T15:04:05Z"}`+"\n"+
`{"level":"debug","msg":"json log line not map[string]interface{}","source":"executor/executor.go:170","a":"b","line":["a","b","c"],"output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n"+
`{"level":"info","msg":"[\"a\",\"b\",\"c\"]\n","source":"executor/executor.go:173","a":"b","output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n")

buf.Reset()
})

t.Run("multiline", func(t *testing.T) {
logger.SetLevel(log.LevelInfo)
app.LogProxyHookJSON = true
cmd := exec.Command("echo", `
arg := `
{"a":"b",
"c":"d"}
`)
_, err := RunAndLogLines(cmd, map[string]string{"foor": "baar"}, logger)
`
ex := NewExecutor("", "echo", []string{arg}, []string{}).
WithLogProxyHookJSON(true).
WithLogger(logger)

_, err := ex.RunAndLogLines(map[string]string{"foor": "baar"})
assert.NoError(t, err)
assert.Equal(t, buf.String(), `{"level":"fatal","msg":"hook result","foor":"baar","hook":{"a":"b","c":"d"},"output":"stdout","proxyJsonLog":true,"time":"2006-01-02T15:04:05Z"}`+"\n")

buf.Reset()
})

t.Run("multiline non json", func(t *testing.T) {
app.LogProxyHookJSON = false
cmd := exec.Command("echo", `
arg := `
a b
c d
`)
_, err := RunAndLogLines(cmd, map[string]string{"foor": "baar"}, logger)
`
ex := NewExecutor("", "echo", []string{arg}, []string{}).
WithLogger(logger)

_, err := ex.RunAndLogLines(map[string]string{"foor": "baar"})
assert.NoError(t, err)
assert.Equal(t, buf.String(), `{"level":"info","msg":"a b","foor":"baar","output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n"+
`{"level":"info","msg":"c d","foor":"baar","output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n")
Expand All @@ -141,12 +148,15 @@ c d
})

t.Run("multiline json", func(t *testing.T) {
app.LogProxyHookJSON = true
cmd := exec.Command("echo", `{
arg := `{
"a":"b",
"c":"d"
}`)
_, err := RunAndLogLines(cmd, map[string]string{"foor": "baar"}, logger)
}`
ex := NewExecutor("", "echo", []string{arg}, []string{}).
WithLogProxyHookJSON(true).
WithLogger(logger)

_, err := ex.RunAndLogLines(map[string]string{"foor": "baar"})
assert.NoError(t, err)
assert.Equal(t, buf.String(), `{"level":"fatal","msg":"hook result","foor":"baar","hook":{"a":"b","c":"d"},"output":"stdout","proxyJsonLog":true,"time":"2006-01-02T15:04:05Z"}`+"\n")

Expand Down
6 changes: 6 additions & 0 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package filter

type Filter interface {
ApplyFilter(filterStr string, data []byte) (string, error)
FilterInfo() string
}
Loading

0 comments on commit 0b4a0ef

Please sign in to comment.