labs & notes on 6.824

Lab 1: MapReduce

MapReduce paper

Concepts

MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. Many real world tasks are expressible in this model, as shown in the paper.

Map, written by the user, takes an input pair and produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key I and passes them to the Reduce function.

The Reduce function, also written by the user, accepts an intermediate key I and a set of values for that key. It merges these values to form a possibly smaller set of values. Typically just zero or one output value is produced per Reduce invocation. The intermediate values are supplied to the user's reduce function via an iterator. This allows us to handle lists of values that are too large to fit in memory.
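Here is a minimal sketch of the model, using word count as the example. It is written in the spirit of the lab's mrapps/wc.go but is not copied from it. The KeyValue type and the Map/Reduce signatures are assumptions. The small `group` helper is hypothetical and stands in for the library's step of grouping all values by intermediate key.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue is an intermediate or output pair (assumed shape).
type KeyValue struct {
	Key   string
	Value string
}

// Map emits ("word", "1") for every word in the document contents.
func Map(filename string, contents string) []KeyValue {
	// Treat any non-letter rune as a word separator.
	words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	kva := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// Reduce receives every value emitted for one key and returns the count.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

// group plays the library's role: collect all values per intermediate key.
func group(kva []KeyValue) map[string][]string {
	groups := make(map[string][]string)
	for _, kv := range kva {
		groups[kv.Key] = append(groups[kv.Key], kv.Value)
	}
	return groups
}

func main() {
	groups := group(Map("doc.txt", "the cat saw the dog"))
	keys := make([]string, 0, len(groups))
	for k := range groups {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%v %v\n", k, Reduce(k, groups[k])) // e.g. "the 2"
	}
}
```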

Applications

  • Distributed Grep
  • Count of URL Access Frequency
  • Reverse Web-Link Graph
  • Term-Vector per Host
  • Inverted Index
  • Distributed Sort
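As one example of fitting an application into the model, an inverted index could be sketched as follows. Map emits (word, document ID) pairs, and Reduce emits the sorted list of distinct documents for each word. The lower-casing, the comma-joined output and all names are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"unicode"
)

type KeyValue struct {
	Key   string
	Value string
}

// Map emits (word, docID) for every word occurrence in one document.
func Map(docID string, contents string) []KeyValue {
	var kva []KeyValue
	for _, w := range strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) }) {
		kva = append(kva, KeyValue{Key: strings.ToLower(w), Value: docID})
	}
	return kva
}

// Reduce turns all doc IDs for one word into a sorted list without duplicates.
func Reduce(word string, docIDs []string) string {
	seen := make(map[string]bool)
	var uniq []string
	for _, d := range docIDs {
		if !seen[d] {
			seen[d] = true
			uniq = append(uniq, d)
		}
	}
	sort.Strings(uniq)
	return strings.Join(uniq, ",")
}

func main() {
	docs := map[string]string{"d1": "Go is fun", "d2": "fun with go and go"}
	groups := make(map[string][]string)
	for id, text := range docs {
		for _, kv := range Map(id, text) {
			groups[kv.Key] = append(groups[kv.Key], kv.Value)
		}
	}
	fmt.Println("go", Reduce("go", groups["go"]))   // go d1,d2
	fmt.Println("fun", Reduce("fun", groups["fun"])) // fun d1,d2
}
```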

Workflow


  1. The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines.

  2. One of the copies of the program is special – the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.

  3. A worker who is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory.

  4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.

  5. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. The sorting is needed because typically many different keys map to the same reduce task. If the amount of intermediate data is too large to fit in memory, an external sort is used.

  6. The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user’s Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.

  7. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns back to the user code.
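Steps 1 to 6 can be simulated in a single process to show how the pieces connect. M input splits are mapped, and each map's output is partitioned into R buckets with hash(key) mod R. Each reduce task then gathers its bucket from every map task, sorts it by key, groups equal keys and calls Reduce. This is only a sketch. Everything stays in memory, and there is no master, no RPC and no failure handling. The FNV-based `partition` function is an assumption that mirrors the paper's default of hash(key) mod R.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strconv"
	"strings"
)

type KeyValue struct {
	Key   string
	Value string
}

// partition picks the reduce task for a key: hash(key) mod R.
func partition(key string, r int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()&0x7fffffff) % r
}

func mapFn(split string) []KeyValue {
	var out []KeyValue
	for _, w := range strings.Fields(split) {
		out = append(out, KeyValue{Key: w, Value: "1"})
	}
	return out
}

func reduceFn(key string, values []string) string {
	return strconv.Itoa(len(values))
}

// run executes M map tasks (one per split) and R reduce tasks.
func run(splits []string, r int) map[string]string {
	// buckets[m][p] holds map task m's output for reduce partition p (step 4).
	buckets := make([][][]KeyValue, len(splits))
	for m, split := range splits {
		buckets[m] = make([][]KeyValue, r)
		for _, kv := range mapFn(split) {
			p := partition(kv.Key, r)
			buckets[m][p] = append(buckets[m][p], kv)
		}
	}
	result := make(map[string]string)
	for p := 0; p < r; p++ {
		// Step 5: read partition p from every map task, then sort by key.
		var kva []KeyValue
		for m := range splits {
			kva = append(kva, buckets[m][p]...)
		}
		sort.Slice(kva, func(i, j int) bool { return kva[i].Key < kva[j].Key })
		// Step 6: call Reduce once per run of equal keys.
		for i := 0; i < len(kva); {
			j := i
			var values []string
			for j < len(kva) && kva[j].Key == kva[i].Key {
				values = append(values, kva[j].Value)
				j++
			}
			result[kva[i].Key] = reduceFn(kva[i].Key, values)
			i = j
		}
	}
	return result
}

func main() {
	out := run([]string{"a b a", "b c", "a"}, 3)
	fmt.Println(out["a"], out["b"], out["c"]) // 3 2 1
}
```

Because every occurrence of a key hashes to the same partition, each key is reduced exactly once, no matter which map task produced it.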

Master

The master keeps several data structures. For each map task and reduce task, it stores the state (idle, in-progress, or completed), and the identity of the worker machine (for non-idle tasks).

For each completed map task, the master stores the locations and sizes of the R intermediate file regions produced by the map task.
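The master's bookkeeping maps naturally onto a few Go types. The sketch below shows one possible layout, not the paper's implementation. All type and field names are hypothetical, and a worker is identified by a plain string.

```go
package main

import "fmt"

// TaskState is the per-task state the paper lists.
type TaskState int

const (
	Idle TaskState = iota
	InProgress
	Completed
)

func (s TaskState) String() string {
	return [...]string{"idle", "in-progress", "completed"}[s]
}

// Region describes one of the R intermediate file regions a map task produced.
type Region struct {
	Location string // e.g. a path on the map worker's local disk
	Size     int64
}

type Task struct {
	State  TaskState
	Worker string // identity of the worker; meaningful only when State != Idle
}

type MapTask struct {
	Task
	Regions []Region // filled in on completion: one entry per reduce partition
}

type Master struct {
	MapTasks    []MapTask
	ReduceTasks []Task
}

func NewMaster(m, r int) *Master {
	return &Master{MapTasks: make([]MapTask, m), ReduceTasks: make([]Task, r)}
}

// CompleteMap records a finished map task and where its R regions live.
func (ms *Master) CompleteMap(id int, regions []Region) {
	ms.MapTasks[id].State = Completed
	ms.MapTasks[id].Regions = regions
}

func main() {
	ms := NewMaster(2, 3)
	ms.MapTasks[0].State, ms.MapTasks[0].Worker = InProgress, "worker-a"
	ms.CompleteMap(0, []Region{{"/tmp/m0-r0", 10}, {"/tmp/m0-r1", 20}, {"/tmp/m0-r2", 5}})
	fmt.Println(ms.MapTasks[0].State, len(ms.MapTasks[0].Regions), ms.MapTasks[1].State)
	// completed 3 idle
}
```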

Fault Tolerance

Worker Failure

The master pings every worker periodically. If no response is received from a worker in a certain amount of time, the master marks the worker as failed. Any map tasks completed by the worker are reset back to their initial idle state, and therefore become eligible for scheduling on other workers. Similarly, any map task or reduce task in progress on a failed worker is also reset to idle and becomes eligible for rescheduling.

Completed map tasks are re-executed on a failure because their output is stored on the local disk(s) of the failed machine and is therefore inaccessible. Completed reduce tasks do not need to be re-executed since their output is stored in a global file system.

When a map task is executed first by worker A and then later executed by worker B (because A failed), all workers executing reduce tasks are notified of the re-execution. Any reduce task that has not already read the data from worker A will read the data from worker B.
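The recovery rule translates almost directly into code. When a worker fails, every map task that worker touched (in progress or completed) goes back to idle. Only its in-progress reduce tasks are reset, because completed reduce output already lives in the global file system. The types and the `handleWorkerFailure` function are hypothetical. The ping-based detection and the notification of reduce workers are left out.

```go
package main

import "fmt"

type State int

const (
	Idle State = iota
	InProgress
	Completed
)

type Task struct {
	State  State
	Worker string
}

// handleWorkerFailure resets tasks after the master marks worker w as failed.
func handleWorkerFailure(mapTasks, reduceTasks []Task, w string) {
	for i := range mapTasks {
		// Completed map output sits on w's local disk, so it is lost too.
		if mapTasks[i].Worker == w && mapTasks[i].State != Idle {
			mapTasks[i] = Task{State: Idle}
		}
	}
	for i := range reduceTasks {
		// Completed reduce output is in the global file system: keep it.
		if reduceTasks[i].Worker == w && reduceTasks[i].State == InProgress {
			reduceTasks[i] = Task{State: Idle}
		}
	}
}

func main() {
	maps := []Task{{Completed, "A"}, {InProgress, "A"}, {Completed, "B"}}
	reduces := []Task{{Completed, "A"}, {InProgress, "A"}}
	handleWorkerFailure(maps, reduces, "A")
	fmt.Println(maps[0].State, maps[1].State, maps[2].State, reduces[0].State, reduces[1].State)
	// 0 0 2 2 0 (0 = idle, 2 = completed)
}
```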

Master Failure

It is easy to make the master write periodic checkpoints of the master data structures described above. If the master task dies, a new copy can be started from the last checkpointed state.

However, given that there is only a single master, its failure is unlikely; therefore our current implementation aborts the MapReduce computation if the master fails. Clients can check for this condition and retry the MapReduce operation if they desire.

Lab

Your job

Your job is to implement a distributed MapReduce, consisting of two programs, the coordinator and the worker. There will be just one coordinator process, and one or more worker processes executing in parallel. In a real system the workers would run on a bunch of different machines, but for this lab you'll run them all on a single machine. The workers will talk to the coordinator via RPC. Each worker process will ask the coordinator for a task, read the task's input from one or more files, execute the task, and write the task's output to one or more files. The coordinator should notice if a worker hasn't completed its task in a reasonable amount of time (for this lab, use ten seconds), and give the same task to a different worker.

A few rules

  • The map phase should divide the intermediate keys into buckets for nReduce reduce tasks, where nReduce is the number of reduce tasks (the argument that main/mrcoordinator.go passes to MakeCoordinator()). So each mapper needs to create nReduce intermediate files for consumption by the reduce tasks.
  • The worker implementation should put the output of the X'th reduce task in the file mr-out-X.
  • An mr-out-X file should contain one line per Reduce function output. The line should be generated with the Go "%v %v" format, called with the key and value. Have a look in main/mrsequential.go for the line commented "this is the correct format". The test script will fail if your implementation deviates too much from this format.
  • You can modify mr/worker.go, mr/coordinator.go, and mr/rpc.go. You can temporarily modify other files for testing, but make sure your code works with the original versions; we'll test with the original versions.
  • The worker should put intermediate Map output in files in the current directory, where your worker can later read them as input to Reduce tasks.
  • main/mrcoordinator.go expects mr/coordinator.go to implement a Done() method that returns true when the MapReduce job is completely finished; at that point, mrcoordinator.go will exit.
  • When the job is completely finished, the worker processes should exit. A simple way to implement this is to use the return value from call(): if the worker fails to contact the coordinator, it can assume that the coordinator has exited because the job is done, and so the worker can terminate too. Depending on your design, you might also find it helpful to have a "please exit" pseudo-task that the coordinator can give to workers.
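The exit rules suggest a worker loop that stops either when the RPC fails or when it receives a "please exit" pseudo-task. Below is a sketch of the RPC types and that loop. TaskType, TaskArgs, TaskReply, outputName and workerLoop are hypothetical names. The `call` parameter stands in for the lab's call() helper and is assumed to return false when the coordinator cannot be reached. The reply is zero-valued before each call, and all fields are capitalized, because Go RPC only sends exported fields.

```go
package main

import (
	"fmt"
	"time"
)

// TaskType says what the coordinator wants the worker to do.
type TaskType int

const (
	MapTask TaskType = iota
	ReduceTask
	WaitTask // nothing runnable yet, e.g. reduces waiting for the last map
	ExitTask // "please exit" pseudo-task: the job is finished
)

type TaskArgs struct{}

type TaskReply struct {
	Type     TaskType
	TaskID   int    // X in mr-out-X for a reduce task
	FileName string // input file for a map task
	NReduce  int    // number of intermediate buckets a map task must create
}

// outputName is the file the X'th reduce task must produce.
func outputName(x int) string { return fmt.Sprintf("mr-out-%d", x) }

// workerLoop asks for tasks until told to exit or the coordinator is gone.
func workerLoop(call func(*TaskArgs, *TaskReply) bool, run func(TaskReply)) {
	for {
		args := TaskArgs{}
		reply := TaskReply{} // zero-valued before the call
		if !call(&args, &reply) {
			return // coordinator unreachable: assume the job is done
		}
		switch reply.Type {
		case MapTask, ReduceTask:
			run(reply)
		case WaitTask:
			time.Sleep(time.Second)
		case ExitTask:
			return
		}
	}
}

func main() {
	// A fake coordinator that hands out one map task, one reduce task, then exit.
	script := []TaskReply{{Type: MapTask, FileName: "pg-1.txt", NReduce: 10}, {Type: ReduceTask, TaskID: 3}, {Type: ExitTask}}
	call := func(_ *TaskArgs, r *TaskReply) bool {
		if len(script) == 0 {
			return false
		}
		*r, script = script[0], script[1:]
		return true
	}
	workerLoop(call, func(t TaskReply) {
		if t.Type == ReduceTask {
			fmt.Println("reduce ->", outputName(t.TaskID)) // reduce -> mr-out-3
		} else {
			fmt.Println("map", t.FileName) // map pg-1.txt
		}
	})
}
```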

Hints

  • The Guidance page has some tips on developing and debugging.

  • One way to get started is to modify mr/worker.go's Worker() to send an RPC to the coordinator asking for a task. Then modify the coordinator to respond with the file name of an as-yet-unstarted map task. Then modify the worker to read that file and call the application Map function, as in mrsequential.go.

  • The application Map and Reduce functions are loaded at run-time using the Go plugin package, from files whose names end in .so.

  • If you change anything in the mr/ directory, you will probably have to re-build any MapReduce plugins you use, with something like go build -race -buildmode=plugin ../mrapps/wc.go

  • This lab relies on the workers sharing a file system. That's straightforward when all workers run on the same machine, but would require a global filesystem like GFS if the workers ran on different machines.

  • A reasonable naming convention for intermediate files is mr-X-Y, where X is the Map task number, and Y is the reduce task number.

  • The worker's map task code will need a way to store intermediate key/value pairs in files so that they can be correctly read back during reduce tasks. One possibility is to use Go's encoding/json package. To write key/value pairs in JSON format to an open file:

      enc := json.NewEncoder(file)
      for _, kv := ... {
        // Encode writes one JSON object per pair; check its error.
        if err := enc.Encode(&kv); err != nil {
          log.Fatalf("cannot encode %v: %v", kv, err)
        }
      }

    and to read such a file back:

      dec := json.NewDecoder(file)
      for {
        var kv KeyValue
        if err := dec.Decode(&kv); err != nil {
          break
        }
        kva = append(kva, kv)
      }
    
  • The map part of your worker can use the ihash(key) function (in worker.go) to pick the reduce task for a given key.

  • You can steal some code from mrsequential.go for reading Map input files, for sorting intermediate key/value pairs between the Map and Reduce phases, and for storing Reduce output in files.

  • The coordinator, as an RPC server, will be concurrent; don't forget to lock shared data.

  • Use Go's race detector, with go build -race and go run -race. test-mr.sh by default runs the tests with the race detector.

  • Workers will sometimes need to wait, e.g. reduces can't start until the last map has finished. One possibility is for workers to periodically ask the coordinator for work, sleeping with time.Sleep() between each request. Another possibility is for the relevant RPC handler in the coordinator to have a loop that waits, either with time.Sleep() or sync.Cond. Go runs the handler for each RPC in its own thread, so the fact that one handler is waiting won't prevent the coordinator from processing other RPCs.

  • The coordinator can't reliably distinguish between crashed workers, workers that are alive but have stalled for some reason, and workers that are executing but too slowly to be useful. The best you can do is have the coordinator wait for some amount of time, and then give up and re-issue the task to a different worker. For this lab, have the coordinator wait for ten seconds; after that the coordinator should assume the worker has died (of course, it might not have).

  • If you choose to implement Backup Tasks (Section 3.6), note that we test that your code doesn't schedule extraneous tasks when workers execute tasks without crashing. Backup tasks should only be scheduled after some relatively long period of time (e.g., 10s).

  • To test crash recovery, you can use the mrapps/crash.go application plugin. It randomly exits in the Map and Reduce functions.

  • To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written. You can use ioutil.TempFile to create a temporary file and os.Rename to atomically rename it.

  • test-mr.sh runs all its processes in the sub-directory mr-tmp, so if something goes wrong and you want to look at intermediate or output files, look there. You can temporarily modify test-mr.sh to exit after the failing test, so the script does not continue testing (and overwrite the output files).

  • test-mr-many.sh provides a bare-bones script for running test-mr.sh with a timeout (which is how we'll test your code). It takes as an argument the number of times to run the tests. You should not run several test-mr.sh instances in parallel because the coordinator will reuse the same socket, causing conflicts.

  • Go RPC sends only struct fields whose names start with capital letters. Sub-structures must also have capitalized field names.

  • When passing a pointer to a reply struct to the RPC system, the object that *reply points to should be zero-allocated. The code for RPC calls should always look like

      reply := SomeType{}
      call(..., &reply)
    

    without setting any fields of reply before the call. If you don't follow this requirement, you can hit a problem. Suppose you pre-initialize a reply field to a non-default value for its type, and the server executing the RPC sets that field to the default value. The write won't appear to take effect, and the caller will still see the non-default value. This happens because Go's RPC encoding (encoding/gob) does not transmit fields that hold their zero value.
