The evolution of the producer-consumer problem in Java

2025-03-28 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/02 Report--

This article explains the evolution of the producer-consumer problem in Java in detail. I hope you will have a solid understanding of the relevant concepts after reading it.

Want to know more about the evolution of the producer-consumer problem in Java? Let's take a look. We will tackle the problem first in the old way and then in the new way.

The producer-consumer problem is a classic multi-process synchronization problem.

For most of us, it may have been the first synchronization problem we encountered when we implemented our first parallel algorithm in school.

Although it is simple, it captures the central challenge of parallel computing: multiple processes sharing a single resource.

Problem statement

Two processes, the producer and the consumer, share a common buffer of limited size.

The producer "produces" a piece of data and stores it in the buffer, while the consumer "consumes" the data by removing it from the buffer.

Since the two processes run concurrently, we need to make sure that the producer does not put new data into a full buffer and that the consumer does not try to remove data from an empty buffer.

Solution

In order to solve the above concurrency problem, producers and consumers will have to communicate with each other.

If the buffer is full, the producer goes to sleep until a notification wakes it up.

After the consumer removes some data from the buffer, it notifies the producer, which then starts filling the buffer again.

If the buffer is empty, the situation is symmetrical, except that it is the consumer that waits for a notification from the producer.

But if this communication is done improperly, with each process waiting on the other, the program can deadlock.

The classic method

First, let's look at a classic Java solution to this problem.

package ProducerConsumer;

import java.util.LinkedList;
import java.util.Queue;

public class ClassicProducerConsumerExample {
    public static void main(String[] args) throws InterruptedException {
        Buffer buffer = new Buffer(2);
        Thread producerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    buffer.produce();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    buffer.consume();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        producerThread.start();
        consumerThread.start();
        producerThread.join();
        consumerThread.join();
    }

    static class Buffer {
        private Queue<Integer> list;
        private int size;

        public Buffer(int size) {
            this.list = new LinkedList<>();
            this.size = size;
        }

        public void produce() throws InterruptedException {
            int value = 0;
            while (true) {
                synchronized (this) {
                    while (list.size() >= size) {
                        // wait for the consumer
                        wait();
                    }
                    list.add(value);
                    System.out.println("Produced " + value);
                    value++;
                    // notify the consumer
                    notify();
                }
                Thread.sleep(1000);
            }
        }

        public void consume() throws InterruptedException {
            while (true) {
                synchronized (this) {
                    while (list.size() == 0) {
                        // wait for the producer
                        wait();
                    }
                    int value = list.poll();
                    System.out.println("Consumed " + value);
                    // notify the producer
                    notify();
                }
                Thread.sleep(1000);
            }
        }
    }
}

Here we have two threads, a producer and a consumer, which share a common buffer. The producer thread generates new elements and stores them in the buffer. If the buffer is full, the producer thread sleeps until a notification wakes it up; otherwise, it puts a new element in the buffer and notifies the consumer. A symmetrical process applies to the consumer: if the buffer is empty, the consumer waits for a notification from the producer; otherwise, it removes an element from the buffer and notifies the producer.

As you can see, in the example above the job of both the producer and the consumer is to manage the buffer object. The threads simply call the buffer.produce() and buffer.consume() methods to get everything done.

Whether the buffer should be responsible for creating and deleting elements has always been debatable, but in my opinion it should not. Of course, it depends on what you want to achieve, but in this case the buffer should only be responsible for storing and removing elements in a thread-safe way, not for producing new ones.

So let's decouple the logic of production and consumption from the buffer object.

package ProducerConsumer;

import java.util.LinkedList;
import java.util.Queue;

public class ProducerConsumerExample2 {
    public static void main(String[] args) throws InterruptedException {
        Buffer buffer = new Buffer(2);
        Thread producerThread = new Thread(() -> {
            try {
                int value = 0;
                while (true) {
                    buffer.add(value);
                    System.out.println("Produced " + value);
                    value++;
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Thread consumerThread = new Thread(() -> {
            try {
                while (true) {
                    int value = buffer.poll();
                    System.out.println("Consumed " + value);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        producerThread.start();
        consumerThread.start();
        producerThread.join();
        consumerThread.join();
    }

    static class Buffer {
        private Queue<Integer> list;
        private int size;

        public Buffer(int size) {
            this.list = new LinkedList<>();
            this.size = size;
        }

        public void add(int value) throws InterruptedException {
            synchronized (this) {
                while (list.size() >= size) {
                    wait();
                }
                list.add(value);
                notify();
            }
        }

        public int poll() throws InterruptedException {
            synchronized (this) {
                while (list.size() == 0) {
                    wait();
                }
                int value = list.poll();
                notify();
                return value;
            }
        }
    }
}

This is much better: at least now the buffer is only responsible for storing and removing elements in a thread-safe way.

Blocking queues (BlockingQueue)

However, we can improve this further.

In the previous example, we created a buffer that waits for a free slot before storing an element, in case there is not enough space, and that waits for a new element to appear before removing one, so that both the store and the remove operations are thread-safe.

However, Java's standard library already provides these operations in a class called BlockingQueue; you can find its detailed documentation in the java.util.concurrent package.

BlockingQueue is a queue that stores and retrieves elements in a thread-safe manner, and that is exactly what we need.

So, if we use a BlockingQueue in the example, we no longer need to implement the waiting and notification mechanism ourselves.

Next, let's look at the specific code.

package ProducerConsumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class ProducerConsumerWithBlockingQueue {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);
        Thread producerThread = new Thread(() -> {
            try {
                int value = 0;
                while (true) {
                    blockingQueue.put(value);
                    System.out.println("Produced " + value);
                    value++;
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Thread consumerThread = new Thread(() -> {
            try {
                while (true) {
                    int value = blockingQueue.take();
                    System.out.println("Consumed " + value);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        producerThread.start();
        consumerThread.start();
        producerThread.join();
        consumerThread.join();
    }
}

The runnables look the same as before; they produce and consume elements in the same way.

The only difference is that here we use blockingQueue instead of the buffer object.

More details about BlockingQueue

There are two main kinds of BlockingQueue:

Unbounded queue

Bounded queue

An unbounded queue can grow almost indefinitely, so additions never block.

You can create an unbounded queue in this way:

BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>();

In this case, because the add operation never blocks, the producer never has to wait to insert a new element; the queue stores whatever the producer adds. There is a caveat, however: if the consumer removes elements more slowly than the producer adds them, memory fills up and we will probably get an OutOfMemoryError.
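To illustrate that insertions into an unbounded queue always succeed immediately, here is a minimal sketch (the class UnboundedQueueDemo and its fill method are my own names, not from the article; memory remains the only real limit):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class UnboundedQueueDemo {
    // Inserts n elements into an unbounded queue; offer() never blocks
    // and never returns false here, because capacity is Integer.MAX_VALUE.
    static int fill(int n) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < n; i++) {
            queue.offer(i);
        }
        return queue.size();
    }

    public static void main(String[] args) {
        // Every insert succeeds without waiting for a consumer.
        System.out.println(fill(100_000));
    }
}
```

With a large enough n, this loop would eventually exhaust the heap, which is exactly the risk the paragraph above describes.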

In contrast, there is a bounded queue, which has a fixed size. You can create it like this:

BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(10);

The main difference between the two is that with a bounded queue, if the queue is full and the producer keeps trying to insert elements, the insert blocks (depending on which method is used to add) until enough space becomes available.

There are four ways to add elements to a BlockingQueue:

add() - returns true if the insert succeeds, otherwise throws IllegalStateException

put() - inserts the element into the queue, waiting for a free slot if necessary

offer() - returns true if the insert succeeds, false otherwise

offer(E e, long timeout, TimeUnit unit) - inserts the element if the queue is not full, waiting up to the specified time for a free slot; returns false if no slot becomes available in time

So, if we use the put() method to insert an element and the queue is full, the producer has to wait until a slot becomes available.

That is what happens in our last example, which behaves just like ProducerConsumerExample2.
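The four insertion methods can be contrasted on a full bounded queue. The following sketch (the class InsertionMethodsDemo is my own name, not from the article) uses an ArrayBlockingQueue of capacity 1 so that every insert after the first hits a full queue:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class InsertionMethodsDemo {
    // Exercises add(), offer() and timed offer() against a full queue
    // and returns a summary of how each behaves.
    static String demo() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.add(1); // succeeds; the queue is now full

        boolean offered = queue.offer(2); // returns false immediately
        // Waits up to 100 ms for a slot; no consumer exists, so it returns false.
        boolean timedOffer = queue.offer(3, 100, TimeUnit.MILLISECONDS);

        String addResult;
        try {
            queue.add(4); // throws because the queue is full
            addResult = "true";
        } catch (IllegalStateException e) {
            addResult = "IllegalStateException";
        }
        return "offer=" + offered + ", timedOffer=" + timedOffer + ", add=" + addResult;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

put() is deliberately left out of the demo: with no consumer running it would block forever, which is exactly the behavior the producer in our example relies on.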

Using a thread pool

Is there anything else we can optimize? First, let's analyze what we did: we instantiated two threads, one called the producer, which puts elements into the queue, and one called the consumer, which removes elements from the queue.

However, good engineering practice tells us that creating and destroying threads manually is a bad idea. Creating a thread is an expensive operation, and each time a thread is created the following steps occur:

The first step is to allocate memory to a thread stack

The operating system creates a native thread corresponding to the Java thread

Descriptors associated with this thread are added to the data structures within the JVM

First of all, don't get me wrong: there is nothing wrong with using a few threads in our case; that is one way to work concurrently. The problem is that we create the threads manually, which is bad practice. Besides the cost of creation, manual threading gives us no control over how many threads run at the same time. For example, if an online service receives a million simultaneous requests and creates a thread for each one, a million threads will be running in the background, which can lead to thread starvation.

So we need a way to manage threads globally, and for that we use thread pools.

The thread pool handles the life cycle of its threads based on the strategy we choose. It keeps a limited number of idle threads and activates them when there are tasks to run. This way we do not need to create a new thread for each request, and we avoid the thread starvation problem.
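The cap on concurrent threads is easy to observe. In this sketch (the class FixedPoolDemo and its method are my own names, not from the article), a fixed pool of two threads runs a hundred tasks, yet the tasks only ever see at most two distinct worker threads:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedPoolDemo {
    // Runs `tasks` tasks on a fixed pool of `poolSize` threads and
    // returns how many distinct worker threads actually executed them.
    static int distinctWorkerThreads(int tasks, int poolSize) throws InterruptedException {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> names.add(Thread.currentThread().getName()));
        }
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return names.size();
    }

    public static void main(String[] args) throws InterruptedException {
        // Never more than 2, no matter how many tasks are submitted.
        System.out.println(distinctWorkerThreads(100, 2));
    }
}
```

Compare this with the million-request scenario above: the pool absorbs the extra work into its queue instead of spawning a thread per request.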

The implementation of the Java thread pool includes:

A task queue

A collection of worker threads

A thread factory

Metadata for managing thread pool state

To run tasks concurrently, you first put them in the task queue. Then, when a thread becomes available, it picks up a task and runs it. The more threads are available, the more tasks execute in parallel.
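The task queue's hand-off to worker threads can be seen with a pool of just one thread: queued tasks wait their turn and run one after another, in submission order. A minimal sketch (the class TaskQueueDemo is my own name, not from the article):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TaskQueueDemo {
    // Submits `tasks` tasks to a single-threaded pool; the one worker
    // drains the FIFO task queue, so tasks run in submission order.
    static List<Integer> runInOrder(int tasks) throws InterruptedException {
        List<Integer> order = new ArrayList<>(); // touched only by the single worker
        ExecutorService executor = Executors.newFixedThreadPool(1);
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            executor.execute(() -> order.add(id));
        }
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runInOrder(3));
    }
}
```

With more worker threads, completion order would no longer be guaranteed, which is exactly the trade-off between queued tasks and parallelism described above.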

In addition to managing the thread life cycle, another advantage of thread pools is that they change how you think about splitting work for concurrent execution. The unit of parallelism is no longer the thread but the task. Instead of making threads cooperate over a shared block of memory, you design tasks that can execute concurrently. Thinking in terms of tasks helps us avoid common multithreading problems such as deadlocks and data races. Nothing prevents us from running into them again, but this more functional style makes us far less likely to synchronize parallel computations imperatively with locks, so such problems are much rarer than with raw threads and shared memory. In our example, the shared blocking queue is not ideal in this respect, but I just want to emphasize the advantage.

You can find more about thread pools in the java.util.concurrent documentation.

Having said that, let's take a look at how thread pools are used in our case.

package ProducerConsumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

public class ProducerConsumerExecutorService {
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Runnable producerTask = () -> {
            try {
                int value = 0;
                while (true) {
                    blockingQueue.put(value);
                    System.out.println("Produced " + value);
                    value++;
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        Runnable consumerTask = () -> {
            try {
                while (true) {
                    int value = blockingQueue.take();
                    System.out.println("Consumed " + value);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        executor.execute(producerTask);
        executor.execute(consumerTask);
        executor.shutdown();
    }
}

The difference here is that we no longer create or run the producer and consumer threads manually. Instead, we set up a thread pool and hand it two tasks: the producer task and the consumer task. These tasks are the same runnables used in the previous example. The executor (the thread pool implementation) receives the tasks and schedules its worker threads to execute them.

In our simple case, everything works as before. As in the previous example, we still have two threads that still produce and consume elements in the same way. Although we haven't improved performance, the code looks much cleaner. Instead of manually creating threads, we just specify what we want: we want to perform certain tasks concurrently.

So, when you use a thread pool, you no longer think of threads as the units of concurrent execution; instead, you think of tasks that execute concurrently. That is all you need to decide, and the executor handles the rest: it receives the tasks and assigns worker threads to process them.

First, we saw the "traditional" solution to the producer-consumer problem. Then, instead of writing the wait-notify machinery ourselves, we used the blocking queue Java already provides. Finally, Java gives us an efficient thread pool to manage thread life cycles, letting us get rid of manual thread creation. With these improvements, the solution to the producer-consumer problem looks more reliable and understandable.

That is all on the evolution of the producer-consumer problem in Java. I hope the content above is of some help to you. If you found the article useful, feel free to share it so that more people can see it.
