在应用中
我们常常需要Thread缓沖池来做一些事以提高程序的效率和并发性
本文演示了如何利用Queue这种数据结构实现一个简单的Thread缓沖池
一个Thread缓沖池可以设计成以下这样缓沖池由几个工作Thread和一个Queue组成Client负责把任务放到Queue里面(put方法)而工作Thread就依次取出这些任务并执行它们(get方法)
Queue的一个经典实现是使用一个循环数组(这个实现在很多数据结构的书上都有介绍)如一个大小为size的数组这个循环数组可以被想象成首尾相连的一个环oldest指向Queue中最老的数据所在的位置next指向下一个可以放新数据的位置
放入一个新数据到next的位置后需要更新nextnext = (next + ) % size;
从oldest位置取出一个数据后需要更新oldestoldest = (oldest + ) % size;
当oldest == next的时候Queue为空
当(next + ) % size == oldest的时候Queue为满
(注意为了区分Queue为空和为满的情况实际上Queue里面最多能放size个数据)
因为这个Queue会同时被多个线程访问需要考虑在这种情况下Queue如何工作首先Queue需要是线程安全的可以用Java里的synchronized关键字来确保同时只有一个Thread在访问Queue
我们还可以注意到当Queue为空的时候get操作是无法进行的;当Queue为满的时候put操作又是无法进行的在多线程访问遇到这种情况时一般希望执行操作的线程可以等待(block)直到该操作可以进行下去比如但一个 Thread在一个空Queue上执行get方法的时候这个 Thread应当等待(block)直到另外的Thread执行该Queue的put方法后再继续执行下去在Java里面Object对象的 wait ()notify()方法提供了这样的功能
把上面的内容结合起来就是一个SyncQueue的类
public class SyncQueue {
public SyncQueue(int size) {
_array = new Object[size];
_size = size;
_oldest = ;
_next = ;
}
public synchronized void put(Object o) {
while (full()) {
try {
wait();
} catch (InterruptedException ex) {
throw new ExceptionAdapter(ex);
}
}
_array[_next] = o;
_next = (_next + ) % _size;
notify();
}
public synchronized Object get() {
while (empty()) {
try {
wait();
} catch (InterruptedException ex) {
throw new ExceptionAdapter(ex);
}
}
Object ret = _array[_oldest];
_oldest = (_oldest + ) % _size;
notify();
return ret;
}
protected boolean empty() {
return _next == _oldest;
}
protected boolean full() {
return (_next + ) % _size == _oldest;
}
protected Object [] _array;
protected int _next;
protected int _oldest;
protected int _size;
}
可以注意一下get和put方法中while的使用如果换成if是会有问题的这是个很容易犯的错误;)
在以上代码中使用了ExceptionAdapter这个类它的作用是把一个checked Exception包装成RuntimeException详细的说明可以参考我的避免在Java中使用Checked Exception一文
接下来我们需要一个对象来表现Thread缓沖池所要执行的任务可以发现JDK中的Runnable interface非常合适这个角色
最后剩下工作线程的实现就很简单了从SyncQueue里取出一个Runnable对象并执行它
public class Worker implements Runnable {
public Worker(SyncQueue queue) {
_queue = queue;
}
public void run() {
while (true) {
Runnable task = (Runnable) _queueget();
taskrun();
}
}
protected SyncQueue _queue = null;
}
下面是一个使用这个Thread缓沖池的例子
//构造Thread缓沖池
SyncQueue queue = new SyncQueue();
for (int i = ; i < ; i ++) {
new Thread(new Worker(queue))start();
}
//使用Thread缓沖池
Runnable task = new MyTask();
queueput(task);
为了使本文中的代码尽可能简单这个Thread缓沖池的实现是一个基本的框架当使用到实际中时一些其他功能也可以在这一基础上添加比如异常处理动态调整缓沖池大小等等