6.824 实验一 —— MapReduce

6.824 实验一 —— MapReduce,第1张

6.824 实验一 —— MapReduce 前言

实验一是要实现一个 MapReduce 系统,基本就是两个部分:实现 master 程序和实现 worker 程序。这个实验基本就是劝退怪了,一来是对 golang 的 rpc 和并发的使用要比较熟悉,二来就是要对 MapReduce 的整个流程机制要比较熟悉。其实有一个小秘诀,就是拼命看论文中的这张图,再拼命看下面的流程讲解:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VHTM6O4z-1642697405642)(https://tva1.sinaimg.cn/large/006VKfGmly1gyfq0cbiltj30y30n4wkx.jpg ‘MapReduce 执行流程,很重要很重要很重要’)]{width=“500px”}

这个实验我实现了两个版本,主要是并发控制的方式有些不同。最初是基于 mutex 锁的版本,后来重构成了基于 channel 的无锁版本。无锁版本的实现比较优雅,所以讲解也主要基于无锁版本。

实验讲解

做实验之前,首先需要读懂实验。说明书在:https://pdos.csail.mit.edu/6.824/labs/lab-mr.html。主要这个实验需要在 Linux 环境下进行,因为进程通信基于 unix socket,MacOS 原则上来说也可以,但是据说还是会有些小问题。

代码中已经提供了一个单线程串行版的 MapReduce,代码在 src/main/mrsequential.go。这个版本很重要,建议先阅读一遍,可以大致了解整体的流程。有一些内容的处理也可以直接从中 copy。

并行版本的 master 程序入口在 main/mrcoordinator.go,worker 程序入口在 main/mrworker.go。实验需要实现的有三个文件,分别是 mr/coordinator.go、mr/worker.go、和 mr/rpc.go,分别描述了 master 的处理代码、worker 的处理代码以及它们之间通信的 rpc 结构。

mrcoordinator 会调用 mr/coordinator.go 中的 MakeCoordinator 函数,来构建 master 的结构,并启动 socket 监听,在返回后,主协程会不断调用 Coordinator.Done 方法,来检查是否已经完成整个 MapReduce 任务,确认完成后才会退出主协程。所以,在 MakeCoordinator 中,不应当有 *** 作阻止函数返回,否则会阻塞后续 *** 作。相关的监听等工作应当通过新协程实现。

mrworker 的处理就很简单了,只有一个主协程,直接调用了 mr/worker.go 的 Worker 函数,在这里处理即可。一般可以直接实现成单协程程序。

测试脚本为 src/main/test-mr.sh,它会将两个现成的 MapReduce 程序:wc 和 indexer 通过你的框架执行,并与串行执行的结果相比较。它同时还会检查并行运行相同的 Map 或 Reduce 任务、甚至 worker 执行任务期间发生 crash 时,最终是否能得到正确的结构。通常它会启动一个 master 进程和三个 worker 进程。如果在运行期间发生错误不退出时,可以通过 ps -A 命令,找到 mrcoordinator 进程的 pid,并 kill 掉即可。普通的 ctrl + c 可能无法完全退出,会影响后续的测试。

最后,请多阅读几遍实验指导书。

实现思路 整体流程

workers 会首先执行完 map 任务,生成很多中间文件 “mr-X-Y”,其中,X 是 map 任务的 id,Y 是对应的 reduce 任务 id。接着 reduce 会收集所有 Y 等于 reduce 任务 id 的文件,读取并进行 reduce *** 作,并将结果输出到 “mr-out-Y” 中。

master 实现 无锁思路

由于是一个无锁的实现,要避免多协程数据冲突,所有对主要数据结构的 *** 作应当收敛到一个协程中,这里可以称为调度协程。在 worker 通过 rpc 请求 master 时,例如获取一个 task 或者汇报完成工作,master 会通过一个自动创建的协程处理 rpc 请求,由于对主要数据结构的 *** 作已经收敛,这个 rpc 协程就必须通过 channel 要求调度协程代办,以保证没有数据竞争。由于 worker 和 master 之间可能有多种消息,这意味着调度协程必须同时管理多个 channel。这里可以运用 golang 的 select 结构:

// 只在这个 goroutine 中 *** 作结构
func (c *Coordinator) schedule() {
	for {
		select {
		case msg := <-c.getTaskChan:
			c.getTaskHandler(msg)
		case msg := <-c.doneTaskChan:
			c.doneTaskHandler(msg)
		case msg := <-c.timeoutChan:
			c.timeoutHandler(msg)
		case msg := <-c.doneCheckChan:
			c.doneCheckHandler(msg)
		}
	}
}

假设这时候有一个 worker 需要获取一个 task 来执行,请求 master 的 GetTask,GetTask 处理如下:

func (c *Coordinator) GetTask(_ *GetTaskReq, resp *GetTaskResp) error {
	msg := GetTaskMsg{
		resp: resp,
		ok:   make(chan struct{}),
	}
	c.getTaskChan <- msg
	<-msg.ok
	return nil
}

在向 getTaskChan 中,不止传入了 resp(getTask 不需要请求参数),还传入了一个 chan struct{} 类型的管道,这个管道是协调协程用于通知 rpc 协程处理完成的通道:当处理完成后,就会向 msg.ok 中写入一个 struct{},rpc 协程就会返回。

Coordinator

整个 Coordinator 结构如下:

type Coordinator struct {
	nMap    int
	nReduce int
	phase   TaskPhase
	allDone bool

	taskTimeOut map[int]time.Time
	tasks       []*Task

	getTaskChan   chan GetTaskMsg
	doneTaskChan  chan DoneTaskMsg
	doneCheckChan chan DoneCheckMsg
	timeoutChan   chan TimeoutMsg
}

phase 记录了当前任务执行的阶段,由于 reduce 任务必须在所有 map 任务结束后才能进行,所以 TaskPhase 分为 Map 和 Reduce 阶段,每个阶段中,tasks 切片只有对应阶段的任务。

taskTimeOut 记录了当前正在执行的任务的开始时间,会有一个协程定时去扫描这个 map,找出其中运行时间大于十秒的任务(超时),将对应的任务状态设置为未开始,以进行下一次调度。当然这个扫描 *** 作也需要通过协调协程进行。超时 map 中也只有当前阶段正在执行的任务,在切换阶段时会清空超时 map。

tasks 切片保存了当前阶段所有的 Task,以及相关的状态:

type ReduceTask struct {
	NMap int
}

type MapTask struct {
	FileName string
	NReduce  int
}

type TaskStatus int

var (
	TaskStatus_Idle     TaskStatus = 0
	TaskStatus_Running  TaskStatus = 1
	TaskStatus_Finished TaskStatus = 2
)

type Task struct {
	TaskId     int
	MapTask    MapTask
	ReduceTask ReduceTask
	TaskStatus TaskStatus
}

这里可以看到任务的状态被分成三个,分别是待执行、执行中以及执行完成。同时冗余保存了 MapTask 和 ReduceTask,具体使用哪个结构体由当前阶段来判断。

具体 *** 作

根据 Coordinator 中的管道,可以看出有四种情况需要和协调协程通信以进行 *** 作。

当 worker 请求一个任务时,可能获取到的任务类别有四种:

type TaskType int

var (
	TaskType_Map    TaskType = 0
	TaskType_Reduce TaskType = 1
	TaskType_Wait   TaskType = 2
	TaskType_Exit   TaskType = 3
)

master 首先遍历所有的 tasks,找出其中的状态为未执行的状态,并根据当前的阶段,返回 Map 或者 Reduce 任务。如果当前没有空闲任务的话,又分为以下两种情况。当前为 Map 阶段,这时需要返回 TaskType_Wait 任务,要求 worker 等待,Map 阶段结束后还需要进行 Reduce 任务;当前为 Reduce 阶段,这时所有任务已经完成,返回 TaskType_Exit 要求 worker 退出。

当 worker 完成时,会通知 master 任务完成。传递的信息中会带有任务的类型和任务的 Id。master 会忽略掉非当前阶段的任务,根据 taskId 修改 tasks 中的任务状态为 finished(忽略当前任务状态,直接改为完成),并删除 timeout 中的对应结构。

func (c *Coordinator) doneTaskHandler(msg DoneTaskMsg) {
	req := msg.req
	if req.TaskType == TaskType_Map && c.phase == TaskPhase_Reduce {
		// 提交非当前阶段的任务,直接返回
		msg.ok <- struct{}{}
		return
	}
	for _, task := range c.tasks {
		if task.TaskId == req.TaskId {
			// 无论当前状态,直接改为完成
			task.TaskStatus = TaskStatus_Finished
			break
		}
	}
	// 删除 timeout 结构
	delete(c.taskTimeOut, req.TaskId)
	allDone := true
	for _, task := range c.tasks {
		if task.TaskStatus != TaskStatus_Finished {
			allDone = false
			break
		}
	}
	if allDone {
		if c.phase == TaskPhase_Map {
			c.initReducePhase()
		} else {
			c.allDone = true
		}
	}
	msg.ok <- struct{}{}
}

如果是在 Reduce 阶段发现所有任务都完成了,还会设置一下 allDone 标志位。

Coordinator 在初始化时,还会启动一个协程,这个协程每秒请求一次协调协程,检查 timeoutMap 是否有超时任务,如果超时,就将其状态置为未开始,这样在下一次 worker 请求任务时就可以调度执行了。

func (c *Coordinator) timeoutHandler(msg TimeoutMsg) {
	now := time.Now()
	for taskId, start := range c.taskTimeOut {
		if now.Sub(start).Seconds() > 10 {
			for _, task := range c.tasks {
				if taskId == task.TaskId {
					if task.TaskStatus != TaskStatus_Finished {
						task.TaskStatus = TaskStatus_Idle
					}
					break
				}
			}
			delete(c.taskTimeOut, taskId)
			break
		}
	}
	msg.ok <- struct{}{}
	return
}

最后还有一个完成状态检查,是主线程调用 Coordinator.Done 进行的,请求协调协程时,只需要检查 allDone 标志位即可。

worker

worker 只有单个协程,循环从 master 处获取任务执行:

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		resp := callGetTask()
		switch resp.TaskType {
		case TaskType_Map:
			handleMapTask(resp.Task, mapf)
		case TaskType_Reduce:
			handleReduceTask(resp.Task, reducef)
		case TaskType_Wait:
			time.Sleep(time.Second)
		case TaskType_Exit:
			return
		}
	}
}

map 和 reduce 的 *** 作,可以参考串行单线程的实现。有一点注意是,由于可能有多个进程同时执行同一个任务,也可能会出现执行到一半崩溃的情况,遗留下的文件可能会导致后续 worker 重新执行时发生错误。所以创建输出文件时,可以通过 ioutil.TempFile 函数创建一个临时文件写入,等到写入完成后通过 os.Rename 重命名为目标文件,这样即可保证最后的输出文件一定是完整的。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/zaji/5710935.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存