一、环境配置 这里选择的环境是hadoop-0.20.2和hbase-0.90.4,Hadoop环境配置参看这里,HBase环境配置请看这里。 需要注意的是,本文的需求是在Hadoop上跑MapReduce job来分析日志并将结果持久化到HBase,所以,在编译程序时,Hadoop需要用到HBase和Zookeeper包,因此,需要分别将hbase-0.90.4.jar和zookeeper-3.3.2.jar拷贝到Hadoop的lib目录下,具体操作如下: #cp /root/hbase-0.90.4/hbase-0.90.4.jar /root/hadoop-0.20.2/lib #cp /root/hbase-0.90.4/lib/zookeeper-3.3.2.jar /root/hadoop-0.20.2/lib 二、实例编写 日志文件xxxlog.txt的内容如下: version-------------time-----------------id-------rt----filter--------id----rt-----filter 1.0^A2014-03-03 00:00:01^Ad2000^C4^C3040^Bd2001^C7^C0 1.0^A2014-03-03 00:00:01^Ad3000^C4^C3041^Bd2001^C7^C0 同样,需要将此文件放到hdfs目录下,比如:hadoop fs -put /tmp/input。 为持久化在HBase中创建table和family,比如:./hbase shell,create 'xxxlog', 'dsp_filter'。 为了清晰便于扩展,将Maper、Reducer、Driver分开,具体如下: 1、Maper #vi xxxLogMaper.java import java.io.IOException; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; public class xxxLogMaper extends Mapper<Object, Text, Text, IntWritable> { public final static String CONTROL_A = "^A"; public final static String CONTROL_B = "^B"; public final static String CONTROL_C = "^C"; public final static int PV_TIME = 1; public final static int DSP_INFO_LIST = 5; public final static int DSP_ID = 0; public final static int DSP_FILTER = 2; public void map(Object key, Text value, Context context) { try { System.out.println("\n------------map come on-----------"); System.out.println("\nline=-----------"+value.toString()); String[] line = value.toString().split(CONTROL_A); String pvtime = ""; System.out.println("\npvtime=-----------"+line[PV_TIME]); String year = line[PV_TIME].substring(0, 4); String month = line[PV_TIME].substring(5, 7); String day = line[PV_TIME].substring(8, 10); String hour = line[PV_TIME].substring(11, 13); String minute = ""; int m_tmp = Integer.parseInt(line[PV_TIME].substring(14, 16)); if (m_tmp >= 0 && m_tmp <= 30) { minute = "00"; } else { minute = "30"; } pvtime = year + month + day + hour + minute; String[] dspInfoList = line[DSP_INFO_LIST].split(CONTROL_B); String dspid = ""; String dspfilter = ""; Text k = new Text(); IntWritable v = new IntWritable(1); for(int i=0; i<dspInfoList.length; i++) { System.out.println("\n------------map-----------"); System.out.println("\ndspinfo="+dspInfoList[i]); String[] dspInfo = dspInfoList[i].split(CONTROL_C); dspid = dspInfo[DSP_ID]; dspfilter = dspInfo[DSP_FILTER]; //key=ddspid^Afilter^Apvtime, value=1 k.set(dspid+CONTROL_A+dspfilter+CONTROL_A+pvtime); context.write(k, v); System.out.println("\nkey="+k.toString()); System.out.println("\nvalue="+v.toString()); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } 2、Reducer import java.io.IOException; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.KeyValue; public class BidLogReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> { public final static String COL_FAMILY = "dsp_filter"; public final static String COL_NAME = "sum"; private final static String ZK_HOST = "localhost"; private final static String TABLE_NAME = "xxxlog"; public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { System.out.println("\n------------reduce come on-----------"); String k = key.toString(); IntWritable v = new IntWritable(); int sum = 0; for (IntWritable val:values) { sum += val.get(); } //v.set(sum); //context.write(key, v); System.out.println("\n------------reduce-----------"); System.out.println("\ncur-key="+key.toString()); System.out.println("\ncur-value="+sum); Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum.", ZK_HOST); HTablePool pool = new HTablePool(conf, 3); HTable table = (HTable)pool.getTable(TABLE_NAME); Get getrow = new Get(k.getBytes()); Result r = table.get(getrow); int m_tmp = 0; for(KeyValue kv:r.raw()) { System.out.println("\nraw-KeyValugge---"+kv); System.out.println("\nraw-row=>"+Bytes.toString(kv.getRow())); System.out.println("\nraw-family=>"+Bytes.toString(kv.getFamily())); System.out.println("\nraw-qualifier=>"+Bytes.toString(kv.getQualifier())); System.out.println("\nraw-value=>"+Bytes.toString(kv.getValue())); m_tmp += Integer.parseInt(Bytes.toString(kv.getValue())); } sum = sum + m_tmp; v.set(sum); System.out.println("\nreal-key="+key.toString()); System.out.println("\nreal-value="+v.toString()); Put putrow = new Put(k.getBytes()); putrow.add(COL_FAMILY.getBytes(), COL_NAME.getBytes(), String.valueOf(v).getBytes()); try { context.write(new ImmutableBytesWritable(key.getBytes()), putrow); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } 3、Driver #vi xxxLogDriver.java #vi xxxLogReducer.java import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class xxxLogDriver { public final static String ZK_HOST = "localhost"; public final static String TABLE_NAME = "xxxlog"; public static void main(String[] args) throws Exception { //Hbase Configuration Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum.", ZK_HOST); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: please input <in> <out> args"); System.exit(2); } Job job = new Job(conf,"xxxLog"); job.setJarByClass(xxxLogDriver.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.out.println("\n------------driver come on-----------"); job.setMapperClass(xxxLogMaper.class); job.setReducerClass(xxxLogReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); TableMapReduceUtil.initTableReducerJob(TABLE_NAME, xxxLogReducer.class, job); System.exit(job.waitForCompletion(true)? 0 : 1); } } 三、编译运行 在当前目录下编译源码,具体如下: #javac -classpath /root/hadoop-0.20.2/hadoop-0.20.2-core.jar:/root/hadoop-0.20.2/lib/commons-cli-1.2.jar:/root/hbase-0.90.4/hbase-0.90.4.jar -d ./ xxxLogMaper.java xxxLogReducer.java xxxLogDriver.java 需要注意的是,必须三个一起编译否则出错: xxxLogDriver.java:22: error: cannot find symbol job.setMapperClass(xxxLogMaper.class); 打包class文件,具体如下: #jar cvf xxxLog.jar *class #rm -rf *class 运行任务,具体如下: #hadoop jar xxxLog.jar xxxLogDriver /tmp/input /tmp/output 查询结果,具体如下: #./hbase shell hbase(main):014:0>scan 'xxxlog' (责任编辑:IT) |