电脑故障

位置:IT落伍者 >> 电脑故障 >> 浏览文章

使用Jetty和DWR创建伸缩性Comet程序


发布日期:2020/1/6
 

Philip McCarthy(ph)

异步服务器端事件驱动的Ajax程序很难实现也很难获得伸缩性在Java+developers: target=blank>作者的系列文章里Plilip McCarthy展示了一个有效的方式:

Comet模式允许您push数据到客户端而且Jetty 的Continuations API让您的Comet程序对大量客户端获得高可伸缩性您可以方便的同DWR 使用Comet和Continuations

随着Ajax在Web程序开发技术里建立了牢固的位置出现了几种常见的Ajax使用模式例如Ajax通常用于响应用户输入来使用新数据修改局部页面但有时候Web程序的用户界面需要根据偶尔的异步服务器端事件来更新而不需要用户动作 例如在Ajax聊天程序里显示其他用户输入的一条新消息由于Web浏览器和服务器间的HTTP连接只能由浏览器建立服务器不能更改数据到浏览器

Ajax程序有两个解决该问题的基本方式:浏览器每隔几秒请求服务器来获得更改或者服务器维持与浏览器的连接并且传递数据长连接技术称为Comet本文展示了怎样使用Jetty服务器引擎和DWR来实现简单而高效的Comet Web程序

为什么要Comet?

轮询方式的主要缺点是在大量客户端时产生了大量的传输浪费每个客户端都必须有规律的请求服务器来获得更改这是服务器资源的一个重担最坏的情况是程序很少更新例如Ajax邮件收件箱在这种情况下大量的客户端轮询是多余的服务器仅仅简单的响应没有数据 可以通过增加轮询间隔时间来减轻服务器负荷但是这引入了服务器事件和客户端知晓之间的延迟当然一个合理的折衷方案可以多数程序适用并且轮询的工作方式也可以接受

然而对Comet策略的呼唤来自它可感知的高效客户端不会产生轮询方式特有的传输浪费一旦事件发生就会被发布到客户端但是维持长连接也消耗了服务器资源当servlet位置持久的请求在等候状态时servlet独占一个线程这样传统的servlet引擎就限制了Comet的伸缩性因为客户端的数量会迅速超过服务器栈可以有效处理的线程的数量

Jetty 有什么不同

Jetty 设计来处理大量并发连接它使用Java语言的不堵塞I/O(javanio)库并且使用优化的输出缓沖架构Jetty也有一个处理长连接的杀手锏:一个称为Continuations的特性我将用一个接收请求然后等待两秒发送响应的简单servlet来示范Continuations然后我将展示当服务器拥有更多的客户端时将发生什么最后我将使用Continuations重新实现servlet并且您将看到它们的不同

为了让它更简单我将限制Jetty servlet引擎为一个单一的请求处理线程列表显示了相关的jettyxml配置事实上我需要允许在ThreadPool里总共有个线程:Jetty服务器本身使用一个HTTP连接器使用一个来监听进来的请求最后剩一个线程来执行servlet代码

列表 单一servlet线程的Jetty配置

cellPadding= width= align=center bgColor=#fff border= heihgt=>

代码

<?xml version=?>

<!DOCTYPE Configure PUBLIC //Mort Bay Consulting//DTD Configure//EN

>

<Configure id=Server class=orgmortbayjettyserver>

<Set name=ThreadPool>

<New class=orgmortbaythreadBoundedThreadPool>

<Set name=minThreads></Set>

<Set name=lowThreads></Set>

<Set name=maxThreads></Set>

</New>

</Set>

</Configure>

下一步为了模仿异步事件列表显示了BlockingServlet的service()方法它简单的使用Threadsleep()调用来在完成前暂停毫秒同时它也在执行开始和结束时输出系统时间为了帮助区分不同请求的输出它也把一个请求参数作为标识符记录到日志

列表 BlockingServlet

cellPadding= width= align=center bgColor=#fff border= heihgt=>

代码

public class BlockingServlet extends HttpServlet {

public void service(HttpServletRequest req HttpServletResponse res) throws javaioIOException {

String reqId = reqgetParameter(id);

ressetContentType(text/plain);

resgetWriter()println(Request: + reqId + \tstart:\t + new Date());

resgetWriter()flush();

try {

Threadsleep();

} catch (Exception e) {}

resgetWriter()println(Request: + reqId + \tend:\t + new Date());

}

}

现在您可以观察几个同步请求下servlet的行为列表显示了使用lynx的个并行请求时控制台的输出命令行简单的启动个lynx进程加上一个标识符序数到请求的URL

列表 到BlockingServlet的几个并发请求的输出


cellPadding= width= align=center bgColor=#fff border= heihgt=>

代码

$ for i in seq ; do lynx dump localhost:/blocking?id=$i & done

Request: start: Sun Jul :: BST

Request: end: Sun Jul :: BST

Request: start: Sun Jul :: BST

Request: end: Sun Jul :: BST

Request: start: Sun Jul :: BST

Request: end: Sun Jul :: BST

Request: start: Sun Jul :: BST

Request: end: Sun Jul :: BST

Request: start: Sun Jul :: BST

Request: end: Sun Jul :: BST

列表的输出并不惊奇由于Jetty只有一个线程来执行servlet的service()方法Jetty将每个请求列队并按顺序服务时间戳显示了在一个应答分派给一个请求(以及end消息)后servlet开始处理下一个请求(下一个start消息)所以即使所有的个请求是同时发出的最后的那个请求必须等待秒才能得到处理

现在看看Jetty 的Continuations特性在这种情形下是多么的有用列表显示了列表的BlockingServlet使用Continuations API重写后的样子我将在后面解释代码

列表 ContinuationServlet

cellPadding= width= align=center bgColor=#fff border= heihgt=>

代码

public class ContinuationServlet extends HttpServlet {

public void service(HttpServletRequest req HttpServletResponse res) throws javaioIOException {

String reqId = reqgetParameter(id);

Continuation cc = ContinuationSupportgetContinuation(req null);

ressetContentType(text/plain);

resgetWriter()println(Request: + reqId + \tstart:\t + new Date());

resgetWriter()flush();

ccsuspend();

resgetWriter()println(Request: + reqId + \tend:\t + new Date());

}

}

列表显示了对ContinuationServlet作个并发请求时的输出可以和列表比较一下

列表 到ContinuationServlet的几个并发请求的输出

cellPadding= width= align=center bgColor=#fff border= heihgt=>

代码

$ for i in seq ; do lynx dump localhost:/continuation?id=$i & done

Request: start: Sun Jul :: BST

Request: start: Sun Jul :: BST

Request: end: Sun Jul :: BST

Request: start: Sun Jul :: BST

Request: start: Sun Jul :: BST

Request: end: Sun Jul :: BST

Request: start: Sun Jul :: BST

Request: start: Sun Jul :: BST

Request: end: Sun Jul :: BST

Request: start: Sun Jul :: BST

Request: start: Sun Jul :: BST

Request: end: Sun Jul :: BST

Request: start: Sun Jul :: BST

Request: start: Sun Jul :: BST

Request: end: Sun Jul :: BST

在列表里有两件重要的事情值得注意首先每个start消息出现了两次暂时不要担心这点其次更重要的是现在请求是并发处理的没有排队注意所有的start和end消息时间戳是一样的因此没有哪个请求耗时超多两秒即使只有单一的servlet线程在运行

深入Jetty的Continuations机制

理解Jetty的Continuations机制的将解释您在列表里看到的东西为了使用ContinuatinsJetty必须配置为使用它的SelectChannelConnector处理请求这个connector构建在javanio API之上允许它维持每个连接开放而不用消耗一个线程当使用SelectChannelConnector时ContinuationSupportgetContinuation()提供一个SelectChannelConnectorRetryContinuation实例(但是您必须针对Continuation接口编程)当在RetryContinuation上调用suspend()时它抛出一个特殊的运行时异常 RetryRequest该异常传播到servlet外并且回溯到filter链最后被SelectChannelConnector捕获但是不会发送一个异常响应给客户端而是将请求维持在未决 Continuations队列里则HTTP连接保持开放这样用来服务请求的线程返回给ThreadPool然后又可以用来服务其他请求

暂停的请求停留在未决 Continuations队列里直到指定的过期时间或者在它的Continuation上调用resume()方法当任何一个条件触发时请求会重新提交给servlet(通过filter链)这样整个请求被重播直到RetryRequest异常不再抛出然后继续按正常情况执行

列表里的输出现在应该能理解了对每个请求按顺序进入到servlet的service()方法start消息发送给应答然后Continuation的suspend()方法保留servlet然后释放线程来开始下一请求所有的个请求迅速运行service()方法的第一部分并马上进入暂停状态所有的start消息在几毫秒内输出两秒后suspend()过期第一个请求从未决队列里重新得到并重新提交给ContinuationServletstart消息第二次输出对suspend()方法的第二次调用立即返回然后end消息被发送给应答然后servlet代码执行下一个队列请求以此类推

所以在BlockingServlet和ContinuationServlet两种情况下请求被排入队列来访问单一的servlet线程尽管如此在BlockingServlet里的两秒钟暂停在servlet线程里执行时ContinuationServlet的暂停发生于servlet外面的SelectChannelConnector里ContinuationServlet全部的吞吐量会更高因为servlet线程不会在sleep()调用时阻碍大多数时间

让Continuations变得有用

现在您已经看到Continuations运行servlet请求暂停而不消耗线程我需要多解释一下Continuations API来展示怎样使用Continuations达到特殊的目的

一个resume()方法和一个suspend()方法配对您可以认为它们是标准的Object wait()/notify()机制的Continuations等价物suspend()维持一个Continuation直到过期或者另一个线程调用resume()suspend()/resume()方法是使用Continuations实现真实的Comet风格服务的关键所在基本的模式是从当前请求维持Continuation调用suspend()然后等待直到您的异步时间到达然后调用resume()并生成应答

但是不像编程语言里真实的语言级continuations如Scheme或Java语言里的wait()/notify()在Jetty Continuation上调用resume()并不意味着代码执行于它停止的确切位置您已经看到真正发生的是与Continuation相关的请求被重播这导致两个问题:列表的ContinuationServlet里代码不合需要的重新执行以及丢失状态 暂停时作用域里的任何东西都丢失了

第一个问题的解决方案是isPending()方法如果isPending()方法的返回值为true这意味着suspend()在前面已经被调用过了并且二次请求的执行不会再次接触suspend()方法换句话说给您的suspend()调用前的代码加上isPending()条件可以确保它只被执行一次Continuation也提供了一个简单的机制来保持状态:putObject(Object)和getObject()方法使用它们来维持一个context对象这样当Continuation暂停时任何您需要维持的状态都可以得到保护您也可以使用该机制作为一种在线程之间传递事件数据的方法后面您将看到

写一个基于Continuations的程序

作为一个真实世界里的例子我将开发一个基本的GPS坐标跟蹤Web程序它将在无规律间隔内生成随机的纬度经度对假设生成的坐标可以为附近的公众移动位置如拿着GPS设备马拉松运动员成队的汽车或者运输中的包裹位置有意思的部分在于我怎样告诉浏览器坐标信息显示了这个简单的GPS跟蹤程序的类图:

显示GPS跟蹤程序主要组件的类图

http://imgeducitycn/img_///gif border=>

首先该程序需要生成坐标的一些东西这是RandomWalkGenerator的工作从一个初始坐标开始每次对它的私有方法generateNextCoord()的调用都从该位置随机走一步并返回一个GpsCoord对象当初始化时RandomWalkGenerator创建一个线程该线程在随机间隔内调用generateNextCoorld()方法并发送生成的坐标给任何使用addListener()注册自己的CoordListener实例

列表显示了RandomWalkGenerator的循环逻辑:

列表 RandomWalkGenerator的run()方法

cellPadding= width= align=center bgColor=#fff border= heihgt=>

代码

public void run() {

try {

while (true) {

int sleepMillis = + (int)(Mathrandom()*d);

Threadsleep(sleepMillis);

dispatchUpdate(generateNextCoord());

}

} catch (Exception e) {

throw new RuntimeException(e);

}

}

CoordListener是一个定义了onCoord(GpsCoord coord)方法的回调接口在例子中ContinuationBasedTracker类实现了CoordListenerContinuationBasedTracker的另外一个方法为getNextPosition(Continuation int)列表显示了这些方法的具体实现:

列表 ContinuationBasedTracker的内髒

cellPadding= width= align=center bgColor=#fff border= heihgt=>

代码

public GpsCoord getNextPosition(Continuation continuation int timeoutSecs) {

synchronized(this) {

if (!continuationisPending()) {

pendingContinuationsadd(continuation);

}

// wait for next update

continuationsuspend(timeoutSecs*);

}

return (GpsCoord)continuationgetObject();

}

public void onCoord(GpsCoord gpsCoord) {

synchronized(this) {

for (Continuation continuation : pendingContinuations) {

continuationsetObject(pgsCoord);

continuationresume();

}

pendingContinuationsclear();

}

}

当客户端在Continuation里调用getNextPosition()时isPending()方法检查这次请求不是重试然后添加它到一个等待坐标的Continuations集合里然后Continuation被暂停同时onCoord 当生成新坐标时调用 简单的循环每个未决Continuations为它们设置GPS坐标然后恢复它们然后每个重试的请求完成getNextPosition()的执行从Continuation得到GpsCoord并返回它给调用者注意这里需要同步不仅预防pendingContinuations集合里出现不一致的状态也确保了新添加的Continuation在它被暂停之前不会被恢复

谜题最后一部分是servlet代码本身显示于列表:

列表 GPSTrackerServlet实现


cellPadding= width= align=center bgColor=#fff border= heihgt=>

代码

public class GpsTrackerServlet extends HttpServlet {

private static final int TIMEOUT_SECS = ;

private ContinuationBasedTracker tracker = new ContinuationBasedTracker();

public void service(HttpServletRequest req HttpServletResponse res) throws javaioIOException {

Continuation c ContinuationSupportgetContinuation(req null);

GpsCoord position = trackergetNextPosition(c TIMEOUT_SECS);

String json = new Jsonifier()toJson(position);

resgetWriter()print(json);

}

}

您可以看到servlet所做很少它简单的维持请求的Continuation调用getNextPosition()转换GPSCoord为JavaScript Object Notation(JSON)并输出这里不需要防止任何代码重执行所以我不需要检查isPending()列表显示了对GpsTrackerServlet的调用的输出使用服务器可得到的单一线程上的个并发请求

列表 GPSTrackerServlet输出

cellPadding= width= align=center bgColor=#fff border= heihgt=>

代码

$ for i in seq ; do lynx dump localhost:/tracker & done

{ coord : { lat : lng : } }

{ coord : { lat : lng : } }

{ coord : { lat : lng : } }

{ coord : { lat : lng : } }

{ coord : { lat : lng : } }

这个例子不是很引人注目但却是概念的证明在请求分派后它们被维持开发几秒钟直到生成坐标这时迅速产生应答这是Comet模式的基本原理使用Jetty单一线程处理个并发请求感谢Continuations

创建一个Comet客户端

现在您已经看到Continuations怎样用于创建非阻塞Web服务您可能想知道怎样创建客户端代码来使用它一个Comet客户端需要:

维持一个XMLHttpRequest连接直到接收应答

分派应答给合适的JavaScript处理者

立即建立一个新连接

更高级的Comet可以在客户端和服务器使用合适的路由机制来使用一个连接来从多个不同的服务推数据到浏览器一个可能性为使用JavaScript库如Dojo等写客户端代码来提供基于Comet的请求机制形如et

尽管如此如果您正在使用Java作为服务器语言在客户端和服务器端得到高级Comet支持的更好的方式是使用DWR 如果您不熟悉DWR您可以该系列的第部分Ajax with Direct Web RemotingDWR透明的提供一个HTTPRPC传输层暴露您的Java对象来使用JavaScript代码调用DWR生成客户端代理自动marshall和unmarshall数据处理安全问题提供一个便利的客户端辅助库并且对所有主要的浏览器工作

DWR :反转Ajax

DWR 新引入的概念为反转Ajax该机制将服务端事件给客户端客户端DWR代码透明的处理连接建立和应答解析所以从开发人员的角度来看事件可以从服务端Java代码简单的发布到客户端

DWR可以配置使用个不同的机制来反转Ajax一种是我们熟悉的轮询方式第二种方式称为piggyback它不创建任何到服务器的连接而是等待直到另一个DWR服务调用发生并piggyback未决事件到该请求应答这可以获得高效率但是意味着客户端事件通知被延迟直到客户端作出一个不相干的调用最后一种机制使用Comet风格的长连接最好的是当DWR运行在Jetty下并且使用Continuations来获得非阻塞Comet时可以自动检测事件

我将修改我的GPS例子来使用DWR 反转Ajax同时您将看到反转Ajax怎样工作的更多细节

我不再需要我的servletDWR提供了一个controller servlet它协调客户端请求直接访问Java对象我也不再需要显示处理Continuations因为DWR在幕后处理了这些所以我只需要一个新的CoordListener实现来发布坐标更新到任何客户端浏览器

一个称为ServerContext的接口提供DWR的反转Ajax魔法ServerContext知道当前查看一个给定页面的所有Web客户端并且可以提供一个ScriptSession来与每个客户端交流ScriptSession用来从Java代码推JavaScript片段到客户端列表显示了ReverseAjaxTracker怎样响应坐标通知以及使用它们来生成客户端updateCoordinate()方法调用注意如果一个合适的转换器是可用的则DWR的ScriptBuffer对象的appendData()调用会自动marshall一个Java对象到JSON

列表 ReverseAjaxTracker里的通知回调方法

cellPadding= width= align=center bgColor=#fff border= heihgt=>

代码

public void onCoord(GpsCoord gpsCoord) {

// Generate JavaScriptcode to call clientside

// function with coord data

ScriptBuffer script = new ScriptBuffer();

scriptappendScript(updateCoordinate()appendData(gpsCoord)appendScript(););

// Push script out to clients viewing the page

Collection<ScriptSession> sessions = sctxgetScriptSessionsByPage(pageUrl);

for (ScriptSession session : sessions) {

sessionaddScript(script);

}

}

下一步DWR必须配置来知道ReverseAjaxTracker在更大的程序里DWR的Spring集成可以使用Spring创建的beans来提供DWR但是这里我将仅仅让DWR创建一个新的ReverseAjaxTracker实例并把它放在application作用域里所有后续的DWR请求将访问这个单一的实例

我也需要告诉DWR怎样从GpsCoord beans来marshall数据到JSON由于GpsCoord是一个简单对象DWR基于反射的BeanConverter足够

列表显示了ReverseAjaxTracker配置

列表 ReverseAjaxTracker的DWR配置

cellPadding= width= align=center bgColor=#fff border= heihgt=>

代码

<dwr>

<allow>

<create creator=new javascrit=Tracker scope=application>

<param name=class value=developerworksjettygpstrackerReverseAjaxTracker/>

</create>

<convert converter=bean match=developerworksjettygpstrackerGpsCoord/>

</allow>

</dwr>

create元素的javascript元素指定了DWR用来暴露tracker作为一个JavaScript对象的名字但是在这里我的客户端代码不会使用它而是从tracker推数据给它同时也需要在webxml里做一些额外的配置来让DWR使用反转Ajax见列表

列表 DwrServlet的webxml配置


cellPadding= width= align=center bgColor=#fff border= heihgt=>

代码

<servlet>

<servletname>dwrinvoker</servletname>

<servletclass>

orgdirectwebremoteingservletDwrServlet

</servletclass>

<initparam>

<paramname>activeReverseAjaxEnabled</paramname>

<paramvalue>true</paramvalue>

</initparam>

<initparam>

<paramname>initApplicationScopeCreatorsAtStartup</paramname>

<paramvalue>true</paramvalue>

</initparam>

</servlet>

第一个servlet initparamactiveReverseAjaxEnabled激活轮询和Comet功能第二个initApplicationScopeCreatorsAtStartup告诉DWR当程序开始时初始化ReverseAjaxTracker这会覆盖通常在bean上作第一次请求时的延迟初始化行为 在这里这是很有必要的因为客户端从不在ReverseAjaxTracker上调用方法

最后我需要实现从DWR调用的客户端JavaScript方法回调方法updateCoordinate()被传递一个JSON形式的GpsCoord对象它由DWR的BeanConverter自动序列化这个方法仅仅从坐标提取longitude和latitude域并通过DOM调用添加它们到一个列表里这在列表里显示了同我的页面的onload方法一起onload包含对dwrenginesetActiveReverseAjax(true)这告诉DWR打开一个到服务器的持久的连接来等待回调

列表 反转Ajax GPS跟蹤的客户端实现

cellPadding= width= align=center bgColor=#fff border= heihgt=>

代码

windowonload = function() {

dwrenginesetActiveReverseAjax(true);

}

function updateCoordinate(coord) {

if (coord) {

var li = documentcreateElement(li);

liappendChild(documentcreateTextNode(coordlongitude + + coordlatitude));

documentgetElementById(coords)appendChild(li);

}

}

现在我可以让我的浏览器访问跟蹤程序页面当坐标数据开始生成时DWR将开始推数据到客户端这个实现将简单的输出一个生成的坐标列表见图:

ReverseAjaxTracker输出

http://imgeducitycn/img_///jpg border=>

使用反转Ajax创建一个事件驱动的Ajax程序是如此简单记住感谢DWR对Jetty Continuations的使用当等待新事件到达时线程不会阻塞在服务器

据此很容易从Yahoo!或者Google集成一个地图窗口部件通过改变客户端回调方法坐标可以简单的传递到地图API而不是直接添加到页面显示了在这样的一个地图组件上DWR反转Ajax GPS跟蹤程序描绘的随机路线:

使用地图UI的ReverseAjaxTracker

http://imgeducitycn/img_///jpg border=>

结论

现在您看到了Jetty Continuations联合Comet可以提供一个高效的可伸缩的事件驱动Ajax程序的解决方案我没有给出Continuations的伸缩性的图因为性能在真是世界里取决于许多变数服务器硬件操作系统的选择JVM实现Jetty配置您的Web程序的设计和传输效率在负荷下都会影响Jetty Continuations的性能尽管如此Webtide的Greg Wilkins(首要的Jetty开发者) 发布了一个比较Jetty 集成Continuations与不集成Continuations的Comet程序处理并发请求时的性能的白皮书在Greg的测试里使用Continuations并去掉了线程消费和栈内存消费使用大于的因数

您也看到了使用DWR的反转Ajax技术实现事件驱动的Ajax程序是多么容易DWR不仅节省您的客户端和服务端代码反转Ajax也将整个服务器推机制从您的代码中抽象出来您可以随意转换您的Comet方式:轮询或者piggyback方式只需简单的更改DWR配置您可以随意试验并找到适合您的程序的最佳策略而不会影响您的代码

关于作者

Philip McCarthy是伦敦的一位软件开发顾问专于Java和Web技术

上一篇:禁止用右键查看源代码

下一篇:Vector还是ArrayList这是个问题