A case study of partitioning when Spark Core reads ES

2025-01-30 Update From: SLTechnology News&Howtos

Shulou (Shulou.com) 06/01

This article analyzes how Spark Core partitions its reads from Elasticsearch (ES). The question comes up often in real projects, so the sections below walk through a small test program and then the relevant elasticsearch-hadoop source code.

1. Reading ES from Spark Core

The elasticsearch-hadoop connector is provided directly on the official ES website. The Hadoop and Spark versions supported by the ES 7.x connector are as follows:

    hadoop2Version  = 2.7.1
    hadoop22Version = 2.2.0
    spark13Version  = 1.6.2
    spark20Version  = 2.3.0

Langjian (the author) uses ES 7.1.1 and tested with Spark 2.3.1 without problems. There are two ways to import the dependencies needed to integrate ES with Spark:

A, import the entire elasticsearch-hadoop package (groupId:artifactId:version)

    org.elasticsearch:elasticsearch-hadoop:7.1.1

B, import only the package for the Spark module

    org.elasticsearch:elasticsearch-spark-20_2.11:7.1.1

For ease of testing, Langjian runs only a single-node ES instance on the local machine. The simple test code is as follows:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.hadoop.cfg.ConfigurationOptions

    object es2sparkrdd {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)

        // Connection settings for the single-node test instance
        conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1")
        conf.set(ConfigurationOptions.ES_PORT, "9200")
        conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
        conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
        conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
        // conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
        // conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
        conf.set("es.write.rest.error.handlers", "ignoreConflict")
        conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

        val sc = new SparkContext(conf)
        // Brings esRDD and esJsonRDD into scope on SparkContext
        import org.elasticsearch.spark._

        // Print the field names of every document
        sc.esRDD("posts").foreach(each => { each._2.keys.foreach(println) })
        // Print every document as a raw JSON string
        sc.esJsonRDD("posts").foreach(each => { println(each._2) })

        sc.stop()
      }
    }

As the code shows, Spark Core offers two main APIs for reading ES into an RDD:

A, esRDD. This returns an RDD of Tuple2 whose first element is the document id and whose second element is a Map holding the fields of the ES document.

    RDD[(String, Map[String, AnyRef])]

B, esJsonRDD. This also returns an RDD of Tuple2. The first element is still the id, and the second is the document as a JSON string.

    RDD[(String, String)]

Although the element types differ, both RDDs are instances of ScalaEsRDD.

So to analyze the parallelism of reading ES from Spark Core, you only need to analyze the getPartitions method of ScalaEsRDD.

2. Source code analysis

First, get the source code from https://github.com/elastic/elasticsearch-hadoop. It is a Gradle project, so you can import it into IntelliJ IDEA directly and then switch to the 7.x branch.

Going straight to ScalaEsRDD, we find that getPartitions is implemented in its parent class. The method is as follows:

    override def getPartitions: Array[Partition] = {
      esPartitions.zipWithIndex.map { case (esPartition, idx) =>
        new EsPartition(id, idx, esPartition)
      }.toArray
    }

esPartitions is a lazy val:

    @transient private[spark] lazy val esPartitions = {
      RestService.findPartitions(esCfg, logger)
    }

Why is it declared this way? It is worth thinking about the reasons for combining lazy with @transient.
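One plausible reading, which the source does not spell out: `lazy` postpones the REST call to ES until the partitions are actually needed, and `@transient` keeps the computed list out of the serialized RDD, so it is not shipped along with tasks. Java has no `lazy val`, so the sketch below imitates the pattern with a `transient` field and a null-checking getter. `TransientLazyDemo`, `Holder` and the partition names are hypothetical and only illustrate the general serialization behavior, not elasticsearch-hadoop itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class TransientLazyDemo {

    // Hypothetical stand-in for an RDD that knows how to look up its partitions.
    static class Holder implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: skipped by Java serialization, so a deserialized copy starts without it.
        private transient List<String> partitions;

        // Lazy: the (pretend) expensive lookup runs only on first access.
        List<String> partitions() {
            if (partitions == null) {
                partitions = Arrays.asList("shard-0", "shard-1");
            }
            return partitions;
        }

        boolean isComputed() {
            return partitions != null;
        }
    }

    // Serializes and deserializes the holder in memory, as shipping it to another JVM would.
    static Holder roundTrip(Holder holder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(holder);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Holder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Holder original = new Holder();
        System.out.println(original.isComputed());    // false: nothing looked up yet
        original.partitions();
        System.out.println(original.isComputed());    // true: computed on first access
        Holder copy = roundTrip(original);
        System.out.println(copy.isComputed());        // false: the transient field was not serialized
        System.out.println(copy.partitions().size()); // 2: recomputed on demand
    }
}
```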

The RestService.findPartitions method creates a client, fetches metadata such as the shard layout, and then calls one of two methods depending on the situation:

    final List<PartitionDefinition> partitions;
    // ES 5.x or later, and es.input.max.docs.per.partition is configured
    if (clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_5_X) && settings.getMaxDocsPerPartition() != null) {
        partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards, log);
    } else {
        partitions = findShardPartitions(settings, mapping, nodesMap, shards, log);
    }
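The branch above can be restated as a small standalone sketch. The names `PartitionStrategyDemo`, `Strategy` and `choose` are hypothetical and not part of elasticsearch-hadoop. The major version is modelled as a plain int and an unset es.input.max.docs.per.partition as null.

```java
public class PartitionStrategyDemo {

    enum Strategy { SLICE, SHARD }

    // Hypothetical helper mirroring the condition in RestService.findPartitions:
    // slicing needs both ES 5.x or later and an explicit es.input.max.docs.per.partition.
    static Strategy choose(int esMajorVersion, Integer maxDocsPerPartition) {
        if (esMajorVersion >= 5 && maxDocsPerPartition != null) {
            return Strategy.SLICE;
        }
        return Strategy.SHARD;
    }

    public static void main(String[] args) {
        System.out.println(choose(7, null));   // SHARD: option not set
        System.out.println(choose(7, 100000)); // SLICE: option set on a 7.x cluster
        System.out.println(choose(2, 100000)); // SHARD: cluster older than 5.x
    }
}
```

So on the 7.1.1 test setup described earlier, which sets no es.input.max.docs.per.partition, the shard-based branch is the one that runs.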

A) findSlicePartitions

This method is executed only on ES 5.x and later, and only when the following option is configured:

    es.input.max.docs.per.partition

It splits each ES shard according to the specified size. It first counts the documents in the shard, then computes how many partitions to split the shard into, and finally generates the partition information. The code is as follows:

    long numDocs;
    if (readResource.isTyped()) {
        numDocs = client.count(index, readResource.type(), Integer.toString(shardId), query);
    } else {
        numDocs = client.countIndexShard(index, Integer.toString(shardId), query);
    }
    // At least one partition per shard; integer division rounds down
    int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);
    for (int i = 0; i < numPartitions; i++) {
        PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions);
        partitions.add(new PartitionDefinition(settings, resolvedMapping, index, shardId, slice, locations));
    }
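To make the arithmetic concrete, here is a minimal standalone sketch of the same calculation. `SliceCountDemo`, `numSlices` and `slicesForShard` are hypothetical names, and the document counts are made-up inputs rather than measurements.

```java
import java.util.ArrayList;
import java.util.List;

public class SliceCountDemo {

    // Same arithmetic as the loop above: integer division with a floor of one slice.
    static int numSlices(long numDocs, int maxDocsPerPartition) {
        return (int) Math.max(1, numDocs / maxDocsPerPartition);
    }

    // Returns {sliceId, totalSlices} pairs, one per Spark partition created for the shard.
    static List<int[]> slicesForShard(long numDocs, int maxDocsPerPartition) {
        int n = numSlices(numDocs, maxDocsPerPartition);
        List<int[]> slices = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            slices.add(new int[] { i, n });
        }
        return slices;
    }

    public static void main(String[] args) {
        System.out.println(numSlices(250_000L, 100_000));               // 2
        System.out.println(numSlices(40_000L, 100_000));                // 1
        System.out.println(slicesForShard(1_000_000L, 100_000).size()); // 10
    }
}
```

Because the division rounds down, a shard with 250,000 documents and a limit of 100,000 yields 2 slices of roughly 125,000 documents each, so the setting acts more like a target than a strict upper bound.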

Under the hood, slicing reads the shard with a scroll cursor sorted by _doc, and each partition then reads only the portion of the shard that its slice (slice id out of the total number of slices) covers. The request is assembled in the SearchRequestBuilder.assemble method.
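For orientation, the sketch below builds a request body in the shape of Elasticsearch's sliced scroll API, with one body per slice. This is an assumption about the general shape only: the body that SearchRequestBuilder.assemble actually produces may differ, and `SlicedScrollBodyDemo`, `sliceBody` and the match_all query are illustrative.

```java
public class SlicedScrollBodyDemo {

    // Builds a search body for one slice of a sliced scroll, sorted by _doc.
    // Illustrative only; not the exact request elasticsearch-hadoop sends.
    static String sliceBody(int sliceId, int totalSlices) {
        return "{\"slice\":{\"id\":" + sliceId + ",\"max\":" + totalSlices + "},"
             + "\"sort\":[\"_doc\"],"
             + "\"query\":{\"match_all\":{}}}";
    }

    public static void main(String[] args) {
        int totalSlices = 2;
        // One request body per Spark partition created for the shard.
        for (int sliceId = 0; sliceId < totalSlices; sliceId++) {
            System.out.println(sliceBody(sliceId, totalSlices));
        }
    }
}
```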

In my view, splitting shards this way costs some performance. If you really want to combine ES with Spark, it is better to give the index a reasonable number of shards.

B) findShardPartitions

This method is straightforward: each RDD partition corresponds to one shard of the ES index.

    PartitionDefinition partition = new PartitionDefinition(settings, resolvedMapping, index, shardId, locationList.toArray(new String[0]));
    partitions.add(partition);
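To compare the two strategies side by side, the following sketch counts the RDD partitions each one would produce for the same index. `PartitionCountDemo` and its methods are hypothetical, and the per-shard document counts are made-up inputs.

```java
public class PartitionCountDemo {

    // Shard strategy: one RDD partition per shard, regardless of shard size.
    static int shardStrategy(long[] docsPerShard) {
        return docsPerShard.length;
    }

    // Slice strategy: each shard is split into max(1, docs / maxDocsPerPartition) partitions.
    static int sliceStrategy(long[] docsPerShard, int maxDocsPerPartition) {
        int total = 0;
        for (long docs : docsPerShard) {
            total += (int) Math.max(1, docs / maxDocsPerPartition);
        }
        return total;
    }

    public static void main(String[] args) {
        long[] docsPerShard = { 250_000L, 40_000L, 1_000_000L };
        System.out.println(shardStrategy(docsPerShard));          // 3
        System.out.println(sliceStrategy(docsPerShard, 100_000)); // 13 (2 + 1 + 10)
    }
}
```

With the shard strategy, read parallelism is fixed by the index's shard count, which is why choosing a sensible number of shards up front matters.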
