本文首发于 http://www.YoungZY.com/
写在前面:这篇实验课的说明(原版)和代码前前后后看了很多遍,还是一团糨糊,不知从何入手。看到后面,又忘记了前面。于是想把它翻译一遍,希望能把题理解得更透彻点。果然,翻译过程中发现几个之前忽略的点。
原文地址:https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
简介
在这节实验课里你将构建一个MapReduce系统。你需要实现一个工作程序(worker process)和一个调度程序(coordinator process)。工作程序用来调用Map和Reduce函数,并处理文件的读取和写入。调度程序用来协调工作任务并处理失败的任务。你将构建出跟 这篇MapReduce论文 里描述的类似的东西。(注意:本实验中用”coordinator”替代里论文中的”master”。)
开始
首先需要搭配好Go的开发环境,无需赘述。 接着可以通过git来获取本实验所需的初始代码:
git clone git://g.csail.mit.edu/6.824-golabs-2022 6.824
我们在src/main/mrsequential.go
提供了一个简单的MapReduce实现。它在同一个进程里执行里Map函数和Reduce函数。我们也提供了一些MapReduce的应用程序:单词计数器(mrapps/wc.go
)和文本检索器(mrapps/indexer.go
)。你可以通过以下命令来执行单词计数器程序:
cd 6.824
cd src/main
go build -race -buildmode=plugin ../mrapps/wc.go
go run -race mrsequential.go wc.so pg*.txt
more mr-out-0
注意:-race
选项打开了Go语言的竞争探测器。我们建议你在开发和调试6.824实验代码时都加上这个选项。我们在评分时不会使用这个选项。尽管如此,如果你的代码里存在竟态,我们测试时不用竟态探测器它也会失败。
mrsequential.go
将其输出保存在mr-out-0
文件中。输入来自pg
开头的txt文件。
可以随意从mrsequential.go
中借用代码。你也应该打开mrapps/wc.go
看看MapReduce的应用程序是如何实现的。
你的任务(难度:中等偏上)
你的任务是实现一个分布式的MapReduce应用,包含两个程序,调度程序(coordinator)和工作程序(worker)。程序运行时将有一个调度进程和一个或多个并行的工作进程。在现实世界中,工作进程会运行在不同的机器上,但本实验中它们将运行在同一台机器上。工作进程通过RPC和调度进程通信。每一个工作进程都会向调度进程请求一个任务,从一个或多个文件中读取任务需要的数据,执行这个任务,然后将输出写入一个或多个文件。如果一个工作进程没能在一个合理的时间(本实验设置为10s)内完成任务,调度进程应该能发现,并把相同的任务交给另一个工作进程。
我们已经给出了一些代码。调度进程和工作进程的入口分别在main/mrcoordinator.go
和 main/mrworker.go
中,不要修改它们。请在mr/coordinator.go
、mr/worker.go
, 和 mr/rpc.go
中完成你的实现。
如何运行你的代码呢? 首先,确保单词计数插件已构建(在main
目录下执行):
go build -race -buildmode=plugin ../mrapps/wc.go
运行调度进程
go run -race mrcoordinator.go pg-*.txt
pg
开头的txt文件是mrcoordinator.go
程序的输入参数,每个文件后面都有一个分隔符,并且是一个Map任务的入参。 在一个或多个命令窗口执行工作进程:
go run -race mrworker.go wc.so
当工作进程和调度进程结束时,在mr-out-
开头的文件中查看执行结果。如果你完成了代码,将结果整合并排序,应该和顺序执行得到同样的结果:
cat mr-out* | sort | more
我们提供了一个测试脚本,main/test-mr.sh
。这些测试检查了单词计数器和文本检索器在输入pg
开头的文件时是否产生了正确的输出。这些测试还检测了你实现的Map和Reduce任务是否是并行的,以及有任务崩溃时能否恢复。
如果你现在执行这个脚本,它会挂起,因为调度进程永远不会结束:
bash test-mr.sh
你可以修改mr/coordinator.go
中的Done()
函数,把ret:=false
改成ret:=true
,这样的话,调度进程会立即结束。再执行上面的脚本,你会发现测试失败了。
这个测试脚本希望看到以mr-out-x
命名的文件,它们是由Reduce任务产生的。而mr/coordinator.go
和mr/worker.go
空的实现不会产生这些文件,所以测试失败了。
当你完成了代码,脚本的执行结果应该是这样的:
$ bash test-mr.sh
*** Starting wc test.
--- wc test: PASS
*** Starting indexer test.
--- indexer test: PASS
*** Starting map parallelism test.
--- map parallelism test: PASS
*** Starting reduce parallelism test.
--- reduce parallelism test: PASS
*** Starting crash test.
--- crash test: PASS
*** PASSED ALL TESTS
$
你会看到RPC相关的错误。忽略它们。将调度进程注册为RPC服务时会检查它的所有方法是否满足RPC(有3个入参)。我们知道,Done()
方法不会通过RPC调用。
规则
- Map阶段应该将产生的键值对分成
nReduce
份,对应nReduce
个Reduce任务,也是main/mrcoordinator.go
调用MakeCoordinator()
函数传入的参数。所以,每一个Map任务都会产生nReduce
个中间文件供Reduce任务使用。 - 将第X个Reduce任务产生的输出写入
mr-out-X
文件。 mr-out-X
文件的每一行包含一个Reduce函数的输出。将键值对以Go语言的%v %v
格式输出。参考一下main/mrsequential.go
里的格式。如果不按这个格式输出,测试脚本将会失败。- 你可以修改
mr/coordinator.go
、mr/worker.go
, 和mr/rpc.go
文件。为了测试,也可以修改其他文件,但是要确保你的程序在原始的框架下能运行。我们会在原始的框架下进行测试。 - 工作进程应该将Map函数产生的中间数据放到当前目录下的文件里,之后Reduce任务可以从这里读取文件作为其输入。
main/mrcoordinator.go
期望mr/coordinator.go
实现一个Done()
方法,当MapReduce任务完成时返回true
,这时main/mrcoordinator.go
程序就会结束。- 当一个任务完成时,这个工作进程就应该结束。一个简单的实现就是使用
call()
函数的返回值:如果一个工作进程未能与调度进程连接上,那就可以假定调度进程因为任务完成而退出了,所以工作进程也就可以结束了。这取决于你的设计,你可能发现让调度进程给工作进程一个”请退出”的任务也是可以的。
提示
- 这里 有一些关于开发和调试的建议。
- 可以像这样开始:修改
mr/worker.go
的Worker()
函数,通过RPC向调度进程请求一个任务。然后修改调度进程,返回一个尚未开始的Map任务的文件名。再然后修改工作进程去读取这个文件,并调用Map函数,就像main/mrsequential.go
里一样。 - Map和Reduce函数通过Go语言的
plugin
包从.so
结尾的文件中实时加载。 - 如果你改变了
mr/
目录下的内容,就需要重新构建MapReduce的插件,就像这样:go build -race -buildmode=plugin ../mrapps/wc.go
- 这个实验里,所有工作进程共享一个文件系统。当所有工作进程都运行在同一台机器上时,这是简单的,但如果运行在不同机器上,就需要一个像GFS一样的全局化的文件系统。
mr-X-Y
是一个合理的给中间产物命名的方式,其中X是Map任务的ID,Y是Reduce任务的ID。- 工作进程的Map任务需要将中间产物以键值对的形式存入文件,在Reduce任务中还要能正确地读取。Go语言中的
encoding/json
包是个不错的选择。将键值对以JSON的格式写入文件中:enc:=json.NewEncoder(file) for _, kv := ... { err := enc.Encode(&kv)
然后从文件中读取:
dec := json.NewDecoder(file) for { var kv KeyValue if err:=dec.Decode(&kv);err!=nil{ break } kva = append(kva, kv) }
- 工作进程的Map部分可以通过
mr/worker.go
中的ihash(key)
函数给key值选择Reduce任务。 - 你可以从
main/mrsequential.go
中借鉴一些代码,在Map任务读取文件的时候,在Map/Reduce过程中存储中间键值对的时候,在给Reduce的输出排序的时候。 - 调度进程,作为RPC服务器,是并发的。所以不要忘记给共享数据上锁。
- 使用Go的竟态探测器,
go build -race
和go run -race
。test-mr.sh
默认也是使用竟态探测器进行测试的。 - 工作进程是需要等待的,例如Reduce任务在Map任务全部完成之前是不能启动的。一种方案是工作进程周期性地向调度进程请求任务,在每个请求间休眠一段时间(
time.Sleep()
)。另一种方案是在调度进程的RPC处理器使用等待循环(time.Sleep()
或者sync.Cond
都可以)。Go程序将RPC的处理程序运行在自己的线程里,所以事实上一个处在等待中的RPC处理程序不会影响其他RPC的处理进程。 - 调度进程无法准确地分辨出哪些工作进程是死机了,哪些是运行着但由于某些原因瘫痪了,哪些是执行着但是太慢了而起不了作用。你能做的就是让调度进程等待一段时间,如果还没有结果,就放弃它,将任务重新分配给另一个工作进程。在这个实验中,让调度进程等待10s,超出这个时间,调度进程就可以认为这个工作进程已经死了。
- 如果你要实现备份任务(论文3.6节),要注意我们测试你的代码时,如果工作进程没有崩溃就不会执行其他不相关的任务。备份任务只有在相对长时间之后才会被触发(例如10s)。
- 为了测试崩溃与恢复,你可以使用
mrapps/crash.go
插件。它在Map和Reduce函数中随机地退出。 - 为了确保没人能发现因程序崩溃而产生的不完整的文件,在MapReduce的论文中提到了一个小技巧,即先写入临时文件,等它完成后再重命名。你可以用
ioutil.TempFile
创建临时文件,用os.Rename
重命名。 test-mr.sh
在它的子目录mr-tmp
中执行它所有的进程,所以如果出错了,可以在那里找到相关输出文件。你可以临时修改test-mr.sh
文件,让它失败时就退出,这样就不会继续往下测试(覆盖之前的输入内容)。test-mr-many.sh
提供了一个执行test-mr.sh
的测试框架(我们就是用这个测试你的代码)。它需要一个参数来指定测试执行的次数。你不能并行地运行多个test-mr.sh
测试,因为调度进程会重复使用相同的socket,这会引发冲突。- Go程序中RPC只会发送大写字母开头的字段。内置结构也必须大写字母开头。
- 向RPC系统的应答结构传递指针时,这个
*reply
应该指向一个空对象。RPC调用的代码应该像这样:reply := SomeType{} call(..., &reply)
在调用之前不要给reply的任何字段设值。如果你不遵守这个规则,就会出问题。当你给reply设置非该类型的默认值时,RPC服务端会将reply字段设置为默认的类型值。你会发现数据并没有被写入,而调用端仍然是非默认的类型值。
附加题(不计分)
- 实现你自己的MapReduce应用(参考
mrapps/*
),例如Distributed Grep(论文2.3节)。 - 让你的调度进程和工作进程运行在不同的机器上,就像现实世界中一样。你需要让你的RPC通过TCP/IP交互,而不是现在的Unix sockets(见
Coordinator.server()
),并通过共享文件系统来读写文件。例如,你可以通过ssh
登录MIT的Athena集群,它使用AFS共享文件;或者你可以租用AWS服务使用S3存储。
加入讨论