Skip to content

Commit

Permalink
*: init multirunners server command
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Fu <[email protected]>
  • Loading branch information
fuweid committed Jan 10, 2024
1 parent 636d5a4 commit def26d7
Show file tree
Hide file tree
Showing 6 changed files with 382 additions and 1 deletion.
5 changes: 4 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ RUN mkdir -p /output
WORKDIR /kperf-build
RUN --mount=source=./,target=/kperf-build,rw make build && PREFIX=/output make install

FROM gcr.io/distroless/static-debian12:nonroot AS release-stage
# TODO: We should consider to implement our own curl to upload data
FROM ubuntu:22.04 AS release-stage

RUN apt update -y && apt install curl -y

WORKDIR /

Expand Down
7 changes: 7 additions & 0 deletions cmd/kperf/commands/multirunners/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,17 @@ var Command = cli.Command{
Name: "multirunners",
ShortName: "mrunners",
Usage: "packages runner as job and deploy runners into kubernetes",
Flags: []cli.Flag{
cli.StringFlag{
Name: "kubeconfig",
Usage: "Path to the kubeconfig file",
},
},
Subcommands: []cli.Command{
runCommand,
waitCommand,
resultCommand,
serverCommand,
},
}

Expand Down
85 changes: 85 additions & 0 deletions cmd/kperf/commands/multirunners/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package multirunners

import (
"fmt"
"strings"

"github.com/Azure/kperf/runner"

"github.com/urfave/cli"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

var serverCommand = cli.Command{
Name: "server",
Flags: []cli.Flag{
cli.StringFlag{
Name: "namespace",
Usage: "The namespace scope for runners",
Value: "default",
},
cli.StringSliceFlag{
Name: "runners",
Usage: "The runner spec's URI",
Required: true,
},
cli.StringFlag{
Name: "runner-image",
Usage: "The runner's conainer image",
Required: true,
},
cli.IntFlag{
Name: "port",
Value: 8080,
},
cli.StringFlag{
Name: "host",
Value: "0.0.0.0",
},
cli.StringFlag{
Name: "data",
Usage: "The runner result should be stored in that path",
Value: "/tmp/data",
},
},
Hidden: true,
Action: func(cliCtx *cli.Context) error {
name := strings.TrimSpace(cliCtx.Args().Get(0))
if len(name) == 0 {
return fmt.Errorf("required non-empty name")
}

addr := fmt.Sprintf("%s:%d", cliCtx.String("host"), cliCtx.Int("port"))
dataDir := cliCtx.String("data")

kubeCfgPath := cliCtx.String("kubeconfig")
config, err := clientcmd.BuildConfigFromFlags("", kubeCfgPath)
if err != nil {
return err
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return err
}

groups := []*runner.GroupHandler{}
imgRef := cliCtx.String("runner-image")
ns := cliCtx.String("namespace")
for idx, specUri := range cliCtx.StringSlice("runners") {

Check warning on line 70 in cmd/kperf/commands/multirunners/server.go

View workflow job for this annotation

GitHub Actions / linter

var-naming: range var specUri should be specURI (revive)
gName := fmt.Sprintf("%s-%d", name, idx)
g, err := runner.NewGroupHandler(clientset, gName, ns, specUri, imgRef)
if err != nil {
return err
}
groups = append(groups, g)
}

srv, err := runner.NewServer(dataDir, addr, groups...)
if err != nil {
return err
}
return srv.Run()
},
}
155 changes: 155 additions & 0 deletions runner/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package runner

import (
"context"
"fmt"
"net/url"

"github.com/Azure/kperf/api/types"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
)

// GroupHandler is to deploy job to run several runners with same load profile.
type GroupHandler struct {
name string
namespace string
uid string

Check failure on line 19 in runner/group.go

View workflow job for this annotation

GitHub Actions / linter

field `uid` is unused (unused)

count int
imageRef string

Check failure on line 22 in runner/group.go

View workflow job for this annotation

GitHub Actions / linter

field `imageRef` is unused (unused)
profile types.LoadProfile

clientset kubernetes.Interface
}

// NewGroupHandler returns new instance of GroupHandler.
//
// The profileUrl input has two formats
//
// 1. file:///absolute_path?count=x
// 2. configmap:///configmap_name?count=x
func NewGroupHandler(clientset kubernetes.Interface, name, ns, profileUrl, imageRef string) (*GroupHandler, error) {

Check warning on line 34 in runner/group.go

View workflow job for this annotation

GitHub Actions / linter

var-naming: func parameter profileUrl should be profileURL (revive)
u, err := url.Parse(profileUrl)
if err != nil {
return nil, fmt.Errorf("invalid uri %s: %w", profileUrl, err)
}

var count int
var profile types.LoadProfile

switch u.Scheme {
case "file":
// TODO
case "configmap":
// TODO
default:
return nil, fmt.Errorf("unsupported scheme %s", u.Scheme)
}

return &GroupHandler{
name: name,
namespace: ns,
count: count,
profile: profile,
clientset: clientset,
}, nil
}

// Deploy deploys a group of runners as a job if necessary.
func (h *GroupHandler) Deploy(ctx context.Context, uploadUrl string) error {

Check warning on line 62 in runner/group.go

View workflow job for this annotation

GitHub Actions / linter

var-naming: method parameter uploadUrl should be uploadURL (revive)
// 1. Use client to check configmap named by h.name
// 1.1 If not exist
// create configmap named by h.name
// the configmap has profile.yaml data (marshal h.profile into YAML)
// 1.2 else
// check configmap and verify profile.yaml data is equal to h.profile
// if the data is not correct, return error
// 2. Use client to check job named by h.name
// 2.1 If not exist
// create job named by h.name
// 2.2 else
// check if the existing job spec is expected
// if not, return error
// 3. Update h.uid = job.Uid
//
// NOTE: The job spec should be like
/*
apiVersion: batch/v1
kind: Job
metadata:
name: {{ h.name }}
namespace: {{ h.namespace }}
spec:
completions: {{ h.count }}
parallelism: {{ h.count }}
template:
spec:
# TODO: affinity support
containers:
# FIXME:
#
# We should consider to use `--result` flag to upload data
# directly instead of using curl. When `--result=http://xyz:xxx`,
# we should upload data into that target url.
- args:
- kperf
- runner
- run
- --config=/data/config.yaml
- --user-agent=$(POD_NAME)
- --result=/host/$(POD_NS)-$(POD_NAME)-$(POD_UID).json
- && curl -X POST {{ uploadUrl }} -d @/host/$(POD_NS)-$(POD_NAME)-$(POD_UID).json
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_UID
valueFrom:
fieldRef:
fieldPath: metadata.uid
- name: POD_NS
valueFrom:
fieldRef:
fieldPath: metadata.namespace
image: {{ h.image }}
imagePullPolicy: Always
name: runner
volumeMounts:
- mountPath: /data/
name: config
- mountPath: /host
name: host-root
restartPolicy: Never
# TODO: support serviceAccount/serviceAccountName
volumes:
- configMap:
name: {{ h.name }}
name: config
- hostPath:
path: /tmp
type: ""
name: host-root
*/
return fmt.Errorf("not implemented yet")
}

// Status returns the job's status.
func (h *GroupHandler) Status(ctx context.Context) (*batchv1.JobStatus, error) {

Check warning on line 141 in runner/group.go

View workflow job for this annotation

GitHub Actions / linter

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
// return the job named by h.name
return nil, fmt.Errorf("not implemented yet")
}

// Pods returns all the pods controlled by the job.
func (h *GroupHandler) Pods(ctx context.Context) ([]*corev1.Pod, error) {

Check warning on line 147 in runner/group.go

View workflow job for this annotation

GitHub Actions / linter

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
// return all the pods controlled by the job.
return nil, fmt.Errorf("not implemented yet")
}

// Profile returns load profile.
func (h *GroupHandler) Profile() types.LoadProfile {
return h.profile
}
60 changes: 60 additions & 0 deletions runner/localstore/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package localstore

import (
"fmt"
"io"
)

// Store is a filesystem-like key/value storage.
//
// Each key/value has committed and ingesting status. When OpenWriter returns
// ingestion transcation, the Store opens rootDir/ingesting/$random file to
// receive value data. Once all the data is written, the Commit(ref) moves the
// file into rootDir/committed/ref.
type Store struct {
rootDir string

Check failure on line 15 in runner/localstore/store.go

View workflow job for this annotation

GitHub Actions / linter

field `rootDir` is unused (unused)
}

// NewStore returns new instance of Store.
func NewStore(rootDir string) *Store {

Check warning on line 19 in runner/localstore/store.go

View workflow job for this annotation

GitHub Actions / linter

unused-parameter: parameter 'rootDir' seems to be unused, consider removing or renaming it as _ (revive)
return &Store{}
}

// OpenWriter is to initiate a writing operation, ingestion transcation. A
// single ingestion transcation is to open temporary file and allow caller to
// write data into the temporary file. Once all the data is written, the caller
// should call Commit to complete ingestion transcation.
func (s *Store) OpenWriter() (Writer, error) {
return nil, fmt.Errorf("not implemented yet")
}

// OpenReader is to open committed content named by ref.
func (s *Store) OpenReader(ref string) (Reader, error) {

Check warning on line 32 in runner/localstore/store.go

View workflow job for this annotation

GitHub Actions / linter

unused-parameter: parameter 'ref' seems to be unused, consider removing or renaming it as _ (revive)
return nil, fmt.Errorf("not implemented yet")
}

// Delete is to delete committed content named by ref.
func (s *Store) Delete(ref string) error {

Check warning on line 37 in runner/localstore/store.go

View workflow job for this annotation

GitHub Actions / linter

unused-parameter: parameter 'ref' seems to be unused, consider removing or renaming it as _ (revive)
return fmt.Errorf("not implemented yet")
}

// Writer handles writing of content into local store
type Writer interface {
// Close closes the writer.
//
// If the writer has not been committed, this allows aborting.
// Calling Close on a closed writer will not error.
io.WriteCloser

// Commit commits data as file named by ref.
//
// Commit always close Writer. If ref already exists, it will return
// error.
Commit(ref string) error
}

type Reader interface {
io.ReaderAt
io.Closer
Size() int64
}
Loading

0 comments on commit def26d7

Please sign in to comment.