当前位置: > Linux集群 > Hadoop >

MapReduce应用

时间:2015-10-10 11:06来源:linux.it.net.cn 作者:IT

目录[-]

  • 1、MapReduce实现矩阵相乘
  • 2、MapReduce实现倒排索引
  • 3、MapReduce实现复杂倒排索引

1、MapReduce实现矩阵相乘

一. 准备数据

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#!/bin/bash if [ $# -ne 3 ]
then
  echo "there must be 3 arguments to generate the two matries file!"
  exit 1
fi
cat /dev/null > M_$1_$2
cat /dev/null > N_$2_$3 for i in `seq 1 $1` do
    for j in `seq 1 $2`
    do
        s=$((RANDOM%100))
        echo -e "$i,$j\t$s" >>M_$1_$2
    done
done
echo "we have built the matrix file M"
 for i in `seq 1 $2`
    do
    for j in ` seq 1 $3`
    do
        s=$((RANDOM%100))
        echo -e "$i,$j\t$s" >>N_$2_$3 
    done
done
echo "we have built the matrix file N"

用一下脚本语言准备数组数据

 
1
2
3
4
5
6
7
M_3_2: 1,1 81 1,2 13 2,1 38 2,2 46 3,1 0 3,2 2


 
1
2
3
4
5
6
7
8
9
N_2_4: 1,1 99 1,2 38 1,3 34 1,4 19 2,1 21 2,2 4 2,3 36 2,4 64



二. 计算

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
public class Matrix {
  
    private static class MatrixMapper extends
            Mapper<LongWritable, Text, Text, Text> {
  
        private static int colN = 0;
        private static int rowM = 0;
  
        @Override
        protected void setup(
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
  
            Configuration configuration = context.getConfiguration();
            colN = configuration.getInt("colN", 0);
            rowM = configuration.getInt("rowM", 0);
  
        }
  
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
  
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            String[] strings = value.toString().split(",");
            int i = Integer.parseInt(strings[0]);
            String[] ser = strings[1].split("\t");
            int j = Integer.parseInt(ser[0]);
            int val = Integer.parseInt(ser[1]);
  
            if (fileName.startsWith("M")) {
  
                for (int count = 1; count <= colN; count++) {
                    context.write(new Text(i + "," + count), new Text("M," + j
                            + "," + val + ""));
                }
  
            } else {
  
                for (int count = 1; count <= rowM; count++) {
                    context.write(new Text(count + "," + j), new Text("N," + i
                            + "," + val + ""));
                }
  
            }
        }
    }
  
    private static class MatrixReduce extends
            Reducer<Text, Text, Text, IntWritable> {
  
        private static int rowM = 0;
  
        @Override
        protected void setup(
                Reducer<Text, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
  
            Configuration configuration = context.getConfiguration();
            rowM = configuration.getInt("rowM", 0);
  
        }
  
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
  
            int sumValue = 0;
            int[] m_Arr = new int[rowM + 1];
            int[] n_Arr = new int[rowM + 1];
  
            for (Text value : values) {
  
                String string = value.toString();
                String[] strings = string.split(",");
  
                if (strings[0].equals("M")) {
                    m_Arr[Integer.parseInt(strings[1])] = Integer
                            .parseInt(strings[2]);
                } else {
                    n_Arr[Integer.parseInt(strings[1])] = Integer
                            .parseInt(strings[2]);
                }
            }
  
            for (int i = 1; i < rowM + 1; i++) {
                sumValue += m_Arr[i] * n_Arr[i];
            }
  
            context.write(key, new IntWritable(sumValue));
        }
  
    }
  
    public static void main(String[] args) throws IllegalArgumentException,
            IOException, ClassNotFoundException, InterruptedException {
  
        Configuration configuration = HadoopConfig.getConfiguration();
        configuration.setInt("colN", 4);
        configuration.setInt("rowN", 2);
        configuration.setInt("colM", 2);
        configuration.setInt("rowM", 3);
  
        Job job = Job.getInstance(configuration, "矩阵相乘");
  
        job.setJarByClass(Sort.class);
        job.setMapperClass(MatrixMapper.class);
  
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
  
        job.setReducerClass(MatrixReduce.class);
        FileInputFormat.addInputPath(job, new Path("/matrix"));
        FileOutputFormat.setOutputPath(job, new Path("/matrixOutput"));
        job.waitForCompletion(true);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
  
    }
  
}





三. 结果

 

1
2
3
4
5
6
7
8
9
10
11
12
1,1 8292 1,2 3130 1,3 3222 1,4 2371 2,1 4728 2,2 1628 2,3 2948 2,4 3666 3,1 42 3,2 8 3,3 72 3,4 128

 

2、MapReduce实现倒排索引

一、准备数据

 
1
2
3
4
5
6
7
8
file1:
one fish
two bird
two monkey
  
file2:
two peach
three watermelon



二、计算

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class InvertIndex {
  
    private static class InvertIndexMapper extends
            Mapper<LongWritable, Text, Text, Text> {
  
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
  
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().toString();
            String[] words = value.toString().split(" ");
            for (String string : words) {
                context.write(new Text(string), new Text(fileName + "#" + key.toString()));
            }
              
        }
  
    }
  
    private static class InvertIndexReduce extends
            Reducer<Text, Text, Text, Text> {
  
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
              
            StringBuilder stringBuilder = new StringBuilder();
              
            for (Text text : values) {
                    stringBuilder.append(text.toString()).append(";");
            }
              
            context.write(key, new Text(stringBuilder.toString()));
        }
    }
  
    public static void main(String[] args) throws IOException,
    ClassNotFoundException, InterruptedException{
  
        Configuration configuration = HadoopConfig.getConfiguration();
        Job job = Job.getInstance(configuration, "倒排索引");
        job.setJarByClass(InvertIndex.class);
        job.setMapperClass(InvertIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(InvertIndexReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
  
        FileInputFormat.addInputPath(job, new Path("/data"));
        FileOutputFormat.setOutputPath(job, new Path("/ouput"));
        job.waitForCompletion(true);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
          
    }

三、结果

 
1
2
3
4
5
6
7
8
bird    hdfs://127.0.0.1:8020/data/file1#9; fish    hdfs://127.0.0.1:8020/data/file1#0; monkey  hdfs://127.0.0.1:8020/data/file1#18; one hdfs://127.0.0.1:8020/data/file1#0; peach   hdfs://127.0.0.1:8020/data/file2#0; three   hdfs://127.0.0.1:8020/data/file2#10; two hdfs://127.0.0.1:8020/data/file2#0;hdfs://127.0.0.1:8020/data/file1#18;hdfs://127.0.0.1:8020/data/file1#9; watermelon  hdfs://127.0.0.1:8020/data/file2#10;

3、MapReduce实现复杂倒排索引

一、准备数据

 
1
2
3
4
5
6
7
8
file1:
one fish
two bird
two monkey
  
file2:
two peach
three watermelon

二、计算

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
public class ComplexInvertIndex {
  
    private static class FileNameRecordReader extends RecordReader<Text, Text> {
  
        LineRecordReader lineRecordReader = new LineRecordReader();
        String fileName;
  
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            lineRecordReader.initialize(split, context);
            fileName = ((FileSplit) split).getPath().getName();
        }
  
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return lineRecordReader.nextKeyValue();
        }
  
        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return new Text(fileName);
        }
  
        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return lineRecordReader.getCurrentValue();
        }
  
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return lineRecordReader.getProgress();
        }
  
        @Override
        public void close() throws IOException {
            lineRecordReader.close();
        }
  
    }
  
    private static class FileNameInputFormat extends
            FileInputFormat<Text, Text> {
  
        @Override
        public RecordReader<Text, Text> createRecordReader(InputSplit split,
                TaskAttemptContext context) throws IOException,
                InterruptedException {
            FileNameRecordReader fileNameRecordReader = new FileNameRecordReader();
            fileNameRecordReader.initialize(split, context);
            return fileNameRecordReader;
        }
  
    }
  
    private static class ComplexInvertIndexMapper extends
            Mapper<Text, Text, Text, IntWritable> {
  
        @Override
        protected void map(Text key, Text value,
                Mapper<Text, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
  
            String[] strs = value.toString().split(" ");
            for (String string : strs) {
                context.write(new Text( string+"#"+key.toString() ),new IntWritable(1));
            }
  
        }
  
    }
  
    private static class ComplexInvertIndexCombiner extends
            Reducer<Text, IntWritable, Text, IntWritable> {
  
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
  
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key,new IntWritable(sum));
            System.out.println(key.toString() + sum +"");
        }
  
    }
  
    //把key的前面字段聚合,排序     private static class InvertIndexPartitioner extends
            HashPartitioner<Text, IntWritable> {
  
        @Override
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            String[] strs = key.toString().split("#");
            return super.getPartition(new Text(strs[0]), value, numReduceTasks);
        }
  
    }                
  
    private static class ComplexInvertIndexReduce extends
            Reducer<Text, IntWritable, Text, Text> {
  
        static Map<String, String> map = new HashMap<String, String>();
          
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, Text>.Context context)
                throws IOException, InterruptedException {
  
            String[] strings = key.toString().split("#");
            String word = strings[0];
            String doc = strings[1];
            int sum = 0;
            for(IntWritable value : values){
                sum = sum + value.get();
            }
            if( map.get(word) == null ){
                map.put(word," ("+doc+","+sum+") ");
            }else{
                map.put(word,map.get(word)+" ("+doc+","+sum+") ");
            }
             
        }
          
        @Override
        protected void cleanup(
                Reducer<Text, IntWritable, Text, Text>.Context context)
                throws IOException, InterruptedException {
            for(String key:map.keySet()){
                context.write(new Text(key), new Text(map.get(key)));
            }
        }
  
    }
  
    public static void main(String[] args)throws IOException,
    ClassNotFoundException, InterruptedException{
  
        Configuration configuration = HadoopConfig.getConfiguration();
        Job job = Job.getInstance(configuration, "复杂倒排索引");
        job.setJarByClass(ComplexInvertIndex.class);
        job.setInputFormatClass(FileNameInputFormat.class);
        job.setMapperClass(ComplexInvertIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setCombinerClass(ComplexInvertIndexCombiner.class);
        job.setReducerClass(ComplexInvertIndexReduce.class);
        job.setPartitionerClass(InvertIndexPartitioner.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
          
        FileInputFormat.addInputPath(job, new Path("/data"));
        FileOutputFormat.setOutputPath(job, new Path("/ouputdata"));
        job.waitForCompletion(true);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
          
    }

 

三、结果查看

 
1
2
3
4
5
6
7
8
monkey   (file1,1) 
bird     (file1,1) 
fish     (file1,1) 
one  (file1,1) 
peach    (file2,1) 
watermelon   (file2,1) 
three    (file2,1) 
two  (file1,2)  (file2,1)



(责任编辑:IT)
------分隔线----------------------------