摘要 在企业中许多计算机由于在其上执行工作的性质而未得到充分利用或者因为过了上班时间而干脆得不到使用在许多情况下应用服务器耗光了宝贵的CPU(尤其是在执行CPU密集型的数学运算时)而网络上的其他计算机则闲置一旁本文提出了一种框架用于把Java消息服务(Java Messaging ServiceJMS)客户端放置在这些未充分利用的计算机上以分担一些通常应由服务器执行的工作该客户端可以监听某个要执行的工作单元的请求队列然后在应答队列中做出响应此外本文还给出了一个BEA WebLogic Integration 架构的例子它通过把一个工作流以及相关的Java控件用作替代框架把工作分发到远程客户端上从而把工作单元可靠地分发给JMS请求队列 简介 本文提出了一种JEE框架用于解决把工作分配给未充分利用的计算机资源这个难题具体来说可以把JMS客户端放置在这些未充分利用的计算机上从而分担一些通常应由服务器执行的工作该客户端可以监听某个要执行的工作单元的请求队列然后在应答队列上做出响应可以使用一组消息驱动bean获取应答队列上的响应消息以便进行进一步的处理此外还可以使用一种servlet实现来管理性地启动用于创建(要发送给JMS客户端的)工作单元的整个子流程并使用它来终止这个子流程 我使用常见的BEA WebLogic Server作为把离散的工作单元分配给分布式JMS客户端的例子在另一个更为复杂的例子中BEA WebLogic Integration (WLI)工作流也执行类似的分发任务但是通过对请求队列进行监控它在灵活性Java控件的可重用性和可伸缩性等方面要更好一些 使用的例子 业内有相当多的例子可以演示如何使用JMS框架来充分利用计算机进行并行处理例如 一个银行应用程序可以实现抵押贷款并以不同的比率和年份执行几类利息计算从而为信贷官员提供与每个申请者相关的可能影响借贷类型的数据所有不同种类的计算可以按照申请者分配给可用的计算机来执行然后把结果返回给应用服务器储存起来 一个记帐系统可以从数据库读取记录然后重新计算记录中的数字以求做到更加精确对于每条记录它可能需要连接到业务用户的本地系统中以获得辅助数据这可能需要几秒钟如果顺序执行当涉及到的记录上千时这种方法不仅很慢而且可能进一步延长服务器线程等待从各地返回响应的时间通过把这些工作分发给JMS客户端不仅可以并行完成处理而且还可以节省服务器线程 一个天气预报系统或线性优化系统可能需要操纵或执行矩阵乘法随着矩阵的大小和数量逐步增加服务器CPU的负担也随之加重如果这种情况经常发生那么通过把矩阵操作和乘法分发给其他计算机上的JMS客户端服务器的CPU就可以节省下来用于其他工作 使用常规的WebLogic Server来分发工作单元 借鑒最后一个例子我将构建一个简单的例子用于执行矩阵乘法同时说明如何使用JMS框架把计算工作分发给企业中的计算机资源JMS客户端将收到一个工作单元实例之后它将会调用其doWork()方法在这个简单的例子中doWork()方法将把个×的矩阵相乘然后把结果保存到一个结果矩阵中 接着JMS客户端使用工作单元实例的一个副本(工作是在这上面执行的)对应答队列做出响应一个消息驱动bean将接受已完成的工作图说明了我将要讨论的各个组件 图 该WebLogic Server实现中的各个组件 这种方法以常规方式使用了JMS系统在下面的内容中我将引入一些代码并考虑几个扩展问题 工作单元类 JMS请求队列上的每个类都将实现一个UnitOfWork接口该接口有一个特别有趣的方法叫做doWork() public interface UnitOfWork extends javaioSerializable { // This method executes itself on the client machine public void doWork(); // This method prints the current contents of performed work public void print(); // This method stores the instance into a backing store public void store(); } 在我们这个简洁而直观的例子中我使用一个SimpleMatri类实现了UnitOfWork接口 public class SimpleMatrix implements UnitOfWork { private Integer m[][]; private Integer m[][]; private Integer result[][]; private Integer rows = new Integer(); private Integer cols = new Integer(); // May initialize m and m by locating records from a database public SimpleMatrix() { } // This method actually multiplies m x m and stores in result public void doWork() { } // This method stores result into a backing store public void store() { } // This method prints the current contents of result public void print() { } } 方法的实现相当简单限于文章的篇幅这里就不再进行说明请参见所附的示例代码其中给出了完整实现这里的要点在于这个SimpleMatrix实例被传递给一个JMS客户端该客户端只要调用doWork()即可利用其CPU来执行工作对于这个例子我不会实际从数据库中检索矩阵或者把矩阵保存到数据库中但是在实际应用中这是必须完成的工作 Servlet工作创建程序可以使用一个servlet来创建这些UnitOfWork实例尽管WebLogic Server启动类可以执行同样的功能但出于管理的目的从安全的Web浏览器发送消息给servlet要更加容易(另一种可选的实现是使用Web服务)如果在servlet上安置了安全性通过身份验证的用户可以在查询字符串中传递命令以便开始交付工作单元给JMS请求队列或停止交付我将给出一个servlet的主干例子以说明其中的一些有用方法 public class WorkServlet extends HttpServlet {
private QueueSender qsender; private ObjectMessage msg; private int numMessages = ;
// This places the unit of work on the request queue public synchronized boolean sendMessages(int numberOfMessages PrintWriter o) { for(int i=; i JMS客户端类的任务仅仅是接受请求队列上的消息调用对象上的doWork()方法在这台计算机上执行工作然后把结果返回给应答队列消息驱动bean从应答队列中获取结果以便进行进一步的处理和保存可以检查它是否是文本消息然后告诉客户端停止处理从而允许发送控件消息给客户端当然在实际应用中消息可能包含客户端的名称这样就不会造成所有的客户端都停止处理 使用UnitOfWork接口的优点在于JMS客户端只要编写一次就可以用于以后实现该接口的任何类这使得JMS客户端具有很大的通用性可以不加修改地应用到许多不同的场景中只需把编译后的UnitOfWork接口以及它所有的实现类都包含在客户端的类路径中 在这个简化模型中客户端需要等待消息到达以启动处理在实际情况中客户端的方法等待的条件可以是一天中的某个时刻比如下午点或者是计算机的CPU负载低于某个阈值需要把此类逻辑添加到客户端使之与计算机的使用安排更加一致 消息驱动bean接收程序 消息驱动bean实例将监听应答队列看看有没有已完成的工作对象单元下面给出一个例子的主干部分public class MessageWorkBean implements MessageDrivenBean MessageListener { // This method will receive a unit of work object to store public void onMessage(Message msg) { ObjectMessage om = (ObjectMessage) msg; try { UnitOfWork unit = (UnitOfWork)omgetObject(); unitprint(); unitstore(); } catch(JMSException ex) { log(Message Driven Bean: Could not retrieve Unit of Work); exprintStackTrace(); } } } 这里有一个有趣的方法叫做onMessage()这个方法的用途仅仅是从应答队列接收已完成的对象接着它将调用其print()和store()方法我的目标是让服务器把它对这个工作单元的处理工作分发给其他计算机我已经通过JMS客户端实现了这一点并使用消息驱动bean把结果返回给服务器 可扩展性方面的考虑 在这个框架的实际实现中我们应该要解决几个问题从而让例子变得可以扩展 考虑使用一个大小可以调整的消息驱动bean池来处理响应 如果请求队列没有外部使用者应该创建一些消息驱动bean来使用服务器上的请求队列这与本文的主旨不相符但是可以防止队列溢出或者在没有使用者的情况下请求队列利用不充分 如果存在多种类型的工作单元那么每种类型都应该有自己的请求和响应队列 对于WebLogic Server考虑使用JMS页面调度技术以便防止当队列中存在过多没有及时使用的消息时出现内存不足问题 对于WebLogic Server如果生产者(servlet)生产出过多没有使用的工作考虑使用WebLogic JMS的调节功能 对于WebLogic Server考虑对队列使用分布式目的地因为这可以把队列分布到多台服务器上在这种情况下应该集群化servlet本身并对其进行协调以避免创建重复的工作请求单元 还应该考虑本文结尾处的参考资料此外对其他服务器也适用的考虑事项是把客户端部分交付给各台计算机的方式一种方式是自愿即每台计算机的所有者都下载一个可以在客户端计算机上配置和运行的安装程序另一种方式是使用商业软件分发包它可以自动下载客户端的最新版本并把它安装在客户端计算机上 使用WebLogic Integration工作流来分发工作 前面给出了一种把工作单元分发给客户端的直观方法即使用servlet和消息驱动bean尽管该方法实现起来相当容易但是它不能解决的问题还很多比如如何以自支持的方式启动过程定时把请求交付给请求队列当然我们不希望让管理员编写一个shell脚本来不停地调用该servlet此外还应该以一种应用程序可以预先控制的方式限制所使用的请求数量考虑到这一点下面给出一个更加复杂的例子用于把工作单元分发给远程JMS客户端并对其做出响应从而利用未充分使用的计算机 该方法使用了两个在BEA WebLogic Workshop中开发的WebLogic Integration (WLI)工作流即Java流程定义(Java Process DefinitionJPD)文件它是BPEL/J (Business Process Engineering Language for Java)的前身BPEL/J是在JSR 中定义的第一个工作流响应某些Web服务请求而启动并执行初始化以通过一个JMS控件订阅JMS请求队列该工作流使用一个Timer控件不停地进行循环并定时唤醒一个while循环从而在请求队列上放置更多的工作单元该工作流还将使用一个定制Java控件(在本文相关代码中给出)来浏览请求队列以便决定是否需要在队列上放置更多请求来防止队列出现过载最后工作流还将等待来自Web服务的停止消息然后停止处理第二个工作流执行的任务与前面例子中的消息驱动bean相同因为它将对响应队列中的消息做出响应以便从出队的响应队列调用print()和store()方法这是一个生存期很短的工作流而WebLogic Integration将按照要求产生足够的实例 浏览JMS队列 WebLogic Integration被用作一种为远程流程构造和汇编服务的机制有现成的组件程序集即Java控件它使得开发人员可以轻松地构建复合应用程序而不需要进行大量的开发尽管WebLogic Integration提供了开箱即用的JMS控件用于在使用JMS时抽象化内部细节在某些情况下由于要细粒度地访问底层方法最好还是创建一个可重用的定制控件在这个示例框架中我需要浏览工作请求队列以统计在队列中等待的工作项的数量然后决定能否在队列中放入更多工作项而不会引起队列过载为此我们编写了一个定制Java控件JMSBrowse它有一个这样的方法public interface JMSBrowse extends Control { int numberOfElementsInQueue(String qFactory String qName); } 这个控件的实现使用了JMS QueueBrowser类来查看一个带有给定的JMS连接工厂的给定JMS队列它返回队列中等待处理的实例个数本文所附的代码中提供了完整实现 启动和停止工作流的Web服务 为了启动和停止负责把工作单元分发给请求队列的WebLogic Integration流程我们创建了一个Java Web Service (JWS)它服从JSR 带有两个方法public classControlWebService implements combeajwsWebService { /** * @common:control */ private ControlsJMSStopControlMessage JMSStopControl; /** * @common:control */ private ControlsJMSControlMessage JMSControl; static final long serialVersionUID = L; /** * @common:operation */ public void startFlow() { JMSControlsubscribe(); JMSControlsendTextMessage(start); JMSControlunsubscribe(); } /** * @common:operation */ public void stopFlow() { JMSStopControlsubscribe(); JMSStopControlsendTextMessage(stop); JMSStopControlunsubscribe(); } } 该Web服务不是直接调用工作流而是把一条消息放在JMS队列中然后调用WorkerMessage把消息发送给分发JPD这解除了Web服务实现与工作流之间的耦合以保持其模块性在WebLogic Integration中有一个概念叫做事件生成器可以使用WebLogic Integration Administration Console对它进行配置您可以把事件生成器配置为从JMS WorkerMessage中取出消息然后将其交付给一个Message Broker通道(逻辑概念)分发工作流监听/UnitOfWork/StartWorkflow通道该通道被绑定在与JMS WorkerMessage队列相关联的JMS事件生成器上只要有一个String start消息交付到此通道上工作流就会开始工作类似地开始之后分发工作流就会在它的一个Event Choice节点中监听Message Broker通道(/UnitOfWork/StopWorkflow)以便从WorkerStopMessage JMS队列接收stop消息然后事件生成器再次把WorkerStopMessage队列上的JMS消息关联到/UnitOfWork/StopWorkflow通道以便交付消息 这实际上创建了一种与启动和停止分发工作流的实现解耦合的面向服务方法通过Web服务客户端或者使用所提供的WebLogic Integration Workshop Test Browser可以轻松对Web服务进行测试 分发工作流 图说明了负责分发工作单元的DistributeFlowjpd我们的简单矩阵对象以及请求队列的相关部分 图 用于分发工作单元的工作流 while循环不断地循环直到一条stop消息改变布尔变量的值才跳出循环并结束工作流Event Choice等待两个Control Receive回调的其中一个第一个回调是通过刚刚描述的Web服务从一个Message Broker通道接收一条Stop消息第二个回调对一个Timer控件做出响应我们已经通过该控件的属性面板对它进行了设置每秒钟发生一次这将使处理继续而下一个行为将调用定制Java控件来浏览WorkerRequest队列以获得等待处理的请求的个数接下来决策节点检查请求的个数是否已经超出请求的最大个数(此处的最大个数被设置为其值保存在一个变量中)如果尚未超出就会调用一个执行节点然后使用JMS控件在请求队列中放入个矩阵对象如下所示 public void perform() throws Exception { for(int i = ; i < maxInQueue; i++) { matrix = new SimpleMatrix(); jmsControlsendObjectMessage(matrix); } } 响应工作流的JMS客户端 响应工作流的JMS客户端与前面在WebLogic Server部分中描述的JMS客户端几乎一模一样惟一的区别在于现在客户端使用一条字节消息(而不是对象消息)对响应队列做出响应客户端把SimpleMatrix对象转换为一个字节数组并将其传递给响应队列这样做的理由是与绑定到响应队列的事件生成器相关联的Message Broker通道只能够监听数据流即StringXML Bean或字节数组相关代码被设计用于对WebLogic Integration请求消息和普通的WebLogic Server请求消息做出响应 接收一个已完成的工作单元的工作流如图所示 图接收程序的工作流 这里的重要行为是perform节点它用于把字节数组转换为一个对象并调用print()和store()方法 public void perform() throws Exception { ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(rawDatabyteValue()); ObjectInputStream objectInputStream = new ObjectInputStream(arrayInputStream); UnitOfWork unit = (UnitOfWork) objectInputStreamreadObject(); unitprint(); unitstore(); objectInputStreamclose(); } 使用WebLogic Integration工作流 您已经了解到使用工作流Java控件和Message Broker通道可以提供一种更加完善的方式把工作分发给未完全利用的计算机只要在流程流中添加更多的行为节点就可以让处理过程变得像您所期望的那样面面俱到例如该工作流可以有一个审计控件用于在把所有发送到内部日志文件的请求放到队列中之前对其进行审计该工作流可以把请求重定向到其他JMS队列只要修改JMS控件的属性值即可为了实现可扩展性甚至可以让远程Web服务启动多个工作流实例最后基于业务安排Timer控件的时间间隔可以更小 使用Message Broker通道和事件生成器的另一个好处在于WebLogic Integration Administration Console可以监控事件生成器的响应消息的数量以便实现进一步的控制通过控制台您可以挂起和恢复事件生成器及通道从而对生产事件做出响应 这种灵活性使得WebLogic Integration工作流成为一种极具吸引力的方法 结束语 使用远程JMS客户端来分发工作的优点在于它有效地利用了网络计算机来进行某种类型的批处理工作同时减轻了原来服务器的负担这种方法的一个着名例子是Search for Extraterrestrial Intelligence (SETI@home)系统它利用全世界的PC来执行工作单元本文使用了一种JMS客户端的框架还讨论了如何部署这类解决方案以实现可扩展性力求让这种方法通用化本文还讨论了用于把工作分发给远程客户端的多种方法并提供了一种面向服务的方法作为首选 |