一、概念
这个lab的主要内容是实现一个简单的MapReduce分布式系统。
MapReduce的概念来自google的论文:
论文中有一个非常简明的图片,介绍了MapReduce的基本结构:
有关信息:
- MapReduce的任务是充分利用分布式的计算机集群,对文件进行任务分割并分配到下游机器分别操作,然后合并结果(本lab不需要使用多台设备,而用多个进程来模拟);
- 为了把任务显式地分配给多台设备处理,要处理的任务必须能够被分解,且合并时正确性不受先后顺序影响;
- MapReduce的下游机群是不能信任的,即需要做容错处理。
具体步骤:
- 为了将任务分发给多台机器完成,需要先将任务拆分,在拆分后针对分片执行用户操作(
Map
),生成分布式的中间值intermediate
,然后将多台机器的中间值进行合并降维(Reduce
),得到结果。因此,用户代码将负责Map
和Reduce
的操作,而任务的分发调度则由MapReduce框架控制; - 下游机器的程序称为
worker
,指向统一的中心节点master
(lab中称为coordinator
)。worker
通过已经实现好的rpc和coordinator
交互(本lab中只能worker单向调用coordinator); - 将所需要处理的文件切片(本lab中不用,一个文件作为一个切片即可),然后将一个切片交予一个worker处理。每个worker针对切片调用用户的map函数,生成中间结果;
- 全部Map任务完成后,coordinator开始组织worker进行reduce任务。worker准备好map产生的中间值,调用用户的reduce函数,生成该reduce对应的最后结果输出。
二、项目结构
参考代码
题目提供了一个mrsequential.go
供参考,它是mapReduce的非分布式实现,展示了框架具体的功能。在实现worker时可以复制这里的代码。
需要写的代码
需要写的代码在/stc/mr下:
- coordinator.go:不解释,内含一个rpc服务器
- rpc.go:在此定义与rpc有关的请求和返回结构
- worker.go:不解释,内含与coordinator通信的例子
调用结构
- 启动coordinator时会调用外部的mrcoordinator。不需要修改,但要知道它会调用你写的makeCoordinator方法来启动你的coordinator;
- 启动worker时调用外部的mrworker。不需要修改,但需要知道它通过调用你的Worker方法来启动worker,同时将用户函数mapf和reducef传入。
测试与环境
程序只能在linux上测试。
写完代码要运行测试脚本,使用bash /src/main/test-mr.sh
注意事项
- 用户需要的键值对在map阶段以分布式状态存在,而用户要求程序结束后必须按键的hash值储存结果,因此我们必须安排在reduce阶段每个worker处理特定一部分的键值对,不能重复处理。这里采取的解决方案是在map阶段就将中间值结果分成临时文件输出,如mr-X-Y.tmp,其中X表示map任务id,Y表示reduce任务id(也就是键对应的hash%nReduce的结果)。这样在reduce时就可以依据文件名Y的值,选取自己reduce任务需要的文件读取中间值。
- 在coordinator实现过程中一定会用到多线程(goroutine),且worker由rpc调用的coordinator的方法的线程也与coordinator的主线程不同,所以对其共享的数据在访问时一定要加锁。
- lab要求所有任务结束后worker必须关闭,因此任务结束后coordinator不能立即关闭,应等待一段时间,让worker的ping到达后在reply中要求worker自行退出。
- 外部的调用程序会一段时间调用一次你的done方法,判断程序是否运行完成。因此需要在所有任务结束后使得done方法返回true。
三、代码实现
1. Coordinator.go
定义Coordinator类(已经有了),再定义下MapTask和ReduceTask类型,储存map和reduce任务有关的信息:
stat
码用于coordinator判断任务状态,startTime
记录下任务开始时间便于判断超时。
map任务和reduce任务分别以列表的形式储存。
在coordinator运行时,作为控制核心,应当由其预生成好任务,并且启动循环监测任务状态的go程:
这个新的go程主要用于检查map任务是否全部完成,然后对正在处理中的任务,判断是否超时。如果超时,则重新设回未分配状态。如果全部任务已完成,则跳转reduce阶段。同时写一个函数用于即时判断任务完成。
接下来就是由worker调用,进行任务分发的部分。由于我们的结构中coordinator是不储存worker状态的,我们只需要实时维护任务的状态,有任意worker调用时分配一个待进行的任务即可。
上述代码同时处理了reduce的情况,之后进行讨论。
由于worker是不受信任的,所以我们只在worker完成分配给它的任务时让它与coordinator联系,并让其储存状态。接收worker通知其已完成map任务的代码:
至此处理map的部分就结束了。接下来的部分是针对reduce任务的控制,与map部分基本一致:
2. woker.go
worker主体放在死循环内,每次循环sleep100毫秒,反复询问coordinator是否有任务或需要退出。从调用的回复中可以得知该任务是map还是reduce任务,执行过程抽取到doMapping和doReducing函数中。
关于Map操作的执行,可以参照lab给出的示例代码。这里主要难点在于对中间变量应该如何处理。
上面提到过,键值对应该通过对键名和nReduce的哈希值求得应该被哪个reduce任务处理。因此在这一阶段就应求出对应的X-Y文件名并储存。储存的方式是用json格式导出到文件。
关于reduce过程,同样可以借鉴给出的示例代码,但也要稍微复杂些。
对以表格一样的形式离散储存的中间量文件,要遍历该reduce任务应处理的文件,转换成键值对数组然后合并。合并完成后也要参照示例代码给出的方法进行一个排序,这样某个键对应的值才能连续排列。将排好的键值对作为参数传给用户的reduce函数,取得输出。
同样,需要写好和coordinator交互的一些方法:
3. rpc.go
这里不需要写实际执行的代码,只需要声明好交互需要用的参数和返回值类型就行。
在像这样结构比较复杂的机制实现的过程中,从框架总体结构和相关的储存类型开始设计会比较简单。
四、测试和总结
运行测试脚本:
顺利通过测试。
测试中有一个crash test需要注意,它会随机使一个worker不工作,模拟现实的分布式机器情况。所以需要设计好容错机制,本lab主要是超时重新分配机制。
由于本lab的测试环境要求为linux,而开发环境windows下运行goland更优,所以可以采取类似交叉调试的方法,通过IDE的deploy功能同步代码到linux服务器,然后用IDE的终端连接服务器执行编译。
完成这个lab,就实现了一个简单的中心化的执行统一任务的分布式系统。同时对golang的掌握也有一定帮助。