java

位置:IT落伍者 >> java >> 浏览文章

Java 5.0多线程编程


发布日期:2021年11月15日
 
Java 5.0多线程编程

Java自年面世以来得到了广泛得一个运用但是对多线程编程的支持Java很长时间一直停留在初级阶段在Java 之前Java里的多线程编程主要是通过Thread类Runnable接口Object对象中的wait() notify() notifyAll()等方法和synchronized关键词来实现的这些工具虽然能在大多数情况下解决对共享资源的管理和线程间的调度但存在以下几个问题

过于原始拿来就能用的功能有限即使是要实现简单的多线程功能也需要编写大量的代码这些工具就像汇编语言一样难以学习和使用比这更糟糕的是稍有不慎它们还可能被错误地使用而且这样的错误很难被发现

如果使用不当会使程序的运行效率大大降低

为了提高开发效率简化编程开发人员在做项目的时候往往需要写一些共享的工具来实现一些普遍适用的功能但因为没有规范相同的工具会被重复地开发造成资源浪费

因为锁定的功能是通过Synchronized来实现的这是一种块结构只能对代码中的一段代码进行锁定而且锁定是单一的如以下代码所示

synchronized(lock){

//执行对共享资源的操作

……

}

一些复杂的功能就很难被实现比如说如果程序需要取得lock A和lock B来进行操作然后需要取得lock C并且释放lock A来进行操作Java 之前的多线程框架就显得无能为力了

因为这些问题程序员对旧的框架一直颇有微词这种情况一直到Java 才有较大的改观一系列的多线程工具包被纳入了标准库文件这些工具包括了一个新的多线程程序的执行框架使编程人员可方便地协调和调度线程的运行并且新加入了一些高性能的常用的工具使程序更容易编写运行效率更高本文将分类并结合例子来介绍这些新加的多线程工具

在我们开始介绍Java 里的新Concurrent工具前让我们先来看一下一个用旧的多线程工具编写的程序这个程序里有一个Server线程它需要启动两个ComponentServer线程需等到Component线程完毕后再继续相同的功能在Synchronizer一章里用新加的工具CountDownLatch有相同的实现两个程序孰优孰劣哪个程序更容易编写哪个程序更容易理解相信大家看过之后不难得出结论

public class ServerThread {

Object concLock = new Object();

int count = ;

public void runTwoThreads() {

//启动两个线程去初始化组件

new Thread(new ComponentThread(this))start();

new Thread(new ComponentThread(this))start();

// Wait for other thread

while(count != ) {

synchronized(concLock) {

try {

concLockwait();

Systemoutprintln(Wake up);

} catch (InterruptedException ie) { //处理异常}

}

}

Systemoutprintln(Server is up);

}

public void callBack() {

synchronized(concLock) {

count;

concLocknotifyAll();

}

}

public static void main(String[] args){

ServerThread server = new ServerThread();

serverrunTwoThreads();

}

}

public class ComponentThread implements Runnable {

private ServerThread server;

public ComponentThread(ServerThread server) {

thisserver = server;

}

public void run() {

//做组件初始化的工作

Systemoutprintln(Do component initialization);

servercallBack();

}

}

三个新加的多线程包

Java 里新加入了三个多线程包ncurrent ncurrentatomic ncurrentlocks

ncurrent包含了常用的多线程工具是新的多线程工具的主体

ncurrentatomic包含了不用加锁情况下就能改变值的原子变量比如说AtomicInteger提供了addAndGet()方法Add和Get是两个不同的操作为了保证别的线程不干扰以往的做法是先锁定共享的变量然后在锁定的范围内进行两步操作但用AtomicIntegeraddAndGet()就不用担心锁定的事了其内部实现保证了这两步操作是在原子量级发生的不会被别的线程干扰

ncurrentlocks包包含锁定的工具

Callable 和 Future接口

Callable是类似于Runnable的接口实现Callable接口的类和实现Runnable的类都是可被其它线程执行的任务Callable和Runnable有几点不同

Callable规定的方法是call()而Runnable规定的方法是run()

Callable的任务执行后可返回值而Runnable的任务是不能返回值的

call()方法可抛出异常而run()方法是不能抛出异常的

运行Callable任务可拿到一个Future对象通过Future对象可了解任务执行情况可取消任务的执行还可获取任务执行的结果

以下是Callable的一个例子

public class DoCallStuff implements Callable{ // *

private int aInt;

public DoCallStuff(int aInt) {

thisaInt = aInt;

}

public String call() throws Exception { //*

boolean resultOk = false;

if(aInt == ){

resultOk = true;

} else if(aInt == ){

while(true){ //infinite loop

Systemoutprintln(looping);

Threadsleep();

}

} else {

throw new Exception(Callable terminated with Exception!); //*

}

if(resultOk){

return Task done;

} else {

return Task failed;

}

}

}

*: 名为DoCallStuff类实现了CallableString将是call方法的返回值类型例子中用了String但可以是任何Java类

*: call方法的返回值类型为String这是和类的定义相对应的并且可以抛出异常

*: call方法可以抛出异常如加重的斜体字所示

以下是调用DoCallStuff的主程序

import ncurrentExecutionException;

import ncurrentExecutorService;

import ncurrentExecutors;

import ncurrentFuture;

public class Executor {

public static void main(String[] args){

//*

DoCallStuff call = new DoCallStuff();

DoCallStuff call = new DoCallStuff();

DoCallStuff call = new DoCallStuff();

//*

ExecutorService es = ExecutorsnewFixedThreadPool();

//*

Future future = essubmit(call);

Future future = essubmit(call);

Future future = essubmit(call);

try {

//*

Systemoutprintln(futureget());

//*

Threadsleep();

Systemoutprintln(Thread terminated? : + futurecancel(true));

//*

Systemoutprintln(futureget());

} catch (ExecutionException ex) {

exprintStackTrace();

} catch (InterruptedException ex) {

exprintStackTrace();

}

}

}

*: 定义了几个任务

*: 初始了任务执行工具任务的执行框架将会在后面解释

*: 执行任务任务启动时返回了一个Future对象如果想得到任务执行的结果或者是异常可对这个Future对象进行操作Future所含的值必须跟Callable所含的值对映比如说例子中Future对印Callable

*: 任务正常执行完毕futureget()会返回线程的值

*: 任务在进行一个死循环调用futurecancel(true)来中止此线程传入的参数标明是否可打断线程true表明可以打断

*: 任务抛出异常调用futureget()时会引起异常的抛出

运行Executor会有以下运行结果

looping

Task done //*

looping

looping//*

looping

looping

looping

looping

Thread terminated? :true //*

//*

ncurrentExecutionException: javalangException: Callable terminated with Exception!

at ncurrentFutureTask$SyncinnerGet(FutureTaskjava:)

at ncurrentFutureTaskget(FutureTaskjava:)

at concurrentExecutormain(Executorjava:)

……

*: 任务正常结束

*: 任务是个死循环这是它的打印结果

*: 指示任务被取消

*: 在执行futureget()时得到任务抛出的异常

新的任务执行架构

在Java 之前启动一个任务是通过调用Thread类的start()方法来实现的任务的提于交和执行是同时进行的如果你想对任务的执行进行调度或是控制同时执行的线程数量就需要额外编写代码来完成里提供了一个新的任务执行架构使你可以轻松地调度和控制任务的执行并且可以建立一个类似数据库连接池的线程池来执行任务这个架构主要有三个接口和其相应的具体类组成这三个接口是Executor ExecutorService和ScheduledExecutorService让我们先用一个图来显示它们的关系

图的左侧是接口图的右侧是这些接口的具体类注意Executor是没有直接具体实现的

Executor接口

是用来执行Runnable任务的它只定义一个方法

execute(Runnable command)执行Ruannable类型的任务

ExecutorService接口

ExecutorService继承了Executor的方法并提供了执行Callable任务和中止任务执行的服务其定义的方法主要有

submit(task)可用来提交Callable或Runnable任务并返回代表此任务的Future对象

invokeAll(collection of tasks)批处理任务集合并返回一个代表这些任务的Future对象集合

shutdown()在完成已提交的任务后关闭服务不再接受新任务

shutdownNow()停止所有正在执行的任务并关闭服务

isTerminated()测试是否所有任务都执行完毕了

isShutdown()测试是否该ExecutorService已被关闭

ScheduledExecutorService接口

在ExecutorService的基础上ScheduledExecutorService提供了按时间安排执行任务的功能它提供的方法主要有

schedule(task initDelay): 安排所提交的Callable或Runnable任务在initDelay指定的时间后执行

scheduleAtFixedRate()安排所提交的Runnable任务按指定的间隔重复执行

scheduleWithFixedDelay()安排所提交的Runnable任务在每次执行完后等待delay所指定的时间后重复执行

代码ScheduleExecutorService的例子

public class ScheduledExecutorServiceTest {

public static void main(String[] args)

throws InterruptedException ExecutionException{

//*

ScheduledExecutorService service = ExecutorsnewScheduledThreadPool();

//*

Runnable task = new Runnable() {

public void run() {

Systemoutprintln(Task repeating);

}

};

//*

final ScheduledFuture future =

servicescheduleAtFixedRate(task TimeUnitSECONDS);

//*

ScheduledFuture future = serviceschedule(new Callable(){

public String call(){

futurecancel(true);

return task cancelled!;

}

} TimeUnitSECONDS);

Systemoutprintln(futureget());

//*

serviceshutdown();

}

}

这个例子有两个任务第一个任务每隔一秒打印一句Task repeating第二个任务在秒钟后取消第一个任务

*: 初始化一个ScheduledExecutorService对象这个对象的线程池大小为

*: 用内函数的方式定义了一个Runnable任务

*: 调用所定义的ScheduledExecutorService对象来执行任务任务每秒执行一次能重复执行的任务一定是Runnable类型注意我们可以用TimeUnit来制定时间单位这也是Java 里新的特征以前的记时单位是微秒现在可精确到奈秒

*: 调用ScheduledExecutorService对象来执行第二个任务第二个任务所作的就是在秒钟后取消第一个任务

*: 关闭服务

Executors类

虽然以上提到的接口有其实现的具体类但为了方便Java 建议使用Executors的工具类来得到Executor接口的具体对象需要注意的是Executors是一个类不是Executor的复数形式Executors提供了以下一些static的方法

callable(Runnable task): 将Runnable的任务转化成Callable的任务

newSingleThreadExecutor: 产生一个ExecutorService对象这个对象只有一个线程可用来执行任务若任务多于一个任务将按先后顺序执行

newCachedThreadPool(): 产生一个ExecutorService对象这个对象带有一个线程池线程池的大小会根据需要调整线程执行完任务后返回线程池供执行下一次任务使用

newFixedThreadPool(int poolSize)产生一个ExecutorService对象这个对象带有一个大小为poolSize的线程池若任务数量大于poolSize任务会被放在一个queue里顺序执行

newSingleThreadScheduledExecutor产生一个ScheduledExecutorService对象这个对象的线程池大小为若任务多于一个任务将按先后顺序执行

newScheduledThreadPool(int poolSize): 产生一个ScheduledExecutorService对象这个对象的线程池大小为poolSize若任务数量大于poolSize任务会在一个queue里等待执行

以下是得到和使用ExecutorService的例子

代码如何调用Executors来获得各种服务对象

//Single Threaded ExecutorService

ExecutorService singleThreadeService = ExecutorsnewSingleThreadExecutor();

//Cached ExecutorService

ExecutorService cachedService = ExecutorsnewCachedThreadPool();

//Fixed number of ExecutorService

ExecutorService fixedService = ExecutorsnewFixedThreadPool();

//Single ScheduledExecutorService

ScheduledExecutorService singleScheduledService =

ExecutorsnewSingleThreadScheduledExecutor();

//Fixed number of ScheduledExecutorService

ScheduledExecutorService fixedScheduledService =

ExecutorsnewScheduledThreadPool();

Lockers和Condition接口

在多线程编程里面一个重要的概念是锁定如果一个资源是多个线程共享的为了保证数据的完整性在进行事务性操作时需要将共享资源锁定这样可以保证在做事务性操作时只有一个线程能对资源进行操作从而保证数据的完整性以前锁定的功能是由Synchronized关键字来实现的这样做存在几个问题

每次只能对一个对象进行锁定若需要锁定多个对象编程就比较麻烦一不小心就会出现死锁现象

如果线程因拿不到锁定而进入等待状况是没有办法将其打断的

在Java 里出现两种锁的工具可供使用下图是这两个工具的接口及其实现

Lock接口

ReentrantLock是Lock的具体类Lock提供了以下一些方法

lock(): 请求锁定如果锁已被别的线程锁定调用此方法的线程被阻断进入等待状态

tryLock()如果锁没被别的线程锁定进入锁定状态并返回true若锁已被锁定返回false不进入等待状态此方法还可带时间参数如果锁在方法执行时已被锁定线程将继续等待规定的时间若还不行才返回false

unlock()取消锁定需要注意的是Lock不会自动取消编程时必须手动解锁

代码

//生成一个锁

Lock lock = new ReentrantLock();

public void accessProtectedResource() {

locklock(); //取得锁定

try {

//对共享资源进行操作

} finally {

//一定记着把锁取消掉锁本身是不会自动解锁的

lockunlock();

}

}

ReadWriteLock接口

为了提高效率有些共享资源允许同时进行多个读的操作但只允许一个写的操作比如一个文件只要其内容不变可以让多个线程同时读不必做排他的锁定排他的锁定只有在写的时候需要以保证别的线程不会看到数据不完整的文件ReadWriteLock可满足这种需要ReadWriteLock内置两个Lock一个是读的Lock一个是写的Lock多个线程可同时得到读的Lock但只有一个线程能得到写的Lock而且写的Lock被锁定后任何线程都不能得到LockReadWriteLock提供的方法有

readLock(): 返回一个读的lock

writeLock(): 返回一个写的lock 此lock是排他的

ReadWriteLock的例子

public class FileOperator{

//初始化一个ReadWriteLock

ReadWriteLock lock = new ReentrantReadWriteLock();

public String read() {

//得到readLock并锁定

Lock readLock = lockreadLock();

readLocklock();

try {

//做读的工作

return Read something;

} finally {

readLockunlock();

}

}

public void write(String content) {

//得到writeLock并锁定

Lock writeLock = lockwriteLock();

writeLocklock();

try {

//做读的工作

} finally {

writeLockunlock();

}

}

}

需要注意的是ReadWriteLock提供了一个高效的锁定机理但最终程序的运行效率是和程序的设计息息相关的比如说如果读的线程和写的线程同时在等待要考虑是先发放读的lock还是先发放写的lock如果写发生的频率不高而且快可以考虑先给写的lock还要考虑的问题是如果一个写正在等待读完成此时一个新的读进来是否要给这个新的读发锁如果发了可能导致写的线程等很久等等此类问题在编程时都要给予充分的考虑

Condition接口

有时候线程取得lock后需要在一定条件下才能做某些工作比如说经典的Producer和Consumer问题Consumer必须在篮子里有苹果的时候才能吃苹果否则它必须暂时放弃对篮子的锁定等到Producer往篮子里放了苹果后再去拿来吃而Producer必须等到篮子空了才能往里放苹果否则它也需要暂时解锁等Consumer把苹果吃了才能往篮子里放苹果在Java 以前这种功能是由Object类的wait() notify()和notifyAll()等方法实现的里面这些功能集中到了Condition这个接口来实现Condition提供以下方法

await()使调用此方法的线程放弃锁定进入睡眠直到被打断或被唤醒

signal(): 唤醒一个等待的线程

signalAll()唤醒所有等待的线程

Condition的例子

public class Basket {

Lock lock = new ReentrantLock();

//产生Condition对象

Condition produced = locknewCondition();

Condition consumed = locknewCondition();

boolean available = false;

public void produce() throws InterruptedException {

locklock();

try {

if(available){

consumedawait(); //放弃lock进入睡眠

}

/*生产苹果*/

Systemoutprintln(Apple produced);

available = true;

producedsignal(); //发信号唤醒等待这个Condition的线程

} finally {

lockunlock();

}

}

public void consume() throws InterruptedException {

locklock();

try {

if(!available){

producedawait();//放弃lock进入睡眠

}

/*吃苹果*/

Systemoutprintln(Apple consumed);

available = false;

consumedsignal();//发信号唤醒等待这个Condition的线程

} finally {

lockunlock();

}

}

}

ConditionTester:

public class ConditionTester {

public static void main(String[] args) throws InterruptedException{

final Basket basket = new Basket();

//定义一个producer

Runnable producer = new Runnable() {

public void run() {

try {

basketproduce();

} catch (InterruptedException ex) {

exprintStackTrace();

}

}

};

//定义一个consumer

Runnable consumer = new Runnable() {

public void run() {

try {

nsume();

} catch (InterruptedException ex) {

exprintStackTrace();

}

}

};

//各产生个consumer和producer

ExecutorService service = ExecutorsnewCachedThreadPool();

for(int i=; i < ; i++)

servicesubmit(consumer);

Threadsleep();

for(int i=; i<; i++)

servicesubmit(producer);

serviceshutdown();

}

}

: Synchronizer同步装置

Java 里新加了个协调线程间进程的同步装置它们分别是Semaphore CountDownLatch CyclicBarrier和Exchanger

Semaphore:

用来管理一个资源池的工具Semaphore可以看成是个通行证线程要想从资源池拿到资源必须先拿到通行证Semaphore提供的通行证数量和资源池的大小一致如果线程暂时拿不到通行证线程就会被阻断进入等待状态以下是一个例子

public class Pool {

ArrayList pool = null;

Semaphore pass = null;

public Pool(int size){

//初始化资源池

pool = new ArrayList();

for(int i=; i

pool.add("Resource "+i);

}

//Semaphore的大小和资源池的大小一致

pass = new Semaphore(size);

}

public String get() throws InterruptedException{

//获取通行证,只有得到通行证后才能得到资源

pass.acquire();

return getResource();

}

public void put(String resource){

//归还通行证,并归还资源

pass.release();

releaseResource(resource);

}

private synchronized String getResource() {

String result = pool.get(0);

pool.remove(0);

System.out.println("Give out "+result);

return result;

}

private synchronized void releaseResource(String resource) {

System.out.println("return "+resource);

pool.add(resource);

}

}

SemaphoreTest:

public class SemaphoreTest {

public static void main(String[] args){

final Pool aPool = new Pool(2);

Runnable worker = new Runnable() {

public void run() {

String resource = null;

try {

//取得resource

resource = aPool.get();

} catch (InterruptedException ex) {

ex.printStackTrace();

}

//用resource做工作

System.out.println("I worked on "+resource);

//归还resource

aPool.put(resource);

}

};

ExecutorService service = Executors.newCachedThreadPool();

for(int i=0; i<20; i++){

service.submit(worker);

}

service.shutdown();

}

}

CountDownLatch:

CountDownLatch是个计数器,它有一个初始数,等待这个计数器的线程必须等到计数器倒数到零时才可继续。TW.WInGWIT.cOm比如说一个Server启动时需要初始化4个部件,Server可以同时启动4个线程去初始化这4个部件,然后调用CountDownLatch(4).await()阻断进入等待,每个线程完成任务后会调用一次untDown()来倒计数, 当4个线程都结束时CountDownLatch的计数就会降低为0,此时Server就会被唤醒继续下一步操作。CountDownLatch的方法主要有:

await():使调用此方法的线程阻断进入等待

countDown(): 倒计数,将计数值减1

getCount(): 得到当前的计数值

CountDownLatch的例子:一个server调了三个ComponentThread分别去启动三个组件,然后server等到组件都启动了再继续。

public class Server {

public static void main(String[] args) throws InterruptedException{

System.out.println("Server is starting.");

//初始化一个初始值为3的CountDownLatch

CountDownLatch latch = new CountDownLatch(3);

//起3个线程分别去启动3个组件

ExecutorService service = Executors.newCachedThreadPool();

service.submit(new ComponentThread(latch, 1));

service.submit(new ComponentThread(latch, 2));

service.submit(new ComponentThread(latch, 3));

service.shutdown();

//进入等待状态

latch.await();

//当所需的三个组件都完成时,Server就可继续了

System.out.println("Server is up!");

}

}

public class ComponentThread implements Runnable{

CountDownLatch latch;

int ID;

/** Creates a new instance of ComponentThread */

public ComponentThread(CountDownLatch latch, int ID) {

this.latch = latch;

this.ID = ID;

}

public void run() {

System.out.println("Component "+ID + " initialized!");

//将计数减一

untDown();

}

}

运行结果:

Server is starting.

Component 1 initialized!

Component 3 initialized!

Component 2 initialized!

Server is up!

CyclicBarrier:

CyclicBarrier类似于CountDownLatch也是个计数器,不同的是CyclicBarrier数的是调用了CyclicBarrier.await()进入等待的线程数,当线程数达到了CyclicBarrier初始时规定的数目时,所有进入等待状态的线程被唤醒并继续。CyclicBarrier就象它名字的意思一样,可看成是个障碍,所有的线程必须到齐后才能一起通过这个障碍。CyclicBarrier初始时还可带一个Runnable的参数,此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。

CyclicBarrier提供以下几个方法:

await():进入等待

getParties():返回此barrier需要的线程数

reset():将此barrier重置

以下是使用CyclicBarrier的一个例子:两个线程分别在一个数组里放一个数,当这两个线程都结束后,主线程算出数组里的数的和(这个例子比较无聊,我没有想到更合适的例子)

public class MainThread {

public static void main(String[] args)

throws InterruptedException, BrokenBarrierException, TimeoutException{

final int[] array = new int[2];

CyclicBarrier barrier = new CyclicBarrier(2,

new Runnable() {//在所有线程都到达Barrier时执行

public void run() {

System.out.println("Total is:"+(array[0]+array[1]));

}

});

//启动线程

new Thread(new ComponentThread(barrier, array, 0)).start();

new Thread(new ComponentThread(barrier, array, 1)).start();

}

}

public class ComponentThread implements Runnable{

CyclicBarrier barrier;

int ID;

int[] array;

public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {

this.barrier = barrier;

this.ID = ID;

this.array = array;

}

public void run() {

try {

array[ID] = new Random().nextInt();

System.out.println(ID+ " generates:"+array[ID]);

//该线程完成了任务等在Barrier处

barrier.await();

} catch (BrokenBarrierException ex) {

ex.printStackTrace();

} catch (InterruptedException ex) {

ex.printStackTrace();

}

}

}

Exchanger:

顾名思义Exchanger让两个线程可以互换信息。用一个例子来解释比较容易。例子中服务生线程往空的杯子里倒水,顾客线程从装满水的杯子里喝水,然后通过Exchanger双方互换杯子,服务生接着往空杯子里倒水,顾客接着喝水,然后交换,如此周而复始。

class FillAndEmpty {

//初始化一个Exchanger,并规定可交换的信息类型是DataCup

Exchanger exchanger = new Exchanger();

Cup initialEmptyCup = ...; //初始化一个空的杯子

Cup initialFullCup = ...; //初始化一个装满水的杯子

//服务生线程

class Waiter implements Runnable {

public void run() {

Cup currentCup = initialEmptyCup;

try {

//往空的杯子里加水

currentCup.addWater();

//杯子满后和顾客的空杯子交换

currentCup = exchanger.exchange(currentCup);

} catch (InterruptedException ex) { ... handle ... }

}

}

//顾客线程

class Customer implements Runnable {

public void run() {

DataCup currentCup = initialFullCup;

try {

//把杯子里的水喝掉

currentCup.drinkFromCup();

//将空杯子和服务生的满杯子交换

currentCup = exchanger.exchange(currentCup);

} catch (InterruptedException ex) { ... handle ...}

}

}

void start() {

new Thread(new Waiter()).start();

new Thread(new Customer()).start();

}

}

6: BlockingQueue接口

BlockingQueue是一种特殊的Queue,若BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态直到BlocingkQueue进了新货才会被唤醒。同样,如果BlockingQueue是满的任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有新的空间才会被唤醒继续操作。BlockingQueue提供的方法主要有:

add(anObject): 把anObject加到BlockingQueue里,如果BlockingQueue可以容纳返回true,否则抛出IllegalStateException异常。

offer(anObject):把anObject加到BlockingQueue里,如果BlockingQueue可以容纳返回true,否则返回false。

put(anObject):把anObject加到BlockingQueue里,如果BlockingQueue没有空间,调用此方法的线程被阻断直到BlockingQueue里有新的空间再继续。

poll(time):取出BlockingQueue里排在首位的对象,若不能立即取出可等time参数规定的时间。取不到时返回null。

take():取出BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止。

根据不同的需要BlockingQueue有4种具体实现:

ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小。其所含的对象是以FIFO(先入先出)顺序排序的。

LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。其所含的对象是以FIFO(先入先出)顺序排序的。LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue。

PriorityBlockingQueue:类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。

SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。

下面是用BlockingQueue来实现Producer和Consumer的例子:

public class BlockingQueueTest {

static BlockingQueue basket;

public BlockingQueueTest() {

//定义了一个大小为2的BlockingQueue,也可根据需要用其他的具体类

basket = new ArrayBlockingQueue(2);

}

class Producor implements Runnable {

public void run() {

while(true){

try {

//放入一个对象,若basket满了,等到basket有位置

basket.put("An apple");

} catch (InterruptedException ex) {

ex.printStackTrace();

}

}

}

}

class Consumer implements Runnable {

public void run() {

while(true){

try {

//取出一个对象,若basket为空,等到basket有东西为止

String result = basket.take();

} catch (InterruptedException ex) {

ex.printStackTrace();

}

}

}

}

public void execute(){

for(int i=0; i<10; i++){

new Thread(new Producor()).start();

new Thread(new Consumer()).start();

}

}

public static void main(String[] args){

BlockingQueueTest test = new BlockingQueueTest();

test.execute();

}

}

7:Atomics 原子级变量

原子量级的变量,主要的类有AtomicBoolean, AtomicInteger, AotmicIntegerArray, AtomicLong, AtomicLongArray, AtomicReference ……。这些原子量级的变量主要提供两个方法:

compareAndSet(expectedValue, newValue): 比较当前的值是否等于expectedValue,若等于把当前值改成newValue,并返回true。若不等,返回false。

getAndSet(newValue): 把当前值改为newValue,并返回改变前的值。

这些原子级变量利用了现代处理器(CPU)的硬件支持可把两步操作合为一步的功能,避免了不必要的锁定,提高了程序的运行效率。

8:Concurrent Collections 共点聚集

在Java的聚集框架里可以调用Collections.synchronizeCollection(aCollection)将普通聚集改变成同步聚集,使之可用于多线程的环境下。 但同步聚集在一个时刻只允许一个线程访问它,其它想同时访问它的线程会被阻断,导致程序运行效率不高。Java 5.0里提供了几个共点聚集类,它们把以前需要几步才能完成的操作合成一个原子量级的操作,这样就可让多个线程同时对聚集进行操作,避免了锁定,从而提高了程序的运行效率。Java 5.0目前提供的共点聚集类有:ConcurrentHashMap, ConcurrentLinkedQueue, CopyOnWriteArrayList和CopyOnWriteArraySet.

               

上一篇:java设计模式之 Adapter(纠合不兼容的类)

下一篇:java设计模式之Template(算法的骨架)