2025-02-24 Update From: SLTechnology News&Howtos
Shulou (Shulou.com) 06/02 Report
Like its sister language F# on .NET, Scala is one of the languages touted as a solution to the "concurrency problem." I have already discussed some of the properties that make Scala better suited to writing thread-safe code, such as objects that are immutable by default and a design bias toward returning copies of objects rather than modifying their contents. Scala's support for concurrency goes far beyond that, though; now it's time to take a look at what Scala's libraries offer.
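To make the "return a copy instead of modifying it" idea concrete, here is a minimal sketch. The Account class and its deposit method are hypothetical names chosen for illustration. Case class fields are vals by default, and the compiler-generated copy method builds a new instance with selected fields changed, so the original value can be handed to other threads without any locking.

```scala
// Hypothetical immutable value type: case class fields are vals by default.
final case class Account(owner: String, balance: BigDecimal) {
  // Rather than mutating this instance, return a new Account with the new balance.
  def deposit(amount: BigDecimal): Account = copy(balance = balance + amount)
}

object ImmutabilityDemo {
  def main(args: Array[String]): Unit = {
    val before = Account("alice", BigDecimal(100))
    val after  = before.deposit(BigDecimal(25))
    // `before` is untouched, so any thread still holding it sees a consistent value.
    println(s"$before -> $after")
  }
}
```

Because no instance ever changes after construction, there is simply no shared mutable state to protect.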
Concurrency basics
Before delving into Scala's concurrency support, it is important to make sure you have a good understanding of Java's basic concurrency model, because Scala's concurrency support is, to some extent, built on the features and capabilities provided by the JVM and its support libraries. To that end, the code in Listing 1 contains a well-known concurrency problem, the Producer/Consumer problem (see the "Guarded Blocks" section of Sun's Java Tutorial for details). Note that the Java Tutorial version does not use the java.util.concurrent classes in its solution, but instead uses the older wait()/notifyAll() methods from java.lang.Object:
Listing 1. Producer/Consumer (pre-Java 5)
package com.tedneward.scalaexamples.notj5;

class Producer implements Runnable {
  private Drop drop;
  private String importantInfo[] = {
    "Mares eat oats",
    "Does eat oats",
    "Little lambs eat ivy",
    "A kid will eat ivy too"
  };

  public Producer(Drop drop) {
    this.drop = drop;
  }

  public void run() {
    for (int i = 0; i < importantInfo.length; i++) {
      drop.put(importantInfo[i]);
    }
    drop.put("DONE");
  }
}

class Consumer implements Runnable {
  private Drop drop;

  public Consumer(Drop drop) {
    this.drop = drop;
  }

  public void run() {
    for (String message = drop.take(); !message.equals("DONE"); message = drop.take()) {
      System.out.format("MESSAGE RECEIVED: %s%n", message);
    }
  }
}

class Drop {
  //Message sent from producer to consumer.
  private String message;
  //True if consumer should wait for producer to send message,
  //false if producer should wait for consumer to retrieve message.
  private boolean empty = true;
  //Object to use to synchronize against so as to not "leak" the
  //"this" monitor
  private Object lock = new Object();

  public String take() {
    synchronized(lock) {
      //Wait until message is available.
      while (empty) {
        try {
          lock.wait();
        } catch (InterruptedException e) {}
      }
      //Toggle status.
      empty = true;
      //Notify producer that status has changed.
      lock.notifyAll();
      return message;
    }
  }

  public void put(String message) {
    synchronized(lock) {
      //Wait until message has been retrieved.
      while (!empty) {
        try {
          lock.wait();
        } catch (InterruptedException e) {}
      }
      //Toggle status.
      empty = false;
      //Store message.
      this.message = message;
      //Notify consumer that status has changed.
      lock.notifyAll();
    }
  }
}

public class ProdConSample {
  public static void main(String[] args) {
    Drop drop = new Drop();
    (new Thread(new Producer(drop))).start();
    (new Thread(new Consumer(drop))).start();
  }
}

Java Tutorial "bug"

Curious readers may compare this code with the code in the Java Tutorial to see what differs; those who do will find that instead of simply making the put and take methods synchronized, I used a lock object stored inside the Drop. The reason is simple: an object's monitor is never encapsulated inside its class, so as written, the Java Tutorial version allows this (obviously insane) code to break it:

public class ProdConSample {
  public static void main(String[] args) throws InterruptedException {
    Drop drop = new Drop();
    (new Thread(new Producer(drop))).start();
    (new Thread(new Consumer(drop))).start();
    synchronized(drop) {
      Thread.sleep(1000L * 60 * 60 * 24 * 365 * 10); // sleep for 10 years?!?
    }
  }
}

By using a private object as the monitor that the lock is based on, this code has no effect whatsoever. In essence, the thread-safe implementation is now encapsulated; before, it relied on the client's goodwill to work correctly.

Note: The code I present here is slightly modified from the Sun tutorial solution; the code they provide has a small design flaw (see the sidebar Java Tutorial "bug").

The core of the Producer/Consumer problem is easy to understand: one (or more) producer entities want to provide data to one (or more) consumer entities for them to consume and operate on (in this case, that means printing the data to the console). The Producer and Consumer classes are correspondingly straightforward Runnable implementations: the Producer takes Strings from an array and places them into the buffer with put, and the Consumer retrieves them with take as needed.

The hard part of the problem is that if the Producer runs too fast, data may be lost as it is overwritten; if the Consumer runs too fast, data may be processed twice as the Consumer reads the same data again. The buffer (called the Drop in the Java Tutorial code) ensures that neither situation occurs. Not to mention the potential for data corruption (hard to achieve with String references, but still a concern) as data is put into the buffer by put and taken out by take.

For a full discussion of the subject, read Brian Goetz's Java Concurrency in Practice or Doug Lea's Concurrent Programming in Java, but a quick look at how this code works is necessary before applying Scala to it.

When the Java compiler sees the synchronized keyword, it generates a try/finally block in place of the synchronized block, with a monitorenter opcode at the top and a monitorexit opcode in the finally block, to ensure that the monitor (the Java basis for atomicity) is released regardless of how the code exits. Thus, the put code in Drop gets rewritten to look like Listing 2:

Listing 2. Drop.put as rewritten by the compiler

// This is pseudocode
public void put(String message) {
  try {
    monitorenter(lock)
    //Wait until message has been retrieved.
    while (!empty) {
      try {
        lock.wait();
      } catch (InterruptedException e) {}
    }
    //Toggle status.
    empty = false;
    //Store message.
    this.message = message;
    //Notify consumer that status has changed.
    lock.notifyAll();
  } finally {
    monitorexit(lock)
  }
}

The wait() method releases the monitor and tells the current thread to go inactive until another thread calls notifyAll() on that object. The notified thread must then reacquire the monitor before it can continue executing. In essence, wait() and notify()/notifyAll() provide a simple signaling mechanism that lets the Drop coordinate between the Producer and Consumer threads, with one take for each put.

The code download for this article uses the Java 5 concurrency enhancements (the Lock and Condition interfaces and the ReentrantLock implementation) to provide timeout-based versions of Listing 2, but the basic code pattern stays the same. And that is the problem: developers who write code like Listing 2 have to focus far too much on the details of threading and locking, the low-level implementation code needed to make it all work correctly. What's more, developers must reason about each and every line of code to decide whether it needs to be protected, because too much synchronization is just as harmful as too little.

Now let's look at the Scala alternatives.

Good old Scala concurrency (v1)

One way to start using Scala concurrency is to translate the Java code directly into Scala, taking advantage of Scala's syntax to simplify the code (at least a little):

Listing 3. ProdConSample (Scala)

object ProdConSample {
  class Producer(drop : Drop) extends Runnable {
    val importantInfo : Array[String] = Array(
      "Mares eat oats",
      "Does eat oats",
      "Little lambs eat ivy",
      "A kid will eat ivy too"
    )

    override def run() : Unit = {
      importantInfo.foreach((msg) => drop.put(msg))
      drop.put("DONE")
    }
  }

  class Consumer(drop : Drop) extends Runnable {
    override def run() : Unit = {
      var message = drop.take()
      while (message != "DONE") {
        System.out.format("MESSAGE RECEIVED: %s%n", message)
        message = drop.take()
      }
    }
  }

  class Drop {
    var message : String = ""
    var empty : Boolean = true
    val lock : AnyRef = new Object()

    def put(x : String) : Unit = lock.synchronized {
      // Wait until message has been retrieved
      await(empty == true)
      // Toggle status
      empty = false
      // Store message
      message = x
      // Notify consumer that status has changed
      lock.notifyAll()
    }

    def take() : String = lock.synchronized {
      // Wait until message is available
      await(empty == false)
      // Toggle status
      empty = true
      // Notify producer that status has changed
      lock.notifyAll()
      // Return the message
      message
    }

    private def await(cond : => Boolean) : Unit =
      while (!cond) { lock.wait() }
  }

  def main(args : Array[String]) : Unit = {
    // Create Drop
    val drop = new Drop()
    // Spawn Producer
    new Thread(new Producer(drop)).start()
    // Spawn Consumer
    new Thread(new Consumer(drop)).start()
  }
}
The Producer and Consumer classes are almost identical to their Java counterparts: once again they extend (implement) the Runnable interface and override the run() method, and Producer uses the built-in foreach iteration method to walk through the contents of the importantInfo array. (Actually, to make it more Scala-like, importantInfo should probably be a List rather than an Array, but on this first pass I want to stay as close to the original Java code as possible.)
The Drop class is also similar to its Java version, with a few Scala-specific differences. Most notably, synchronized is not a keyword in Scala but a method defined on AnyRef, the root of all reference types. This means that to synchronize on a particular object, you simply call the synchronized method on that object; here, it is called on the object held in Drop's lock field.
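A small sketch of synchronized used as an ordinary method. Because it returns the value of the block it is given, it can serve directly as a method body, just as it does in Drop.take. The SynchronizedDemo object and its counter are hypothetical and not part of the article's code.

```scala
object SynchronizedDemo {
  // A private lock object, as in Drop: `synchronized` is a method on AnyRef
  // that runs its block while holding that object's monitor.
  private val lock = new Object
  private var count = 0

  // synchronized returns the value of its block, so it can be the method body.
  def increment(): Int = lock.synchronized {
    count += 1
    count
  }

  def current: Int = lock.synchronized { count }

  def main(args: Array[String]): Unit = {
    val threads = (1 to 4).map { _ =>
      new Thread(() => (1 to 1000).foreach(_ => increment()))
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(current) // 4000: every increment happened under the lock
  }
}
```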
Note that the Drop class also takes advantage of a Scala mechanism in its await() method: the cond parameter is a block of code that is evaluated each time the method refers to it, not once before it is passed to the method. In Scala, this is called "call-by-name"; here, it is a handy way to capture the conditional-wait logic that the Java version has to spell out twice (once in put and once in take).
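The difference between by-name and by-value evaluation is easiest to see by counting evaluations. In this sketch (ByNameDemo and countChecksUntil are hypothetical names), the loop re-evaluates the caller's expression on every check, which is exactly what await() relies on. With an ordinary `cond: Boolean` parameter, the expression would run once at the call site and the loop would never observe a change.

```scala
object ByNameDemo {
  // `cond` is a by-name parameter: the caller's expression is re-evaluated
  // every time `cond` is referenced inside this method.
  def countChecksUntil(cond: => Boolean): Int = {
    var checks = 1
    while (!cond) checks += 1
    checks
  }

  def main(args: Array[String]): Unit = {
    var polls = 0
    // The argument block runs on each check; it becomes true on the third call.
    val n = countChecksUntil { polls += 1; polls >= 3 }
    println(s"condition evaluated $n times") // condition evaluated 3 times
  }
}
```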
Finally, main() creates a Drop instance, instantiates the two threads, starts them with start(), and then simply reaches its end. This is safe: start() has already launched each thread, and because both are ordinary (non-daemon) threads, the JVM keeps running until they finish, even after main() returns.
However, as noted earlier, the same basic problem remains: programmers still have to worry far too much about communication and coordination between the two threads. Although some Scala mechanisms simplify the syntax, so far this is not very compelling.
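The code download mentioned earlier builds timeout-based versions on Lock, Condition and ReentrantLock. That code is not reproduced here, so the following is my own sketch of what such a Drop might look like in Scala, not the download's code; TimedDrop, TimedDropDemo and their signatures are hypothetical. It also illustrates the point above: even with the Java 5 classes, the developer still manages locks and conditions by hand.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable.ListBuffer

// Hypothetical timeout-aware Drop built on ReentrantLock and two Conditions.
class TimedDrop {
  private val lock     = new ReentrantLock()
  private val notEmpty = lock.newCondition() // signalled after a put
  private val notFull  = lock.newCondition() // signalled after a take
  private var message: String = null
  private var empty = true

  // Returns false if the slot stayed full for the whole timeout.
  def put(x: String, timeout: Long, unit: TimeUnit): Boolean = {
    var nanos = unit.toNanos(timeout)
    lock.lock()
    try {
      while (!empty) {
        if (nanos <= 0L) return false
        nanos = notFull.awaitNanos(nanos) // releases the lock while waiting
      }
      message = x
      empty = false
      notEmpty.signal()
      true
    } finally lock.unlock()
  }

  // Returns None if no message arrived within the timeout.
  def take(timeout: Long, unit: TimeUnit): Option[String] = {
    var nanos = unit.toNanos(timeout)
    lock.lock()
    try {
      while (empty) {
        if (nanos <= 0L) return None
        nanos = notEmpty.awaitNanos(nanos)
      }
      empty = true
      notFull.signal()
      Some(message)
    } finally lock.unlock()
  }
}

object TimedDropDemo {
  def main(args: Array[String]): Unit = {
    val drop = new TimedDrop
    val received = ListBuffer.empty[String]
    val producer = new Thread(() => {
      // A sketch: a false result (put timed out) is simply ignored here.
      for (m <- List("Mares eat oats", "Does eat oats", "DONE"))
        drop.put(m, 1, TimeUnit.SECONDS)
    })
    val consumer = new Thread(() => {
      var msg = drop.take(1, TimeUnit.SECONDS)
      while (msg.exists(_ != "DONE")) {
        received += msg.get
        msg = drop.take(1, TimeUnit.SECONDS)
      }
    })
    producer.start(); consumer.start()
    producer.join(); consumer.join() // join also makes `received` safely visible here
    received.foreach(m => println(s"MESSAGE RECEIVED: $m"))
  }
}
```

Using two separate Conditions lets each side wake only the kind of thread that can make progress, which is why signal() is enough here instead of signalAll().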
Scala concurrency v2
There is an interesting package in the Scala Library Reference: scala.concurrent. This package contains a number of different concurrency constructs, including the MailBox class we are about to use.
As the name implies, MailBox is essentially the Drop: a single-slot buffer that holds a piece of data until it is retrieved. However, the great advantage of MailBox is that it fully encapsulates the details of sending and receiving behind pattern matching and case classes, which makes it more flexible than the simple Drop (or than its multi-slot big brother, a bounded buffer such as the BoundedBuffer example in the java.util.concurrent.locks.Condition documentation).
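MailBox is not included in current Scala standard libraries (2.13, for example, no longer ships it), so to experiment with a single-slot buffer today, java.util.concurrent.ArrayBlockingQueue offers the same blocking behavior. This sketch keeps the Drop API used throughout the article; QueueDrop and QueueDropDemo are hypothetical names. A capacity of 1 mimics the Drop, and a larger capacity gives the multi-slot bounded buffer.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical Drop backed by java.util.concurrent.ArrayBlockingQueue.
// With capacity 1 it behaves like a single-slot Drop: put blocks while the
// slot is full, take blocks while it is empty.
class QueueDrop(capacity: Int = 1) {
  private val slots = new ArrayBlockingQueue[String](capacity)
  def put(msg: String): Unit = slots.put(msg)
  def take(): String = slots.take()
}

object QueueDropDemo {
  def main(args: Array[String]): Unit = {
    val drop = new QueueDrop()
    val producer = new Thread(() => {
      List("Mares eat oats", "Does eat oats", "DONE").foreach(m => drop.put(m))
    })
    producer.start()
    // The main thread plays the Consumer role.
    var message = drop.take()
    while (message != "DONE") {
      println(s"MESSAGE RECEIVED: $message")
      message = drop.take()
    }
    producer.join()
  }
}
```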
Listing 4. ProdConSample, v2 (Scala)
package com.tedneward.scalaexamples.scala.V2 {
  import concurrent.{MailBox, ops}

  object ProdConSample {
    class Producer(drop : Drop) extends Runnable {
      val importantInfo : Array[String] = Array(
        "Mares eat oats",
        "Does eat oats",
        "Little lambs eat ivy",
        "A kid will eat ivy too"
      )

      override def run() : Unit = {
        importantInfo.foreach((msg) => drop.put(msg))
        drop.put("DONE")
      }
    }

    class Consumer(drop : Drop) extends Runnable {
      override def run() : Unit = {
        var message = drop.take()
        while (message != "DONE") {
          System.out.format("MESSAGE RECEIVED: %s%n", message)
          message = drop.take()
        }
      }
    }

    class Drop {
      private val m = new MailBox()

      private case class Empty()
      private case class Full(x : String)

      m send Empty()  // initialization

      def put(msg : String) : Unit = {
        m receive {
          case Empty() =>
            m send Full(msg)
        }
      }

      def take() : String = {
        m receive {
          case Full(msg) =>
            m send Empty()
            msg
        }
      }
    }

    def main(args : Array[String]) : Unit = {
      // Create Drop
      val drop = new Drop()
      // Spawn Producer
      new Thread(new Producer(drop)).start()
      // Spawn Consumer
      new Thread(new Consumer(drop)).start()
    }
  }
}
Here, the only difference between v2 and v1 is the implementation of Drop, which now uses the MailBox class to handle the blocking and signaling as messages are put into and taken out of the Drop. We could rewrite Producer and Consumer to use the MailBox directly, but for simplicity's sake, I assume we want to keep the Drop API consistent across all the examples. Using a MailBox is a bit different from using a classic bounded buffer (the Drop), so let's look at the code more closely.
MailBox has two basic operations: send and receive. The receiveWithin method is simply a timeout-based version of receive. A MailBox accepts messages of any type. The send() method drops the message into the mailbox, immediately handing it to any waiting receiver that is interested in that kind of message or, if there is none, appending it to a linked list of messages for later retrieval. The receive() method blocks until a message arrives that matches the function block passed to it.
So, in this case, we create two case classes: one that contains nothing (Empty), indicating that the MailBox is empty, and one that contains the message data (Full).
The put method, because it puts data into the Drop, calls receive() on the MailBox looking for an Empty instance, and therefore blocks until an Empty has been sent. At that point, it sends a Full instance containing the new data to the MailBox.
The take method, because it removes data from the Drop, calls receive() on the MailBox looking for a Full instance, extracts the message (again thanks to pattern matching's ability to extract values from inside a case class and bind them to local variables), and sends an Empty instance to the MailBox.
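To see the Empty/Full protocol run on a current Scala version, here is a sketch built on a small hand-written mailbox. MiniMailBox and MailBoxDrop are hypothetical; MiniMailBox only imitates the send/receive behavior described above (selective receive through a partial function) and is neither the old scala.concurrent.MailBox API nor its implementation.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, minimal stand-in for the old MailBox: send() accepts any
// message; receive() blocks until some pending message is matched by the
// partial function it is given, then removes that message and handles it.
class MiniMailBox {
  private val pending = ListBuffer.empty[Any]

  def send(msg: Any): Unit = this.synchronized {
    pending += msg
    this.notifyAll() // let every waiting receiver rescan the list
  }

  def receive[A](pf: PartialFunction[Any, A]): A = {
    val msg: Any = this.synchronized {
      var idx = pending.indexWhere(m => pf.isDefinedAt(m))
      while (idx < 0) {
        this.wait() // releases the monitor until the next send()
        idx = pending.indexWhere(m => pf.isDefinedAt(m))
      }
      pending.remove(idx)
    }
    pf(msg) // run the handler outside the monitor
  }
}

// The Empty/Full protocol from Listing 4, rewritten against MiniMailBox.
class MailBoxDrop {
  private val m = new MiniMailBox
  private case class Empty()
  private case class Full(x: String)

  m.send(Empty()) // initialization: the Drop starts out empty

  // Wait for the Empty token, then replace it with the data.
  def put(msg: String): Unit =
    m.receive { case Empty() => m.send(Full(msg)) }

  // Wait for a Full token, hand back an Empty one, and return the data.
  def take(): String =
    m.receive { case Full(msg) => m.send(Empty()); msg }
}
```

Because exactly one token (Empty or Full) is ever in the mailbox, put and take alternate strictly, with no lock or condition visible in MailBoxDrop itself.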
No explicit locking is required, and there is no need to think about monitors.
Scala concurrency v3
In fact, we can shorten the code considerably if Producer and Consumer don't need to be full-featured classes (as is the case here): both are essentially thin wrappers around the Runnable.run() method, which Scala can replace with the spawn method of the scala.concurrent.ops object, as shown in Listing 5:
Listing 5. ProdConSample, v3 (Scala)
package com.tedneward.scalaexamples.scala.V3 {
  import concurrent.MailBox
  import concurrent.ops._

  object ProdConSample {
    class Drop {
      private val m = new MailBox()

      private case class Empty()
      private case class Full(x : String)

      m send Empty()  // initialization

      def put(msg : String) : Unit = {
        m receive {
          case Empty() =>
            m send Full(msg)
        }
      }

      def take() : String = {
        m receive {
          case Full(msg) =>
            m send Empty()
            msg
        }
      }
    }

    def main(args : Array[String]) : Unit = {
      // Create Drop
      val drop = new Drop()

      // Spawn Producer
      spawn {
        val importantInfo : Array[String] = Array(
          "Mares eat oats",
          "Does eat oats",
          "Little lambs eat ivy",
          "A kid will eat ivy too"
        )
        importantInfo.foreach((msg) => drop.put(msg))
        drop.put("DONE")
      }

      // Spawn Consumer
      spawn {
        var message = drop.take()
        while (message != "DONE") {
          System.out.format("MESSAGE RECEIVED: %s%n", message)
          message = drop.take()
        }
      }
    }
  }
}
The spawn method (imported through the ops object at the top of the package block) takes a block of code (another example of a by-name parameter) and wraps it inside the run() method of an anonymously constructed thread object. In fact, it is not hard to guess what the definition of spawn looks like inside the ops object:
Listing 6. scala.concurrent.ops.spawn()
def spawn(p: => Unit) = {
  val t = new Thread() { override def run() = p }
  t.start()
}
...which once again highlights the power of by-name parameters.
One drawback of the ops.spawn method is that it was written in 2003, before the Java 5 concurrency classes were available. In particular, java.util.concurrent.Executor and its relatives exist to make it easier for developers to spawn threads without having to deal directly with the details of creating thread objects. Fortunately, it is fairly simple to recreate the definition of spawn in your own custom library, using an Executor (or ExecutorService or ScheduledExecutorService) to do the actual work of launching the thread.
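Here is one possible sketch of such a custom library. ExecutorOps, its future variant and the choice of a cached thread pool are my own assumptions rather than anything from the Scala library; the point is only that spawn keeps the same by-name shape as Listing 6 while the thread comes from an ExecutorService.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, Future, TimeUnit}

// Hypothetical replacement for ops.spawn: same by-name shape as Listing 6,
// but threads come from an ExecutorService instead of `new Thread`.
object ExecutorOps {
  // A cached pool creates threads on demand and reuses idle ones.
  val pool: ExecutorService = Executors.newCachedThreadPool()

  // Runs `p` asynchronously; the returned Future lets callers wait or cancel.
  def spawn(p: => Unit): Future[_] =
    pool.submit(new Runnable { def run(): Unit = p })

  // Variant that hands back the block's result through the Future.
  def future[T](body: => T): Future[T] =
    pool.submit(new Callable[T] { def call(): T = body })

  // Pool threads are non-daemon, so shut the pool down when finished.
  def shutdown(): Unit = {
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    ()
  }
}
```

Returning the Future, rather than Unit as in Listing 6, is a deliberate design choice: it gives callers a way to wait for the spawned work, which the thread-per-call version did not offer.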
In fact, Scala's concurrency support goes beyond the MailBox and ops classes; Scala also supports a related concept called "Actors," which uses a message-passing approach similar to MailBox's but is more comprehensive and flexible. That, however, is a topic for the next installment.
Scala provides two levels of support for concurrency, much as it does for other Java-related topics:
First, full access to the underlying libraries (such as java.util.concurrent) and support for "traditional" Java concurrency semantics (such as monitors and wait()/notifyAll()).
Second, a layer of abstraction on top of those basic mechanisms, exemplified by the MailBox class discussed in this article and the Actors library that will be discussed in the next article in this series.
The goal in both cases is the same: to make it easier for developers to focus on the substance of the problem without having to think about the low-level details of concurrent programming. (Obviously, the second approach achieves this goal better, at least for those who would rather not think too hard about low-level details.)
However, one obvious shortcoming of the current Scala libraries is the lack of Java 5 support: the scala.concurrent.ops object should offer methods like spawn that take advantage of the new Executor interfaces. It should also support versions of synchronized that make use of the new Lock interfaces. Fortunately, these are library enhancements that can be added at any point in Scala's life cycle without breaking existing code; Scala developers can even write them themselves without waiting for Scala's core development team to provide them (as a little effort will show).
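As an illustration of how little effort that takes, here is a sketch of a Lock-based counterpart to synchronized. LockOps, withLock and tryWithLock are hypothetical names, not part of any Scala library; the by-name body parameter gives them the same block syntax as lock.synchronized { ... }.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.{Lock, ReentrantLock}

// Hypothetical helpers in the spirit of AnyRef.synchronized, built on
// java.util.concurrent.locks.Lock instead of an object's monitor.
object LockOps {
  // Acquire, run the block, and always release: the try/finally the compiler
  // generates for a synchronized block, written out by hand.
  def withLock[T](lock: Lock)(body: => T): T = {
    lock.lock()
    try body
    finally lock.unlock()
  }

  // Timed variant: Some(result) if the lock was acquired in time, else None.
  def tryWithLock[T](lock: Lock, timeout: Long, unit: TimeUnit)(body: => T): Option[T] =
    if (lock.tryLock(timeout, unit)) {
      try Some(body)
      finally lock.unlock()
    } else None
}

object LockOpsDemo {
  def main(args: Array[String]): Unit = {
    val lock = new ReentrantLock()
    var count = 0
    val threads = (1 to 4).map(_ => new Thread(() =>
      (1 to 1000).foreach(_ => LockOps.withLock(lock) { count += 1 })))
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(LockOps.withLock(lock)(count)) // 4000
  }
}
```

The timed variant is the kind of capability plain synchronized cannot offer: a thread can give up instead of blocking forever on a monitor someone else is holding.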
© 2024 shulou.com SLNews company. All rights reserved.