Shulou (Shulou.com), 06/02 report, updated 2025-02-28
This article shows how to queue user tasks with Java multithreading and how to estimate the queue length, that is, how many tasks are ahead of a given task and roughly how long it will wait.
Implementation process
Initialize a fixed number of task processors and a cached thread pool. Each time a user calls the interface, the request is handed to a thread from the pool.
Suppose five processors are initialized. Each task calls BlockingQueue.take on the processor queue, and every take removes one processor from that queue. When the processor queue is empty, take blocks the calling thread. When a user's task finishes, the resource release API is called and puts a processor object back into the processor queue; a take that was blocked then returns and its task continues.
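The take/put mechanism described above can be sketched in isolation. This is a minimal illustration, not the article's implementation: the class ProcessorPoolSketch, its nested Processor type, and the methods acquire, release and available are hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ProcessorPoolSketch {
    /** Hypothetical stand-in for the article's TaskProcessor. */
    static final class Processor {
        final int id;
        Processor(int id) { this.id = id; }
    }

    // Bounded queue holding the processors that are currently free.
    private final BlockingQueue<Processor> free;

    ProcessorPoolSketch(int size) {
        free = new LinkedBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            free.add(new Processor(i)); // cannot fail: capacity equals size
        }
    }

    /** Blocks while every processor is in use. */
    Processor acquire() throws InterruptedException {
        return free.take();
    }

    /** Returns a processor to the queue, waking one blocked acquire() if any. */
    void release(Processor p) throws InterruptedException {
        free.put(p);
    }

    int available() {
        return free.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ProcessorPoolSketch pool = new ProcessorPoolSketch(5);
        List<Processor> held = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            held.add(pool.acquire()); // five tasks take all five processors
        }
        // A sixth task has to wait inside take() until a processor is released.
        Thread sixth = new Thread(() -> {
            try {
                Processor p = pool.acquire();
                System.out.println("sixth task got processor " + p.id);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sixth.start();
        Thread.sleep(200);
        System.out.println("free processors while sixth waits: " + pool.available());
        pool.release(held.remove(0)); // the release unblocks the sixth task
        sixth.join();
    }
}
```

The bounded queue doubles as a counter of free processors, so no separate lock or semaphore is needed to cap the number of tasks running at once.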
A brief introduction to queuing theory
Queuing theory, also known as the theory of stochastic service systems, is a branch of operations research. It studies how requests randomly arrive at and leave a service system and how such systems behave. Here we only need a simplified model, illustrated by the following figure:
[Figure: simplified queuing model]
Code implementation
Task queue initialization: TaskQueue
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Initializes the queues and the thread pool.
 * @author tarzan
 */
@Component
public class TaskQueue {
    // processor queue
    public static BlockingQueue<TaskProcessor> taskProcessors;
    // queue of waiting tasks
    public static BlockingQueue<CompileTask> waitTasks;
    // queue of tasks being processed
    public static BlockingQueue<CompileTask> executeTasks;
    // thread pool
    public static ExecutorService exec;
    // initial number of processors (number of CPU threads available)
    public static Integer processorNum = Runtime.getRuntime().availableProcessors();

    /**
     * Initializes the processor queue, the waiting queue, the executing queue and the thread pool.
     */
    @PostConstruct
    public static void initEquipmentAndUsersQueue() {
        exec = Executors.newCachedThreadPool();
        taskProcessors = new LinkedBlockingQueue<>(processorNum);
        // put the idle processors into the processor queue
        setFreeDevices(processorNum);
        waitTasks = new LinkedBlockingQueue<>();
        executeTasks = new LinkedBlockingQueue<>(processorNum);
    }

    /**
     * Puts idle processors into the processor queue.
     */
    private static void setFreeDevices(int num) {
        for (int i = 0; i < num; i++) {
            TaskProcessor dc = new TaskProcessor();
            try {
                taskProcessors.put(dc);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static CompileTask getWaitTask(Long clazzId) {
        return get(TaskQueue.waitTasks, clazzId);
    }

    public static CompileTask getExecuteTask(Long clazzId) {
        return get(TaskQueue.executeTasks, clazzId);
    }

    private static CompileTask get(BlockingQueue<CompileTask> users, Long clazzId) {
        CompileTask compileTask = null;
        if (CollectionUtils.isNotEmpty(users)) {
            Optional<CompileTask> optional = users.stream()
                    .filter(e -> e.getClazzId().longValue() == clazzId.longValue())
                    .findFirst();
            if (optional.isPresent()) {
                compileTask = optional.get();
            }
        }
        return compileTask;
    }

    // 0-based position of the task in the waiting queue, or -1 if it is not waiting.
    // A plain loop replaces the original counter-in-filter stream, which returned the
    // index of the last element when the task was not in the queue.
    public static Integer getSort(Long clazzId) {
        int index = 0;
        for (CompileTask task : TaskQueue.waitTasks) {
            if (task.getClazzId().longValue() == clazzId.longValue()) {
                return index;
            }
            index++;
        }
        return -1;
    }

    // unit: seconds (assumes 60 seconds per task)
    public static int estimatedTime(Long clazzId) {
        return estimatedTime(60, getSort(clazzId) + 1);
    }

    // unit: seconds; cellMs is the assumed duration of one task (in seconds, despite
    // the name) and num is the 1-based position in the waiting queue
    public static int estimatedTime(int cellMs, int num) {
        int a = (num - 1) / processorNum;
        int b = cellMs * (a + 1);
        return b;
    }
}
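The estimate in estimatedTime treats the waiting queue as being served in rounds: with a fixed number of processors, the tasks at positions 1 to processorNum finish in the first round, the next processorNum tasks in the second round, and so on. The sketch below reproduces that arithmetic with the processor count passed in explicitly. The class and method names are hypothetical, and the fixed 60 seconds per task is the article's own simplifying assumption, not a measured value.

```java
class WaitEstimateSketch {
    /**
     * Estimated seconds until the task at the given position completes.
     *
     * @param secondsPerTask assumed fixed duration of one task, in seconds
     * @param position       1-based position in the waiting queue
     * @param processors     number of processors working in parallel
     */
    static int estimateSeconds(int secondsPerTask, int position, int processors) {
        // Number of complete rounds that must finish before this task's round starts.
        int fullRoundsAhead = (position - 1) / processors;
        // The task's own round adds one more task duration.
        return secondsPerTask * (fullRoundsAhead + 1);
    }

    public static void main(String[] args) {
        int processors = 4;
        for (int position : new int[] {1, 4, 5, 9}) {
            System.out.println("position " + position + " -> "
                    + estimateSeconds(60, position, processors) + " s");
        }
    }
}
```

With 4 processors, positions 1 through 4 are estimated at 60 seconds, position 5 at 120 seconds, and position 9 at 180 seconds. The model ignores how far along the currently executing tasks are, so it is a coarse upper-level estimate rather than a prediction.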
Compile task class: CompileTask
import lombok.Data;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.gis.common.enums.DataScheduleEnum;
import org.springblade.gis.dynamicds.service.DynamicDataSourceService;
import org.springblade.gis.modules.feature.schedule.service.DataScheduleService;

import java.util.Date;

@Data
public class CompileTask implements Runnable {
    // task id
    private Long clazzId;
    // user id
    private Long userId;
    // thread that issued the current request
    private Thread thread;
    // bound processor
    private TaskProcessor taskProcessor;
    // task status
    private Integer status;
    // start time
    private Date startTime;
    // end time
    private Date endTime;

    private DataScheduleService dataScheduleService = SpringUtil.getBean(DataScheduleService.class);
    private DynamicDataSourceService dataSourceService = SpringUtil.getBean(DynamicDataSourceService.class);

    @Override
    public void run() {
        compile();
    }

    /**
     * Compile.
     */
    public void compile() {
        try {
            // take a processor; blocks while none is free
            TaskProcessor taskProcessor = TaskQueue.taskProcessors.take();
            // take the task at the head of the waiting queue
            CompileTask compileTask = TaskQueue.waitTasks.take();
            // bind the task to the processor
            compileTask.setTaskProcessor(taskProcessor);
            // move the task into the executing queue
            TaskQueue.executeTasks.put(compileTask);
            System.out.println(DataScheduleEnum.DEAL_WITH.getName() + " " + userId);
            // switch to the user's data source
            dataSourceService.switchDataSource(userId);
            // record progress
            dataScheduleService.addSchedule(clazzId, DataScheduleEnum.DEAL_WITH.getState());
        } catch (InterruptedException e) {
            System.err.println(e.getMessage());
        }
    }
}
Task processor: TaskProcessor
import lombok.Data;

import java.util.Date;

@Data
public class TaskProcessor {

    /**
     * Releases the processor bound to the task and removes the task from the executing queue.
     */
    public static Boolean release(CompileTask task) {
        Boolean flag = false;
        Thread thread = task.getThread();
        synchronized (thread) {
            try {
                // return the processor so that a blocked take can continue
                TaskQueue.taskProcessors.put(task.getTaskProcessor());
                TaskQueue.executeTasks.remove(task);
                task.setEndTime(new Date());
                long intervalMilli = task.getEndTime().getTime() - task.getStartTime().getTime();
                flag = true;
                System.out.println("user " + task.getClazzId() + " time " + intervalMilli + " ms");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return flag;
    }
}
Controller interface implementation
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springblade.core.tool.api.R;
import org.springblade.gis.multithread.TaskProcessor;
import org.springblade.gis.multithread.TaskQueue;
import org.springblade.gis.multithread.CompileTask;
import org.springframework.web.bind.annotation.*;

import java.util.Date;

@RestController
@RequestMapping("task")
@Api(value = "data compilation task", tags = "data compilation task")
public class CompileTaskController {

    @ApiOperation(value = "add wait request @author Tarzan Liu")
    @PostMapping("compile/{clazzId}")
    public R compile(@PathVariable("clazzId") Long clazzId) {
        CompileTask checkUser = TaskQueue.getWaitTask(clazzId);
        if (checkUser != null) {
            return R.fail("already queuing!");
        }
        checkUser = TaskQueue.getExecuteTask(clazzId);
        if (checkUser != null) {
            return R.fail("compiling!");
        }
        // get the current thread
        Thread thread = Thread.currentThread();
        // create the request object for the current user
        CompileTask compileTask = new CompileTask();
        compileTask.setThread(thread);
        compileTask.setClazzId(clazzId);
        compileTask.setStartTime(new Date());
        // put the request object into the waiting queue
        try {
            TaskQueue.waitTasks.put(compileTask);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        TaskQueue.exec.execute(compileTask);
        return R.data(TaskQueue.waitTasks.size() - 1);
    }

    @ApiOperation(value = "query how many tasks are waiting @author Tarzan Liu")
    @PostMapping("sort/{clazzId}")
    public R sort(@PathVariable("clazzId") Long clazzId) {
        return R.data(TaskQueue.getSort(clazzId));
    }

    @ApiOperation(value = "query the estimated duration of the current task @author Tarzan Liu")
    @PostMapping("estimate/time/{clazzId}")
    public R estimatedTime(@PathVariable("clazzId") Long clazzId) {
        return R.data(TaskQueue.estimatedTime(clazzId));
    }

    @ApiOperation(value = "task release @author Tarzan Liu")
    @PostMapping("release/{clazzId}")
    public R release(@PathVariable("clazzId") Long clazzId) {
        CompileTask task = TaskQueue.getExecuteTask(clazzId);
        if (task == null) {
            return R.fail("Resource release exception");
        }
        return R.status(TaskProcessor.release(task));
    }

    @ApiOperation(value = "execute @author Tarzan Liu")
    @PostMapping("exec")
    public R exec() {
        Long start = System.currentTimeMillis();
        for (Long i = 1L; i < 100; i++) {
            compile(i);
        }
        System.out.println("elapsed time: " + (System.currentTimeMillis() - start) + " ms");
        return R.status(true);
    }
}

Interface test
Query by task id how many tasks are still waiting ahead of that task.
Query by task id the estimated time remaining until that task completes, in seconds.
Supplementary knowledge: BlockingQueue
BlockingQueue is a blocking queue. Common JDK implementations such as ArrayBlockingQueue and LinkedBlockingQueue are built on ReentrantLock. The same principle can be used to implement long-connection chat in a web application, but its most common use is the producer-consumer pattern, roughly as shown in the following figure:
[Figure: producer-consumer pattern]
In Java, BlockingQueue is an interface. Its implementations include ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue and SynchronousQueue. They differ mainly in their storage structure or in how they operate on elements, but the principle behind take and put is similar in all of them.
Blocking and non-blocking
Enqueue
offer(E e): if the queue is not full, insert the element and return true immediately; if the queue is full, return false immediately (non-blocking)
put(E e): if the queue is full, block until space becomes available or the thread is interrupted (blocking)
offer(E e, long timeout, TimeUnit unit): insert an element at the tail of the queue; if the queue is full, wait until one of the following three things happens (blocking):
The thread is woken up
The wait times out
The current thread is interrupted
Dequeue
poll(): if the queue is empty, return null immediately; otherwise remove and return the head element (non-blocking)
take(): if the queue is empty, block until the queue is no longer empty or the thread is interrupted (blocking)
poll(long timeout, TimeUnit unit): if the queue is not empty, dequeue immediately; if the queue is empty and the timeout has elapsed, return null; if the queue is empty and the timeout has not elapsed, wait until one of the following three things happens (blocking):
The thread is woken up
The wait times out
The current thread is interrupted
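The differences between these methods are easiest to see on a queue with capacity 1. The following is a minimal sketch using only java.util.concurrent; the class name BlockingQueueMethodsSketch, the method names and the 50 ms timeouts are arbitrary choices made for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingQueueMethodsSketch {
    /** Runs the non-blocking and timed calls on a capacity-1 queue and logs each result. */
    static String nonBlockingAndTimed() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder log = new StringBuilder();
        log.append(queue.offer("a")).append(',');                             // queue had room
        log.append(queue.offer("b")).append(',');                             // queue is full, returns at once
        log.append(queue.offer("c", 50, TimeUnit.MILLISECONDS)).append(',');  // still full after the timeout
        log.append(queue.poll()).append(',');                                 // head element is removed
        log.append(queue.poll()).append(',');                                 // queue is empty, returns at once
        log.append(queue.poll(50, TimeUnit.MILLISECONDS));                    // still empty after the timeout
        return log.toString();
    }

    /** A consumer blocked in take() is released by a producer calling put(). */
    static String handOff() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // blocks until an element is available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.put("job-1");
        consumer.join(); // join makes the consumer's write to received[0] visible here
        return received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(nonBlockingAndTimed()); // prints "true,false,false,a,null,null"
        System.out.println(handOff());             // prints "job-1"
    }
}
```

In the task queue above, take and put are the right choice because a task should wait for a processor for as long as it takes. The timed variants are useful when the caller would rather give up and report that the system is busy.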
That concludes this walkthrough of how to queue user tasks with Java multithreading and estimate the queue length. Combining the theory with hands-on practice is the best way to learn it, so try the code yourself.