Spark 2.x from Basics to Depth, Series Part 7: How the Spark Python API Uses Py4J

Updated 2025-01-19. From: SLTechnology News&Howtos

Before learning any Spark technology, make sure you understand Spark correctly. See the article "A Correct Understanding of Spark".

Spark's RDD supports a Scala API, a Java API, and a Python API. The Scala and Java APIs were covered in detail in earlier articles. This article discusses how the RDD Python API uses Py4J to call the Scala/Java API, which is how the Python API is implemented. We start with an introduction to Py4J.

1. Py4J

Py4J is a library that lets Python programs call objects that live in a JVM. Here is an example from the Py4J website:

First, write a Java program:

package com.twq.javaapi;

import py4j.GatewayServer;

/**
 * Created by tangweiqun on 2017-9-22.
 */
public class AdditionApplication {

    public int addition(int first, int second) {
        return first + second;
    }

    public static void main(String[] args) {
        AdditionApplication app = new AdditionApplication();
        // app is now the gateway.entry_point
        // Start a py4j server. Python can connect to the port the server listens on
        // and then call Java objects and their methods.
        GatewayServer server = new GatewayServer(app);
        server.start();
    }
}

The Java code above depends on the py4j jar, which can be pulled in through Maven as follows:

<dependency>
    <groupId>net.sf.py4j</groupId>
    <artifactId>py4j</artifactId>
    <version>0.10.4</version>
</dependency>

Running the main method of the AdditionApplication class above in an IDE starts the GatewayServer.

Then open a Python interpreter and run the following code:

>>> from py4j.java_gateway import JavaGateway
>>> gateway = JavaGateway()                    # connect to the JVM
>>> random = gateway.jvm.java.util.Random()    # create a java.util.Random instance
>>> number1 = random.nextInt(10)               # call the Random.nextInt method
>>> number2 = random.nextInt(10)
>>> print(number1, number2)
(2, 7)
>>> addition_app = gateway.entry_point         # get the AdditionApplication instance
>>> addition_app.addition(number1, number2)    # call the addition method
9

The Python code above depends on py4j, which can be installed by following the instructions at http://www.py4j.org/install.html#install-instructions. As the example shows, Python can directly call java.util.Random in the JVM as well as the methods of the AdditionApplication object.
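To build some intuition for what happens behind a call such as addition_app.addition(...), here is a minimal sketch of the proxy idea using only the Python standard library. It is not Py4J's real wire protocol or implementation: the names ToyGatewayServer, ToyHandler, and ToyEntryPointProxy, and the JSON-per-line message format, are all assumptions made for illustration, and a Python class stands in for the Java entry point.

```python
import json
import socket
import socketserver
import threading


class AdditionApplication:
    """Python stand-in for the Java entry point object (illustration only)."""

    def addition(self, first, second):
        return first + second


class ToyHandler(socketserver.StreamRequestHandler):
    """Reads one JSON request per line and replies with one JSON line."""

    def handle(self):
        for line in self.rfile:
            request = json.loads(line)
            # Look up the requested method on the entry point and call it.
            method = getattr(self.server.entry_point, request["method"])
            result = method(*request["args"])
            self.wfile.write((json.dumps({"result": result}) + "\n").encode("utf-8"))


class ToyGatewayServer(socketserver.ThreadingTCPServer):
    """Hypothetical server that owns the entry point, loosely like GatewayServer."""

    allow_reuse_address = True
    daemon_threads = True

    def __init__(self, entry_point, port=0):
        # Port 0 asks the OS for any free port.
        super().__init__(("127.0.0.1", port), ToyHandler)
        self.entry_point = entry_point


class ToyEntryPointProxy:
    """Turns attribute access into remote calls, loosely like gateway.entry_point."""

    def __init__(self, port):
        self._sock = socket.create_connection(("127.0.0.1", port))
        self._reader = self._sock.makefile("r", encoding="utf-8")

    def __getattr__(self, name):
        if name.startswith("_"):
            raise AttributeError(name)

        def remote_call(*args):
            payload = json.dumps({"method": name, "args": list(args)}) + "\n"
            self._sock.sendall(payload.encode("utf-8"))
            return json.loads(self._reader.readline())["result"]

        return remote_call

    def close(self):
        self._reader.close()
        self._sock.close()


def demo():
    server = ToyGatewayServer(AdditionApplication())
    threading.Thread(target=server.serve_forever, daemon=True).start()
    proxy = ToyEntryPointProxy(server.server_address[1])
    try:
        return proxy.addition(2, 7)  # executed on the server side
    finally:
        proxy.close()
        server.shutdown()
        server.server_close()


if __name__ == "__main__":
    print(demo())  # prints 9
```

The point of the sketch is that the Python side never holds the real object. It holds a proxy that forwards the method name and arguments over a socket and returns whatever the other side computes.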

2. How Py4J lets the Python API call the Java/Scala API in Spark

First, we write a very simple Spark application in Python:

from pyspark import SparkConf, SparkContext

# output_path_service and utils are the author's helper modules (not shown here)

if __name__ == "__main__":
    conf = SparkConf().setAppName("appName")
    sc = SparkContext(conf=conf)

    sourceDataRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt")
    wordsRDD = sourceDataRDD.flatMap(lambda line: line.split())
    keyValueWordsRDD = wordsRDD.map(lambda s: (s, 1))
    wordCountRDD = keyValueWordsRDD.reduceByKey(lambda a, b: a + b)

    wordCountRDD.saveAsTextFile("hdfs://master:9999" + output_path_service.get_output_path())

    print(utils.get_rdd_result("wordCountRDD", wordCountRDD))

This is a very simple Python version of the Spark word count application. We submit it to the Spark cluster with the following spark-submit command:

spark-submit \
--name "PythonWordCount" \
--master yarn \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--num-executors 2 \
--executor-cores 1 \
--py-files word_count_python.zip \
/home/hadoop-twq/spark-course/spark_word_count.py

(For the meaning of each spark-submit parameter and how spark-submit works, see: "Submitting Spark Applications Correctly".)
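For readers who want to check what the word count application computes without a cluster, the same flatMap, map, and reduceByKey steps can be mimicked on a plain Python list. This is only a local sketch of the data flow: flat_map and reduce_by_key are hypothetical helpers written for this illustration, not PySpark functions, and the sample lines are made up.

```python
from itertools import chain


def flat_map(func, items):
    """Apply func to every item and flatten the resulting lists into one list."""
    return list(chain.from_iterable(func(item) for item in items))


def reduce_by_key(func, pairs):
    """Merge the values of identical keys with func, like RDD.reduceByKey does."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


# Made-up input standing in for the lines of word.txt.
lines = ["a b a", "b c"]

words = flat_map(lambda line: line.split(), lines)       # ['a', 'b', 'a', 'b', 'c']
key_value_words = [(word, 1) for word in words]          # [('a', 1), ('b', 1), ...]
word_count = reduce_by_key(lambda a, b: a + b, key_value_words)

print(word_count)  # {'a': 2, 'b': 2, 'c': 1}
```

In the real application each of these steps is a call on an RDD, and the work is carried out by Spark rather than by the local Python interpreter.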

After the application is submitted to the cluster, the driver program launches the org.apache.spark.deploy.PythonRunner class, which does two things:

1. Initialize and start GatewayServer, as follows:

// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
// (this is the code that lets Python access objects in the current JVM)
val gatewayServer = new py4j.GatewayServer(null, 0)
val thread = new Thread(new Runnable() {
  override def run(): Unit = Utils.logUncaughtExceptions {
    gatewayServer.start()
  }
})
thread.setName("py4j-gateway-init")
thread.setDaemon(true)
thread.start()

2. Use ProcessBuilder to launch a Python process that runs the spark_word_count.py file above, as follows:

// Launch Python process
val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
val env = builder.environment()
env.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
// pass conf spark.pyspark.python to python process, the only way to pass info to
// python process is through environment variable.
sparkConf.get(PYSPARK_PYTHON).foreach(env.put("PYSPARK_PYTHON", _))
sys.env.get("PYTHONHASHSEED").foreach(env.put("PYTHONHASHSEED", _))
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize

The key thing to note here is the environment variable PYSPARK_GATEWAY_PORT. Its value is the listening port of the GatewayServer started in step 1, and it is passed to the newly launched Python process as an environment variable.
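The two steps above (listen on a free port, then hand that port to a child process through an environment variable) can be reproduced in miniature with the Python standard library. The following is a sketch under stated assumptions, not Spark code: the variable name DEMO_GATEWAY_PORT, the function run_handshake, and the message the child sends are all invented for this example, and a plain socket stands in for the GatewayServer.

```python
import os
import socket
import subprocess
import sys

# Program run by the child process: it reads the port from its environment
# and connects back to the parent, much as PySpark reads PYSPARK_GATEWAY_PORT.
CHILD_CODE = r"""
import os, socket
port = int(os.environ["DEMO_GATEWAY_PORT"])
with socket.create_connection(("127.0.0.1", port)) as conn:
    conn.sendall(b"hello from child")
"""


def run_handshake():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        # Step 1: bind to port 0 so the OS picks a free port,
        # comparable to new py4j.GatewayServer(null, 0).
        server.bind(("127.0.0.1", 0))
        server.listen(1)
        port = server.getsockname()[1]

        # Step 2: start the child with the port in its environment,
        # comparable to env.put("PYSPARK_GATEWAY_PORT", ...).
        env = dict(os.environ)
        env["DEMO_GATEWAY_PORT"] = str(port)
        child = subprocess.Popen([sys.executable, "-c", CHILD_CODE], env=env)

        # Wait for the child to connect back, then read until it closes.
        server.settimeout(10)
        conn, _ = server.accept()
        chunks = []
        with conn:
            while True:
                data = conn.recv(1024)
                if not data:
                    break
                chunks.append(data)
        child.wait(timeout=10)
    return port, b"".join(chunks)


if __name__ == "__main__":
    used_port, message = run_handshake()
    print(used_port, message)
```

Because the port is chosen by the operating system at runtime, an environment variable is a simple way to tell a freshly started child process where to connect.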

Once the Python process is launched in step 2, the contents of spark_word_count.py begin to execute. When execution reaches sc = SparkContext(conf=conf), the SparkContext is initialized, and during initialization a Py4J Gateway is started to communicate with the GatewayServer launched above, as shown in the following code (from context.py):

SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)

@classmethod
def _ensure_initialized(cls, instance=None, gateway=None, conf=None):
    """
    Checks whether a SparkContext is initialized or not.
    Throws error if a SparkContext is already running.
    """
    with SparkContext._lock:
        if not SparkContext._gateway:
            # Start a Gateway here and assign its jvm to the member variable _jvm,
            # so that _jvm can be used to access Java objects in the JVM and their methods
            SparkContext._gateway = gateway or launch_gateway(conf)
            SparkContext._jvm = SparkContext._gateway.jvm

        if instance:
            if (SparkContext._active_spark_context and
                    SparkContext._active_spark_context != instance):
                currentMaster = SparkContext._active_spark_context.master
                currentAppName = SparkContext._active_spark_context.appName
                callsite = SparkContext._active_spark_context._callsite

                # Raise error if there is already a running Spark context
                raise ValueError(
                    "Cannot run multiple SparkContexts at once; "
                    "existing SparkContext(app=%s, master=%s)"
                    " created by %s at %s:%s "
                    % (currentAppName, currentMaster,
                        callsite.function, callsite.file, callsite.linenum))
            else:
                SparkContext._active_spark_context = instance
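The guard logic in _ensure_initialized (create the shared gateway once, and allow only one active context at a time) can be tried in isolation. The sketch below imitates that pattern with hypothetical names: DemoContext, DemoGateway, and launch_demo_gateway are stand-ins invented here, and no JVM or Py4J is involved.

```python
import threading


class DemoGateway:
    """Placeholder for the Py4J gateway; it carries no real connection."""


def launch_demo_gateway():
    # Hypothetical stand-in for launch_gateway(conf).
    return DemoGateway()


class DemoContext:
    _lock = threading.RLock()
    _gateway = None          # shared by all contexts, created lazily
    _active_context = None   # at most one active context at a time

    def __init__(self, app_name, gateway=None):
        self.app_name = app_name
        DemoContext._ensure_initialized(self, gateway=gateway)
        self.gateway = DemoContext._gateway

    @classmethod
    def _ensure_initialized(cls, instance=None, gateway=None):
        with DemoContext._lock:
            if not DemoContext._gateway:
                # Created only on first use, then reused.
                DemoContext._gateway = gateway or launch_demo_gateway()
            if instance:
                if (DemoContext._active_context and
                        DemoContext._active_context != instance):
                    raise ValueError(
                        "Cannot run multiple contexts at once; "
                        "existing context(app=%s)"
                        % DemoContext._active_context.app_name)
                else:
                    DemoContext._active_context = instance

    def stop(self):
        with DemoContext._lock:
            DemoContext._active_context = None


def demo():
    first = DemoContext("first")
    try:
        DemoContext("second")      # rejected while "first" is active
        error = None
    except ValueError as exc:
        error = str(exc)
    first.stop()
    third = DemoContext("third")   # allowed again after stop()
    reused = third.gateway is first.gateway
    third.stop()
    return error, reused


if __name__ == "__main__":
    print(demo())
```

Keeping the gateway at class level means that stopping one context and creating another reuses the same connection, while the active-context check prevents two contexts from running at once.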

The Gateway is initialized in the launch_gateway(conf) method (source in java_gateway.py), as follows:

# Read PYSPARK_GATEWAY_PORT from the environment; this is the environment variable
# that we set in PythonRunner
if "PYSPARK_GATEWAY_PORT" in os.environ:
    gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])

# Connect to the gateway
# Start a JavaGateway to communicate with the GatewayServer
gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=True)

# Import the classes used by PySpark
# (bring in the Java/Scala classes needed by the Python API)
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.ml.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
# TODO(davies): move into sql
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
java_import(gateway.jvm, "scala.Tuple2")

return gateway

In this way, the Python SparkContext can access the RDD Java API. For example, context.py accesses the Java API's JavaSparkContext as follows:

def _initialize_context(self, jconf):
    """
    Initialize SparkContext in function to allow subclass specific initialization
    """
    return self._jvm.JavaSparkContext(jconf)
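The reason self._jvm.JavaSparkContext works with a short class name is that the package org.apache.spark.api.java.* was imported into the JVM view earlier. The toy model below illustrates only that name-resolution idea. It is an assumption-laden sketch: ToyJVMView, toy_java_import, and FakeJavaSparkContext are hypothetical, a dictionary of Python classes stands in for the JVM, and the real Py4J resolves names by talking to the Java side.

```python
class ToyJVMView:
    """Resolves short class names against previously imported packages."""

    def __init__(self, classes):
        self._classes = classes   # fully qualified name -> stand-in class
        self._imports = []        # patterns such as "pkg.*" or "pkg.ClassName"

    def __getattr__(self, name):
        if name.startswith("_"):
            raise AttributeError(name)
        for pattern in self._imports:
            if pattern.endswith(".*"):
                candidate = pattern[:-1] + name      # "pkg.*" -> "pkg.Name"
            elif pattern.rsplit(".", 1)[-1] == name:
                candidate = pattern                  # exact class import
            else:
                continue
            if candidate in self._classes:
                return self._classes[candidate]
        raise AttributeError("class %s has not been imported" % name)


def toy_java_import(jvm_view, pattern):
    """Hypothetical counterpart of java_import: remember the import pattern."""
    jvm_view._imports.append(pattern)


class FakeJavaSparkContext:
    """Stand-in for the Java class; it only stores the configuration."""

    def __init__(self, jconf):
        self.jconf = jconf


jvm = ToyJVMView({"org.apache.spark.api.java.JavaSparkContext": FakeJavaSparkContext})

try:
    jvm.JavaSparkContext                 # not importable by short name yet
    resolved_before_import = True
except AttributeError:
    resolved_before_import = False

toy_java_import(jvm, "org.apache.spark.api.java.*")
jsc = jvm.JavaSparkContext({"spark.app.name": "appName"})
print(resolved_before_import, type(jsc).__name__)  # False FakeJavaSparkContext
```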
