In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-26 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
[TOC]
Overview
The Actor of Scala is somewhat similar to multithreaded programming in Java. But the difference is that the model provided by Scala's Actor is different from that of multithreading. Scala's Actor avoids locks and shared states as much as possible, so as to avoid resource contention when multithreaded concurrency occurs, thus improving the performance of multithreaded programming.
The distributed multithreading framework used in Spark is Akka, which is a multithreaded class library of Scala. Akka also implements a model similar to Scala Actor, and its core concept is also Actor. The Scala Actor model was still in use in 2.1.0, but it was abandoned in 2.1.1. Spark began to switch to AKKA instead of Scala Actor, but the concept and principle of Scala Actor are the same. So learning Scala Actor is helpful for us to learn AKKA,Spark.
The reason for learning Scala Actor,AKKA is that when we learn the source code of Spark, we can understand the source code of Spark, because of the heavy use of AKKA's delivery mechanism in the underlying messaging mechanism.
Scala actor
Before using it, you need to introduce maven dependencies:
Org.scala-lang scala-actors 2.10.5actor unidirectional communication
The test code is as follows:
Package cn.xpleaf.bigdata.p5.myactorimport scala.actors.Actor/** * learn the basic operation of scala actor * is almost the same as Runnable Thread in java * * step 1: write a class and extend the attribute trait Actor (actor of scala) * step 2: copy the act method in it * step 3: create the object of the actor and call the start () method of the object Start the thread * step 4: send the message through the operator "!" of scala * step 5: call close to * * simulate an one-way greeting * / object ActorOps {def main (args: Array [String]): Unit = {val mFActor = new MyFirstActor () mFActor.start () / / send the message mFActor! "Xiao Mei, are you asleep?" MFActor! "I'm going to take a shower." mFActor! Class MyFirstActor extends Actor {override def act (): Unit = {while (true) {receive {case str: String = > println (str)}
The output is as follows:
Xiao Mei, are you asleep? I went to take a shower ~ hehe uses the sample class (case class) for actor message delivery
The test code is as follows:
Package cn.xpleaf.bigdata.p5.myactorimport scala.actors.Actor/** / object GreetingActor {def main (args: Array [String]): Unit = {val ga = new GreetingActor ga.start () ga! Greeting ("Xiao Mei") ga! WorkContent ("install system")} case class Greeting (name:String) case class WorkContent (content:String) class GreetingActor extends Actor {override def act (): Unit = {while (true) {receive {case Greeting (name) = > println (s "Hello, $name") case WorkContent (content) = > println (s "Let's talk about sth. With $content")}
The output is as follows:
Hello, Xiaomei Let's talk about sth. With installs actor to communicate with each other
The test code is as follows:
Package cn.xpleaf.bigdata.p5.myactorimport scala.actors.Actor/** * actor threads communicate with each other * * studentActor * asks the teacher a question * * teacherActor * responds to students * * Communication protocol: * request, use Request (content) to express * response Use Response to express * / object _ 03CommunicationActorOps {def main (args: Array [String]): Unit = {val teacherActor = new TeacherActor () teacherActor.start () val studentActor = new StudentActor (teacherActor) studentActor.start () studentActor! Request ("Lao Li" Why is scala learning so difficult ")} case class Request (req:String) case class Response (resp:String) class StudentActor (teacherActor: TeacherActor) extends Actor {override def act (): Unit = {while (true) {receive {case Request (req) = > {/ / ask the teacher for related questions println (the student said to the teacher: "+ req) teacherActor! Request (req)} case Response (resp) = > {println (resp) println ("high!") } class TeacherActor () extends Actor {override def act (): Unit = {while (true) {receive {case Request (req) = > {/ / received the student's request sender! Response ("this problem needs to be solved ~")}
The output is as follows:
The student said to the teacher: Lao Li, why is it so difficult to learn scala? this problem needs to be solved so high! Synchronization and Future of messages
1. Scala by default, messages are sent asynchronously, but if the message sent is synchronous, that is, after the other party accepts it, you must return the result to yourself, then you can use!? To send a message. That is:
Val response= activeActor!? ActiveMessage
2. If you want to send a message asynchronously, but you want to get the return value of the message later, you can use Future. That is! Syntax, as follows:
Val futureResponse = activeActor! ActiveMessageval activeReply = future () AKKA actor
First, you need to add the maven dependency of akka:
Com.typesafe.akka akka-actor_2.10 2.3.16 com.typesafe.akka akka-remote_2.10 2.3.16 com.typesafe.akka akka-slf4j_2.10 2.3.16AKKA messaging-Local
The principle is as follows:
_ 01StudentActorOpspackage cn.xpleaf.bigdata.p5.myakka.p1import akka.actor. {Actor, ActorSystem, Props} import cn.xpleaf.bigdata.p5.myakka.MessageProtocol. {QuoteRequest QuoteResponse} import scala.util.Random/** * one-way communication case based on AKKA Actor * students send requests to teachers * / object _ 01StudentActorOps {def main (args: Array [String]): Unit = {/ / first step: build Actor operating system val actorSystem = ActorSystem ("StudentActorSystem") / / second step: actorSystem creates TeacherActor proxy object ActorRef val teacherActorRef = actorSystem. ActorOf (Props [TeacherActor]) / / step 3: send the message teacherActorRef! QuoteRequest () Thread.sleep (2000) / / step 4: close actorSystem.shutdown ()}} class TeacherActor extends Actor {val quotes = List ("Moderation is for cowards", "Anything worth doing is worth overdoing", "The trouble is you think you have time") "You never gonna know if you never even try") override def receive = {case QuoteRequest () = > {val random = new Random () val randomIndex = random.nextInt (quotes.size) val randomQuote = quotes (randomIndex) val response = QuoteResponse (randomQuote) println (response)}} MessageProtocol
Later, several test programs for akka communication will use this object, which is only given here and will not be given later.
Package cn.xpleaf.bigdata.p5.myakka/** * akka actor Communication Protocol * / object MessageProtocol {case class QuoteRequest () case class QuoteResponse (resp: String) case class InitSign ()} object Start extends Serializableobject Stop extends Serializabletrait Message {val id: String} case class Shutdown (waitSecs: Int) extends Serializablecase class Heartbeat (id: String, magic: Int) extends Message with Serializablecase class Header (id: String, len: Int, encrypted: Boolean) extends Message with Serializablecase class Packet (id: String, seq: Long Content: String) extends Message with Serializable test
The output is as follows:
QuoteResponse (Anything worth doing is worth overdoing) AKKA request and response-Local
The principle is as follows:
TeacherActorpackage cn.xpleaf.bigdata.p5.myakka.p2import akka.actor.Actorimport cn.xpleaf.bigdata.p5.myakka.MessageProtocol. {QuoteRequest, QuoteResponse} import scala.util.Random/** * TeacherActor * / class TeacherActor extends Actor {val quotes = List ("Moderation is for cowards", "Anything worth doing is worth overdoing", "The trouble is you think you have time" "You never gonna know if you never even try") override def receive = {case QuoteRequest () = > {val random = new Random () val randomIndex = random.nextInt (quotes.size) val randomQuote = quotes (randomIndex) val response = QuoteResponse (randomQuote) / / println (response) sender! Response}} StudentActorpackage cn.xpleaf.bigdata.p5.myakka.p2import akka.actor. {Actor, ActorLogging, ActorRef} import cn.xpleaf.bigdata.p5.myakka.MessageProtocol. {InitSign, QuoteRequest, QuoteResponse} / * StudentActor * when the student receives the InitSign signal, they send a Request request message to the teacher * / class StudentActor (teacherActorRef:ActorRef) extends Actor with ActorLogging {override def receive = {case InitSign = > {teacherActorRef! QuoteRequest () / / println ("student send request")} case QuoteResponse (resp) = > {log.info (s "$resp")} DriverApppackage cn.xpleaf.bigdata.p5.myakka.p2import akka.actor. {ActorSystem Props} import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.InitSignobject DriverApp {def main (args: Array [String]): Unit = {val actorSystem = ActorSystem ("teacherStudentSystem") / / teacher's surrogate val teacherActorRef = actorSystem.actorOf (Props [TeacherActor], "teacherActor") / / student's surrogate val studentActorRef = actorSystem.actorOf (props [study Actor] (new StudentActor (teacherActorRef)), "studentActor") studentActorRef! InitSign Thread.sleep (2000) actorSystem.shutdown ()}} Test
The output is as follows:
[INFO] [04provider] [04Accord] [teacherStudentSystem-akka.actor.default-dispatcher-2] [akka://teacherStudentSystem/user/studentActor] Anything worth doing is worth overdoingAKKA request and response-remote application.confMyRemoteServerSideActor {akka {provider = "akka.remote.RemoteActorRefProvider"} remote {enabled-transports = ["akka.remote.netty.tcp"] netty.tcp {hostname = "127.0.0.1" port = 2552} MyRemoteClientSideActor {akka {actor {provider = "akka.remote.RemoteActorRefProvider"} RemoteActorpackage cn.xpleaf.bigdata.p5.myakka.p3import akka.actor. {Actor ActorLogging} import cn.xpleaf.bigdata.p5.myakka. {Header, Shutdown, Start Stop} class RemoteActor extends Actor with ActorLogging {def receive = {case Start = > {/ / processing Start messages log.info ("Remote Server Start = = > RECV Start event:" + Start)} case Stop = > {/ / processing Stop messages log.info ("Remote Server Stop = = > RECV Stop event:" + Stop)} case Shutdown (waitSecs) = > {/ / Processing Shutdown messages log.info ("Remote Server Shutdown = = > Wait to shutdown: waitSecs=" + waitSecs) Thread.sleep (waitSecs) log.info ("Remote Server Shutdown = = > Shutdown this system.") Context.system.shutdown / / stop the current ActorSystem system} case Header (id, len, encrypted) = > log.info ("Remote Server = > RECV header:" + (id, len, encrypted)) / / process Header messages case _ = >}} AkkaServerApplicationpackage cn.xpleaf.bigdata.p5.myakka.p3import akka.actor. {ActorSystem Props} import com.typesafe.config.ConfigFactoryobject AkkaServerApplication extends App {/ / create an ActorSystem named remote-system: get the configuration content of the Actor from the configuration file application.conf val system = ActorSystem ("remote-system", ConfigFactory.load (). GetConfig ("MyRemoteServerSideActor")) val log = system.log log.info ("= = > Remote server actor started:" + system) / / create an Actor named remoteActor Returns an ActorRef We do not need to use this return value system.actorOf (Props [RemoteActor], "remoteActor")} ClientActorpackage cn.xpleaf.bigdata.p5.myakka.p3import akka.actor.SupervisorStrategy.Stopimport akka.actor. {Actor, ActorLogging} import cn.xpleaf.bigdata.p5.myakka. {Header Start} path to class ClientActor extends Actor with ActorLogging {/ / akka.://@:/ val path = "akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor" / / remote Actor Through this path, you can get a reference to the remote Actor val remoteServerRef = context.actorSelection (path) / / get a reference to the remote Actor, through which you can send the message @ volatile var connected = false @ volatile var stop = false def receive = {case Start = > {/ / send the Start message indicating the communication to be processed with the remote Actor for subsequent business logic processing. You can instruct the remote Actor to initialize some operations or data send (Start) if (! connected) that satisfy business processing. {connected = true log.info ("ClientActor== > Actor connected:" + this)}} case Stop = > {send (Stop) stop = true Connected = false log.info ("ClientActor= > Stopped")} case header: Header = > {log.info ("ClientActor= > Header") send (header)} case (seq) Result) = > log.info ("RESULT: seq=" + seq + ", result=" + result) / / used to receive the result of remote Actor processing a Packet message case m = > log.info ("Unknown message:" + m)} private def send (cmd: Serializable): Unit = {log.info ("Send command to server:" + cmd) try {remoteServerRef! Cmd / / sends a message to the remote Actor, which must be serializable because the message object is transmitted over the network} catch {case e: Exception = > {connected = false log.info ("Try to connect by sending Start command...") Send (Start)} AkkaClientApplicationpackage cn.xpleaf.bigdata.p5.myakka.p3import akka.actor. {ActorSystem, Props} import cn.xpleaf.bigdata.p5.myakka. {Header, Start} import com.typesafe.config.ConfigFactoryobject AkkaClientApplication extends App {/ / create ActorSystem system val system = ActorSystem ("client-system") through configuration file ActorSystem configuration ConfigFactory.load () .getConfig ("MyRemoteClientSideActor") val log = system.log val clientActor = system.actorOf (Props [ClientActor], "clientActor") / / get a reference to ClientActor clientActor! Start / / sends a Start message, shaking hands with the remote Actor for the first time (forwarded via the local ClientActor) Thread.sleep (2000) clientActor! Header ("What's your name: Can you tell me", 20, encrypted = false) / / send a Header message to the remote Actor (forwarded via local ClientActor) Thread.sleep (2000)} test
The output from the server is as follows:
[INFO] [04/24/2018 09:39:49.271] [main] [Remoting] Starting remoting [INFO] [04/24/2018 09:39:49.508] [main] [Remoting] Remoting started Listening on addresses: [akka.tcp://remote-system@127.0.0.1:2552] [INFO] [04 remote-system] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://remote-system@127.0.0.1:2552] [INFO] [04 INFO] [main] [ActorSystem (remote-system)] = > Remote server actor started: akka://remote -system [INFO] [04akka] [remote-system-akka.actor.default-dispatcher-3] [akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor] Remote Server Start = = > RECV Start event: cn.xpleaf.bigdata.p5.myakka.Start$@325737b3 [INFO] [04ramp] [remote-system-akka.actor.default-dispatcher-3] [akka]. Tcp://remote-system@127.0.0.1:2552/user/remoteActor] Remote Server = > RECV header: (What's your name: Can you tell me 20pr false)
The output from the client is as follows:
[INFO] [04/24/2018 09:46:01.274] [main] [Remoting] Starting remoting [INFO] [04/24/2018 09:46:01.479] [main] [Remoting] Remoting started Listening on addresses: [akka.tcp://client-system@192.168.43.132:2552] [INFO] [04/24/2018 09:46:01.480] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://client-system@192.168.43.132:2552] [INFO] [04/24/2018 09:46:01.493] [client-system-akka.actor.default-dispatcher-4] [akka.tcp:/ / client-system@192.168.43.132:2552/user/clientActor] Send command to server: cn.xpleaf.bigdata.p5.myakka.Start$@4f00805d [INFO] [04gamma 24Unix] 2018 09Groupe 46 INFO 01.496] [client-system-akka.actor.default-dispatcher-4] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] ClientActor== > Actor connected: cn.xpleaf.bigdata.p5.myakka.p3.ClientActor@5a85b576 [INFO] [client-system-akka.actor.default-dispatcher-2] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] ClientActor= > Header [INFO] [04 Send command] [client-system-akka.actor.default-dispatcher-2] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] Send command To server: Header (What's your name: Can you tell me 20pr false)
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.