Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[shell-operator] refactoring #689

Merged
merged 3 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/shell-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/flant/kube-client/klogtolog"
"github.com/flant/shell-operator/pkg/app"
"github.com/flant/shell-operator/pkg/debug"
"github.com/flant/shell-operator/pkg/jq"
"github.com/flant/shell-operator/pkg/filter/jq"
shell_operator "github.com/flant/shell-operator/pkg/shell-operator"
utils_signal "github.com/flant/shell-operator/pkg/utils/signal"
)
Expand All @@ -33,7 +33,8 @@ func main() {
// print version
kpApp.Command("version", "Show version.").Action(func(_ *kingpin.ParseContext) error {
fmt.Printf("%s %s\n", app.AppName, app.Version)
fmt.Println(jq.FilterInfo())
fl := jq.NewFilter(app.JqLibraryPath)
fmt.Println(fl.FilterInfo())
return nil
})

Expand Down
13 changes: 10 additions & 3 deletions pkg/app/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ var DebugUnixSocket = "/var/run/shell-operator/debug.socket"

var DebugHttpServerAddr = ""

var DebugKeepTmpFiles = "no"
var (
DebugKeepTmpFilesVar = "no"
DebugKeepTmpFiles = false
)

var DebugKubernetesAPI = false

Expand All @@ -27,8 +30,12 @@ func DefineDebugFlags(kpApp *kingpin.Application, cmd *kingpin.CmdClause) {
cmd.Flag("debug-keep-tmp-files", "set to yes to disable cleanup of temporary files").
Envar("DEBUG_KEEP_TMP_FILES").
Hidden().
Default(DebugKeepTmpFiles).
StringVar(&DebugKeepTmpFiles)
Default(DebugKeepTmpFilesVar).Action(func(_ *kingpin.ParseContext) error {
DebugKeepTmpFiles = DebugKeepTmpFilesVar == "yes"

return nil
}).
StringVar(&DebugKeepTmpFilesVar)

cmd.Flag("debug-kubernetes-api", "enable client-go debug messages").
Envar("DEBUG_KUBERNETES_API").
Expand Down
116 changes: 80 additions & 36 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,9 @@ import (

"github.com/deckhouse/deckhouse/pkg/log"

"github.com/flant/shell-operator/pkg/app"
utils "github.com/flant/shell-operator/pkg/utils/labels"
)

type CmdUsage struct {
Sys time.Duration
User time.Duration
MaxRss int64
}

func Run(cmd *exec.Cmd) error {
// TODO context: hook name, hook phase, hook binding
// TODO observability
Expand All @@ -33,21 +26,86 @@ func Run(cmd *exec.Cmd) error {
return cmd.Run()
}

func RunAndLogLines(cmd *exec.Cmd, logLabels map[string]string, logger *log.Logger) (*CmdUsage, error) {
// TODO observability
type Executor struct {
cmd *exec.Cmd
logProxyHookJSON bool
proxyJsonKey string
logger *log.Logger
}

func (e *Executor) WithLogProxyHookJSON(logProxyHookJSON bool) *Executor {
e.logProxyHookJSON = logProxyHookJSON

return e
}

func (e *Executor) WithLogProxyHookJSONKey(logProxyHookJSONKey string) *Executor {
if logProxyHookJSONKey == "" {
return e
}

e.proxyJsonKey = logProxyHookJSONKey

return e
}

func (e *Executor) WithLogger(logger *log.Logger) *Executor {
e.logger = logger

return e
}

func (e *Executor) WithCMDStdout(w io.Writer) *Executor {
e.cmd.Stdout = w

return e
}

func (e *Executor) WithCMDStderr(w io.Writer) *Executor {
e.cmd.Stderr = w

return e
}

func NewExecutor(dir string, entrypoint string, args []string, envs []string) *Executor {
cmd := exec.Command(entrypoint, args...)
cmd.Env = append(cmd.Env, envs...)
cmd.Dir = dir

ex := &Executor{
cmd: cmd,
proxyJsonKey: "proxyJsonLog",
logger: log.NewLogger(log.Options{}).Named("auto-executor"),
}

return ex
}

func (e *Executor) Output() ([]byte, error) {
e.logger.Debugf("Executing command '%s' in '%s' dir", strings.Join(e.cmd.Args, " "), e.cmd.Dir)
return e.cmd.Output()
}

type CmdUsage struct {
Sys time.Duration
User time.Duration
MaxRss int64
}

func (e *Executor) RunAndLogLines(logLabels map[string]string) (*CmdUsage, error) {
stdErr := bytes.NewBuffer(nil)
logEntry := utils.EnrichLoggerWithLabels(logger, logLabels)
logEntry := utils.EnrichLoggerWithLabels(e.logger, logLabels)
stdoutLogEntry := logEntry.With("output", "stdout")
stderrLogEntry := logEntry.With("output", "stderr")

logEntry.Debugf("Executing command '%s' in '%s' dir", strings.Join(cmd.Args, " "), cmd.Dir)
logEntry.Debugf("Executing command '%s' in '%s' dir", strings.Join(e.cmd.Args, " "), e.cmd.Dir)

plo := &proxyLogger{app.LogProxyHookJSON, stdoutLogEntry, make([]byte, 0)}
ple := &proxyLogger{app.LogProxyHookJSON, stderrLogEntry, make([]byte, 0)}
cmd.Stdout = plo
cmd.Stderr = io.MultiWriter(ple, stdErr)
plo := &proxyLogger{e.logProxyHookJSON, e.proxyJsonKey, stdoutLogEntry, make([]byte, 0)}
ple := &proxyLogger{e.logProxyHookJSON, e.proxyJsonKey, stderrLogEntry, make([]byte, 0)}
e.cmd.Stdout = plo
e.cmd.Stderr = io.MultiWriter(ple, stdErr)

err := cmd.Run()
err := e.cmd.Run()
if err != nil {
if len(stdErr.Bytes()) > 0 {
return nil, fmt.Errorf("%s", stdErr.String())
Expand All @@ -57,14 +115,14 @@ func RunAndLogLines(cmd *exec.Cmd, logLabels map[string]string, logger *log.Logg
}

var usage *CmdUsage
if cmd.ProcessState != nil {
if e.cmd.ProcessState != nil {
usage = &CmdUsage{
Sys: cmd.ProcessState.SystemTime(),
User: cmd.ProcessState.UserTime(),
Sys: e.cmd.ProcessState.SystemTime(),
User: e.cmd.ProcessState.UserTime(),
}

// FIXME Maxrss is Unix specific.
sysUsage := cmd.ProcessState.SysUsage()
sysUsage := e.cmd.ProcessState.SysUsage()
if v, ok := sysUsage.(*syscall.Rusage); ok {
// v.Maxrss is int32 on arm/v7
usage.MaxRss = int64(v.Maxrss) //nolint:unconvert
Expand All @@ -76,6 +134,7 @@ func RunAndLogLines(cmd *exec.Cmd, logLabels map[string]string, logger *log.Logg

type proxyLogger struct {
logProxyHookJSON bool
proxyJsonLogKey string

logger *log.Logger

Expand Down Expand Up @@ -116,7 +175,7 @@ func (pl *proxyLogger) Write(p []byte) (int, error) {
return len(p), err
}

logger := pl.logger.With(app.ProxyJsonLogKey, true)
logger := pl.logger.With(pl.proxyJsonLogKey, true)

logLineRaw, _ := json.Marshal(logMap)
logLine := string(logLineRaw)
Expand Down Expand Up @@ -176,18 +235,3 @@ func (pl *proxyLogger) writerScanner(p []byte) {
pl.logger.Error("reading from scanner", slog.String("error", err.Error()))
}
}

func Output(cmd *exec.Cmd) (output []byte, err error) {
// TODO context: hook name, hook phase, hook binding
// TODO observability
log.Debugf("Executing command '%s' in '%s' dir", strings.Join(cmd.Args, " "), cmd.Dir)
output, err = cmd.Output()
return
}

func MakeCommand(dir string, entrypoint string, args []string, envs []string) *exec.Cmd {
cmd := exec.Command(entrypoint, args...)
cmd.Env = append(cmd.Env, envs...)
cmd.Dir = dir
return cmd
}
78 changes: 44 additions & 34 deletions pkg/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@ import (
"io"
"math/rand/v2"
"os"
"os/exec"
"regexp"
"testing"
"time"

"github.com/deckhouse/deckhouse/pkg/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/flant/shell-operator/pkg/app"
)

func TestRunAndLogLines(t *testing.T) {
Expand All @@ -35,11 +32,11 @@ func TestRunAndLogLines(t *testing.T) {
logger.SetOutput(&buf)

t.Run("simple log", func(t *testing.T) {
app.LogProxyHookJSON = true

cmd := exec.Command("echo", `{"foo": "baz"}`)
ex := NewExecutor("", "echo", []string{`{"foo": "baz"}`}, []string{}).
WithLogProxyHookJSON(true).
WithLogger(logger)

_, err := RunAndLogLines(cmd, map[string]string{"a": "b"}, logger)
_, err := ex.RunAndLogLines(map[string]string{"a": "b"})
assert.NoError(t, err)

assert.Equal(t, buf.String(), `{"level":"fatal","msg":"hook result","a":"b","hook":{"foo":"baz"},"output":"stdout","proxyJsonLog":true,"time":"2006-01-02T15:04:05Z"}`+"\n")
Expand All @@ -48,10 +45,10 @@ func TestRunAndLogLines(t *testing.T) {
})

t.Run("not json log", func(t *testing.T) {
app.LogProxyHookJSON = false
cmd := exec.Command("echo", `foobar`)
ex := NewExecutor("", "echo", []string{"foobar"}, []string{}).
WithLogger(logger)

_, err := RunAndLogLines(cmd, map[string]string{"a": "b"}, logger)
_, err := ex.RunAndLogLines(map[string]string{"a": "b"})
assert.NoError(t, err)

assert.Equal(t, buf.String(), `{"level":"info","msg":"foobar","a":"b","output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n")
Expand All @@ -67,10 +64,11 @@ func TestRunAndLogLines(t *testing.T) {

_, _ = io.WriteString(f, `{"foo": "`+randStringRunes(1024*1024)+`"}`)

app.LogProxyHookJSON = true
cmd := exec.Command("cat", f.Name())
ex := NewExecutor("", "cat", []string{f.Name()}, []string{}).
WithLogProxyHookJSON(true).
WithLogger(logger)

_, err = RunAndLogLines(cmd, map[string]string{"a": "b"}, logger)
_, err = ex.RunAndLogLines(map[string]string{"a": "b"})
assert.NoError(t, err)

reg := regexp.MustCompile(`{"level":"fatal","msg":"hook result","a":"b","hook":{"truncated":".*:truncated"},"output":"stdout","proxyJsonLog":true,"time":"2006-01-02T15:04:05Z"`)
Expand All @@ -87,10 +85,10 @@ func TestRunAndLogLines(t *testing.T) {

_, _ = io.WriteString(f, `result `+randStringRunes(1024*1024))

app.LogProxyHookJSON = false
cmd := exec.Command("cat", f.Name())
ex := NewExecutor("", "cat", []string{f.Name()}, []string{}).
WithLogger(logger)

_, err = RunAndLogLines(cmd, map[string]string{"a": "b"}, logger)
_, err = ex.RunAndLogLines(map[string]string{"a": "b"})
assert.NoError(t, err)

reg := regexp.MustCompile(`{"level":"info","msg":"result .*:truncated","a":"b","output":"stdout","time":"2006-01-02T15:04:05Z"`)
Expand All @@ -101,38 +99,47 @@ func TestRunAndLogLines(t *testing.T) {

t.Run("invalid json structure", func(t *testing.T) {
logger.SetLevel(log.LevelDebug)
app.LogProxyHookJSON = true
cmd := exec.Command("echo", `["a","b","c"]`)
_, err := RunAndLogLines(cmd, map[string]string{"a": "b"}, logger)

ex := NewExecutor("", "echo", []string{`["a","b","c"]`}, []string{}).
WithLogProxyHookJSON(true).
WithLogger(logger)

_, err := ex.RunAndLogLines(map[string]string{"a": "b"})
assert.NoError(t, err)
assert.Equal(t, buf.String(), `{"level":"debug","msg":"Executing command 'echo [\"a\",\"b\",\"c\"]' in '' dir","source":"executor/executor.go:43","a":"b","time":"2006-01-02T15:04:05Z"}`+"\n"+
`{"level":"debug","msg":"json log line not map[string]interface{}","source":"executor/executor.go:111","a":"b","line":["a","b","c"],"output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n"+
`{"level":"info","msg":"[\"a\",\"b\",\"c\"]\n","source":"executor/executor.go:114","a":"b","output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n")

assert.Equal(t, buf.String(), `{"level":"debug","msg":"Executing command 'echo [\"a\",\"b\",\"c\"]' in '' dir","source":"executor/executor.go:101","a":"b","time":"2006-01-02T15:04:05Z"}`+"\n"+
`{"level":"debug","msg":"json log line not map[string]interface{}","source":"executor/executor.go:170","a":"b","line":["a","b","c"],"output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n"+
`{"level":"info","msg":"[\"a\",\"b\",\"c\"]\n","source":"executor/executor.go:173","a":"b","output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n")

buf.Reset()
})

t.Run("multiline", func(t *testing.T) {
logger.SetLevel(log.LevelInfo)
app.LogProxyHookJSON = true
cmd := exec.Command("echo", `
arg := `
{"a":"b",
"c":"d"}
`)
_, err := RunAndLogLines(cmd, map[string]string{"foor": "baar"}, logger)
`
ex := NewExecutor("", "echo", []string{arg}, []string{}).
WithLogProxyHookJSON(true).
WithLogger(logger)

_, err := ex.RunAndLogLines(map[string]string{"foor": "baar"})
assert.NoError(t, err)
assert.Equal(t, buf.String(), `{"level":"fatal","msg":"hook result","foor":"baar","hook":{"a":"b","c":"d"},"output":"stdout","proxyJsonLog":true,"time":"2006-01-02T15:04:05Z"}`+"\n")

buf.Reset()
})

t.Run("multiline non json", func(t *testing.T) {
app.LogProxyHookJSON = false
cmd := exec.Command("echo", `
arg := `
a b
c d
`)
_, err := RunAndLogLines(cmd, map[string]string{"foor": "baar"}, logger)
`
ex := NewExecutor("", "echo", []string{arg}, []string{}).
WithLogger(logger)

_, err := ex.RunAndLogLines(map[string]string{"foor": "baar"})
assert.NoError(t, err)
assert.Equal(t, buf.String(), `{"level":"info","msg":"a b","foor":"baar","output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n"+
`{"level":"info","msg":"c d","foor":"baar","output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n")
Expand All @@ -141,12 +148,15 @@ c d
})

t.Run("multiline json", func(t *testing.T) {
app.LogProxyHookJSON = true
cmd := exec.Command("echo", `{
arg := `{
"a":"b",
"c":"d"
}`)
_, err := RunAndLogLines(cmd, map[string]string{"foor": "baar"}, logger)
}`
ex := NewExecutor("", "echo", []string{arg}, []string{}).
WithLogProxyHookJSON(true).
WithLogger(logger)

_, err := ex.RunAndLogLines(map[string]string{"foor": "baar"})
assert.NoError(t, err)
assert.Equal(t, buf.String(), `{"level":"fatal","msg":"hook result","foor":"baar","hook":{"a":"b","c":"d"},"output":"stdout","proxyJsonLog":true,"time":"2006-01-02T15:04:05Z"}`+"\n")

Expand Down
6 changes: 6 additions & 0 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package filter

type Filter interface {
ApplyFilter(filterStr string, data []byte) (string, error)
FilterInfo() string
}
Loading
Loading