Lab用时:13小时
注:6.824 在2022年春季添加了一个新的
jobcount test
,但由于我看不懂这个测试代码。经我测试,本文的程序能通过2021年的所有test cases,但是2022年的测试会jobcount test FAIL
实验分析
用Go
实现MapReduce
的Coordinator(Master)
和Worker
程序。MapReduce
程序模型见下图,paper和Lecture中讲的非常详细,不再过多介绍。
结构设计
RPC
首先是RPC参数的设计,定义一个Task
结构体,Coordinator
应该根据Task
来维护整个MapReduce
的状态和进度,Worker
也应该能从Task
中获取到足够的数据进行Map
和Reduce
type Task struct {
TaskNum int // task编号
TaskType int // map: 1, reduce: 2, taskAllDone: 3
FName string // 待读取的文件名称
NMap int // map tasks数量
NReduce int // reduce tasks数量,MapReduce结束之后要生成NReduce个output files
}
Coordinator
Coordinator
要用一个mutex锁来原子化更改和读取它的字段,防止发生race,以此保证MapReduce
进程的状态和进度正确。
type Coordinator struct {
mu sync.Mutex
state int // mapping: 1, reducing: 2, allDone: 3
nMap int // map tasks数量
mTasks []mapTask // map tasks状态数组
mapDoneCount int // 已完成的map tasks数量
nReduce int // reduce tasks数量
rTasks []reduceTask // reduce tasks状态数组
reduceDoneCount int // 已完成的reduce tasks数量
}
type mapTask struct {
fName string // Worker执行Map时读取的文件名
state int // map task状态
beginTime time.Time // task开始时间,因为Worker可能crash,所以Coordinator可以设定一个timeout来重新分配crash的tasks
}
type reduceTask struct {
taskNum int // task编号
// 在我的Worker实现中,reduce task可以通过TaskNum推断出读取的intermediate文件名,所以不需要fName字段
state int // reduce task状态
beginTime time.Time // task开始时间
}
Worker
参照mrsequential.go
,定义ByKey
并实现sort Interface
,用来对intermediate k/v排序。
// for sorting by key.
type ByKey []KeyValue
// for sorting by key.
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
逻辑设计
这个Lab主要分几个阶段:
Coordinator
开放一个RPC
服务,Worker
和Coordinator
之间通过RPC
分配和提交Tasks。注意要创建多个goroutines
运行http.Serve()
函数,以保证能并发处理多个Worker
的RPC
请求。func (c *Coordinator) server() { rpc.Register(c) rpc.HandleHTTP() //l, e := net.Listen("tcp", ":1234") sockname := coordinatorSock() os.Remove(sockname) l, e := net.Listen("unix", sockname) if e != nil { log.Fatal("listen error:", e) } go http.Serve(l, nil) }
Worker
接受Coordinator
分配的Task,并根据TaskType
判断是map
或是reduce
task,调用不同的函数处理。每次执行完毕后会提交已经完的oldTask
,并申请newTask
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { // 首次提交oldTask时TaskType为0,Coordinator会把这当作一次握手,不予处理 oldTask := Task{} newTask := &Task{} for call("Coordinator.Coordinate", oldTask, newTask) { if newTask.TaskType == TaskMap { workerMap(mapf, newTask) } else if newTask.TaskType == TaskReduce { workerReduce(reducef, newTask) } oldTask, newTask = *newTask, &Task{} } }
workerMap()
和workerReduce()
的实现参考mrsequential.go
- 注意
Worker
可能发生crash,所以要用paper和lab hints中提到的ioutil.TempFile()
函数创建临时文件,并在task顺利完成后使用os.Rename()
函数把临时文件名改成正确的文件名,以保证intermediate k/v和output files的原子性。
Worker
通过RPC
向Coordinator
发起轮询,当RPC
请求失败时就可以认为是MapReduce
已经结束,就可以跳出循环,退出程序了。Coordinator
处理oldTask
并分配newTask
。func (c *Coordinator) Coordinate(oldTask Task, newTask *Task) error { // 处理oldTask c.coordinateOldTask(oldTask) // 分配newTask c.coordinateNewTask(newTask) return nil }
注意因为
Coordinator
要同时处理多个RPC
请求,所以处理oldTask
时要用sync.Mutex
保证不发生race
踩坑
- 一开始没做fault tolerance,
early exit test
和crash test
过不去,改用临时文件并重命名即可。 reduce parallelism test
测试的是多个Worker
能否同时执行reducef()
函数,并非是单个Worker
能否并发执行reducef()
函数,实际的测试代码违反了论文中对mapf()
和reducef()
用户函数的要求,有一些不合规(illegal)的side effects。jobcount test
是2022年刚加上去的test case,貌似是要mapf()
和reducef()
执行次数恰好等于分配的NMap
和NReduce
数量才能通过。我不理解为什么要这样测试(怀疑是测试代码写错了)
优化空间
- 应该参照lab guidance,设定一个package-level的bool变量,再实现一个
DPrintf
函数对log.Printf()
进行封装。这样只需要更改这个package-level的变量即可控制debug信息是否输出 - 可以不用
sync.Mutex
,改用Go语言的channel,应该能简化代码
总结
- 这个lab本身不难,但是debug非常耗时(lab hints写了),非常磨练debug能力
- 做lab之前一定要提前做好设计再动手写代码,这次就是前期设计做的不足,写代码途中更改了好多处struct,也多写了不少没用上的代码
- Go语言的
-race
flag太强了,多用!