线程池通俗的描述就是预先创建若干空闲线程等到需要用多线程去处理事务的时候去唤醒某些空闲线程执行处理任务这样就省去了频繁创建线程的时间因为频 繁创建线程是要耗费大量的CPU资源的如果一个应用程序需要频繁地处理大量并发事务不断的创建销毁线程往往会大大地降低系统的效率这时候线程池就派 上用场了
本文旨在使用Java语言编写一个通用的线程池当需要使用线程池处理事务时只需按照指定规范封装好事务处理对象然后用已有的线程池对象去自动选择空 闲线程自动调用事务处理对象即可并实现线程池的动态修改(修改当前线程数最大线程数等)下面是实现代码
//ThreadTask java
package polarmanthreadpool;
/** *//**
*线程任务
* @author ryang
*
*/
public interface ThreadTask {
public void run();
}
//PooledThreadjava
package polarmanthreadpool;
import javautilCollection; import javautilVector;
/** *//**
*接受线程池管理的线程
* @author ryang
*
*/
public class PooledThread extends Thread {
protected Vector tasks = new Vector();
protected boolean running = false;
protected boolean stopped = false;
protected boolean paused = false;
protected boolean killed = false;
private ThreadPool pool;
public PooledThread(ThreadPool pool){ thispool = pool;
}
public void putTask(ThreadTask task){ tasksadd(task);
}
public void putTasks(ThreadTask[] tasks){ for(int i=; i<taskslength; i++) thistasksadd(tasks[i]);
}
public void putTasks(Collection tasks){ thistasksaddAll(tasks);
}
protected ThreadTask popTask(){ if(taskssize() > ) return (ThreadTask)tasksremove();
else
return null;
}
public boolean isRunning(){
return running;
}
public void stopTasks(){
stopped = true;
}
public void stopTasksSync(){
stopTasks();
while(isRunning()){ try {
sleep();
} catch (InterruptedException e) {
}
}
}
public void pauseTasks(){
paused = true;
}
public void pauseTasksSync(){
pauseTasks();
while(isRunning()){ try {
sleep();
} catch (InterruptedException e) {
}
}
}
public void kill(){ if(!running)
interrupt();
else
killed = true;
}
public void killSync(){
kill();
while(isAlive()){ try {
sleep();
} catch (InterruptedException e) {
}
}
}
public synchronized void startTasks(){
running = true;
thisnotify();
}
public synchronized void run(){ try{ while(true){ if(!running || taskssize() == ){ poolnotifyForIdleThread(); //Systemoutprintln(ThreadcurrentThread()getId() + : 空闲); thiswait(); }else{
ThreadTask task;
while((task = popTask()) != null){ taskrun(); if(stopped){
stopped = false;
if(taskssize() > ){ tasksclear(); Systemoutprintln(ThreadcurrentThread()getId() + : Tasks are stopped);
break;
}
}
if(paused){
paused = false;
if(taskssize() > ){ Systemoutprintln(ThreadcurrentThread()getId() + : Tasks are paused);
break;
}
}
}
running = false;
}
if(killed){
killed = false;
break;
}
}
}catch(InterruptedException e){
return;
}
//Systemoutprintln(ThreadcurrentThread()getId() + : Killed);
}
}
//ThreadPooljava
package polarmanthreadpool;
import javautilCollection; import javautilIterator; import javautilVector;
/** *//**
*线程池
* @author ryang
*
*/
public class ThreadPool {
protected int maxPoolSize;
protected int initPoolSize;
protected Vector threads = new Vector();
protected boolean initialized = false;
protected boolean hasIdleThread = false;
public ThreadPool(int maxPoolSize int initPoolSize){ thismaxPoolSize = maxPoolSize; thisinitPoolSize = initPoolSize;
}
public void init(){
initialized = true;
for(int i=; i<initPoolSize; i++){
PooledThread thread = new PooledThread(this);
threadstart(); threadsadd(thread);
}
//Systemoutprintln(线程池初始化结束线程数= + threadssize() + 最大线程数= + maxPoolSize);
}
public void setMaxPoolSize(int maxPoolSize){ //Systemoutprintln(重设最大线程数最大线程数= + maxPoolSize); thismaxPoolSize = maxPoolSize;
if(maxPoolSize < getPoolSize())
setPoolSize(maxPoolSize);
}
/** *//**
*重设当前线程数
* 若需杀掉某线程线程不会立刻杀掉而会等到线程中的事务处理完成* 但此方法会立刻从线程池中移除该线程不会等待事务处理结束
* @param size
*/
public void setPoolSize(int size){ if(!initialized){
initPoolSize = size;
return;
}else if(size > getPoolSize()){ for(int i=getPoolSize(); i<size && i<maxPoolSize; i++){
PooledThread thread = new PooledThread(this);
threadstart(); threadsadd(thread);
}
}else if(size < getPoolSize()){ while(getPoolSize() > size){ PooledThread th = (PooledThread)threadsremove(); thkill();
}
}
//Systemoutprintln(重设线程数线程数= + threadssize());
}
public int getPoolSize(){ return threadssize();
}
protected void notifyForIdleThread(){
hasIdleThread = true;
}
protected boolean waitForIdleThread(){
hasIdleThread = false;
while(!hasIdleThread && getPoolSize() >= maxPoolSize){ try { Threadsleep(); } catch (InterruptedException e) {
return false;
}
}
return true;
}
public synchronized PooledThread getIdleThread(){ while(true){ for(Iterator itr=erator(); itrhasNext();){ PooledThread th = (PooledThread)itrnext(); if(!thisRunning())
return th;
}
if(getPoolSize() < maxPoolSize){
PooledThread thread = new PooledThread(this);
threadstart(); threadsadd(thread);
return thread;
}
//Systemoutprintln(线程池已满等待);
if(waitForIdleThread() == false)
return null;
}
}
public void processTask(ThreadTask task){
PooledThread th = getIdleThread();
if(th != null){ thputTask(task); thstartTasks();
}
}
public void processTasksInSingleThread(ThreadTask[] tasks){
PooledThread th = getIdleThread();
if(th != null){ thputTasks(tasks); thstartTasks();
}
}
public void processTasksInSingleThread(Collection tasks){
PooledThread th = getIdleThread();
if(th != null){ thputTasks(tasks); thstartTasks();
}
}
}
下面是线程池的测试程序
//ThreadPoolTestjava
import javaioBufferedReader; import javaioIOException; import javaioInputStreamReader;
import polarmanthreadpoolThreadPool; import polarmanthreadpoolThreadTask;
public class ThreadPoolTest {
public static void main(String[] args) { Systemoutprintln(quit 退出); Systemoutprintln(task A 启动任务A时长为秒); Systemoutprintln(size 设置当前线程池大小为); Systemoutprintln(max 设置线程池最大线程数为); Systemoutprintln();
final ThreadPool pool = new ThreadPool( ); poolinit();
Thread cmdThread = new Thread(){ public void run(){
BufferedReader reader = new BufferedReader(new InputStreamReader(Systemin));
while(true){ try { String line = readerreadLine(); String words[] = linesplit( ); if(words[]equalsIgnoreCase(quit)){ Systemexit(); }else if(words[]equalsIgnoreCase(size) && wordslength >= ){ try{ int size = IntegerparseInt(words[]); poolsetPoolSize(size); }catch(Exception e){
}
}else if(words[]equalsIgnoreCase(max) && wordslength >= ){ try{ int max = IntegerparseInt(words[]); poolsetMaxPoolSize(max); }catch(Exception e){
}
}else if(words[]equalsIgnoreCase(task) && wordslength >= ){ try{ int timelen = IntegerparseInt(words[]); SimpleTask task = new SimpleTask(words[] timelen * ); poolprocessTask(task); }catch(Exception e){
}
}
} catch (IOException e) { eprintStackTrace();
}
}
}
};
cmdThreadstart();
/**//*
for(int i=; i<; i++){
SimpleTask task = new SimpleTask(Task + i (i+)*); poolprocessTask(task);
}*/
}
}
class SimpleTask implements ThreadTask{
private String taskName;
private int timeLen;
public SimpleTask(String taskName int timeLen){ thistaskName = taskName; thistimeLen = timeLen;
}
public void run() { Systemoutprintln(ThreadcurrentThread()getId() +
: START TASK + taskName + );
try { Threadsleep(timeLen); } catch (InterruptedException e) {
}
Systemoutprintln(ThreadcurrentThread()getId() +
: END TASK + taskName + );
}
}
使用此线程池相当简单下面两行代码初始化线程池
ThreadPool pool = new ThreadPool( ); poolinit();
要处理的任务实现ThreadTask接口即可(如测试代码里的SimpleTask)这个接口只有一个方法run()
两行代码即可调用
ThreadTask task = //实例化你的任务对象poolprocessTask(task);