Skip to content

Commit

Permalink
fix: runner unable to re-register (#707)
Browse files Browse the repository at this point in the history
  • Loading branch information
leg100 authored Nov 12, 2024
1 parent 1d41f61 commit 41f5669
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 47 deletions.
31 changes: 17 additions & 14 deletions internal/runner/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package runner

import (
"context"
"errors"
"log/slog"
"net/netip"
"time"
Expand Down Expand Up @@ -44,6 +43,8 @@ type RunnerMetaAgentPool struct {
Name string `json:"name"`
// Agent pool's organization.
OrganizationName string `json:"organization-name"`
// ID of agent token that was used to authenticate runner.
TokenID resource.ID `json:"token-id"`
}

type registerOptions struct {
Expand All @@ -64,31 +65,32 @@ type registerOptions struct {
CurrentJobs []resource.ID `json:"current-jobs,omitempty"`
}

func (m *RunnerMeta) register(opts registerOptions) error {
if m.ID != resource.EmptyID {
return errors.New("runner has already registered")
// register registers an unregistered runner, constructing a RunnerMeta which
// provides info about the newly registered runner.
func register(runner *unregistered, opts registerOptions) (*RunnerMeta, error) {
meta := &RunnerMeta{
ID: resource.NewID(resource.RunnerKind),
Name: opts.Name,
Version: opts.Version,
MaxJobs: opts.Concurrency,
AgentPool: runner.pool,
}
m.ID = resource.NewID(resource.RunnerKind)
m.Name = opts.Name
m.Version = opts.Version
m.MaxJobs = opts.Concurrency

if err := m.setStatus(RunnerIdle, true); err != nil {
return err
if err := meta.setStatus(RunnerIdle, true); err != nil {
return nil, err
}
if opts.IPAddress != nil {
m.IPAddress = *opts.IPAddress
meta.IPAddress = *opts.IPAddress
} else {
// IP address not provided: try to get local IP address used for
// outbound comms, and if that fails, use localhost
ip, err := internal.GetOutboundIP()
if err != nil {
ip = netip.IPv6Loopback()
}
m.IPAddress = ip
meta.IPAddress = ip
}

return nil
return meta, nil
}

func (m *RunnerMeta) setStatus(status RunnerStatus, ping bool) error {
Expand Down Expand Up @@ -138,6 +140,7 @@ func (m *RunnerMetaAgentPool) LogValue() slog.Value {
slog.String("id", m.ID.String()),
slog.String("name", m.Name),
slog.String("organization", m.OrganizationName),
slog.String("token-id", m.TokenID.String()),
)
}

Expand Down
12 changes: 9 additions & 3 deletions internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,15 @@ func newRunner(
func (r *Runner) Start(ctx context.Context) error {
r.logger.V(r.v).Info("starting runner", "version", internal.Version)

// initialize terminator
// Initialize terminator, which is responsible for terminating jobs in
// response to cancelation signals.
terminator := &terminator{mapping: make(map[resource.ID]cancelable)}

// Authenticate as unregistered runner with the registration endpoint. This
// is only necessary for the server runner; the agent runner relies on
// middleware to authenticate as an unregistered runner on the server.
ctx = authz.AddSubjectToContext(ctx, &unregistered{})

// register runner with server, which responds with an updated runner
// registrationMetadata, including a unique ID.
registrationMetadata, err := r.client.register(ctx, registerOptions{
Expand Down Expand Up @@ -151,10 +157,10 @@ func (r *Runner) Start(ctx context.Context) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

r.logger.V(r.v).Info("sending final status update before shutting down")

if updateErr := r.client.updateStatus(ctx, registrationMetadata.ID, RunnerExited); updateErr != nil {
err = fmt.Errorf("sending final status update: %w", updateErr)
} else {
r.logger.V(r.v).Info("sent final status update", "status", "exited")
}
}()

Expand Down
67 changes: 67 additions & 0 deletions internal/runner/runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package runner

import (
"context"
"testing"

"github.com/leg100/otf/internal/logr"
"github.com/leg100/otf/internal/resource"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestRunner(t *testing.T) {
updates := make(chan RunnerStatus)
wantID := resource.NewID(resource.RunnerKind)

r, err := newRunner(
logr.Discard(),
&fakeRunnerClient{registeredID: wantID, updates: updates},
&fakeOperationSpawner{},
false,
Config{},
)
require.NoError(t, err)

// Terminate runner at end of test
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

startErr := make(chan error)
go func() {
startErr <- r.Start(ctx)
}()

// Test that runner registers itself
assert.Equal(t, &RunnerMeta{ID: wantID}, <-r.registered)
// Terminate runner
cancel()
// Test that runner sends final status update
assert.Equal(t, RunnerExited, <-updates)
}

type fakeRunnerClient struct {
client

registeredID resource.ID
updates chan RunnerStatus
}

func (f *fakeRunnerClient) register(ctx context.Context, opts registerOptions) (*RunnerMeta, error) {
return &RunnerMeta{ID: f.registeredID}, nil
}

func (f *fakeRunnerClient) getJobs(ctx context.Context, agentID resource.ID) ([]*Job, error) {
// Block until context canceled
<-ctx.Done()
return nil, nil
}

func (f *fakeRunnerClient) updateStatus(ctx context.Context, agentID resource.ID, status RunnerStatus) error {
f.updates <- status
return nil
}

type fakeOperationSpawner struct {
operationSpawner
}
20 changes: 2 additions & 18 deletions internal/runner/server.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package runner

import (
"context"

"github.com/leg100/otf/internal/authz"
"github.com/leg100/otf/internal/logr"
"github.com/leg100/otf/internal/releases"
)
Expand All @@ -24,13 +21,8 @@ type ServerRunnerOptions struct {
Jobs operationJobsClient
}

// ServerRunner is a runner built into the otfd server prcess.
type ServerRunner struct {
*Runner
}

// NewServerRunner constructs a server runner.
func NewServerRunner(opts ServerRunnerOptions) (*ServerRunner, error) {
func NewServerRunner(opts ServerRunnerOptions) (*Runner, error) {
daemon, err := newRunner(
opts.Logger,
opts.Runners,
Expand All @@ -53,15 +45,7 @@ func NewServerRunner(opts ServerRunnerOptions) (*ServerRunner, error) {
if err != nil {
return nil, err
}
return &ServerRunner{Runner: daemon}, nil
}

// Start the server runner daemon.
func (d *ServerRunner) Start(ctx context.Context) error {
// Authenticate as runner with server endpoints.
ctx = authz.AddSubjectToContext(ctx, d.RunnerMeta)

return d.Runner.Start(ctx)
return daemon, nil
}

type localOperationSpawner struct {
Expand Down
29 changes: 17 additions & 12 deletions internal/runner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ import (
"github.com/leg100/otf/internal/workspace"
)

var (
ErrInvalidStateTransition = errors.New("invalid runner state transition")
ErrUnauthorizedRegistration = errors.New("unauthorized runner registration")
)
var ErrInvalidStateTransition = errors.New("invalid runner state transition")

type (
Service struct {
Expand Down Expand Up @@ -129,12 +126,14 @@ func NewService(opts ServiceOptions) *Service {
// Register with auth middleware the agent token kind and a means of
// retrieving the appropriate runner corresponding to the agent token ID
opts.TokensService.RegisterKind(resource.AgentTokenKind, func(ctx context.Context, tokenID resource.ID) (authz.Subject, error) {
// Fetch agent pool corresponding to the provided token. This
// effectively authenticates the token.
pool, err := svc.db.getPoolByTokenID(ctx, tokenID)
if err != nil {
return nil, err
}
// if the runner has registered then it should be sending its ID in an
// http header
// if the runner has already registered then it should be sending its ID
// in an http header
headers, err := otfhttp.HeadersFromContext(ctx)
if err != nil {
return nil, err
Expand All @@ -152,10 +151,11 @@ func NewService(opts ServiceOptions) *Service {
}
// Agent runner hasn't registered yet, so set subject to a runner with a
// agent pool info, which will be used when registering the runner below.
return &RunnerMeta{AgentPool: &RunnerMetaAgentPool{
return &unregistered{pool: &RunnerMetaAgentPool{
ID: pool.ID,
Name: pool.Name,
OrganizationName: pool.Organization,
TokenID: tokenID,
}}, nil
})
// create jobs when a plan or apply is enqueued
Expand Down Expand Up @@ -206,17 +206,22 @@ func (s *Service) WatchJobs(ctx context.Context) (<-chan pubsub.Event[*Job], fun

func (s *Service) register(ctx context.Context, opts registerOptions) (*RunnerMeta, error) {
runner, err := func() (*RunnerMeta, error) {
runner, err := runnerFromContext(ctx)
subject, err := authz.SubjectFromContext(ctx)
if err != nil {
return nil, ErrUnauthorizedRegistration
return nil, err
}
if err := runner.register(opts); err != nil {
unregistered, ok := subject.(*unregistered)
if !ok {
return nil, internal.ErrAccessNotPermitted
}
registered, err := register(unregistered, opts)
if err != nil {
return nil, err
}
if err := s.db.create(ctx, runner); err != nil {
if err := s.db.create(ctx, registered); err != nil {
return nil, err
}
return runner, nil
return registered, nil
}()
if err != nil {
s.Error(err, "registering runner")
Expand Down
14 changes: 14 additions & 0 deletions internal/runner/unregistered.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package runner

import "github.com/leg100/otf/internal/authz"

// unregistered describes a runner that is not yet registered.
type unregistered struct {
// unregistered is a subject only for the purposes of satisfying the
// token-handling middleware which doesn't call any of the interface
// methods.
authz.Subject

// pool is non-nil if the runner is an agent.
pool *RunnerMetaAgentPool
}

0 comments on commit 41f5669

Please sign in to comment.