A First Look at Hadoop: A MapReduce + HBase Example
Published: 2015-02-08 00:22  Source: linux.it.net.cn  Author: IT
I. Environment Setup
The environment used here is hadoop-0.20.2 and hbase-0.90.4; see here for the Hadoop setup and here for the HBase setup.
Note that the goal of this article is to run a MapReduce job on Hadoop that analyzes a log and persists the results to HBase, so Hadoop needs the HBase and ZooKeeper jars when the program is compiled and run. Copy hbase-0.90.4.jar and zookeeper-3.3.2.jar into Hadoop's lib directory:
#cp /root/hbase-0.90.4/hbase-0.90.4.jar /root/hadoop-0.20.2/lib
#cp /root/hbase-0.90.4/lib/zookeeper-3.3.2.jar /root/hadoop-0.20.2/lib
II. Writing the Example
The log file xxxlog.txt looks like this:
version-------------time-----------------id-------rt----filter--------id----rt-----filter
1.0^A2014-03-03 00:00:01^Ad2000^C4^C3040^Bd2001^C7^C0
1.0^A2014-03-03 00:00:01^Ad3000^C4^C3041^Bd2001^C7^C0
Upload this file to HDFS as well, e.g.: hadoop fs -put xxxlog.txt /tmp/input.
To persist the results, create the target table and column family in HBase, e.g. from ./hbase shell: create 'xxxlog', 'dsp_filter'.
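If you prefer to create the table from code rather than the shell, a minimal sketch using the HBase 0.90 client API could look like the following (the class name CreateXxxlogTable is made up for illustration; the table and family names match the command above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
public class CreateXxxlogTable {
    public static void main(String[] args) throws Exception {
        // Same ZooKeeper quorum used by the job below; adjust for your cluster.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        HBaseAdmin admin = new HBaseAdmin(conf);
        // Equivalent to the shell command: create 'xxxlog', 'dsp_filter'
        if (!admin.tableExists("xxxlog")) {
            HTableDescriptor desc = new HTableDescriptor("xxxlog");
            desc.addFamily(new HColumnDescriptor("dsp_filter"));
            admin.createTable(desc);
        }
    }
}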
For clarity and easier extension, the Mapper, Reducer, and Driver are kept in separate source files:
1. Mapper
#vi xxxLogMaper.java
import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
public class xxxLogMaper
extends Mapper<Object, Text, Text, IntWritable> {
// Delimiters as they appear in the log. Note: String.split() takes a regex,
// in which '^' is an anchor; if the log really contains the literal "^A"
// sequences, escape them (e.g. "\\^A"), and if it uses ASCII control
// characters, use "\u0001"/"\u0002"/"\u0003" instead.
public final static String CONTROL_A = "^A";
public final static String CONTROL_B = "^B";
public final static String CONTROL_C = "^C";
// Field indices in a ^A-separated line; DSP_INFO_LIST = 5 implies the real
// log lines have more fields than the simplified sample above.
public final static int PV_TIME = 1;
public final static int DSP_INFO_LIST = 5;
// Indices within each ^C-separated dsp entry: id, rt, filter.
public final static int DSP_ID = 0;
public final static int DSP_FILTER = 2;
public void map(Object key, Text value, Context context) {
try {
System.out.println("\n------------map come on-----------");
System.out.println("\nline=-----------"+value.toString());
String[] line = value.toString().split(CONTROL_A);
String pvtime = "";
System.out.println("\npvtime=-----------"+line[PV_TIME]);
String year = line[PV_TIME].substring(0, 4);
String month = line[PV_TIME].substring(5, 7);
String day = line[PV_TIME].substring(8, 10);
String hour = line[PV_TIME].substring(11, 13);
String minute = "";
int m_tmp = Integer.parseInt(line[PV_TIME].substring(14, 16));
if (m_tmp >= 0 && m_tmp <= 30) {
minute = "00";
} else {
minute = "30";
}
pvtime = year + month + day + hour + minute;
String[] dspInfoList = line[DSP_INFO_LIST].split(CONTROL_B);
String dspid = "";
String dspfilter = "";
Text k = new Text();
IntWritable v = new IntWritable(1);
for(int i=0; i<dspInfoList.length; i++) {
System.out.println("\n------------map-----------");
System.out.println("\ndspinfo="+dspInfoList[i]);
String[] dspInfo = dspInfoList[i].split(CONTROL_C);
dspid = dspInfo[DSP_ID];
dspfilter = dspInfo[DSP_FILTER];
//key=dspid^Afilter^Apvtime, value=1
k.set(dspid+CONTROL_A+dspfilter+CONTROL_A+pvtime);
context.write(k, v);
System.out.println("\nkey="+k.toString());
System.out.println("\nvalue="+v.toString());
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
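Because the mapper emits a 1 for every (dspid, filter, pvtime) key and the reducer simply sums these counts, a combiner can cut shuffle traffic. The original article does not include one; a minimal sketch (the class name xxxLogCombiner is made up here) is:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class xxxLogCombiner
extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Pre-sum the 1s emitted by the mapper for the same key.
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

It would be registered in the driver with job.setCombinerClass(xxxLogCombiner.class); since addition is associative, the final totals written to HBase are unchanged.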
2. Reducer
#vi xxxLogReducer.java
import java.io.IOException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.KeyValue;
public class xxxLogReducer
extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
public final static String COL_FAMILY = "dsp_filter";
public final static String COL_NAME = "sum";
private final static String ZK_HOST = "localhost";
private final static String TABLE_NAME = "xxxlog";
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
System.out.println("\n------------reduce come on-----------");
String k = key.toString();
IntWritable v = new IntWritable();
int sum = 0;
for (IntWritable val:values) {
sum += val.get();
}
//v.set(sum);
//context.write(key, v);
System.out.println("\n------------reduce-----------");
System.out.println("\ncur-key="+key.toString());
System.out.println("\ncur-value="+sum);
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum.", ZK_HOST);
HTablePool pool = new HTablePool(conf, 3);
HTable table = (HTable)pool.getTable(TABLE_NAME);
Get getrow = new Get(k.getBytes());
Result r = table.get(getrow);
int m_tmp = 0;
for(KeyValue kv:r.raw()) {
System.out.println("\nraw-KeyValugge---"+kv);
System.out.println("\nraw-row=>"+Bytes.toString(kv.getRow()));
System.out.println("\nraw-family=>"+Bytes.toString(kv.getFamily()));
System.out.println("\nraw-qualifier=>"+Bytes.toString(kv.getQualifier()));
System.out.println("\nraw-value=>"+Bytes.toString(kv.getValue()));
m_tmp += Integer.parseInt(Bytes.toString(kv.getValue()));
}
sum = sum + m_tmp;
v.set(sum);
System.out.println("\nreal-key="+key.toString());
System.out.println("\nreal-value="+v.toString());
Put putrow = new Put(k.getBytes());
putrow.add(COL_FAMILY.getBytes(), COL_NAME.getBytes(), String.valueOf(v).getBytes());
try {
// Use the String key's bytes; Text.getBytes() may return a padded buffer.
context.write(new ImmutableBytesWritable(k.getBytes()), putrow);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
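The reducer above does the read-modify-write by hand: it Gets the stored value, adds the new batch sum, and Puts the result back. HBase also offers atomic counters; an alternative sketch, not used by the original code, is shown below. Note that incrementColumnValue stores the counter as an 8-byte long rather than the printable string written above, so readers of that cell would use Bytes.toLong() instead of Integer.parseInt(Bytes.toString(...)).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
public class CounterUpdateSketch {
    // Hypothetical helper: atomically add delta to dsp_filter:sum of one row.
    public static void addToCounter(String rowKey, long delta) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        HTable table = new HTable(conf, "xxxlog");
        table.incrementColumnValue(Bytes.toBytes(rowKey),
                Bytes.toBytes("dsp_filter"), Bytes.toBytes("sum"), delta);
        table.close();
    }
}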
3. Driver
#vi xxxLogDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class xxxLogDriver {
public final static String ZK_HOST = "localhost";
public final static String TABLE_NAME = "xxxlog";
public static void main(String[] args) throws Exception {
//Hbase Configuration
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum.", ZK_HOST);
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: please input <in> <out> args");
System.exit(2);
}
Job job = new Job(conf,"xxxLog");
job.setJarByClass(xxxLogDriver.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.out.println("\n------------driver come on-----------");
job.setMapperClass(xxxLogMaper.class);
job.setReducerClass(xxxLogReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
TableMapReduceUtil.initTableReducerJob(TABLE_NAME, xxxLogReducer.class, job);
System.exit(job.waitForCompletion(true)? 0 : 1);
}
}
III. Compiling and Running
Compile the sources in the current directory:
#javac -classpath /root/hadoop-0.20.2/hadoop-0.20.2-core.jar:/root/hadoop-0.20.2/lib/commons-cli-1.2.jar:/root/hbase-0.90.4/hbase-0.90.4.jar -d ./ xxxLogMaper.java xxxLogReducer.java xxxLogDriver.java
Note that all three files must be compiled together; otherwise you get an error such as:
xxxLogDriver.java:22: error: cannot find symbol
job.setMapperClass(xxxLogMaper.class);
Package the class files:
#jar cvf xxxLog.jar *class
#rm -rf *class
Run the job:
#hadoop jar xxxLog.jar xxxLogDriver /tmp/input /tmp/output
Check the result:
#./hbase shell
hbase(main):014:0>scan 'xxxlog'
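Besides the shell, the aggregated counters can also be read from Java. A minimal sketch using the HBase 0.90 client API follows (the class name ScanXxxlog is made up; it assumes the value was stored as a string, as the reducer above does):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
public class ScanXxxlog {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        HTable table = new HTable(conf, "xxxlog");
        // Restrict the scan to the dsp_filter:sum column written by the reducer.
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("dsp_filter"), Bytes.toBytes("sum"));
        ResultScanner scanner = table.getScanner(scan);
        for (Result r : scanner) {
            String row = Bytes.toString(r.getRow());
            String sum = Bytes.toString(r.getValue(Bytes.toBytes("dsp_filter"), Bytes.toBytes("sum")));
            System.out.println(row + " => " + sum);
        }
        scanner.close();
        table.close();
    }
}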