Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to use threads to execute the framework

2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/02 Report--

This article shares with you about how to use threading to execute the framework. The editor thinks it is very practical, so share it with you as a reference and follow the editor to have a look.

Scene

A thread receives a message (data) from somewhere, which can be another host or message queue, and then transfers to another thread pool to perform the logic of specific processing of the message, and the processing speed of the message is less than the speed of receiving the message. This kind of situation is very common, just imagine, how would you design and implement it?

Intuitive thinking

It is obvious that with JUC's threading framework, code can be written quickly.

Message recipient:

Public class Receiver {private static volatile boolean inited = false; private static volatile boolean shutdown = false; private static volatile int cnt = 0; private MessageHandler messageHandler; public void start () {Executors.newSingleThreadExecutor () .execute (new Runnable () {@ Override public void run () {while (! shutdown) {init (); recv () });} / * * Analog message reception * / public void recv () {Message msg = new Message ("Msg" + System.currentTimeMillis ()); System.out.println (String.format ("received message (% d):% s", + + cnt, msg)); messageHandler.handle (msg) } public void init () {if (! inited) {messageHandler = new MessageHandler (); inited = true;}} public static void main (String [] args) {new Receiver (). Start ();}}

Message processing:

Public class MessageHandler {private static final int THREAD_POOL_SIZE = 4; private ExecutorService service = Executors.newFixedThreadPool (THREAD_POOL_SIZE); public void handle (Message msg) {try {service.execute (new Runnable () {@ Override public void run () {parseMsg (msg);}})) } catch (Throwable e) {System.out.println ("message handling exception" + e);}} / * more time-consuming message processing flow * / public void parseMsg (Message message) {while (true) {try {System.out.println ("parsing message:" + message); Thread.sleep (5000); System.out.println ("=") } catch (InterruptedException e) {e.printStackTrace ();}

Effect: the phenomenon caused by this scheme is that the received messages will accumulate rapidly. We have extracted a large number of messages from the message queue (or other places), but the speed of the processing thread can not keep up, so the problem is that a large number of Task will accumulate in a blocking queue maintained at the bottom of the thread pool, which will consume a lot of storage space and affect the performance of the system.

Analysis: when execute () a task, if there are idle worker threads, then run, otherwise depending on the number of threads set, create a new thread and take on the new task if it does not reach the limit on the number of threads, otherwise the task will be buffered into a blocking queue, the problem is this queue, the default size is unlimited, so a large number of tasks will be piled up, which will inevitably consume heap space.

Public static ExecutorService newFixedThreadPool (int nThreads) {return new ThreadPoolExecutor (nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue ());} public LinkedBlockingQueue () {this (Integer.MAX_VALUE); / / capacity}

Count limit

In the face of the above problems, the thought of limiting the speed of receiving messages naturally reminds me of various thread synchronization primitives, but the simplest thing here is to use a Volatile counter.

Message recipient:

Public class Receiver {private static volatile boolean inited = false; private static volatile boolean shutdown = false; private static volatile int cnt = 0; private MessageHandler messageHandler; public void start () {Executors.newSingleThreadExecutor () .execute (new Runnable () {@ Override public void run () {while (! shutdown) {init (); recv () });} / * * Analog message reception * / public void recv () {Message msg = new Message ("Msg" + System.currentTimeMillis ()); System.out.println (String.format ("received message (% d):% s", + + cnt, msg)); messageHandler.handle (msg) } public void init () {if (! inited) {messageHandler = new MessageHandler (); inited = true;}} public static void main (String [] args) {new Receiver (). Start ();}}

Message processing:

Public class MessageHandler {private static final int THREAD_POOL_SIZE = 1; private ExecutorService service = Executors.newFixedThreadPool (THREAD_POOL_SIZE); public void handle (Message msg) {try {service.execute (new Runnable () {@ Override public void run () {parseMsg (msg);}})) } catch (Throwable e) {System.out.println ("message handling exception" + e);}} / * more time-consuming message processing flow * / public void parseMsg (Message message) {try {Thread.sleep (10000); System.out.println ("parsing message:" + message);} catch (InterruptedException e) {e.printStackTrace ();} finally {Receiver.limit-- }

Effect: blocking the receiving process of messages by controlling the number of messages will not lead to the accumulation of tasks, the memory consumption of the system will be relatively flat, and limiting the number of messages is essentially the same as limiting the size of the task queue below.

Use synchronization queue SynchronousQueue

Although SynchronousQueue is called queue, it does not buffer the object of the task, but only serves as the control point passed by the object. If there is an idle thread or does not reach the * thread limit, it will be delivered to the worker thread for execution, otherwise it will be rejected. We need to implement the corresponding rejection policy RejectedExecutionHandler by ourselves. The default is to throw an exception RejectedExecutionException.

The recipient of the message is ditto.

Message processing:

Public class MessageHandler {private static final int THREAD_POOL_SIZE = 4; ThreadPoolExecutor service = new ThreadPoolExecutor (THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue (), new RejectedExecutionHandler () {@ Override public void rejectedExecution (Runnable r, ThreadPoolExecutor executor) {System.out.println (Custom rejection Policy); try {executor.getQueue (). Put (r) System.out.println ("requeue the task");} catch (InterruptedException e) {e.printStackTrace ();}); public void handle (Message msg) {try {System.out.println (service.getTaskCount ()); System.out.println (service.getQueue (). Size ()); System.out.println (service.getCompletedTaskCount ()); service.execute (new Runnable () {@ Override public void run () {parseMsg (msg);}})) } catch (Throwable e) {System.out.println ("message handling exception" + e);}} / * more time-consuming message processing flow * / public void parseMsg (Message message) {while (true) {try {System.out.println ("Thread name:" + Thread.currentThread (). GetName ()); System.out.println ("parsing message:" + message); Thread.sleep (1000) } catch (InterruptedException e) {e.printStackTrace ();}

Effect: we can control the speed of receiving messages, but we need to implement some blocking operation in rejectedExecution, but if we choose to put the task back in the queue when rejection occurs, the problem is that hunger will occur in this Task.

Blocking queues using size limits

Using LinkedBlockingQueue as the task buffer at the bottom of the thread framework, and setting a size limit, the idea is the same as the above scheme, there is a blocking point, but through the jvm monitor of * *, we can see that there is less CPU consumption, lower memory usage, and less fluctuation (the specific reasons to be explored).

The recipient of the message is ditto.

Message processing:

Public class MessageHandler {private static final int THREAD_POOL_SIZE = 4; private static final int BLOCK_QUEUE_CAP = 500; ThreadPoolExecutor service = new ThreadPoolExecutor (THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue (BLOCK_QUEUE_CAP), new SimpleThreadFactory (), new RejectedExecutionHandler () {@ Override public void rejectedExecution (Runnable r, ThreadPoolExecutor executor) {System.out.println ("Custom rejection Policy") Try {executor.getQueue () .put (r); System.out.println ("requeue the task");} catch (InterruptedException e) {e.printStackTrace ();}); public void handle (Message msg) {try {service.execute (new Runnable () {@ Override public void run () {parseMsg (msg);}});} catch (Throwable e) {System.out.println ("message handling exception" + e) }} / * more time-consuming message processing flow * / public void parseMsg (Message message) {try {Thread.sleep (5000); System.out.println ("Thread name:" + Thread.currentThread (). GetName ()); System.out.println ("parse message:" + message);} catch (InterruptedException e) {e.printStackTrace ();}} static class SimpleThreadFactory implements ThreadFactory {@ Override public Thread newThread (Runnable r) {Thread thread = newThread (r) Thread.setName ("Thread-" + System.currentTimeMillis (); return thread;}

Thank you for reading! This is the end of this article on "how to use the threaded execution framework". I hope the above content can be of some help to you, so that you can learn more knowledge. If you think the article is good, you can share it out for more people to see!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report