Skip to content

Commit

Permalink
Fix restart batchjob infinite loop (#1228)
Browse files Browse the repository at this point in the history
  • Loading branch information
nilsgstrabo authored Nov 22, 2024
1 parent 4b44992 commit 8e16e50
Show file tree
Hide file tree
Showing 7 changed files with 403 additions and 95 deletions.
4 changes: 2 additions & 2 deletions charts/radix-operator/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: v2
name: radix-operator
version: 1.46.2
appVersion: 1.66.2
version: 1.46.3
appVersion: 1.66.3
kubeVersion: ">=1.24.0"
description: Radix Operator
keywords:
Expand Down
63 changes: 15 additions & 48 deletions pkg/apis/batch/kubejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,34 @@ const (
)

func (s *syncer) reconcileKubeJob(ctx context.Context, batchJob *radixv1.RadixBatchJob, rd *radixv1.RadixDeployment, jobComponent *radixv1.RadixDeployJobComponent, existingJobs []*batchv1.Job) error {
batchJobKubeJobs := slice.FindAll(existingJobs, isKubeJobForBatchJob(batchJob))

if isBatchJobStopRequested(batchJob) {
// Delete existing k8s job if stop is requested for batch job
batchJobKubeJobs := slice.FindAll(existingJobs, func(job *batchv1.Job) bool { return isResourceLabeledWithBatchJobName(batchJob.Name, job) })
return s.deleteJobs(ctx, batchJobKubeJobs)
}

jobNeedToBeRestarted, err := s.handleJobToRestart(ctx, batchJob, existingJobs)
if err != nil {
return err
}
if !jobNeedToBeRestarted && (isBatchJobDone(s.radixBatch, batchJob.Name) ||
slice.Any(existingJobs, func(job *batchv1.Job) bool { return isResourceLabeledWithBatchJobName(batchJob.Name, job) })) {
requiresRestart := s.jobRequiresRestart(*batchJob)
if !requiresRestart && (s.isBatchJobDone(batchJob.Name) || len(batchJobKubeJobs) > 0) {
return nil
}
err = s.validatePayloadSecretReference(ctx, batchJob, jobComponent)
if err != nil {

if requiresRestart {
s.restartedJobs[batchJob.Name] = *batchJob
if err := s.deleteJobs(ctx, batchJobKubeJobs); err != nil {
return err
}
}

if err := s.validatePayloadSecretReference(ctx, batchJob, jobComponent); err != nil {
return err
}

job, err := s.buildJob(ctx, batchJob, jobComponent, rd)
if err != nil {
return err
}

return retry.RetryOnConflict(retry.DefaultRetry, func() error {
_, err = s.kubeClient.BatchV1().Jobs(s.radixBatch.GetNamespace()).Create(ctx, job, metav1.CreateOptions{})
return err
Expand All @@ -76,45 +82,6 @@ func (s *syncer) validatePayloadSecretReference(ctx context.Context, batchJob *r
return nil
}

// handleJobToRestart checks whether the restart timestamp requested on batchJob
// differs from the one recorded in the batch status. When it does, the existing
// Kubernetes jobs labeled with the batch job name are deleted and the job's
// status entry is replaced (or appended) with one carrying only the name and
// the requested restart timestamp.
//
// The returned bool is true whenever a restart was needed, including the case
// where deleting the existing jobs failed and an error is returned.
func (s *syncer) handleJobToRestart(ctx context.Context, batchJob *radixv1.RadixBatchJob, existingJobs []*batchv1.Job) (bool, error) {
	matchesBatchJobName := func(status radixv1.RadixBatchJobStatus) bool {
		return status.Name == batchJob.Name
	}
	statusIdx := slice.FindIndex(s.radixBatch.Status.JobStatuses, matchesBatchJobName)

	requestedRestart, recordedRestart := s.getJobRestartTimestamps(batchJob, statusIdx)
	if !needRestartJob(requestedRestart, recordedRestart) {
		return false, nil
	}

	// Remove the previous Kubernetes jobs before touching the status.
	staleJobs := slice.FindAll(existingJobs, func(job *batchv1.Job) bool {
		return isResourceLabeledWithBatchJobName(batchJob.Name, job)
	})
	if err := s.deleteJobs(ctx, staleJobs); err != nil {
		return true, err
	}

	resetStatus := radixv1.RadixBatchJobStatus{
		Name:    batchJob.Name,
		Restart: requestedRestart,
	}
	if statusIdx < 0 {
		s.radixBatch.Status.JobStatuses = append(s.radixBatch.Status.JobStatuses, resetStatus)
		return true, nil
	}
	s.radixBatch.Status.JobStatuses[statusIdx] = resetStatus
	return true, nil
}

// needRestartJob reports whether a restart is requested and not yet reflected
// in the status: the requested timestamp must be non-empty and must differ
// from the timestamp recorded in the job status.
func needRestartJob(jobRestartTimestamp string, jobStatusRestartTimestamp string) bool {
	if jobRestartTimestamp == "" {
		return false
	}
	return jobStatusRestartTimestamp != jobRestartTimestamp
}

// getJobRestartTimestamps returns the restart timestamp set on batchJob and the
// restart timestamp stored in the batch status entry at jobStatusIdx. A negative
// jobStatusIdx means there is no status entry, in which case the second value
// is the empty string.
func (s *syncer) getJobRestartTimestamps(batchJob *radixv1.RadixBatchJob, jobStatusIdx int) (string, string) {
	requested := batchJob.Restart
	recorded := ""
	if jobStatusIdx >= 0 {
		recorded = s.radixBatch.Status.JobStatuses[jobStatusIdx].Restart
	}
	return requested, recorded
}

func (s *syncer) deleteJobs(ctx context.Context, jobsToDelete []*batchv1.Job) error {
for _, jobToDelete := range jobsToDelete {
err := s.kubeClient.BatchV1().Jobs(jobToDelete.GetNamespace()).Delete(ctx, jobToDelete.GetName(), metav1.DeleteOptions{PropagationPolicy: pointers.Ptr(metav1.DeletePropagationBackground)})
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/batch/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (s *syncer) reconcileService(ctx context.Context, batchJob *radixv1.RadixBa
return nil
}

if isBatchJobStopRequested(batchJob) || isBatchJobDone(s.radixBatch, batchJob.Name) {
if isBatchJobStopRequested(batchJob) || s.isBatchJobDone(batchJob.Name) {
return nil
}

Expand Down
16 changes: 12 additions & 4 deletions pkg/apis/batch/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,28 @@ func (s *syncer) buildJobStatuses(ctx context.Context) ([]radixv1.RadixBatchJobS
}

func (s *syncer) buildBatchJobStatus(ctx context.Context, batchJob *radixv1.RadixBatchJob, allJobs []*batchv1.Job) radixv1.RadixBatchJobStatus {
restartedJob, isRestartedJob := s.restartedJobs[batchJob.Name]

currentStatus, hasCurrentStatus := slice.FindFirst(s.radixBatch.Status.JobStatuses, func(jobStatus radixv1.RadixBatchJobStatus) bool {
return jobStatus.Name == batchJob.Name
})
if hasCurrentStatus && isJobStatusDone(currentStatus) {

if !isRestartedJob && hasCurrentStatus && isJobStatusDone(currentStatus) {
return currentStatus
}

status := radixv1.RadixBatchJobStatus{
Name: batchJob.Name,
Phase: radixv1.BatchJobPhaseWaiting,
}
if hasCurrentStatus {
status.Restart = currentStatus.Restart

if isRestartedJob {
status.Restart = restartedJob.Restart
}

if hasCurrentStatus && !isRestartedJob {
status.Phase = currentStatus.Phase
status.Restart = currentStatus.Restart
}

if isBatchJobStopRequested(batchJob) {
Expand All @@ -151,7 +159,7 @@ func (s *syncer) buildBatchJobStatus(ctx context.Context, batchJob *radixv1.Radi
return status
}

job, jobFound := slice.FindFirst(allJobs, func(job *batchv1.Job) bool { return isResourceLabeledWithBatchJobName(batchJob.Name, job) })
job, jobFound := slice.FindFirst(allJobs, isKubeJobForBatchJob(batchJob))
if !jobFound {
return status
}
Expand Down
60 changes: 46 additions & 14 deletions pkg/apis/batch/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/equinor/radix-common/utils/slice"
"github.com/equinor/radix-operator/pkg/apis/config"
"github.com/equinor/radix-operator/pkg/apis/kube"
radixv1 "github.com/equinor/radix-operator/pkg/apis/radix/v1"
Expand All @@ -15,6 +16,10 @@ import (
"k8s.io/client-go/kubernetes"
)

const (
// syncStatusForEveryNumberOfBatchJobsReconciled is the modulus reconcile applies to the
// batch job loop index to decide when to perform an intermediate status sync.
syncStatusForEveryNumberOfBatchJobsReconciled = 10
)

// Syncer of RadixBatch
type Syncer interface {
// OnSync Syncs RadixBatch
Expand All @@ -24,20 +29,22 @@ type Syncer interface {
// NewSyncer Constructor of RadixBatches Syncer.
// The returned syncer holds the supplied clients, RadixBatch and config, and
// starts with an empty restartedJobs map.
func NewSyncer(kubeclient kubernetes.Interface, kubeUtil *kube.Kube, radixClient radixclient.Interface, radixBatch *radixv1.RadixBatch, config *config.Config) Syncer {
return &syncer{
kubeClient: kubeclient,
kubeUtil: kubeUtil,
radixClient: radixClient,
radixBatch: radixBatch,
config: config,
kubeClient: kubeclient,
kubeUtil: kubeUtil,
radixClient: radixClient,
radixBatch: radixBatch,
config: config,
restartedJobs: map[string]radixv1.RadixBatchJob{},
}
}

// syncer is the Syncer implementation returned by NewSyncer; it carries the
// clients, the configuration and the RadixBatch being synced.
type syncer struct {
kubeClient kubernetes.Interface
kubeUtil *kube.Kube
radixClient radixclient.Interface
radixBatch *radixv1.RadixBatch
config *config.Config
kubeClient kubernetes.Interface
kubeUtil *kube.Kube
radixClient radixclient.Interface
radixBatch *radixv1.RadixBatch
config *config.Config
// restartedJobs is keyed by batch job name. reconcileKubeJob stores a batch job here
// when it requires a restart, and buildBatchJobStatus looks it up to take the
// Restart value for the rebuilt status.
restartedJobs map[string]radixv1.RadixBatchJob
}

// OnSync Syncs RadixBatches
Expand All @@ -49,16 +56,14 @@ func (s *syncer) OnSync(ctx context.Context) error {
return err
}

if isBatchDone(s.radixBatch) {
if s.isBatchDone() && !s.isRestartRequestedForAnyBatchJob() {
return nil
}

return s.syncStatus(ctx, s.reconcile(ctx))
}

func (s *syncer) reconcile(ctx context.Context) error {
const syncStatusForEveryNumberOfBatchJobsReconciled = 10

rd, jobComponent, err := s.getRadixDeploymentAndJobComponent(ctx)
if err != nil {
return err
Expand All @@ -83,7 +88,7 @@ func (s *syncer) reconcile(ctx context.Context) error {
return fmt.Errorf("batchjob %s: failed to reconcile kubejob: %w", batchJob.Name, err)
}

if i%syncStatusForEveryNumberOfBatchJobsReconciled == 0 {
if i%syncStatusForEveryNumberOfBatchJobsReconciled == (syncStatusForEveryNumberOfBatchJobsReconciled - 1) {
if err := s.syncStatus(ctx, nil); err != nil {
return fmt.Errorf("batchjob %s: failed to sync status: %w", batchJob.Name, err)
}
Expand Down Expand Up @@ -129,3 +134,30 @@ func (s *syncer) batchJobIdentifierLabel(batchJobName, appName string) labels.Se
radixlabels.ForBatchJobName(batchJobName),
)
}

// jobRequiresRestart reports whether the given batch job has a restart request
// that is not yet reflected in the batch status. A job with an empty Restart
// value never requires a restart; otherwise a restart is required when the job
// has no status entry or the entry's Restart value differs from the job's.
func (s *syncer) jobRequiresRestart(job radixv1.RadixBatchJob) bool {
	if len(job.Restart) == 0 {
		return false
	}

	// Only the first status entry with a matching name is considered.
	for _, jobStatus := range s.radixBatch.Status.JobStatuses {
		if jobStatus.Name == job.Name {
			return jobStatus.Restart != job.Restart
		}
	}

	return true
}

// isBatchDone reports whether the batch status condition type is
// BatchConditionTypeCompleted.
func (s *syncer) isBatchDone() bool {
	conditionType := s.radixBatch.Status.Condition.Type
	return conditionType == radixv1.BatchConditionTypeCompleted
}

// isBatchJobDone reports whether the batch status contains an entry for
// batchJobName for which isJobStatusDone returns true.
func (s *syncer) isBatchJobDone(batchJobName string) bool {
	for _, jobStatus := range s.radixBatch.Status.JobStatuses {
		if jobStatus.Name != batchJobName {
			continue
		}
		if isJobStatusDone(jobStatus) {
			return true
		}
	}
	return false
}

// isRestartRequestedForAnyBatchJob reports whether at least one job in the
// batch spec satisfies jobRequiresRestart.
func (s *syncer) isRestartRequestedForAnyBatchJob() bool {
	specJobs := s.radixBatch.Spec.Jobs
	return slice.Any(specJobs, s.jobRequiresRestart)
}
Loading

0 comments on commit 8e16e50

Please sign in to comment.