Skip to content

Commit

Permalink
Merge pull request #1884 from buildkite/kubernetes-job-runner
Browse files Browse the repository at this point in the history
  • Loading branch information
triarius authored Feb 21, 2023
2 parents 59afdea + c553392 commit c7abbce
Show file tree
Hide file tree
Showing 10 changed files with 672 additions and 15 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
packaging/docker/*/buildkite-agent
packaging/docker/*/hooks/

.idea
.idea
.vscode
2 changes: 1 addition & 1 deletion agent/agent_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type AgentWorker struct {

// When this worker runs a job, we'll store an instance of the
// JobRunner here
jobRunner *JobRunner
jobRunner jobRunner

// retrySleepFunc is useful for testing retry loops fast
// Hopefully this can be replaced with a global setting for tests in future:
Expand Down
55 changes: 43 additions & 12 deletions agent/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/buildkite/agent/v3/bootstrap/shell"
"github.com/buildkite/agent/v3/experiments"
"github.com/buildkite/agent/v3/hook"
"github.com/buildkite/agent/v3/kubernetes"
"github.com/buildkite/agent/v3/logger"
"github.com/buildkite/agent/v3/metrics"
"github.com/buildkite/agent/v3/process"
Expand Down Expand Up @@ -56,6 +58,11 @@ type JobRunnerConfig struct {
DebugHTTP bool
}

// jobRunner is the minimal behaviour required of something that runs a job:
// it can be run under a context, and it can be cancelled and stopped.
type jobRunner interface {
	// CancelAndStop cancels the job and stops the runner, reporting any
	// error encountered while doing so.
	CancelAndStop() error

	// Run runs the job using ctx and reports any error.
	Run(ctx context.Context) error
}

type JobRunner struct {
// The configuration for the job runner
conf JobRunnerConfig
Expand All @@ -76,7 +83,7 @@ type JobRunner struct {
metrics *metrics.Scope

// The internal process of the job
process *process.Process
process jobAPI

// The internal buffer of the process output
output *process.Buffer
Expand All @@ -100,8 +107,19 @@ type JobRunner struct {
envFile *os.File
}

// jobAPI describes the operations needed from whatever actually executes a
// job, so that different implementations can be used interchangeably.
type jobAPI interface {
	// Run executes the job using ctx and reports any error.
	Run(ctx context.Context) error

	// Started returns a channel used to signal that the job has started.
	// NOTE(review): presumably closed on start; confirm against implementations.
	Started() <-chan struct{}

	// Done returns a channel used to signal that the job has finished.
	// NOTE(review): presumably closed on completion; confirm against implementations.
	Done() <-chan struct{}

	// Interrupt asks the running job to stop, reporting any error.
	Interrupt() error

	// Terminate stops the running job, reporting any error.
	Terminate() error

	// WaitStatus returns the wait status of the job.
	WaitStatus() process.WaitStatus
}

// Compile-time assertion that *JobRunner satisfies the jobRunner interface.
var _ jobRunner = (*JobRunner)(nil)

// Initializes the job runner
func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterResponse, job *api.Job, apiClient APIClient, conf JobRunnerConfig) (*JobRunner, error) {
func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterResponse, job *api.Job, apiClient APIClient, conf JobRunnerConfig) (jobRunner, error) {
runner := &JobRunner{
agent: ag,
job: job,
Expand Down Expand Up @@ -238,16 +256,29 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe
processEnv := append(os.Environ(), env...)

// The process that will run the bootstrap script
runner.process = process.New(l, process.Config{
Path: cmd[0],
Args: cmd[1:],
Dir: conf.AgentConfiguration.BuildPath,
Env: processEnv,
PTY: conf.AgentConfiguration.RunInPty,
Stdout: processWriter,
Stderr: processWriter,
InterruptSignal: conf.CancelSignal,
})
if experiments.IsEnabled("kubernetes-exec") {
containerCount, err := strconv.Atoi(os.Getenv("BUILDKITE_CONTAINER_COUNT"))
if err != nil {
return nil, fmt.Errorf("failed to parse BUILDKITE_CONTAINER_COUNT: %w", err)
}
runner.process = kubernetes.New(l, kubernetes.Config{
AccessToken: apiClient.Config().Token,
Stdout: processWriter,
Stderr: processWriter,
ClientCount: containerCount,
})
} else {
runner.process = process.New(l, process.Config{
Path: cmd[0],
Args: cmd[1:],
Dir: conf.AgentConfiguration.BuildPath,
Env: processEnv,
PTY: conf.AgentConfiguration.RunInPty,
Stdout: processWriter,
Stderr: processWriter,
InterruptSignal: conf.CancelSignal,
})
}

// Close the writer end of the pipe when the process finishes
go func() {
Expand Down
51 changes: 51 additions & 0 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/buildkite/agent/v3/env"
"github.com/buildkite/agent/v3/experiments"
"github.com/buildkite/agent/v3/hook"
"github.com/buildkite/agent/v3/kubernetes"
"github.com/buildkite/agent/v3/process"
"github.com/buildkite/agent/v3/redaction"
"github.com/buildkite/agent/v3/tracetools"
Expand Down Expand Up @@ -78,6 +79,16 @@ func (b *Bootstrap) Run(ctx context.Context) (exitCode int) {
b.shell.Debug = b.Config.Debug
b.shell.InterruptSignal = b.Config.CancelSignal
}
if experiments.IsEnabled("kubernetes-exec") {
kubernetesClient := &kubernetes.Client{}
if err := b.startKubernetesClient(ctx, kubernetesClient); err != nil {
b.shell.Errorf("Failed to start kubernetes client: %v", err)
return 1
}
defer func() {
kubernetesClient.Exit(exitCode)
}()
}

var err error

Expand Down Expand Up @@ -1979,3 +1990,43 @@ type pluginCheckout struct {
CheckoutDir string
HooksDir string
}

// startKubernetesClient connects kubernetesClient to the kubernetes runner and
// wires the bootstrap's shell up to it.
//
// The container ID is read from BUILDKITE_CONTAINER_ID and parsed once, before
// any connection attempt: a missing or malformed value cannot be fixed by
// retrying, so it fails immediately rather than after the full backoff
// schedule. Only the connection itself is retried (up to 7 attempts with
// exponential backoff).
//
// After a successful connection:
//   - the access token from the connect response is exported as
//     BUILDKITE_AGENT_ACCESS_TOKEN in both the process environment and the
//     shell environment;
//   - the shell's writer and logger are replaced so output goes to both
//     os.Stdout and the kubernetes client.
//
// The function then blocks until the client reaches kubernetes.RunStateStart
// (or ctx ends), and finally starts a goroutine that waits for
// kubernetes.RunStateInterrupt and signals b.cancelCh.
//
// It returns an error if the container ID is invalid, if connecting fails
// after all retries, or if waiting for the start state fails.
func (b *Bootstrap) startKubernetesClient(ctx context.Context, kubernetesClient *kubernetes.Client) error {
	b.shell.Commentf("Using experimental Kubernetes support")

	// Parse the container ID up front. This is deterministic, so it must not
	// live inside the retry loop below.
	rawID := os.Getenv("BUILDKITE_CONTAINER_ID")
	id, err := strconv.Atoi(rawID)
	if err != nil {
		return fmt.Errorf("failed to parse container id %q: %w", rawID, err)
	}
	kubernetesClient.ID = id

	err = roko.NewRetrier(
		roko.WithMaxAttempts(7),
		roko.WithStrategy(roko.Exponential(2*time.Second, 0)),
	).Do(func(rtr *roko.Retrier) error {
		connect, err := kubernetesClient.Connect()
		if err != nil {
			return err
		}

		// Everything below runs only once, after the first successful
		// connection, because returning nil ends the retry loop.
		os.Setenv("BUILDKITE_AGENT_ACCESS_TOKEN", connect.AccessToken)
		b.shell.Env.Set("BUILDKITE_AGENT_ACCESS_TOKEN", connect.AccessToken)

		// Tee shell output to both local stdout and the kubernetes client.
		writer := io.MultiWriter(os.Stdout, kubernetesClient)
		b.shell.Writer = writer
		b.shell.Logger = &shell.WriterLogger{
			Writer: writer,
			Ansi:   true,
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("error connecting to kubernetes runner: %w", err)
	}

	if err := kubernetesClient.Await(ctx, kubernetes.RunStateStart); err != nil {
		return fmt.Errorf("error waiting for client to become ready: %w", err)
	}

	// Watch for an interrupt in the background. Note that cancellation is
	// signalled even when Await itself returns an error; the error is only
	// logged.
	go func() {
		if err := kubernetesClient.Await(ctx, kubernetes.RunStateInterrupt); err != nil {
			b.shell.Errorf("Error waiting for client interrupt: %v", err)
		}
		b.cancelCh <- struct{}{}
	}()
	return nil
}
13 changes: 13 additions & 0 deletions bootstrap/shell/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,19 @@ func (s *Shell) Terminate() {
}
}

// WaitStatus reports the process.WaitStatus of the shell's current command.
//
// The shell must have been started: if there is no command, or the command
// has no process, a "shell not started" error is returned instead. The
// command lock is held for the duration of the call.
func (s *Shell) WaitStatus() (process.WaitStatus, error) {
	s.cmdLock.Lock()
	defer s.cmdLock.Unlock()

	if current := s.cmd; current != nil && current.proc != nil {
		return current.proc.WaitStatus(), nil
	}
	return nil, errors.New("shell not started")
}

// LockFile is a pid-based lock for cross-process locking
type LockFile interface {
Unlock() error
Expand Down
Loading

0 comments on commit c7abbce

Please sign in to comment.