MIT 6.824 (2022) Lab 1: MapReduce

写在前面:这篇实验课的说明(原版)和代码前前后后看了很多遍,还是一团糨糊,不知从何入手。看到后面,又忘记了前面。于是想把它翻译一遍,希望能把题理解得更透彻点。果然,翻译过程中发现几个之前忽略的点。

原文地址:https://pdos.csail.mit.edu/6.824/labs/lab-mr.html

简介

在这节实验课里你将构建一个MapReduce系统。你需要实现一个工作程序(worker process)和一个调度程序(coordinator process)。工作程序用来调用Map和Reduce函数,并处理文件的读取和写入。调度程序用来协调工作任务并处理失败的任务。你将构建出跟 这篇MapReduce论文 里描述的类似的东西。(注意:本实验中用”coordinator”替代里论文中的”master”。)

开始

首先需要搭配好Go的开发环境,无需赘述。 接着可以通过git来获取本实验所需的初始代码:

git clone git://g.csail.mit.edu/6.824-golabs-2022 6.824

我们在src/main/mrsequential.go提供了一个简单的MapReduce实现。它在同一个进程里执行里Map函数和Reduce函数。我们也提供了一些MapReduce的应用程序:单词计数器(mrapps/wc.go)和文本检索器(mrapps/indexer.go)。你可以通过以下命令来执行单词计数器程序:

cd 6.824
cd src/main
go build -race -buildmode=plugin ../mrapps/wc.go
go run -race mrsequential.go wc.so pg*.txt
more mr-out-0

注意:-race选项打开了Go语言的竞争探测器。我们建议你在开发和调试6.824实验代码时都加上这个选项。我们在评分时不会使用这个选项。尽管如此,如果你的代码里存在竟态,我们测试时不用竟态探测器它也会失败。

mrsequential.go将其输出保存在mr-out-0文件中。输入来自pg开头的txt文件。

可以随意从mrsequential.go中借用代码。你也应该打开mrapps/wc.go看看MapReduce的应用程序是如何实现的。

你的任务(难度:中等偏上)

你的任务是实现一个分布式的MapReduce应用,包含两个程序,调度程序(coordinator)和工作程序(worker)。程序运行时将有一个调度进程和一个或多个并行的工作进程。在现实世界中,工作进程会运行在不同的机器上,但本实验中它们将运行在同一台机器上。工作进程通过RPC和调度进程通信。每一个工作进程都会向调度进程请求一个任务,从一个或多个文件中读取任务需要的数据,执行这个任务,然后将输出写入一个或多个文件。如果一个工作进程没能在一个合理的时间(本实验设置为10s)内完成任务,调度进程应该能发现,并把相同的任务交给另一个工作进程。

我们已经给出了一些代码。调度进程和工作进程的入口分别在main/mrcoordinator.go 和 main/mrworker.go中,不要修改它们。请在mr/coordinator.gomr/worker.go, 和 mr/rpc.go中完成你的实现。

如何运行你的代码呢? 首先,确保单词计数插件已构建(在main目录下执行):

go build -race -buildmode=plugin ../mrapps/wc.go

运行调度进程

go run -race mrcoordinator.go pg-*.txt

pg开头的txt文件是mrcoordinator.go程序的输入参数,每个文件后面都有一个分隔符,并且是一个Map任务的入参。 在一个或多个命令窗口执行工作进程:

go run -race mrworker.go wc.so

当工作进程和调度进程结束时,在mr-out-开头的文件中查看执行结果。如果你完成了代码,将结果整合并排序,应该和顺序执行得到同样的结果:

cat mr-out* | sort | more

我们提供了一个测试脚本,main/test-mr.sh。这些测试检查了单词计数器和文本检索器在输入pg开头的文件时是否产生了正确的输出。这些测试还检测了你实现的Map和Reduce任务是否是并行的,以及有任务崩溃时能否恢复。

如果你现在执行这个脚本,它会挂起,因为调度进程永远不会结束:

bash test-mr.sh

你可以修改mr/coordinator.go中的Done()函数,把ret:=false改成ret:=true,这样的话,调度进程会立即结束。再执行上面的脚本,你会发现测试失败了。

这个测试脚本希望看到以mr-out-x命名的文件,它们是由Reduce任务产生的。而mr/coordinator.gomr/worker.go空的实现不会产生这些文件,所以测试失败了。

当你完成了代码,脚本的执行结果应该是这样的:

$ bash test-mr.sh
*** Starting wc test.
--- wc test: PASS
*** Starting indexer test.
--- indexer test: PASS
*** Starting map parallelism test.
--- map parallelism test: PASS
*** Starting reduce parallelism test.
--- reduce parallelism test: PASS
*** Starting crash test.
--- crash test: PASS
*** PASSED ALL TESTS
$

你会看到RPC相关的错误。忽略它们。将调度进程注册为RPC服务时会检查它的所有方法是否满足RPC(有3个入参)。我们知道,Done()方法不会通过RPC调用。

规则

  • Map阶段应该将产生的键值对分成nReduce份,对应nReduce个Reduce任务,也是main/mrcoordinator.go调用MakeCoordinator()函数传入的参数。所以,每一个Map任务都会产生nReduce个中间文件供Reduce任务使用。
  • 将第X个Reduce任务产生的输出写入mr-out-X文件。
  • mr-out-X文件的每一行包含一个Reduce函数的输出。将键值对以Go语言的%v %v格式输出。参考一下main/mrsequential.go里的格式。如果不按这个格式输出,测试脚本将会失败。
  • 你可以修改mr/coordinator.gomr/worker.go, 和 mr/rpc.go文件。为了测试,也可以修改其他文件,但是要确保你的程序在原始的框架下能运行。我们会在原始的框架下进行测试。
  • 工作进程应该将Map函数产生的中间数据放到当前目录下的文件里,之后Reduce任务可以从这里读取文件作为其输入。
  • main/mrcoordinator.go期望mr/coordinator.go实现一个Done()方法,当MapReduce任务完成时返回true,这时main/mrcoordinator.go程序就会结束。
  • 当一个任务完成时,这个工作进程就应该结束。一个简单的实现就是使用call()函数的返回值:如果一个工作进程未能与调度进程连接上,那就可以假定调度进程因为任务完成而退出了,所以工作进程也就可以结束了。这取决于你的设计,你可能发现让调度进程给工作进程一个”请退出”的任务也是可以的。

提示

  • 这里 有一些关于开发和调试的建议。
  • 可以像这样开始:修改mr/worker.goWorker()函数,通过RPC向调度进程请求一个任务。然后修改调度进程,返回一个尚未开始的Map任务的文件名。再然后修改工作进程去读取这个文件,并调用Map函数,就像main/mrsequential.go里一样。
  • Map和Reduce函数通过Go语言的plugin包从.so结尾的文件中实时加载。
  • 如果你改变了mr/目录下的内容,就需要重新构建MapReduce的插件,就像这样:go build -race -buildmode=plugin ../mrapps/wc.go
  • 这个实验里,所有工作进程共享一个文件系统。当所有工作进程都运行在同一台机器上时,这是简单的,但如果运行在不同机器上,就需要一个像GFS一样的全局化的文件系统。
  • mr-X-Y是一个合理的给中间产物命名的方式,其中X是Map任务的ID,Y是Reduce任务的ID。
  • 工作进程的Map任务需要将中间产物以键值对的形式存入文件,在Reduce任务中还要能正确地读取。Go语言中的encoding/json包是个不错的选择。将键值对以JSON的格式写入文件中:
      enc:=json.NewEncoder(file)
      for _, kv := ... {
          err := enc.Encode(&kv)

    然后从文件中读取:

      dec := json.NewDecoder(file)
      for {
          var kv KeyValue
          if err:=dec.Decode(&kv);err!=nil{
            break
          }
          kva = append(kva, kv)
      }
  • 工作进程的Map部分可以通过mr/worker.go中的ihash(key)函数给key值选择Reduce任务。
  • 你可以从main/mrsequential.go中借鉴一些代码,在Map任务读取文件的时候,在Map/Reduce过程中存储中间键值对的时候,在给Reduce的输出排序的时候。
  • 调度进程,作为RPC服务器,是并发的。所以不要忘记给共享数据上锁。
  • 使用Go的竟态探测器,go build -racego run -racetest-mr.sh默认也是使用竟态探测器进行测试的。
  • 工作进程是需要等待的,例如Reduce任务在Map任务全部完成之前是不能启动的。一种方案是工作进程周期性地向调度进程请求任务,在每个请求间休眠一段时间(time.Sleep())。另一种方案是在调度进程的RPC处理器使用等待循环(time.Sleep()或者sync.Cond都可以)。Go程序将RPC的处理程序运行在自己的线程里,所以事实上一个处在等待中的RPC处理程序不会影响其他RPC的处理进程。
  • 调度进程无法准确地分辨出哪些工作进程是死机了,哪些是运行着但由于某些原因瘫痪了,哪些是执行着但是太慢了而起不了作用。你能做的就是让调度进程等待一段时间,如果还没有结果,就放弃它,将任务重新分配给另一个工作进程。在这个实验中,让调度进程等待10s,超出这个时间,调度进程就可以认为这个工作进程已经死了。
  • 如果你要实现备份任务(论文3.6节),要注意我们测试你的代码时,如果工作进程没有崩溃就不会执行其他不相关的任务。备份任务只有在相对长时间之后才会被触发(例如10s)。
  • 为了测试崩溃与恢复,你可以使用mrapps/crash.go插件。它在Map和Reduce函数中随机地退出。
  • 为了确保没人能发现因程序崩溃而产生的不完整的文件,在MapReduce的论文中提到了一个小技巧,即先写入临时文件,等它完成后再重命名。你可以用ioutil.TempFile创建临时文件,用 os.Rename重命名。
  • test-mr.sh在它的子目录mr-tmp中执行它所有的进程,所以如果出错了,可以在那里找到相关输出文件。你可以临时修改test-mr.sh文件,让它失败时就退出,这样就不会继续往下测试(覆盖之前的输入内容)。
  • test-mr-many.sh提供了一个执行test-mr.sh的测试框架(我们就是用这个测试你的代码)。它需要一个参数来指定测试执行的次数。你不能并行地运行多个test-mr.sh测试,因为调度进程会重复使用相同的socket,这会引发冲突。
  • Go程序中RPC只会发送大写字母开头的字段。内置结构也必须大写字母开头。
  • 向RPC系统的应答结构传递指针时,这个*reply应该指向一个空对象。RPC调用的代码应该像这样:
      reply := SomeType{}
      call(..., &reply)

    在调用之前不要给reply的任何字段设值。如果你不遵守这个规则,就会出问题。当你给reply设置非该类型的默认值时,RPC服务端会将reply字段设置为默认的类型值。你会发现数据并没有被写入,而调用端仍然是非默认的类型值。

附加题(不计分)

  1. 实现你自己的MapReduce应用(参考mrapps/*),例如Distributed Grep(论文2.3节)。
  2. 让你的调度进程和工作进程运行在不同的机器上,就像现实世界中一样。你需要让你的RPC通过TCP/IP交互,而不是现在的Unix sockets(见Coordinator.server()),并通过共享文件系统来读写文件。例如,你可以通过ssh登录MIT的Athena集群,它使用AFS共享文件;或者你可以租用AWS服务使用S3存储。

加入讨论

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据