Jms集群的意义在于提升系统在处理消息时的并发能力建立这样的集群有三个步骤
配置jms消息持久化所使用的数据库
配置分布式的jndi环境
配置分布式jms
在jboss 中系统采用hibernate的方式来保存消息所以能够兼容hibernate支持的所有数据库Jboss默认采用hsql在我们的例子中将使用oracle 首先需要配置连接到数据库的jndi数据源方法是把doc\examples\jca下的oracledsxml文件拷贝到server\all\farm下并且修改其中的参数保证数据库能够正确连接Cluster启动后该文件能够通过jboss的farm服务自动拷贝到其他集群节点并且自动部署假设jndi数据源的名称为GlobalDS
将doc\examples\jms下的oraclejdbcservicexml文件拷贝到server\all\deployhasingleton\jms目录下并且删除该目录下的hsqldbjdbcservicexml修改oraclejdbcservicexml在行左右指定name的值为数据源的名字GlobalDS这样系统会使用该数据源来保存jms消息使用如下命令启动boss:run ?c all
启动完成后正常情况下会发现oracle数据库中多出了三张表
Jms_message_log该表用于保存所有未处理的点对点消息表结构是
Messageid 消息id
Destination目的地
Txid事务id
Txop消息操作类型(a为新增d为删除)
Messageblob消息内容
JMS_REFERENCE_LOG用于保存所有未处理的topic消息表结构是
Messageid
Destination
Txid
Txop
Messageblob
Redelivered消息是否被重发
JMS_TRANSACTION_LOG用于保存处理消息过程中的一些重要的事务
需要注意的是jboss 之后就不在支持以文件形式保存消息虽然这样最会比数据库操作快一倍以上Jboss官方的解释是使用文件会让系统不可靠
客户端在发送jms消息的时候首先需要向app server查询jndi在jboss cluster中jndi是作为一个分布式的singleton出现的每个节点除了有自己的jndi环境以外整个cluster还具有一些全局的jndi客户端在进行jndi查询的时候只需要向这个全局的jndi进行查询cluster如果在全局jndi中找不到对应的jndi对象就会按次序向每个节点询问看他们的本地jndi中是否有匹配的对象如果有则返回给客户如果所有的节点都没有则抛出异常所有以all方式启动的jboss都会打开端口这个端口是全局jndi的入口所有节点都是如此
分布式的jndi有的节点有主次的区别第一个启动的jboss是主服务器它会保存所有的全局jndi其他的节点如果收到客户查询jndi的请求后都会向主服务器请求数据如果主服务器不幸down掉那么次节点会发现这个变化然后启动自己的jndi环境取代主服务器提供服务
下面是配置jms的jndi打开server\all\deployhasingleton\jms下的jbossmqdestinationsservicexml文件增加一个名为test的destination如下
<mbean code=orgjbossmqserverjmxQueue
name=jbossmqdestination:service=Queuename=test>
<depends optionalattributename=DestinationManager>jbossmq:service=DestinationManager</depends>
</mbean>
为了预防主服务器down了之后丢失该jndi所以最好在每个节点都进行这个配置
在jboss 的默认配置下是不支持消息bean的集群的要达到这个目的必须下载一个jar包才能实现可以从这里获得:
得到这个jar文件后将它命名为cdotjbossxjar
文件放到server\all\deploy\jms下下面编写消息bean它的功能很简单接收到来自test队列的消息后打印消息id
public class TestJmsBean
implements MessageDrivenBean MessageListener {
MessageDrivenContext messageDrivenContext;
public void ejbCreate() {
Systemoutprintln(消息bean创建);
}
public void ejbRemove() {
}
public void onMessage(Message msg) {
try
{
Systemoutprintln(msggetJMSMessageID());
}catch(Exception e)
{
eprintStackTrace();
}
}
public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) {
ssageDrivenContext = messageDrivenContext;
}
}
把这个消息bean部署到server\all\farm目录下它会被自动拷贝到cluster的其它节点并且被自动部署你会看到如下部署信息
educitycn/img_///jpg >上面显示通过farm的方式部署了一个名为GlobalDS的连接池以及一个名为TestJms的消息bean
下面写个客户端来测试一下
SimpleDateFormat sdf = new SimpleDateFormat(yyyyMMdd HH:mm:ss);
Properties p = new Properties();
pput(ContextINITIAL_CONTEXT_FACTORY
orgjnpinterfacesNamingContextFactory);
pput(ContextURL_PKG_PREFIXES jbossnaming:orgjnpinterfaces);
pput(ContextPROVIDER_URL :); // 全局jndi入口
InitialContext ctx = new InitialContext(p);
QueueConnectionFactory qcf = (QueueConnectionFactory) ctxlookup(
ConnectionFactory);
QueueConnection conn = qcfcreateQueueConnection();
Queue q = (Queue) ctxlookup(queue/test);//查询名为test的destination
QueueSession session = conncreateQueueSession(false
QueueSessionAUTO_ACKNOWLEDGE);
connstart();
QueueSender sender = sessioncreateSender(q);
for (int i = ; i < ; i++) {
TextMessage tm = sessioncreateTextMessage(sdfformat(new Date()));
sendersend(tm DeliveryModePERSISTENT );//发送持久化消息
Systemoutprint(第 + i);
}
connstop();
sessionclose();
connclose();
执行一下可以看到每个节点都创建了若干个消息bean同时在处理消息任意关闭一个次服务器系统会自动fail over查看Jms_message_log数据表里面没有任何数据表示所有的消息都已经被处理
Jboss的jms cluster功能与websphere mq比较起来是非常简陋的可以配置的地方也很少毕竟是免费的东西Jboss的论坛上透露在jboss 中将会有全新的jboss messaging服务不知要等到何年何月针对这个cluster我做过简单的测试万左右的消息数量无一丢失应该说还算比较可靠响应时间也还过的去在简单的网络环境下能够应付比较高的并发