Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
leg100 committed Nov 9, 2024
1 parent d1bf204 commit 76180d9
Show file tree
Hide file tree
Showing 17 changed files with 96 additions and 45 deletions.
2 changes: 1 addition & 1 deletion internal/http/decode/decoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func ID(name string, r *http.Request) (resource.ID, error) {
if err != nil {
return resource.ID{}, err
}
return resource.ParseID(s), nil
return resource.ParseID(s)
}

func decode(dst interface{}, src map[string][]string) error {
Expand Down
9 changes: 5 additions & 4 deletions internal/logs/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ type pgdb struct {

func (db *pgdb) put(ctx context.Context, chunk Chunk) error {
err := db.Querier(ctx).InsertLogChunk(ctx, sqlc.InsertLogChunkParams{
RunID: chunk.RunID,
Phase: sql.String(string(chunk.Phase)),
Chunk: chunk.Data,
Offset: sql.Int4(chunk.Offset),
ChunkID: chunk.ID,
RunID: chunk.RunID,
Phase: sql.String(string(chunk.Phase)),
Chunk: chunk.Data,
Offset: sql.Int4(chunk.Offset),
})
if err != nil {
return sql.Error(err)
Expand Down
17 changes: 6 additions & 11 deletions internal/logs/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,13 @@ func (s *Service) PutChunk(ctx context.Context, opts PutChunkOptions) error {
return err
}

err = func() error {
chunk, err := newChunk(opts)
if err != nil {
return err
}
if err := s.chunkproxy.put(ctx, chunk); err != nil {
return err
}
return nil
}()
chunk, err := newChunk(opts)
if err != nil {
s.Error(err, "writing logs", "id", opts.RunID, "phase", opts.Phase, "offset", opts.Offset)
s.Error(err, "creating log chunk", "run_id", opts, "phase", opts.Phase, "offset", opts.Offset)
return err
}
if err := s.put(ctx, chunk); err != nil {
s.Error(err, "writing logs", "chunk_id", chunk.ID, "run_id", opts.RunID, "phase", opts.Phase, "offset", opts.Offset)
return err
}
s.V(3).Info("written logs", "id", opts.RunID, "phase", opts.Phase, "offset", opts.Offset)
Expand Down
6 changes: 5 additions & 1 deletion internal/pubsub/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ func (b *Broker[T]) unsubscribe(sub chan Event[T]) {
// forward retrieves the type T uniquely identified by id and forwards it onto
// subscribers as an event together with the action.
func (b *Broker[T]) forward(ctx context.Context, rowID string, action sql.Action) {
id := resource.ParseID(rowID)
id, err := resource.ParseID(rowID)
if err != nil {
b.Error(err, "parsing ID for database event", "table", b.table, "id", rowID, "action", action)
return
}

var event Event[T]
payload, err := b.getter(ctx, id, action)
Expand Down
2 changes: 1 addition & 1 deletion internal/pubsub/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestBroker_UnsubscribeFullSubscriber(t *testing.T) {
// deliberating publish more than subBufferSize events to trigger broker to
// unsubscribe the sub
for i := 0; i < subBufferSize+1; i++ {
broker.forward(ctx, "bar", sql.InsertAction)
broker.forward(ctx, "foo-123", sql.InsertAction)
}
assert.Equal(t, 0, len(broker.subs))
}
6 changes: 5 additions & 1 deletion internal/run/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ func (a *CLI) runDownloadCommand() *cobra.Command {
SilenceUsage: true,
SilenceErrors: true,
RunE: func(cmd *cobra.Command, args []string) error {
run, err := a.client.Get(cmd.Context(), resource.ParseID(args[0]))
id, err := resource.ParseID(args[0])
if err != nil {
return err
}
run, err := a.client.Get(cmd.Context(), id)
if err != nil {
return errors.Wrap(err, "retrieving run")
}
Expand Down
2 changes: 1 addition & 1 deletion internal/run/tfe.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (a *tfe) toRun(from *Run, ctx context.Context) (*types.Run, error) {
Apply: &types.Apply{ID: resource.ConvertID(from.ID, "apply")},
// TODO: populate with real user.
CreatedBy: &types.User{
ID: resource.ParseID("user-123"),
ID: resource.ID{Kind: resource.UserKind, ID: "123"},
Username: "otf",
},
ConfigurationVersion: &types.ConfigurationVersion{
Expand Down
12 changes: 8 additions & 4 deletions internal/runner/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,20 @@ func (a *agentCLI) agentTokenCommand() *cobra.Command {

func (a *agentCLI) agentTokenNewCommand() *cobra.Command {
var (
poolID string
opts = CreateAgentTokenOptions{}
poolIDStr string
opts = CreateAgentTokenOptions{}
)
cmd := &cobra.Command{
Use: "new",
Short: "Create a new agent token",
SilenceUsage: true,
SilenceErrors: true,
RunE: func(cmd *cobra.Command, args []string) error {
_, token, err := a.CreateAgentToken(cmd.Context(), resource.ParseID(poolID), opts)
poolID, err := resource.ParseID(poolIDStr)
if err != nil {
return err
}
_, token, err := a.CreateAgentToken(cmd.Context(), poolID, opts)
if err != nil {
return err
}
Expand All @@ -71,7 +75,7 @@ func (a *agentCLI) agentTokenNewCommand() *cobra.Command {
return nil
},
}
cmd.Flags().StringVar(&poolID, "agent-pool-id", "", "ID of the agent pool in which the token is to be created.")
cmd.Flags().StringVar(&poolIDStr, "agent-pool-id", "", "ID of the agent pool in which the token is to be created.")
cmd.MarkFlagRequired("agent-pool-id")

cmd.Flags().StringVar(&opts.Description, "description", "", "Provide a description for the token.")
Expand Down
2 changes: 1 addition & 1 deletion internal/runner/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestAgentTokenNewCommand(t *testing.T) {
},
}
cmd := cli.agentTokenNewCommand()
cmd.SetArgs([]string{"testing", "--agent-pool-id", "pool-123", "--description", "my new token"})
cmd.SetArgs([]string{"--agent-pool-id", "pool-123", "--description", "my new token"})
got := bytes.Buffer{}
cmd.SetOut(&got)
require.NoError(t, cmd.Execute())
Expand Down
4 changes: 2 additions & 2 deletions internal/runner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/hashicorp/go-retryablehttp"
)

const runnerIDHeader = "otf-agent-id"
const runnerIDHeaderKey = "otf-agent-id"

type client interface {
register(ctx context.Context, opts registerOptions) (*RunnerMeta, error)
Expand All @@ -37,7 +37,7 @@ func (c *remoteClient) newRequest(method, path string, v interface{}) (*retryabl
return nil, err
}
if c.agentID != nil {
req.Header.Add(runnerIDHeader, c.agentID.String())
req.Header.Add(runnerIDHeaderKey, c.agentID.String())
}
return req, err
}
Expand Down
27 changes: 20 additions & 7 deletions internal/runner/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ type jobResult struct {

func (r jobResult) toJob() *Job {
job := &Job{
ID: r.JobID,
RunID: r.RunID,
Phase: internal.PhaseType(r.Phase.String),
Status: JobStatus(r.Status.String),
Expand Down Expand Up @@ -360,7 +361,7 @@ type poolresult struct {
AllowedWorkspaceIds []pgtype.Text
}

func (r poolresult) toPool() *Pool {
func (r poolresult) toPool() (*Pool, error) {
pool := &Pool{
ID: r.AgentPoolID,
Name: r.Name.String,
Expand All @@ -370,13 +371,21 @@ func (r poolresult) toPool() *Pool {
}
pool.AssignedWorkspaces = make([]resource.ID, len(r.WorkspaceIds))
for i, wid := range r.WorkspaceIds {
pool.AssignedWorkspaces[i] = resource.ParseID(wid.String)
var err error
pool.AssignedWorkspaces[i], err = resource.ParseID(wid.String)
if err != nil {
return nil, err
}
}
pool.AllowedWorkspaces = make([]resource.ID, len(r.AllowedWorkspaceIds))
for i, wid := range r.AllowedWorkspaceIds {
pool.AllowedWorkspaces[i] = resource.ParseID(wid.String)
var err error
pool.AllowedWorkspaces[i], err = resource.ParseID(wid.String)
if err != nil {
return nil, err
}
}
return pool
return pool, nil
}

func (db *db) createPool(ctx context.Context, pool *Pool) error {
Expand Down Expand Up @@ -447,15 +456,15 @@ func (db *db) getPool(ctx context.Context, poolID resource.ID) (*Pool, error) {
if err != nil {
return nil, sql.Error(err)
}
return poolresult(result).toPool(), nil
return poolresult(result).toPool()
}

func (db *db) getPoolByTokenID(ctx context.Context, tokenID resource.ID) (*Pool, error) {
result, err := db.Querier(ctx).FindAgentPoolByAgentTokenID(ctx, tokenID)
if err != nil {
return nil, sql.Error(err)
}
return poolresult(result).toPool(), nil
return poolresult(result).toPool()
}

func (db *db) listPoolsByOrganization(ctx context.Context, organization string, opts listPoolOptions) ([]*Pool, error) {
Expand All @@ -470,7 +479,11 @@ func (db *db) listPoolsByOrganization(ctx context.Context, organization string,
}
pools := make([]*Pool, len(rows))
for i, r := range rows {
pools[i] = poolresult(r).toPool()
var err error
pools[i], err = poolresult(r).toPool()
if err != nil {
return nil, err
}
}
return pools, nil
}
Expand Down
8 changes: 6 additions & 2 deletions internal/runner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,12 @@ func NewService(opts ServiceOptions) *Service {
if err != nil {
return nil, err
}
if runnerID := headers.Get(runnerIDHeader); runnerID != "" {
runner, err := svc.getRunner(ctx, resource.ParseID(runnerID))
if runnerIDValue := headers.Get(runnerIDHeaderKey); runnerIDValue != "" {
runnerID, err := resource.ParseID(runnerIDValue)
if err != nil {
return nil, err
}
runner, err := svc.getRunner(ctx, runnerID)
if err != nil {
return nil, fmt.Errorf("retrieving runner corresponding to ID found in http header: %w", err)
}
Expand Down
10 changes: 9 additions & 1 deletion internal/sql/migrations/003_add_ids_to_jobs_and_log_chunks.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
ALTER TABLE jobs ADD COLUMN job_id TEXT;
UPDATE jobs SET job_id = 'job-' || substr(md5(random()::text), 0, 17);
ALTER TABLE jobs ADD PRIMARY KEY (job_id);
-- Add unique constraint - there should only be one job per run/phase combo
ALTER TABLE jobs ADD UNIQUE (run_id, phase);

-- replace job event function to instead provide job_id in payload
CREATE OR REPLACE FUNCTION public.jobs_notify_event() RETURNS trigger
Expand All @@ -25,7 +27,7 @@ BEGIN
END;
$$;

-- Add job_id primary key to jobs table and populate with random identifiers.
-- Change logs's chunk_id from a generated serial type to a text type and populate with random resource IDs
ALTER TABLE logs DROP chunk_id;
ALTER TABLE logs ADD chunk_id TEXT;
UPDATE logs SET chunk_id = 'chunk-' || substr(md5(random()::text), 0, 17);
Expand All @@ -36,8 +38,13 @@ ALTER TABLE workspaces DROP COLUMN lock_username;
ALTER TABLE workspaces ADD COLUMN lock_user_id TEXT;
ALTER TABLE workspaces ADD FOREIGN KEY (lock_user_id) REFERENCES users(user_id);

-- Make site-admin's user ID valid with new stricter ID format
UPDATE users SET user_id = 'user-36atQC2oGQng7pVz' WHERE username = 'site-admin';

---- create above / drop below ----

UPDATE users SET user_id = 'user-site-admin' WHERE username = 'site-admin';

ALTER TABLE workspaces DROP COLUMN lock_user_id;
ALTER TABLE workspaces ADD COLUMN lock_username TEXT;
ALTER TABLE workspaces ADD FOREIGN KEY (lock_username) REFERENCES users(username);
Expand All @@ -46,4 +53,5 @@ ALTER TABLE logs DROP chunk_id;
ALTER TABLE logs ADD chunk_id INT GENERATED ALWAYS AS IDENTITY;
ALTER TABLE logs ADD PRIMARY KEY (run_id, phase, chunk_id);

ALTER TABLE jobs DROP CONSTRAINT jobs_run_id_phase_key;
ALTER TABLE jobs DROP COLUMN job_id;
18 changes: 15 additions & 3 deletions internal/state/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ func (a *CLI) stateDeleteCommand() *cobra.Command {
SilenceUsage: true,
SilenceErrors: true,
RunE: func(cmd *cobra.Command, args []string) error {
if err := a.state.Delete(cmd.Context(), resource.ParseID(args[0])); err != nil {
id, err := resource.ParseID(args[0])
if err != nil {
return err
}
if err := a.state.Delete(cmd.Context(), id); err != nil {
return err
}
fmt.Fprintf(cmd.OutOrStdout(), "Deleted state version: %s\n", args[0])
Expand All @@ -131,7 +135,11 @@ func (a *CLI) stateDownloadCommand() *cobra.Command {
SilenceUsage: true,
SilenceErrors: true,
RunE: func(cmd *cobra.Command, args []string) error {
state, err := a.state.Download(cmd.Context(), resource.ParseID(args[0]))
id, err := resource.ParseID(args[0])
if err != nil {
return err
}
state, err := a.state.Download(cmd.Context(), id)
if err != nil {
return err
}
Expand All @@ -153,7 +161,11 @@ func (a *CLI) stateRollbackCommand() *cobra.Command {
SilenceUsage: true,
SilenceErrors: true,
RunE: func(cmd *cobra.Command, args []string) error {
_, err := a.state.Rollback(cmd.Context(), resource.ParseID(args[0]))
id, err := resource.ParseID(args[0])
if err != nil {
return err
}
_, err = a.state.Rollback(cmd.Context(), id)
if err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions internal/tokens/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ func (m *middleware) parseIDFromJWT(token []byte) (resource.ID, error) {
if err != nil {
return resource.ID{}, err
}
subject := resource.ParseID(parsed.Subject())
return subject, nil
return resource.ParseID(parsed.Subject())
}

func isProtectedPath(path string) bool {
Expand Down
8 changes: 6 additions & 2 deletions internal/user/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ import (
"github.com/leg100/otf/internal/team"
)

const SiteAdminUsername = "site-admin"
const (
SiteAdminUsername = "site-admin"
)

var (
SiteAdminID = resource.ID{Kind: resource.UserKind, ID: "user-site-admin"}
// SiteAdminID is the hardcoded user id for the site admin user. The ID must
// be the same as the hardcoded value in the database migrations.
SiteAdminID = resource.ID{Kind: resource.UserKind, ID: "36atQC2oGQng7pVz"}
SiteAdmin = User{ID: SiteAdminID, Username: SiteAdminUsername}
_ authz.Subject = (*User)(nil)
)
Expand Down
5 changes: 4 additions & 1 deletion internal/workspace/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ func (a *CLI) workspaceEditCommand() *cobra.Command {
opts.ExecutionMode = (*ExecutionMode)(&mode)
}
if poolID != "" {
poolResourceID := resource.ParseID(poolID)
poolResourceID, err := resource.ParseID(poolID)
if err != nil {
return err
}
opts.AgentPoolID = &poolResourceID
}
ws, err := a.client.GetByName(cmd.Context(), organization, name)
Expand Down

0 comments on commit 76180d9

Please sign in to comment.