Skip to content

大致介绍

Lab1 原文链接🔗:https://pdos.csail.mit.edu/6.824/labs/lab-mr.html

Lab1 里需要实现一个最小可用的 MapReduce 系统,由两个核心组件组成:

  • Coordinator(协调器,旧版实验中称为 master)
  • Worker 工作节点

整体结构:

  • Coordinator 负责分发任务、记录任务状态、处理超时重试
  • Worker 负责向 Coordinator 请求任务、执行 Map/Reduce 函数、写入中间文件
  • Map 任务输出的中间文件命名为 "mr-X-Y"(X 为 Map 任务编号,Y 为 Reduce 任务编号);Reduce 任务输出的最终结果文件命名为 "mr-out-Y"
  • Worker 与 Coordinator 之间的所有通信都通过 RPC(基于 Unix 域套接字)完成。

GitHub:

注意:该实验不适合在 Windows 系统上完成,请在 macOS 或 Linux 等类 Unix 系统上进行。获取实验代码:

```bash
$ git clone git://g.csail.mit.edu/6.5840-golabs-2026 6.5840
$ cd 6.5840
$ ls
Makefile src
$
```

先用串行实现 mrsequential.go 运行 WordCount,得到可供对照的正确输出:

```bash
$ cd ~/6.5840
$ cd src/main
$ go build -buildmode=plugin ../mrapps/wc.go
$ rm mr-out*
$ go run mrsequential.go wc.so pg*.txt
$ sort mr-out-0
A 509
ABOUT 2
ACT 8
ACTRESS 1
...
```

具体代码

rpc.go

go
package mr

import (
	"os"
	"strconv"
)

// TaskType tells a worker what kind of work an AskTask reply carries.
// The value travels over RPC inside AskTaskReply, so the worker and the
// coordinator must agree on the numbering below.
type TaskType int

const (
	MapTask    TaskType = iota // run the map function over one input file
	ReduceTask                 // run the reduce function over one partition
	WaitTask                   // nothing can be handed out now; ask again later
	ExitTask                   // no more work; the worker should stop
)

// RPC argument and reply types exchanged between workers and the coordinator.
type (
	// AskTaskArgs is sent by a worker that wants work; it carries no data.
	AskTaskArgs struct{}

	// AskTaskReply describes the task the coordinator assigned.
	AskTaskReply struct {
		TaskType TaskType // which kind of task this is

		TaskID   int    // index of the map or reduce task
		Filename string // input file name (filled in for map tasks)

		NReduce int // number of reduce partitions
		NMap    int // total number of map tasks
	}

	// FinishMapArgs reports that map task TaskID has finished.
	FinishMapArgs struct {
		TaskID int
	}

	// FinishMapReply is the empty reply to a FinishMap call.
	FinishMapReply struct{}

	// FinishReduceArgs reports that reduce task TaskID has finished.
	FinishReduceArgs struct {
		TaskID int
	}

	// FinishReduceReply is the empty reply to a FinishReduce call.
	FinishReduceReply struct{}
)

// CoordinatorSock returns the coordinator's Unix-domain socket path. The
// path is made per-user by appending the numeric uid, e.g.
// "/var/tmp/5840-mr-501" for uid 501.
func CoordinatorSock() string {
	const prefix = "/var/tmp/5840-mr-"
	return prefix + strconv.Itoa(os.Getuid())
}

worker.go

go
package mr

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
	"io"
	"log"
	"net/rpc"
	"os"
	"sort"
	"time"
)

// KeyValue is one key/value pair produced by a map function.
type KeyValue struct {
	Key   string
	Value string
}

// ByKey implements sort.Interface, ordering pairs by ascending Key.
type ByKey []KeyValue

// Len reports how many pairs there are.
func (a ByKey) Len() int {
	return len(a)
}

// Swap exchanges the pairs at positions i and j.
func (a ByKey) Swap(i, j int) {
	a[i], a[j] = a[j], a[i]
}

// Less reports whether the key at i sorts before the key at j.
func (a ByKey) Less(i, j int) bool {
	return a[i].Key < a[j].Key
}

// ihash maps key to a non-negative int. It computes the 32-bit FNV-1a hash
// and clears the sign bit. Callers take the result modulo NReduce to choose
// the reduce partition for the key.
func ihash(key string) int {
	hasher := fnv.New32a()
	hasher.Write([]byte(key))
	sum := hasher.Sum32()
	return int(sum &^ (1 << 31))
}

// coordSockName holds the coordinator's Unix socket path. Worker assigns it
// from its sockname argument, and call dials it for every RPC.
var coordSockName string

// askTask asks the coordinator for the next task. If the RPC reports
// failure, the returned reply has TaskType set to ExitTask, so the worker
// loop stops instead of retrying forever.
func askTask() AskTaskReply {
	var reply AskTaskReply
	if !call("Coordinator.AskTask", &AskTaskArgs{}, &reply) {
		reply.TaskType = ExitTask
	}
	return reply
}

// Worker is the main loop of a worker process. It records the coordinator's
// socket path and then repeatedly:
//   - asks for a task,
//   - runs it with mapf or reducef,
//   - reports completion.
// It returns when the coordinator hands out ExitTask.
func Worker(sockname string,
	mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	coordSockName = sockname
	for task := askTask(); task.TaskType != ExitTask; task = askTask() {
		switch task.TaskType {
		case MapTask:
			doMapTask(task, mapf)
			FinishMap(task.TaskID)
		case ReduceTask:
			doReduceTask(task, reducef)
			finishReduce(task.TaskID)
		case WaitTask:
			// Nothing is assignable right now; back off before asking again.
			time.Sleep(time.Second)
		}
	}
}

// 通知 Coordinator Map 任务完成
func FinishMap(taskID int) {
	args := FinishMapArgs{TaskID: taskID}
	reply := FinishMapReply{}
	call("Coordinator.FinishMap", &args, &reply)
}

// doMapTask runs mapf over the contents of task.Filename. It splits the
// resulting pairs into task.NReduce partitions by ihash(key) % NReduce.
// Partition r is written as a stream of JSON-encoded KeyValues to the
// intermediate file "mr-<TaskID>-<r>". Every partition file is produced,
// even an empty one, because doReduceTask opens the file from every map task.
func doMapTask(
	task AskTaskReply,
	mapf func(string, string) []KeyValue,
) {
	file, err := os.Open(task.Filename)
	if err != nil {
		log.Fatalf("cannot open %v", task.Filename)
	}

	content, err := io.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", task.Filename)
	}
	file.Close()

	// Run the user's map function.
	kva := mapf(task.Filename, string(content))

	// Partition the pairs by reduce task.
	buckets := make([][]KeyValue, task.NReduce)
	for _, kv := range kva {
		reduceID := ihash(kv.Key) % task.NReduce
		buckets[reduceID] = append(buckets[reduceID], kv)
	}

	// Write intermediate files mr-X-Y.
	for reduceID, bucket := range buckets {
		writeMapPartition(fmt.Sprintf("mr-%d-%d", task.TaskID, reduceID), bucket)
	}
}

// writeMapPartition writes kvs to filename as a stream of JSON values.
//
// The data first goes to a temporary file in the current directory. That
// file is renamed over filename only after it has been completely written
// and closed, so filename never refers to a partial file. Writing in place
// with os.Create would let either of these truncate or half-write a file a
// reduce task may be reading:
//   - a worker that crashes mid-write;
//   - a slow worker whose task was re-issued after a timeout.
// The rename replaces the file atomically only because the temporary file
// sits in the same directory as filename.
func writeMapPartition(filename string, kvs []KeyValue) {
	out, err := os.CreateTemp(".", "mr-tmp-map-*")
	if err != nil {
		log.Fatalf("cannot create %v", filename)
	}

	enc := json.NewEncoder(out)
	for _, kv := range kvs {
		if err := enc.Encode(&kv); err != nil {
			log.Fatalf("cannot encode kv: %v", err)
		}
	}

	// Check Close: buffered write failures may only surface here.
	if err := out.Close(); err != nil {
		log.Fatalf("cannot close %v: %v", out.Name(), err)
	}
	if err := os.Rename(out.Name(), filename); err != nil {
		log.Fatalf("cannot rename %v to %v: %v", out.Name(), filename, err)
	}
}

// finishReduce tells the coordinator that reduce task taskID is done.
// As with FinishMap, the RPC result is ignored.
func finishReduce(taskID int) {
	args := &FinishReduceArgs{TaskID: taskID}
	call("Coordinator.FinishReduce", args, &FinishReduceReply{})
}

// doReduceTask produces the output for reduce partition task.TaskID. It:
//   - reads the partition from every map task's intermediate file
//     "mr-<mapID>-<TaskID>", for mapID in [0, task.NMap);
//   - sorts all pairs by key;
//   - calls reducef once per distinct key with that key's values;
//   - writes one "<key> <result>" line per key to "mr-out-<TaskID>".
//
// The output goes to a temporary file in the current directory and is
// renamed into place only when complete. So a worker that crashes mid-task,
// or a slow duplicate of a re-issued task, never leaves a partial or
// truncated mr-out file behind.
func doReduceTask(
	task AskTaskReply,
	reducef func(string, []string) string,
) {
	var intermediate []KeyValue

	// Read this partition from every map task's output.
	for mapID := 0; mapID < task.NMap; mapID++ {
		filename := fmt.Sprintf("mr-%d-%d", mapID, task.TaskID)
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}

		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				if err == io.EOF {
					break
				}
				// Anything but a clean EOF means the file is damaged; stopping
				// silently would drop pairs from the final output.
				log.Fatalf("cannot decode %v: %v", filename, err)
			}
			intermediate = append(intermediate, kv)
		}
		file.Close()
	}

	// Sort so that equal keys are adjacent.
	sort.Sort(ByKey(intermediate))

	oname := fmt.Sprintf("mr-out-%d", task.TaskID)
	// The temp name deliberately does not start with "mr-out", so a leftover
	// from a crashed worker is never picked up as final output.
	ofile, err := os.CreateTemp(".", fmt.Sprintf("mr-tmp-reduce-%d-*", task.TaskID))
	if err != nil {
		log.Fatalf("cannot create %v", oname)
	}

	// Each run intermediate[i:j] shares one key; reduce it to a single line.
	for i := 0; i < len(intermediate); {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}

		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}

		output := reducef(intermediate[i].Key, values)
		fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
		i = j
	}

	if err := ofile.Close(); err != nil {
		log.Fatalf("cannot close %v: %v", ofile.Name(), err)
	}
	if err := os.Rename(ofile.Name(), oname); err != nil {
		log.Fatalf("cannot rename %v to %v: %v", ofile.Name(), oname, err)
	}
}

// call sends one RPC named rpcname to the coordinator over its Unix socket
// (coordSockName) and waits for the reply.
//
// It returns true on success and false otherwise. That includes the case
// where the coordinator cannot be reached at all, which typically means it
// has finished and exited. askTask turns a false return into ExitTask, so
// the worker then stops cleanly instead of dying via log.Fatal.
func call(rpcname string, args interface{}, reply interface{}) bool {
	c, err := rpc.DialHTTP("unix", coordSockName)
	if err != nil {
		log.Printf("dialing: %v", err)
		return false
	}
	defer c.Close()

	// Assign to the outer err (no :=) so the log line below reports the
	// actual Call error rather than the nil dial error.
	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}
	log.Printf("%d: call failed err %v", os.Getpid(), err)
	return false
}

coordinator.go

go
package mr

import (
	"log"
	"net"
	"net/http"
	"net/rpc"
	"os"
	"sync"
	"time"
)

// TaskState is the lifecycle stage of a single map or reduce task.
type TaskState int

const (
	Idle       TaskState = iota // waiting to be handed to a worker
	InProgress                  // assigned to a worker; see Task.StartTime
	Completed                   // reported finished by a worker
)

// Phase is the stage the whole job is in.
type Phase int

const (
	MapPhase    Phase = iota // map tasks are still outstanding
	ReducePhase              // all maps done; reduce tasks are handed out
	DonePhase                // every reduce task has completed
)

// Task is the coordinator's bookkeeping for one map or reduce task.
type Task struct {
	Filename  string    // input file; filled in for map tasks only
	State     TaskState // current lifecycle stage
	StartTime time.Time // when the task was last handed out
}

// Coordinator hands tasks to workers over RPC and tracks their progress. It
// also re-issues tasks that stay InProgress too long. Its RPC handlers
// access the fields below with mu held.
type Coordinator struct {
	mu sync.Mutex // guards every field below

	phase Phase // current stage of the job

	mapTasks    []Task // one entry per input file, indexed by map task ID
	reduceTasks []Task // one entry per reduce partition

	nMap, nReduce int // number of map tasks and reduce partitions
}

// server registers c with the default net/rpc server and exposes it over
// HTTP. It then serves requests in a background goroutine on the Unix
// socket at sockname.
func (c *Coordinator) server(sockname string) {
	rpc.Register(c)
	rpc.HandleHTTP()

	// Remove any socket file left behind by an earlier run; listening on an
	// existing path would otherwise fail.
	os.Remove(sockname)
	listener, err := net.Listen("unix", sockname)
	if err != nil {
		log.Fatalf("listen error %s: %v", sockname, err)
	}
	go http.Serve(listener, nil)
}

// Done reports whether the whole job has finished.
//
// It is true once phase has reached DonePhase. It is also true as soon as
// every reduce task is Completed during the reduce phase. Previously the
// phase only advanced inside AskTask, so completion depended on some worker
// asking for work again after the final FinishReduce. If no worker asked
// again (for example, all had exited), the coordinator never finished.
func (c *Coordinator) Done() bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	// The ReducePhase guard keeps an empty reduce set (nReduce == 0) from
	// looking finished while map tasks are still outstanding.
	return c.phase == DonePhase || (c.phase == ReducePhase && c.allReduceDone())
}

// MakeCoordinator builds a coordinator in the map phase. It creates one Idle
// map task per input file and nReduce Idle reduce tasks. It then starts the
// RPC server on sockname and returns the coordinator.
func MakeCoordinator(sockname string, files []string, nReduce int) *Coordinator {
	c := &Coordinator{
		phase:    MapPhase,
		nMap:     len(files),
		nReduce:  nReduce,
		mapTasks: make([]Task, 0, len(files)),
	}

	for _, name := range files {
		c.mapTasks = append(c.mapTasks, Task{Filename: name, State: Idle})
	}
	for r := 0; r < nReduce; r++ {
		c.reduceTasks = append(c.reduceTasks, Task{State: Idle})
	}

	c.server(sockname)
	return c
}

// checkMapTimeout resets to Idle every map task that has been InProgress for
// more than 10 seconds. The task can then be handed to another worker. It
// is called from AskTask with c.mu held.
func (c *Coordinator) checkMapTimeout() {
	const timeout = 10 * time.Second
	for i := range c.mapTasks {
		task := &c.mapTasks[i]
		if task.State != InProgress {
			continue
		}
		if time.Since(task.StartTime) > timeout {
			task.State = Idle
		}
	}
}

// checkReduceTimeout is the reduce-side counterpart of checkMapTimeout. Any
// reduce task InProgress for more than 10 seconds goes back to Idle. It is
// called from AskTask with c.mu held.
func (c *Coordinator) checkReduceTimeout() {
	for i, task := range c.reduceTasks {
		expired := task.State == InProgress &&
			time.Since(task.StartTime) > 10*time.Second
		if expired {
			c.reduceTasks[i].State = Idle
		}
	}
}

// AskTask is the RPC handler workers call to get their next assignment.
//
// In the map phase it first returns timed-out map tasks to Idle, then hands
// out an Idle map task. Once every map task is Completed, the job moves to
// the reduce phase and reduce tasks are handed out the same way. If work is
// still outstanding but nothing is assignable, the worker is told to wait.
// Once every reduce task is Completed, it is told to exit.
//
// The phase only ever moves forward (Map -> Reduce -> Done). Earlier code
// reassigned ReducePhase whenever all maps were done, so an already finished
// job briefly moved back to the reduce phase on every call.
func (c *Coordinator) AskTask(args *AskTaskArgs, reply *AskTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.phase == MapPhase {
		c.checkMapTimeout()
		for i := range c.mapTasks {
			if c.mapTasks[i].State == Idle {
				c.mapTasks[i].State = InProgress
				c.mapTasks[i].StartTime = time.Now()

				reply.TaskType = MapTask
				reply.TaskID = i
				reply.Filename = c.mapTasks[i].Filename
				reply.NReduce = c.nReduce
				reply.NMap = c.nMap
				return nil
			}
		}
		if !c.allMapDone() {
			// Every unfinished map task is currently running somewhere.
			reply.TaskType = WaitTask
			return nil
		}
		c.phase = ReducePhase
	}

	if c.phase == ReducePhase {
		c.checkReduceTimeout()
		for i := range c.reduceTasks {
			if c.reduceTasks[i].State == Idle {
				c.reduceTasks[i].State = InProgress
				c.reduceTasks[i].StartTime = time.Now()

				reply.TaskType = ReduceTask
				reply.TaskID = i
				reply.NReduce = c.nReduce
				reply.NMap = c.nMap
				return nil
			}
		}
		if !c.allReduceDone() {
			reply.TaskType = WaitTask
			return nil
		}
		c.phase = DonePhase
	}

	// DonePhase: the job is finished.
	reply.TaskType = ExitTask
	return nil
}

// allMapDone reports whether every map task is Completed. It returns true
// when there are no map tasks at all. Callers hold c.mu.
func (c *Coordinator) allMapDone() bool {
	done := true
	for i := range c.mapTasks {
		if c.mapTasks[i].State != Completed {
			done = false
			break
		}
	}
	return done
}

// allReduceDone reports whether every reduce task is Completed. It returns
// true when there are no reduce tasks at all. Callers hold c.mu.
func (c *Coordinator) allReduceDone() bool {
	remaining := 0
	for _, task := range c.reduceTasks {
		if task.State != Completed {
			remaining++
		}
	}
	return remaining == 0
}

// FinishReduce is the RPC handler a worker calls when reduce task
// args.TaskID is done. Only an InProgress task is marked Completed. A late
// report for a task that already timed out back to Idle is ignored, and
// that task will be handed out again.
func (c *Coordinator) FinishReduce(
	args *FinishReduceArgs,
	reply *FinishReduceReply,
) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	task := &c.reduceTasks[args.TaskID]
	if task.State == InProgress {
		task.State = Completed
	}
	return nil
}

// FinishMap is the RPC handler a worker calls when map task args.TaskID is
// done. Unlike FinishReduce, it marks the task Completed whatever its
// current state. So a late report from a worker whose task had already
// timed out still counts.
func (c *Coordinator) FinishMap(
	args *FinishMapArgs,
	reply *FinishMapReply,
) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	task := &c.mapTasks[args.TaskID]
	task.State = Completed
	return nil
}

程序流程

程序流程图

测试脚本

go
package mr

import (
	//"log"
	"testing"
	//"time"
)

// TestWc runs the word-count plugin through the MapReduce implementation.
// It checks the merged mr-out output against the reference output produced
// by mkCorrectOutput.
func TestWc(t *testing.T) {
	const plugin = "../../mrapps/wc.so"

	mkOut()
	t.Cleanup(func() {
		if t.Failed() {
			return // keep files around for debugging
		}
		cleanup()
	})

	inputs := findFilesPre("../main", "pg*txt", "..")
	mkCorrectOutput(inputs, plugin, "mr-wc-correct.txt")
	runMR(inputs, plugin, 3)
	mergeOutput("mr-wc-all.txt")
	runCmp(t, "mr-wc-all.txt", "mr-wc-correct.txt",
		"incorrect combined reduce output: mr-wc-all.txt vs mr-wc-correct.txt")
}

// TestIndexer runs the indexer plugin through the MapReduce implementation.
// It checks the merged mr-out output against the reference output produced
// by mkCorrectOutput.
func TestIndexer(t *testing.T) {
	const plugin = "../../mrapps/indexer.so"

	mkOut()
	t.Cleanup(func() {
		if t.Failed() {
			return // keep files around for debugging
		}
		cleanup()
	})

	inputs := findFilesPre("../main", "pg*txt", "..")
	mkCorrectOutput(inputs, plugin, "mr-indexer-correct.txt")
	runMR(inputs, plugin, 2)
	mergeOutput("mr-indexer-all.txt")
	runCmp(t, "mr-indexer-all.txt", "mr-indexer-correct.txt",
		"incorrect combined reduce output: mr-indexer-all.txt vs mr-indexer-correct.txt")
}

// TestMapParallel runs the mtiming plugin and inspects the mr-out files. It
// checks that exactly two workers took part ("times-" lines) and that their
// map calls overlapped ("parallel" lines).
func TestMapParallel(t *testing.T) {
	mkOut()
	t.Cleanup(func() {
		if t.Failed() {
			return // keep files around for debugging
		}
		cleanup()
	})

	plugin := "../../mrapps/mtiming.so"
	runMR(findFilesPre("../main", "pg*txt", ".."), plugin, 2)

	outputs := findFiles(tmp, "mr-out*")
	if workers := countPattern(outputs, "times-"); workers != 2 {
		t.Fatalf("saw %d worker(s) instead of 2", workers)
	}
	if countPattern(outputs, "parallel.*") == 0 {
		t.Fatalf("map workers did not run in parallel")
	}
}

// TestReduceParallel runs the rtiming plugin and requires at least two
// mr-out lines matching "[a-z] 2". The plugin presumably emits these when
// two reduce calls overlap.
func TestReduceParallel(t *testing.T) {
	mkOut()
	t.Cleanup(func() {
		if t.Failed() {
			return // keep files around for debugging
		}
		cleanup()
	})

	plugin := "../../mrapps/rtiming.so"
	runMR(findFilesPre("../main", "pg*txt", ".."), plugin, 2)

	outputs := findFiles(tmp, "mr-out*")
	if countPattern(outputs, "[a-z] 2") < 2 {
		t.Fatalf("too few parallel reduces")
	}
}

// TestJobCount runs the jobcount plugin and requires exactly one mr-out line
// matching "a 8". This checks that map jobs ran the expected number of
// times, with no unnecessary re-execution.
func TestJobCount(t *testing.T) {
	mkOut()
	t.Cleanup(func() {
		if t.Failed() {
			return // keep files around for debugging
		}
		cleanup()
	})

	plugin := "../../mrapps/jobcount.so"
	runMR(findFilesPre("../main", "pg*txt", ".."), plugin, 4)

	outputs := findFiles(tmp, "mr-out*")
	if countPattern(outputs, "a 8") != 1 {
		t.Fatalf("map jobs ran incorrect number of times")
	}
}

// TestEarlyExit checks that no worker or coordinator exits before the job
// is finished. The merged output is snapshotted as soon as the first
// notification arrives on c, and again after N+1 notifications. The two
// snapshots must match.
// NOTE(review): presumably runMRchan sends one value on c per exiting
// process (N workers plus the coordinator) — confirm against the helper.
func TestEarlyExit(t *testing.T) {
	const N = 4
	mkOut()
	t.Cleanup(func() {
		if !t.Failed() {
			cleanup()
		}
	})
	app := "../../mrapps/early_exit.so"
	files := findFilesPre("../main", "pg*txt", "..")
	c := make(chan int)
	c0 := make(chan bool)
	// Collector: take the "initial" snapshot right after the first message,
	// drain the remaining N messages, then signal c0.
	go func(c chan int) {
		for i := 0; i < N+1; i++ {
			<-c
			if i == 0 {
				mergeOutput("mr-wc-initial.txt")
			}
		}
		c0 <- true
	}(c)
	// NOTE(review): rpc.go in this listing defines CoordinatorSock (exported);
	// coordinatorSock is presumably a test helper — confirm.
	runMRchan(files, app, N, c, coordinatorSock())
	<-c0
	// NOTE(review): this assignment to files is never read afterwards.
	files = findFiles(tmp, "mr-out*")
	mergeOutput("mr-wc-final.txt")
	runCmp(t, "mr-wc-initial.txt", "mr-wc-final.txt",
		"reduce output changed: mr-wc-initial.txt vs mr-wc-final.txt")
}

// TestCrashWorker checks that the job recovers from crashing workers. It
// runs the crash plugin and requires the merged output to equal the
// reference produced with the non-crashing nocrash plugin.
//
// A helper goroutine restarts a worker each time one is reported on c,
// and stops when the value N arrives.
// NOTE(review): the meaning of the values sent on c is defined by
// runMRchan/startWorker, which are not shown here — confirm.
func TestCrashWorker(t *testing.T) {
	const N = 3
	mkOut()
	t.Cleanup(func() {
		if !t.Failed() {
			cleanup()
		}
	})
	files := findFilesPre("../main", "pg*txt", "..")
	mkCorrectOutput(files, "../../mrapps/nocrash.so", "mr-crash-correct.txt")
	app := "../../mrapps/crash.so"
	c := make(chan int)
	sock := coordinatorSock()
	go func(c chan int) {
		for {
			w := <-c
			if w == N {
				return
			}
			startWorker(app, w, c, sock)
		}
	}(c)
	runMRchan(files, app, N, c, sock)
	files = findFiles(tmp, "mr-out*")
	mergeOutput("mr-crash-all.txt")
	runCmp(t, "mr-crash-all.txt", "mr-crash-correct.txt",
		"incorrect combined reduce output: mr-crash-all.txt vs mr-crash-correct.txt")
}

测试结果

go test result