Fix restart batchjob infinite loop #1228

Merged
4 commits merged on Nov 22, 2024

4 changes: 2 additions & 2 deletions charts/radix-operator/Chart.yaml
@@ -1,7 +1,7 @@
apiVersion: v2
name: radix-operator
-version: 1.46.2
-appVersion: 1.66.2
+version: 1.46.3
+appVersion: 1.66.3
kubeVersion: ">=1.24.0"
description: Radix Operator
keywords:
63 changes: 15 additions & 48 deletions pkg/apis/batch/kubejob.go
@@ -28,28 +28,34 @@ const (
 )

 func (s *syncer) reconcileKubeJob(ctx context.Context, batchJob *radixv1.RadixBatchJob, rd *radixv1.RadixDeployment, jobComponent *radixv1.RadixDeployJobComponent, existingJobs []*batchv1.Job) error {
+    batchJobKubeJobs := slice.FindAll(existingJobs, isKubeJobForBatchJob(batchJob))
+
     if isBatchJobStopRequested(batchJob) {
         // Delete existing k8s job if stop is requested for batch job
-        batchJobKubeJobs := slice.FindAll(existingJobs, func(job *batchv1.Job) bool { return isResourceLabeledWithBatchJobName(batchJob.Name, job) })
         return s.deleteJobs(ctx, batchJobKubeJobs)
     }

-    jobNeedToBeRestarted, err := s.handleJobToRestart(ctx, batchJob, existingJobs)
-    if err != nil {
-        return err
-    }
-    if !jobNeedToBeRestarted && (isBatchJobDone(s.radixBatch, batchJob.Name) ||
-        slice.Any(existingJobs, func(job *batchv1.Job) bool { return isResourceLabeledWithBatchJobName(batchJob.Name, job) })) {
+    requiresRestart := s.jobRequiresRestart(*batchJob)
+    if !requiresRestart && (s.isBatchJobDone(batchJob.Name) || len(batchJobKubeJobs) > 0) {
         return nil
     }
-    err = s.validatePayloadSecretReference(ctx, batchJob, jobComponent)
-    if err != nil {
+
+    if requiresRestart {
+        s.restartedJobs[batchJob.Name] = *batchJob
+        if err := s.deleteJobs(ctx, batchJobKubeJobs); err != nil {
+            return err
+        }
+    }
+
+    if err := s.validatePayloadSecretReference(ctx, batchJob, jobComponent); err != nil {
         return err
     }
+
     job, err := s.buildJob(ctx, batchJob, jobComponent, rd)
     if err != nil {
         return err
     }
+
     return retry.RetryOnConflict(retry.DefaultRetry, func() error {
         _, err = s.kubeClient.BatchV1().Jobs(s.radixBatch.GetNamespace()).Create(ctx, job, metav1.CreateOptions{})
         return err
@@ -76,45 +82,6 @@ func (s *syncer) validatePayloadSecretReference(ctx context.Context, batchJob *r
     return nil
 }

-func (s *syncer) handleJobToRestart(ctx context.Context, batchJob *radixv1.RadixBatchJob, existingJobs []*batchv1.Job) (bool, error) {
-    jobStatusIdx := slice.FindIndex(s.radixBatch.Status.JobStatuses, func(jobStatus radixv1.RadixBatchJobStatus) bool {
-        return jobStatus.Name == batchJob.Name
-    })
-
-    jobRestartTimestamp, jobStatusRestartTimestamp := s.getJobRestartTimestamps(batchJob, jobStatusIdx)
-    if !needRestartJob(jobRestartTimestamp, jobStatusRestartTimestamp) {
-        return false, nil
-    }
-
-    jobsToDelete := slice.FindAll(existingJobs, func(job *batchv1.Job) bool { return isResourceLabeledWithBatchJobName(batchJob.Name, job) })
-    err := s.deleteJobs(ctx, jobsToDelete)
-    if err != nil {
-        return true, err
-    }
-
-    jobStatus := radixv1.RadixBatchJobStatus{
-        Name:    batchJob.Name,
-        Restart: jobRestartTimestamp,
-    }
-    if jobStatusIdx >= 0 {
-        s.radixBatch.Status.JobStatuses[jobStatusIdx] = jobStatus
-        return true, nil
-    }
-    s.radixBatch.Status.JobStatuses = append(s.radixBatch.Status.JobStatuses, jobStatus)
-    return true, nil
-}
-
-func needRestartJob(jobRestartTimestamp string, jobStatusRestartTimestamp string) bool {
-    return len(jobRestartTimestamp) > 0 && jobRestartTimestamp != jobStatusRestartTimestamp
-}
-
-func (s *syncer) getJobRestartTimestamps(batchJob *radixv1.RadixBatchJob, jobStatusIdx int) (string, string) {
-    if jobStatusIdx >= 0 {
-        return batchJob.Restart, s.radixBatch.Status.JobStatuses[jobStatusIdx].Restart
-    }
-    return batchJob.Restart, ""
-}
-
 func (s *syncer) deleteJobs(ctx context.Context, jobsToDelete []*batchv1.Job) error {
     for _, jobToDelete := range jobsToDelete {
         err := s.kubeClient.BatchV1().Jobs(jobToDelete.GetNamespace()).Delete(ctx, jobToDelete.GetName(), metav1.DeleteOptions{PropagationPolicy: pointers.Ptr(metav1.DeletePropagationBackground)})
2 changes: 1 addition & 1 deletion pkg/apis/batch/service.go
@@ -16,7 +16,7 @@ func (s *syncer) reconcileService(ctx context.Context, batchJob *radixv1.RadixBa
         return nil
     }

-    if isBatchJobStopRequested(batchJob) || isBatchJobDone(s.radixBatch, batchJob.Name) {
+    if isBatchJobStopRequested(batchJob) || s.isBatchJobDone(batchJob.Name) {
         return nil
     }

16 changes: 12 additions & 4 deletions pkg/apis/batch/status.go
@@ -121,20 +121,28 @@ func (s *syncer) buildJobStatuses(ctx context.Context) ([]radixv1.RadixBatchJobS
 }

 func (s *syncer) buildBatchJobStatus(ctx context.Context, batchJob *radixv1.RadixBatchJob, allJobs []*batchv1.Job) radixv1.RadixBatchJobStatus {
+    restartedJob, isRestartedJob := s.restartedJobs[batchJob.Name]
+
     currentStatus, hasCurrentStatus := slice.FindFirst(s.radixBatch.Status.JobStatuses, func(jobStatus radixv1.RadixBatchJobStatus) bool {
         return jobStatus.Name == batchJob.Name
     })
-    if hasCurrentStatus && isJobStatusDone(currentStatus) {
+
+    if !isRestartedJob && hasCurrentStatus && isJobStatusDone(currentStatus) {
         return currentStatus
     }

     status := radixv1.RadixBatchJobStatus{
         Name:  batchJob.Name,
         Phase: radixv1.BatchJobPhaseWaiting,
     }
-    if hasCurrentStatus {
-        status.Restart = currentStatus.Restart
+
+    if isRestartedJob {
+        status.Restart = restartedJob.Restart
+    }
+
+    if hasCurrentStatus && !isRestartedJob {
         status.Phase = currentStatus.Phase
+        status.Restart = currentStatus.Restart
     }

     if isBatchJobStopRequested(batchJob) {
@@ -151,7 +159,7 @@ func (s *syncer) buildBatchJobStatus(ctx context.Context, batchJob *radixv1.Radi
         return status
     }

-    job, jobFound := slice.FindFirst(allJobs, func(job *batchv1.Job) bool { return isResourceLabeledWithBatchJobName(batchJob.Name, job) })
+    job, jobFound := slice.FindFirst(allJobs, isKubeJobForBatchJob(batchJob))
     if !jobFound {
         return status
     }
60 changes: 46 additions & 14 deletions pkg/apis/batch/syncer.go
@@ -4,6 +4,7 @@ import (
     "context"
     "fmt"

+    "github.com/equinor/radix-common/utils/slice"
     "github.com/equinor/radix-operator/pkg/apis/config"
     "github.com/equinor/radix-operator/pkg/apis/kube"
     radixv1 "github.com/equinor/radix-operator/pkg/apis/radix/v1"
@@ -15,6 +16,10 @@ import (
     "k8s.io/client-go/kubernetes"
 )

+const (
+    syncStatusForEveryNumberOfBatchJobsReconciled = 10
+)
+
 // Syncer of RadixBatch
 type Syncer interface {
     // OnSync Syncs RadixBatch
@@ -24,20 +29,22 @@ type Syncer interface {
 // NewSyncer Constructor os RadixBatches Syncer
 func NewSyncer(kubeclient kubernetes.Interface, kubeUtil *kube.Kube, radixClient radixclient.Interface, radixBatch *radixv1.RadixBatch, config *config.Config) Syncer {
     return &syncer{
-        kubeClient:  kubeclient,
-        kubeUtil:    kubeUtil,
-        radixClient: radixClient,
-        radixBatch:  radixBatch,
-        config:      config,
+        kubeClient:    kubeclient,
+        kubeUtil:      kubeUtil,
+        radixClient:   radixClient,
+        radixBatch:    radixBatch,
+        config:        config,
+        restartedJobs: map[string]radixv1.RadixBatchJob{},
     }
 }

 type syncer struct {
-    kubeClient  kubernetes.Interface
-    kubeUtil    *kube.Kube
-    radixClient radixclient.Interface
-    radixBatch  *radixv1.RadixBatch
-    config      *config.Config
+    kubeClient    kubernetes.Interface
+    kubeUtil      *kube.Kube
+    radixClient   radixclient.Interface
+    radixBatch    *radixv1.RadixBatch
+    config        *config.Config
+    restartedJobs map[string]radixv1.RadixBatchJob
 }

 // OnSync Syncs RadixBatches
@@ -49,16 +56,14 @@ func (s *syncer) OnSync(ctx context.Context) error {
         return err
     }

-    if isBatchDone(s.radixBatch) {
+    if s.isBatchDone() && !s.isRestartRequestedForAnyBatchJob() {
         return nil
     }

     return s.syncStatus(ctx, s.reconcile(ctx))
 }

 func (s *syncer) reconcile(ctx context.Context) error {
-    const syncStatusForEveryNumberOfBatchJobsReconciled = 10
-
     rd, jobComponent, err := s.getRadixDeploymentAndJobComponent(ctx)
     if err != nil {
         return err
@@ -83,7 +88,7 @@ func (s *syncer) reconcile(ctx context.Context) error {
             return fmt.Errorf("batchjob %s: failed to reconcile kubejob: %w", batchJob.Name, err)
         }

-        if i%syncStatusForEveryNumberOfBatchJobsReconciled == 0 {
+        if i%syncStatusForEveryNumberOfBatchJobsReconciled == (syncStatusForEveryNumberOfBatchJobsReconciled - 1) {
Contributor commented:

Why are we only syncing after every 10 jobs, and not on every iteration? And what happens if a batch has fewer than 10 jobs, e.g. 5 — when do they get synced?

Contributor Author replied:

The batch controller always performs a status update as the final step in OnSync:

return s.syncStatus(ctx, s.reconcile(ctx))

The reason we also update status for every 10 jobs is to give feedback about sync progress when a batch contains many jobs. If a batch has 100-200 jobs, it can easily take more than a minute to iterate over and process them all. Instead of only updating the status at the end, we update it every 10 jobs so the user gets earlier feedback that things are in progress.

             if err := s.syncStatus(ctx, nil); err != nil {
                 return fmt.Errorf("batchjob %s: failed to sync status: %w", batchJob.Name, err)
             }
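
To illustrate the cadence discussed in the review thread above, here is a minimal, self-contained sketch (a toy loop only; the real syncer calls s.syncStatus against the cluster, which this does not model). With `i % N == N - 1` the intermediate sync fires after the 10th, 20th, ... job, whereas the previous `i % N == 0` fired on the very first iteration, and a batch with fewer than 10 jobs is still covered by the final syncStatus call in OnSync.

```go
package main

import "fmt"

const syncStatusForEveryNumberOfBatchJobsReconciled = 10

func reconcileBatch(jobCount int) {
	for i := 0; i < jobCount; i++ {
		// ... reconcile job i here ...

		// Intermediate sync after every 10th job; the old `== 0` condition
		// instead fired on the very first iteration, before any real progress.
		if i%syncStatusForEveryNumberOfBatchJobsReconciled == (syncStatusForEveryNumberOfBatchJobsReconciled - 1) {
			fmt.Printf("intermediate status sync after job %d\n", i+1)
		}
	}
	// OnSync always ends with s.syncStatus(ctx, s.reconcile(ctx)), so even a
	// small batch gets its status written here.
	fmt.Println("final status sync at end of OnSync")
}

func main() {
	reconcileBatch(5)  // no intermediate sync, only the final one
	reconcileBatch(25) // intermediate syncs after jobs 10 and 20, then the final one
}
```
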
@@ -129,3 +134,30 @@ func (s *syncer) batchJobIdentifierLabel(batchJobName, appName string) labels.Se
         radixlabels.ForBatchJobName(batchJobName),
     )
 }
+
+func (s *syncer) jobRequiresRestart(job radixv1.RadixBatchJob) bool {
+    if job.Restart == "" {
+        return false
+    }
+
+    currentStatus, found := slice.FindFirst(s.radixBatch.Status.JobStatuses, func(jobStatus radixv1.RadixBatchJobStatus) bool {
+        return jobStatus.Name == job.Name
+    })
+
+    return !found || job.Restart != currentStatus.Restart
+}
+
+func (s *syncer) isBatchDone() bool {
+    return s.radixBatch.Status.Condition.Type == radixv1.BatchConditionTypeCompleted
+}
+
+func (s *syncer) isBatchJobDone(batchJobName string) bool {
+    return slice.Any(s.radixBatch.Status.JobStatuses,
+        func(jobStatus radixv1.RadixBatchJobStatus) bool {
+            return jobStatus.Name == batchJobName && isJobStatusDone(jobStatus)
+        })
+}
+
+func (s *syncer) isRestartRequestedForAnyBatchJob() bool {
+    return slice.Any(s.radixBatch.Spec.Jobs, s.jobRequiresRestart)
+}
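
The new helpers above are what break the restart loop: a job is treated as needing a restart only while its Restart timestamp differs from the one recorded in the batch status, and buildBatchJobStatus copies the timestamp from the restartedJobs map into the status so the next sync sees the two values as equal. A minimal sketch of that comparison, using simplified types rather than the operator's real API:

```go
package main

import "fmt"

// Simplified stand-ins for radixv1.RadixBatchJob and radixv1.RadixBatchJobStatus.
type batchJob struct {
	Name    string
	Restart string
}

type jobStatus struct {
	Name    string
	Restart string
}

// jobRequiresRestart mirrors the check added in syncer.go: restart only when a
// Restart timestamp is set and it has not yet been recorded in the status.
func jobRequiresRestart(job batchJob, statuses []jobStatus) bool {
	if job.Restart == "" {
		return false
	}
	for _, st := range statuses {
		if st.Name == job.Name {
			return job.Restart != st.Restart
		}
	}
	// No status recorded yet for this job.
	return true
}

func main() {
	job := batchJob{Name: "compute", Restart: "2024-11-20T10:00:00Z"}

	var statuses []jobStatus
	fmt.Println(jobRequiresRestart(job, statuses)) // true: restart requested, not yet recorded

	// Once the syncer records the restart timestamp in the job status, the
	// next sync sees matching timestamps and does not restart again.
	statuses = append(statuses, jobStatus{Name: "compute", Restart: job.Restart})
	fmt.Println(jobRequiresRestart(job, statuses)) // false
}
```
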