简介
使用Scan的时候,可以配合各种Filter进行数据的筛选以减少返回的数据量,同样也可以通过选择特定的列族和列来减少返回的数据量。若是能将该特性进一步的优化则HBase会更强大,例如将客户端的代码分发到各个HRegionServer上执行,好比MapReduce一样的执行任务,执行完后返回的结果相信数据量会更小。HBase在0.92版本后引入了协处理器来实现该功能。
使用协处理器的好处是显而易见的,可以将运算放至Server端,减少通信开销的同时还能有效的提升性能。但同样的也会存在一些风险:由于协处理器目前还没有与主要的HBase进程隔离开来,若协处理器崩溃可能导致进程崩溃。
协处理器分类两类:Observer和Endpoint。可以分别对应成关系型数据库中的触发器和存储过程。
Observer
Observer类的协处理器在特定的事件发生时执行回调函数(也被称为钩子函数,hook),这些事件可以是用户产生的事件,也可以是服务器端产生的事件。
Observer分类以下三类:
- RegionObserver:提供客户端的数据操纵事件钩子,与表的region紧密关联
- MasterObserver:提供DDL类型的操作钩子,为集群级事件
- WALObserver:提供WAL相关操作钩子
RegionObserver详解
RegionObserver当一个特定的region级别的操作发生时,相应的钩子函数会被触发。这些操作分为两类:
- region生命周期变化。主要接口:
- void preOpen(..)/void postOpen(..):在region打开前/后调用
- void preWALRestore(..)/void postWALRestore(..):在WAL日志restore前/后调用
- void preFlush(..)/void postFlush(..):flush前/后调用
- void preCompact(..)/void postCompact(..):compact前/后调用
- void preSplit(..)/void postSplit(..):split前/后调用
- void preClose(..)/void postClose(..):cloase前/后调用
- 客户端API调用。主要接口:
- void preGet(..)/void postGet(..)
- void prePut(..)/void postPut()
- void preDelete()/void postDelete()
- void preCheckAndPut(..)/void postCheckAndPut(..)
- etc.
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public class Obs extends BaseRegionObserver { private static final byte[] CURRENT_TIME = Bytes.toBytes("#time#"); @Override public void preGet(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<KeyValue> results) throws IOException { if(Bytes.equals(get.getRow(), CURRENT_TIME)) { KeyValue kv = new KeyValue(get.getRow(), CURRENT_TIME, CURRENT_TIME, Bytes.toBytes(System.currentTimeMillis())); results.add(kv); } } @Override public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException { e.bypass(); //停止split } } |
MasterObserver
MasterObserver为处理master服务器的所有回调函数,而HMaster负责的内容为创建表、删除表之类的类似关系型数据库中的DDL操作。
MasterObserver提供的接口:
- void preCreateTable(..)/void postCreateTable(..): 创建表前/后调用
- void preDeleteTable(..)/void postDeleteTable(..):删除表前/后调用
- void preDisableTable(..)/void postDisableTable(..):disable表前/后调用
- etc.
WALObserver
WALObserver提供WAL相关的钩子函数,提供的接口:
- void preWALWrite(..)/void postWALWrite(..):WAL写前/后调用
Endpoint
Endpoint类似于关系型数据库中的存储过程,只不过这个存储过程会运行在多个region上。
终端是动态RPC插件的接口,它的实现代码被安装在服务器端,从而能够通过HBase RPC唤醒。客户端类库提供了非常方便的方法来调用这些动态接口,它们可以在任意时候调用一个终端,它们的实现代码会被目标region远程执行,结果会返回到终端。用户可以结合使用这些强大的插件接口,为HBase添加全新的特性。
Endpoint的使用需要经过如下步骤(示例为计算某列的sum值):
- 定义一个新的protocol接口,并继承CoprocessorProtocol
123public interface ColumnAggregationProtocol extends CoprocessorProtocol {public long sum(byte[] family, byte[] qualifier) throws IOException;} - 继承抽象类BaseEndpointCoprocessor,并实现已定义的接口
123456789101112131415161718192021222324public class ColumnAggregationEndpoint extends BaseEndpointCoprocessorimplements ColumnAggregationProtocol {@Overridepublic long sum(byte[] family, byte[] qualifier) throws IOException {Scan scan = new Scan();scan.addColumn(family, qualifier);long sumResult = 0;InternalScanner scanner = ((RegionCoprocessorEnvironment)getEnvironment()).getRegion().getScanner(scan);try {List<KeyValue> curVals = new ArrayList<KeyValue>();boolean hasMore = false;do {curVals.clear();hasMore = scanner.next(curVals);KeyValue kv = curVals.get(0);sumResult += Bytes.toLong(kv.getValue());} while (hasMore);} finally {scanner.close();}return sumResult;}}
- 客户端调用定义好的方法
12345678910111213141516171819202122232425262728public class EndpointTest {private static final byte[] TABLE_NAME = Bytes.toBytes("testtable");private static final byte[] CF = Bytes.toBytes("cf");private static final byte[] QUALIFIER = Bytes.toBytes("counts");private static final byte[] START_KEY = Bytes.toBytes("000");private static final byte[] END_KEY = Bytes.toBytes("999");public static void main(String[] args) throws Throwable {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", "172.16.0.126");conf.set("hbase.zookeeper.property.clientPort", "2181");conf.set("mapred.task.timeout", "0");HTableInterface table = new HTable(conf, TABLE_NAME);Map<byte[], Long> results;results = table.coprocessorExec(ColumnAggregationProtocol.class,START_KEY, END_KEY, new Batch.Call<ColumnAggregationProtocol, Long>() {@Overridepublic Long call(ColumnAggregationProtocol instance) throws IOException {return instance.sum(CF, QUALIFIER);}});long sumResult = 0;for (Map.Entry<byte[], Long> e : results.entrySet()) {sumResult += e.getValue();}System.out.println(sumResult);}}
客户点调用Endpoint的方法有三种方式:
1 2 3 4 5 6 7 8 9 10 |
//适用于单个region public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol, Row row); //适用于多个region public <T extends CoprocessorProtocol, R> void coprocessorExec( Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> call, Batch.Callback<R> callback); //适用于多个region public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec( Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> call); |
协处理器加载
协处理器的加载分两种形式,配置文件加载和Shell命令加载。
配置文件加载
编辑hbase-site.xml文件,添加:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
<property> <!-- 配置RegionObserver和Endpoint --> <name>hbase.coprocessor.region.classes</name> <value>coprocessor.RegionObserverExample01,EndpointExample01</value> </property> <property> <!-- 配置RegionObserver和Endpoint --> <name>hbase.coprocessor.master.classes</name> <value>coprocessor.MasterObserverExample01</value> </property> <property> <!-- 配置WALObserver --> <name>hbase.coprocessor.wal.classes</name> <value>coprocessor.WALObserverExample01</value> </property> |
配置文件中的配置项决定了该协处理器的执行顺序。修改完配置文件后,需将相应的jar包放入hbase-env.sh中的HBASE——CLASSPATH中,然后重启HBase来使配置生效。
Shell命令加载
Shell命令加载也就是从表描述符中加载,其中键必须以COPROCESSOR开头,值必须符合以下格式:
1 |
'COPROCESSOR'=>'hdfs://localhost:9000/user/hadoop/test.jar|coprocessor.Test|SYSTEM' |
其中priority只能为SYSTEM或者USER。