import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Finds runs of adjacent (consecutive) red-ball numbers in double-color-ball lottery draws and
 * counts how often each run occurs.
 *
 * <p>Each input line is expected to contain eight whitespace-separated fields, for example
 * {@code 2003001 10 11 12 13 26 28 11}: a draw number, six red balls and one blue ball. For that
 * line the mapper emits the run {@code "10 11 12 13"}; the reducer writes every distinct run
 * together with the number of times it was emitted. Red balls are assumed to be listed in
 * ascending order, as in the example: the code only compares each red ball with the next one.
 */
public class Adjacent extends Configured implements Tool {

    /** Job counters for input that could not be processed. */
    enum Counter {
        /** Lines skipped because they were malformed (wrong field count or non-numeric red ball). */
        LINESKIP,
    }

    /**
     * Map task: parses one draw per line and emits each adjacent red-ball run as the key, with
     * the literal value "1".
     */
    public static class AdjacentMap extends Mapper<LongWritable, Text, Text, Text> {

        /** Fields in a well-formed line: draw number, six red balls, one blue ball. */
        private static final int FIELD_COUNT = 8;
        /** Index of the first red ball in a split line. */
        private static final int FIRST_RED = 1;
        /** Index one past the last red ball; the blue ball (index 7) is never examined. */
        private static final int RED_END = 7;

        // Reused for every write; it is never modified after construction.
        private final Text one = new Text("1");

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Splitting on any whitespace run handles tabs and repeated spaces, which a plain
            // split(" ") would turn into empty fields and cause the line to be dropped.
            String[] fields = value.toString().trim().split("\\s+");
            if (fields.length != FIELD_COUNT) {
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            }
            List<String> runs;
            try {
                runs = findAdjacentRuns(fields);
            } catch (NumberFormatException e) {
                // e.g. a header line: skip and count it rather than failing the whole task.
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            }
            for (String run : runs) {
                context.write(new Text(run), one);
            }
        }

        /**
         * Returns every maximal run of two or more consecutive red-ball fields whose values
         * increase by exactly one, each formatted as its numbers separated by single spaces
         * (e.g. {@code "10 11 12 13"}).
         *
         * @param fields a split input line; only indices {@code FIRST_RED .. RED_END - 1} are read
         * @return the runs in the order they occur; empty if there are none
         * @throws NumberFormatException if a red-ball field is not an integer
         */
        static List<String> findAdjacentRuns(String[] fields) {
            List<String> runs = new ArrayList<String>();
            StringBuilder run = null; // run currently being built, or null when none is open
            int previous = Integer.parseInt(fields[FIRST_RED]);
            for (int i = FIRST_RED + 1; i < RED_END; i++) {
                int current = Integer.parseInt(fields[i]);
                if (current - previous == 1) {
                    if (run == null) {
                        run = new StringBuilder().append(previous);
                    }
                    run.append(' ').append(current);
                } else if (run != null) {
                    runs.add(run.toString());
                    run = null;
                }
                previous = current;
            }
            // A run that extends to the last red ball is still open here.
            if (run != null) {
                runs.add(run.toString());
            }
            return runs;
        }
    }

    /**
     * Reduce task: writes each run together with the number of values received for it.
     *
     * <p>The value contents are ignored; only their number matters. Because this counts values
     * instead of summing them, it must not be registered as a combiner.
     */
    public static class AdjacentReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int total = 0;
            for (Text ignored : values) {
                total++;
            }
            // NOTE(review): the leading tabs look like column padding on top of the key/value
            // tab that TextOutputFormat already writes; kept unchanged to preserve output format.
            context.write(key, new Text("\t\t\t" + total));
        }
    }

    /**
     * Configures and runs the job, then prints a summary and the elapsed time.
     *
     * @param args the job's own arguments (input path, output path), after {@link ToolRunner}
     *     has removed Hadoop generic options
     * @return 0 if the job succeeded, 1 if it failed, -1 if the arguments were invalid
     */
    @Override
    public int run(String[] args) throws Exception {
        // Validate here rather than in main(): ToolRunner has already stripped generic options
        // such as -D key=value, so they no longer count against the argument total.
        if (args.length != 2) {
            printUsage();
            return -1;
        }

        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date start = new Date();

        Configuration conf = getConf();
        // NOTE(review): `new Job(...)` is deprecated in newer Hadoop releases in favour of
        // Job.getInstance(...); kept for compatibility with the Hadoop version this targets.
        Job job = new Job(conf, "adjacent");
        job.setJarByClass(Adjacent.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(AdjacentMap.class);
        job.setReducerClass(AdjacentReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // No separate map output types are set, so the mapper's Text/Text output must match these.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        boolean success = job.waitForCompletion(true);

        printJobSummary(job, success);

        Date end = new Date();
        float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
        System.out.println("任务开始:" + formatter.format(start));
        System.out.println("任务结束:" + formatter.format(end));
        System.out.println("任务耗时:" + String.valueOf(time) + " 分钟");
        return success ? 0 : 1;
    }

    /** Prints the job name, its outcome and the record counters to standard output. */
    private static void printJobSummary(Job job, boolean success)
            throws IOException, InterruptedException {
        System.out.println("任务名称:" + job.getJobName());
        System.out.println("任务成功:" + (success ? "是" : "否"));
        // NOTE(review): legacy Hadoop 1.x counter group name; newer versions expose these via
        // org.apache.hadoop.mapreduce.TaskCounter. Confirm against the target cluster version.
        System.out.println("输入行数:" + job.getCounters()
                .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS")
                .getValue());
        // Labelled as output lines, but this is the number of runs emitted by the mappers,
        // not the number of lines written by the reducer.
        System.out.println("输出行数:" + job.getCounters()
                .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS")
                .getValue());
        System.out.println("跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue());
    }

    /** Prints command-line usage, including Hadoop's generic options, to standard error. */
    private static void printUsage() {
        System.err.println("");
        System.err.println("Usage: Adjacent [generic options] < input path > < output path >");
        System.err.println("Example: hadoop jar ~/adjacent.jar ./input/ssq03-12.txt ./output/adjacent.txt ");
        System.err.println("Counter:");
        System.err.println("\t" + "LINESKIP" + "\t"
                + "Malformed lines (wrong field count or non-numeric red ball)");
        ToolRunner.printGenericCommandUsage(System.err);
    }

    /**
     * Entry point. Runs the job through {@link ToolRunner}, which parses Hadoop generic options
     * before {@link #run(String[])} validates the remaining arguments. Then exits with the
     * status returned by {@code run}.
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Adjacent(), args);
        System.exit(res);
    }
}