This article explains in detail how to get the return values of executor tasks on the Spark driver side. I hope it leaves you with a better understanding of the relevant parts of Spark after reading it.
Some people say Spark's code is not elegant, and I cannot let that stand. The main reason people find the Spark code inelegant is that they are not familiar with Scala. In my view the Spark code base is actually quite good, and it is among the most readable of the big data frameworks.
The purpose of today's article is not to debate whether the Spark code is elegant, but to share some tricks that become available once you understand the Spark source code.
When using Spark, unusual requirements come up from time to time. For example, a reader once asked:
Langjian, I want to get the results returned by the tasks that executors run back on the driver side. For example, the task is a rule engine, and I want to know how many records each rule hit. How can I do this?
This requirement is not exotic; it is actually quite common. In theory you could write the output somewhere and then run a SQL query against MySQL, but that is often cumbersome, and sometimes you really do need this data on the driver. So what should you do?
The first idea most people have is the collect method. How would you implement it with collect? You can think it through yourself; I will only say that it is not as simple as it sounds, and writing the output to a database and analyzing it with SQL there may end up being easier.
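For illustration, here is a minimal sketch of the collect-based idea, assuming an RDD like the one used later in this article. The trick is to reduce each partition to a small summary first, so collect only ships the summaries back to the driver rather than the data itself; the (partitionId, count) pair is just one example of such a summary.

import org.apache.spark.TaskContext

// Summarize each partition locally, then collect only the small summaries.
val perPartitionCounts: Array[(Int, Long)] = rdd
  .mapPartitions { iter =>
    Iterator((TaskContext.getPartitionId(), iter.size.toLong))
  }
  .collect()

perPartitionCounts.foreach(println)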
Another option is a custom accumulator. That lets you accumulate the results on the executor side and read them on the driver, but the implementation is also somewhat involved; again, you can work it out yourself.
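As a rough sketch of that route (not the approach used in the rest of this article), a custom AccumulatorV2 keyed by rule name could look like the following. RuleHitAccumulator and the rule names are made up for illustration.

import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Accumulates hit counts per rule name on the executors; the driver reads .value.
class RuleHitAccumulator extends AccumulatorV2[String, Map[String, Long]] {
  private val hits = mutable.Map.empty[String, Long].withDefaultValue(0L)

  override def isZero: Boolean = hits.isEmpty
  override def copy(): RuleHitAccumulator = {
    val acc = new RuleHitAccumulator
    acc.hits ++= hits
    acc
  }
  override def reset(): Unit = hits.clear()
  override def add(rule: String): Unit = hits(rule) += 1L
  override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit =
    other.value.foreach { case (rule, n) => hits(rule) += n }
  override def value: Map[String, Long] = hits.toMap
}

You would create an instance on the driver, register it with sc.register, call add(ruleName) inside the task whenever a rule matches, and read its value on the driver after an action has run.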
So instead, Langjian will introduce a fairly common but slick operation.
The first place this operation comes to mind is the count function, because count sends each task's return value back to the driver and then aggregates them. Jump into the count function from the IDE and you will see:
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
It is simply a call to the runJob method of SparkContext.
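For reference, the relevant overload of SparkContext.runJob looks roughly like this (simplified): it runs func once per partition and returns one result per partition to the driver as an array.

def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]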
Utils.getIteratorSize counts the number of elements in each iterator, that is, the number of elements in each partition, and returns that count:
/**
 * Counts the number of elements of an iterator using a while loop rather than calling
 * [[scala.collection.Iterator#size]] because it uses a for loop, which is slightly slower
 * in the current version of Scala.
 */
def getIteratorSize[T](iterator: Iterator[T]): Long = {
  var count = 0L
  while (iterator.hasNext) {
    count += 1L
    iterator.next()
  }
  count
}
runJob then returns an array in which each element is the return value of the function executed by one task, and sum adds them up into the final count.
We can use the same idea to achieve the original goal. Here is a complete example:
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

object es2sparkRunJob {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)

    conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1")
    conf.set(ConfigurationOptions.ES_PORT, "9200")
    conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
    conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
    conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
    conf.set("es.write.rest.error.handlers", "ignoreConflict")
    conf.set("es.write.rest.error.handler.ignoreConflict",
      "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

    val sc = new SparkContext(conf)
    import org.elasticsearch.spark._

    val rdd = sc.esJsonRDD("posts").repartition(10)

    rdd.count()

    // The function each task runs on its partition: count the elements and
    // return (partitionId, count) to the driver.
    val func = (itr: Iterator[(String, String)]) => {
      var count = 0
      itr.foreach(each => count += 1)
      (TaskContext.getPartitionId(), count)
    }

    val res = sc.runJob(rdd, func)

    res.foreach(println)

    sc.stop()
  }
}
In this example, the driver receives the number of records processed by each task.
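To get closer to the original rule-engine question, each task can return something richer than a plain count, for example a map from rule name to hit count, and the driver can merge the per-partition maps. The sketch below assumes the same rdd as above; the rule logic (a hypothetical "errorRule" keyed off the JSON content) is only a placeholder for a real rule engine.

// Each task returns a Map of rule name -> hit count for its partition.
val ruleCountsPerPartition: Array[Map[String, Long]] = sc.runJob(rdd,
  (itr: Iterator[(String, String)]) => {
    itr.foldLeft(Map.empty[String, Long]) { case (acc, (_, json)) =>
      // Placeholder rule: a real rule engine would decide which rule matched.
      val rule = if (json.contains("error")) "errorRule" else "defaultRule"
      acc.updated(rule, acc.getOrElse(rule, 0L) + 1L)
    }
  })

// Merge the per-partition maps on the driver.
val totalRuleCounts: Map[String, Long] = ruleCountsPerPartition
  .flatten
  .groupBy(_._1)
  .map { case (rule, pairs) => rule -> pairs.map(_._2).sum }

totalRuleCounts.foreach(println)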
That is all on getting executor return values on the Spark driver side. I hope it is helpful; if you found the article useful, feel free to share it so more people can see it.