java

位置:IT落伍者 >> java >> 浏览文章

java阻塞队列 线程同步合作


发布日期:2018年07月31日
 
java阻塞队列 线程同步合作

Queue接口与ListSet同一级别都是继承了Collection接口LinkedList实现了Queue接口Queue接口窄化了对LinkedList的方法的访问权限(即在方法中的参数类型如果是Queue时就完全只能访问Queue接口所定义的方法了而不能直接访问 LinkedList的非Queue的方法)以使得只有恰当的方法才可以使用BlockingQueue 继承了Queue接口

队列是一种数据结构.它有两个基本操作在队列尾部加人一个元素和从队列头部移除一个元素就是说队列以一种先进先出的方式管理数据如果你试图向一个已经满了的阻塞队列中添加一个元素或者是从一个空的阻塞队列中移除一个元索将导致线程阻塞.在多线程进行合作时阻塞队列是很有用的工具工作者线程可以定期地把中间结果存到阻塞队列中而其他工作者线线程把中间结果取出并在将来修改它们队列会自动平衡负载如果第一个线程集运行得比第二个慢则第二个线程集在等待结果时就会阻塞如果第一个线程集运行得快那么它将等待第二个线程集赶上来下表显示了jdk中的阻塞队列的操作

add 增加一个元索 如果队列已满则抛出一个IIIegaISlabEepeplian异常

remove 移除并返回队列头部的元素 如果队列为空则抛出一个NoSuchElementException异常

element 返回队列头部的元素 如果队列为空则抛出一个NoSuchElementException异常

offer 添加一个元素并返回true 如果队列已满则返回false

poll 移除并返问队列头部的元素 如果队列为空则返回null

peek 返回队列头部的元素 如果队列为空则返回null

put 添加一个元素 如果队列满则阻塞

take 移除并返回队列头部的元素 如果队列为空则阻塞

removeelementoffer pollpeek 其实是属于Queue接口

阻塞队列的操作可以根据它们的响应方式分为以下三类aadremovee和element操作在你试图为一个已满的队列增加元素或从空队列取得元素时抛出异常当然在多线程程序中队列在任何时间都可能变成满的或空的所以你可能想使用offerpollpeek方法这些方法在无法完成任务时只是给出一个出错示而不会抛出异常

注意poll和peek方法出错进返回null因此向队列中插入null值是不合法的

还有带超时的offer和poll方法变种例如下面的调用

boolean success = qoffer(xTimeUnitMILLISECONDS);

尝试在毫秒内向队列尾部插入一个元素如果成功立即返回true否则当到达超时进返回false同样地调用

Object head = qpoll( TimeUnitMILLISECONDS);

如果在毫秒内成功地移除了队列头元素则立即返回头元素否则在到达超时时返回null

最后我们有阻塞操作put和takeput方法在队列满时阻塞take方法在队列空时阻塞

ncurrent包提供了阻塞队列的个变种默认情况下LinkedBlockingQueue的容量是没有上限的(说的不准确在不指定时容量为IntegerMAX_VALUE不要然的话在put时怎么会受阻呢)但是也可以选择指定其最大容量它是基于链表的队列此队列按 FIFO(先进先出)排序元素

ArrayBlockingQueue在构造时需要指定容量并可以选择是否需要公平性如果公平参数被设置true等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来达到这种公平性的即等待时间最长的线程会先操作)通常公平性会使你在性能上付出代价只有在的确非常需要的时候再使用它它是基于数组的阻塞循环队列此队列按 FIFO(先进先出)原则对元素进行排序

PriorityBlockingQueue是一个带优先级的队列而不是先进先出队列元素按优先级顺序被移除该队列也没有上限(看了一下源码PriorityBlockingQueue是对PriorityQueue的再次包装是基于堆数据结构的而PriorityQueue是没有容量限制的与ArrayList一样所以在优先阻塞队列上put时是不会受阻的虽然此队列逻辑上是无界的但是由于资源被耗尽所以试图执行添加操作可能会导致 OutOfMemoryError)但是如果队列为空那么取元素的操作take就会阻塞所以它的检索操作take是受阻的另外往入该队列中的元素要具有比较能力

最后DelayQueue(基于PriorityQueue来实现的)是一个存放Delayed 元素的无界阻塞队列只有在延迟期满时才能从中提取元素该队列的头部是延迟期满后保存时间最长的 Delayed 元素如果延迟都还没有期满则队列没有头部并且poll将返回null当一个元素的 getDelay(TimeUnitNANOSECONDS) 方法返回一个小于或等于零的值时则出现期满poll就以移除这个元素了此队列不允许使用 null 元素 下面是延迟接口

Java代码

public interface Delayed extends Comparable<Delayed> {

long getDelay(TimeUnit unit);

}

public interface Delayed extends Comparable<Delayed> {

long getDelay(TimeUnit unit);

}

放入DelayQueue的元素还将要实现compareTo方法DelayQueue使用这个来为元素排序

下面的实例展示了如何使用阻塞队列来控制线程集程序在一个目录及它的所有子目录下搜索所有文件打印出包含指定关键字的文件列表从下面实例可以看出使用阻塞队列两个显着的好处就是多线程操作共同的队列时不需要额外的同步另外就是队列会自动平衡负载即那边(生产与消费两边)处理快了就会被阻塞掉从而减少两边的处理速度差距下面是具体实现

Java代码

public class BlockingQueueTest {

public static void main(String[] args) {

Scanner in = new Scanner(Systemin);

Systemoutprint(Enter base directory (eg /usr/local/jdk/src): );

String directory = innextLine();

Systemoutprint(Enter keyword (eg volatile): );

String keyword = innextLine();

final int FILE_QUEUE_SIZE = ;// 阻塞队列大小

final int SEARCH_THREADS = ;// 关键字搜索线程个数

// 基于ArrayBlockingQueue的阻塞队列

BlockingQueue<File> queue = new ArrayBlockingQueue<File>(

FILE_QUEUE_SIZE);

//只启动一个线程来搜索目录

FileEnumerationTask enumerator = new FileEnumerationTask(queue

new File(directory));

new Thread(enumerator)start();

//启动个线程用来在文件中搜索指定的关键字

for (int i = ; i <= SEARCH_THREADS; i++)

new Thread(new SearchTask(queue keyword))start();

}

}

class FileEnumerationTask implements Runnable {

//哑元文件对象放在阻塞队列最后用来标示文件已被遍历完

public static File DUMMY = new File();

private BlockingQueue<File> queue;

private File startingDirectory;

public FileEnumerationTask(BlockingQueue<File> queue File startingDirectory) {

thisqueue = queue;

thisstartingDirectory = startingDirectory;

}

public void run() {

try {

enumerate(startingDirectory);

queueput(DUMMY);//执行到这里说明指定的目录下文件已被遍历完

} catch (InterruptedException e) {

}

}

// 将指定目录下的所有文件以File对象的形式放入阻塞队列中

public void enumerate(File directory) throws InterruptedException {

File[] files = directorylistFiles();

for (File file : files) {

if (fileisDirectory())

enumerate(file);

else

//将元素放入队尾如果队列满则阻塞

queueput(file);

}

}

}

class SearchTask implements Runnable {

private BlockingQueue<File> queue;

private String keyword;

public SearchTask(BlockingQueue<File> queue String keyword) {

thisqueue = queue;

thiskeyword = keyword;

}

public void run() {

try {

boolean done = false;

while (!done) {

//取出队首元素如果队列为空则阻塞

File file = queuetake();

if (file == FileEnumerationTaskDUMMY) {

//取出来后重新放入好让其他线程读到它时也很快的结束

queueput(file);

done = true;

} else

search(file);

}

} catch (IOException e) {

eprintStackTrace();

} catch (InterruptedException e) {

}

}

public void search(File file) throws IOException {

Scanner in = new Scanner(new FileInputStream(file));

int lineNumber = ;

while (inhasNextLine()) {

lineNumber++;

String line = innextLine();

if (ntains(keyword))

Systemoutprintf(%s:%d:%s%n filegetPath() lineNumber

line);

}

inclose();

}

}

               

上一篇:64位环境中的Java

下一篇:Java远程方法调用 (4)