手写线程池


手写线程池

线程池执行原理

线程池类—MyThreadPool1

线程类主要参数有:

  • 线程数量
  • 任务数量

其中,从任务队列ArrayBlockingQueue取任务时taskQueue.take();会一直阻塞,直到队列中有可用元素。

destroy()通过循环判断阻塞队列是否为空来决定是否关闭线程池。若线程池为空,还要 Thread.sleep(2000);以便线程执行完所有任务(事实上,关闭线程池应该由正在活动的线程数量来决定,通过任务队列来判断是不准确的,因为无法确定当任务队列为空时,正在活动的线程何时才能结束执行)。

/**
 * @author Allen
 * @date 2021/6/25 22:36
 */
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MyThreadPool1 {

    private static final int WORKER_NUM = 5;

    private static final int TASK_NUM = 20;

    private int workerNum;

    private int taskNum;

    private final Set<WorkerThread> workerThreads;

    private final BlockingQueue<Runnable> taskQueue;

    public MyThreadPool1(){
        this(WORKER_NUM,TASK_NUM);
    }

    public MyThreadPool1(int workerNum, int taskNum) {
        if(workerNum<=0)workerNum = WORKER_NUM;
        if(taskNum<=0)taskNum = TASK_NUM;
        this.workerNum = workerNum;
        this.taskNum = taskNum;
        taskQueue = new ArrayBlockingQueue<>(taskNum);
        workerThreads = new HashSet<>();

        //启动一定数量的线程数,从队列中获取任务处理
        for(int i=0;i<workerNum;i++){
            WorkerThread workerThread = new WorkerThread("thread"+i);
            workerThread.start();
            workerThreads.add(workerThread);
        }
    }

    public void execute(Runnable task) throws InterruptedException {
        taskQueue.put(task);
    }

    public void destroy() throws InterruptedException {
        System.out.println("ready close thread pool");
        if(workerThreads==null||workerThreads.isEmpty())return;
        while(!taskQueue.isEmpty()){
            try{
                Thread.sleep(1000);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        Thread.sleep(2000);
        for(WorkerThread thread:workerThreads){
            thread.stopWork();
            thread = null;  //help GC
        }
        workerThreads.clear();
    }


    private class WorkerThread extends Thread {

        private String name;

        public WorkerThread(String name){
            super();    //invoke Thread.Thread() to initialize thread
            setName(name);
        }

        public void run(){
            while(!isInterrupted()){
                try{
                    /**
                     * Retrieves and removes the head of this queue, waiting if necessary
                     * until an element becomes available.
                     */
                    Runnable task = taskQueue.take();
                    System.out.println(getName()+" ready execute: "+task.toString());
                    task.run(); //执行任务
                    task = null;    //help gc
                }catch (Exception e){
                    interrupt();
                    e.printStackTrace();
                }
            }
        }

        public void stopWork(){
            this.interrupt();
        }
    }
}

测试类—Test

/**
 * @author Allen
 * @date 2021/6/25 22:36
 */
public class Test {

    public static void main(String[] args) throws InterruptedException {
        int taskNum = 30;
        int workerNum = 5;
        MyThreadPool1 threadPool1 = new MyThreadPool1(workerNum,taskNum);
        for(int i=0;i<taskNum;i++){
            threadPool1.execute(new MyTask("task"+i));
        }
        threadPool1.destroy();
    }

    public static class MyTask implements Runnable{

        private String name;

        public String getName(){
            return name;
        }

        public void setName(String name){
            this.name = name;
        }

        public MyTask(String name){
            setName(name);
        }

        @Override
        public void run() {
            try{
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task :"+name+" end...");
        }

        public String toString(){
            return name;
        }
    }
}

测试输出

从测试图可看出,线程在执行完所有任务之后,能正常退出 process finished with exit code 0,这是因为所有任务均设置成 Thread.sleep(1000);,因此本例中 destroy()Thread.sleep(2000);足以使最后一个线程执行完。

附:手写简单的线程池


文章作者: Hailong Gao
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Hailong Gao !
评论
  目录