-
Notifications
You must be signed in to change notification settings - Fork 0
/
schedule.go
66 lines (59 loc) · 1.5 KB
/
schedule.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package mapreduce
import "fmt"
// schedule starts and waits for all tasks in the given phase (Map or Reduce).
func (mr *Master) schedule(phase jobPhase) {
var ntasks int
var nios int // number of inputs (for reduce) or outputs (for map)
//var taskArr []int
switch phase {
case mapPhase:
ntasks = len(mr.files)
nios = mr.nReduce
case reducePhase:
ntasks = mr.nReduce
nios = len(mr.files)
}
task := make(chan int)
go func() {
for index := 0; index < ntasks; index++ {
task <- index
}
}()
fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios)
for id := range task {
workerAddr := <-mr.registerChannel
var args DoTaskArgs
args.File = mr.files[id]
args.JobName = mr.jobName
args.NumOtherPhase = nios
args.Phase = phase
args.TaskNumber = id
go func() {
suc := call(workerAddr, "Worker.DoTask", &args, new(struct{}))
if suc {
if args.TaskNumber == (ntasks - 1) {
close(task)
}
mr.registerChannel <- workerAddr
} else {
task <- args.TaskNumber
}
}()
}
// drain the registerChannel
/**
if phase == reducePhase {
for range mr.workers {
<-mr.registerChannel
}
}
*/
// All ntasks tasks have to be scheduled on workers, and only once all of
// them have been completed successfully should the function return.
// Remember that workers may fail, and that any given worker may finish
// multiple tasks.
//
// TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO
//
fmt.Printf("Schedule: %v phase done\n", phase)
}