From 266734fb9a2e427546b1da2384bb21d2aae73c37 Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Mon, 21 Nov 2022 11:15:57 -0500 Subject: [PATCH 01/17] add vscode to gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 344cbacbee..d76a82c430 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,5 @@ packaging/docker/*/buildkite-agent packaging/docker/*/hooks/ -.idea \ No newline at end of file +.idea +.vscode From 76c72fe12aae7a01843f13f5a87d58dfb9336e58 Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Tue, 22 Nov 2022 13:03:16 -0500 Subject: [PATCH 02/17] Add kubernetes-exec experiment - Modifies the job runner to wait for RPC logs and exit codes - Modifies the bootstrap to send RPC logs and exit code --- agent/agent_worker.go | 2 +- agent/job_runner.go | 50 +++++++++--- bootstrap/bootstrap.go | 25 ++++++ bootstrap/shell/shell.go | 11 +++ kubernetes/kubernetes.go | 161 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 236 insertions(+), 13 deletions(-) create mode 100644 kubernetes/kubernetes.go diff --git a/agent/agent_worker.go b/agent/agent_worker.go index e4d16958fb..93bb34cf41 100644 --- a/agent/agent_worker.go +++ b/agent/agent_worker.go @@ -85,7 +85,7 @@ type AgentWorker struct { // When this worker runs a job, we'll store an instance of the // JobRunner here - jobRunner *JobRunner + jobRunner jobRunner // retrySleepFunc is useful for testing retry loops fast // Hopefully this can be replaced with a global setting for tests in future: diff --git a/agent/job_runner.go b/agent/job_runner.go index 67daa3f803..27a596d0e8 100644 --- a/agent/job_runner.go +++ b/agent/job_runner.go @@ -8,12 +8,14 @@ import ( "path/filepath" "strings" "sync" + "syscall" "time" "github.com/buildkite/agent/v3/api" "github.com/buildkite/agent/v3/bootstrap/shell" "github.com/buildkite/agent/v3/experiments" "github.com/buildkite/agent/v3/hook" + "github.com/buildkite/agent/v3/kubernetes" "github.com/buildkite/agent/v3/logger" "github.com/buildkite/agent/v3/metrics" "github.com/buildkite/agent/v3/process" @@ -56,6 +58,11 @@ type JobRunnerConfig struct { DebugHTTP bool } +type jobRunner interface { + Run(ctx context.Context) error + CancelAndStop() error +} + type JobRunner struct { // The configuration for the job runner conf JobRunnerConfig @@ -76,7 +83,7 @@ type JobRunner struct { metrics *metrics.Scope // The internal process of the job - process *process.Process + process jobAPI // The internal buffer of the process output output *process.Buffer @@ -100,8 +107,19 @@ type JobRunner struct { envFile *os.File } +type jobAPI interface { + Done() <-chan struct{} + Started() <-chan struct{} + Interrupt() error + Terminate() error + Run(ctx context.Context) error + WaitStatus() syscall.WaitStatus +} + +var _ jobRunner = (*JobRunner)(nil) + // Initializes the job runner -func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterResponse, job *api.Job, apiClient APIClient, conf JobRunnerConfig) (*JobRunner, error) { +func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterResponse, job *api.Job, apiClient APIClient, conf JobRunnerConfig) (jobRunner, error) { runner := &JobRunner{ agent: ag, job: job, @@ -238,16 +256,24 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe processEnv := append(os.Environ(), env...) 
// The process that will run the bootstrap script - runner.process = process.New(l, process.Config{ - Path: cmd[0], - Args: cmd[1:], - Dir: conf.AgentConfiguration.BuildPath, - Env: processEnv, - PTY: conf.AgentConfiguration.RunInPty, - Stdout: processWriter, - Stderr: processWriter, - InterruptSignal: conf.CancelSignal, - }) + if experiments.IsEnabled("kubernetes-exec") { + runner.process = kubernetes.New(l, kubernetes.Config{ + Env: processEnv, + Stdout: processWriter, + Stderr: processWriter, + }) + } else { + runner.process = process.New(l, process.Config{ + Path: cmd[0], + Args: cmd[1:], + Dir: conf.AgentConfiguration.BuildPath, + Env: processEnv, + PTY: conf.AgentConfiguration.RunInPty, + Stdout: processWriter, + Stderr: processWriter, + InterruptSignal: conf.CancelSignal, + }) + } // Close the writer end of the pipe when the process finishes go func() { diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 598547bd81..de97a4a57f 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -22,6 +22,7 @@ import ( "github.com/buildkite/agent/v3/env" "github.com/buildkite/agent/v3/experiments" "github.com/buildkite/agent/v3/hook" + "github.com/buildkite/agent/v3/kubernetes" "github.com/buildkite/agent/v3/process" "github.com/buildkite/agent/v3/redaction" "github.com/buildkite/agent/v3/tracetools" @@ -78,6 +79,30 @@ func (b *Bootstrap) Run(ctx context.Context) (exitCode int) { b.shell.Debug = b.Config.Debug b.shell.InterruptSignal = b.Config.CancelSignal } + var kubernetesClient kubernetes.Client + if experiments.IsEnabled("kubernetes-exec") { + b.shell.Commentf("Using experimental Kubernetes support") + err := roko.NewRetrier( + roko.WithMaxAttempts(7), + roko.WithStrategy(roko.Exponential(2*time.Second, 0)), + ).Do(func(rtr *roko.Retrier) error { + if err := kubernetesClient.Connect(); err != nil { + return err + } + writer := io.MultiWriter(os.Stdout, &kubernetesClient) + b.shell.Writer = writer + b.shell.Logger = &shell.WriterLogger{ + Writer: writer, + Ansi: true, + } + return nil + }) + if err != nil { + b.shell.Errorf("Error connecting to kubernetes runner: %v", err) + return 1 + } + defer kubernetesClient.Exit(b.shell.WaitStatus()) + } var err error diff --git a/bootstrap/shell/shell.go b/bootstrap/shell/shell.go index 8c4a339205..c37a752ee5 100644 --- a/bootstrap/shell/shell.go +++ b/bootstrap/shell/shell.go @@ -169,6 +169,17 @@ func (s *Shell) Terminate() { } } +// Terminate running command +func (s *Shell) WaitStatus() syscall.WaitStatus { + s.cmdLock.Lock() + defer s.cmdLock.Unlock() + + if s.cmd != nil && s.cmd.proc != nil { + return s.cmd.proc.WaitStatus() + } + return 0 +} + // LockFile is a pid-based lock for cross-process locking type LockFile interface { Unlock() error diff --git a/kubernetes/kubernetes.go b/kubernetes/kubernetes.go new file mode 100644 index 0000000000..519d6f99e9 --- /dev/null +++ b/kubernetes/kubernetes.go @@ -0,0 +1,161 @@ +package kubernetes + +import ( + "bytes" + "context" + "io" + "log" + "net" + "net/http" + "net/rpc" + "os" + "sync" + "syscall" + + "github.com/buildkite/agent/v3/logger" +) + +const defaultSocketPath = "/workspace/buildkite.sock" + +func New(l logger.Logger, c Config) *Runner { + if c.SocketPath == "" { + c.SocketPath = defaultSocketPath + } + return &Runner{ + logger: l, + conf: c, + } +} + +type Runner struct { + logger logger.Logger + conf Config + mu sync.Mutex + listener net.Listener + started, done chan struct{} + waitStatus syscall.WaitStatus + once sync.Once +} + +type Config struct { + SocketPath 
string + Env []string + Stdout, Stderr io.Writer +} + +func (r *Runner) Run(ctx context.Context) error { + rpc.Register(r) + rpc.HandleHTTP() + l, err := net.Listen("unix", r.conf.SocketPath) + if err != nil { + log.Fatal("listen error:", err) + } + defer l.Close() + defer os.Remove(r.conf.SocketPath) + r.listener = l + go http.Serve(l, nil) + + r.mu.Lock() + if r.done == nil { + r.done = make(chan struct{}) + } + if r.started == nil { + r.started = make(chan struct{}) + } + r.mu.Unlock() + <-r.done + return nil +} + +func (r *Runner) Started() <-chan struct{} { + r.mu.Lock() + defer r.mu.Unlock() + + return r.started +} + +func (r *Runner) Done() <-chan struct{} { + r.mu.Lock() + defer r.mu.Unlock() + + return r.done +} + +func (r *Runner) Interrupt() error { + r.mu.Lock() + defer r.mu.Unlock() + panic("unimplemented") +} + +func (r *Runner) Terminate() error { + r.mu.Lock() + defer r.mu.Unlock() + panic("unimplemented") +} + +func (r *Runner) WaitStatus() syscall.WaitStatus { + return r.waitStatus +} + +// ==== sidecar api ==== + +type Empty struct{} +type Logs struct { + Data []byte +} + +type ExitCode struct { + ExitStatus syscall.WaitStatus +} + +func (t *Runner) WriteLogs(args Logs, reply *Empty) error { + t.once.Do(func() { + close(t.started) + }) + _, err := io.Copy(t.conf.Stdout, bytes.NewReader(args.Data)) + return err +} + +func (t *Runner) Exit(args ExitCode, reply *Empty) error { + t.waitStatus = args.ExitStatus + close(t.done) + return nil +} + +type Client struct { + SocketPath string + client *rpc.Client +} + +func (c *Client) Connect() error { + if c.SocketPath == "" { + c.SocketPath = defaultSocketPath + } + client, err := rpc.DialHTTP("unix", c.SocketPath) + if err != nil { + return err + } + c.client = client + return nil +} + +func (c *Client) Exit(exitStatus syscall.WaitStatus) error { + if c.client == nil { + return nil + } + return c.client.Call("Runner.Exit", ExitCode{ + ExitStatus: exitStatus, + }, nil) +} + +// Write implements io.Writer +func (c *Client) Write(p []byte) (int, error) { + if c.client == nil { + return 0, nil + } + n := len(p) + err := c.client.Call("Runner.WriteLogs", Logs{ + Data: p, + }, nil) + return n, err +} From ea6659d8eb0eafc685dda0fe9a947b3f658f681b Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Tue, 29 Nov 2022 13:01:44 -0500 Subject: [PATCH 03/17] Implement client ordering --- agent/job_runner.go | 12 ++- bootstrap/bootstrap.go | 6 ++ kubernetes/kubernetes.go | 154 ++++++++++++++++++++++++++++------ kubernetes/kubernetes_test.go | 138 ++++++++++++++++++++++++++++++ 4 files changed, 280 insertions(+), 30 deletions(-) create mode 100644 kubernetes/kubernetes_test.go diff --git a/agent/job_runner.go b/agent/job_runner.go index 27a596d0e8..71aa93a8b5 100644 --- a/agent/job_runner.go +++ b/agent/job_runner.go @@ -6,6 +6,7 @@ import ( "io" "os" "path/filepath" + "strconv" "strings" "sync" "syscall" @@ -257,10 +258,15 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe // The process that will run the bootstrap script if experiments.IsEnabled("kubernetes-exec") { + containerCount, err := strconv.Atoi(os.Getenv("BUILDKITE_CONTAINER_COUNT")) + if err != nil { + return nil, fmt.Errorf("failed to parse BUILDKITE_CONTAINER_COUNT: %w", err) + } runner.process = kubernetes.New(l, kubernetes.Config{ - Env: processEnv, - Stdout: processWriter, - Stderr: processWriter, + Env: processEnv, + Stdout: processWriter, + Stderr: processWriter, + ClientCount: containerCount, }) } else { runner.process = process.New(l, 
process.Config{ diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index de97a4a57f..62f2cf20fd 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -86,6 +86,11 @@ func (b *Bootstrap) Run(ctx context.Context) (exitCode int) { roko.WithMaxAttempts(7), roko.WithStrategy(roko.Exponential(2*time.Second, 0)), ).Do(func(rtr *roko.Retrier) error { + id, err := strconv.Atoi(os.Getenv("BUILDKITE_CONTAINER_ID")) + if err != nil { + return fmt.Errorf("failed to parse container id, %s", os.Getenv("BUILDKITE_CONTAINER_ID")) + } + kubernetesClient.ID = id if err := kubernetesClient.Connect(); err != nil { return err } @@ -101,6 +106,7 @@ func (b *Bootstrap) Run(ctx context.Context) (exitCode int) { b.shell.Errorf("Error connecting to kubernetes runner: %v", err) return 1 } + <-kubernetesClient.WaitReady() defer kubernetesClient.Exit(b.shell.WaitStatus()) } diff --git a/kubernetes/kubernetes.go b/kubernetes/kubernetes.go index 519d6f99e9..0763e6d29c 100644 --- a/kubernetes/kubernetes.go +++ b/kubernetes/kubernetes.go @@ -3,14 +3,15 @@ package kubernetes import ( "bytes" "context" + "fmt" "io" - "log" "net" "net/http" "net/rpc" "os" "sync" "syscall" + "time" "github.com/buildkite/agent/v3/logger" ) @@ -21,9 +22,18 @@ func New(l logger.Logger, c Config) *Runner { if c.SocketPath == "" { c.SocketPath = defaultSocketPath } + clients := make(map[int]*clientResult, c.ClientCount) + for i := 0; i < c.ClientCount; i++ { + clients[i] = &clientResult{} + } return &Runner{ - logger: l, - conf: c, + logger: l, + conf: c, + clients: clients, + server: rpc.NewServer(), + mux: http.NewServeMux(), + done: make(chan struct{}), + started: make(chan struct{}), } } @@ -33,36 +43,46 @@ type Runner struct { mu sync.Mutex listener net.Listener started, done chan struct{} - waitStatus syscall.WaitStatus - once sync.Once + startedOnce, + closedOnce sync.Once + server *rpc.Server + mux *http.ServeMux + clients map[int]*clientResult } +type clientResult struct { + ExitStatus syscall.WaitStatus + State clientState +} + +type clientState int + +const ( + stateUnknown clientState = iota + stateConnected + stateExited +) + type Config struct { SocketPath string + ClientCount int Env []string Stdout, Stderr io.Writer } func (r *Runner) Run(ctx context.Context) error { - rpc.Register(r) - rpc.HandleHTTP() - l, err := net.Listen("unix", r.conf.SocketPath) + r.server.Register(r) + r.mux.Handle(rpc.DefaultRPCPath, r.server) + + l, err := (&net.ListenConfig{}).Listen(ctx, "unix", r.conf.SocketPath) if err != nil { - log.Fatal("listen error:", err) + return fmt.Errorf("failed to listen: %w", err) } defer l.Close() defer os.Remove(r.conf.SocketPath) r.listener = l - go http.Serve(l, nil) + go http.Serve(l, r.mux) - r.mu.Lock() - if r.done == nil { - r.done = make(chan struct{}) - } - if r.started == nil { - r.started = make(chan struct{}) - } - r.mu.Unlock() <-r.done return nil } @@ -94,7 +114,13 @@ func (r *Runner) Terminate() error { } func (r *Runner) WaitStatus() syscall.WaitStatus { - return r.waitStatus + // TODO: fix this somehow?? 
+ var ws syscall.WaitStatus + for _, client := range r.clients { + ws = client.ExitStatus + break + } + return ws } // ==== sidecar api ==== @@ -105,24 +131,73 @@ type Logs struct { } type ExitCode struct { + ID int ExitStatus syscall.WaitStatus } -func (t *Runner) WriteLogs(args Logs, reply *Empty) error { - t.once.Do(func() { - close(t.started) +type Status struct { + Ready bool +} + +func (r *Runner) WriteLogs(args Logs, reply *Empty) error { + r.startedOnce.Do(func() { + close(r.started) }) - _, err := io.Copy(t.conf.Stdout, bytes.NewReader(args.Data)) + _, err := io.Copy(r.conf.Stdout, bytes.NewReader(args.Data)) return err } -func (t *Runner) Exit(args ExitCode, reply *Empty) error { - t.waitStatus = args.ExitStatus - close(t.done) +func (r *Runner) Exit(args ExitCode, reply *Empty) error { + r.mu.Lock() + defer r.mu.Unlock() + + client, found := r.clients[args.ID] + if !found { + return fmt.Errorf("unrecognized client id: %d", args.ID) + } + client.ExitStatus = args.ExitStatus + client.State = stateExited + + allExited := true + for _, client := range r.clients { + allExited = client.State == stateExited && allExited + } + if allExited { + r.closedOnce.Do(func() { + close(r.done) + }) + } + return nil +} + +func (r *Runner) Register(id int, reply *Empty) error { + r.mu.Lock() + defer r.mu.Unlock() + client, found := r.clients[id] + if !found { + return fmt.Errorf("client id %d not found", id) + } + if client.State == stateConnected { + return fmt.Errorf("client id %d already registered", id) + } + client.State = stateConnected + return nil +} + +func (r *Runner) Status(id int, reply *Status) error { + r.mu.Lock() + defer r.mu.Unlock() + + if id == 0 { + reply.Ready = true + } else if client, found := r.clients[id-1]; found && client.State == stateExited { + reply.Ready = true + } return nil } type Client struct { + ID int SocketPath string client *rpc.Client } @@ -136,7 +211,7 @@ func (c *Client) Connect() error { return err } c.client = client - return nil + return c.client.Call("Runner.Register", c.ID, nil) } func (c *Client) Exit(exitStatus syscall.WaitStatus) error { @@ -144,6 +219,7 @@ func (c *Client) Exit(exitStatus syscall.WaitStatus) error { return nil } return c.client.Call("Runner.Exit", ExitCode{ + ID: c.ID, ExitStatus: exitStatus, }, nil) } @@ -159,3 +235,27 @@ func (c *Client) Write(p []byte) (int, error) { }, nil) return n, err } + +func (c *Client) WaitReady() <-chan error { + result := make(chan error) + go func() { + for { + var reply Status + if err := c.client.Call("Runner.Status", c.ID, &reply); err != nil { + result <- err + return + } + if reply.Ready { + close(result) + return + } + // TODO: configurable interval + time.Sleep(time.Second) + } + }() + return result +} + +func (c *Client) Close() { + c.client.Close() +} diff --git a/kubernetes/kubernetes_test.go b/kubernetes/kubernetes_test.go new file mode 100644 index 0000000000..d515f1c483 --- /dev/null +++ b/kubernetes/kubernetes_test.go @@ -0,0 +1,138 @@ +package kubernetes + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/buildkite/agent/v3/logger" + "github.com/stretchr/testify/require" +) + +func TestOrderedClients(t *testing.T) { + runner := newRunner(t, 3) + socketPath := runner.conf.SocketPath + + client0 := &Client{ID: 0} + client1 := &Client{ID: 1} + client2 := &Client{ID: 2} + clients := []*Client{client0, client1, client2} + + // wait for runner to listen + require.Eventually(t, func() bool { + _, err := os.Lstat(socketPath) + return err == nil + + }, 
time.Second*10, time.Millisecond, "expected socket file to exist") + + for _, client := range clients { + client.SocketPath = socketPath + require.NoError(t, client.Connect()) + t.Cleanup(client.Close) + } + select { + case err := <-client0.WaitReady(): + require.NoError(t, err) + break + case err := <-client1.WaitReady(): + require.NoError(t, err) + require.FailNow(t, "client1 should not be ready") + case err := <-client2.WaitReady(): + require.NoError(t, err) + require.FailNow(t, "client2 should not be ready") + case <-runner.Done(): + require.FailNow(t, "runner should not be done") + case <-time.After(time.Second): + require.FailNow(t, "client0 should be ready") + } + + require.NoError(t, client0.Exit(0)) + select { + case err := <-client1.WaitReady(): + require.NoError(t, err) + break + case err := <-client2.WaitReady(): + require.NoError(t, err) + require.FailNow(t, "client2 should not be ready") + case <-runner.Done(): + require.FailNow(t, "runner should not be done") + case <-time.After(time.Second): + require.FailNow(t, "client1 should be ready") + } + + require.NoError(t, client1.Exit(0)) + select { + case err := <-client2.WaitReady(): + require.NoError(t, err) + break + case <-runner.Done(): + require.FailNow(t, "runner should not be done") + case <-time.After(time.Second): + require.FailNow(t, "client2 should be ready") + } + + require.NoError(t, client2.Exit(0)) + select { + case <-runner.Done(): + break + case <-time.After(time.Second): + require.FailNow(t, "runner should be done when all clients have exited") + } +} + +func TestDuplicateClients(t *testing.T) { + runner := newRunner(t, 2) + socketPath := runner.conf.SocketPath + + client0 := Client{ID: 0, SocketPath: socketPath} + client1 := Client{ID: 0, SocketPath: socketPath} + + // wait for runner to listen + require.Eventually(t, func() bool { + _, err := os.Lstat(socketPath) + return err == nil + + }, time.Second*10, time.Millisecond, "expected socket file to exist") + + require.NoError(t, client0.Connect()) + require.Error(t, client1.Connect(), "expected an error when connecting a client with a duplicate ID") +} + +func TestExcessClients(t *testing.T) { + runner := newRunner(t, 1) + socketPath := runner.conf.SocketPath + + client0 := Client{ID: 0, SocketPath: socketPath} + client1 := Client{ID: 1, SocketPath: socketPath} + + // wait for runner to listen + require.Eventually(t, func() bool { + _, err := os.Lstat(socketPath) + return err == nil + + }, time.Second*10, time.Millisecond, "expected socket file to exist") + + require.NoError(t, client0.Connect()) + require.Error(t, client1.Connect(), "expected an error when connecting too many clients") +} + +func newRunner(t *testing.T, clientCount int) *Runner { + tempDir, err := os.MkdirTemp("", t.Name()) + require.NoError(t, err) + socketPath := filepath.Join(tempDir, "bk.sock") + t.Cleanup(func() { + os.RemoveAll(tempDir) + }) + runner := New(logger.Discard, Config{ + SocketPath: socketPath, + ClientCount: clientCount, + }) + runnerCtx, cancelRunner := context.WithCancel(context.Background()) + go runner.Run(runnerCtx) + t.Cleanup(func() { + cancelRunner() + }) + return runner +} From 6039e1e2b9a46d25c436a7d8c0e7661f745fa0eb Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Wed, 30 Nov 2022 10:45:33 -0500 Subject: [PATCH 04/17] Wire the agent access token to the bootstrap Normally this token is generated when the agent registers, and then passed to the bootstrap process in the environment. 
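In this experiment that hand-off is replaced by the socket. Condensed from the bootstrap.go hunk below (a sketch only; error logging and the retry loop are elided), the bootstrap side ends up doing:

    // The access token rides along on the RPC status reply rather than
    // arriving via the process environment.
    status := <-kubernetesClient.WaitReady()
    if status.Err != nil {
        return 1 // runner never became ready
    }
    os.Setenv("BUILDKITE_AGENT_ACCESS_TOKEN", status.AccessToken)
    b.shell.Env.Set("BUILDKITE_AGENT_ACCESS_TOKEN", status.AccessToken)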
Since the agent registration happens in a separate container, we need to plumb this through the RPC API to get it to the bootstrap container. --- agent/job_runner.go | 2 +- bootstrap/bootstrap.go | 8 +++++++- kubernetes/kubernetes.go | 19 +++++++++++++------ 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/agent/job_runner.go b/agent/job_runner.go index 71aa93a8b5..9d02394883 100644 --- a/agent/job_runner.go +++ b/agent/job_runner.go @@ -263,7 +263,7 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe return nil, fmt.Errorf("failed to parse BUILDKITE_CONTAINER_COUNT: %w", err) } runner.process = kubernetes.New(l, kubernetes.Config{ - Env: processEnv, + AccessToken: apiClient.Config().Token, Stdout: processWriter, Stderr: processWriter, ClientCount: containerCount, diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 62f2cf20fd..69334a3b65 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -106,7 +106,13 @@ func (b *Bootstrap) Run(ctx context.Context) (exitCode int) { b.shell.Errorf("Error connecting to kubernetes runner: %v", err) return 1 } - <-kubernetesClient.WaitReady() + status := <-kubernetesClient.WaitReady() + if status.Err != nil { + b.shell.Errorf("Error waiting for client to become ready: %v", err) + return 1 + } + os.Setenv("BUILDKITE_AGENT_ACCESS_TOKEN", status.AccessToken) + b.shell.Env.Set("BUILDKITE_AGENT_ACCESS_TOKEN", status.AccessToken) defer kubernetesClient.Exit(b.shell.WaitStatus()) } diff --git a/kubernetes/kubernetes.go b/kubernetes/kubernetes.go index 0763e6d29c..7d78985361 100644 --- a/kubernetes/kubernetes.go +++ b/kubernetes/kubernetes.go @@ -66,8 +66,8 @@ const ( type Config struct { SocketPath string ClientCount int - Env []string Stdout, Stderr io.Writer + AccessToken string } func (r *Runner) Run(ctx context.Context) error { @@ -136,7 +136,8 @@ type ExitCode struct { } type Status struct { - Ready bool + Ready bool + AccessToken string } func (r *Runner) WriteLogs(args Logs, reply *Empty) error { @@ -187,6 +188,7 @@ func (r *Runner) Register(id int, reply *Empty) error { func (r *Runner) Status(id int, reply *Status) error { r.mu.Lock() defer r.mu.Unlock() + reply.AccessToken = r.conf.AccessToken if id == 0 { reply.Ready = true @@ -236,17 +238,22 @@ func (c *Client) Write(p []byte) (int, error) { return n, err } -func (c *Client) WaitReady() <-chan error { - result := make(chan error) +type WaitReadyResponse struct { + Err error + Status +} + +func (c *Client) WaitReady() <-chan WaitReadyResponse { + result := make(chan WaitReadyResponse) go func() { for { var reply Status if err := c.client.Call("Runner.Status", c.ID, &reply); err != nil { - result <- err + result <- WaitReadyResponse{Err: err} return } if reply.Ready { - close(result) + result <- WaitReadyResponse{Status: reply} return } // TODO: configurable interval From db9742e330f2516c152b62d017890bc950020783 Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Wed, 30 Nov 2022 11:45:05 -0500 Subject: [PATCH 05/17] Log client exits Just for the sake of debugging --- kubernetes/kubernetes.go | 1 + 1 file changed, 1 insertion(+) diff --git a/kubernetes/kubernetes.go b/kubernetes/kubernetes.go index 7d78985361..9cb1605e65 100644 --- a/kubernetes/kubernetes.go +++ b/kubernetes/kubernetes.go @@ -156,6 +156,7 @@ func (r *Runner) Exit(args ExitCode, reply *Empty) error { if !found { return fmt.Errorf("unrecognized client id: %d", args.ID) } + r.logger.Info("client %d exited with code %d", args.ID, args.ExitStatus.ExitStatus())
client.ExitStatus = args.ExitStatus client.State = stateExited From 58db634cda846c4abc469e04e6e8a557dc47f826 Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Tue, 6 Dec 2022 12:20:29 -0500 Subject: [PATCH 06/17] Fix kubernetes test --- kubernetes/kubernetes_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/kubernetes/kubernetes_test.go b/kubernetes/kubernetes_test.go index d515f1c483..bfa28b7de5 100644 --- a/kubernetes/kubernetes_test.go +++ b/kubernetes/kubernetes_test.go @@ -34,13 +34,13 @@ func TestOrderedClients(t *testing.T) { } select { case err := <-client0.WaitReady(): - require.NoError(t, err) + require.NoError(t, err.Err) break case err := <-client1.WaitReady(): - require.NoError(t, err) + require.NoError(t, err.Err) require.FailNow(t, "client1 should not be ready") case err := <-client2.WaitReady(): - require.NoError(t, err) + require.NoError(t, err.Err) require.FailNow(t, "client2 should not be ready") case <-runner.Done(): require.FailNow(t, "runner should not be done") @@ -51,10 +51,10 @@ func TestOrderedClients(t *testing.T) { require.NoError(t, client0.Exit(0)) select { case err := <-client1.WaitReady(): - require.NoError(t, err) + require.NoError(t, err.Err) break case err := <-client2.WaitReady(): - require.NoError(t, err) + require.NoError(t, err.Err) require.FailNow(t, "client2 should not be ready") case <-runner.Done(): require.FailNow(t, "runner should not be done") @@ -65,7 +65,7 @@ func TestOrderedClients(t *testing.T) { require.NoError(t, client1.Exit(0)) select { case err := <-client2.WaitReady(): - require.NoError(t, err) + require.NoError(t, err.Err) break case <-runner.Done(): require.FailNow(t, "runner should not be done") From 9e9adee6126fcb89816f5324f587ebdc613ff67d Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Tue, 6 Dec 2022 13:15:56 -0500 Subject: [PATCH 07/17] Exit early if we get a non-zero client exit --- agent/job_runner.go | 3 +- bootstrap/bootstrap.go | 4 +- bootstrap/shell/shell.go | 4 +- kubernetes/kubernetes.go | 37 ++++++++++++---- kubernetes/kubernetes_test.go | 79 +++++++++++++++++++++++++++++++---- process/process.go | 8 +++- 6 files changed, 111 insertions(+), 24 deletions(-) diff --git a/agent/job_runner.go b/agent/job_runner.go index 9d02394883..34a4b76e44 100644 --- a/agent/job_runner.go +++ b/agent/job_runner.go @@ -9,7 +9,6 @@ import ( "strconv" "strings" "sync" - "syscall" "time" "github.com/buildkite/agent/v3/api" @@ -114,7 +113,7 @@ type jobAPI interface { Interrupt() error Terminate() error Run(ctx context.Context) error - WaitStatus() syscall.WaitStatus + WaitStatus() process.WaitStatus } var _ jobRunner = (*JobRunner)(nil) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 69334a3b65..d19c83be0e 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -113,7 +113,9 @@ func (b *Bootstrap) Run(ctx context.Context) (exitCode int) { } os.Setenv("BUILDKITE_AGENT_ACCESS_TOKEN", status.AccessToken) b.shell.Env.Set("BUILDKITE_AGENT_ACCESS_TOKEN", status.AccessToken) - defer kubernetesClient.Exit(b.shell.WaitStatus()) + defer func() { + kubernetesClient.Exit(b.shell.WaitStatus()) + }() } var err error diff --git a/bootstrap/shell/shell.go b/bootstrap/shell/shell.go index c37a752ee5..8fe6e762c1 100644 --- a/bootstrap/shell/shell.go +++ b/bootstrap/shell/shell.go @@ -170,14 +170,14 @@ func (s *Shell) Terminate() { } // Terminate running command -func (s *Shell) WaitStatus() syscall.WaitStatus { +func (s *Shell) WaitStatus() process.WaitStatus { s.cmdLock.Lock() defer 
s.cmdLock.Unlock() if s.cmd != nil && s.cmd.proc != nil { return s.cmd.proc.WaitStatus() } - return 0 + return syscall.WaitStatus(0) } // LockFile is a pid-based lock for cross-process locking diff --git a/kubernetes/kubernetes.go b/kubernetes/kubernetes.go index 9cb1605e65..6aa271232f 100644 --- a/kubernetes/kubernetes.go +++ b/kubernetes/kubernetes.go @@ -3,6 +3,8 @@ package kubernetes import ( "bytes" "context" + "encoding/gob" + "errors" "fmt" "io" "net" @@ -14,8 +16,13 @@ import ( "time" "github.com/buildkite/agent/v3/logger" + "github.com/buildkite/agent/v3/process" ) +func init() { + gob.Register(new(syscall.WaitStatus)) +} + const defaultSocketPath = "/workspace/buildkite.sock" func New(l logger.Logger, c Config) *Runner { @@ -51,7 +58,7 @@ type Runner struct { } type clientResult struct { - ExitStatus syscall.WaitStatus + ExitStatus process.WaitStatus State clientState } @@ -113,12 +120,17 @@ func (r *Runner) Terminate() error { panic("unimplemented") } -func (r *Runner) WaitStatus() syscall.WaitStatus { - // TODO: fix this somehow?? - var ws syscall.WaitStatus +func (r *Runner) WaitStatus() process.WaitStatus { + var ws process.WaitStatus for _, client := range r.clients { + if client.ExitStatus.ExitStatus() != 0 { + return client.ExitStatus + } + if client.ExitStatus.Signaled() { + return client.ExitStatus + } + // just return any ExitStatus if we don't find any "interesting" ones ws = client.ExitStatus - break } return ws } @@ -132,7 +144,7 @@ type Logs struct { type ExitCode struct { ID int - ExitStatus syscall.WaitStatus + ExitStatus process.WaitStatus } type Status struct { @@ -159,6 +171,11 @@ func (r *Runner) Exit(args ExitCode, reply *Empty) error { r.logger.Info("client %d exited with code %d", args.ID, args.ExitStatus.ExitStatus()) client.ExitStatus = args.ExitStatus client.State = stateExited + if client.ExitStatus.ExitStatus() != 0 { + r.closedOnce.Do(func() { + close(r.done) + }) + } allExited := true for _, client := range r.clients { @@ -205,6 +222,8 @@ type Client struct { client *rpc.Client } +var errNotConnected = errors.New("client not connected") + func (c *Client) Connect() error { if c.SocketPath == "" { c.SocketPath = defaultSocketPath @@ -217,9 +236,9 @@ func (c *Client) Connect() error { return c.client.Call("Runner.Register", c.ID, nil) } -func (c *Client) Exit(exitStatus syscall.WaitStatus) error { +func (c *Client) Exit(exitStatus process.WaitStatus) error { if c.client == nil { - return nil + return errNotConnected } return c.client.Call("Runner.Exit", ExitCode{ ID: c.ID, @@ -230,7 +249,7 @@ func (c *Client) Exit(exitStatus syscall.WaitStatus) error { // Write implements io.Writer func (c *Client) Write(p []byte) (int, error) { if c.client == nil { - return 0, nil + return 0, errNotConnected } n := len(p) err := c.client.Call("Runner.WriteLogs", Logs{ diff --git a/kubernetes/kubernetes_test.go b/kubernetes/kubernetes_test.go index bfa28b7de5..08335a832c 100644 --- a/kubernetes/kubernetes_test.go +++ b/kubernetes/kubernetes_test.go @@ -2,8 +2,10 @@ package kubernetes import ( "context" + "encoding/gob" "os" "path/filepath" + "syscall" "testing" "time" @@ -48,7 +50,7 @@ func TestOrderedClients(t *testing.T) { require.FailNow(t, "client0 should be ready") } - require.NoError(t, client0.Exit(0)) + require.NoError(t, client0.Exit(waitStatusSuccess)) select { case err := <-client1.WaitReady(): require.NoError(t, err.Err) @@ -62,7 +64,7 @@ func TestOrderedClients(t *testing.T) { require.FailNow(t, "client1 should be ready") } - require.NoError(t, 
client1.Exit(0)) + require.NoError(t, client1.Exit(waitStatusSuccess)) select { case err := <-client2.WaitReady(): require.NoError(t, err.Err) @@ -73,7 +75,7 @@ func TestOrderedClients(t *testing.T) { require.FailNow(t, "client2 should be ready") } - require.NoError(t, client2.Exit(0)) + require.NoError(t, client2.Exit(waitStatusSuccess)) select { case <-runner.Done(): break @@ -107,15 +109,35 @@ func TestExcessClients(t *testing.T) { client0 := Client{ID: 0, SocketPath: socketPath} client1 := Client{ID: 1, SocketPath: socketPath} - // wait for runner to listen - require.Eventually(t, func() bool { - _, err := os.Lstat(socketPath) - return err == nil + require.NoError(t, client0.Connect()) + require.Error(t, client1.Connect(), "expected an error when connecting too many clients") +} - }, time.Second*10, time.Millisecond, "expected socket file to exist") +func TestWaitStatusNonZero(t *testing.T) { + runner := newRunner(t, 2) + + client0 := Client{ID: 0, SocketPath: runner.conf.SocketPath} + client1 := Client{ID: 1, SocketPath: runner.conf.SocketPath} require.NoError(t, client0.Connect()) - require.Error(t, client1.Connect(), "expected an error when connecting too many clients") + require.NoError(t, client1.Connect()) + require.NoError(t, client0.Exit(waitStatusFailure)) + require.NoError(t, client1.Exit(waitStatusSuccess)) + require.Equal(t, runner.WaitStatus().ExitStatus(), 1) +} + +func TestWaitStatusSignaled(t *testing.T) { + runner := newRunner(t, 2) + + client0 := Client{ID: 0, SocketPath: runner.conf.SocketPath} + client1 := Client{ID: 1, SocketPath: runner.conf.SocketPath} + + require.NoError(t, client0.Connect()) + require.NoError(t, client1.Connect()) + require.NoError(t, client0.Exit(waitStatusSignaled)) + require.NoError(t, client1.Exit(waitStatusSuccess)) + require.Equal(t, runner.WaitStatus().ExitStatus(), 0) + require.True(t, runner.WaitStatus().Signaled()) } func newRunner(t *testing.T, clientCount int) *Runner { @@ -134,5 +156,44 @@ func newRunner(t *testing.T, clientCount int) *Runner { t.Cleanup(func() { cancelRunner() }) + + // wait for runner to listen + require.Eventually(t, func() bool { + _, err := os.Lstat(socketPath) + return err == nil + + }, time.Second*10, time.Millisecond, "expected socket file to exist") + return runner } + +var ( + waitStatusSuccess = waitStatus{Code: 0} + waitStatusFailure = waitStatus{Code: 1} + waitStatusSignaled = waitStatus{Code: 0, SignalCode: intptr(1)} +) + +func init() { + gob.Register(new(waitStatus)) +} + +type waitStatus struct { + Code int + SignalCode *int +} + +func (w waitStatus) ExitStatus() int { + return w.Code +} + +func (w waitStatus) Signaled() bool { + return w.SignalCode != nil +} + +func (w waitStatus) Signal() syscall.Signal { + return syscall.Signal(*w.SignalCode) +} + +func intptr(x int) *int { + return &x +} diff --git a/process/process.go b/process/process.go index 217c70ada3..ed500f40ea 100644 --- a/process/process.go +++ b/process/process.go @@ -43,6 +43,12 @@ var signalMap = map[string]Signal{ "SIGTERM": SIGTERM, } +type WaitStatus interface { + ExitStatus() int + Signaled() bool + Signal() syscall.Signal +} + func (s Signal) String() string { for k, sig := range signalMap { if sig == s { @@ -107,7 +113,7 @@ func (p *Process) WaitResult() error { } // WaitStatus returns the status of the Wait() call -func (p *Process) WaitStatus() syscall.WaitStatus { +func (p *Process) WaitStatus() WaitStatus { return p.status } From 1697ffffc842f55c78cc650667bbfff765a006be Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Fri, 16 
Dec 2022 11:01:40 -0500 Subject: [PATCH 08/17] Implement interrupt --- bootstrap/bootstrap.go | 19 ++++-- kubernetes/kubernetes.go | 121 +++++++++++++++++++++++----------- kubernetes/kubernetes_test.go | 109 +++++++++++++++--------------- 3 files changed, 149 insertions(+), 100 deletions(-) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index d19c83be0e..732ff2e706 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -79,8 +79,8 @@ func (b *Bootstrap) Run(ctx context.Context) (exitCode int) { b.shell.Debug = b.Config.Debug b.shell.InterruptSignal = b.Config.CancelSignal } - var kubernetesClient kubernetes.Client if experiments.IsEnabled("kubernetes-exec") { + var kubernetesClient kubernetes.Client b.shell.Commentf("Using experimental Kubernetes support") err := roko.NewRetrier( roko.WithMaxAttempts(7), @@ -91,9 +91,12 @@ func (b *Bootstrap) Run(ctx context.Context) (exitCode int) { return fmt.Errorf("failed to parse container id, %s", os.Getenv("BUILDKITE_CONTAINER_ID")) } kubernetesClient.ID = id - if err := kubernetesClient.Connect(); err != nil { + connect, err := kubernetesClient.Connect() + if err != nil { return err } + os.Setenv("BUILDKITE_AGENT_ACCESS_TOKEN", connect.AccessToken) + b.shell.Env.Set("BUILDKITE_AGENT_ACCESS_TOKEN", connect.AccessToken) writer := io.MultiWriter(os.Stdout, &kubernetesClient) b.shell.Writer = writer b.shell.Logger = &shell.WriterLogger{ @@ -106,13 +109,17 @@ func (b *Bootstrap) Run(ctx context.Context) (exitCode int) { b.shell.Errorf("Error connecting to kubernetes runner: %v", err) return 1 } - status := <-kubernetesClient.WaitReady() - if status.Err != nil { + if err := kubernetesClient.Await(ctx, kubernetes.RunStateStart); err != nil { b.shell.Errorf("Error waiting for client to become ready: %v", err) return 1 } - os.Setenv("BUILDKITE_AGENT_ACCESS_TOKEN", status.AccessToken) - b.shell.Env.Set("BUILDKITE_AGENT_ACCESS_TOKEN", status.AccessToken) + go func() { + if err := kubernetesClient.Await(ctx, kubernetes.RunStateInterrupt); err != nil { + b.shell.Errorf("Error waiting for client interrupt: %v", err) + return + } + b.cancelCh <- struct{}{} + }() defer func() { kubernetesClient.Exit(b.shell.WaitStatus()) }() diff --git a/kubernetes/kubernetes.go b/kubernetes/kubernetes.go index 6aa271232f..f8a3be5c28 100644 --- a/kubernetes/kubernetes.go +++ b/kubernetes/kubernetes.go @@ -34,24 +34,28 @@ func New(l logger.Logger, c Config) *Runner { clients[i] = &clientResult{} } return &Runner{ - logger: l, - conf: c, - clients: clients, - server: rpc.NewServer(), - mux: http.NewServeMux(), - done: make(chan struct{}), - started: make(chan struct{}), + logger: l, + conf: c, + clients: clients, + server: rpc.NewServer(), + mux: http.NewServeMux(), + done: make(chan struct{}), + started: make(chan struct{}), + interrupt: make(chan struct{}), } } type Runner struct { - logger logger.Logger - conf Config - mu sync.Mutex - listener net.Listener - started, done chan struct{} + logger logger.Logger + conf Config + mu sync.Mutex + listener net.Listener + started, + done, + interrupt chan struct{} startedOnce, - closedOnce sync.Once + closedOnce, + interruptOnce sync.Once server *rpc.Server mux *http.ServeMux clients map[int]*clientResult @@ -108,12 +112,17 @@ func (r *Runner) Done() <-chan struct{} { return r.done } +// Interrupts all clients, triggering graceful shutdown func (r *Runner) Interrupt() error { r.mu.Lock() defer r.mu.Unlock() - panic("unimplemented") + r.interruptOnce.Do(func() { + close(r.interrupt) + }) + return nil } +// Stops 
the RPC server, allowing Run to return immediately func (r *Runner) Terminate() error { r.mu.Lock() defer r.mu.Unlock() @@ -152,6 +161,10 @@ type Status struct { AccessToken string } +type RegisterResponse struct { + AccessToken string +} + func (r *Runner) WriteLogs(args Logs, reply *Empty) error { r.startedOnce.Do(func() { close(r.started) @@ -189,31 +202,43 @@ func (r *Runner) Exit(args ExitCode, reply *Empty) error { return nil } -func (r *Runner) Register(id int, reply *Empty) error { +func (r *Runner) Register(id int, reply *RegisterResponse) error { r.mu.Lock() defer r.mu.Unlock() + r.startedOnce.Do(func() { + close(r.started) + }) client, found := r.clients[id] if !found { return fmt.Errorf("client id %d not found", id) } - if client.State == stateConnected { + if client.State != stateUnknown { return fmt.Errorf("client id %d already registered", id) } + r.logger.Info("client %d connected", id) client.State = stateConnected + reply.AccessToken = r.conf.AccessToken return nil } -func (r *Runner) Status(id int, reply *Status) error { +func (r *Runner) Status(id int, reply *RunState) error { r.mu.Lock() defer r.mu.Unlock() - reply.AccessToken = r.conf.AccessToken - if id == 0 { - reply.Ready = true - } else if client, found := r.clients[id-1]; found && client.State == stateExited { - reply.Ready = true + select { + case <-r.done: + return rpc.ErrShutdown + case <-r.interrupt: + *reply = RunStateInterrupt + return nil + default: + if id == 0 { + *reply = RunStateStart + } else if client, found := r.clients[id-1]; found && client.State == stateExited { + *reply = RunStateStart + } + return nil } - return nil } type Client struct { @@ -224,16 +249,20 @@ type Client struct { var errNotConnected = errors.New("client not connected") -func (c *Client) Connect() error { +func (c *Client) Connect() (RegisterResponse, error) { if c.SocketPath == "" { c.SocketPath = defaultSocketPath } client, err := rpc.DialHTTP("unix", c.SocketPath) if err != nil { - return err + return RegisterResponse{}, err } c.client = client - return c.client.Call("Runner.Register", c.ID, nil) + var resp RegisterResponse + if err := c.client.Call("Runner.Register", c.ID, &resp); err != nil { + return RegisterResponse{}, err + } + return resp, nil } func (c *Client) Exit(exitStatus process.WaitStatus) error { @@ -263,24 +292,36 @@ type WaitReadyResponse struct { Status } -func (c *Client) WaitReady() <-chan WaitReadyResponse { - result := make(chan WaitReadyResponse) - go func() { - for { - var reply Status - if err := c.client.Call("Runner.Status", c.ID, &reply); err != nil { - result <- WaitReadyResponse{Err: err} - return +type RunState int + +const ( + RunStateWait RunState = iota + RunStateStart + RunStateInterrupt +) + +var ErrInterrupt = errors.New("interrupt signal received") + +func (c *Client) Await(ctx context.Context, desiredState RunState) error { + for { + select { + case <-ctx.Done(): + default: + var current RunState + if err := c.client.Call("Runner.Status", c.ID, ¤t); err != nil { + if desiredState == RunStateInterrupt && errors.Is(err, rpc.ErrShutdown) { + return nil + } + return err } - if reply.Ready { - result <- WaitReadyResponse{Status: reply} - return + if current == desiredState { + return nil + } else if current == RunStateInterrupt { + return ErrInterrupt } - // TODO: configurable interval time.Sleep(time.Second) } - }() - return result + } } func (c *Client) Close() { diff --git a/kubernetes/kubernetes_test.go b/kubernetes/kubernetes_test.go index 08335a832c..30530faf28 100644 --- 
a/kubernetes/kubernetes_test.go +++ b/kubernetes/kubernetes_test.go @@ -31,55 +31,29 @@ func TestOrderedClients(t *testing.T) { for _, client := range clients { client.SocketPath = socketPath - require.NoError(t, client.Connect()) + require.NoError(t, connect(client)) t.Cleanup(client.Close) } - select { - case err := <-client0.WaitReady(): - require.NoError(t, err.Err) - break - case err := <-client1.WaitReady(): - require.NoError(t, err.Err) - require.FailNow(t, "client1 should not be ready") - case err := <-client2.WaitReady(): - require.NoError(t, err.Err) - require.FailNow(t, "client2 should not be ready") - case <-runner.Done(): - require.FailNow(t, "runner should not be done") - case <-time.After(time.Second): - require.FailNow(t, "client0 should be ready") - } + ctx := context.Background() + require.NoError(t, client0.Await(ctx, RunStateStart)) + require.NoError(t, client1.Await(ctx, RunStateWait)) + require.NoError(t, client2.Await(ctx, RunStateWait)) require.NoError(t, client0.Exit(waitStatusSuccess)) - select { - case err := <-client1.WaitReady(): - require.NoError(t, err.Err) - break - case err := <-client2.WaitReady(): - require.NoError(t, err.Err) - require.FailNow(t, "client2 should not be ready") - case <-runner.Done(): - require.FailNow(t, "runner should not be done") - case <-time.After(time.Second): - require.FailNow(t, "client1 should be ready") - } + require.NoError(t, client0.Await(ctx, RunStateStart)) + require.NoError(t, client1.Await(ctx, RunStateStart)) + require.NoError(t, client2.Await(ctx, RunStateWait)) require.NoError(t, client1.Exit(waitStatusSuccess)) - select { - case err := <-client2.WaitReady(): - require.NoError(t, err.Err) - break - case <-runner.Done(): - require.FailNow(t, "runner should not be done") - case <-time.After(time.Second): - require.FailNow(t, "client2 should be ready") - } + require.NoError(t, client0.Await(ctx, RunStateStart)) + require.NoError(t, client1.Await(ctx, RunStateStart)) + require.NoError(t, client2.Await(ctx, RunStateStart)) require.NoError(t, client2.Exit(waitStatusSuccess)) select { case <-runner.Done(): break - case <-time.After(time.Second): + default: require.FailNow(t, "runner should be done when all clients have exited") } } @@ -88,8 +62,8 @@ func TestDuplicateClients(t *testing.T) { runner := newRunner(t, 2) socketPath := runner.conf.SocketPath - client0 := Client{ID: 0, SocketPath: socketPath} - client1 := Client{ID: 0, SocketPath: socketPath} + client0 := &Client{ID: 0, SocketPath: socketPath} + client1 := &Client{ID: 0, SocketPath: socketPath} // wait for runner to listen require.Eventually(t, func() bool { @@ -98,29 +72,29 @@ func TestDuplicateClients(t *testing.T) { }, time.Second*10, time.Millisecond, "expected socket file to exist") - require.NoError(t, client0.Connect()) - require.Error(t, client1.Connect(), "expected an error when connecting a client with a duplicate ID") + require.NoError(t, connect(client0)) + require.Error(t, connect(client1), "expected an error when connecting a client with a duplicate ID") } func TestExcessClients(t *testing.T) { runner := newRunner(t, 1) socketPath := runner.conf.SocketPath - client0 := Client{ID: 0, SocketPath: socketPath} - client1 := Client{ID: 1, SocketPath: socketPath} + client0 := &Client{ID: 0, SocketPath: socketPath} + client1 := &Client{ID: 1, SocketPath: socketPath} - require.NoError(t, client0.Connect()) - require.Error(t, client1.Connect(), "expected an error when connecting too many clients") + require.NoError(t, connect(client0)) + require.Error(t, 
connect(client1), "expected an error when connecting too many clients") } func TestWaitStatusNonZero(t *testing.T) { runner := newRunner(t, 2) - client0 := Client{ID: 0, SocketPath: runner.conf.SocketPath} - client1 := Client{ID: 1, SocketPath: runner.conf.SocketPath} + client0 := &Client{ID: 0, SocketPath: runner.conf.SocketPath} + client1 := &Client{ID: 1, SocketPath: runner.conf.SocketPath} - require.NoError(t, client0.Connect()) - require.NoError(t, client1.Connect()) + require.NoError(t, connect(client0)) + require.NoError(t, connect(client1)) require.NoError(t, client0.Exit(waitStatusFailure)) require.NoError(t, client1.Exit(waitStatusSuccess)) require.Equal(t, runner.WaitStatus().ExitStatus(), 1) @@ -129,17 +103,38 @@ func TestWaitStatusNonZero(t *testing.T) { func TestWaitStatusSignaled(t *testing.T) { runner := newRunner(t, 2) - client0 := Client{ID: 0, SocketPath: runner.conf.SocketPath} - client1 := Client{ID: 1, SocketPath: runner.conf.SocketPath} + client0 := &Client{ID: 0, SocketPath: runner.conf.SocketPath} + client1 := &Client{ID: 1, SocketPath: runner.conf.SocketPath} - require.NoError(t, client0.Connect()) - require.NoError(t, client1.Connect()) + require.NoError(t, connect(client0)) + require.NoError(t, connect(client1)) require.NoError(t, client0.Exit(waitStatusSignaled)) require.NoError(t, client1.Exit(waitStatusSuccess)) require.Equal(t, runner.WaitStatus().ExitStatus(), 0) require.True(t, runner.WaitStatus().Signaled()) } +func TestInterrupt(t *testing.T) { + runner := newRunner(t, 2) + + client0 := &Client{ID: 0, SocketPath: runner.conf.SocketPath} + client1 := &Client{ID: 1, SocketPath: runner.conf.SocketPath} + + require.NoError(t, connect(client0)) + require.NoError(t, connect(client1)) + + require.NoError(t, runner.Interrupt()) + + ctx := context.Background() + require.ErrorIs(t, client0.Await(ctx, RunStateWait), ErrInterrupt) + require.Error(t, client0.Await(ctx, RunStateStart), ErrInterrupt) + require.NoError(t, client0.Await(ctx, RunStateInterrupt)) + + require.Error(t, client1.Await(ctx, RunStateWait), ErrInterrupt) + require.Error(t, client1.Await(ctx, RunStateStart), ErrInterrupt) + require.NoError(t, client1.Await(ctx, RunStateInterrupt)) +} + func newRunner(t *testing.T, clientCount int) *Runner { tempDir, err := os.MkdirTemp("", t.Name()) require.NoError(t, err) @@ -197,3 +192,9 @@ func (w waitStatus) Signal() syscall.Signal { func intptr(x int) *int { return &x } + +// helper for ignoring the response from regular client.Connect +func connect(c *Client) error { + _, err := c.Connect() + return err +} From 94bfe3a62a87021c611745f0422aee4d73a4645e Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Fri, 16 Dec 2022 11:36:23 -0500 Subject: [PATCH 09/17] Implement terminate --- bootstrap/bootstrap.go | 1 - kubernetes/kubernetes.go | 11 +++++++---- kubernetes/kubernetes_test.go | 22 ++++++++++++++-------- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 732ff2e706..6207cc83ba 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -116,7 +116,6 @@ func (b *Bootstrap) Run(ctx context.Context) (exitCode int) { go func() { if err := kubernetesClient.Await(ctx, kubernetes.RunStateInterrupt); err != nil { b.shell.Errorf("Error waiting for client interrupt: %v", err) - return } b.cancelCh <- struct{}{} }() diff --git a/kubernetes/kubernetes.go b/kubernetes/kubernetes.go index f8a3be5c28..8bfb2f6f1b 100644 --- a/kubernetes/kubernetes.go +++ b/kubernetes/kubernetes.go @@ -116,6 +116,7 @@ 
func (r *Runner) Done() <-chan struct{} { func (r *Runner) Interrupt() error { r.mu.Lock() defer r.mu.Unlock() + r.interruptOnce.Do(func() { close(r.interrupt) }) @@ -126,7 +127,11 @@ func (r *Runner) Interrupt() error { func (r *Runner) Terminate() error { r.mu.Lock() defer r.mu.Unlock() - panic("unimplemented") + + r.closedOnce.Do(func() { + close(r.done) + }) + return nil } func (r *Runner) WaitStatus() process.WaitStatus { @@ -306,12 +311,10 @@ func (c *Client) Await(ctx context.Context, desiredState RunState) error { for { select { case <-ctx.Done(): + return ctx.Err() default: var current RunState if err := c.client.Call("Runner.Status", c.ID, ¤t); err != nil { - if desiredState == RunStateInterrupt && errors.Is(err, rpc.ErrShutdown) { - return nil - } return err } if current == desiredState { diff --git a/kubernetes/kubernetes_test.go b/kubernetes/kubernetes_test.go index 30530faf28..d07c8c1001 100644 --- a/kubernetes/kubernetes_test.go +++ b/kubernetes/kubernetes_test.go @@ -3,6 +3,7 @@ package kubernetes import ( "context" "encoding/gob" + "net/rpc" "os" "path/filepath" "syscall" @@ -116,23 +117,28 @@ func TestWaitStatusSignaled(t *testing.T) { func TestInterrupt(t *testing.T) { runner := newRunner(t, 2) - + ctx := context.Background() client0 := &Client{ID: 0, SocketPath: runner.conf.SocketPath} - client1 := &Client{ID: 1, SocketPath: runner.conf.SocketPath} require.NoError(t, connect(client0)) - require.NoError(t, connect(client1)) - require.NoError(t, runner.Interrupt()) - ctx := context.Background() require.ErrorIs(t, client0.Await(ctx, RunStateWait), ErrInterrupt) require.Error(t, client0.Await(ctx, RunStateStart), ErrInterrupt) require.NoError(t, client0.Await(ctx, RunStateInterrupt)) +} + +func TestTerminate(t *testing.T) { + runner := newRunner(t, 2) + ctx := context.Background() + client0 := &Client{ID: 0, SocketPath: runner.conf.SocketPath} + + require.NoError(t, connect(client0)) + require.NoError(t, runner.Terminate()) - require.Error(t, client1.Await(ctx, RunStateWait), ErrInterrupt) - require.Error(t, client1.Await(ctx, RunStateStart), ErrInterrupt) - require.NoError(t, client1.Await(ctx, RunStateInterrupt)) + require.ErrorContains(t, client0.Await(ctx, RunStateWait), rpc.ErrShutdown.Error()) + require.ErrorContains(t, client0.Await(ctx, RunStateStart), rpc.ErrShutdown.Error()) + require.ErrorContains(t, client0.Await(ctx, RunStateInterrupt), rpc.ErrShutdown.Error()) } func newRunner(t *testing.T, clientCount int) *Runner { From ea0dda5ff56bc695c29f8b5155f8be628547d833 Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Wed, 21 Dec 2022 14:20:53 -0500 Subject: [PATCH 10/17] Fix WaitStatus on Windows Previously wouldn't compile on Windows, where syscall.WaitStatus is a struct --- bootstrap/shell/shell.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bootstrap/shell/shell.go b/bootstrap/shell/shell.go index 8fe6e762c1..5f92b03b31 100644 --- a/bootstrap/shell/shell.go +++ b/bootstrap/shell/shell.go @@ -177,7 +177,8 @@ func (s *Shell) WaitStatus() process.WaitStatus { if s.cmd != nil && s.cmd.proc != nil { return s.cmd.proc.WaitStatus() } - return syscall.WaitStatus(0) + var ws syscall.WaitStatus + return ws } // LockFile is a pid-based lock for cross-process locking From 3b212aa8c272b390e103fd183317093bb6a8bcce Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Thu, 22 Dec 2022 10:37:12 -0500 Subject: [PATCH 11/17] Return an error for WaitStatus if proc not started - Modeled this on the behavior of exec.Command's Wait() fn - Returning an empty WaitStatus if 
we somehow reach this code without having started the shell means we might hide the error, since the default value will report a 0 exit code --- bootstrap/bootstrap.go | 7 ++++++- bootstrap/shell/shell.go | 13 +++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 6207cc83ba..8cc741fee5 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -120,7 +120,12 @@ func (b *Bootstrap) Run(ctx context.Context) (exitCode int) { b.cancelCh <- struct{}{} }() defer func() { - kubernetesClient.Exit(b.shell.WaitStatus()) + ws, err := b.shell.WaitStatus() + if err != nil { + b.shell.Errorf("Error getting wait status: %v", err) + return + } + kubernetesClient.Exit(ws) }() } diff --git a/bootstrap/shell/shell.go b/bootstrap/shell/shell.go index 5f92b03b31..2b5c119084 100644 --- a/bootstrap/shell/shell.go +++ b/bootstrap/shell/shell.go @@ -169,16 +169,17 @@ func (s *Shell) Terminate() { } } -// Terminate running command -func (s *Shell) WaitStatus() process.WaitStatus { +// Returns the WaitStatus of the shell's process. +// +// The shell must have been started. +func (s *Shell) WaitStatus() (process.WaitStatus, error) { s.cmdLock.Lock() defer s.cmdLock.Unlock() - if s.cmd != nil && s.cmd.proc != nil { - return s.cmd.proc.WaitStatus() + if s.cmd == nil || s.cmd.proc == nil { + return nil, errors.New("shell not started") } - var ws syscall.WaitStatus - return ws + return s.cmd.proc.WaitStatus(), nil } // LockFile is a pid-based lock for cross-process locking From 6f5ef14a014ec5983c2999fb0ecb10a5ee19c120 Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Fri, 13 Jan 2023 11:50:10 -0500 Subject: [PATCH 12/17] Fix file permissions of socket If different containers run as different users, they do not have the required rw permissions on the socket. By default it was being created with 0755 permissions. --- kubernetes/kubernetes.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kubernetes/kubernetes.go b/kubernetes/kubernetes.go index 8bfb2f6f1b..0e798cc192 100644 --- a/kubernetes/kubernetes.go +++ b/kubernetes/kubernetes.go @@ -85,12 +85,15 @@ func (r *Runner) Run(ctx context.Context) error { r.server.Register(r) r.mux.Handle(rpc.DefaultRPCPath, r.server) + oldUmask := syscall.Umask(0) // set umask of socket file to 0777 (world read-write-executable) l, err := (&net.ListenConfig{}).Listen(ctx, "unix", r.conf.SocketPath) if err != nil { return fmt.Errorf("failed to listen: %w", err) } defer l.Close() defer os.Remove(r.conf.SocketPath) + + syscall.Umask(oldUmask) // change back to regular umask r.listener = l go http.Serve(l, r.mux) From 09230d7ac8c7892a6d0329a871b4d2e78e741b89 Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Fri, 13 Jan 2023 14:13:37 -0500 Subject: [PATCH 13/17] Fix exit code handling The shell's WaitStatus is not necessarily the command's exit code; spawning the subprocess can fail in the first place, for instance if /bin/bash does not exist.
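Sketched from the bootstrap.go hunk below (condensed; the surrounding defer is elided), the exit report is now taken from the environment rather than from the shell's process state:

    // BUILDKITE_COMMAND_EXIT_STATUS is set by the bootstrap itself, so it is
    // meaningful even when no subprocess was ever spawned and the shell's
    // WaitStatus would misleadingly report 0.
    exitStatus, _ := b.shell.Env.Get("BUILDKITE_COMMAND_EXIT_STATUS")
    exitStatusCode, _ := strconv.Atoi(exitStatus)
    kubernetesClient.Exit(exitStatusCode)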
The BUILDKITE_COMMAND_EXIT_STATUS is what the bootstrap actually exits with, so this is the value we should be reporting to the agent --- bootstrap/bootstrap.go | 10 ++++----- kubernetes/kubernetes.go | 37 +++++++++++++++++++++--------- kubernetes/kubernetes_test.go | 42 +++++------------------------------ 3 files changed, 35 insertions(+), 54 deletions(-) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 8cc741fee5..fcab0d43e0 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -120,12 +120,10 @@ func (b *Bootstrap) Run(ctx context.Context) (exitCode int) { b.cancelCh <- struct{}{} }() defer func() { - ws, err := b.shell.WaitStatus() - if err != nil { - b.shell.Errorf("Error getting wait status: %v", err) - return - } - kubernetesClient.Exit(ws) + exitStatus, _ := b.shell.Env.Get("BUILDKITE_COMMAND_EXIT_STATUS") + exitStatusCode, _ := strconv.Atoi(exitStatus) + + kubernetesClient.Exit(exitStatusCode) }() } diff --git a/kubernetes/kubernetes.go b/kubernetes/kubernetes.go index 0e798cc192..d2b95fe481 100644 --- a/kubernetes/kubernetes.go +++ b/kubernetes/kubernetes.go @@ -62,7 +62,7 @@ type Runner struct { } type clientResult struct { - ExitStatus process.WaitStatus + ExitStatus int State clientState } @@ -137,17 +137,32 @@ func (r *Runner) Terminate() error { return nil } +type waitStatus struct { + Code int + SignalCode *int +} + +func (w waitStatus) ExitStatus() int { + return w.Code +} + +func (w waitStatus) Signal() syscall.Signal { + var signal syscall.Signal + return signal +} + +func (w waitStatus) Signaled() bool { + return false +} + func (r *Runner) WaitStatus() process.WaitStatus { var ws process.WaitStatus for _, client := range r.clients { - if client.ExitStatus.ExitStatus() != 0 { - return client.ExitStatus - } - if client.ExitStatus.Signaled() { - return client.ExitStatus + if client.ExitStatus != 0 { + return waitStatus{Code: client.ExitStatus} } // just return any ExitStatus if we don't find any "interesting" ones - ws = client.ExitStatus + ws = waitStatus{Code: client.ExitStatus} } return ws } @@ -161,7 +176,7 @@ type Logs struct { type ExitCode struct { ID int - ExitStatus process.WaitStatus + ExitStatus int } type Status struct { @@ -189,10 +204,10 @@ func (r *Runner) Exit(args ExitCode, reply *Empty) error { if !found { return fmt.Errorf("unrecognized client id: %d", args.ID) } - r.logger.Info("client %d exited with code %d", args.ID, args.ExitStatus.ExitStatus()) + r.logger.Info("client %d exited with code %d", args.ID, args.ExitStatus) client.ExitStatus = args.ExitStatus client.State = stateExited - if client.ExitStatus.ExitStatus() != 0 { + if client.ExitStatus != 0 { r.closedOnce.Do(func() { close(r.done) }) @@ -273,7 +288,7 @@ func (c *Client) Connect() (RegisterResponse, error) { return resp, nil } -func (c *Client) Exit(exitStatus process.WaitStatus) error { +func (c *Client) Exit(exitStatus int) error { if c.client == nil { return errNotConnected } diff --git a/kubernetes/kubernetes_test.go b/kubernetes/kubernetes_test.go index d07c8c1001..d082e9a9fc 100644 --- a/kubernetes/kubernetes_test.go +++ b/kubernetes/kubernetes_test.go @@ -6,7 +6,6 @@ import ( "net/rpc" "os" "path/filepath" - "syscall" "testing" "time" @@ -40,17 +39,17 @@ func TestOrderedClients(t *testing.T) { require.NoError(t, client1.Await(ctx, RunStateWait)) require.NoError(t, client2.Await(ctx, RunStateWait)) - require.NoError(t, client0.Exit(waitStatusSuccess)) + require.NoError(t, client0.Exit(0)) require.NoError(t, client0.Await(ctx, RunStateStart)) 
 	require.NoError(t, client1.Await(ctx, RunStateStart))
 	require.NoError(t, client2.Await(ctx, RunStateWait))
 
-	require.NoError(t, client1.Exit(waitStatusSuccess))
+	require.NoError(t, client1.Exit(0))
 	require.NoError(t, client0.Await(ctx, RunStateStart))
 	require.NoError(t, client1.Await(ctx, RunStateStart))
 	require.NoError(t, client2.Await(ctx, RunStateStart))
 
-	require.NoError(t, client2.Exit(waitStatusSuccess))
+	require.NoError(t, client2.Exit(0))
 	select {
 	case <-runner.Done():
 		break
@@ -96,25 +95,11 @@ func TestWaitStatusNonZero(t *testing.T) {
 	require.NoError(t, connect(client0))
 	require.NoError(t, connect(client1))
 
-	require.NoError(t, client0.Exit(waitStatusFailure))
-	require.NoError(t, client1.Exit(waitStatusSuccess))
+	require.NoError(t, client0.Exit(1))
+	require.NoError(t, client1.Exit(0))
 	require.Equal(t, runner.WaitStatus().ExitStatus(), 1)
 }
 
-func TestWaitStatusSignaled(t *testing.T) {
-	runner := newRunner(t, 2)
-
-	client0 := &Client{ID: 0, SocketPath: runner.conf.SocketPath}
-	client1 := &Client{ID: 1, SocketPath: runner.conf.SocketPath}
-
-	require.NoError(t, connect(client0))
-	require.NoError(t, connect(client1))
-	require.NoError(t, client0.Exit(waitStatusSignaled))
-	require.NoError(t, client1.Exit(waitStatusSuccess))
-	require.Equal(t, runner.WaitStatus().ExitStatus(), 0)
-	require.True(t, runner.WaitStatus().Signaled())
-}
-
 func TestInterrupt(t *testing.T) {
 	runner := newRunner(t, 2)
 	ctx := context.Background()
@@ -178,23 +163,6 @@ func init() {
 	gob.Register(new(waitStatus))
 }
 
-type waitStatus struct {
-	Code       int
-	SignalCode *int
-}
-
-func (w waitStatus) ExitStatus() int {
-	return w.Code
-}
-
-func (w waitStatus) Signaled() bool {
-	return w.SignalCode != nil
-}
-
-func (w waitStatus) Signal() syscall.Signal {
-	return syscall.Signal(*w.SignalCode)
-}
-
 func intptr(x int) *int {
 	return &x
 }

From f78da7642e5c6ec212b805ca3116ec9ecd18d352 Mon Sep 17 00:00:00 2001
From: Ben Moss
Date: Tue, 17 Jan 2023 12:52:27 -0500
Subject: [PATCH 14/17] Move kubernetes experiment to a helper method

---
 bootstrap/bootstrap.go | 86 +++++++++++++++++++++---------------------
 1 file changed, 44 insertions(+), 42 deletions(-)

diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go
index fcab0d43e0..facbae6dec 100644
--- a/bootstrap/bootstrap.go
+++ b/bootstrap/bootstrap.go
@@ -80,50 +80,12 @@ func (b *Bootstrap) Run(ctx context.Context) (exitCode int) {
 		b.shell.InterruptSignal = b.Config.CancelSignal
 	}
 	if experiments.IsEnabled("kubernetes-exec") {
-		var kubernetesClient kubernetes.Client
-		b.shell.Commentf("Using experimental Kubernetes support")
-		err := roko.NewRetrier(
-			roko.WithMaxAttempts(7),
-			roko.WithStrategy(roko.Exponential(2*time.Second, 0)),
-		).Do(func(rtr *roko.Retrier) error {
-			id, err := strconv.Atoi(os.Getenv("BUILDKITE_CONTAINER_ID"))
-			if err != nil {
-				return fmt.Errorf("failed to parse container id, %s", os.Getenv("BUILDKITE_CONTAINER_ID"))
-			}
-			kubernetesClient.ID = id
-			connect, err := kubernetesClient.Connect()
-			if err != nil {
-				return err
-			}
-			os.Setenv("BUILDKITE_AGENT_ACCESS_TOKEN", connect.AccessToken)
-			b.shell.Env.Set("BUILDKITE_AGENT_ACCESS_TOKEN", connect.AccessToken)
-			writer := io.MultiWriter(os.Stdout, &kubernetesClient)
-			b.shell.Writer = writer
-			b.shell.Logger = &shell.WriterLogger{
-				Writer: writer,
-				Ansi:   true,
-			}
-			return nil
-		})
-		if err != nil {
-			b.shell.Errorf("Error connecting to kubernetes runner: %v", err)
-			return 1
-		}
-		if err := kubernetesClient.Await(ctx, kubernetes.RunStateStart); err != nil {
-			b.shell.Errorf("Error waiting for client to become ready: %v", err)
-			return 1
+		kubernetesClient := &kubernetes.Client{}
+		if err := b.startKubernetesClient(ctx, kubernetesClient); err != nil {
+			b.shell.Errorf("Failed to start kubernetes client: %v", err)
 		}
-		go func() {
-			if err := kubernetesClient.Await(ctx, kubernetes.RunStateInterrupt); err != nil {
-				b.shell.Errorf("Error waiting for client interrupt: %v", err)
-			}
-			b.cancelCh <- struct{}{}
-		}()
 		defer func() {
-			exitStatus, _ := b.shell.Env.Get("BUILDKITE_COMMAND_EXIT_STATUS")
-			exitStatusCode, _ := strconv.Atoi(exitStatus)
-
-			kubernetesClient.Exit(exitStatusCode)
+			kubernetesClient.Exit(exitCode)
 		}()
 	}
@@ -2027,3 +1989,43 @@ type pluginCheckout struct {
 	CheckoutDir string
 	HooksDir    string
 }
+
+func (b *Bootstrap) startKubernetesClient(ctx context.Context, kubernetesClient *kubernetes.Client) error {
+	b.shell.Commentf("Using experimental Kubernetes support")
+	err := roko.NewRetrier(
+		roko.WithMaxAttempts(7),
+		roko.WithStrategy(roko.Exponential(2*time.Second, 0)),
+	).Do(func(rtr *roko.Retrier) error {
+		id, err := strconv.Atoi(os.Getenv("BUILDKITE_CONTAINER_ID"))
+		if err != nil {
+			return fmt.Errorf("failed to parse container id, %s", os.Getenv("BUILDKITE_CONTAINER_ID"))
+		}
+		kubernetesClient.ID = id
+		connect, err := kubernetesClient.Connect()
+		if err != nil {
+			return err
+		}
+		os.Setenv("BUILDKITE_AGENT_ACCESS_TOKEN", connect.AccessToken)
+		b.shell.Env.Set("BUILDKITE_AGENT_ACCESS_TOKEN", connect.AccessToken)
+		writer := io.MultiWriter(os.Stdout, kubernetesClient)
+		b.shell.Writer = writer
+		b.shell.Logger = &shell.WriterLogger{
+			Writer: writer,
+			Ansi:   true,
+		}
+		return nil
+	})
+	if err != nil {
+		return fmt.Errorf("error connecting to kubernetes runner: %w", err)
+	}
+	if err := kubernetesClient.Await(ctx, kubernetes.RunStateStart); err != nil {
+		return fmt.Errorf("error waiting for client to become ready: %w", err)
+	}
+	go func() {
+		if err := kubernetesClient.Await(ctx, kubernetes.RunStateInterrupt); err != nil {
+			b.shell.Errorf("Error waiting for client interrupt: %v", err)
+		}
+		b.cancelCh <- struct{}{}
+	}()
+	return nil
+}

From fb33bb9d9d2da5d6a9d07adc9cbfabcb50695b9e Mon Sep 17 00:00:00 2001
From: Ben Moss
Date: Wed, 18 Jan 2023 11:07:32 -0500
Subject: [PATCH 15/17] Fix build on Windows

---
 kubernetes/kubernetes.go    |  7 +++++--
 kubernetes/umask.go         | 13 +++++++++++++
 kubernetes/umask_windows.go | 13 +++++++++++++
 3 files changed, 31 insertions(+), 2 deletions(-)
 create mode 100644 kubernetes/umask.go
 create mode 100644 kubernetes/umask_windows.go

diff --git a/kubernetes/kubernetes.go b/kubernetes/kubernetes.go
index d2b95fe481..3d64d59011 100644
--- a/kubernetes/kubernetes.go
+++ b/kubernetes/kubernetes.go
@@ -85,7 +85,10 @@ func (r *Runner) Run(ctx context.Context) error {
 	r.server.Register(r)
 	r.mux.Handle(rpc.DefaultRPCPath, r.server)
 
-	oldUmask := syscall.Umask(0) // set umask of socket file to 0777 (world read-write-executable)
+	oldUmask, err := Umask(0) // set umask of socket file to 0777 (world read-write-executable)
+	if err != nil {
+		return fmt.Errorf("failed to set socket umask: %w", err)
+	}
 	l, err := (&net.ListenConfig{}).Listen(ctx, "unix", r.conf.SocketPath)
 	if err != nil {
 		return fmt.Errorf("failed to listen: %w", err)
@@ -93,7 +96,7 @@ func (r *Runner) Run(ctx context.Context) error {
 	defer l.Close()
 	defer os.Remove(r.conf.SocketPath)
 
-	syscall.Umask(oldUmask) // change back to regular umask
+	Umask(oldUmask) // change back to regular umask
 	r.listener = l
 
 	go http.Serve(l, r.mux)
diff --git a/kubernetes/umask.go b/kubernetes/umask.go
new file mode 100644
index 0000000000..3cae818d19
--- /dev/null
+++ b/kubernetes/umask.go
@@ -0,0 +1,13 @@
+//go:build !windows
+// +build !windows
+
+package kubernetes
+
+import (
+	"golang.org/x/sys/unix"
+)
+
+// Umask is a wrapper for `unix.Umask()` on non-Windows platforms
+func Umask(mask int) (old int, err error) {
+	return unix.Umask(mask), nil
+}
diff --git a/kubernetes/umask_windows.go b/kubernetes/umask_windows.go
new file mode 100644
index 0000000000..188a07e1f4
--- /dev/null
+++ b/kubernetes/umask_windows.go
@@ -0,0 +1,13 @@
+//go:build windows
+// +build windows
+
+package kubernetes
+
+import (
+	"errors"
+)
+
+// Umask returns an error on Windows
+func Umask(mask int) (int, error) {
+	return 0, errors.New("platform and architecture is not supported")
+}

From 82d6e2e15eb224266b08705500132772a2415a17 Mon Sep 17 00:00:00 2001
From: Ben Moss
Date: Thu, 26 Jan 2023 11:46:42 -0500
Subject: [PATCH 16/17] Don't build Kubernetes tests on Windows

---
 kubernetes/kubernetes_test.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/kubernetes/kubernetes_test.go b/kubernetes/kubernetes_test.go
index d082e9a9fc..da4a4a0829 100644
--- a/kubernetes/kubernetes_test.go
+++ b/kubernetes/kubernetes_test.go
@@ -1,3 +1,5 @@
+//go:build !windows
+
 package kubernetes
 
 import (

From c553392dcd04b3ce75b1c65b554de45b5b2f932a Mon Sep 17 00:00:00 2001
From: Ben Moss
Date: Thu, 16 Feb 2023 13:37:15 -0500
Subject: [PATCH 17/17] Short-circuit if the client fails to connect

Previously it would log "Failed to start" but then proceed to run the
command anyway.
---
 bootstrap/bootstrap.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go
index facbae6dec..6ccd700e4d 100644
--- a/bootstrap/bootstrap.go
+++ b/bootstrap/bootstrap.go
@@ -83,6 +83,7 @@ func (b *Bootstrap) Run(ctx context.Context) (exitCode int) {
 		kubernetesClient := &kubernetes.Client{}
 		if err := b.startKubernetesClient(ctx, kubernetesClient); err != nil {
 			b.shell.Errorf("Failed to start kubernetes client: %v", err)
+			return 1
 		}
 		defer func() {
 			kubernetesClient.Exit(exitCode)
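
The umask handling around socket creation can be exercised on its own.
A minimal sketch (Unix-only, with a made-up socket path; the agent's
default is /workspace/buildkite.sock):

	package main

	import (
		"fmt"
		"net"
		"os"

		"golang.org/x/sys/unix"
	)

	func main() {
		const socketPath = "/tmp/example.sock" // hypothetical path for illustration

		// Clear the umask so the socket is created 0777 (world
		// read-write-executable), letting containers that run as other
		// users connect, then restore the previous mask immediately.
		oldUmask := unix.Umask(0)
		l, err := net.Listen("unix", socketPath)
		unix.Umask(oldUmask)
		if err != nil {
			fmt.Fprintln(os.Stderr, "listen:", err)
			os.Exit(1)
		}
		defer l.Close()
		defer os.Remove(socketPath)

		if info, err := os.Stat(socketPath); err == nil {
			fmt.Println("socket mode:", info.Mode().Perm()) // expected: -rwxrwxrwx
		}
	}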