> Linux集群 > Hadoop >

Hadoop初探之MapReduce+HBase实例

一、环境配置
        这里选择的环境是hadoop-0.20.2和hbase-0.90.4,Hadoop环境配置参看这里,HBase环境配置请看这里。
        需要注意的是,本文的需求是在Hadoop上跑MapReduce job来分析日志并将结果持久化到HBase,所以,在编译程序时,Hadoop需要用到HBase和Zookeeper包,因此,需要分别将hbase-0.90.4.jar和zookeeper-3.3.2.jar拷贝到Hadoop的lib目录下,具体操作如下:
        #cp /root/hbase-0.90.4/hbase-0.90.4.jar /root/hadoop-0.20.2/lib
        #cp /root/hbase-0.90.4/lib/zookeeper-3.3.2.jar /root/hadoop-0.20.2/lib

二、实例编写
        日志文件xxxlog.txt的内容如下:
        version-------------time-----------------id-------rt----filter--------id----rt-----filter
        1.0^A2014-03-03 00:00:01^Ad2000^C4^C3040^Bd2001^C7^C0
        1.0^A2014-03-03 00:00:01^Ad3000^C4^C3041^Bd2001^C7^C0
        同样,需要将此文件上传到hdfs目录下,比如:hadoop fs -put xxxlog.txt /tmp/input。
        为持久化在HBase中创建table和family,比如:./hbase shell,create 'xxxlog', 'dsp_filter'。
        为了结构清晰、便于扩展,将Mapper、Reducer、Driver分别写成独立的类,具体如下:
1、Mapper
        #vi xxxLogMaper.java
        import java.io.IOException;
        import org.apache.hadoop.mapreduce.Mapper;
        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.io.Text;
        public class xxxLogMaper
            extends Mapper<Object, Text, Text, IntWritable> {
            public final static String CONTROL_A        = "^A";
            public final static String CONTROL_B        = "^B";
            public final static String CONTROL_C        = "^C";
            public final static int PV_TIME             = 1;
            public final static int DSP_INFO_LIST       = 5;
            public final static int DSP_ID              = 0;
            public final static int DSP_FILTER          = 2;
            public void map(Object key, Text value, Context context) {
                try {
                    System.out.println("\n------------map come on-----------");
                    System.out.println("\nline=-----------"+value.toString());
                    String[] line = value.toString().split(CONTROL_A);
                    String pvtime = "";
                    System.out.println("\npvtime=-----------"+line[PV_TIME]);
                    String year = line[PV_TIME].substring(0, 4);
                    String month = line[PV_TIME].substring(5, 7);
                    String day = line[PV_TIME].substring(8, 10);
                    String hour = line[PV_TIME].substring(11, 13);
                    String minute = "";  
                    int m_tmp = Integer.parseInt(line[PV_TIME].substring(14, 16));
                    if (m_tmp >= 0 && m_tmp <= 30) {
                        minute = "00";
                    } else {
                        minute = "30"; 
                    }
                    pvtime = year + month + day + hour + minute;
                    String[] dspInfoList = line[DSP_INFO_LIST].split(CONTROL_B);
                    String dspid = ""; 
                    String dspfilter = "";
                    Text k = new Text();
                    IntWritable v = new IntWritable(1);
                    for(int i=0; i<dspInfoList.length; i++) {
                        System.out.println("\n------------map-----------");
                        System.out.println("\ndspinfo="+dspInfoList[i]);
                        String[] dspInfo = dspInfoList[i].split(CONTROL_C);
                        dspid = dspInfo[DSP_ID];
                        dspfilter = dspInfo[DSP_FILTER];
                        //key=ddspid^Afilter^Apvtime, value=1
                        k.set(dspid+CONTROL_A+dspfilter+CONTROL_A+pvtime);
                        context.write(k, v);
                        System.out.println("\nkey="+k.toString());
                        System.out.println("\nvalue="+v.toString());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
2、Reducer
        #vi xxxLogReducer.java
        import java.io.IOException;
        import org.apache.hadoop.hbase.client.Get;
        import org.apache.hadoop.hbase.client.Put;
        import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
        import org.apache.hadoop.hbase.mapreduce.TableReducer;
        import org.apache.hadoop.mapreduce.Reducer;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.hbase.HBaseConfiguration;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.hbase.client.HTable;
        import org.apache.hadoop.hbase.client.HTablePool;
        import org.apache.hadoop.hbase.client.Result;
        import org.apache.hadoop.hbase.util.Bytes;
        import org.apache.hadoop.hbase.KeyValue;
        public class BidLogReducer
            extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
            public final static String COL_FAMILY       = "dsp_filter";
            public final static String COL_NAME         = "sum";
            private final static String ZK_HOST         = "localhost";
            private final static String TABLE_NAME      = "xxxlog";
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
                System.out.println("\n------------reduce come on-----------");
                String k = key.toString();
                IntWritable v = new IntWritable();
                int sum = 0;
                for (IntWritable val:values) {
                    sum += val.get();
                }
                //v.set(sum);
                //context.write(key, v);
                System.out.println("\n------------reduce-----------");
                System.out.println("\ncur-key="+key.toString());
                System.out.println("\ncur-value="+sum);
                Configuration conf = HBaseConfiguration.create();
                conf.set("hbase.zookeeper.quorum.", ZK_HOST);
                HTablePool pool = new HTablePool(conf, 3);
                HTable table = (HTable)pool.getTable(TABLE_NAME);
                Get getrow = new Get(k.getBytes());
                Result r = table.get(getrow);
                int m_tmp = 0;
                for(KeyValue kv:r.raw()) {
                    System.out.println("\nraw-KeyValugge---"+kv);
                    System.out.println("\nraw-row=>"+Bytes.toString(kv.getRow()));
                    System.out.println("\nraw-family=>"+Bytes.toString(kv.getFamily()));
                    System.out.println("\nraw-qualifier=>"+Bytes.toString(kv.getQualifier()));
                    System.out.println("\nraw-value=>"+Bytes.toString(kv.getValue()));
                    m_tmp += Integer.parseInt(Bytes.toString(kv.getValue()));
                }
                sum = sum + m_tmp;
                v.set(sum);
                System.out.println("\nreal-key="+key.toString());
                System.out.println("\nreal-value="+v.toString());
                Put putrow = new Put(k.getBytes());
                putrow.add(COL_FAMILY.getBytes(), COL_NAME.getBytes(), String.valueOf(v).getBytes());
                try {
                    context.write(new ImmutableBytesWritable(key.getBytes()), putrow);
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
3、Driver 
        #vi xxxLogDriver.java
        #vi xxxLogReducer.java
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.hbase.HBaseConfiguration;
        import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
        import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
        import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
        import org.apache.hadoop.util.GenericOptionsParser;
        public class xxxLogDriver {
            public final static String ZK_HOST          = "localhost";
            public final static String TABLE_NAME       = "xxxlog";
            public static void main(String[] args) throws Exception {
                //Hbase Configuration
                Configuration conf = HBaseConfiguration.create();
                conf.set("hbase.zookeeper.quorum.", ZK_HOST);
                String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
                if (otherArgs.length != 2) {
                    System.err.println("Usage: please input <in> <out> args");
                    System.exit(2);
                }
                Job job = new Job(conf,"xxxLog");
                job.setJarByClass(xxxLogDriver.class);
                FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
                FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
                System.out.println("\n------------driver come on-----------");
                job.setMapperClass(xxxLogMaper.class);
                job.setReducerClass(xxxLogReducer.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(IntWritable.class);
                TableMapReduceUtil.initTableReducerJob(TABLE_NAME, xxxLogReducer.class, job);
                System.exit(job.waitForCompletion(true)? 0 : 1);
            }
        }

三、编译运行

        在当前目录下编译源码,具体如下:
        #javac -classpath /root/hadoop-0.20.2/hadoop-0.20.2-core.jar:/root/hadoop-0.20.2/lib/commons-cli-1.2.jar:/root/hbase-0.90.4/hbase-0.90.4.jar -d ./ xxxLogMaper.java xxxLogReducer.java xxxLogDriver.java
        需要注意的是,必须三个一起编译否则出错:
        xxxLogDriver.java:22: error: cannot find symbol
        job.setMapperClass(xxxLogMaper.class);
        打包class文件,具体如下:
        #jar cvf xxxLog.jar *class
        #rm -rf *class
        运行任务,具体如下:
        #hadoop jar xxxLog.jar  xxxLogDriver /tmp/input /tmp/output
        查询结果,具体如下:
        #./hbase shell
        hbase(main):014:0>scan 'xxxlog'

        



(责任编辑:IT)