Lab用时:13小时

注:6.824 在2022年春季添加了一个新的jobcount test,但由于我看不懂这个测试代码。经我测试,本文的程序能通过2021年的所有test cases,但是2022年的测试会jobcount test FAIL

不含jobcount test的测试结果

实验分析

Go实现MapReduceCoordinator(Master)Worker程序。MapReduce程序模型见下图,paper和Lecture中讲的非常详细,不再过多介绍。

MapReduce Model

结构设计

RPC

首先是RPC参数的设计,定义一个Task结构体,Coordinator应该根据Task来维护整个MapReduce的状态和进度,Worker也应该能从Task中获取到足够的数据进行MapReduce

type Task struct {
	TaskNum int // task编号
	TaskType int // map: 1, reduce: 2, taskAllDone: 3
	FName string // 待读取的文件名称
	NMap int // map tasks数量
	NReduce int // reduce tasks数量,MapReduce结束之后要生成NReduce个output files
}

Coordinator

Coordinator要用一个mutex锁来原子化更改和读取它的字段,防止发生race,以此保证MapReduce进程的状态和进度正确。

type Coordinator struct {
	mu sync.Mutex
	state int // mapping: 1, reducing: 2, allDone: 3

	nMap int // map tasks数量
	mTasks []mapTask // map tasks状态数组
	mapDoneCount int // 已完成的map tasks数量

	nReduce int // reduce tasks数量
	rTasks []reduceTask // reduce tasks状态数组
	reduceDoneCount int // 已完成的reduce tasks数量
}

type mapTask struct {
	fName string // Worker执行Map时读取的文件名
	state int // map task状态
	beginTime time.Time // task开始时间,因为Worker可能crash,所以Coordinator可以设定一个timeout来重新分配crash的tasks
}

type reduceTask struct {
	taskNum int // task编号
  // 在我的Worker实现中,reduce task可以通过TaskNum推断出读取的intermediate文件名,所以不需要fName字段
	state int // reduce task状态
	beginTime time.Time // task开始时间
}

Worker

参照mrsequential.go,定义ByKey并实现sort Interface,用来对intermediate k/v排序。

// for sorting by key.
type ByKey []KeyValue

// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

逻辑设计

这个Lab主要分几个阶段:

  1. Coordinator开放一个RPC服务,WorkerCoordinator之间通过RPC分配和提交Tasks。注意要创建多个goroutines运行http.Serve()函数,以保证能并发处理多个WorkerRPC请求。

    func (c *Coordinator) server() {
    	rpc.Register(c)
    	rpc.HandleHTTP()
    	//l, e := net.Listen("tcp", ":1234")
    	sockname := coordinatorSock()
    	os.Remove(sockname)
    	l, e := net.Listen("unix", sockname)
    	if e != nil {
    		log.Fatal("listen error:", e)
    	}
    	go http.Serve(l, nil)
    }
    
  2. Worker接受Coordinator分配的Task,并根据TaskType判断是map或是reduce task,调用不同的函数处理。每次执行完毕后会提交已经完的oldTask,并申请newTask

    func Worker(mapf func(string, string) []KeyValue,
    	reducef func(string, []string) string) {
      // 首次提交oldTask时TaskType为0,Coordinator会把这当作一次握手,不予处理
    	oldTask := Task{}
    	newTask := &Task{}
    
    	for call("Coordinator.Coordinate", oldTask, newTask) {
    		if newTask.TaskType == TaskMap {
    			workerMap(mapf, newTask)
    		} else if newTask.TaskType == TaskReduce {
    			workerReduce(reducef, newTask)
    		}
    		oldTask, newTask = *newTask, &Task{}
    	}
    }
    
    1. workerMap()workerReduce()的实现参考mrsequential.go
    2. 注意Worker可能发生crash,所以要用paper和lab hints中提到的ioutil.TempFile()函数创建临时文件,并在task顺利完成后使用os.Rename()函数把临时文件名改成正确的文件名,以保证intermediate k/v和output files的原子性
  3. Worker通过RPCCoordinator发起轮询,当RPC请求失败时就可以认为是MapReduce已经结束,就可以跳出循环,退出程序了。

  4. Coordinator处理oldTask并分配newTask

    func (c *Coordinator) Coordinate(oldTask Task, newTask *Task) error {
      // 处理oldTask
    	c.coordinateOldTask(oldTask)
    	// 分配newTask
    	c.coordinateNewTask(newTask)
    	return nil
    }
    

    注意因为Coordinator要同时处理多个RPC请求,所以处理oldTask时要用sync.Mutex保证不发生race

踩坑

  1. 一开始没做fault tolerance,early exit testcrash test过不去,改用临时文件并重命名即可。
  2. reduce parallelism test测试的是多个Worker能否同时执行reducef()函数,并非是单个Worker能否并发执行reducef()函数,实际的测试代码违反了论文中对mapf()reducef()用户函数的要求,有一些不合规(illegal)的side effects。
  3. jobcount test是2022年刚加上去的test case,貌似是要mapf()reducef()执行次数恰好等于分配的NMapNReduce数量才能通过。我不理解为什么要这样测试(怀疑是测试代码写错了)

优化空间

  1. 应该参照lab guidance,设定一个package-level的bool变量,再实现一个DPrintf函数对log.Printf()进行封装。这样只需要更改这个package-level的变量即可控制debug信息是否输出
  2. 可以不用sync.Mutex,改用Go语言的channel,应该能简化代码

总结

  1. 这个lab本身不难,但是debug非常耗时(lab hints写了),非常磨练debug能力
  2. 做lab之前一定要提前做好设计再动手写代码,这次就是前期设计做的不足,写代码途中更改了好多处struct,也多写了不少没用上的代码
  3. Go语言的-race flag太强了,多用!