
Enhancing Spark by adding new code: a worked example

Updated 2025-02-25. From SLTechnology News&Howtos (Shulou.com, 06/01), Internet Technology.

This article walks through examples of enhancing Spark by adding new code rather than modifying its source. Many newcomers are unclear about how this can be done, so the details are explained below.

Preface

While working on streamingpro over the past two years, I inevitably had to make many enhancements to Spark. As I have complained before, Spark creates many of its objects directly with new, which leaves almost no way to swap in a different implementation.

For example, there is an attribute in SparkEnv called closureSerializer, which specializes in serialization and deserialization of tasks, as well as serialization and deserialization of function closures. Let's take a look at how it works internally:

    val serializer = instantiateClassFromConf[Serializer](
      "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    logDebug(s"Using serializer: ${serializer.getClass}")

    val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

    val closureSerializer = new JavaSerializer(conf)

    val envInstance = new SparkEnv(..., closureSerializer, ....

Here a JavaSerializer is created directly with new, and it cannot be configured. Without changing the source code there is no way to replace this implementation. By the same token, it would be almost impossible to replace the implementation of Executor.

This year I worked out two major pieces of Spark "black magic": enhancing Spark by adding new code, without changing its source and while still using the stock release package.
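To make the complaint concrete, here is a minimal sketch contrasting a hard-coded `new` with a lookup driven by configuration. None of this is Spark code: `Codec`, `JavaCodec`, `FastCodec`, the `codec.class` key and the factory map are hypothetical names, and the factory map stands in for the reflective instantiation that `instantiateClassFromConf` performs.

```scala
// Hypothetical types for illustration only; not part of Spark.
trait Codec { def name: String }
class JavaCodec extends Codec { val name = "java" }
class FastCodec extends Codec { val name = "fast" }

// Hard-coded: callers cannot swap the implementation without editing this line.
def hardCoded(): Codec = new JavaCodec

// Known implementations, keyed by the value a user would put in the configuration.
val factories: Map[String, () => Codec] = Map(
  "java" -> (() => new JavaCodec),
  "fast" -> (() => new FastCodec)
)

// Configurable: the implementation is chosen by a setting, with a default.
def fromConf(conf: Map[String, String], key: String = "codec.class", default: String = "java"): Codec = {
  val chosen = conf.getOrElse(key, default)
  val make = factories.getOrElse(chosen, throw new IllegalArgumentException(s"unknown codec: $chosen"))
  make()
}
```

With the second form a caller can pass `Map("codec.class" -> "fast")` and get a different implementation; with the first, the only option is to change the code that calls `new`.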

Adding a second RPC layer

In Spark, the only way to reach an Executor is through a Task. The existing API gives no way to operate directly on all Executors, or on a chosen subset of them. For example, I might want every Executor to load a resource file, which is not currently possible. To operate on Executors directly, a new communication layer has to be set up. So how exactly is that done?

First, set up a Backend on the driver side, which is relatively simple:

    class PSDriverBackend(sc: SparkContext) extends Logging {

      val conf = sc.conf
      var psDriverRpcEndpointRef: RpcEndpointRef = null

      def createRpcEnv = {
        val isDriver = sc.env.executorId == SparkContext.DRIVER_IDENTIFIER
        val bindAddress = sc.conf.get(DRIVER_BIND_ADDRESS)
        val advertiseAddress = sc.conf.get(DRIVER_HOST_ADDRESS)
        var port = sc.conf.getOption("spark.ps.driver.port").getOrElse("7777").toInt
        val ioEncryptionKey = if (sc.conf.get(IO_ENCRYPTION_ENABLED)) {
          Some(CryptoStreamUtils.createKey(sc.conf))
        } else {
          None
        }
        logInfo(s"setup ps driver rpc env: ${bindAddress}:${port} clientMode=${!isDriver}")

        // Try up to 10 consecutive ports, starting from the configured one
        var createSucess = false
        var count = 0
        val env = new AtomicReference[RpcEnv]()
        while (!createSucess && count < 10) {
          try {
            env.set(RpcEnv.create("PSDriverEndpoint", bindAddress, port, sc.conf,
              sc.env.securityManager, clientMode = !isDriver))
            createSucess = true
          } catch {
            case e: Exception =>
              logInfo("fail to create rpcenv", e)
              count += 1
              port += 1
          }
        }
        if (env.get() == null) {
          logError(s"fail to create rpcenv finally with attemp ${count}")
        }
        env.get()
      }

      def start() = {
        val env = createRpcEnv
        val pSDriverBackend = new PSDriverEndpoint(sc, env)
        psDriverRpcEndpointRef = env.setupEndpoint("ps-driver-endpoint", pSDriverBackend)
      }
    }

In effect, this starts an RPC server on the driver side. Running it is also easy; just call it from the main program:

    // The parameter server is enabled by default
    if (!params.containsKey("streaming.ps.enable") ||
        params.get("streaming.ps.enable").toString.toBoolean) {
      logger.info("ps enabled...")
      if (ss.sparkContext.isLocal) {
        localSchedulerBackend = new LocalPSSchedulerBackend(ss.sparkContext)
        localSchedulerBackend.start()
      } else {
        logger.info("start PSDriverBackend")
        psDriverBackend = new PSDriverBackend(ss.sparkContext)
        psDriverBackend.start()
      }
    }

Here we need to implement both local mode and cluster mode.
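The driver backend's createRpcEnv retries on consecutive ports when the first one is taken. The same idea can be sketched without Spark using a plain `java.net.ServerSocket`; `bindWithRetry` and its parameters are hypothetical names chosen for this illustration, and the limit of 10 attempts mirrors the loop in the article.

```scala
import java.net.ServerSocket
import scala.util.{Failure, Success, Try}

// Try startPort, startPort + 1, ... for at most maxAttempts ports and return
// the first socket that binds, or None if every attempt fails.
def bindWithRetry(startPort: Int, maxAttempts: Int = 10): Option[ServerSocket] = {
  var port = startPort
  var attempt = 0
  var bound: Option[ServerSocket] = None
  while (bound.isEmpty && attempt < maxAttempts) {
    Try(new ServerSocket(port)) match {
      case Success(socket) =>
        bound = Some(socket)
      case Failure(_) =>
        // Port unavailable (or invalid): move on to the next one.
        attempt += 1
        port += 1
    }
  }
  bound
}
```

Returning an `Option` makes the "all attempts failed" case explicit to the caller, whereas the article's version logs an error and returns a null RpcEnv.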

The driver now runs an RPC server, but how does an executor start one? There seems to be no hook for starting an RPC server on the Executor side. In fact there is one, although it is quite a trick: Spark allows custom metrics and calls the methods of user-implemented metric classes, so we only need to write a metrics Sink, start the RPC server inside it, and fool Spark. The details are as follows:

    class PSServiceSink(val property: Properties, val registry: MetricRegistry,
                        securityMgr: SecurityManager) extends Sink with Logging {

      def env = SparkEnv.get

      var psDriverUrl: String = null
      var psExecutorId: String = null
      var hostname: String = null
      var cores: Int = 0
      var appId: String = null
      val psDriverPort = 7777
      var psDriverHost: String = null
      var workerUrl: Option[String] = None
      val userClassPath = new mutable.ListBuffer[URL]()

      def parseArgs = {
        // val runtimeMxBean = ManagementFactory.getRuntimeMXBean()
        // var argv = runtimeMxBean.getInputArguments.toList
        var argv = System.getProperty("sun.java.command").split("\\s+").toList
        ...
        psDriverHost = host
        psDriverUrl = "spark://ps-driver-endpoint@" + psDriverHost + ":" + psDriverPort
      }

      parseArgs

      def createRpcEnv = {
        val isDriver = env.executorId == SparkContext.DRIVER_IDENTIFIER
        val bindAddress = hostname
        val advertiseAddress = ""
        val port = env.conf.getOption("spark.ps.executor.port").getOrElse("0").toInt
        val ioEncryptionKey = if (env.conf.get(IO_ENCRYPTION_ENABLED)) {
          Some(CryptoStreamUtils.createKey(env.conf))
        } else {
          None
        }
        // logInfo(s"setup ps driver rpc env: ${bindAddress}:${port} clientMode=${!isDriver}")
        RpcEnv.create("PSExecutorBackend", bindAddress, port, env.conf,
          env.securityManager, clientMode = !isDriver)
      }

      override def start(): Unit = {
        new Thread(new Runnable {
          override def run(): Unit = {
            logInfo(s"delay PSExecutorBackend 3s")
            Thread.sleep(3000)
            logInfo(s"start PSExecutor Env:${env}")
            if (env.executorId != SparkContext.DRIVER_IDENTIFIER) {
              val rpcEnv = createRpcEnv
              val pSExecutorBackend = new PSExecutorBackend(env, rpcEnv, psDriverUrl,
                psExecutorId, hostname, cores)
              PSExecutorBackend.executorBackend = Some(pSExecutorBackend)
              rpcEnv.setupEndpoint("ps-executor-endpoint", pSExecutorBackend)
            }
          }
        }).start()
      }
      ...
    }

At this point, the executor can start its own RPC server and connect to the RPC server in the driver. You can now write communication code without modifying the Spark source, which gives much better control over Executors.
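The sink trick works because the host process calls start() on whatever sink it has been configured with, without caring what that method actually does. Here is a toy sketch of that pattern with no Spark dependency: the `Sink` trait, `ServiceSink` and `startSinks` below are hypothetical stand-ins, not Spark's classes. In Spark itself the sink class would also have to be named in the metrics configuration, a step the article does not show.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for a plugin interface defined by the host framework.
trait Sink { def start(): Unit }

// A "sink" that reports no metrics at all. It uses start() as a hook to
// launch a background service thread.
class ServiceSink(onReady: CountDownLatch) extends Sink {
  override def start(): Unit = {
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        // A real implementation would create its RPC environment here.
        onReady.countDown()
      }
    })
    worker.setDaemon(true)
    worker.start()
  }
}

// Stand-in for the host: all it knows is that every sink has a start() method.
def startSinks(sinks: Seq[Sink]): Unit = sinks.foreach(_.start())
```

Doing the work on a separate thread keeps start() from blocking the host's own startup, which is presumably why the article's sink also spawns a thread and waits three seconds before creating its RpcEnv.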

For example, the following code is implemented in PSExecutorBackend:

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case Message.TensorFlowModelClean(modelPath) => {
        logInfo("clean tensorflow model")
        TFModelLoader.close(modelPath)
        context.reply(true)
      }
      case Message.CopyModelToLocal(modelPath, destPath) => {
        logInfo(s"copying model: ${modelPath} -> ${destPath}")
        HDFSOperator.copyToLocalFile(destPath, modelPath, true)
        context.reply(true)
      }
    }

The driver side can then invoke it with code like this:

    val psDriverBackend = runtime.asInstanceOf[SparkRuntime].psDriverBackend
    psDriverBackend.psDriverRpcEndpointRef.send(Message.TensorFlowModelClean("/tmp/ok"))

Isn't that cool?
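The handler shown for PSExecutorBackend is an ordinary Scala partial function over message objects, so its shape can be tried out without any RPC machinery. In this sketch the message classes, the `actions` log and the `reply` callback are all hypothetical; `reply` merely stands in for `context.reply`.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical message types, loosely modelled on the article's Message object.
sealed trait PsMessage
case class CleanModel(modelPath: String) extends PsMessage
case class CopyModel(modelPath: String, destPath: String) extends PsMessage

// Records what the handler did, so the effect is observable without any I/O.
val actions = ListBuffer.empty[String]

// Same shape as receiveAndReply: a partial function from a message to a side
// effect, followed by a reply to the sender.
def handler(reply: Any => Unit): PartialFunction[Any, Unit] = {
  case CleanModel(path) =>
    actions += s"clean $path"
    reply(true)
  case CopyModel(src, dest) =>
    actions += s"copy $src -> $dest"
    reply(true)
}
```

Because the function is partial, `handler(...).isDefinedAt(msg)` is false for any message type it does not list, which is how unknown messages can be told apart from handled ones.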

Modify the serialization of closures

Spark's task scheduling cost is very high. For a complex task, the business logic itself executes in about 3-7 ms, yet the whole Spark run takes about 1.3 seconds.

After digging in, I found that when an RDD is transformed, SparkContext performs a clean operation on the function. By default, clean also checks whether the function can be serialized: it serializes the function once, and if no exception is thrown the function is considered serializable. This serialization is quite expensive (JavaSerializer is used, and it cannot be changed for function and task serialization); a single serialization takes about 200 ms. Optimizing this in local mode reduces the request time by about 600 ms.

Note that this change applies to local mode only. So how exactly is it done?
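One rough way to see what such a check costs is to time a plain Java serialization pass. The sketch below uses only the JDK; `javaSerialize` and `timed` are hypothetical helper names, and the numbers it yields depend entirely on the object and the JVM, so they should not be expected to match the figures quoted above.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Serialize an object with plain Java serialization and return the bytes.
// Throws java.io.NotSerializableException if the object graph cannot be written.
def javaSerialize(obj: AnyRef): Array[Byte] = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  try out.writeObject(obj) finally out.close()
  buffer.toByteArray
}

// Run a block of code and return its result with the elapsed milliseconds.
def timed[A](body: => A): (A, Double) = {
  val start = System.nanoTime()
  val result = body
  (result, (System.nanoTime() - start) / 1e6)
}
```

A call such as `timed(javaSerialize(someFunction))` returns the serialized bytes together with the time taken, which is enough to compare a closure that captures a large object graph with one that captures nothing.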

Let's first look at how Spark calls the serialization function. First of all, in SparkContext, the clean function looks like this:

    private[spark] def clean[F ...
        ... throw new SparkException("Task not serializable", ex)
    }

SparkEnv is created when SparkContext is initialized, and it holds the closureSerializer, which is created with new JavaSerializer. Because serialization is so slow, and because local mode does not actually need serialization at all, we try to replace the closureSerializer implementation. As complained about earlier, it is hard-coded in Spark and exposes no customization point, so once again we have to resort to a hack.

First, let's create a new subclass of SparkEnv:

    class WowSparkEnv(....) extends SparkEnv(....)

Then implement a custom Serializer:

    class LocalNonOpSerializerInstance(javaD: SerializerInstance) extends SerializerInstance {

      private def isClosure(cls: Class[_]): Boolean = {
        cls.getName.contains("$anonfun$")
      }

      override def serialize[T: ClassTag](t: T): ByteBuffer = {
        if (isClosure(t.getClass)) {
          // Closures are not serialized: park the object in a map and return its key
          val uuid = UUID.randomUUID().toString
          LocalNonOpSerializerInstance.maps.put(uuid, t.asInstanceOf[AnyRef])
          ByteBuffer.wrap(uuid.getBytes())
        } else {
          javaD.serialize(t)
        }
      }

      override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
        val s = StandardCharsets.UTF_8.decode(bytes).toString()
        if (LocalNonOpSerializerInstance.maps.containsKey(s)) {
          LocalNonOpSerializerInstance.maps.remove(s).asInstanceOf[T]
        } else {
          // Not a known key: rewind the buffer and fall back to Java deserialization
          bytes.flip()
          javaD.deserialize(bytes)
        }
      }

      override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
        val s = StandardCharsets.UTF_8.decode(bytes).toString()
        if (LocalNonOpSerializerInstance.maps.containsKey(s)) {
          LocalNonOpSerializerInstance.maps.remove(s).asInstanceOf[T]
        } else {
          bytes.flip()
          javaD.deserialize(bytes, loader)
        }
      }

      override def serializeStream(s: OutputStream): SerializationStream = {
        javaD.serializeStream(s)
      }

      override def deserializeStream(s: InputStream): DeserializationStream = {
        javaD.deserializeStream(s)
      }
    }

Next, wrap it in a LocalNonOpSerializer:

    class LocalNonOpSerializer(conf: SparkConf) extends Serializer with Externalizable {

      val javaS = new JavaSerializer(conf)

      override def newInstance(): SerializerInstance = {
        new LocalNonOpSerializerInstance(javaS.newInstance())
      }

      override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
        javaS.writeExternal(out)
      }

      override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
        javaS.readExternal(in)
      }
    }
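The core idea of the serializer above, handing back a key instead of real bytes, can be isolated from Spark's Serializer API. The following is a minimal sketch under the assumption that both sides live in the same JVM, as in local mode; `PassThrough` and its method names are hypothetical.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// In-process registry: only valid when the "serializer" and the "deserializer"
// share one JVM, as they do in local mode.
object PassThrough {
  private val registry = new ConcurrentHashMap[String, AnyRef]()

  // Store the object and hand back only its key, encoded as bytes.
  def serialize(obj: AnyRef): ByteBuffer = {
    val key = UUID.randomUUID().toString
    registry.put(key, obj)
    ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8))
  }

  // Look the key up and remove it. None means the bytes were not a known key,
  // in which case a caller would fall back to a real deserializer.
  def deserialize(bytes: ByteBuffer): Option[AnyRef] = {
    val key = StandardCharsets.UTF_8.decode(bytes.duplicate()).toString
    Option(registry.remove(key))
  }
}
```

Decoding from `bytes.duplicate()` leaves the caller's buffer position untouched, so a fallback deserializer can read the same buffer without the flip() step used in the article's version. Removing the entry on lookup keeps the map from growing, at the price that each buffer can be deserialized only once.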

Everything is now in place; the last step is getting Spark to actually run this code. The approach is something of a hack: implement an enhancement method.

    def enhanceSparkEnvForAPIService(session: SparkSession) = {
      val env = SparkEnv.get
      // Create a new WowSparkEnv and replace its serializer with our own LocalNonOpSerializer
      val wowEnv = new WowSparkEnv(
        ....
        new LocalNonOpSerializer(env.conf): Serializer,
        ....)
      // Replace the instance held by the SparkEnv object with our WowSparkEnv
      SparkEnv.set(wowEnv)

      // Many places already hold the SparkEnv that was created when SparkContext
      // started, so some adjustments are needed.
      // First, stop the previously started LocalSchedulerBackend
      val localScheduler = session.sparkContext.schedulerBackend.asInstanceOf[LocalSchedulerBackend]
      val scheduler = ReflectHelper.field(localScheduler, "scheduler")
      val totalCores = localScheduler.totalCores
      localScheduler.stop()

      // Create a new LocalSchedulerBackend
      val wowLocalSchedulerBackend = new WowLocalSchedulerBackend(
        session.sparkContext.getConf, scheduler.asInstanceOf[TaskSchedulerImpl], totalCores)
      wowLocalSchedulerBackend.start()

      // Replace _schedulerBackend in SparkContext with our implementation
      ReflectHelper.field(session.sparkContext, "_schedulerBackend", wowLocalSchedulerBackend)
    }
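The article does not show ReflectHelper, so the following is only a guess at what such a helper might look like, written with plain `java.lang.reflect`. `FieldAccess`, `Holder` and the field name `backend` are hypothetical names used for illustration.

```scala
import java.lang.reflect.Field
import scala.util.{Failure, Success, Try}

// Assumed implementation of a ReflectHelper-style utility: read or overwrite
// a private field by name, searching up the class hierarchy.
object FieldAccess {
  @scala.annotation.tailrec
  private def find(cls: Class[_], name: String): Field = {
    if (cls == null) throw new NoSuchFieldException(name)
    Try(cls.getDeclaredField(name)) match {
      case Success(field) =>
        field.setAccessible(true)
        field
      case Failure(_) =>
        // Not declared on this class: try the superclass.
        find(cls.getSuperclass, name)
    }
  }

  def get(obj: AnyRef, name: String): AnyRef = find(obj.getClass, name).get(obj)

  def set(obj: AnyRef, name: String, value: AnyRef): Unit =
    find(obj.getClass, name).set(obj, value)
}

// Toy class with a private mutable field, used to try the helper out.
class Holder {
  private var backend: String = "old"
  def current: String = backend
}
```

For example, `FieldAccess.set(holder, "backend", "new")` changes what `holder.current` returns even though the field is private. The sketch targets a mutable field; overwriting final fields through reflection is less dependable across JVM versions.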
