This repository has been archived by the owner on Sep 13, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
mapreduceda-gui-mo-ji-qun-shang-de-jian-dan-shu-ju-chu-li-fang-shi-mit-6824-lab-1.html
387 lines (341 loc) · 39.4 KB
/
mapreduceda-gui-mo-ji-qun-shang-de-jian-dan-shu-ju-chu-li-fang-shi-mit-6824-lab-1.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>MapReduce:大规模集群上的简单数据处理方式 &amp;&amp; MIT 6.824 Lab 1</title>
<link href="/feeds/all.atom.xml" type="application/atom+xml" rel="alternate" title="华科美团点评技术俱乐部 Full Atom Feed" />
<link href="/feeds/fen-bu-shi.atom.xml" type="application/atom+xml" rel="alternate" title="华科美团点评技术俱乐部 Categories Atom Feed" />
<!-- Bootstrap Core CSS -->
<link href="/theme/css/bootstrap.min.css" rel="stylesheet">
<!-- Custom CSS -->
<link href="/theme/css/clean-blog.min.css" rel="stylesheet">
<!-- Code highlight color scheme -->
<link href="/theme/css/code_blocks/darkly.css" rel="stylesheet">
<!-- Custom Fonts -->
<link href="https://maxcdn.bootstrapcdn.com/font-awesome/4.5.0/css/font-awesome.min.css" rel="stylesheet" type="text/css">
<link href='https://fonts.googleapis.com/css?family=Lora:400,700,400italic,700italic' rel='stylesheet' type='text/css'>
<link href='https://fonts.googleapis.com/css?family=Open+Sans:300italic,400italic,600italic,700italic,800italic,400,300,600,700,800' rel='stylesheet' type='text/css'>
<!-- HTML5 Shim and Respond.js IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/libs/html5shiv/3.7.0/html5shiv.js"></script>
<script src="https://oss.maxcdn.com/libs/respond.js/1.4.2/respond.min.js"></script>
<![endif]-->
<meta name="description" content="MapReduce是Google公司的Jeffrey和Sanjay提出的一个编程模型,主要用于大规模数据集的并行运算。它吸收了函数式编程语言中的Map和Reduce,通过Map处理原始键值对生成中间键值对,通过Reduce合并相同中间键对应的值。这一特性符合很多现实生活中的...">
<meta name="author" content="Lichen Zhang">
<meta name="tags" content="分布式">
<meta name="tags" content="lab">
<meta property="og:locale" content="zh_CN">
<meta property="og:site_name" content="华科美团点评技术俱乐部">
<meta property="og:type" content="article">
<meta property="article:author" content="/author/lichen-zhang.html">
<meta property="og:url" content="/mapreduceda-gui-mo-ji-qun-shang-de-jian-dan-shu-ju-chu-li-fang-shi-mit-6824-lab-1.html">
<meta property="og:title" content="MapReduce:大规模集群上的简单数据处理方式 &amp;&amp; MIT 6.824 Lab 1">
<meta property="article:published_time" content="2017-06-12 11:36:28+08:00">
<meta property="og:description" content="MapReduce是Google公司的Jeffrey和Sanjay提出的一个编程模型,主要用于大规模数据集的并行运算。它吸收了函数式编程语言中的Map和Reduce,通过Map处理原始键值对生成中间键值对,通过Reduce合并相同中间键对应的值。这一特性符合很多现实生活中的...">
<meta property="og:image" content="/images/bg.jpg">
</head>
<body>
<!-- Navigation -->
<nav class="navbar navbar-default navbar-custom navbar-fixed-top">
<div class="container-fluid">
<!-- Brand and toggle get grouped for better mobile display -->
<div class="navbar-header page-scroll">
<button type="button" class="navbar-toggle" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<a class="navbar-brand" href="/">华科美团点评技术俱乐部</a>
</div>
<!-- Collect the nav links, forms, and other content for toggling -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav navbar-right">
<li><a href="/categories.html">分类</a></li>
<li><a href="/archives.html">归档</a></li>
<li><a href="/authors.html">作者</a></li>
<li><a href="/tags.html">标签</a></li>
<li><a href="/pages/about/index.html">关于</a></li>
<li><a href="/pages/friendlinks/index.html">友链</a></li>
</ul>
</div>
<!-- /.navbar-collapse -->
</div>
<!-- /.container -->
</nav>
<!-- Page Header -->
<header class="intro-header" style="background-image: url('/images/bg.jpg')">
<div class="container">
<div class="row">
<div class="col-lg-8 col-lg-offset-2 col-md-10 col-md-offset-1">
<div class="post-heading">
<h1>MapReduce:大规模集群上的简单数据处理方式 &amp;&amp; MIT 6.824 Lab 1</h1>
<span class="meta">Posted by
<a href="/author/lichen-zhang.html">Lichen Zhang</a>
on 2017年 6月12日 周一
</span>
</div>
</div>
</div>
</div>
</header>
<!-- Main Content -->
<div class="container">
<div class="row">
<div class="col-lg-8 col-lg-offset-2 col-md-10 col-md-offset-1">
<!-- Post Content -->
<article>
<p>MapReduce是Google公司的Jeffrey和Sanjay提出的一个编程模型,主要用于大规模数据集的并行运算。它吸收了函数式编程语言中的Map和Reduce,通过Map处理原始键值对生成中间键值对,通过Reduce合并相同中间键对应的值。这一特性符合很多现实生活中的场景。</p>
<p>这种编程模型下的程序可以并行化地在大规模集群上运行,而在这一过程中用户却不需考虑输入数据的划分、程序的运行安排、节点故障和机器间通信,只需完成对数据的处理和合并。</p>
<p>我阅读了MapReduce的论文,并完成了MIT 6.824的第一个Lab,对其有了更深的了解。</p>
<!--more-->
<h2>过程总览</h2>
<p><img alt="MapReduce执行过程总览图" src="/images/Map-Reduce/overview.png"></p>
<ol>
<li>用户程序调用MapReduce库,先把输入文件划分为M份(M可由用户指定),每一份通常为16MB到64MB,如图左方所示分成了split0~4,然后将用户程序fork到集群内其它机器上。 </li>
<li>用户程序副本中有一个称为master,其余称为worker,master负责调度,为空闲worker分配任务(Map任务或者Reduce任务)。 </li>
<li>Map worker开始读取对应的输入数据,它们从输入数据中抽取出Key-Value Pair,每一个都作为参数传递给Map函数,Map函数产生的中间Key-Value Pair被缓存在内存中。 </li>
<li>缓存在内存中的中间Key-Value Pair会被定期写入本地磁盘,而且被分为R个区(R可由用户指定),这些中间Key-Value Pair的位置会被通报给master,master负责将信息通报给Reduce worker。 </li>
<li>master通报Reduce worker它负责的分区的位置,当Reduce worker把所有它负责的中间数据都读过来后,先对它们进行排序,使得相同Key的数据聚集在一起。若内部排序无法满足要求,则使用外部排序。</li>
<li>Reduce worker遍历排序后的中间Key-Value Pair,对于每个唯一的Key,都将Key与关联的Value传递给Reduce函数,Reduce函数产生的输出会添加到对应分区的输出文件中。</li>
<li>当所有的Map和Reduce作业都完成了,master唤醒用户程序,MapReduce函数返回。</li>
<li>MapReduce共产生R个输出文件(对应R个Reduce任务),这些文件可存于分布式存储系统中,供另一个MapReduce任务或其他分布式应用使用。</li>
</ol>
<h2>容错机制</h2>
<h3>Worker错误</h3>
<p>当master定期发送的ping在某一时间段内没有得到某worker的响应时,将该worker置为失效:</p>
<ol>
<li>若该worker任务为Map,则将该worker的所有任务置为空闲,并在分配任务时将其安排给其他的worker;当一个接替其任务的Map worker完成时,向所有Reduce worker发送通知,任何还未从失效worker读取数据的Reduce worker将从新worker读取数据;</li>
<li>若该worker任务为Reduce,则将该worker的正在运行的任务置为空闲,并在分配任务时将其安排给其他的worker;已完成的任务不做调整。</li>
</ol>
<h3>Master错误</h3>
<p>由于只有一个master,因此在发生错误时会返回主程序,由客户端确认状态。</p>
<h3>容错性保证</h3>
<p>当用户提供的Map和Reduce操作对输入为确定函数时,分布式实现的输出与顺序输出保持一致,这种一致性是通过对Map和Reduce操作的输出进行原子提交来保证的,即依赖于文件系统的原子性操作。</p>
<p>当用户提供的Map和Reduce操作对输入为不确定函数时,MapReduce系统也提供了很弱的处理机制(不再赘述)。</p>
<h2>优化技巧</h2>
<h3>存储位置</h3>
<p>利用GFS,对数据产生多个备份,在调用任务时,尽量从本地读取数据,避免网络调用占用带宽,降低速度。</p>
<h3>片段分配</h3>
<p>为了保证速度和准确性,需要有效分配片段,通常限制M,使每一份为16MB到64MB,而R为worker机器数量的小倍数。</p>
<h3>备用任务</h3>
<p>当一个MapReduce将要完成的时候,master启用备用进程,来执行还在执行的任务,以减少落后worker造成的影响。</p>
<h2>Lab 解析</h2>
<p>本Lab要求你补全一个基本完成的MapReduce程序,共有5个Part,其中Part I、II为Sequential MapReduce,Part III、IV为Distributed MapReduce,Part V为附加任务。程序整体难度不大,主要的难点在于熟悉Go语言。</p>
<h3>Part I: Map/Reduce input and output</h3>
<p>要求实现两个模板化的函数<code>doMap</code>和<code>doReduce</code>,按照注释所给步骤以及论文相关步骤编写即可。</p>
<div class="highlight"><pre><span></span><span class="kd">func</span> <span class="nx">doMap</span><span class="p">(</span>
<span class="nx">jobName</span> <span class="kt">string</span><span class="p">,</span>
<span class="nx">mapTaskNumber</span> <span class="kt">int</span><span class="p">,</span>
<span class="nx">inFile</span> <span class="kt">string</span><span class="p">,</span>
<span class="nx">nReduce</span> <span class="kt">int</span><span class="p">,</span>
<span class="nx">mapF</span> <span class="kd">func</span><span class="p">(</span><span class="nx">file</span> <span class="kt">string</span><span class="p">,</span> <span class="nx">contents</span> <span class="kt">string</span><span class="p">)</span> <span class="p">[]</span><span class="nx">KeyValue</span><span class="p">,</span>
<span class="p">)</span> <span class="p">{</span>
<span class="nx">inContent</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">ioutil</span><span class="p">.</span><span class="nx">ReadFile</span><span class="p">(</span><span class="nx">inFile</span><span class="p">)</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="nx">fmt</span><span class="p">.</span><span class="nx">Fprintln</span><span class="p">(</span><span class="nx">os</span><span class="p">.</span><span class="nx">Stderr</span><span class="p">,</span> <span class="nx">err</span><span class="p">)</span>
<span class="k">return</span>
<span class="p">}</span>
<span class="nx">keyValue</span> <span class="o">:=</span> <span class="nx">mapF</span><span class="p">(</span><span class="nx">inFile</span><span class="p">,</span> <span class="nb">string</span><span class="p">(</span><span class="nx">inContent</span><span class="p">))</span>
<span class="nx">partitions</span> <span class="o">:=</span> <span class="nb">make</span><span class="p">([]</span><span class="o">*</span><span class="nx">json</span><span class="p">.</span><span class="nx">Encoder</span><span class="p">,</span> <span class="nx">nReduce</span><span class="p">,</span> <span class="nx">nReduce</span><span class="p">)</span>
<span class="k">for</span> <span class="nx">id</span> <span class="o">:=</span> <span class="mi">0</span><span class="p">;</span> <span class="nx">id</span> <span class="p">&lt;</span> <span class="nx">nReduce</span><span class="p">;</span> <span class="nx">id</span><span class="o">++</span> <span class="p">{</span>
<span class="nx">handler</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">os</span><span class="p">.</span><span class="nx">OpenFile</span><span class="p">(</span><span class="nx">reduceName</span><span class="p">(</span><span class="nx">jobName</span><span class="p">,</span> <span class="nx">mapTaskNumber</span><span class="p">,</span> <span class="nx">id</span><span class="p">),</span> <span class="nx">os</span><span class="p">.</span><span class="nx">O_CREATE</span><span class="p">|</span><span class="nx">os</span><span class="p">.</span><span class="nx">O_WRONLY</span><span class="p">,</span> <span class="mo">0644</span><span class="p">)</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="nx">fmt</span><span class="p">.</span><span class="nx">Fprintln</span><span class="p">(</span><span class="nx">os</span><span class="p">.</span><span class="nx">Stderr</span><span class="p">,</span> <span class="nx">err</span><span class="p">)</span>
<span class="k">return</span>
<span class="p">}</span>
<span class="k">defer</span> <span class="nx">handler</span><span class="p">.</span><span class="nx">Close</span><span class="p">()</span>
<span class="nx">partitions</span><span class="p">[</span><span class="nx">id</span><span class="p">]</span> <span class="p">=</span> <span class="nx">json</span><span class="p">.</span><span class="nx">NewEncoder</span><span class="p">(</span><span class="nx">handler</span><span class="p">)</span>
<span class="p">}</span>
<span class="k">for</span> <span class="nx">_</span><span class="p">,</span> <span class="nx">keyValueSingle</span> <span class="o">:=</span> <span class="k">range</span> <span class="nx">keyValue</span> <span class="p">{</span>
<span class="nx">_</span> <span class="p">=</span> <span class="nx">partitions</span><span class="p">[</span><span class="nx">ihash</span><span class="p">(</span><span class="nx">keyValueSingle</span><span class="p">.</span><span class="nx">Key</span><span class="p">)</span><span class="o">%</span><span class="nx">nReduce</span><span class="p">].</span><span class="nx">Encode</span><span class="p">(</span><span class="o">&amp;</span><span class="nx">keyValueSingle</span><span class="p">)</span>
<span class="p">}</span>
<span class="p">}</span>
</pre></div>
<div class="highlight"><pre><span></span><span class="kd">func</span> <span class="nx">doReduce</span><span class="p">(</span>
<span class="nx">jobName</span> <span class="kt">string</span><span class="p">,</span>
<span class="nx">reduceTaskNumber</span> <span class="kt">int</span><span class="p">,</span>
<span class="nx">outFile</span> <span class="kt">string</span><span class="p">,</span>
<span class="nx">nMap</span> <span class="kt">int</span><span class="p">,</span>
<span class="nx">reduceF</span> <span class="kd">func</span><span class="p">(</span><span class="nx">key</span> <span class="kt">string</span><span class="p">,</span> <span class="nx">values</span> <span class="p">[]</span><span class="kt">string</span><span class="p">)</span> <span class="kt">string</span><span class="p">,</span>
<span class="p">)</span> <span class="p">{</span>
<span class="nx">midContentBuf</span> <span class="o">:=</span> <span class="nx">bytes</span><span class="p">.</span><span class="nx">NewBuffer</span><span class="p">(</span><span class="kc">nil</span><span class="p">)</span>
<span class="k">for</span> <span class="nx">maps</span> <span class="o">:=</span> <span class="mi">0</span><span class="p">;</span> <span class="nx">maps</span> <span class="p">&lt;</span> <span class="nx">nMap</span><span class="p">;</span> <span class="nx">maps</span><span class="o">++</span> <span class="p">{</span>
<span class="nx">f</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">os</span><span class="p">.</span><span class="nx">Open</span><span class="p">(</span><span class="nx">reduceName</span><span class="p">(</span><span class="nx">jobName</span><span class="p">,</span> <span class="nx">maps</span><span class="p">,</span> <span class="nx">reduceTaskNumber</span><span class="p">))</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="nx">fmt</span><span class="p">.</span><span class="nx">Fprintln</span><span class="p">(</span><span class="nx">os</span><span class="p">.</span><span class="nx">Stderr</span><span class="p">,</span> <span class="nx">err</span><span class="p">)</span>
<span class="k">return</span>
<span class="p">}</span>
<span class="nx">io</span><span class="p">.</span><span class="nx">Copy</span><span class="p">(</span><span class="nx">midContentBuf</span><span class="p">,</span> <span class="nx">f</span><span class="p">)</span>
<span class="nx">f</span><span class="p">.</span><span class="nx">Close</span><span class="p">()</span>
<span class="p">}</span>
<span class="nx">decoder</span> <span class="o">:=</span> <span class="nx">json</span><span class="p">.</span><span class="nx">NewDecoder</span><span class="p">(</span><span class="nx">bytes</span><span class="p">.</span><span class="nx">NewReader</span><span class="p">(</span><span class="nx">midContentBuf</span><span class="p">.</span><span class="nx">Bytes</span><span class="p">()))</span>
<span class="kd">var</span> <span class="nx">kv</span> <span class="nx">KeyValue</span>
<span class="nx">keyValueMap</span> <span class="o">:=</span> <span class="nb">make</span><span class="p">(</span><span class="kd">map</span><span class="p">[</span><span class="kt">string</span><span class="p">][]</span><span class="kt">string</span><span class="p">)</span>
<span class="k">for</span> <span class="p">{</span>
<span class="nx">err</span> <span class="o">:=</span> <span class="nx">decoder</span><span class="p">.</span><span class="nx">Decode</span><span class="p">(</span><span class="o">&amp;</span><span class="nx">kv</span><span class="p">)</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">==</span> <span class="nx">io</span><span class="p">.</span><span class="nx">EOF</span> <span class="p">{</span>
<span class="k">break</span>
<span class="p">}</span>
<span class="nx">keyValueMap</span><span class="p">[</span><span class="nx">kv</span><span class="p">.</span><span class="nx">Key</span><span class="p">]</span> <span class="p">=</span> <span class="nb">append</span><span class="p">(</span><span class="nx">keyValueMap</span><span class="p">[</span><span class="nx">kv</span><span class="p">.</span><span class="nx">Key</span><span class="p">],</span> <span class="nx">kv</span><span class="p">.</span><span class="nx">Value</span><span class="p">)</span>
<span class="p">}</span>
<span class="nx">keys</span> <span class="o">:=</span> <span class="p">[]</span><span class="kt">string</span><span class="p">{}</span>
<span class="k">for</span> <span class="nx">keyValueSingle</span> <span class="o">:=</span> <span class="k">range</span> <span class="nx">keyValueMap</span> <span class="p">{</span>
<span class="nx">keys</span> <span class="p">=</span> <span class="nb">append</span><span class="p">(</span><span class="nx">keys</span><span class="p">,</span> <span class="nx">keyValueSingle</span><span class="p">)</span>
<span class="p">}</span>
<span class="nx">sort</span><span class="p">.</span><span class="nx">Strings</span><span class="p">(</span><span class="nx">keys</span><span class="p">)</span>
<span class="nx">answerFileName</span> <span class="o">:=</span> <span class="nx">mergeName</span><span class="p">(</span><span class="nx">jobName</span><span class="p">,</span> <span class="nx">reduceTaskNumber</span><span class="p">)</span>
<span class="nx">answerFile</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">os</span><span class="p">.</span><span class="nx">OpenFile</span><span class="p">(</span><span class="nx">answerFileName</span><span class="p">,</span> <span class="nx">os</span><span class="p">.</span><span class="nx">O_CREATE</span><span class="p">|</span><span class="nx">os</span><span class="p">.</span><span class="nx">O_WRONLY</span><span class="p">,</span> <span class="mo">0644</span><span class="p">)</span>
<span class="k">defer</span> <span class="nx">answerFile</span><span class="p">.</span><span class="nx">Close</span><span class="p">()</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="nx">fmt</span><span class="p">.</span><span class="nx">Fprintln</span><span class="p">(</span><span class="nx">os</span><span class="p">.</span><span class="nx">Stderr</span><span class="p">,</span> <span class="nx">err</span><span class="p">)</span>
<span class="k">return</span>
<span class="p">}</span>
<span class="nx">encoder</span> <span class="o">:=</span> <span class="nx">json</span><span class="p">.</span><span class="nx">NewEncoder</span><span class="p">(</span><span class="nx">answerFile</span><span class="p">)</span>
<span class="k">for</span> <span class="nx">_</span><span class="p">,</span> <span class="nx">key</span> <span class="o">:=</span> <span class="k">range</span> <span class="nx">keys</span> <span class="p">{</span>
<span class="nx">encoder</span><span class="p">.</span><span class="nx">Encode</span><span class="p">(</span><span class="nx">KeyValue</span><span class="p">{</span><span class="nx">key</span><span class="p">,</span> <span class="nx">reduceF</span><span class="p">(</span><span class="nx">key</span><span class="p">,</span> <span class="nx">keyValueMap</span><span class="p">[</span><span class="nx">key</span><span class="p">])})</span>
<span class="p">}</span>
<span class="p">}</span>
</pre></div>
<h3>Part II: Single-worker word count</h3>
<p>要求实现wordcount,闭着眼睛乱写可以通过测试。</p>
<div class="highlight"><pre><span></span><span class="kd">func</span> <span class="nx">mapF</span><span class="p">(</span><span class="nx">filename</span> <span class="kt">string</span><span class="p">,</span> <span class="nx">contents</span> <span class="kt">string</span><span class="p">)</span> <span class="p">[]</span><span class="nx">mapreduce</span><span class="p">.</span><span class="nx">KeyValue</span> <span class="p">{</span>
<span class="nx">keyValues</span> <span class="o">:=</span> <span class="nb">make</span><span class="p">([]</span><span class="nx">mapreduce</span><span class="p">.</span><span class="nx">KeyValue</span><span class="p">,</span> <span class="mi">0</span><span class="p">,</span> <span class="mi">0</span><span class="p">)</span>
<span class="nx">f</span> <span class="o">:=</span> <span class="kd">func</span><span class="p">(</span><span class="nx">c</span> <span class="kt">rune</span><span class="p">)</span> <span class="kt">bool</span> <span class="p">{</span>
<span class="k">return</span> <span class="p">!</span><span class="nx">unicode</span><span class="p">.</span><span class="nx">IsLetter</span><span class="p">(</span><span class="nx">c</span><span class="p">)</span>
<span class="p">}</span>
<span class="nx">fields</span> <span class="o">:=</span> <span class="nx">strings</span><span class="p">.</span><span class="nx">FieldsFunc</span><span class="p">(</span><span class="nx">contents</span><span class="p">,</span> <span class="nx">f</span><span class="p">)</span>
<span class="k">for</span> <span class="nx">_</span><span class="p">,</span> <span class="nx">each</span> <span class="o">:=</span> <span class="k">range</span> <span class="nx">fields</span> <span class="p">{</span>
<span class="nx">keyValues</span> <span class="p">=</span> <span class="nb">append</span><span class="p">(</span><span class="nx">keyValues</span><span class="p">,</span> <span class="nx">mapreduce</span><span class="p">.</span><span class="nx">KeyValue</span><span class="p">{</span><span class="nx">each</span><span class="p">,</span> <span class="s">""</span><span class="p">})</span>
<span class="p">}</span>
<span class="k">return</span> <span class="nx">keyValues</span>
<span class="p">}</span>
<span class="kd">func</span> <span class="nx">reduceF</span><span class="p">(</span><span class="nx">key</span> <span class="kt">string</span><span class="p">,</span> <span class="nx">values</span> <span class="p">[]</span><span class="kt">string</span><span class="p">)</span> <span class="kt">string</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">strconv</span><span class="p">.</span><span class="nx">Itoa</span><span class="p">(</span><span class="nb">len</span><span class="p">(</span><span class="nx">values</span><span class="p">))</span>
<span class="p">}</span>
</pre></div>
<h3>Part III: Distributing MapReduce tasks</h3>
<p>要求实现给worker分配任务的<code>schedule</code>函数,worker地址是通过<code>registerChan</code>获取的,在<code>schedule</code>中会启动n个<code>goroutine</code>,每个都从<code>registerChan</code>中获取worker地址,然后进行<code>RPC</code>调用,成功后,再放回到<code>registerChan</code>中。这里首次使用了<code>channel</code>和<code>goroutine</code>等Go语言的高级特性,需要好好学习。</p>
<h3>Part IV: Handling worker failures</h3>
<p>在Part III的基础上,要求实现worker的容错机制,这里只需要简单地不把无法到达的worker加入<code>registerChan</code>中,并把失败的任务交给其他worker重试。</p>
<div class="highlight"><pre><span></span><span class="kd">func</span> <span class="nx">schedule</span><span class="p">(</span><span class="nx">jobName</span> <span class="kt">string</span><span class="p">,</span> <span class="nx">mapFiles</span> <span class="p">[]</span><span class="kt">string</span><span class="p">,</span> <span class="nx">nReduce</span> <span class="kt">int</span><span class="p">,</span> <span class="nx">phase</span> <span class="nx">jobPhase</span><span class="p">,</span> <span class="nx">registerChan</span> <span class="kd">chan</span> <span class="kt">string</span><span class="p">)</span> <span class="p">{</span>
<span class="kd">var</span> <span class="nx">ntasks</span> <span class="kt">int</span>
<span class="kd">var</span> <span class="nx">n_other</span> <span class="kt">int</span> <span class="c1">// number of inputs (for reduce) or outputs (for map)</span>
<span class="k">switch</span> <span class="nx">phase</span> <span class="p">{</span>
<span class="k">case</span> <span class="nx">mapPhase</span><span class="p">:</span>
<span class="nx">ntasks</span> <span class="p">=</span> <span class="nb">len</span><span class="p">(</span><span class="nx">mapFiles</span><span class="p">)</span>
<span class="nx">n_other</span> <span class="p">=</span> <span class="nx">nReduce</span>
<span class="k">case</span> <span class="nx">reducePhase</span><span class="p">:</span>
<span class="nx">ntasks</span> <span class="p">=</span> <span class="nx">nReduce</span>
<span class="nx">n_other</span> <span class="p">=</span> <span class="nb">len</span><span class="p">(</span><span class="nx">mapFiles</span><span class="p">)</span>
<span class="p">}</span>
<span class="nx">fmt</span><span class="p">.</span><span class="nx">Printf</span><span class="p">(</span><span class="s">"Schedule: %v %v tasks (%d I/Os)\n"</span><span class="p">,</span> <span class="nx">ntasks</span><span class="p">,</span> <span class="nx">phase</span><span class="p">,</span> <span class="nx">n_other</span><span class="p">)</span>
<span class="kd">var</span> <span class="nx">waitGroup</span> <span class="nx">sync</span><span class="p">.</span><span class="nx">WaitGroup</span>
<span class="k">for</span> <span class="nx">task</span> <span class="o">:=</span> <span class="mi">0</span><span class="p">;</span> <span class="nx">task</span> <span class="p">&lt;</span> <span class="nx">ntasks</span><span class="p">;</span> <span class="nx">task</span><span class="o">++</span> <span class="p">{</span>
<span class="nx">waitGroup</span><span class="p">.</span><span class="nx">Add</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span>
<span class="nx">worker</span> <span class="o">:=</span> <span class="o">&lt;-</span><span class="nx">registerChan</span>
<span class="k">go</span> <span class="kd">func</span><span class="p">(</span><span class="nx">id</span> <span class="kt">int</span><span class="p">,</span> <span class="nx">worker</span> <span class="kt">string</span><span class="p">)</span> <span class="p">{</span>
<span class="k">defer</span> <span class="nx">waitGroup</span><span class="p">.</span><span class="nx">Done</span><span class="p">()</span>
<span class="nx">args</span> <span class="o">:=</span> <span class="nx">DoTaskArgs</span><span class="p">{</span><span class="nx">jobName</span><span class="p">,</span> <span class="nx">mapFiles</span><span class="p">[</span><span class="nx">id</span><span class="p">],</span> <span class="nx">phase</span><span class="p">,</span> <span class="nx">id</span><span class="p">,</span> <span class="nx">n_other</span><span class="p">}</span>
<span class="nx">ok</span> <span class="o">:=</span> <span class="nx">call</span><span class="p">(</span><span class="nx">worker</span><span class="p">,</span> <span class="s">"Worker.DoTask"</span><span class="p">,</span> <span class="nx">args</span><span class="p">,</span> <span class="kc">nil</span><span class="p">)</span>
<span class="k">if</span> <span class="p">!</span><span class="nx">ok</span> <span class="p">{</span>
<span class="k">for</span> <span class="p">!</span><span class="nx">ok</span> <span class="p">{</span>
<span class="nx">worker</span> <span class="o">:=</span> <span class="o">&lt;-</span><span class="nx">registerChan</span>
<span class="nx">ok</span> <span class="p">=</span> <span class="nx">call</span><span class="p">(</span><span class="nx">worker</span><span class="p">,</span> <span class="s">"Worker.DoTask"</span><span class="p">,</span> <span class="nx">args</span><span class="p">,</span> <span class="kc">nil</span><span class="p">)</span>
<span class="k">if</span> <span class="nx">ok</span> <span class="p">{</span>
<span class="k">go</span> <span class="kd">func</span><span class="p">()</span> <span class="p">{</span>
<span class="nx">registerChan</span> <span class="o">&lt;-</span> <span class="nx">worker</span>
<span class="p">}()</span>
<span class="k">break</span>
<span class="p">}</span>
<span class="p">}</span>
<span class="p">}</span> <span class="k">else</span> <span class="p">{</span>
<span class="k">go</span> <span class="kd">func</span><span class="p">()</span> <span class="p">{</span>
<span class="nx">registerChan</span> <span class="o">&lt;-</span> <span class="nx">worker</span>
<span class="p">}()</span>
<span class="p">}</span>
<span class="p">}(</span><span class="nx">task</span><span class="p">,</span> <span class="nx">worker</span><span class="p">)</span>
<span class="p">}</span>
<span class="nx">waitGroup</span><span class="p">.</span><span class="nx">Wait</span><span class="p">()</span>
<span class="p">}</span>
</pre></div>
<h3>Part V: Inverted index generation (optional for extra credit)</h3>
<p>要求实现一个倒排索引,难度不大。</p>
<!--
  Syntax-highlighted Go listing (the kd/nx/kt/p token classes look like Pygments output; confirm against the site generator).
  mapF: splits value into fields on every non-letter rune, sorts the fields, and returns one KeyValue{field, document} per field.
  reduceF: de-duplicates values, then returns the distinct count, a space, and the sorted distinct values joined by ",".
  NOTE(review): the Go body lines carry no leading indentation here; whitespace inside pre is rendered literally, so do not reformat this block.
-->
<div class="highlight"><pre><span></span><span class="kd">func</span> <span class="nx">mapF</span><span class="p">(</span><span class="nx">document</span> <span class="kt">string</span><span class="p">,</span> <span class="nx">value</span> <span class="kt">string</span><span class="p">)</span> <span class="p">(</span><span class="nx">res</span> <span class="p">[]</span><span class="nx">mapreduce</span><span class="p">.</span><span class="nx">KeyValue</span><span class="p">)</span> <span class="p">{</span>
<span class="nx">keyValues</span> <span class="o">:=</span> <span class="nb">make</span><span class="p">([]</span><span class="nx">mapreduce</span><span class="p">.</span><span class="nx">KeyValue</span><span class="p">,</span> <span class="mi">0</span><span class="p">,</span> <span class="mi">0</span><span class="p">)</span>
<span class="nx">f</span> <span class="o">:=</span> <span class="kd">func</span><span class="p">(</span><span class="nx">c</span> <span class="kt">rune</span><span class="p">)</span> <span class="kt">bool</span> <span class="p">{</span>
<span class="k">return</span> <span class="p">!</span><span class="nx">unicode</span><span class="p">.</span><span class="nx">IsLetter</span><span class="p">(</span><span class="nx">c</span><span class="p">)</span>
<span class="p">}</span>
<span class="nx">fields</span> <span class="o">:=</span> <span class="nx">strings</span><span class="p">.</span><span class="nx">FieldsFunc</span><span class="p">(</span><span class="nx">value</span><span class="p">,</span> <span class="nx">f</span><span class="p">)</span>
<span class="nx">sort</span><span class="p">.</span><span class="nx">Strings</span><span class="p">(</span><span class="nx">fields</span><span class="p">)</span>
<span class="k">for</span> <span class="nx">_</span><span class="p">,</span> <span class="nx">each</span> <span class="o">:=</span> <span class="k">range</span> <span class="nx">fields</span> <span class="p">{</span>
<span class="nx">keyValues</span> <span class="p">=</span> <span class="nb">append</span><span class="p">(</span><span class="nx">keyValues</span><span class="p">,</span> <span class="nx">mapreduce</span><span class="p">.</span><span class="nx">KeyValue</span><span class="p">{</span><span class="nx">each</span><span class="p">,</span> <span class="nx">document</span><span class="p">})</span>
<span class="p">}</span>
<span class="k">return</span> <span class="nx">keyValues</span>
<span class="p">}</span>
<span class="kd">func</span> <span class="nx">reduceF</span><span class="p">(</span><span class="nx">key</span> <span class="kt">string</span><span class="p">,</span> <span class="nx">values</span> <span class="p">[]</span><span class="kt">string</span><span class="p">)</span> <span class="kt">string</span> <span class="p">{</span>
<span class="nx">uniq</span> <span class="o">:=</span> <span class="nb">make</span><span class="p">(</span><span class="kd">map</span><span class="p">[</span><span class="kt">string</span><span class="p">]</span><span class="kt">int</span><span class="p">)</span>
<span class="k">for</span> <span class="nx">_</span><span class="p">,</span> <span class="nx">value</span> <span class="o">:=</span> <span class="k">range</span> <span class="nx">values</span> <span class="p">{</span>
<span class="nx">uniq</span><span class="p">[</span><span class="nx">value</span><span class="p">]</span> <span class="p">=</span> <span class="mi">1</span>
<span class="p">}</span>
<span class="nx">answer</span> <span class="o">:=</span> <span class="nx">strconv</span><span class="p">.</span><span class="nx">Itoa</span><span class="p">(</span><span class="nb">len</span><span class="p">(</span><span class="nx">uniq</span><span class="p">))</span>
<span class="nx">name</span> <span class="o">:=</span> <span class="nb">make</span><span class="p">([]</span><span class="kt">string</span><span class="p">,</span> <span class="mi">0</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="nx">uniq</span><span class="p">))</span>
<span class="k">for</span> <span class="nx">key</span><span class="p">,</span> <span class="nx">_</span> <span class="o">:=</span> <span class="k">range</span> <span class="nx">uniq</span> <span class="p">{</span>
<span class="nx">name</span> <span class="p">=</span> <span class="nb">append</span><span class="p">(</span><span class="nx">name</span><span class="p">,</span> <span class="nx">key</span><span class="p">)</span>
<span class="p">}</span>
<span class="nx">answer</span> <span class="o">+=</span> <span class="s">" "</span>
<span class="nx">sort</span><span class="p">.</span><span class="nx">Strings</span><span class="p">(</span><span class="nx">name</span><span class="p">)</span>
<span class="nx">answer</span> <span class="o">+=</span> <span class="nx">strings</span><span class="p">.</span><span class="nx">Join</span><span class="p">(</span><span class="nx">name</span><span class="p">,</span> <span class="s">","</span><span class="p">)</span>
<span class="k">return</span> <span class="nx">answer</span>
<span class="p">}</span>
</pre></div>
</article>
<!-- Tag line for the post: each tag links to its root-relative /tag/ page. -->
<div class="tags">
  <p>
    tags:
    <a href="/tag/fen-bu-shi.html">分布式</a>,
    <a href="/tag/lab.html">lab</a>
  </p>
</div>
<hr>
</div>
</div>
</div>
<hr>
<!-- Footer: icon links (GitHub, email) followed by the "powered by" credit line. -->
<footer>
  <div class="container">
    <div class="row">
      <div class="col-lg-8 col-lg-offset-2 col-md-10 col-md-offset-1">
        <ul class="list-inline text-center">
          <li>
            <!-- Icon-only link: aria-label supplies the accessible name; the icon stack is decorative and hidden from assistive technology. -->
            <a href="https://github.com/HUSTMeituanClub" aria-label="HUSTMeituanClub on GitHub">
              <span class="fa-stack fa-lg" aria-hidden="true">
                <i class="fa fa-circle fa-stack-2x"></i>
                <i class="fa fa-github fa-stack-1x fa-inverse"></i>
              </span>
            </a>
          </li>
          <li>
            <!-- NOTE(review): this mailto address has no local part before "@"; the intended mailbox is not visible here, so the href is left unchanged. Confirm and fix. -->
            <a href="mailto:@hustmeituan.club" aria-label="Email">
              <span class="fa-stack fa-lg" aria-hidden="true">
                <i class="fa fa-circle fa-stack-2x"></i>
                <i class="fa fa-envelope fa-stack-1x fa-inverse"></i>
              </span>
            </a>
          </li>
        </ul>
        <!-- External credit links use https rather than plain http. -->
        <p class="copyright text-muted">
          Blog powered by <a href="https://getpelican.com">Pelican</a>,
          which takes great advantage of <a href="https://www.python.org">Python</a>.
        </p>
      </div>
    </div>
  </div>
</footer>
<!-- jQuery -->
<!-- NOTE(review): loaded first; the Bootstrap and theme scripts below presumably depend on it, so keep this order (confirm). -->
<script src="/theme/js/jquery.min.js"></script>
<!-- Bootstrap Core JavaScript -->
<script src="/theme/js/bootstrap.min.js"></script>
<!-- Custom Theme JavaScript -->
<!-- Last of the three scripts, placed just before the closing body tag. -->
<script src="/theme/js/clean-blog.min.js"></script>
</body>
</html>