MapReduce Applications
Date: 2015-10-10 11:06 | Source: linux.it.net.cn | Author: IT
Contents
- 1. Implementing Matrix Multiplication with MapReduce
- 2. Implementing an Inverted Index with MapReduce
- 3. Implementing a Complex Inverted Index with MapReduce
1. Implementing Matrix Multiplication with MapReduce
I. Preparing the Data

The shell script below takes three arguments, rowM, colM, and colN (colM doubles as rowN), and fills two files, M_rowM_colM and N_colM_colN, with random values, one matrix element per line in the form "i,j\tvalue":
#!/bin/bash
if [ $# -ne 3 ]
then
    echo "there must be 3 arguments to generate the two matrices file!"
    exit 1
fi
cat /dev/null > M_$1_$2
cat /dev/null > N_$2_$3
for i in `seq 1 $1`
do
    for j in `seq 1 $2`
    do
        s=$((RANDOM%100))
        echo -e "$i,$j\t$s" >> M_$1_$2
    done
done
echo "we have built the matrix file M"
for i in `seq 1 $2`
do
    for j in `seq 1 $3`
    do
        s=$((RANDOM%100))
        echo -e "$i,$j\t$s" >> N_$2_$3
    done
done
echo "we have built the matrix file N"
Running the script with arguments 3 2 4 produced the two sample matrix files shown below:
M_3_2:
1,1	81
1,2	13
2,1	38
2,2	46
3,1	0
3,2	2
N_2_4:
1,1	99
1,2	38
1,3	34
1,4	19
2,1	21
2,2	4
2,3	36
2,4	64
II. The Computation

Each cell (i,k) of the product P = M × N needs row i of M and column k of N. The mapper therefore replicates every element M[i][j] to all colN keys "i,1" through "i,colN", and every element N[i][j] to all rowM keys "1,j" through "rowM,j", tagging each value with its source matrix and its index along the shared dimension. The reducer for key "i,k" then assembles the two vectors and forms their dot product.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Matrix {

    private static class MatrixMapper extends
            Mapper<LongWritable, Text, Text, Text> {
        private static int colN = 0;
        private static int rowM = 0;

        @Override
        protected void setup(
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Matrix dimensions are passed in through the job configuration.
            Configuration configuration = context.getConfiguration();
            colN = configuration.getInt("colN", 0);
            rowM = configuration.getInt("rowM", 0);
        }

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // The file name tells us which matrix this element belongs to.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            // Each input line has the form "i,j\tvalue".
            String[] strings = value.toString().split(",");
            int i = Integer.parseInt(strings[0]);
            String[] ser = strings[1].split("\t");
            int j = Integer.parseInt(ser[0]);
            int val = Integer.parseInt(ser[1]);
            if (fileName.startsWith("M")) {
                // M[i][j] contributes to every result cell (i,1)..(i,colN).
                for (int count = 1; count <= colN; count++) {
                    context.write(new Text(i + "," + count),
                            new Text("M," + j + "," + val));
                }
            } else {
                // N[i][j] contributes to every result cell (1,j)..(rowM,j).
                for (int count = 1; count <= rowM; count++) {
                    context.write(new Text(count + "," + j),
                            new Text("N," + i + "," + val));
                }
            }
        }
    }

    private static class MatrixReduce extends
            Reducer<Text, Text, Text, IntWritable> {
        private static int colM = 0;

        @Override
        protected void setup(
                Reducer<Text, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // colM (== rowN) is the shared dimension the dot product runs over.
            Configuration configuration = context.getConfiguration();
            colM = configuration.getInt("colM", 0);
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int sumValue = 0;
            // Collect the row of M and the column of N that meet at this cell,
            // indexed by position along the shared dimension (1-based).
            int[] m_Arr = new int[colM + 1];
            int[] n_Arr = new int[colM + 1];
            for (Text value : values) {
                String[] strings = value.toString().split(",");
                if (strings[0].equals("M")) {
                    m_Arr[Integer.parseInt(strings[1])] = Integer.parseInt(strings[2]);
                } else {
                    n_Arr[Integer.parseInt(strings[1])] = Integer.parseInt(strings[2]);
                }
            }
            // The cell value is the dot product of the two vectors.
            for (int i = 1; i <= colM; i++) {
                sumValue += m_Arr[i] * n_Arr[i];
            }
            context.write(key, new IntWritable(sumValue));
        }
    }

    public static void main(String[] args) throws IllegalArgumentException,
            IOException, ClassNotFoundException, InterruptedException {
        // HadoopConfig is the author's helper that returns a configured Configuration.
        Configuration configuration = HadoopConfig.getConfiguration();
        // M is rowM x colM and N is rowN x colN; colM must equal rowN.
        configuration.setInt("colN", 4);
        configuration.setInt("rowN", 2);
        configuration.setInt("colM", 2);
        configuration.setInt("rowM", 3);
        Job job = Job.getInstance(configuration, "matrix multiplication");
        job.setJarByClass(Matrix.class);
        job.setMapperClass(MatrixMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setReducerClass(MatrixReduce.class);
        FileInputFormat.addInputPath(job, new Path("/matrix"));
        FileOutputFormat.setOutputPath(job, new Path("/matrixOutput"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
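As a sanity check, the same product can be computed directly in plain Java. This Hadoop-free sketch (the class name MatrixCheck is ours, with the sample matrices hard-coded from the files above) prints the cells in the same "i,k\tvalue" format the job writes:

public class MatrixCheck {
    public static void main(String[] args) {
        // Sample data hard-coded from M_3_2 and N_2_4 above.
        int[][] m = { { 81, 13 }, { 38, 46 }, { 0, 2 } };
        int[][] n = { { 99, 38, 34, 19 }, { 21, 4, 36, 64 } };
        for (int i = 0; i < 3; i++) {
            for (int k = 0; k < 4; k++) {
                int sum = 0;
                // Dot product over the shared dimension (colM == rowN == 2).
                for (int j = 0; j < 2; j++) {
                    sum += m[i][j] * n[j][k];
                }
                // Same "i,k\tvalue" format as the job output (1-based indices).
                System.out.println((i + 1) + "," + (k + 1) + "\t" + sum);
            }
        }
    }
}

Its output should match the job result below cell for cell.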
III. The Result
1,1	8292
1,2	3130
1,3	3222
1,4	2371
2,1	4728
2,2	1628
2,3	2948
2,4	3666
3,1	42
3,2	8
3,3	72
3,4	128
2. Implementing an Inverted Index with MapReduce
I. Preparing the Data
file1:
one fish
two bird
two monkey
file2:
two peach
three watermelon
II. The Computation

The mapper emits each word together with a posting of the form path#offset, where the offset is the map input key (the byte offset of the line within its file). The reducer simply concatenates all postings for a word into one ';'-separated list.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertIndex {

    private static class InvertIndexMapper extends
            Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Use the full file path as the document identifier.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().toString();
            String[] words = value.toString().split(" ");
            for (String string : words) {
                // Emit word -> "path#byteOffset"; key is the line's byte offset.
                context.write(new Text(string), new Text(fileName + "#" + key.toString()));
            }
        }
    }

    private static class InvertIndexReduce extends
            Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Concatenate all postings for this word into a ';'-separated list.
            StringBuilder stringBuilder = new StringBuilder();
            for (Text text : values) {
                stringBuilder.append(text.toString()).append(";");
            }
            context.write(key, new Text(stringBuilder.toString()));
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration configuration = HadoopConfig.getConfiguration();
        Job job = Job.getInstance(configuration, "inverted index");
        job.setJarByClass(InvertIndex.class);
        job.setMapperClass(InvertIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(InvertIndexReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("/data"));
        FileOutputFormat.setOutputPath(job, new Path("/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
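To query the finished index, split each output line on the tab between the word and its postings, then split the postings on ';'. A minimal lookup sketch, assuming the reducer output has been copied to a local file named part-r-00000 (both the class name and the file name here are illustrative):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class IndexLookup {
    public static void main(String[] args) throws IOException {
        String wanted = "two";
        // Assumes the job output was fetched to the local file part-r-00000.
        try (BufferedReader reader = new BufferedReader(new FileReader("part-r-00000"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split("\t", 2); // word TAB postings
                if (parts[0].equals(wanted)) {
                    for (String posting : parts[1].split(";")) {
                        System.out.println(posting); // each posting is "path#offset"
                    }
                }
            }
        }
    }
}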
III. The Result
bird	hdfs://127.0.0.1:8020/data/file1#9;
fish	hdfs://127.0.0.1:8020/data/file1#0;
monkey	hdfs://127.0.0.1:8020/data/file1#18;
one	hdfs://127.0.0.1:8020/data/file1#0;
peach	hdfs://127.0.0.1:8020/data/file2#0;
three	hdfs://127.0.0.1:8020/data/file2#10;
two	hdfs://127.0.0.1:8020/data/file2#0;hdfs://127.0.0.1:8020/data/file1#18;hdfs://127.0.0.1:8020/data/file1#9;
watermelon	hdfs://127.0.0.1:8020/data/file2#10;
3. Implementing a Complex Inverted Index with MapReduce
I. Preparing the Data
file1:
one fish
two bird
two monkey
file2:
two peach
three watermelon
II. The Computation

This version records, for each word, how many times it occurs in each file. A custom FileNameInputFormat makes the file name (rather than the byte offset) the map input key. The mapper emits "word#fileName" -> 1; a combiner pre-sums the counts on the map side; a custom partitioner hashes only on the word part of the key, so all postings for one word reach the same reducer; and the reducer folds the per-file counts into a posting list per word, written out in cleanup().
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class ComplexInvertIndex {

    // Wraps LineRecordReader so the mapper sees the file name as the key
    // instead of the line's byte offset.
    private static class FileNameRecordReader extends RecordReader<Text, Text> {
        LineRecordReader lineRecordReader = new LineRecordReader();
        String fileName;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            lineRecordReader.initialize(split, context);
            fileName = ((FileSplit) split).getPath().getName();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return lineRecordReader.nextKeyValue();
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return new Text(fileName);
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return lineRecordReader.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return lineRecordReader.getProgress();
        }

        @Override
        public void close() throws IOException {
            lineRecordReader.close();
        }
    }

    private static class FileNameInputFormat extends
            FileInputFormat<Text, Text> {
        @Override
        public RecordReader<Text, Text> createRecordReader(InputSplit split,
                TaskAttemptContext context) throws IOException,
                InterruptedException {
            // The framework calls initialize() on the reader itself.
            return new FileNameRecordReader();
        }
    }

    private static class ComplexInvertIndexMapper extends
            Mapper<Text, Text, Text, IntWritable> {
        @Override
        protected void map(Text key, Text value,
                Mapper<Text, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Emit "word#fileName" -> 1 for every word occurrence.
            String[] strs = value.toString().split(" ");
            for (String string : strs) {
                context.write(new Text(string + "#" + key.toString()), new IntWritable(1));
            }
        }
    }

    // Pre-aggregate the per-word, per-file counts on the map side.
    private static class ComplexInvertIndexCombiner extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Partition on the word part of the key (before the '#') so that all
    // postings for the same word go to the same reducer.
    private static class InvertIndexPartitioner extends
            HashPartitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            String[] strs = key.toString().split("#");
            return super.getPartition(new Text(strs[0]), value, numReduceTasks);
        }
    }

    private static class ComplexInvertIndexReduce extends
            Reducer<Text, IntWritable, Text, Text> {
        // Accumulates "word -> (file,count) (file,count) ..." across all keys;
        // the map is written out once in cleanup().
        static Map<String, String> map = new HashMap<String, String>();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] strings = key.toString().split("#");
            String word = strings[0];
            String doc = strings[1];
            int sum = 0;
            for (IntWritable value : values) {
                sum = sum + value.get();
            }
            if (map.get(word) == null) {
                map.put(word, " (" + doc + "," + sum + ") ");
            } else {
                map.put(word, map.get(word) + " (" + doc + "," + sum + ") ");
            }
        }

        @Override
        protected void cleanup(
                Reducer<Text, IntWritable, Text, Text>.Context context)
                throws IOException, InterruptedException {
            for (String key : map.keySet()) {
                context.write(new Text(key), new Text(map.get(key)));
            }
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration configuration = HadoopConfig.getConfiguration();
        Job job = Job.getInstance(configuration, "complex inverted index");
        job.setJarByClass(ComplexInvertIndex.class);
        job.setInputFormatClass(FileNameInputFormat.class);
        job.setMapperClass(ComplexInvertIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setCombinerClass(ComplexInvertIndexCombiner.class);
        job.setReducerClass(ComplexInvertIndexReduce.class);
        job.setPartitionerClass(InvertIndexPartitioner.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("/data"));
        FileOutputFormat.setOutputPath(job, new Path("/outputdata"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
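To see what the reducer receives after the shuffle, the mapper and combiner can be simulated in plain Java on the two sample files. This Hadoop-free sketch (the class name and hard-coded data are ours) counts "word#file" keys exactly the way the combiner does:

import java.util.LinkedHashMap;
import java.util.Map;

public class CombineSimulation {
    public static void main(String[] args) {
        // Sample input hard-coded from file1 and file2 above;
        // the first entry of each array is the file name.
        String[][] docs = {
                { "file1", "one fish", "two bird", "two monkey" },
                { "file2", "two peach", "three watermelon" } };
        Map<String, Integer> counts = new LinkedHashMap<String, Integer>();
        for (String[] doc : docs) {
            for (int line = 1; line < doc.length; line++) {
                for (String word : doc[line].split(" ")) {
                    // Same key shape the mapper emits: "word#fileName".
                    String key = word + "#" + doc[0];
                    Integer old = counts.get(key);
                    counts.put(key, old == null ? 1 : old + 1);
                }
            }
        }
        // Prints e.g. "two#file1 -> 2", matching the (file1,2) posting below.
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}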
III. Viewing the Result
monkey (file1,1)
bird (file1,1)
fish (file1,1)
one (file1,1)
peach (file2,1)
watermelon (file2,1)
three (file2,1)
two (file1,2) (file2,1)
(Editor: IT)