大致介绍
Lab1 里需要实现一个最小可用的 MapReduce 系统,由两个核心组件组成:
- Coordinator(协调器,旧版实验中称为 master)
- Worker 工作节点
整体结构:
- Coordinator 负责分发任务、记录任务状态、处理超时重试
- Worker 负责向 Coordinator 请求任务、执行 Map/Reduce 函数、写入中间文件
- 中间文件命名为 "mr-%d-%d"(Map 任务编号-Reduce 分区编号),最终输出文件命名为 "mr-out-%d"
- 所有通信通过 RPC 完成。
GitHub:该实验不适合在 Windows 系统上完成,请在 macOS 或 Linux/Unix 环境下进行
bash
git clone git://g.csail.mit.edu/6.5840-golabs-2026 6.5840
cd 6.5840
ls
Makefile src
测试 WordCount 任务:
bash
cd ~/6.5840
cd src/main
go build -buildmode=plugin ../mrapps/wc.go
rm mr-out*
go run mrsequential.go wc.so pg*.txt
sort mr-out-0
A 509
ABOUT 2
ACT 8
ACTRESS 1
...
具体代码
rpc.go
go
package mr
import (
"os"
"strconv"
)
// TaskType identifies the kind of work the Coordinator hands to a Worker.
type TaskType int
const (
	MapTask TaskType = iota // run the Map function on one input file
	ReduceTask // run the Reduce function on one partition
	WaitTask // nothing to hand out right now; the Worker sleeps and asks again
	ExitTask // the Worker should leave its main loop
)
// AskTaskArgs is the request a Worker sends when asking for a task.
// It carries no fields.
type AskTaskArgs struct {
}
// AskTaskReply is the Coordinator's answer to a task request.
type AskTaskReply struct {
	TaskType TaskType // kind of task assigned (map, reduce, wait or exit)
	TaskID int // index of the assigned map or reduce task
	Filename string // input file name; filled in for map tasks only
	NReduce int // number of reduce partitions
	NMap int // total number of map tasks
}
// FinishMapArgs is sent by a Worker to report that a map task is finished.
type FinishMapArgs struct {
	TaskID int // index of the finished map task
}
// FinishMapReply is the (empty) reply to a FinishMap call.
type FinishMapReply struct {
}
// FinishReduceArgs is sent by a Worker to report that a reduce task is finished.
type FinishReduceArgs struct {
	TaskID int // index of the finished reduce task
}
// FinishReduceReply is the (empty) reply to a FinishReduce call.
type FinishReduceReply struct {
}
// Coordinator 的 Unix 套接字地址
func CoordinatorSock() string {
s := "/var/tmp/5840-mr-"
s += strconv.Itoa(os.Getuid())
return s
}worker.go
go
package mr
import (
"encoding/json"
"fmt"
"hash/fnv"
"io"
"log"
"net/rpc"
"os"
"sort"
"time"
)
// ByKey implements sort.Interface over a slice of KeyValue, ordering the
// elements by ascending Key.
type ByKey []KeyValue

// Len reports the number of pairs in the slice.
func (s ByKey) Len() int {
	return len(s)
}

// Swap exchanges the pairs at positions i and j.
func (s ByKey) Swap(i, j int) {
	s[j], s[i] = s[i], s[j]
}

// Less reports whether the key at i sorts strictly before the key at j.
func (s ByKey) Less(i, j int) bool {
	return s[j].Key > s[i].Key
}

// KeyValue is a single key/value pair produced by a Map function.
type KeyValue struct {
	Key   string
	Value string
}
// ihash hashes key with 32-bit FNV-1a and clears the top bit so the result
// is a non-negative int. doMapTask takes the result modulo NReduce to pick
// the reduce partition for a key.
func ihash(key string) int {
	const positiveMask = 0x7fffffff
	hasher := fnv.New32a()
	hasher.Write([]byte(key))
	sum := hasher.Sum32()
	return int(sum & positiveMask)
}
var coordSockName string // Coordinator socket path; set by Worker and dialed by call
// askTask asks the Coordinator for the next task via the
// "Coordinator.AskTask" RPC. If the call reports failure, the returned reply
// has TaskType set to ExitTask.
func askTask() AskTaskReply {
	var reply AskTaskReply
	if !call("Coordinator.AskTask", &AskTaskArgs{}, &reply) {
		reply.TaskType = ExitTask
	}
	return reply
}
// Worker is the worker's main loop. It records sockname as the Coordinator
// address, then repeatedly asks for a task and acts on its type:
//   - MapTask: run the map task with mapf and report it finished
//   - ReduceTask: run the reduce task with reducef and report it finished
//   - WaitTask: sleep one second before asking again
//   - ExitTask: return
// Any other task type is ignored and the loop asks again immediately.
func Worker(sockname string,
	mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	coordSockName = sockname
	for {
		assigned := askTask()
		if assigned.TaskType == ExitTask {
			return
		}
		if assigned.TaskType == WaitTask {
			time.Sleep(time.Second)
			continue
		}
		if assigned.TaskType == MapTask {
			doMapTask(assigned, mapf)
			FinishMap(assigned.TaskID)
			continue
		}
		if assigned.TaskType == ReduceTask {
			doReduceTask(assigned, reducef)
			finishReduce(assigned.TaskID)
		}
	}
}
// 通知 Coordinator Map 任务完成
func FinishMap(taskID int) {
args := FinishMapArgs{TaskID: taskID}
reply := FinishMapReply{}
call("Coordinator.FinishMap", &args, &reply)
}
// doMapTask runs one map task.
//
// It reads task.Filename, passes its contents to mapf, partitions the
// resulting pairs into task.NReduce buckets using ihash(key) % NReduce, and
// writes bucket Y as JSON-encoded KeyValue records to "mr-<TaskID>-<Y>".
//
// Each output is first written to a temporary file in the current directory
// and then renamed to its final name, so a reader never sees a partially
// written intermediate file, even if this worker crashes or a second worker
// re-executes the same task. Any failure terminates the process via
// log.Fatalf.
func doMapTask(
	task AskTaskReply,
	mapf func(string, string) []KeyValue,
) {
	file, err := os.Open(task.Filename)
	if err != nil {
		log.Fatalf("cannot open %v", task.Filename)
	}
	content, err := io.ReadAll(file)
	// Close before checking the read error so the descriptor is released on
	// both paths.
	file.Close()
	if err != nil {
		log.Fatalf("cannot read %v", task.Filename)
	}

	// Run the user-supplied Map function.
	kva := mapf(task.Filename, string(content))

	// A non-positive partition count would otherwise panic in make or in the
	// modulo below; fail with a clear message instead.
	if task.NReduce <= 0 {
		log.Fatalf("invalid NReduce %d for map task %d", task.NReduce, task.TaskID)
	}

	// Partition the pairs by reduce task.
	buckets := make([][]KeyValue, task.NReduce)
	for _, kv := range kva {
		reduceID := ihash(kv.Key) % task.NReduce
		buckets[reduceID] = append(buckets[reduceID], kv)
	}

	// Write one intermediate file mr-X-Y per bucket (empty buckets included).
	for reduceID, bucket := range buckets {
		filename := fmt.Sprintf("mr-%d-%d", task.TaskID, reduceID)
		// The temp file lives in the same directory as the final file so the
		// rename below stays on one filesystem.
		tmp, err := os.CreateTemp(".", "mr-tmp-map-*")
		if err != nil {
			log.Fatalf("cannot create %v", filename)
		}
		enc := json.NewEncoder(tmp)
		for i := range bucket {
			if err := enc.Encode(&bucket[i]); err != nil {
				log.Fatalf("cannot encode kv: %v", err)
			}
		}
		// A failed Close can mean buffered data never reached the file.
		if err := tmp.Close(); err != nil {
			log.Fatalf("cannot close %v: %v", tmp.Name(), err)
		}
		if err := os.Rename(tmp.Name(), filename); err != nil {
			log.Fatalf("cannot rename %v to %v: %v", tmp.Name(), filename, err)
		}
	}
}
// 通知 Coordinator Reduce 任务完成
func finishReduce(taskID int) {
args := FinishReduceArgs{TaskID: taskID}
reply := FinishReduceReply{}
call("Coordinator.FinishReduce", &args, &reply)
}
// doReduceTask runs one reduce task.
//
// It reads the JSON-encoded KeyValue records from every intermediate file
// "mr-<mapID>-<TaskID>" for mapID in [0, task.NMap), sorts them by key, calls
// reducef once per distinct key with all of that key's values, and writes one
// "key output" line per key to "mr-out-<TaskID>".
//
// Only io.EOF ends the reading of an intermediate file; any other decode
// error is fatal rather than silently dropping the rest of the file. The
// output is written to a temporary file in the current directory and renamed
// into place once complete, so a partially written mr-out file is never
// visible. Any failure terminates the process via log.Fatalf.
func doReduceTask(
	task AskTaskReply,
	reducef func(string, []string) string,
) {
	var intermediate []KeyValue

	// Collect this partition's records from every map task's output.
	for mapID := 0; mapID < task.NMap; mapID++ {
		filename := fmt.Sprintf("mr-%d-%d", mapID, task.TaskID)
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			err := dec.Decode(&kv)
			if err == io.EOF {
				break // clean end of stream
			}
			if err != nil {
				log.Fatalf("cannot decode %v: %v", filename, err)
			}
			intermediate = append(intermediate, kv)
		}
		file.Close()
	}

	// Sort so that equal keys are adjacent.
	sort.Sort(ByKey(intermediate))

	// Write the final output mr-out-Y through a temp file in the same
	// directory, so the rename below stays on one filesystem.
	oname := fmt.Sprintf("mr-out-%d", task.TaskID)
	tmp, err := os.CreateTemp(".", "mr-tmp-reduce-*")
	if err != nil {
		log.Fatalf("cannot create %v", oname)
	}

	// Walk runs of equal keys and reduce each run.
	for i := 0; i < len(intermediate); {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)
		if _, err := fmt.Fprintf(tmp, "%v %v\n", intermediate[i].Key, output); err != nil {
			log.Fatalf("cannot write %v: %v", oname, err)
		}
		i = j
	}

	// A failed Close can mean buffered data never reached the file.
	if err := tmp.Close(); err != nil {
		log.Fatalf("cannot close %v: %v", tmp.Name(), err)
	}
	if err := os.Rename(tmp.Name(), oname); err != nil {
		log.Fatalf("cannot rename %v to %v: %v", tmp.Name(), oname, err)
	}
}
// RPC 调用 Coordinator
func call(rpcname string, args interface{}, reply interface{}) bool {
c, err := rpc.DialHTTP("unix", coordSockName)
if err != nil {
log.Fatalf("dialing: %v", err)
}
defer c.Close()
if err := c.Call(rpcname, args, reply); err == nil {
return true
}
log.Printf("%d: call failed err %v", os.Getpid(), err)
return false
}coordinator.go
go
package mr
import (
"log"
"net"
"net/http"
"net/rpc"
"os"
"sync"
"time"
)
// TaskState is the lifecycle state of a single map or reduce task.
type TaskState int
const (
	Idle TaskState = iota // not assigned; may be handed to a Worker
	InProgress // handed to a Worker and not yet reported finished
	Completed // reported finished by a Worker
)
// Task is the Coordinator's bookkeeping record for one map or reduce task.
type Task struct {
	Filename string // input file; set for map tasks, left empty for reduce tasks
	State TaskState // current lifecycle state
	StartTime time.Time // when the task was last handed out; used for timeout checks
}
// Phase is the stage the whole job is currently in.
type Phase int
const (
	MapPhase Phase = iota // map tasks are being handed out
	ReducePhase // all map tasks completed; reduce tasks are being handed out
	DonePhase // all reduce tasks completed
)
// Coordinator tracks the state of every map and reduce task and hands tasks
// out to Workers over RPC.
type Coordinator struct {
	mu sync.Mutex // held by the RPC handlers and Done while they touch the fields below
	phase Phase // current stage of the job
	mapTasks []Task // one entry per input file, indexed by map task ID
	reduceTasks []Task // one entry per reduce partition, indexed by reduce task ID
	nMap int // number of map tasks (len(files) at construction)
	nReduce int // number of reduce partitions
}
// server registers c as an RPC service on the default RPC server, installs
// the RPC HTTP handlers, and starts serving HTTP on the Unix-domain socket
// sockname in a background goroutine.
//
// A registration or listen failure terminates the process via log.Fatalf:
// serving a Coordinator whose methods are not registered would only make
// every Worker call fail later.
func (c *Coordinator) server(sockname string) {
	if err := rpc.Register(c); err != nil {
		log.Fatalf("rpc register error: %v", err)
	}
	rpc.HandleHTTP()
	// Best effort: remove a stale socket file left by a previous run; a
	// failure here (typically "does not exist") is deliberately ignored.
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatalf("listen error %s: %v", sockname, e)
	}
	go http.Serve(l, nil)
}
// Done reports whether the job has reached DonePhase. The phase is read
// while holding the Coordinator's mutex.
func (c *Coordinator) Done() bool {
	c.mu.Lock()
	finished := c.phase == DonePhase
	c.mu.Unlock()
	return finished
}
// MakeCoordinator builds a Coordinator in MapPhase with one idle map task per
// entry of files and nReduce idle reduce tasks, starts its RPC server on
// sockname, and returns it.
func MakeCoordinator(sockname string, files []string, nReduce int) *Coordinator {
	coord := &Coordinator{
		phase:   MapPhase,
		nMap:    len(files),
		nReduce: nReduce,
	}

	// One map task per input file, in the order given.
	for idx := range files {
		coord.mapTasks = append(coord.mapTasks, Task{Filename: files[idx], State: Idle})
	}

	// One reduce task per partition.
	for remaining := nReduce; remaining > 0; remaining-- {
		coord.reduceTasks = append(coord.reduceTasks, Task{State: Idle})
	}

	coord.server(sockname)
	return coord
}
// checkMapTimeout puts every map task that has been InProgress for more than
// ten seconds back into the Idle state so it can be handed out again.
// It does no locking itself; AskTask calls it with c.mu held.
func (c *Coordinator) checkMapTimeout() {
	const limit = 10 * time.Second
	for idx := range c.mapTasks {
		mt := &c.mapTasks[idx]
		if mt.State != InProgress {
			continue
		}
		if time.Since(mt.StartTime) > limit {
			mt.State = Idle
		}
	}
}
// checkReduceTimeout puts every reduce task that has been InProgress for more
// than ten seconds back into the Idle state so it can be handed out again.
// It does no locking itself; AskTask calls it with c.mu held.
func (c *Coordinator) checkReduceTimeout() {
	const limit = 10 * time.Second
	for idx := range c.reduceTasks {
		rt := &c.reduceTasks[idx]
		if rt.State != InProgress {
			continue
		}
		if time.Since(rt.StartTime) > limit {
			rt.State = Idle
		}
	}
}
// AskTask is the RPC handler a Worker calls to obtain its next task.
// args is not read. The handler holds c.mu for its whole duration and fills
// reply as follows:
//   - during MapPhase, timed-out map tasks are reset and the first idle map
//     task, if any, is assigned (MapTask);
//   - if some map task is still not Completed, the Worker is told to wait
//     (WaitTask);
//   - otherwise the phase becomes ReducePhase, timed-out reduce tasks are
//     reset and the first idle reduce task, if any, is assigned (ReduceTask);
//   - if some reduce task is still not Completed, the Worker is told to wait;
//   - otherwise the phase becomes DonePhase and the Worker is told to exit
//     (ExitTask).
// It always returns nil.
func (c *Coordinator) AskTask(args *AskTaskArgs, reply *AskTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.phase == MapPhase {
		c.checkMapTimeout()
		for id := range c.mapTasks {
			mt := &c.mapTasks[id]
			if mt.State != Idle {
				continue
			}
			mt.State = InProgress
			mt.StartTime = time.Now()
			reply.TaskType = MapTask
			reply.TaskID = id
			reply.Filename = mt.Filename
			reply.NReduce = c.nReduce
			reply.NMap = len(c.mapTasks)
			return nil
		}
	}

	// Nothing to assign from the map side; workers wait until every map task
	// has been reported complete.
	if !c.allMapDone() {
		reply.TaskType = WaitTask
		return nil
	}

	c.phase = ReducePhase
	c.checkReduceTimeout()
	for id := range c.reduceTasks {
		rt := &c.reduceTasks[id]
		if rt.State != Idle {
			continue
		}
		rt.State = InProgress
		rt.StartTime = time.Now()
		reply.TaskType = ReduceTask
		reply.TaskID = id
		reply.NReduce = c.nReduce
		reply.NMap = c.nMap
		return nil
	}

	if !c.allReduceDone() {
		reply.TaskType = WaitTask
		return nil
	}

	c.phase = DonePhase
	reply.TaskType = ExitTask
	return nil
}
// allMapDone reports whether every map task is in the Completed state.
// It is true when there are no map tasks at all.
func (c *Coordinator) allMapDone() bool {
	for idx := 0; idx < len(c.mapTasks); idx++ {
		if done := c.mapTasks[idx].State == Completed; !done {
			return false
		}
	}
	return true
}
// allReduceDone reports whether every reduce task is in the Completed state.
// It is true when there are no reduce tasks at all.
func (c *Coordinator) allReduceDone() bool {
	for idx := 0; idx < len(c.reduceTasks); idx++ {
		if done := c.reduceTasks[idx].State == Completed; !done {
			return false
		}
	}
	return true
}
// FinishReduce is the RPC handler a Worker calls to report that reduce task
// args.TaskID is finished. The task is marked Completed only if it is
// currently InProgress; a report for a task in any other state is ignored.
// reply is not written.
//
// It returns an error when args.TaskID is not a valid reduce task index,
// instead of indexing out of range and panicking inside the RPC handler.
func (c *Coordinator) FinishReduce(
	args *FinishReduceArgs,
	reply *FinishReduceReply,
) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if args.TaskID < 0 || args.TaskID >= len(c.reduceTasks) {
		// rpc.ServerError is a string type implementing error, available
		// from the net/rpc package this file already imports.
		return rpc.ServerError("FinishReduce: task ID out of range")
	}
	if c.reduceTasks[args.TaskID].State == InProgress {
		c.reduceTasks[args.TaskID].State = Completed
	}
	return nil
}
// Worker 通知 Map 任务完成
func (c *Coordinator) FinishMap(
args *FinishMapArgs,
reply *FinishMapReply,
) error {
c.mu.Lock()
defer c.mu.Unlock()
c.mapTasks[args.TaskID].State = Completed
return nil
}程序流程

测试脚本
go
package mr
import (
//"log"
"testing"
//"time"
)
// TestWc runs the word-count plugin (wc.so) over the pg*txt inputs and
// compares the merged job output, mr-wc-all.txt, with mr-wc-correct.txt.
// NOTE(review): mkCorrectOutput presumably produces the reference file with
// a sequential run — its definition is not visible here; confirm.
func TestWc(t *testing.T) {
	mkOut()
	t.Cleanup(func() {
		// Clean up only on success, so a failing run leaves its files behind.
		if !t.Failed() {
			cleanup()
		}
	})
	app := "../../mrapps/wc.so"
	files := findFilesPre("../main", "pg*txt", "..")
	mkCorrectOutput(files, app, "mr-wc-correct.txt")
	// NOTE(review): the last argument is presumably the worker count — confirm against runMR.
	runMR(files, app, 3)
	mergeOutput("mr-wc-all.txt")
	runCmp(t, "mr-wc-all.txt", "mr-wc-correct.txt",
		"incorrect combined reduce output: mr-wc-all.txt vs mr-wc-correct.txt")
}
// TestIndexer runs the indexer plugin (indexer.so) over the pg*txt inputs and
// compares the merged job output, mr-indexer-all.txt, with
// mr-indexer-correct.txt.
// NOTE(review): mkCorrectOutput presumably produces the reference file with
// a sequential run — its definition is not visible here; confirm.
func TestIndexer(t *testing.T) {
	mkOut()
	t.Cleanup(func() {
		// Clean up only on success, so a failing run leaves its files behind.
		if !t.Failed() {
			cleanup()
		}
	})
	app := "../../mrapps/indexer.so"
	files := findFilesPre("../main", "pg*txt", "..")
	mkCorrectOutput(files, app, "mr-indexer-correct.txt")
	// NOTE(review): the last argument is presumably the worker count — confirm against runMR.
	runMR(files, app, 2)
	mergeOutput("mr-indexer-all.txt")
	runCmp(t, "mr-indexer-all.txt", "mr-indexer-correct.txt",
		"incorrect combined reduce output: mr-indexer-all.txt vs mr-indexer-correct.txt")
}
// TestMapParallel checks that map tasks run in parallel. It runs the mtiming
// plugin, then inspects the mr-out* files: the "times-" pattern must be
// counted exactly twice (reported as the number of workers seen) and the
// "parallel.*" pattern at least once.
func TestMapParallel(t *testing.T) {
	mkOut()
	t.Cleanup(func() {
		// Clean up only on success, so a failing run leaves its files behind.
		if !t.Failed() {
			cleanup()
		}
	})
	app := "../../mrapps/mtiming.so"
	files := findFilesPre("../main", "pg*txt", "..")
	runMR(files, app, 2)
	// From here on, files refers to the job's output files, not its inputs.
	files = findFiles(tmp, "mr-out*")
	if n := countPattern(files, "times-"); n != 2 {
		t.Fatalf("saw %d worker(s) instead of 2", n)
	}
	if n := countPattern(files, "parallel.*"); n == 0 {
		t.Fatalf("map workers did not run in parallel")
	}
}
// TestReduceParallel checks that reduce tasks run in parallel. It runs the
// rtiming plugin and requires the pattern "[a-z] 2" to be counted at least
// twice in the mr-out* files.
func TestReduceParallel(t *testing.T) {
	mkOut()
	t.Cleanup(func() {
		// Clean up only on success, so a failing run leaves its files behind.
		if !t.Failed() {
			cleanup()
		}
	})
	app := "../../mrapps/rtiming.so"
	files := findFilesPre("../main", "pg*txt", "..")
	runMR(files, app, 2)
	// From here on, files refers to the job's output files, not its inputs.
	files = findFiles(tmp, "mr-out*")
	if n := countPattern(files, "[a-z] 2"); n < 2 {
		t.Fatalf("too few parallel reduces")
	}
}
// TestJobCount checks that map jobs ran the expected number of times. It
// runs the jobcount plugin and requires the pattern "a 8" to be counted
// exactly once in the mr-out* files.
// NOTE(review): the 8 presumably equals the number of pg*txt input files —
// confirm against the inputs and the jobcount plugin.
func TestJobCount(t *testing.T) {
	mkOut()
	t.Cleanup(func() {
		// Clean up only on success, so a failing run leaves its files behind.
		if !t.Failed() {
			cleanup()
		}
	})
	app := "../../mrapps/jobcount.so"
	files := findFilesPre("../main", "pg*txt", "..")
	runMR(files, app, 4)
	// From here on, files refers to the job's output files, not its inputs.
	files = findFiles(tmp, "mr-out*")
	if n := countPattern(files, "a 8"); n != 1 {
		t.Fatalf("map jobs ran incorrect number of times")
	}
}
// TestEarlyExit checks that the reduce output does not change between the
// first value received on the channel passed to runMRchan and the end of the
// run. A snapshot of the merged output is taken at the first value
// (mr-wc-initial.txt), another after everything has finished
// (mr-wc-final.txt), and the two are compared.
// NOTE(review): each value on c presumably signals one process exiting (N
// workers plus the coordinator) — confirm against runMRchan.
func TestEarlyExit(t *testing.T) {
	const N = 4
	mkOut()
	t.Cleanup(func() {
		// Clean up only on success, so a failing run leaves its files behind.
		if !t.Failed() {
			cleanup()
		}
	})
	app := "../../mrapps/early_exit.so"
	files := findFilesPre("../main", "pg*txt", "..")
	c := make(chan int)
	c0 := make(chan bool)
	go func(c chan int) {
		// Receive N+1 values from c; snapshot the output right after the first.
		for i := 0; i < N+1; i++ {
			<-c
			if i == 0 {
				mergeOutput("mr-wc-initial.txt")
			}
		}
		// Tell the test body that all N+1 values have been seen.
		c0 <- true
	}(c)
	runMRchan(files, app, N, c, coordinatorSock())
	<-c0
	files = findFiles(tmp, "mr-out*")
	mergeOutput("mr-wc-final.txt")
	runCmp(t, "mr-wc-initial.txt", "mr-wc-final.txt",
		"reduce output changed: mr-wc-initial.txt vs mr-wc-final.txt")
}
// Worker 崩溃后是否能正确恢复
func TestCrashWorker(t *testing.T) {
const N = 3
mkOut()
t.Cleanup(func() {
if !t.Failed() {
cleanup()
}
})
files := findFilesPre("../main", "pg*txt", "..")
mkCorrectOutput(files, "../../mrapps/nocrash.so", "mr-crash-correct.txt")
app := "../../mrapps/crash.so"
c := make(chan int)
sock := coordinatorSock()
go func(c chan int) {
for true {
w := <-c
if w == N {
return
}
startWorker(app, w, c, sock)
}
}(c)
runMRchan(files, app, N, c, sock)
files = findFiles(tmp, "mr-out*")
mergeOutput("mr-crash-all.txt")
runCmp(t, "mr-crash-all.txt", "mr-crash-correct.txt",
"incorrect combined reduce output: mr-crash-all.txt vs mr-crash-correct.txt")
}测试结果
