Mapreduce概況
MapReduce是一種編程模型,用于大規(guī)模數(shù)據(jù)集(大于1TB)的并行運(yùn)算。概念“Map(映射)”和“Reduce(歸約)”,是它們的主要思想,都是從函數(shù)式編程語(yǔ)言里借來(lái)的,還有從矢量編程語(yǔ)言里借來(lái)的特性。它極大地方便了編程人員在不會(huì)分布式并行編程的情況下,將自己的程序運(yùn)行在分布式系統(tǒng)上。 當(dāng)前的軟件實(shí)現(xiàn)是指定一個(gè)Map(映射)函數(shù),用來(lái)把一組鍵值對(duì)映射成一組新的鍵值對(duì),指定并發(fā)的Reduce(歸約)函數(shù),用來(lái)保證所有映射的鍵值對(duì)中的每一個(gè)共享相同的鍵組。
1. MapReduce是一種分布式計(jì)算模型,是Google提出的,主要用于搜索領(lǐng)域,解決海量數(shù)據(jù)的計(jì)算問(wèn)題。
2. MR有兩個(gè)階段組成:Map和Reduce,用戶(hù)只需實(shí)現(xiàn)map()和reduce()兩個(gè)函數(shù),即可實(shí)現(xiàn)分布式計(jì)算。
MapReduce執(zhí)行流程
?
MapReduce原理
?
MapReduce的執(zhí)行步驟:
1、Map任務(wù)處理
1.1 讀取HDFS中的文件。每一行解析成一個(gè)《k,v》。每一個(gè)鍵值對(duì)調(diào)用一次map函數(shù)。 《0,hello you》 《10,hello me》
1.2 覆蓋map(),接收1.1產(chǎn)生的《k,v》,進(jìn)行處理,轉(zhuǎn)換為新的《k,v》輸出。 《hello,1》 《you,1》 《hello,1》 《me,1》
1.3 對(duì)1.2輸出的《k,v》進(jìn)行分區(qū)。默認(rèn)分為一個(gè)區(qū)。詳見(jiàn)《Partitioner》
1.4 對(duì)不同分區(qū)中的數(shù)據(jù)進(jìn)行排序(按照k)、分組。分組指的是相同key的value放到一個(gè)集合中?!∨判蚝螅骸秇ello,1》 《hello,1》 《me,1》 《you,1》 分組后:《hello,{1,1}》《me,{1}》《you,{1}》
1.5 (可選)對(duì)分組后的數(shù)據(jù)進(jìn)行歸約。詳見(jiàn)《Combiner》
2、Reduce任務(wù)處理
2.1 多個(gè)map任務(wù)的輸出,按照不同的分區(qū),通過(guò)網(wǎng)絡(luò)copy到不同的reduce節(jié)點(diǎn)上。(shuffle)詳見(jiàn)《shuffle過(guò)程分析》
2.2 對(duì)多個(gè)map的輸出進(jìn)行合并、排序。覆蓋reduce函數(shù),接收的是分組后的數(shù)據(jù),實(shí)現(xiàn)自己的業(yè)務(wù)邏輯, 《hello,2》 《me,1》 《you,1》
處理后,產(chǎn)生新的《k,v》輸出。
2.3 對(duì)reduce輸出的《k,v》寫(xiě)到HDFS中。
Java代碼實(shí)現(xiàn)
注:要導(dǎo)入org.apache.hadoop.fs.FileUtil.java。
1、先創(chuàng)建一個(gè)hello文件,上傳到HDFS中
Java代碼實(shí)現(xiàn)
注:要導(dǎo)入org.apache.hadoop.fs.FileUtil.java。
1、先創(chuàng)建一個(gè)hello文件,上傳到HDFS中
圖三
2、然后再編寫(xiě)代碼,實(shí)現(xiàn)文件中的單詞個(gè)數(shù)統(tǒng)計(jì)(代碼中被注釋掉的代碼,是可以省略的,不省略也行)
1package mapreduce;
? ? ? ?
? ? ? ? 2
3 import java.net.URI;
4 import org.apache.hadoop.conf.Configuration;
5 import org.apache.hadoop.fs.FileSystem;
6 import org.apache.hadoop.fs.Path;
7 import org.apache.hadoop.io.LongWritable;
8 import org.apache.hadoop.io.Text;
9 import org.apache.hadoop.mapreduce.Job;
10 import org.apache.hadoop.mapreduce.Mapper;
11 import org.apache.hadoop.mapreduce.Reducer;
12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
16
17 public class WordCountApp {
18 static final String INPUT_PATH = “hdfs://chaoren:9000/hello”;
19 static final String OUT_PATH = “hdfs://chaoren:9000/out”;
20
21 public static void main(String[] args) throws Exception {
22 Configuration conf = new Configuration();
23 FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
24 Path outPath = new Path(OUT_PATH);
25 if (fileSystem.exists(outPath)) {
26 fileSystem.delete(outPath, true);
27 }
28
29 Job job = new Job(conf, WordCountApp.class.getSimpleName());
30
31 // 1.1指定讀取的文件位于哪里
32 FileInputFormat.setInputPaths(job, INPUT_PATH);
33 // 指定如何對(duì)輸入的文件進(jìn)行格式化,把輸入文件每一行解析成鍵值對(duì)
34 //job.setInputFormatClass(TextInputFormat.class);
35
36 // 1.2指定自定義的map類(lèi)
37 job.setMapperClass(MyMapper.class);
38 // map輸出的《k,v》類(lèi)型。如果《k3,v3》的類(lèi)型與《k2,v2》類(lèi)型一致,則可以省略
39 //job.setOutputKeyClass(Text.class);
40 //job.setOutputValueClass(LongWritable.class);
41
42 // 1.3分區(qū)
43 //job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
44 // 有一個(gè)reduce任務(wù)運(yùn)行
45 //job.setNumReduceTasks(1);
46
47 // 1.4排序、分組
48
49 // 1.5歸約
50
51 // 2.2指定自定義reduce類(lèi)
52 job.setReducerClass(MyReducer.class);
53 // 指定reduce的輸出類(lèi)型
54 job.setOutputKeyClass(Text.class);
55 job.setOutputValueClass(LongWritable.class);
56
57 // 2.3指定寫(xiě)出到哪里
58 FileOutputFormat.setOutputPath(job, outPath);
59 // 指定輸出文件的格式化類(lèi)
60 //job.setOutputFormatClass(TextOutputFormat.class);
61
62 // 把job提交給jobtracker運(yùn)行
63 job.waitForCompletion(true);
64 }
65
66 /**
67 *
68 * KEYIN 即K1 表示行的偏移量
69 * VALUEIN 即V1 表示行文本內(nèi)容
70 * KEYOUT 即K2 表示行中出現(xiàn)的單詞
71 * VALUEOUT 即V2 表示行中出現(xiàn)的單詞的次數(shù),固定值1
72 *
73 */
74 static class MyMapper extends
75 Mapper《LongWritable, Text, Text, LongWritable》 {
76 protected void map(LongWritable k1, Text v1, Context context)
77 throws java.io.IOException, InterruptedException {
78 String[] splited = v1.toString().split(“\t”);
79 for (String word : splited) {
80 context.write(new Text(word), new LongWritable(1));
81 }
82 };
83 }
84
85 /**
86 * KEYIN 即K2 表示行中出現(xiàn)的單詞
87 * VALUEIN 即V2 表示出現(xiàn)的單詞的次數(shù)
88 * KEYOUT 即K3 表示行中出現(xiàn)的不同單詞
89 * VALUEOUT 即V3 表示行中出現(xiàn)的不同單詞的總次數(shù)
90 */
91 static class MyReducer extends
92 Reducer《Text, LongWritable, Text, LongWritable》 {
93 protected void reduce(Text k2, java.lang.Iterable《LongWritable》 v2s,
94 Context ctx) throws java.io.IOException,
95 InterruptedException {
96 long times = 0L;
97 for (LongWritable count : v2s) {
98 times += count.get();
99 }
100 ctx.write(k2, new LongWritable(times));
101 };
102 }
103 }
3、運(yùn)行成功后,可以在Linux中查看操作的結(jié)果
圖四
MapReduce主要功能
1)數(shù)據(jù)劃分和計(jì)算任務(wù)調(diào)度:
系統(tǒng)自動(dòng)將一個(gè)作業(yè)(Job)待處理的大數(shù)據(jù)劃分為很多個(gè)數(shù)據(jù)塊,每個(gè)數(shù)據(jù)塊對(duì)應(yīng)于一個(gè)計(jì)算任務(wù)(Task),并自動(dòng) 調(diào)度計(jì)算節(jié)點(diǎn)來(lái)處理相應(yīng)的數(shù)據(jù)塊。作業(yè)和任務(wù)調(diào)度功能主要負(fù)責(zé)分配和調(diào)度計(jì)算節(jié)點(diǎn)(Map節(jié)點(diǎn)或Reduce節(jié)點(diǎn)),同時(shí)負(fù)責(zé)監(jiān)控這些節(jié)點(diǎn)的執(zhí)行狀態(tài),并 負(fù)責(zé)Map節(jié)點(diǎn)執(zhí)行的同步控制。
2)數(shù)據(jù)/代碼互定位:
為了減少數(shù)據(jù)通信,一個(gè)基本原則是本地化數(shù)據(jù)處理,即一個(gè)計(jì)算節(jié)點(diǎn)盡可能處理其本地磁盤(pán)上所分布存儲(chǔ)的數(shù)據(jù),這實(shí)現(xiàn)了代碼向 數(shù)據(jù)的遷移;當(dāng)無(wú)法進(jìn)行這種本地化數(shù)據(jù)處理時(shí),再尋找其他可用節(jié)點(diǎn)并將數(shù)據(jù)從網(wǎng)絡(luò)上傳送給該節(jié)點(diǎn)(數(shù)據(jù)向代碼遷移),但將盡可能從數(shù)據(jù)所在的本地機(jī)架上尋 找可用節(jié)點(diǎn)以減少通信延遲。
3)系統(tǒng)優(yōu)化:
為了減少數(shù)據(jù)通信開(kāi)銷(xiāo),中間結(jié)果數(shù)據(jù)進(jìn)入Reduce節(jié)點(diǎn)前會(huì)進(jìn)行一定的合并處理;一個(gè)Reduce節(jié)點(diǎn)所處理的數(shù)據(jù)可能會(huì)來(lái)自多個(gè) Map節(jié)點(diǎn),為了避免Reduce計(jì)算階段發(fā)生數(shù)據(jù)相關(guān)性,Map節(jié)點(diǎn)輸出的中間結(jié)果需使用一定的策略進(jìn)行適當(dāng)?shù)膭澐痔幚?,保證相關(guān)性數(shù)據(jù)發(fā)送到同一個(gè) Reduce節(jié)點(diǎn);此外,系統(tǒng)還進(jìn)行一些計(jì)算性能優(yōu)化處理,如對(duì)最慢的計(jì)算任務(wù)采用多備份執(zhí)行、選最快完成者作為結(jié)果。
4)出錯(cuò)檢測(cè)和恢復(fù):
以低端商用服務(wù)器構(gòu)成的大規(guī)模MapReduce計(jì)算集群中,節(jié)點(diǎn)硬件(主機(jī)、磁盤(pán)、內(nèi)存等)出錯(cuò)和軟件出錯(cuò)是常態(tài),因此 MapReduce需要能檢測(cè)并隔離出錯(cuò)節(jié)點(diǎn),并調(diào)度分配新的節(jié)點(diǎn)接管出錯(cuò)節(jié)點(diǎn)的計(jì)算任務(wù)。同時(shí),系統(tǒng)還將維護(hù)數(shù)據(jù)存儲(chǔ)的可靠性,用多備份冗余存儲(chǔ)機(jī)制提 高數(shù)據(jù)存儲(chǔ)的可靠性,并能及時(shí)檢測(cè)和恢復(fù)出錯(cuò)的數(shù)據(jù)。
評(píng)論
查看更多