HBase-1.0.1 Study Notes (4): Operating HBase with MapReduce
Work notes of 鲁春利. Who says programmers can't have a touch of literary flair?
Environment:
hadoop-2.6.0
hbase-1.0.1
zookeeper-3.4.6
1. Hadoop cluster setup: omitted.
2. ZooKeeper cluster setup: omitted.
3. HBase cluster setup: omitted.
4. Example: HBase as an input source
View the current data in the HBase table m_domain:
[hadoop@dnode1 conf]$ hbase shell
HBase Shell; enter 'help&lt;RETURN&gt;' for list of supported commands.
Type "exit&lt;RETURN&gt;" to leave the HBase Shell
Version 1.0.1, r66a93c09df3b12ff7b86c39bc8475c60e15af82d, Fri Apr 17 22:14:06 PDT 2015

hbase(main):001:0> list
TABLE
m_domain
t_domain
2 row(s) in 0.9270 seconds
=> ["m_domain", "t_domain"]

hbase(main):002:0> scan 'm_domain'
ROW                              COLUMN+CELL
 alibaba.com_19990415_20220523   column=cf:access_server, timestamp=1440947490018, value=\xE6\x9D\xAD\xE5\xB7\x9E
 alibaba.com_19990415_20220523   column=cf:exp_date, timestamp=1440947490018, value=2022\xE5\xB9\xB405\xE6\x9C\x8823\xE6\x97\xA5
 alibaba.com_19990415_20220523   column=cf:ipstr, timestamp=1440947490018, value=205.204.101.42
 alibaba.com_19990415_20220523   column=cf:owner, timestamp=1440947490018, value=Hangzhou Alibaba Advertising Co.
 alibaba.com_19990415_20220523   column=cf:reg_date, timestamp=1440947490018, value=1999\xE5\xB9\xB404\xE6\x9C\x8815\xE6\x97\xA5
 baidu.com_19991011_20151011     column=cf:access_server, timestamp=1440947489956, value=\xE5\x8C\x97\xE4\xBA\xAC
 baidu.com_19991011_20151011     column=cf:exp_date, timestamp=1440947489956, value=2015\xE5\xB9\xB410\xE6\x9C\x8811\xE6\x97\xA5
 baidu.com_19991011_20151011     column=cf:ipstr, timestamp=1440947489956, value=220.181.57.217
 baidu.com_19991011_20151011     column=cf:reg_date, timestamp=1440947489956, value=1999\xE5\xB9\xB410\xE6\x9C\x8811\xE6\x97\xA5
2 row(s) in 1.4560 seconds

hbase(main):003:0> quit
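For reference, the same data can also be read from standalone client code with the HBase 1.0 connection API. Below is a minimal sketch; the class name MDomainScanner is made up for illustration, and it assumes hbase-site.xml (or an explicit hbase.zookeeper.quorum setting) is available on the client classpath.

package com.invic.mapreduce.hbase.source;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Standalone scan of m_domain with the HBase 1.0 client API (Connection/Table), printing
// rowkey, family:qualifier and value for every cell of every row.
public class MDomainScanner {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("m_domain"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                for (Cell cell : result.listCells()) {
                    System.out.println(Bytes.toString(result.getRow()) + "\t"
                            + Bytes.toString(CellUtil.cloneFamily(cell)) + ":"
                            + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
                            + Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
        }
    }
}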
Implement the Mapper:
package com.invic.mapreduce.hbase.source;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/**
 * @author lucl
 * TableMapper extends Mapper; every Mapper class that uses HBase as its input source
 * needs to inherit from it.
 */
public class HBaseReaderMapper extends TableMapper<Writable, Writable> {

    private Text key = new Text();
    private Text value = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
    }

    @Override
    protected void map(ImmutableBytesWritable row, Result result, Context context)
            throws IOException, InterruptedException {
        // Case 1: the column family is known explicitly
        {
            NavigableMap<byte[], byte[]> map = result.getFamilyMap("cf".getBytes());
            Set<Entry<byte[], byte[]>> values = map.entrySet();
            for (Entry<byte[], byte[]> entry : values) {
                String columnQualifier = new String(entry.getKey());
                String cellValue = new String(entry.getValue());
                System.out.println(columnQualifier + "\t" + cellValue);
            }
        }

        // Case 2: there are multiple column families, or the family names are not known in advance
        {
            String rowKey = new String(row.get());
            byte[] columnFamily = null;
            byte[] columnQualifier = null;
            byte[] cellValue = null;
            StringBuffer sbf = new StringBuffer(1024);
            for (Cell cell : result.listCells()) {
                columnFamily = CellUtil.cloneFamily(cell);
                columnQualifier = CellUtil.cloneQualifier(cell);
                cellValue = CellUtil.cloneValue(cell);
                sbf.append(Bytes.toString(columnFamily));
                sbf.append(".");
                sbf.append(Bytes.toString(columnQualifier));
                sbf.append(":");
                sbf.append(new String(cellValue, "UTF-8"));
            }
            key.set(rowKey);
            value.set(sbf.toString());
            context.write(key, value);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }
}
Implement the MapReduce Driver class:
package com.invic.mapreduce.hbase.source;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author lucl
 * Example of using HBase as an input source.
 */
public class HBaseASDataSourceDriver extends Configured implements Tool {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // for eclipse
        // System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0\\hadoop-2.6.0\\");
        int exit = ToolRunner.run(new HBaseASDataSourceDriver(), args);
        System.out.println("receive exit : " + exit);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Hadoop configuration (only needed when the client cannot read the cluster's *-site.xml files)
        /*
        conf.set("fs.defaultFS", "hdfs://cluster");
        conf.set("dfs.nameservices", "cluster");
        conf.set("dfs.ha.namenodes.cluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.cluster.nn1", "nnode:8020");
        conf.set("dfs.namenode.rpc-address.cluster.nn2", "dnode1:8020");
        conf.set("dfs.client.failover.proxy.provider.cluster",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        */

        // HBase master
        // The property "hbase.master" has been deprecated since 0.90.
        // Just passing the ZK configuration makes your client auto-discover the master.
        // conf.set("hbase.master", "nnode:60000");

        // ZooKeeper quorum (set on the same Configuration object the Job is built from)
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2");

        // Disable speculative execution for map tasks
        conf.setBoolean("mapreduce.map.speculative", false);
        // Disable speculative execution for reduce tasks
        conf.setBoolean("mapreduce.reduce.speculative", false);

        Job job = Job.getInstance(conf);
        job.setJobName("MyBaseReaderFromHBase");
        job.setJarByClass(HBaseASDataSourceDriver.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        /**
         * Rows read from HBase are handed to the Mapper defined above, which processes the data.
         * Since no Reducer class is set on the job, the default Reducer is used and writes the
         * Mapper output unchanged; if further processing is needed, define a custom Reducer.
         */
        Scan scan = new Scan();
        // scan.addFamily(family);
        // scan.addColumn(family, qualifier);

        byte[] tableName = Bytes.toBytes("m_domain");
        TableMapReduceUtil.initTableMapperJob(tableName, scan, HBaseReaderMapper.class,
                Text.class, Text.class, job);

        Path path = new Path("/" + System.currentTimeMillis());
        FileOutputFormat.setOutputPath(job, path);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
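As the comment in run() notes, no Reducer is registered, so the identity Reducer writes the Mapper output unchanged. If extra processing is wanted, a custom Reducer can be plugged in. The following is only a minimal sketch under that assumption; ConcatReducer is a hypothetical name, and it would be registered with job.setReducerClass(ConcatReducer.class) before submitting the job.

package com.invic.mapreduce.hbase.source;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Optional Reducer for the HBase-as-input job: concatenates all values emitted for the
// same row key into a single output line instead of writing them unchanged.
public class ConcatReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text v : values) {
            if (sb.length() > 0) {
                sb.append(" | ");
            }
            sb.append(v.toString());
        }
        context.write(key, new Text(sb.toString()));
    }
}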
View the results:
Issues encountered:
a. Running the job from Eclipse reports an error; the cause has not been determined.
b. When the job runs on the cluster and the Mapper class is defined inside the Driver class, it fails with:
ClassNotFound for HBaseASDataSourceDriver$HBaseReaderMapper init()
(a sketch of one way to avoid this follows after this list)
c. The ZooKeeper connect string always shows 127.0.0.1 instead of the configured hbase.zookeeper.quorum.
It is not clear whether this would become a problem if the ZooKeeper ensemble ran on machines different from the HBase nodes. A likely cause is that the quorum settings are applied to a Configuration object other than the one the Job is built from (for example via getConf() while the Job uses a fresh HBaseConfiguration), so the client falls back to the default localhost quorum; note also that the property name is hbase.zookeeper.property.clientPort with a capital P. A quick configuration check is sketched after this list.
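For issue b, one common cause (an assumption here, since the full stack trace was not kept) is that a Mapper declared as a non-static inner class of the Driver has no no-argument constructor the framework can call by reflection; declaring it as a public static nested class, or as a top-level class as done above, avoids that. It is also worth checking that the class is actually packaged into the submitted jar. The class names below are made up for illustration only.

package com.invic.mapreduce.hbase.source;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;

// Sketch: if the Mapper has to live inside the Driver class, declare it as a
// public static nested class so the MapReduce framework can instantiate it.
public class NestedMapperSketch {

    public static class InnerReaderMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable row, Result result, Context context)
                throws IOException, InterruptedException {
            // emit the row key and the Result's string form; real parsing would go here
            context.write(new Text(row.get()), new Text(result.toString()));
        }
    }
    // InnerReaderMapper.class would then be passed to TableMapReduceUtil.initTableMapperJob.
}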
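For issue c, a quick check is to print the quorum carried by the exact Configuration object the Job will be built from. This is only a minimal sketch, under the assumption that the problem is a configuration-object mismatch rather than a cluster issue; the class name QuorumCheck is hypothetical.

package com.invic.mapreduce.hbase.source;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Prints the ZooKeeper settings held by the Configuration that the Job would use.
// If this prints localhost/127.0.0.1, the quorum was set on a different Configuration,
// or hbase-site.xml is not on the client classpath.
public class QuorumCheck {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2");
        conf.set("hbase.zookeeper.property.clientPort", "2181");  // note the capital P
        System.out.println("hbase.zookeeper.quorum = " + conf.get("hbase.zookeeper.quorum"));
        System.out.println("hbase.zookeeper.property.clientPort = "
                + conf.get("hbase.zookeeper.property.clientPort"));
    }
}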
5. Example: HBase as an output sink
The input text file contains the following (fields are tab-separated):
2013-09-13 16:04:08    www.subnetc1.com    192.168.1.7    80    192.168.1.139    18863    HTTP    www.subnetc1.com/index.html
2013-09-13 16:04:08    www.subnetc2.com    192.168.1.7    80    192.168.1.159    14100    HTTP    www.subnetc2.com/index.html
2013-09-13 16:04:08    www.subnetc3.com    192.168.1.7    80    192.168.1.130    4927     HTTP    www.subnetc3.com/index.html
2013-09-13 16:04:08    www.subnetc4.com    192.168.1.7    80    192.168.1.154    39044    HTTP    www.subnetc4.com/index.html
Mapper code:
package com.invic.mapreduce.hbase.target;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<Object, Text, Text, Text> {

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Word-count variant (sample code), with Mapper<Object, Text, Text, IntWritable>:
        /*
        {
            IntWritable one = new IntWritable(1);
            Text word = new Text();
            StringTokenizer token = new StringTokenizer(value.toString());
            while (token.hasMoreTokens()) {
                word.set(token.nextToken());
                context.write(word, one);
            }
        }
        */

        // Write multiple columns to HBase, with Mapper<Object, Text, Text, Text>:
        {
            String[] temps = value.toString().split("\t");
            if (null != temps && temps.length == 8) {
                Text word = new Text();
                word.set(temps[1]);
                context.write(word, value);
            }
        }
    }
}
Reducer code:
package com.invic.mapreduce.hbase.target;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 * @author lucl
 */
public class MyReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {

    @Override
    public void reduce(Text key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        // Word-count variant: TableReducer<Text, IntWritable, ImmutableBytesWritable>
        // with Iterable<IntWritable> value
        /*
        {
            int sum = 0;
            for (Iterator<IntWritable> it = value.iterator(); it.hasNext(); ) {
                IntWritable val = it.next();
                sum += val.get();
            }
            Put put = new Put(key.getBytes());
            // sum is an Integer; convert it to a String first and then take its bytes,
            // otherwise the value cannot be displayed when scanning the table.
            byte[] datas = Bytes.toBytes(String.valueOf(sum));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), datas);
            context.write(new ImmutableBytesWritable(key.getBytes()), put);
        }
        */

        // Write multiple columns to HBase:
        // TableReducer<Text, Text, ImmutableBytesWritable> with Iterable<Text> value
        {
            byte[] family = "cf".getBytes();
            Put put = new Put(key.getBytes());
            StringBuffer sbf = new StringBuffer();
            for (Text text : value) {
                sbf.append(text.toString());
            }
            put.addColumn(family, Bytes.toBytes("detail"), Bytes.toBytes(sbf.toString()));
            context.write(new ImmutableBytesWritable(key.getBytes()), put);
        }
    }
}
Driver class:
package com.invic.mapreduce.hbase.target;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author lucl
 * Example of using HBase as an output sink.
 */
public class HBaseASDataTargetDriver extends Configured implements Tool {

    private static final String TABLE_NAME = "t_inter_log";
    private static final String COLUMN_FAMILY_NAME = "cf";

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // for eclipse
        // System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0\\hadoop-2.6.0\\");
        int exit = ToolRunner.run(new HBaseASDataTargetDriver(), args);
        System.out.println("receive exit : " + exit);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(getConf());

        // Hadoop configuration
        conf.set("fs.defaultFS", "hdfs://cluster");
        conf.set("dfs.nameservices", "cluster");
        conf.set("dfs.ha.namenodes.cluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.cluster.nn1", "nnode:8020");
        conf.set("dfs.namenode.rpc-address.cluster.nn2", "dnode1:8020");
        conf.set("dfs.client.failover.proxy.provider.cluster",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        // HBase master
        // The property "hbase.master" has been deprecated since 0.90.
        // Just passing the ZK configuration makes your client auto-discover the master.
        // conf.set("hbase.master", "nnode:60000");

        // ZooKeeper quorum
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2");

        // Disable speculative execution for map tasks
        conf.setBoolean("mapreduce.map.speculative", false);
        // Disable speculative execution for reduce tasks
        conf.setBoolean("mapreduce.reduce.speculative", false);

        /**
         * Create the HBase table, dropping it first if it already exists.
         */
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        TableName tableName = TableName.valueOf(TABLE_NAME);
        boolean exists = admin.tableExists(tableName);
        if (exists) {
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }
        HTableDescriptor tableDesc = new HTableDescriptor(tableName);
        HColumnDescriptor columnDesc = new HColumnDescriptor(COLUMN_FAMILY_NAME);
        tableDesc.addFamily(columnDesc);
        admin.createTable(tableDesc);
        admin.close();
        connection.close();

        /**
         * Read the input file.
         */
        String fileName = "http_interceptor_20130913.txt";

        Job job = Job.getInstance(conf);
        job.setJobName("MyBaseWriterToHBase");
        job.setJarByClass(HBaseASDataTargetDriver.class);
        job.setMapperClass(MyMapper.class);

        /**
         * Notes on the default (KeyIn, ValueIn, KeyOut, ValueOut) types when reading text files:
         * KeyIn defaults to the byte offset within the file, and ValueIn to one line of text.
         * First test: the map output key-value types were not set, and the job ran fine.
         * Second test: the map output key-value types were set to
         *   job.setMapOutputKeyClass(LongWritable.class);
         *   job.setMapOutputValueClass(Text.class);
         * following "Hadoop应用开发技术详解" (1st edition, January 2015, p. 191), which states that
         * the default map output key-value types are LongWritable and Text; with these settings
         * the job failed with:
         *   15/09/04 22:19:06 INFO mapreduce.Job: Task Id : attempt_1441346242717_0014_m_000000_0, Status : FAILED
         *   Error: java.io.IOException: Type mismatch in key from map:
         *     expected org.apache.hadoop.io.LongWritable, received org.apache.hadoop.io.Text
         *       at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1069)
         *       at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
         *       at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
         *       at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
         *       at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:21)
         *       at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:1)
         *       at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
         *       at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
         *       at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
         *       at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
         *       at java.security.AccessController.doPrivileged(Native Method)
         *       at javax.security.auth.Subject.doAs(Subject.java:415)
         *       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
         *       at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
         * Third test: setting the types to match what MyMapper actually emits (Text, Text) made
         * the job run correctly.
         */
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Do not add the line below; in testing, adding it unexpectedly caused a
        // ClassNotFound error for MyReducer.
        // job.setReducerClass(MyReducer.class);

        Path path = new Path(fileName);
        FileInputFormat.addInputPath(job, path);

        TableMapReduceUtil.initTableReducerJob(TABLE_NAME, MyReducer.class, job);

        // for wordcount
        // job.setOutputKeyClass(Text.class);
        // job.setOutputValueClass(IntWritable.class);

        // for multiple columns
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
When the map output key-value types were not set, the error below occurred (the wordcount example did not fail; "Hadoop应用开发技术详解" says the default map output key-value types are LongWritable and Text, yet in the wordcount example the map output key-value types are actually Text and IntWritable). This is consistent with how MapReduce resolves these types: when setMapOutputKeyClass/setMapOutputValueClass are not called, the map output types fall back to whatever setOutputKeyClass/setOutputValueClass declare for the job, rather than to fixed LongWritable/Text defaults.
15/09/04 21:15:54 INFO mapreduce.Job:  map 0% reduce 0%
15/09/04 21:16:27 INFO mapreduce.Job: Task Id : attempt_1441346242717_0011_m_000000_0, Status : FAILED
Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1074)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
        at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:29)
        at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:1)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)

# Since map was still at 0% when the error occurred, the problem is on the map side. The message
# says the expected value type is IntWritable; the main difference between this example and the
# wordcount example is that the map output value changed from IntWritable to Text. Setting the
# parameters below solved the problem.
# job.setMapOutputKeyClass(Text.class);
# job.setMapOutputValueClass(Text.class);
Verify the execution results of the wordcount example and the multi-column load example:
hbase(main):005:0> scan 't_inter_log'
ROW                            COLUMN+CELL
 14100                         column=cf:count, timestamp=1441370812728, value=1
 16:04:08                      column=cf:count, timestamp=1441370812728, value=4
 18863                         column=cf:count, timestamp=1441370812728, value=1
 192.168.1.130                 column=cf:count, timestamp=1441370812728, value=1
 192.168.1.139                 column=cf:count, timestamp=1441370812728, value=1
 192.168.1.154                 column=cf:count, timestamp=1441370812728, value=1
 192.168.1.159                 column=cf:count, timestamp=1441370812728, value=1
 192.168.1.7                   column=cf:count, timestamp=1441370812728, value=4
 2013-09-13                    column=cf:count, timestamp=1441370812728, value=4
 39044                         column=cf:count, timestamp=1441370812728, value=1
 4927                          column=cf:count, timestamp=1441370812728, value=1
 80                            column=cf:count, timestamp=1441370812728, value=4
 HTTP                          column=cf:count, timestamp=1441370812728, value=4
 www.subnetc1.com              column=cf:count, timestamp=1441370812728, value=1
 www.subnetc1.com/index.html   column=cf:count, timestamp=1441370812728, value=1
 www.subnetc2.com/index.html   column=cf:count, timestamp=1441370812728, value=1
 www.subnetc3.com/index.html   column=cf:count, timestamp=1441370812728, value=1
 www.subnetc4.com/index.html   column=cf:count, timestamp=1441370812728, value=1
18 row(s) in 1.2290 seconds

# The t_inter_log table is dropped and recreated on every run.

hbase(main):007:0> scan 't_inter_log'
ROW                 COLUMN+CELL
 www.subnetc1.com   column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc1.com\x09192.168.1.7\x0980\x09192.168.1.139\x0918863\x09HTTP\x09www.subnetc1.com/index.html
 www.subnetc2.com   column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc2.com\x09192.168.1.7\x0980\x09192.168.1.159\x0914100\x09HTTP\x09www.subnetc2.com/index.html
 www.subnetc3.com   column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc3.com\x09192.168.1.7\x0980\x09192.168.1.130\x094927\x09HTTP\x09www.subnetc3.com/index.html
 www.subnetc4.com   column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc4.com\x09192.168.1.7\x0980\x09192.168.1.154\x0939044\x09HTTP\x09www.subnetc4.com/index.html
4 row(s) in 3.3280 seconds
6. Example: HBase as a shared source