-
Notifications
You must be signed in to change notification settings - Fork 10
/
task_execution.go
136 lines (112 loc) · 3.65 KB
/
task_execution.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package taskrunner
import (
"context"
"sync"
"time"
)
type taskExecutionState int
const (
// taskExecutionState_invalid tasks are not running and are queued to be started
// on the next invalidation plan.
taskExecutionState_invalid = iota
// taskExecutionState_running tasks are currently executing. They may transition to
// error on failure or done on succesful completion. Tasks may also be canceled.
taskExecutionState_running
// taskExecutionState_error tasks were running and failed. They may transition to invalid.
taskExecutionState_error
// taskExecutionState_done tasks were running and completed without errors. They may transition to invalid.
taskExecutionState_done
// taskExecutionState_canceled tasks were running and were stopped by the
// user. Tasks cannot leave canceled state.
taskExecutionState_canceled
)
// taskExecution is a node in the Executor's DAG. It holds the state
// for a single task's executions and is reused across task executions.
type taskExecution struct {
mu sync.Mutex
definition *Task
ctx context.Context
cancel func()
state taskExecutionState
terminalCh chan struct{}
dependencies []*taskExecution
dependents []*taskExecution
pendingInvalidations map[InvalidationEvent]struct{}
}
func (e *taskExecution) simpleEvent() *simpleEvent {
return &simpleEvent{
taskHandler: NewTaskHandler(e),
timestamp: time.Now(),
}
}
// ShouldExecute returns true if the taskExecution is marked
// invalidated AND all of its dependencies have successfully completed.
func (e *taskExecution) ShouldExecute() bool {
if e.state != taskExecutionState_invalid {
return false
}
ready := true
for _, dep := range e.dependencies {
if dep.state != taskExecutionState_done {
ready = false
}
}
return ready
}
// invalidate stops the task if running, resets the execution
// state for the task, then invalidates all dependents.
func (e *taskExecution) invalidate(executionCtx context.Context) {
if e.state == taskExecutionState_invalid || e.state == taskExecutionState_canceled {
return
}
e.cancel()
e.state = taskExecutionState_invalid
e.ctx, e.cancel = context.WithCancel(executionCtx)
<-e.terminalCh
e.terminalCh = make(chan struct{}, 1)
e.pendingInvalidations = make(map[InvalidationEvent]struct{})
}
// Invalidate marks a taskExecution as invalid. It does not produce
// side effects by itself.
func (e *taskExecution) Invalidate(event InvalidationEvent) bool {
e.mu.Lock()
defer e.mu.Unlock()
if e.state == taskExecutionState_invalid {
return false
}
if e.definition.ShouldInvalidate != nil && !e.definition.ShouldInvalidate(event) {
return false
}
e.pendingInvalidations[event] = struct{}{}
for _, dep := range e.dependents {
dep.Invalidate(DependencyChange{
Source: e.definition,
})
}
return true
}
type taskSet map[*Task]*taskExecution
func (s taskSet) add(executionCtx context.Context, task *Task) (*taskExecution, []*taskExecution) {
if s[task] != nil {
return s[task], s[task].dependencies
}
ctx, cancel := context.WithCancel(executionCtx)
self := &taskExecution{
definition: task,
ctx: ctx,
cancel: cancel,
state: taskExecutionState_invalid,
terminalCh: make(chan struct{}, 1),
pendingInvalidations: make(map[InvalidationEvent]struct{}),
}
var dependencies []*taskExecution
for _, dep := range task.Dependencies {
depExec, depExecDeps := s.add(ctx, dep)
depExec.dependents = append(depExec.dependents, self)
dependencies = append(dependencies, depExecDeps...)
dependencies = append(dependencies, depExec)
}
s[task] = self
self.dependencies = dependencies
return self, dependencies
}