In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Database >
Share
Shulou(Shulou.com)06/01 Report--
Optimization summary of HBase
To sum up: pre-partitioning, column families, batch read and write, merge, link pool. For more information, please see:
1. Table design (the first three are the most important) 1.1 Pre-Creating Regions
By default, an region partition is automatically created when the HBase table is created, and when the data is imported, all HBase clients write to this region until the region is large enough. One way to speed up batch writes is to create some empty regions in advance, so that when the data is written to HBase, it will load balance the data in the cluster according to the region partition.
For more information on pre-partitioning, see Table Creation: Pre-Creating Regions. Here is an example:
Public static boolean createTable (HBaseAdmin admin, HTableDescriptor table, byte [] [] splits) throws IOException {try {admin.createTable (table, splits); return true;} catch (TableExistsException e) {logger.info ("table" + table.getNameAsString () + "already exists"); / / the table already exists... Return false }} / / this method is the key at the beginning and end of the incoming data, and which data is to be divided into several region, and which data is returned in which regions / / the characteristics of the data may need to be analyzed in practical applications, so as to avoid a large amount of data input by users corresponding to some key. Adjacent key sets public static byte [] [] getHexSplits (String startKey, String endKey, int numRegions) {byte [] [] splits = new byte [numRegions-1] [] BigInteger lowestKey = new BigInteger (startKey, 16); BigInteger highestKey = new BigInteger (endKey, 16); BigInteger range = highestKey.subtract (lowestKey); BigInteger regionIncrement = range.divide (BigInteger.valueOf (numRegions)); lowestKey = lowestKey.add (regionIncrement); for (int iTuno; I
< numRegions-1;i++) { BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i))); byte[] b = String.format("6x", key).getBytes(); splits[i] = b; } return splits;} 预分区是根据预估的数据量,进行预先的region分割,设计哪些rowKey的数据放在哪些region上,避免数据倾斜。 1.2 Row Key HBase中row key用来检索表中的记录,就是用来查找表中数据的,支持以下三种方式: 通过单个row key访问:即按照某个row key键值进行get操作; 通过row key的range进行scan:即通过设置startRowKey和endRowKey,在这个范围内进行扫描; 全表扫描:即直接扫描整张表中所有行记录。 在HBase中,row key可以是任意字符串,最大长度64KB,实际应用中一般为10~100bytes,存为byte[]字节数组,一般设计成定长的。 row key是按照字典序存储,因此,设计row key时,要充分利用这个排序特点,将经常一起读取的数据存储到一块,将最近可能会被访问的数据放在一块。 举个例子:如果最近写入HBase表中的数据是最可能被访问的,可以考虑将时间戳作为row key的一部分,由于是字典序排序,所以可以使用Long.MAX_VALUE - timestamp作为row key,这样能保证新写入的数据在读取时可以被快速命中。表示就是最早插入的数据row key越大,越靠后,越晚插入的数据row key越小,越靠前,因此可以使得最近插入的数据最先被访问到,因为hbase在存储表中数据时是按row key升序排列的。外界查询时,是一次查region。 1.3 Column Family 不要在一张表里定义太多的column family(列族)。目前Hbase并不能很好的处理超过2~3个column family的表。因为某个column family在flush的时候,它邻近的column family也会因关联效应被触发flush,最终导致系统产生更多的I/O。感兴趣的同学可以对自己的HBase集群进行实际测试,从得到的测试结果数据验证一下。 1.4 In Memory 创建表的时候,可以通过HColumnDescriptor.setInMemory(true)将表放到RegionServer的缓存中,保证在读取的时候被cache命中。 缓存的一个思考:最靠近用户的地方做缓存,不可以太底层。 1.5 Max Version 创建表的时候,可以通过列族HColumnDescriptor.setMaxVersions(int maxVersions)设置表中数据的最大版本,如果只需要保存最新版本的数据,那么可以设置setMaxVersions(1)。每一个列族都可以设置这个Max Version。 hbase自身在服务器基本不设置,除了设置下zookeeper所在的位置,为了hbase可以找到zookeeper,一般在程序端,可以动态的创建表,并设置表内的属性,例如该表中,某一个列族的Max Version。 1.6 Time To Live 创建表的时候,可以通过HColumnDescriptor.setTimeToLive(int timeToLive)设置表中数据的存储生命期,过期数据将自动被删除,例如如果只需要存储最近两天的数据,那么可以设置setTimeToLive(2 * 24 * 60 * 60)。 1.7 Compact & Split 在HBase中,数据在更新时首先写入WAL 日志(HLog)和内存(MemStore)中,MemStore中的数据是排序的,当MemStore累计到一定阈值时,就会创建一个新的MemStore,并且将老的MemStore添加到flush队列,由单独的线程flush到磁盘上,成为一个StoreFile。于此同时, 系统会在zookeeper中记录一个redo point,表示这个时刻之前的变更已经持久化了(minor compact)。 StoreFile是只读的,一旦创建后就不可以再修改。因此Hbase的更新其实是不断追加的操作。当一个Store中的StoreFile达到一定的阈值后,就会进行一次合并(major compact),将对同一个key的修改合并到一起,形成一个大的StoreFile,当StoreFile的大小达到一定阈值后,又会对 StoreFile进行分割(split),等分为两个StoreFile。 由于对表的更新是不断追加的,处理读请求时,需要访问Store中全部的StoreFile和MemStore,将它们按照row key进行合并,由于StoreFile和MemStore都是经过排序的,并且StoreFile带有内存中索引,通常合并过程还是比较快的。 实际应用中,可以考虑必要时手动进行major compact,将同一个row key的修改进行合并形成一个大的StoreFile。同时,可以将StoreFile设置大些,减少split的发生。 用户访问时,查询先从region开始,查询对应的row_key。因为插入时,是按row_key来插的数据,依序分在region上。 major compaction是将每个分区(region)下的所有store(列族)里的storeFile进行合并,方便查询和插入,很耗资源的一种操作,因此不要频繁进行,应使用程序手动操作合并。 总体有三种方式有major_compaction命令;api操作(常用);region server自动运行,默认是24小时一次。其中region server自动的方式需要设置hbase.hregion.majorcompaction.jetter,默认为0.2,也就是为了防止多个regionserver在同一时间合并,设定合并的时间有个±0.2的浮动。 minor compaction是较小范围的合并,因为消耗资源少,因此设置好参数后,可以交由hbase自动管理,其中几个参数: hbase.hstore.compaction.min默认为3,至少需要3个满足条件的storefile,才会启动; hbase.hstore.compaction.max默认为10,表示最多一次合并10个; hbase.hstore.compaction.min.size hbase.hstore.compaction.max.size这两个表示storefile文件大小在哪个范围内才会加入合并; hbase.hstore.compaction.ratio将storefle按年龄排序来合并,先合并老的。 2. 写表操作2.1 多HTable并发写 创建多个HTable客户端用于写操作,提高写数据的吞吐量,一个例子: htable创建时可以单独传入row-key来锁定一行查询,也可以设置scan,查询多行数据。 static final Configuration conf = HBaseConfiguration.create();static final String table_log_name = "user_log";wTableLog = new HTable[tableN];for (int i = 0; i < tableN; i++) { wTableLog[i] = new HTable(conf, table_log_name); wTableLog[i].setWriteBufferSize(5 * 1024 * 1024); //5MB wTableLog[i].setAutoFlush(false);}2.2 HTable参数设置2.2.1 Auto Flush 通过调用HTable.setAutoFlush(false)方法可以将HTable写客户端的自动flush关闭,这样可以批量写入数据到HBase,而不是有一条put就执行一次更新,只有当put填满客户端写缓存时,才实际向HBase服务端发起写请求。默认情况下auto flush是开启的。 2.2.2 Write Buffer 通过调用HTable.setWriteBufferSize(writeBufferSize)方法可以设置HTable客户端的写buffer大小,如果新设置的buffer小于当前写buffer中的数据时,buffer将会被flush到服务端。其中,writeBufferSize的单位是byte字节数,可以根据实际写入数据量的多少来设置该值。 2.2.3 WAL Flag 在HBae中,客户端向集群中的RegionServer提交数据时(Put/Delete操作),首先会先写WAL(Write Ahead Log)日志(即HLog,一个RegionServer上的所有Region共享一个HLog),只有当WAL日志写成功后,再接着写MemStore,然后客户端被通知提交数据成功;如果写WAL日志失败,客户端则被通知提交失败。这样做的好处是可以做到RegionServer宕机后的数据恢复。 因此,对于相对不太重要的数据,可以在Put/Delete操作时,通过调用Put.setWriteToWAL(false)或Delete.setWriteToWAL(false)函数,放弃写WAL日志,从而提高数据写入的性能。 值得注意的是:谨慎选择关闭WAL日志,因为这样的话,一旦RegionServer宕机,Put/Delete的数据将会无法根据WAL日志进行恢复。 2.3 批量写 通过调用HTable.put(Put)方法可以将一个指定的row key记录写入HBase,同样HBase提供了另一个方法:通过调用HTable.put(List)方法可以将指定的row key列表,批量写入多行记录,这样做的好处是批量执行,只需要一次网络I/O开销,这对于对数据实时性要求高,网络传输RTT高的情景下可能带来明显的性能提升。 2.4 多线程并发写 在客户端开启多个HTable写线程,每个写线程负责一个HTable对象的flush操作,这样结合定时flush和写buffer(writeBufferSize),可以既保证在数据量小的时候,数据可以在较短时间内被flush(如1秒内),同时又保证在数据量大的时候,写buffer一满就及时进行flush。下面给个具体的例子: for (int i = 0; i < threadN; i++) { Thread th = new Thread() { public void run() { while (true) { try { sleep(1000); //1 second } catch (InterruptedException e) { e.printStackTrace(); }synchronized (wTableLog[i]) { try { wTableLog[i].flushCommits(); } catch (IOException e) { e.printStackTrace(); } } }} }; th.setDaemon(true); th.start();} 3. 读表操作3.1 多HTable并发读 创建多个HTable客户端用于读操作,提高读数据的吞吐量,一个例子: static final Configuration conf = HBaseConfiguration.create();static final String table_log_name = "user_log";rTableLog = new HTable[tableN];for (int i = 0; i < tableN; i++) { rTableLog[i] = new HTable(conf, table_log_name); //每次scan数据时读50条数据 rTableLog[i].setScannerCaching(50);}3.2 HTable参数设置3.2.1 Scanner Caching hbase.client.scanner.caching配置项可以设置HBase scanner一次从服务端抓取的数据条数,默认情况下一次一条。通过将其设置成一个合理的值,可以减少scan过程中next()的时间开销,代价是scanner需要通过客户端的内存来维持这些被cache的行记录。 有三个地方可以进行配置:1)在HBase的conf配置文件中进行配置;2)通过调用HTable.setScannerCaching(int scannerCaching)进行配置;3)通过调用Scan.setCaching(int caching)进行配置。三者的优先级越来越高。 3.2.2 Scan Attribute Selectionscan时指定需要的Column Family,可以减少网络传输数据量,否则默认scan操作会返回整行所有Column Family的数据。3.2.3 Close ResultScanner通过scan取完数据后,记得要关闭ResultScanner,否则RegionServer可能会出现问题(对应的Server资源无法释放)。3.3 批量读 通过调用HTable.get(Get)方法可以根据一个指定的row key获取一行记录,同样HBase提供了另一个方法:通过调用HTable.get(List)方法可以根据一个指定的row key列表,批量获取多行记录,这样做的好处是批量执行,只需要一次网络I/O开销,这对于对数据实时性要求高而且网络传输RTT高的情景下可能带来明显的性能提升。 3.4 多线程并发读 在客户端开启多个HTable读线程,每个读线程负责通过HTable对象进行get操作。下面是一个多线程并发读取HBase,获取店铺一天内各分钟PV值的例子: public class DataReaderServer { //获取店铺一天内各分钟PV值的入口函数 public static ConcurrentHashMap getUnitMinutePV(long uid, long startStamp, long endStamp){ long min = startStamp; int count = (int)((endStamp - startStamp) / (60*1000)); List lst = new ArrayList(); for (int i = 0; i futures = new ArrayList>(5); ThreadFactoryBuilder builder = new ThreadFactoryBuilder (); builder.setNameFormat ("ParallelBatchQuery"); ThreadFactory factory = builder.build (); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool (lstBatchKeys.size (), factory); for (List keys: lstBatchKeys) {Callable
< ConcurrentHashMap >Callable = new BatchMinutePVCallable (keys); FutureTask
< ConcurrentHashMap >Future = (FutureTask
< ConcurrentHashMap >) executor.submit (callable); futures.add (future);} executor.shutdown (); / / Wait for all the tasks to finish try {boolean stillRunning =! executor.awaitTermination (5000000, TimeUnit.MILLISECONDS); if (stillRunning) {try {executor.shutdownNow () } catch (Exception e) {/ / TODO Auto-generated catch block e.printStackTrace ();} catch (InterruptedException e) {try {Thread.currentThread (). Interrupt ();} catch (Exception E1) {/ / TODO Auto-generated catch block e1.printStackTrace () }} / / Look for any exception for (Future f: futures) {try {if (f.get ()! = null) {hashRet.putAll ((ConcurrentHashMap) f.get ()) }} catch (InterruptedException e) {try {Thread.currentThread () .interrupt ();} catch (Exception E1) {/ / TODO Auto-generated catch block e1.printStackTrace ();}} catch (ExecutionException e) {e.printStackTrace () }} return hashRet;} / / A thread batch query to get the minute PV value protected static ConcurrentHashMap getBatchMinutePV (List lstKeys) {ConcurrentHashMap hashRet = null; List lstGet = new ArrayList (); String [] splitValue = null; for (String s: lstKeys) {splitValue = s.split ("_") Long uid = Long.parseLong (splitValue [0]); long min = Long.parseLong (splitValue [1]); byte [] key = new byte [16]; Bytes.putLong (key, 0, uid); Bytes.putLong (key, 8, min); Get g = new Get (key); g.addFamily (fp); lstGet.add (g) } Result [] res = null; try {res = tableMinutePV [Rand.nextInt (tableN)] .get (lstGet);} catch (IOException E1) {logger.error ("tableMinutePV exception, e =" + e1.getStackTrace ());} if (res! = null & & res.length > 0) {hashRet = new ConcurrentHashMap (res.length) For (Result re: res) {if (re! = null & &! re.isEmpty ()) {try {byte [] key = re.getRow (); byte [] value = re.getValue (fp, cp) If (key! = null & & value! = null) {hashRet.put (String.valueOf (Bytes.toLong (key, Bytes.SIZEOF_LONG)), String.valueOf (Bytes.toLong (value) }} catch (Exception e2) {logger.error (e2.getStackTrace ());} return hashRet;}} / / call the interface class to implement Callable interface class BatchMinutePVCallable implements Callable {private List keys Public BatchMinutePVCallable (List lstKeys) {this.keys = lstKeys;} public ConcurrentHashMap call () throws Exception {return DataReadServer.getBatchMinutePV (keys);}} 3.5 cache query results
For application scenarios where HBase is queried frequently, you can consider caching in the application. When there is a new query request, first look it up in the cache and return it directly if it exists. If you no longer query the HBase;, otherwise initiate a read request query on the HBase, and then cache the query results in the application. As for cache replacement strategies, you can consider common strategies such as LRU.
You can also use redis for caching, that is, the data queried from hbase means redis, which can be fetched from redis when the outside world accesses it.
3.6 Blockcache
The memory of Regionserver on HBase is divided into two parts, one as Memstore, which is mainly used for writing, and the other as BlockCache, which is mainly used for reading.
The write request will be written to Memstore,Regionserver first and each region will be provided with a Memstore. When the Memstore is full of 64MB, it will start flush to flush to disk. When the total size of the Memstore exceeds the limit (heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9), the flush process is forcibly started, starting from the largest Memstore until the flush is below the limit.
The read request will first check the data in Memstore, and if it cannot be found, it will be checked in BlockCache. If it cannot be found, it will be read on disk, and the read result will be put into BlockCache. Because BlockCache uses the LRU strategy, when the BlockCache reaches the upper limit (heapsize * hfile.block.cache.size * 0.85), the phase-out mechanism is activated to phase out the oldest batch of data.
There is a BlockCache and N Memstore on a Regionserver, and the sum of their sizes cannot be greater than or equal to heapsize * 0.8, otherwise the HBase cannot be started. The default BlockCache is 0.2 and Memstore is 0.4. For systems that focus on read response time, you can set the BlockCache larger, such as setting BlockCache=0.4,Memstore=0.39, to increase the cache hit ratio.
For links, you can refer to the internal links of the corresponding links.
Considerations for using HTable and HTablePool
HTable is the Java API object through which the HBase client communicates with the HBase server. The client can perform CRUD operations (additions, deletions, modifications and queries) with the server through the HTable object. It is easy to create (the creation of htable):
Configuration conf = HBaseConfiguration.create (); HTable table = new HTable (conf, "tablename"); / / TODO CRUD Operation.
Some considerations when using HTable:
1. Avoid the overhead of creating HTable objects
Because after the client creates the HTable object, it needs to do a series of operations: check. Meta. Table to confirm whether the specified name of the HBase table exists, whether the table is valid, etc., the entire time overhead is relatively heavy, it may take as long as a few seconds, so it is best to create the required HTable objects at one time when the program starts. If Java API is used, it is generally created in the constructor and reused directly after the program starts.
2. The HTable object is not thread safe
HTable objects are not thread safe for clients to read and write data, so when multithreading, it is necessary to create a separate HTable object for each thread, and do not share HTable objects between different objects, especially when the client auto flash is set to false, the data may be inconsistent due to the existence of local write buffer.
3. Share Configuration between HTable objects
Configuration do not create too many, one is enough, through zookeeper to connect hbase classes.
HTable objects share Configuration objects, which has the advantage of:
Connection to shared ZooKeeper: each client needs to establish a connection with ZooKeeper and query the user's table regions location. This information can be cached and shared after the connection is established.
Share common resources: clients need to find-ROOT- and .meta through ZooKeeper. Table, which requires network transmission overhead. After caching these common resources, the client can reduce the subsequent network transmission overhead and speed up the search process.
Therefore, compared to the following approach:
HTable table1 = new HTable ("table1"); HTable table2 = new HTable ("table2")
The following ways are more effective:
Configuration conf = HBaseConfiguration.create (); HTable table1 = new HTable (conf, "table1"); HTable table2 = new HTable (conf, "table2")
Note: even high-load multithreaded programs do not find performance problems caused by sharing Configuration; if this is not the case in your actual situation, try not sharing Configuration.
HTablePool
HTablePool can solve the thread unsafe problem of HTable, and by maintaining a fixed number of HTable objects, these HTable resource objects can be reused during program running.
Configuration conf = HBaseConfiguration.create (); / / create a pool and remove the htable object HTablePool pool = new HTablePool (conf, 10) from the pool when using it
1. HTablePool can automatically create HTable objects, and it is completely transparent to the client, which can avoid the problem of concurrent modification of data between multiple threads.
2. The HTable objects in HTablePool are connected by public Configuration, which can reduce the network overhead.
The use of HTablePool is simple: before each operation, get a HTable object through the getTable method of HTablePool, then perform operations such as put/get/scan/delete, and finally put the HTable object back into the HTablePool through the putTable method of HTablePool.
Here is a simple example of using HTablePool:
Public void createUser (String username, String firstName, String lastName, String email, String password, String roles) throws IOException {/ / htable objects HTable table = rm.getTable (UserTable.NAME) from the pool; Put put = new Put (Bytes.toBytes (username)); put.add (UserTable.DATA_FAMILY, UserTable.FIRSTNAME, Bytes.toBytes (firstName)); put.add (UserTable.DATA_FAMILY, UserTable.LASTNAME, Bytes.toBytes (lastName)) Put.add (UserTable.DATA_FAMILY, UserTable.EMAIL, Bytes.toBytes (email)); / / column family, column, data put.add (UserTable.DATA_FAMILY, UserTable.CREDENTIALS, Bytes.toBytes (password)); put.add (UserTable.DATA_FAMILY, UserTable.ROLES, Bytes.toBytes (roles)); table.put (put); table.flushCommits (); rm.putTable (table);}
As for the real performance of multithreading using HTablePool, it needs to be obtained through actual testing work.
4. Data Computing 4.1 Server side Computing
Coprocessor runs on the HBase RegionServer server, and each Regions maintains a reference to its associated coprocessor implementation class, which can be loaded through the local jar in classpath on RegionServer or the classloader of HDFS.
Currently, several coprocessor are available:
Coprocessor: provides hooks for region management, such as region's open/close/split/flush/compact, etc.
RegionObserver: provides hooks for monitoring table-related operations from the client, such as table get/put/scan/delete, etc.
Endpoint: provides command triggers that can execute arbitrary functions on the region. One use example is column aggregation on the RegionServer side, and here is a code example.
The above is just some basic introduction to coprocessor. I have no experience with its actual use, and I don't know its usability and performance data. Interested students can have a try, welcome to discuss.
4.2 write-end calculation 4.2.1 count
HBase itself can be regarded as a horizontally scalable Key-Value storage system, but its computing power is limited (Coprocessor can provide some server-side computing). Therefore, when using HBase, it is often necessary to calculate from the write side or the reader side, and then return the final calculation result to the caller. Here are two simple examples:
PV calculation: by accumulating counts in the memory of the HBase writer, maintaining the update of the PV value, and in order to achieve persistence, synchronize the PV calculation results to the HBase on a regular basis (for example, 1 second), so that the query side will have a delay of up to 1 second, and you can see the PV results with a second delay.
Minute PV calculation: combined with the PV calculation method mentioned above, the current cumulative PV value is written into HBase every minute according to rowkey + minute as the new rowkey, and then the cumulative PV value before each minute of the day is obtained through scan on the query side, and then the cumulative PV value before and after two minutes is subtracted in sequence to get the current PV value within one minute, and finally get the PV value within each minute of the day.
4.2.2 weight removal
For the calculation of UV, it is an example of de-recalculation. There are two situations:
If the memory can hold, then all existing UV identities can be maintained in the Hash table. Whenever a new identity is coming, quickly look up the Hash to determine whether it is a new UV. If so, the UV value is increased by 1, otherwise the UV value remains the same. In addition, in order to persist or provide to the query interface, the results of UV calculations can be synchronized to HBase on a regular basis (for example, 1 second).
If the memory can not hold, you can consider using Bloom Filter to achieve, so as to reduce the memory footprint as much as possible. In addition to the calculation of UV, judging the existence of URL is also a typical application scenario.
4.3 Reading end Computing
If the response time is demanding (for example, a single http request should be returned within milliseconds), I think it is not appropriate for the reader to do too much complex computing logic, and try to make the reader's function simple, that is, after reading the data from HBase RegionServer (scan or get), simply stitching according to the data format and returning it to the frontend for use. Of course, if the response time requirements are general, or business characteristics require, you can also do some computing logic on the reader side.
5. Summary
As a Key-Value storage system, HBase is not omnipotent, it has its own unique features. Therefore, when we do the application based on it, we often need to optimize and improve from many aspects (table design, read table operation, write table operation, data calculation, etc.), sometimes even need to optimize the configuration of HBase from the system level, and even optimize HBase itself. This belongs to different levels of categories.
In short, in a nutshell, when optimizing the system, first locate the bottleneck that affects the performance of your program, and then aim at the line.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.