当前位置: > Linux集群 > Hadoop >

hadoop-双色球-统计

时间:2015-10-08 12:06来源:linux.it.net.cn 作者:IT

1. 使用 Hadoop 统计双色球中相邻(连号)红球组合出现的次数:

测试数据在:http://pan.baidu.com/s/1hq82YrU

 
代码如下(Adjacent.java):

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
 
/**
 *主要过滤出双色球相邻号码
 * */
 
public class Adjacent extends Configured implements Tool { 
    
    /** 
     * 计数器
     * 用于计数各种异常数据
     */
    enum Counter
    {
        LINESKIP,   //出错的行
    }
    
    /** 
     * MAP任务
     */
    public static class AdjacentMap extends Mapper<LongWritable, Text, Text, Text>
    {
        public void map ( LongWritable key, Text value, Context context ) throws IOException, InterruptedException
        {
            String line = value.toString();             //读取源数据: 2003001    10  11  12  13  26  28 11
            line = line.replaceAll("\t", " ");//过滤掉制表符
            try
            {
                //数据处理
                String [] lineSplit = line.split(" ");
                if(lineSplit.length != 8){
                    return ;
                }
                
                //这里不判断最后一个红球,因为最后一个怎么也不会和后面有相邻的球
                String out = "";
                int next =-1 ;
                List<String> list = new ArrayList<String>();
                for(int i=1;i<7;i++){
                    int a = Integer.parseInt(lineSplit[i]);
                    int b = 100;
                    if(i<6){
                        b = Integer.parseInt(lineSplit[i+1]);
                    }
                    
                    if(1==b-a){
                        if(next == a ){
                            out = out + " "+ b ;
                            next = b ;
                        }else{
                            out = "" ;
                            out = out+a+" "+b+" " ;
                            next = b ;
                        }
                    }else{
                        if(out.equals("")){
                            
                        }else{
                            list.add(out);
                            out = "" ;
                        }
                    }
                    
                }
                
                if(list.size()>0){
                    for(String s :list){
                        context.write(new Text(s), new Text("1"));  //输出
                    }
                }
            }
            catch ( java.lang.ArrayIndexOutOfBoundsException e )
            {
                context.getCounter(Counter.LINESKIP).increment(1);  //出错令计数器+1
                return;
            }
        }
    }
    
    
    public static class AdjacentReducer extends Reducer<Text, Text, Text, Text>{
 
        @Override
        protected void reduce(Text key, Iterable<Text> value,
                Context context)
                throws IOException, InterruptedException {
            int total = 0;
            for(Text text:value){
                total++;
            }
            context.write(key,new Text("\t\t\t"+total) );
            
        }
    }
 
 
    @Override
    public int run(String[] args) throws Exception
    {
        Configuration conf = getConf();
        
 
        /**  需要注意的部分     **/
 
        Job job = new Job(conf, "adjacent");                            //任务名
        job.setJarByClass(Adjacent.class);                          //指定Class
        
        FileInputFormat.addInputPath( job, new Path(args[0]) );         //输入路径
        FileOutputFormat.setOutputPath( job, new Path(args[1]) );       //输出路径
        
        job.setMapperClass( AdjacentMap.class );                                //调用上面Map类作为Map任务代码
        job.setReducerClass(AdjacentReducer.class);
        job.setOutputFormatClass( TextOutputFormat.class );
        job.setOutputKeyClass( Text.class );                    //指定输出的KEY的格式
        job.setOutputValueClass( Text.class );                          //指定输出的VALUE的格式
        
        job.waitForCompletion(true);
        
        //输出任务完成情况
        System.out.println( "任务名称:" + job.getJobName() );
        System.out.println( "任务成功:" + ( job.isSuccessful()?"是":"否" ) );
        System.out.println( "输入行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue() );
        System.out.println( "输出行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue() );
        System.out.println( "跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue() );
 
        return job.isSuccessful() ? 0 : 1;
    }
    
    /** 
     * 设置系统说明
     * 设置MapReduce任务
     */
    public static void main(String[] args) throws Exception
    {
        
        //判断参数个数是否正确
        //如果无参数运行则显示以作程序说明
        if ( args.length != 2 )
        {
            System.err.println("");
            System.err.println("Usage: Adjacent < input path > < output path > < name >");
            System.err.println("Example: hadoop jar ~/adjacent.jar ./input/ssq03-12.txt ./output/adjacent.txt ");
            System.err.println("Counter:");
            System.err.println("\t"+"LINESKIP"+"\t"+"Lines which are too short");
            System.exit(-1);
        }
        
        //记录开始时间
        DateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
        Date start = new Date();
        
        //运行任务
        int res = ToolRunner.run(new Configuration(), new Adjacent(), args);
 
        //输出任务耗时
        Date end = new Date();
        float time =  (float) (( end.getTime() - start.getTime() ) / 60000.0) ;
        System.out.println( "任务开始:" + formatter.format(start) );
        System.out.println( "任务结束:" + formatter.format(end) );
        System.out.println( "任务耗时:" + String.valueOf( time ) + " 分钟" );
 
        System.exit(res);
    }
}

 

 

2. 使用 Hadoop 统计双色球每个红球号码出现的次数:
代码如下(TotalHong.java):

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
 
/**
 *对双色球红球出现的次数进行统
 * */
 
public class TotalHong extends Configured implements Tool {
    
    /** 
     * 计数器
     * 用于计数各种异常数据
     */
    enum Counter
    {
        LINESKIP,   //出错的行
    }
    
    /** 
     * MAP任务
     */
    public static class AdjacentMap extends Mapper<LongWritable, Text, Text, Text>
    {
        public void map ( LongWritable key, Text value, Context context ) throws IOException, InterruptedException
        {
            String line = value.toString();             //读取源数据: 2003001    10  11  12  13  26  28 11
            line = line.replaceAll("\t", " ");//过滤掉制表符
            try
            {
                //数据处理
                String [] lineSplit = line.split(" ");
                if(lineSplit.length != 8){
                    return ;
                }
                for(int i=1;i<7;i++){
                    context.write(new Text(lineSplit[i]), new Text("1"));   //输出
                }
                
            }
            catch ( java.lang.ArrayIndexOutOfBoundsException e )
            {
                context.getCounter(Counter.LINESKIP).increment(1);  //出错令计数器+1
                return;
            }
        }
    }
    
    
    public static class AdjacentReducer extends Reducer<Text, Text, Text, Text>{
 
        @Override
        protected void reduce(Text key, Iterable<Text> value,
                Context context)
                throws IOException, InterruptedException {
            int total = 0;
            for(Text text:value){
                total++;
            }
            context.write(key,new Text(total+"") );
            
        }
    }
 
 
    @Override
    public int run(String[] args) throws Exception
    {
        Configuration conf = getConf();
        
 
        /**  需要注意的部分     **/
 
        Job job = new Job(conf, "adjacent");                            //任务名
        job.setJarByClass(TotalHong.class);                         //指定Class
        
        FileInputFormat.addInputPath( job, new Path(args[0]) );         //输入路径
        FileOutputFormat.setOutputPath( job, new Path(args[1]) );       //输出路径
        
        job.setMapperClass( AdjacentMap.class );                                //调用上面Map类作为Map任务代码
        job.setReducerClass(AdjacentReducer.class);
        job.setOutputFormatClass( TextOutputFormat.class );
        job.setOutputKeyClass( Text.class );                    //指定输出的KEY的格式
        job.setOutputValueClass( Text.class );                          //指定输出的VALUE的格式
        
        job.waitForCompletion(true);
        
        //输出任务完成情况
        System.out.println( "任务名称:" + job.getJobName() );
        System.out.println( "任务成功:" + ( job.isSuccessful()?"是":"否" ) );
        System.out.println( "输入行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue() );
        System.out.println( "输出行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue() );
        System.out.println( "跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue() );
 
        return job.isSuccessful() ? 0 : 1;
    }
    
    /** 
     * 设置系统说明
     * 设置MapReduce任务
     */
    public static void main(String[] args) throws Exception
    {
        
        //判断参数个数是否正确
        //如果无参数运行则显示以作程序说明
        if ( args.length != 2 )
        {
            System.err.println("");
            System.err.println("Usage: Adjacent < input path > < output path > < name >");
            System.err.println("Example: hadoop jar ~/adjacent.jar ./input/ssq03-12.txt ./output/adjacent.txt ");
            System.err.println("Counter:");
            System.err.println("\t"+"LINESKIP"+"\t"+"Lines which are too short");
            System.exit(-1);
        }
        
        //记录开始时间
        DateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
        Date start = new Date();
        
        //运行任务
        int res = ToolRunner.run(new Configuration(), new TotalHong(), args);
 
        //输出任务耗时
        Date end = new Date();
        float time =  (float) (( end.getTime() - start.getTime() ) / 60000.0) ;
        System.out.println( "任务开始:" + formatter.format(start) );
        System.out.println( "任务结束:" + formatter.format(end) );
        System.out.println( "任务耗时:" + String.valueOf( time ) + " 分钟" );
 
        System.exit(res);
    }
}

 

(责任编辑:IT)
------分隔线----------------------------