N 个人去一个有 m 个坑位的大公共卫生间做清洁,该如何调度?这个实验的目标是实现一个分布式的 MapReduce 系统,包括协调器和工作进程,并确保它能正确处理并行任务、容错等情况。
<!-- more -->
## 运行
```bash
rm mr-out*
go run -race mrcoordinator.go pg-*.txt
go run -race mrworker.go wc.so
cat mr-out-* | sort | more
```
下面是典型的运行与测试 Lab 1 的步骤(假设你已经根据指导编写好 `mr/coordinator.go`, `mr/worker.go` 并有对应的代码实现):
1. **编译插件(应用代码)**
首先需要编译你要运行的 MapReduce 应用插件(比如 word count 程序)。进入 `6.824/src/main` 目录,执行:
```bash
cd ~/6.824/src/main
go build -race -buildmode=plugin ../mrapps/wc.go
```
成功后会生成 `wc.so` 文件。
2. **清理旧的输出文件**
运行前先确保没有残留的输出文件:
```bash
rm mr-out*
```
3. **启动 coordinator**
在同一个目录下运行 `mrcoordinator.go`,并传入输入文件列表(比如 `pg-xxx.txt` 文件集):
```bash
go run -race mrcoordinator.go pg-*.txt
```
该命令会启动一个 coordinator 进程,并监听来自 worker 的 RPC 请求。
4. **启动一个或多个 worker**
在另一个终端窗口中,同样在 `src/main` 目录下,启动 worker 进程,并加载你之前编译好的插件 `wc.so`:
```bash
go run -race mrworker.go wc.so
```
你可以多开几个终端窗口再运行数个相同命令,以模拟多个并发 worker。如果你的实现正确,coordinator 会分配任务给这些 worker,它们会处理完任务并写出最终的 `mr-out-X` 文件。
5. **查看输出结果**
当任务全部完成后(coordinator 进程会自动退出),你可以查看输出结果:
```bash
cat mr-out-* | sort | more
```
结果应与 `mrsequential.go` 单机版的输出一致。
6. **运行测试脚本**(可选)
为了验证你的实现正确性,你还可以使用官方提供的测试脚本 `test-mr.sh`:
```bash
bash test-mr.sh
```
此脚本会依次测试 word-count、indexer、并行度测试以及崩溃恢复测试。如果全部通过,会显示:
```Java
*** PASSED ALL TESTS
```
**总结**:
要取得实际执行的效果,你需要先编译插件,然后先启动 coordinator(给出输入文件),再启动 worker(加载插件),让 worker 请求并执行任务。最后,查看 `mr-out-*` 文件或运行官方测试脚本来验证结果。
## 资料
[6.5840 Lab 1: MapReduce (mit.edu)](https://pdos.csail.mit.edu/6.824/labs/lab-mr.html)
## 思想
[MapReduce](MapReduce.md)
## 入参
一些文章
```Java
.
├── pg-being_ernest.txt
├── pg-dorian_gray.txt
├── pg-frankenstein.txt
├── pg-grimm.txt
├── pg-huckleberry_finn.txt
├── pg-metamorphosis.txt
├── pg-sherlock_holmes.txt
├── pg-tom_sawyer.txt
```
## 处理
协调器和工作者的"main"例程位于 main/mrcoordinator.go 和 main/mrworker.go 中;不要更改这些文件。您应该将您的实现放在 mr/coordinator.go 、 mr/worker.go 和 mr/rpc.go 中。
## 出参
这些文章里单词的出现次数
## 代码流程
让我按照执行顺序来解释 MapReduce 的整个流程:
1. **初始化阶段**:
```go
// 1. Coordinator 启动并初始化
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{
files: files, // 输入文件列表
nReduce: nReduce, // reduce任务数量
nMap: len(files), // map任务数量
// ... 初始化各种channel
}
c.server() // 启动RPC服务
go c.schedule() // 启动调度器
return &c
}
```
2. **Map 阶段**:
```go
// 2.1 Worker不断发送心跳请求任务
func Worker(mapF func(string, string) []KeyValue, reduceF func(string, []string) string) {
for {
response := doHeartbeat() // 请求任务
switch response.JobType {
case MapJob:
doMapTask(mapF, response) // 执行Map任务
// ...
}
}
}
// 2.2 执行Map任务
func doMapTask(mapF func(string, string) []KeyValue, response *HeartbeatResponse) {
// a. 读取输入文件
content, _ := ioutil.ReadAll(file)
// b. 调用用户定义的Map函数(wc.go中的Map)
kva := mapF(fileName, string(content))
// c. 按照key的hash值将结果分成nReduce份
intermediates := make([][]KeyValue, response.NReduce)
for _, kv := range kva {
index := iHash(kv.Key) % response.NReduce
intermediates[index] = append(intermediates[index], kv)
}
// d. 将中间结果写入文件
// 每个Map任务会生成nReduce个中间文件
}
```
3. **Reduce 阶段**:
```go
// 3.1 所有Map任务完成后,Coordinator切换到Reduce阶段
func (c *Coordinator) schedule() {
// ... Map阶段完成后
c.initReducePhase()
}
// 3.2 Worker执行Reduce任务
func doReduceTask(reduceF func(string, []string) string, response *HeartbeatResponse) {
// a. 读取所有相关的中间文件
for i := 0; i < response.NMap; i++ {
// 读取每个Map任务为当前Reduce任务生成的中间文件
}
// b. 按key归组
results := make(map[string][]string)
for _, kv := range kva {
results[kv.Key] = append(results[kv.Key], kv.Value)
}
// c. 对每个key调用用户定义的Reduce函数(wc.go中的Reduce)
for key, values := range results {
output := reduceF(key, values)
// 写入最终结果
}
}
```
4. **任务协调**:
```go
// Coordinator通过心跳机制分配任务
func (c *Coordinator) selectTask(response *HeartbeatResponse) bool {
// 1. 检查是否有空闲任务
// 2. 检查是否有超时需要重新执行的任务
// 3. 分配任务给Worker
// 4. 检查是否所有任务都完成
}
```
关键流程:
1. Coordinator 初始化并加载输入文件
2. Worker 通过心跳获取任务
3. Map 任务处理单个输入文件,产生中间文件
4. Reduce 任务处理所有相关的中间文件,生成最终输出
5. 整个过程通过 RPC 进行通信和协调
这个实现类似于 Google 的 MapReduce 论文,但简化了一些。
## 文件解读
```sql
(base) liuyishou@MacBook-Pro-3 src % tree
.
├── go.mod
├── go.sum
├── kvraft
│ ├── client.go
│ ├── common.go
│ ├── config.go
│ ├── server.go
│ └── test_test.go
├── kvsrv
│ ├── client.go
│ ├── common.go
│ ├── config.go
│ ├── server.go
│ └── test_test.go
├── labgob
│ ├── labgob.go
│ └── test_test.go
├── labrpc
│ ├── labrpc.go
│ └── test_test.go
├── main
│ ├── diskvd.go
│ ├── lockc.go
│ ├── lockd.go
│ ├── pg-being_ernest.txt 0 pg*.txt是需要被统计单词数的原文件
│ ├── pg-dorian_gray.txt
│ ├── pg-frankenstein.txt
│ ├── pg-grimm.txt
│ ├── pg-huckleberry_finn.txt
│ ├── pg-metamorphosis.txt
│ ├── pg-sherlock_holmes.txt
│ ├── pg-tom_sawyer.txt
│ ├── wc.so 0 单词计数(Word Count)的MapReduce应用,它被编译成了一个Go插件。
│ ├── mrsequential.go 0 运行的是一个单体(单机)版本的 MapReduce。
│ ├── mr-out-0 0 是示例程序`go run mrsequential.go wc.so pg*.txt`的输出结果。
│ ├── mrcoordinator.go 1 多 worker 版本的 mapReduce
│ ├── mrworker.go
│ ├── pbc.go
│ ├── pbd.go
│ ├── test-mr-many.sh
│ ├── viewd.go
│ └── test-mr.sh 2 打分的脚本,用来测试你的作业的完成情况
├── models
│ └── kv.go
├── mr
│ ├── coordinator.go
│ ├── rpc.go
│ └── worker.go
├── mrapps
│ ├── crash.go
│ ├── early_exit.go
│ ├── indexer.go
│ ├── jobcount.go
│ ├── mtiming.go
│ ├── nocrash.go
│ ├── rtiming.go
│ └── wc.go
├── porcupine
│ ├── bitset.go
│ ├── checker.go
│ ├── model.go
│ ├── porcupine.go
│ └── visualization.go
├── raft
│ ├── config.go
│ ├── persister.go
│ ├── raft.go
│ ├── test_test.go
│ └── util.go
├── shardctrler
│ ├── client.go
│ ├── common.go
│ ├── config.go
│ ├── server.go
│ └── test_test.go
└── shardkv
├── client.go
├── common.go
├── config.go
├── server.go
└── test_test.go
```