2025-02-24 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article explains Elasticsearch reindex and how to use a sliced scroll query (slice scroll) from Java to speed up an index migration.
Reindex copies documents from one index into another, existing index. It does not copy the configuration of the source index, such as its mapping, number of shards, or number of replicas, so the destination index should be set up beforehand.
A simple example:
POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200",   // remote ES host and port
      "socket_timeout": "1m",
      "connect_timeout": "10s"           // timeout settings
    },
    "index": "my_index_name",            // source index name
    "query": {                           // only documents matching this query are copied
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest_index_name"           // destination index name
  }
}
For detailed usage, see:
Reindex API of ElasticSearch version 6.3 Document APIs
Elasticsearch Foundation-- ReIndex
The author did not find a reindex API in the Java client, so the index is migrated with an alias switch plus a full-index query and bulk insertion.
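As an aside, the REST request shown earlier can also be issued from Java over plain HTTP. The following is only a minimal sketch, not the author's method: it assumes Java 11 or later for `java.net.http`, a cluster reachable at the hypothetical address `http://localhost:9200`, and index names that need no JSON escaping. The class and method names are illustrative.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class ReindexRequestSketch {

    // Build the smallest possible _reindex body: source index and destination index.
    // Assumption: the index names contain no characters that need JSON escaping.
    static String reindexBody(String sourceIndex, String destIndex) {
        return "{\"source\":{\"index\":\"" + sourceIndex + "\"},"
                + "\"dest\":{\"index\":\"" + destIndex + "\"}}";
    }

    // Build (but do not send) a POST request against the _reindex endpoint.
    static HttpRequest buildReindexRequest(String baseUrl, String sourceIndex, String destIndex) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/_reindex"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(reindexBody(sourceIndex, destIndex)))
                .build();
    }

    public static void main(String[] args) {
        // "http://localhost:9200" is a placeholder address, not taken from the article.
        HttpRequest request = buildReindexRequest(
                "http://localhost:9200", "my_index_name", "dest_index_name");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(reindexBody("my_index_name", "dest_index_name"));
        // Sending it requires a running cluster, for example:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```

Whether this is preferable to a client-side copy depends on the cluster version and on how much control over batching is needed; the rest of the article follows the client-side route.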
A single full scroll was too slow for transferring the data, so a sliced scroll is used to speed up the query.
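The idea behind slicing is that each document belongs to exactly one slice, so several scrolls can run in parallel without overlapping. The Elasticsearch reference describes the default assignment as `floorMod(hashCode(doc._uid), max)`. The sketch below imitates that with plain Java strings; `String.hashCode` is only a stand-in for the server's internal hash, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SlicePartitionSketch {

    // Assign a document id to one of `max` slices.
    // Assumption: String.hashCode stands in for Elasticsearch's internal hash function.
    static int sliceOf(String docId, int max) {
        return Math.floorMod(docId.hashCode(), max);
    }

    // Split the ids into `max` disjoint buckets, one bucket per slice.
    static List<List<String>> partition(List<String> docIds, int max) {
        List<List<String>> slices = new ArrayList<>();
        for (int i = 0; i < max; i++) {
            slices.add(new ArrayList<>());
        }
        for (String id : docIds) {
            slices.get(sliceOf(id, max)).add(id);
        }
        return slices;
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            ids.add("doc-" + i);
        }
        List<List<String>> slices = partition(ids, 5);
        for (int i = 0; i < slices.size(); i++) {
            System.out.println("slice " + i + ": " + slices.get(i).size() + " docs");
        }
    }
}
```

Because every id lands in exactly one bucket, the union of all slices is the full result set, which is what makes a per-slice copy safe.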
Java slice scroll reindex (based on version 5.6)
Multithreaded reindex
Adjust the number of threads to the number of shards in the index, ideally one thread per primary shard. This example has five shards. An alias switch is also used to cut over to the new index seamlessly, so that normal data insertion and reading are not affected.
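Stripped of all Elasticsearch calls, the threading pattern described here is one task per slice submitted to a fixed pool, followed by a wait on every future. The following is a minimal sketch using only the JDK; `sliceTask` is a hypothetical placeholder for the real scroll-and-bulk-insert work, and the article's own code uses a Spring-style `threadPoolTaskExecutor` rather than `Executors`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SliceFanOutSketch {

    // Hypothetical stand-in for one slice's work; it only reports its slice id.
    static Callable<Integer> sliceTask(int sliceId, int sliceCount) {
        // A real task would scroll slice `sliceId` of `sliceCount` and bulk-index the hits.
        return () -> sliceId;
    }

    // Run one task per slice and wait for all of them to finish.
    static List<Integer> runAllSlices(int sliceCount) {
        ExecutorService pool = Executors.newFixedThreadPool(sliceCount);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < sliceCount; i++) {
                futures.add(pool.submit(sliceTask(i, sliceCount)));
            }
            List<Integer> finished = new ArrayList<>();
            for (Future<Integer> future : futures) {
                finished.add(future.get()); // blocks until this slice is done
            }
            return finished;
        } catch (Exception e) {
            // Surface a failed slice to the caller so the alias is not switched.
            throw new IllegalStateException("slice task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAllSlices(5));
    }
}
```

Waiting on every future before continuing matters here: the alias should only be switched once all slices have been copied.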
// Create the new index
createUserRecordIndex(newIndexName, typeName);

// Filter by creation time
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("createTime")
        .gte(DateUtil.format(DateUtil.parse(createBeginDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT))
        .lte(DateUtil.format(DateUtil.parse(createEndDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT));
boolQueryBuilder.must(rangeQueryBuilder);

try {
    // Handle the query with one task per slice
    List<Future<Void>> list = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        SliceBuilder sliceBuilder = new SliceBuilder(i, 5);
        SearchResponse response = EsBuildersServiceUtil.getESClient()
                .prepareSearch(userRecordAlias)
                .setTypes(userRecordType)
                .setQuery(boolQueryBuilder)
                .setSize(1000)
                .setScroll(new TimeValue(10000))
                .slice(sliceBuilder)
                .execute()
                .actionGet();
        SliceQuery sliceQuery = new SliceQuery(newIndexName, typeName, response);
        Future<Void> submit = threadPoolTaskExecutor.submit(sliceQuery);
        list.add(submit);
    }
    // Wait until every slice has been copied
    for (Future<Void> future : list) {
        future.get();
    }
} catch (Exception e) {
    log.error("reindex error =", e);
    throw new MembershipDataException(MembershipDataErrorCode.ES_INDEX_CONVERT_ERROR);
}

try {
    // Alias switch: remove and add in a single request so the alias is never left without an index
    EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases()
            .removeAlias(oldIndexName, userRecordAlias)
            .addAlias(newIndexName, userRecordAlias)
            .execute()
            .actionGet();
} catch (Exception e) {
    log.error("convertAlias error =", e);
    throw new MembershipDataException(MembershipDataErrorCode.ES_ALIASES_CONVERT_ERROR);
}

Slice thread

class SliceQuery implements Callable<Void> {
    private String newIndexName;
    private String typeName;
    private SearchResponse response;

    private SliceQuery(String newIndexName, String typeName, SearchResponse response) {
        this.newIndexName = newIndexName;
        this.typeName = typeName;
        this.response = response;
    }

    @Override
    public Void call() {
        // Total number of hits in this slice
        long totalCount = response.getHits().getTotalHits();
        // Number of follow-up scroll requests; each request returns up to size (1000) hits
        int page = (int) (totalCount / 1000);
        // The initial search response already carries the first batch
        operateRecordList(response, newIndexName, typeName);
        for (int i = 0; i < page; i++) {
            // Send the next request, using the scroll id from the previous response
            response = EsBuildersServiceUtil.getESClient()
                    .prepareSearchScroll(response.getScrollId())
                    .setScroll(new TimeValue(10000))
                    .execute()
                    .actionGet();
            operateRecordList(response, newIndexName, typeName);
        }
        return null;
    }
}

Bulk insert

// Read the hits from a search response and bulk-insert them into the index
private void operateRecordList(SearchResponse response, String indexName, String typeName) {
    try {
        SearchHits hits = response.getHits();
        List<AddUserRecordRequest> list = new ArrayList<>();
        for (SearchHit hit : hits) {
            String sourceAsString = hit.getSourceAsString();
            list.add(JSON.parseObject(sourceAsString, AddUserRecordRequest.class));
        }
        // Bulk insert
        saveBulkRecord(list, indexName, typeName);
    } catch (Exception e) {
        log.error("operateRecordList error =", e);
        throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR);
    }
}

// Bulk-insert the given records into indexName/typeName
private void saveBulkRecord(List<AddUserRecordRequest> list, String indexName, String typeName) {
    try {
        BulkRequestBuilder bulkRequest = EsBuildersServiceUtil.getESClient().prepareBulk();
        for (AddUserRecordRequest recordRequest : list) {
            JSONObject json = JSONObject.fromObject(recordRequest);
            bulkRequest.add(EsBuildersServiceUtil.getESClient()
                    .prepareIndex(indexName, typeName)
                    .setSource(json));
        }
        // An empty bulk request is rejected, so only execute when there is something to insert
        if (list.size() > 0) {
            bulkRequest.execute().actionGet();
        }
    } catch (Exception e) {
        log.error("saveBulkRecord error =", e);
        throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR);
    }
}
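The slice thread above derives the number of scroll requests from the total hit count. A common alternative is to keep scrolling until a batch comes back empty, which does not depend on that arithmetic. The sketch below shows only the loop shape; `FakeScroll` is a hypothetical in-memory stand-in for a scroll cursor, not an Elasticsearch class.

```java
import java.util.ArrayList;
import java.util.List;

public class ScrollLoopSketch {

    // Hypothetical in-memory cursor that hands out documents in fixed-size batches.
    static class FakeScroll {
        private final List<String> docs;
        private final int batchSize;
        private int position = 0;

        FakeScroll(List<String> docs, int batchSize) {
            this.docs = docs;
            this.batchSize = batchSize;
        }

        // Return the next batch, or an empty list once everything has been read.
        List<String> nextBatch() {
            int end = Math.min(position + batchSize, docs.size());
            List<String> batch = new ArrayList<>(docs.subList(position, end));
            position = end;
            return batch;
        }
    }

    // Copy every batch into `destination`; returns how many batch requests were made.
    static int copyAll(FakeScroll scroll, List<String> destination) {
        int requests = 1;
        List<String> batch = scroll.nextBatch();
        while (!batch.isEmpty()) {
            destination.addAll(batch); // stands in for the bulk insert
            batch = scroll.nextBatch();
            requests++;
        }
        return requests;
    }

    public static void main(String[] args) {
        List<String> source = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            source.add("doc-" + i);
        }
        List<String> destination = new ArrayList<>();
        int requests = copyAll(new FakeScroll(source, 1000), destination);
        System.out.println(destination.size() + " docs copied in " + requests + " requests");
    }
}
```

For 2500 documents and a batch size of 1000 this loop issues four requests (1000, 1000, 500, then one empty batch), one more than the count-based version, in exchange for not having to trust the reported total.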