The code we give you is missing two crucial pieces: the function that divides up the output of a map task, and the function that gathers all the inputs for a reduce task. These tasks are carried out by the doMap() function in common_map.go and the doReduce() function in common_reduce.go, respectively.
    func doReduce(
        jobName string, // the name of the whole MapReduce job
        reduceTask int, // which reduce task this is
        outFile string, // write the output here
        nMap int, // the number of map tasks that were run ("M" in the paper)
        reduceF func(key string, values []string) string,
    ) {
        // Create the output file. log.Fatal exits the program, so no return is needed.
        fileStream, err := os.Create(outFile)
        if err != nil {
            log.Fatal("create file fail")
        }
        defer fileStream.Close()
        enc := json.NewEncoder(fileStream)

        // Read the intermediate files, using a map to group the values that share a key.
        inputData := make(map[string][]string)
        for m := 0; m < nMap; m++ {
            filename := reduceName(jobName, m, reduceTask)
            inputFileStream, err := os.Open(filename)
            if err != nil {
                log.Fatal("open input file fail")
            }
            dec := json.NewDecoder(inputFileStream)
            for {
                // var kv MapOutPutType // versions 1 and 2
                var kv KeyValue
                err = dec.Decode(&kv)
                if err != nil {
                    break // io.EOF (or a malformed record) ends this file
                }
                // inputData[kv.Key] = append(inputData[kv.Key], kv.Value...) // versions 1 and 2
                inputData[kv.Key] = append(inputData[kv.Key], kv.Value) // version 3
            }
            inputFileStream.Close()
        }

        // Write the reduced result for each key to the output file.
        for k, v := range inputData {
            res := reduceF(k, v)
            enc.Encode(KeyValue{k, res})
        }
    }
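Only doReduce() is shown above. As a rough sketch of the map side, the partitioning step could look like the following. The `KeyValue` type here is a local stand-in for the lab's type, `partition` is a hypothetical helper name, in-memory buffers stand in for the intermediate files named by reduceName(), and the FNV-based `ihash` is an assumption about how keys are hashed.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"hash/fnv"
)

// KeyValue is a local stand-in for the lab's mapreduce.KeyValue type.
type KeyValue struct {
	Key   string
	Value string
}

// ihash maps a key to a non-negative integer (FNV-1a; an assumption here).
func ihash(s string) int {
	h := fnv.New32a()
	h.Write([]byte(s))
	return int(h.Sum32() & 0x7fffffff)
}

// partition JSON-encodes each pair into the buffer of the reduce task that
// owns its key. In the lab, each buffer would be an intermediate file.
func partition(kvs []KeyValue, nReduce int) ([]*bytes.Buffer, error) {
	bufs := make([]*bytes.Buffer, nReduce)
	encs := make([]*json.Encoder, nReduce)
	for r := range bufs {
		bufs[r] = &bytes.Buffer{}
		encs[r] = json.NewEncoder(bufs[r])
	}
	for _, kv := range kvs {
		r := ihash(kv.Key) % nReduce // same key always lands in the same bucket
		if err := encs[r].Encode(&kv); err != nil {
			return nil, err
		}
	}
	return bufs, nil
}

func main() {
	kvs := []KeyValue{{"a", "1"}, {"b", "1"}, {"a", "1"}}
	bufs, err := partition(kvs, 3)
	if err != nil {
		panic(err)
	}
	for r, b := range bufs {
		fmt.Printf("reduce task %d: %d bytes\n", r, b.Len())
	}
}
```

Because the bucket depends only on the key, every value for a given key ends up with the same reduce task, which is what lets doReduce() group values with a plain map.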
Now you will implement word count — a simple Map/Reduce example. Look in main/wc.go; you’ll find empty mapF() and reduceF() functions. Your job is to insert code so that wc.go reports the number of occurrences of each word in its input. A word is any contiguous sequence of letters, as determined by unicode.IsLetter.
    func mapF(filename string, contents string) []mapreduce.KeyValue {
        // Split function: every non-letter rune is a separator.
        splitFunc := func(r rune) bool { return !unicode.IsLetter(r) }
        words := strings.FieldsFunc(contents, splitFunc)
        var res []mapreduce.KeyValue
        for _, word := range words {
            // Emit (word, "1") once per occurrence.
            res = append(res, mapreduce.KeyValue{Key: word, Value: "1"})
        }
        return res
    }
// rune is an alias for int32 and is equivalent to int32 in all ways. It is // used, by convention, to distinguish character values from integer values.
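The matching reduceF() for word count is not shown here. A minimal sketch, assuming each emitted value is "1" as in mapF() above, simply counts the values for a key. The `KeyValue` type and the in-memory grouping in `main` are local stand-ins for the lab's framework.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue is a local stand-in for mapreduce.KeyValue.
type KeyValue struct {
	Key   string
	Value string
}

// mapF emits (word, "1") for every maximal run of letters in contents.
func mapF(filename string, contents string) []KeyValue {
	notLetter := func(r rune) bool { return !unicode.IsLetter(r) }
	var res []KeyValue
	for _, word := range strings.FieldsFunc(contents, notLetter) {
		res = append(res, KeyValue{Key: word, Value: "1"})
	}
	return res
}

// reduceF receives every value emitted for one key; since each value is "1",
// the number of values is the word's count.
func reduceF(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	// Group values by key in memory, standing in for the framework's shuffle.
	grouped := make(map[string][]string)
	for _, kv := range mapF("demo.txt", "the cat, the hat; the end") {
		grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
	}
	fmt.Println("the:", reduceF("the", grouped["the"]))
	fmt.Println("cat:", reduceF("cat", grouped["cat"]))
}
```

Note that splitting on non-letters means "it's" becomes the two words "it" and "s", which follows from the definition of a word given above.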
Your job is to implement schedule() in mapreduce/schedule.go. The master calls schedule() twice during a MapReduce job, once for the Map phase, and once for the Reduce phase. schedule()’s job is to hand out tasks to the available workers. There will usually be more tasks than worker threads, so schedule() must give each worker a sequence of tasks, one at a time. schedule() should wait until all tasks have completed, and then return.
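One way to meet this description is a goroutine per task plus a sync.WaitGroup. The sketch below is self-contained and therefore simplified: the `schedule` signature is not the lab's, workers are plain strings on a channel, and `doTask` is a hypothetical stand-in for the RPC to a worker. It also ignores failures.

```go
package main

import (
	"fmt"
	"sync"
)

// schedule hands ntasks tasks to workers read from registerChan and returns
// once all of them have finished. doTask is a hypothetical stand-in for the
// RPC that asks a worker to run one task.
func schedule(ntasks int, registerChan chan string, doTask func(worker string, task int) bool) {
	var wg sync.WaitGroup
	for task := 0; task < ntasks; task++ {
		wg.Add(1)
		go func(task int) {
			defer wg.Done()
			worker := <-registerChan // block until some worker is idle
			doTask(worker, task)     // result ignored in this sketch
			// Return the worker to the pool without blocking this goroutine:
			// after the final task nobody may be receiving on the channel.
			go func() { registerChan <- worker }()
		}(task)
	}
	wg.Wait() // all tasks have completed
}

func main() {
	registerChan := make(chan string)
	go func() {
		for _, w := range []string{"worker-0", "worker-1"} {
			registerChan <- w
		}
	}()

	var mu sync.Mutex
	done := make(map[int]string)
	schedule(5, registerChan, func(worker string, task int) bool {
		mu.Lock()
		defer mu.Unlock()
		done[task] = worker
		return true
	})
	fmt.Println("tasks completed:", len(done))
}
```

Each worker runs one task at a time because a task goroutine holds the worker's name from the moment it receives it until it sends it back. The final send-backs stay blocked when no one is left to receive, which is a deliberate trade-off in this sketch rather than a requirement.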
In this part you will make the master handle failed workers. MapReduce makes this relatively easy because workers don’t have persistent state. If a worker fails while handling an RPC from the master, the master’s call() will eventually return false due to a timeout. In that situation, the master should re-assign the task given to the failed worker to another worker.
In practice, a single retry loop is enough to implement this:
    // Keep reassigning the task to the next available worker until a call succeeds.
    for !res {
        workerAddress = <-registerChan
        res = call(workerAddress, "Worker.DoTask", arg, nil)
    }
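To see the retry loop in isolation, here is a small self-contained sketch. `runWithRetry` is a hypothetical helper name, and its `call` parameter only simulates the lab's RPC helper by returning false for a failed worker.

```go
package main

import "fmt"

// runWithRetry keeps assigning one task to workers taken from registerChan
// until a call succeeds, and returns the worker that completed it.
// call is a hypothetical stand-in for the RPC helper, which returns false
// when the worker fails or times out.
func runWithRetry(task int, registerChan chan string, call func(worker string, task int) bool) string {
	for {
		worker := <-registerChan
		if call(worker, task) {
			return worker
		}
		// The failed worker is dropped: it is not put back on registerChan.
	}
}

func main() {
	registerChan := make(chan string, 2)
	registerChan <- "flaky"
	registerChan <- "healthy"

	// Simulated RPC: the worker named "flaky" always fails.
	call := func(worker string, task int) bool { return worker != "flaky" }

	fmt.Println("task 0 finished on", runWithRetry(0, registerChan, call))
}
```

Re-running a task on another worker is safe here because, as noted above, workers keep no persistent state, so a repeated task simply regenerates the same output.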
Part 5
Inverted indices are widely used in computer science, and are particularly useful in document searching. Broadly speaking, an inverted index is a map from interesting facts about the underlying data, to the original location of that data. For example, in the context of search, it might be a map from keywords to documents that contain those words.
    func mapF(document string, value string) (res []mapreduce.KeyValue) {
        // Split on every non-letter rune.
        splitFunc := func(r rune) bool { return !unicode.IsLetter(r) }
        words := strings.FieldsFunc(value, splitFunc)
        for _, word := range words {
            // Emit (word, document) so the reducer sees every document containing the word.
            res = append(res, mapreduce.KeyValue{Key: word, Value: document})
        }
        return res
    }
    func reduceF(key string, values []string) string {
        // Remove duplicate document names from values to obtain newValues.
        tmp := make(map[string]int)
        for _, val := range values {
            tmp[val] = 1
        }
        var newValues []string
        for v := range tmp {
            newValues = append(newValues, v)
        }
        // Sort the file names, then return "<count> <doc1>,<doc2>,...".
        // sort.Strings and strings.Join need the "sort" and "strings" imports.
        sort.Strings(newValues)
        return strconv.Itoa(len(newValues)) + " " + strings.Join(newValues, ",")
    }
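To sanity-check the output of the two functions above on small inputs, a single-process reference can be handy. The following sketch builds the same word-to-documents mapping with a plain Go map. `buildIndex` and the file names are hypothetical and not part of the lab code.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"unicode"
)

// buildIndex maps each word to the sorted, de-duplicated list of document
// names in which it appears. docs maps a document name to its contents.
func buildIndex(docs map[string]string) map[string][]string {
	notLetter := func(r rune) bool { return !unicode.IsLetter(r) }
	seen := make(map[string]map[string]bool) // word -> set of documents
	for name, contents := range docs {
		for _, word := range strings.FieldsFunc(contents, notLetter) {
			if seen[word] == nil {
				seen[word] = make(map[string]bool)
			}
			seen[word][name] = true
		}
	}
	index := make(map[string][]string, len(seen))
	for word, set := range seen {
		for name := range set {
			index[word] = append(index[word], name)
		}
		sort.Strings(index[word])
	}
	return index
}

func main() {
	index := buildIndex(map[string]string{
		"a.txt": "the quick fox",
		"b.txt": "the lazy dog",
	})
	fmt.Println(index["the"], index["fox"])
}
```

The set per word plays the role of the de-duplication map in reduceF(), and the final sort corresponds to sorting the file names before joining them.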