Lab用时:13小时
注:6.824 在2022年春季添加了一个新的
jobcount test,但由于我看不懂这个测试代码。经我测试,本文的程序能通过2021年的所有test cases,但是2022年的测试会jobcount test FAIL

实验分析
用Go实现MapReduce的Coordinator(Master)和Worker程序。MapReduce程序模型见下图,paper和Lecture中讲的非常详细,不再过多介绍。

结构设计
RPC
首先是RPC参数的设计,定义一个Task结构体,Coordinator应该根据Task来维护整个MapReduce的状态和进度,Worker也应该能从Task中获取到足够的数据进行Map和Reduce
type Task struct {
TaskNum int // task编号
TaskType int // map: 1, reduce: 2, taskAllDone: 3
FName string // 待读取的文件名称
NMap int // map tasks数量
NReduce int // reduce tasks数量,MapReduce结束之后要生成NReduce个output files
}
Coordinator
Coordinator要用一个mutex锁来原子化更改和读取它的字段,防止发生race,以此保证MapReduce进程的状态和进度正确。
type Coordinator struct {
mu sync.Mutex
state int // mapping: 1, reducing: 2, allDone: 3
nMap int // map tasks数量
mTasks []mapTask // map tasks状态数组
mapDoneCount int // 已完成的map tasks数量
nReduce int // reduce tasks数量
rTasks []reduceTask // reduce tasks状态数组
reduceDoneCount int // 已完成的reduce tasks数量
}
type mapTask struct {
fName string // Worker执行Map时读取的文件名
state int // map task状态
beginTime time.Time // task开始时间,因为Worker可能crash,所以Coordinator可以设定一个timeout来重新分配crash的tasks
}
type reduceTask struct {
taskNum int // task编号
// 在我的Worker实现中,reduce task可以通过TaskNum推断出读取的intermediate文件名,所以不需要fName字段
state int // reduce task状态
beginTime time.Time // task开始时间
}
Worker
参照mrsequential.go,定义ByKey并实现sort Interface,用来对intermediate k/v排序。
// for sorting by key.
type ByKey []KeyValue
// for sorting by key.
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
逻辑设计
这个Lab主要分几个阶段:
Coordinator开放一个RPC服务,Worker和Coordinator之间通过RPC分配和提交Tasks。注意要创建多个goroutines运行http.Serve()函数,以保证能并发处理多个Worker的RPC请求。func (c *Coordinator) server() { rpc.Register(c) rpc.HandleHTTP() //l, e := net.Listen("tcp", ":1234") sockname := coordinatorSock() os.Remove(sockname) l, e := net.Listen("unix", sockname) if e != nil { log.Fatal("listen error:", e) } go http.Serve(l, nil) }Worker接受Coordinator分配的Task,并根据TaskType判断是map或是reducetask,调用不同的函数处理。每次执行完毕后会提交已经完的oldTask,并申请newTaskfunc Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { // 首次提交oldTask时TaskType为0,Coordinator会把这当作一次握手,不予处理 oldTask := Task{} newTask := &Task{} for call("Coordinator.Coordinate", oldTask, newTask) { if newTask.TaskType == TaskMap { workerMap(mapf, newTask) } else if newTask.TaskType == TaskReduce { workerReduce(reducef, newTask) } oldTask, newTask = *newTask, &Task{} } }workerMap()和workerReduce()的实现参考mrsequential.go- 注意
Worker可能发生crash,所以要用paper和lab hints中提到的ioutil.TempFile()函数创建临时文件,并在task顺利完成后使用os.Rename()函数把临时文件名改成正确的文件名,以保证intermediate k/v和output files的原子性。
Worker通过RPC向Coordinator发起轮询,当RPC请求失败时就可以认为是MapReduce已经结束,就可以跳出循环,退出程序了。Coordinator处理oldTask并分配newTask。func (c *Coordinator) Coordinate(oldTask Task, newTask *Task) error { // 处理oldTask c.coordinateOldTask(oldTask) // 分配newTask c.coordinateNewTask(newTask) return nil }注意因为
Coordinator要同时处理多个RPC请求,所以处理oldTask时要用sync.Mutex保证不发生race
踩坑
- 一开始没做fault tolerance,
early exit test和crash test过不去,改用临时文件并重命名即可。 reduce parallelism test测试的是多个Worker能否同时执行reducef()函数,并非是单个Worker能否并发执行reducef()函数,实际的测试代码违反了论文中对mapf()和reducef()用户函数的要求,有一些不合规(illegal)的side effects。jobcount test是2022年刚加上去的test case,貌似是要mapf()和reducef()执行次数恰好等于分配的NMap和NReduce数量才能通过。我不理解为什么要这样测试(怀疑是测试代码写错了)
优化空间
- 应该参照lab guidance,设定一个package-level的bool变量,再实现一个
DPrintf函数对log.Printf()进行封装。这样只需要更改这个package-level的变量即可控制debug信息是否输出 - 可以不用
sync.Mutex,改用Go语言的channel,应该能简化代码
总结
- 这个lab本身不难,但是debug非常耗时(lab hints写了),非常磨练debug能力
- 做lab之前一定要提前做好设计再动手写代码,这次就是前期设计做的不足,写代码途中更改了好多处struct,也多写了不少没用上的代码
- Go语言的
-raceflag太强了,多用!