Introduction to the java.util.concurrent package and the creation and use of thread pools

Updated 2025-02-24. From: SLTechnology News&Howtos (Shulou), Development


This article introduces the java.util.concurrent package and shows how to create and use thread pools. Many developers have questions about both topics in day-to-day work, so the material below collects simple, practical ways of working with them.

1. Introduction to the java concurrent package

JDK 5.0 (also known as JDK 1.5) and later versions provide advanced concurrency features, most of which live in the java.util.concurrent package. The package is dedicated to multithreaded programming and makes it possible to write large-scale concurrent applications that take full advantage of modern multiprocessor and multi-core systems. It mainly includes atomic variables, concurrent collections, synchronizers, and reentrant locks, and it provides strong support for building thread pools.
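As a rough illustration of the four building blocks named above, the following sketch uses one class from each group: AtomicInteger (atomic variable), ConcurrentHashMap (concurrent collection), CountDownLatch (synchronizer), and ReentrantLock (reentrant lock). The class name ConcurrentBasicsDemo and the method countConcurrently are made up for this example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

class ConcurrentBasicsDemo {

    /**
     * Starts `threads` threads that each perform `perThread` increments on three
     * differently protected counters. Returns {atomic total, locked total, map total}.
     */
    static int[] countConcurrently(int threads, int perThread) {
        AtomicInteger atomicCounter = new AtomicInteger();                    // atomic variable
        ConcurrentHashMap<String, Integer> hits = new ConcurrentHashMap<>();  // concurrent collection
        ReentrantLock lock = new ReentrantLock();                             // reentrant lock
        CountDownLatch done = new CountDownLatch(threads);                    // synchronizer
        int[] lockedCounter = {0};                                            // guarded by `lock`

        for (int t = 0; t < threads; t++) {
            new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    atomicCounter.incrementAndGet();
                    hits.merge("hits", 1, Integer::sum);
                    lock.lock();
                    try {
                        lockedCounter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
                done.countDown();
            }).start();
        }

        try {
            done.await(); // block until every thread has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {atomicCounter.get(), lockedCounter[0], hits.get("hits")};
    }

    public static void main(String[] args) {
        int[] totals = countConcurrently(4, 10_000);
        System.out.println(totals[0] + " " + totals[1] + " " + totals[2]);
    }
}
```

All three totals should equal threads * perThread, because each counter is protected by one of the mechanisms from the package.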

2. Thread pool

java.util.concurrent.Executors provides factory methods that return implementations of the java.util.concurrent.Executor interface; these are used to create thread pools.

Multithreading mainly addresses the problem of running multiple threads on a processor unit: it can significantly reduce the idle time of the processor unit and increase its throughput.

Suppose the time a server needs to complete a task consists of T1 (the time to create a thread), T2 (the time to execute the task in the thread), and T3 (the time to destroy the thread). If T1 + T3 is much larger than T2, a thread pool can improve server performance by reducing the time spent creating and destroying threads.

A thread pool consists of the following four basic parts:

Thread pool manager (ThreadPool): used to create and manage thread pools, including creating thread pools, destroying thread pools, and adding new tasks

Worker thread (PoolWorker): a thread in a thread pool that waits when there is no task and can execute the task in a loop.

Task interface (Task): the interface that each task must implement so that a worker thread can schedule and execute it. It mainly defines the entry point of the task, the cleanup work after the task has run, the execution status of the task, and so on.

Task queue (taskQueue): used to store unprocessed tasks. Provides a buffering mechanism.
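The four parts can be sketched in a few lines. This is a deliberately minimal, hypothetical pool (TinyThreadPool is not a JDK class and not the article's code); it uses Runnable as the task interface and a LinkedBlockingQueue as the task queue. Unlike the JDK pools, a worker here dies if a task throws an unchecked exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Thread pool manager: creates the workers, accepts tasks, and shuts the pool down. */
class TinyThreadPool {

    // Task queue: buffers tasks that no worker has picked up yet.
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();

    TinyThreadPool(int size) {
        for (int i = 0; i < size; i++) {
            // Worker thread: waits on the queue when idle and runs tasks in a loop.
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        taskQueue.take().run();
                    }
                } catch (InterruptedException e) {
                    // interrupted by shutdown(): leave the loop and let the thread end
                }
            }, "tiny-worker-" + i);
            worker.start();
            workers.add(worker);
        }
    }

    /** Adds a new task; Runnable plays the role of the task interface. */
    void execute(Runnable task) {
        taskQueue.add(task);
    }

    /** Destroys the pool by interrupting every worker. */
    void shutdown() {
        for (Thread worker : workers) {
            worker.interrupt();
        }
    }

    /** Runs `tasks` trivial tasks on a pool of three workers and returns how many completed. */
    static int runDemo(int tasks) {
        TinyThreadPool pool = new TinyThreadPool(3);
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                completed.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(50));
    }
}
```

In practice the pools returned by Executors cover these parts already, so a hand-written pool like this is mainly useful for understanding the structure.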

Thread pool technology focuses on shortening or shifting T1 and T3 in order to improve the performance of server programs. It moves T1 and T3 to the periods when the server program starts and stops, or to idle periods, so that the server does not pay the cost of creating and destroying threads while it is processing client requests.

The thread pool not only shifts the time at which T1 and T3 occur, it also significantly reduces the number of threads created. Take a look at an example:

Suppose a server processes 100000 requests a day, and each request needs to be completed by a separate thread. In a thread pool, the number of threads is generally fixed, and the pool size is usually far smaller than 100000. A server program that uses a thread pool therefore does not spend time creating 100000 threads to process the requests, which improves efficiency.
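The effect on the number of threads can be observed directly. The sketch below (class and method names are made up for illustration) submits many small tasks to a fixed pool and records which threads ran them; the pool size of 8 is an arbitrary choice.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadReuseDemo {

    /** Runs `requests` tasks on a pool of `poolSize` threads and returns how many distinct threads ran them. */
    static int distinctThreadsUsed(int requests, int poolSize) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < requests; i++) {
            // each "request" only records the name of the thread that handled it
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // no new tasks; already submitted tasks still run
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        // 100000 requests are handled by at most 8 threads
        System.out.println(distinctThreadsUsed(100_000, 8));
    }
}
```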

Five ways to create a thread pool

SingleThreadExecutor: a thread pool with only one thread, so submitted tasks are executed sequentially. Created with Executors.newSingleThreadExecutor().

CachedThreadPool: a thread pool that creates as many threads as are needed to run tasks at the same time. An idle thread that was created earlier is reused when a new task arrives. A thread that stays idle for more than 60 seconds is terminated and removed from the pool. Created with Executors.newCachedThreadPool().

FixedThreadPool: a thread pool with a fixed number of threads. A thread that has no task to execute keeps waiting. Created with Executors.newFixedThreadPool(10). The argument 10 is the size of the thread pool; you can choose it freely or match it to the number of CPUs, which can be obtained with int cpuNums = Runtime.getRuntime().availableProcessors();

ScheduledThreadPool: a thread pool for scheduling tasks that are to be executed later. Created with Executors.newScheduledThreadPool(corePoolSize).

SingleThreadScheduledExecutor: a pool with a single thread that executes scheduled tasks at the specified time. Created with Executors.newSingleThreadScheduledExecutor().
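The five factory methods listed above can be tried side by side. In this sketch the helper names sumOneToFive and runAfterDelay, the pool size of 2 for the scheduled pool, and the 100 ms delay are illustrative choices, not part of the original article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ExecutorFactoriesDemo {

    /** Submits the numbers 1..5 as tasks to the given pool, sums the results, then shuts the pool down. */
    static int sumOneToFive(ExecutorService pool) {
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= 5; i++) {
                final int n = i;
                futures.add(pool.submit(() -> n)); // Callable<Integer>
            }
            int sum = 0;
            for (Future<Integer> future : futures) {
                sum += future.get(); // waits for the task to finish
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    /** Schedules one task 100 ms in the future on a single scheduling thread and returns its result. */
    static String runAfterDelay() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<String> future =
                    scheduler.schedule(() -> "ran after delay", 100, TimeUnit.MILLISECONDS);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        int cpuNums = Runtime.getRuntime().availableProcessors();
        System.out.println(sumOneToFive(Executors.newSingleThreadExecutor()));
        System.out.println(sumOneToFive(Executors.newCachedThreadPool()));
        System.out.println(sumOneToFive(Executors.newFixedThreadPool(cpuNums)));
        System.out.println(sumOneToFive(Executors.newScheduledThreadPool(2)));
        System.out.println(runAfterDelay());
    }
}
```

Every pool type implements ExecutorService, which is why the same helper works with all of them; only the threading behavior behind submit differs.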

3. Use of thread pool

The following example uses FixedThreadPool as a reference for how to use a thread pool.

LogNumVo

    package com.ithzk.threadpool;

    import java.io.Serializable;

    /**
     * Holds the counts that each worker task reports back.
     *
     * @author hzk
     * @date 2018-3-29
     */
    public class LogNumVo implements Serializable {

        private static final long serialVersionUID = -5541722936350755569L;

        private Integer dataNum;
        private Integer successNum;
        private Integer waitNum;

        public Integer getDataNum() { return dataNum; }

        public void setDataNum(Integer dataNum) { this.dataNum = dataNum; }

        public Integer getSuccessNum() { return successNum; }

        public void setSuccessNum(Integer successNum) { this.successNum = successNum; }

        public Integer getWaitNum() { return waitNum; }

        public void setWaitNum(Integer waitNum) { this.waitNum = waitNum; }
    }

DealObject

    package com.ithzk.threadpool;

    /**
     * @author hzk
     * @date 2018-3-29
     */
    public class DealObject {

        private Integer identifyId;
        private String data;

        public DealObject(Integer identifyId, String data) {
            this.identifyId = identifyId;
            this.data = data;
        }

        public DealObject() {
        }

        public Integer getIdentifyId() { return identifyId; }

        public void setIdentifyId(Integer identifyId) { this.identifyId = identifyId; }

        public String getData() { return data; }

        public void setData(String data) { this.data = data; }
    }

AbstractCalculateThread

    package com.ithzk.threadpool;

    import java.util.Collection;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;

    /**
     * Base class for the worker tasks: carries the batch to process and the shared latch.
     *
     * @author hzk
     * @date 2018-3-29
     */
    public class AbstractCalculateThread implements Callable<String> {

        protected Collection<DealObject> insertList;
        protected CountDownLatch countd;
        protected String threadCode;
        protected String batchNumber;

        public AbstractCalculateThread() {
            super();
        }

        public AbstractCalculateThread(Collection<DealObject> insertList, CountDownLatch countd,
                String threadCode, String batchNumber) {
            super();
            this.insertList = insertList;
            this.countd = countd;
            this.threadCode = threadCode;
            this.batchNumber = batchNumber;
        }

        public Collection<DealObject> getInsertList() { return insertList; }

        public void setInsertList(Collection<DealObject> insertList) { this.insertList = insertList; }

        public CountDownLatch getCountd() { return countd; }

        public void setCountd(CountDownLatch countd) { this.countd = countd; }

        public String getThreadCode() { return threadCode; }

        public void setThreadCode(String threadCode) { this.threadCode = threadCode; }

        public String getBatchNumber() { return batchNumber; }

        public void setBatchNumber(String batchNumber) { this.batchNumber = batchNumber; }

        // Subclasses override call() with the actual work.
        public String call() throws Exception {
            return null;
        }
    }

CalculateDealThread

    package com.ithzk.threadpool;

    import java.util.Collection;
    import java.util.concurrent.CountDownLatch;

    /**
     * @author hzk
     * @date 2018-3-29
     */
    public class CalculateDealThread extends AbstractCalculateThread {

        // SpringContextUtil is a project helper that is not shown in this article.
        private ExecutorPool executorPool = SpringContextUtil.getBean(ExecutorPool.class);

        public CalculateDealThread() {
            super();
        }

        public CalculateDealThread(Collection<DealObject> insertList, CountDownLatch countd,
                String threadCode, String batchNumber) {
            super(insertList, countd, threadCode, batchNumber);
        }

        @Override
        public String call() throws Exception {
            try {
                System.out.println("= start running thread [" + threadCode + "]");
                return executorPool.syncBatchDealObject(insertList, batchNumber);
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("= thread [" + threadCode + "] failed: " + e.getMessage());
            } finally {
                // always count down so that a waiting thread is never blocked forever
                countd.countDown();
            }
            return null;
        }
    }

ExecutorPool

    package com.ithzk.threadpool;

    import com.alibaba.fastjson.JSON; // assumption: the JSON calls below match the fastjson API

    import java.util.*;
    import java.util.concurrent.*;

    /**
     * @author hzk
     * @date 2018-3-29
     */
    public class ExecutorPool {

        /** Simulated size of the data set to process. */
        private static final int ARRAY_COUNT = 50000;

        /** Data size above which multithreading is enabled. */
        private static final int MULTI_THREAD_STARTCOUNT = 10000;

        /** Size of each batch. */
        private static final int BATCH_DEAL_SIZE = 100;

        /** Number of threads in the pool. */
        public static final int THREAD_POOL_NUM = 10;

        public static void main(String[] args) {
            testExecutorPool();
        }

        public static void testExecutorPool() {
            ArrayList<DealObject> dealObjects = new ArrayList<DealObject>();
            // NOTE: the published listing is truncated between the loop header and the
            // size check; the loop body and the three declarations below are a reconstruction.
            for (int i = 0; i < ARRAY_COUNT; i++) {
                dealObjects.add(new DealObject(i, "data-" + i));
            }
            int dataNum = dealObjects.size();
            int successNum = 0;
            int waitNum = 0;

            if (dataNum > MULTI_THREAD_STARTCOUNT) {
                try {
                    System.out.println("=== dataNum > " + MULTI_THREAD_STARTCOUNT + " | Multiple Thread Run ===");
                    // batch size used when splitting the data
                    int batchInsertSize = BATCH_DEAL_SIZE;
                    // thread pool that runs the save tasks
                    ExecutorService executorInsert = Executors.newFixedThreadPool(THREAD_POOL_NUM);
                    // results returned by the worker tasks
                    List<Future<String>> futureListIsert = new ArrayList<Future<String>>();
                    // list modified by the threads
                    List<DealObject> listDealObjects = new ArrayList<DealObject>();
                    List<Map<Integer, DealObject>> listLiveSyncLogInsert =
                            pointDateClassify(dealObjects, batchInsertSize, listDealObjects);
                    // the published code tested listDealObjects here, which is always empty,
                    // so no task was ever submitted; test the list of batches instead
                    if (null != listLiveSyncLogInsert && !listLiveSyncLogInsert.isEmpty()) {
                        System.out.println("=== size after cutting: " + listLiveSyncLogInsert.size() + " ===");
                        // the CountDownLatch lets the main program continue only after all subtasks have run
                        CountDownLatch countd = new CountDownLatch(listLiveSyncLogInsert.size());
                        for (int j = 0; j < listLiveSyncLogInsert.size(); j++) {
                            Map<Integer, DealObject> insert = listLiveSyncLogInsert.get(j);
                            Future<String> future = executorInsert.submit(
                                    new CalculateDealThread(insert.values(), countd, "executor_pool_test_thread", null));
                            futureListIsert.add(future);
                        }
                    }
                    // stop accepting new tasks; future.get() below waits for each task to finish
                    executorInsert.shutdown();
                    for (Future<String> future : futureListIsert) {
                        String json = future.get();
                        if (null != json && !"".equals(json)) {
                            // convert the returned JSON into the entity class for business recording
                            LogNumVo logNumVo = JSON.toJavaObject(JSON.parseObject(json), LogNumVo.class);
                            successNum += logNumVo.getSuccessNum();
                            waitNum += logNumVo.getWaitNum();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Splits the data into batches.
         * With 50000 elements and groups of 100, the data is split into 500 groups,
         * and each task processes one group (100 elements).
         *
         * @param lPostUploadIntegralList data to split
         * @param batchInsertSize size of each batch
         * @param listJSONObjectUpdate not used by this method
         */
        @SuppressWarnings("all")
        public static List<Map<Integer, DealObject>> pointDateClassify(List<DealObject> lPostUploadIntegralList,
                int batchInsertSize, List<DealObject> listJSONObjectUpdate) {
            List<Map<Integer, DealObject>> listLiveSyncLogInsert = new Vector<Map<Integer, DealObject>>();
            // data to insert
            List<DealObject> integralListInsert = lPostUploadIntegralList;
            System.out.println("============integralListInsert.size()=====:" + integralListInsert.size());
            // split the data into several maps
            int inserti = 0;
            if (integralListInsert != null && integralListInsert.size() > 0) {
                ConcurrentHashMap<Integer, DealObject> integralListIns = null;
                for (int l = 0; l < integralListInsert.size(); l++) {
                    if (integralListIns == null) {
                        integralListIns = new ConcurrentHashMap<Integer, DealObject>();
                    }
                    integralListIns.put(integralListInsert.get(l).getIdentifyId(), integralListInsert.get(l));
                    inserti++;
                    if ((inserti % batchInsertSize) == 0) {
                        listLiveSyncLogInsert.add(integralListIns);
                        integralListIns = null;
                    } else {
                        // the last batch, which may hold fewer than 100 elements
                        if ((l + 1) == integralListInsert.size()) {
                            listLiveSyncLogInsert.add(integralListIns);
                        }
                    }
                }
            }
            System.out.println("=============listPostUploadInsert.size()====:" + listLiveSyncLogInsert.size());
            return listLiveSyncLogInsert;
        }

        /**
         * Saves data to the database from multiple threads.
         */
        public String syncBatchDealObject(Collection<DealObject> insertList, String batchNumber) {
            int successNum = 0, waitNum = 0;
            Date currentDate = new Date(System.currentTimeMillis());
            for (DealObject dealObject : insertList) {
                try {
                    int icount = syncDealObject(dealObject, currentDate);
                    if (icount > 0) {
                        successNum++;
                    } else {
                        waitNum++;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    ++waitNum;
                }
            }
            LogNumVo logNum = new LogNumVo();
            logNum.setDataNum(0);
            logNum.setSuccessNum(successNum);
            logNum.setWaitNum(waitNum);
            // report the record entity back to the thread pool as JSON
            return JSON.toJSONString(logNum);
        }

        /**
         * Handles the business logic for one element.
         *
         * @param dealObject
         * @param currentDate
         * @return 1 if the element was processed, otherwise 0
         */
        private int syncDealObject(DealObject dealObject, Date currentDate) {
            int successNum = 0;
            // business processing logic
            if (null != dealObject.getData()) {
                successNum++;
            }
            return successNum;
        }
    }
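The listing above depends on SpringContextUtil and a JSON library, so it cannot be run on its own. The following is a condensed, self-contained sketch of the same idea (split the data into batches, submit each batch to a fixed pool, add up the counts from the futures). The names BatchPoolSketch, partition, and process are hypothetical, plain strings stand in for DealObject, and an int pair replaces the JSON round trip.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BatchPoolSketch {

    /** Splits `items` into consecutive batches of at most `batchSize` elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    /**
     * Processes the data in batches on a fixed thread pool.
     * Returns {successNum, waitNum}: non-null entries count as success, null entries as waiting.
     */
    static int[] process(List<String> data, int batchSize, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<int[]>> futures = new ArrayList<>();
            for (List<String> batch : partition(data, batchSize)) {
                futures.add(pool.submit(() -> {
                    int ok = 0, waiting = 0;
                    for (String entry : batch) {
                        if (entry != null) {
                            ok++;
                        } else {
                            waiting++;
                        }
                    }
                    return new int[] {ok, waiting};
                }));
            }
            int successNum = 0, waitNum = 0;
            for (Future<int[]> future : futures) {
                int[] counts = future.get(); // blocks until this batch is done
                successNum += counts[0];
                waitNum += counts[1];
            }
            return new int[] {successNum, waitNum};
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> data = new ArrayList<>();
        for (int i = 0; i < 50_000; i++) {
            data.add(i % 10 == 0 ? null : "data-" + i); // every tenth entry has no data
        }
        int[] result = process(data, 100, 10);
        System.out.println(result[0] + " " + result[1]); // prints "45000 5000"
    }
}
```

Because future.get() already waits for each task, this sketch does not need a CountDownLatch; the latch becomes useful when the main thread has no futures to wait on.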

4. BlockingQueue

BlockingQueue is another tool in java.util.concurrent, mainly used to coordinate threads. Its main methods are put and take, a pair that blocks, and offer and poll, a pair that does not block; add is also non-blocking but throws an exception when the queue is full.

Insert:

add(anObject)

Adds anObject to the BlockingQueue and returns true if the BlockingQueue can hold it. Otherwise, an exception is thrown.

offer(anObject)

Adds anObject to the BlockingQueue and returns true if the BlockingQueue can hold it; otherwise returns false.

put(anObject)

Adds anObject to the BlockingQueue. If the BlockingQueue has no space, the thread calling this method is blocked until space becomes available.
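The difference between the three insert methods shows up once the queue is full. This sketch uses an ArrayBlockingQueue with capacity 1; the class and method names and the 200 ms delay are illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class InsertMethodsDemo {

    /** Reports how offer and add behave on a queue that is already full. */
    static String describeFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first"); // fits: the queue is now full

        String report = "offer=" + queue.offer("second"); // false, nothing is thrown
        try {
            queue.add("second");
            report += " add=true";
        } catch (IllegalStateException e) {
            report += " add=IllegalStateException"; // add throws when the queue is full
        }
        return report;
    }

    /** Shows that put waits until another thread frees a slot; returns the head of the queue afterwards. */
    static String putWaitsForSpace() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");

        Thread consumer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200);
                queue.take(); // frees the single slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            queue.put("second"); // blocks until the consumer has taken "first"
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queue.peek();
    }

    public static void main(String[] args) {
        System.out.println(describeFullQueue()); // prints "offer=false add=IllegalStateException"
        System.out.println(putWaitsForSpace());  // prints "second"
    }
}
```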

Read:

poll(time)

Retrieves and removes the object at the head of the BlockingQueue. If no object is available immediately, the call waits up to the time given by the time parameter and returns null if nothing becomes available in that time. In the Java API the timed form is poll(long timeout, TimeUnit unit).

take()

Retrieves and removes the object at the head of the BlockingQueue. If the BlockingQueue is empty, the calling thread blocks and waits until a new object is added to the BlockingQueue.
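A small producer and consumer pair illustrates both read methods. The names below are made up for the example, and the queue capacity of 2 is chosen only to force the producer to block from time to time.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ReadMethodsDemo {

    /** Waits up to `millis` for an element; returns null if none arrives in time. */
    static Integer pollWithTimeout(BlockingQueue<Integer> queue, long millis) {
        try {
            return queue.poll(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** A producer puts 1..count into a small queue while the caller takes and sums them. */
    static int produceAndTake(int count) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < count; i++) {
                sum += queue.take(); // blocks while the queue is empty
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(produceAndTake(10));                                // prints 55
        System.out.println(pollWithTimeout(new LinkedBlockingQueue<>(), 100)); // prints null
    }
}
```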

Other:

int remainingCapacity()

Returns the number of additional elements that this queue can ideally accept without blocking (in the absence of memory or resource constraints).

This number is always equal to the initial capacity of this queue minus the current size of the queue; in other words, it is the remaining capacity of the queue.

Note that you cannot always tell whether an attempt to insert an element will succeed by checking remainingCapacity, because another thread may be about to insert or remove an element.

boolean remove(Object o)

Removes a single instance of the specified element from the queue if it is present, that is, if the queue contains one or more such elements. Returns true if the queue changed as a result of the call.

public boolean contains(Object o)

Checks whether the queue contains this element. Returns true if it does.

int drainTo(Collection
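The inspection methods in this group can be tried on a small bounded queue. The sketch below uses an ArrayBlockingQueue of capacity 5 and the single-argument drainTo(Collection) from the BlockingQueue interface, which moves all available elements into the given collection and returns how many were moved; the class name is made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueInspectionDemo {

    /** Exercises remainingCapacity, contains, remove and drainTo and returns a one-line report. */
    static String inspect() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
        queue.add("a");
        queue.add("b");
        queue.add("c");

        String report = "remaining=" + queue.remainingCapacity(); // capacity 5 minus size 3
        report += ", contains(b)=" + queue.contains("b");
        report += ", remove(b)=" + queue.remove("b"); // present: queue changes
        report += ", remove(x)=" + queue.remove("x"); // absent: queue unchanged

        List<String> drained = new ArrayList<>();
        int moved = queue.drainTo(drained); // moves every remaining element into `drained`
        report += ", drained=" + moved + " " + drained;
        report += ", remaining=" + queue.remainingCapacity();
        return report;
    }

    public static void main(String[] args) {
        // prints "remaining=2, contains(b)=true, remove(b)=true, remove(x)=false, drained=2 [a, c], remaining=5"
        System.out.println(inspect());
    }
}
```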
