手写线程池
线程池执行原理
线程池类—MyThreadPool1
线程类主要参数有:
- 线程数量
- 任务数量
其中,从任务队列ArrayBlockingQueue
取任务时taskQueue.take();
会一直阻塞,直到队列中有可用元素。
destroy()
通过循环判断阻塞队列是否为空来决定是否关闭线程池。若线程池为空,还要 Thread.sleep(2000);
以便线程执行完所有任务(事实上,关闭线程池应该由正在活动的线程数量来决定,通过任务队列来判断是不准确的,因为无法确定当任务队列为空时,正在活动的线程何时才能结束执行)。
/**
* @author Allen
* @date 2021/6/25 22:36
*/
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class MyThreadPool1 {
private static final int WORKER_NUM = 5;
private static final int TASK_NUM = 20;
private int workerNum;
private int taskNum;
private final Set<WorkerThread> workerThreads;
private final BlockingQueue<Runnable> taskQueue;
public MyThreadPool1(){
this(WORKER_NUM,TASK_NUM);
}
public MyThreadPool1(int workerNum, int taskNum) {
if(workerNum<=0)workerNum = WORKER_NUM;
if(taskNum<=0)taskNum = TASK_NUM;
this.workerNum = workerNum;
this.taskNum = taskNum;
taskQueue = new ArrayBlockingQueue<>(taskNum);
workerThreads = new HashSet<>();
//启动一定数量的线程数,从队列中获取任务处理
for(int i=0;i<workerNum;i++){
WorkerThread workerThread = new WorkerThread("thread"+i);
workerThread.start();
workerThreads.add(workerThread);
}
}
public void execute(Runnable task) throws InterruptedException {
taskQueue.put(task);
}
public void destroy() throws InterruptedException {
System.out.println("ready close thread pool");
if(workerThreads==null||workerThreads.isEmpty())return;
while(!taskQueue.isEmpty()){
try{
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
}
Thread.sleep(2000);
for(WorkerThread thread:workerThreads){
thread.stopWork();
thread = null; //help GC
}
workerThreads.clear();
}
private class WorkerThread extends Thread {
private String name;
public WorkerThread(String name){
super(); //invoke Thread.Thread() to initialize thread
setName(name);
}
public void run(){
while(!isInterrupted()){
try{
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*/
Runnable task = taskQueue.take();
System.out.println(getName()+" ready execute: "+task.toString());
task.run(); //执行任务
task = null; //help gc
}catch (Exception e){
interrupt();
e.printStackTrace();
}
}
}
public void stopWork(){
this.interrupt();
}
}
}
测试类—Test
/**
* @author Allen
* @date 2021/6/25 22:36
*/
public class Test {
public static void main(String[] args) throws InterruptedException {
int taskNum = 30;
int workerNum = 5;
MyThreadPool1 threadPool1 = new MyThreadPool1(workerNum,taskNum);
for(int i=0;i<taskNum;i++){
threadPool1.execute(new MyTask("task"+i));
}
threadPool1.destroy();
}
public static class MyTask implements Runnable{
private String name;
public String getName(){
return name;
}
public void setName(String name){
this.name = name;
}
public MyTask(String name){
setName(name);
}
@Override
public void run() {
try{
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task :"+name+" end...");
}
public String toString(){
return name;
}
}
}
测试输出
从测试图可看出,线程在执行完所有任务之后,能正常退出 process finished with exit code 0
,这是因为所有任务均设置成 Thread.sleep(1000);
,因此本例中 destroy()
的Thread.sleep(2000);
足以使最后一个线程执行完。