电脑故障

位置:IT落伍者 >> 电脑故障 >> 浏览文章

基于WCF和MSMQ构建发布/订阅消息总线


发布日期:2024/7/25
 

本文是翻译Tom Hollander先生的blog文章《Building a Pub/Sub Message Bus with WCF and MSMQ》对英文blog文章感兴趣的朋友可以直接访问文章下面的链接译者开源论坛小组

这几年经常谈到采用事件驱动架构(eventdriven architecture)技术来构建可扩展的可维护的系统我发现在一些方案中这是非常有趣的模型但是在Microsoft平台上这种架构一直没有很好的支持因此许多人发现比较难以实现几年以前在一个系统中我采用Net Remoting / MSMQ / HTTP 构建了发布/订阅消息总线(Pub/Sub Message Bus)但是觉得不是很完美很多地方实现比较困难并且需要定制的代码如承载队列监听(host the queue listeners)编码/解码消息处理可靠性和管理订阅者等等

在我目前的项目中我再一次尝试了这一模型然而这几年技术发展有很大的变化我高兴的说我的体验比以前要好多了在说明我的方案之前我想提醒的一点是我仅仅是描述实现这一模型的一种方法在不同的应用需求情况下应该有其他更合适的方法因为我工作的项目是一个大的Net应用系统因此跨平台的互操作不需要考虑(真幸运!)

我们完成的项目构建在NET Framework 平台并使用了WCF / MSMQ / IIS 部署在Windows server 平台上

定义Service Contract

第一步定义Contact发布者用来通知订阅者发生事件在我们的项目中有许多不同的事件类型但是为了尽可能重用代码我们使用了泛型Service Contract

[ServiceContract]

public interface IEventNotification<TLog>

{

[OperationContract(IsOneWay = true)]

void OnEventOccurred(TLog value);

}

对于任一事件类型我们可以简单定义一个Data Contract 来负载数据(carry the payload)并提供一个继承的Service Contract 类型如下所示

[ServiceContract]

public interface IAccountEventNotification : IEventNotification<AccountEventLog>

{

}实现发布者(Publisher)

发布/订阅模型中最重要的一点是发布和订阅方应该是低耦合的尤其是发布者不应该知道订阅者的任何事情包括多少个订阅者和订阅者在哪里起初我们试图采用MSMQ的PGM多播特性来实现 – 实质上让你定义单个队列地址悄悄地路由相同的消息到多个目标队列虽然这可以满足要求但是这一方案也存在一些缺点第一在WCF中使用多播队列地址的唯一方式是采用MsmqIntegrationBindingMsmqIntegrationBinding没有NetMsmqBinding灵活其次多播地址仅仅适用非事务队列(nontransactional queues)这样对我们的系统产生不能接受的可靠性影响

因此我们放弃了这一方案决定在发布者实现我们自己的轻量级多播技术上而言这样打破了发布者完全不了解订阅者这一黄金规则所有订阅者的信息完全存放在配置文件中这意味着我们增加修改或删除订阅者完全不影响代码

我们开发一个组件ServiceFactory(与p&p Web Service Software Factory 没有关系)ServiceFactory只是一个简单的抽象通过读取配置文件来创建本地或WCF实例ServiceFactory组件没有对外公布但是你可以简单替换为你喜欢的依赖注入框架(Dependency Injection Framework)达到相同的效果在我们的系统中Web Services的配置文件nfig 可能有如下定义的依赖服务

<serviceFactory> <services> <addname=EmailUtility contract=MyProjectIEmailUtility MyProject type=MyProjectEmailUtility MyProject mode=SameAppDomain instanceMode=Singleton enablePolicyInjection=false /> <addname=SubsctiberXAccountEventNotificationcontract=MyProjectContractsIAccountEventNotification MyProjectContractsmode=Wcfendpoint=SubsctiberXAccountEventNotification /> <addname=SubsctiberYAccountEventNotification contract=MyProjectContractsIAccountEventNotification MyProjectContracts mode=Wcf endpoint=SubsctiberYAccountEventNotification /> </services> </serviceFactory>

我们使用ServiceFactory创建单个实例代码如下

IEmailUtility email = ServiceFactoryGetService<IEmailUtility>();

根据上面的配置文件上述代码将获得一个本地EmailUtility单件实例但是不同的配置可以返回一个WCF proxy 代理类实例可以非常方便重用ServiceFactory组件返回所有配置的匹配特定Contract的服务我们将根据这些构建NotificationPublisher类代码如下

public class NotificationPublisher<TInterface TLog>

where TInterface : class IEventNotification<TLog>

{

public static void OnEventOccurred(TLog value)

{

List<TInterface> subscribers = ServiceFactoryGetAllServices<TInterface>();

foreach (TInterface subscriber in subscribers)

{

subscriberOnEventOccurred(value);

}

}

}

根据上述代码发布者发布事件所需要做的是传入合适的泛型参数实例化NotificationPublisher对象并调用OnEventOccured 方法假定我们使用IAccountEventNotification 接口和上述配置这样事件将通过WCF到达SubscriberXAccountEventNotification 和 SubscriberYAccountNotification 端点并触发相关事件

配置发布者

发布端最后一部分是WCF配置如上述所提及的我们选择使用MSMQ提供可靠的异步的消息传递过去编写MSMQ代码比较困难但是对WCF编程模型而言MSMQ与其他传输协议没什么区别在我们的案例中我们选择了NetMsmqBindingNetMsmqBinding 为核心MSMQ特性提供了全面访问WCF功能(与MsmqIntegrationBinding不同MsmqIntegrationBinding提供了更丰富的MSMQ支持但是限制了WCF功能)

如下是客户端的WCF的配置示例

<systemserviceModel> <bindings> <netMsmqBinding> <bindingname=TransactionalMsmqBindingexactlyOnce=truedeadLetterQueue=system /> </netMsmqBinding> </bindings> <client> <endpointname=SubscriberXAccountEventNotification address=netmsmq://localhost/private/SubscriberX/accounteventnotificationsvc binding=netMsmqBindingbindingConfiguration=TransactionalMsmqBinding contract=MyProjectContractsIAccountEventNotification /> <endpointname=SubscriberYAccountEventNotification address=netmsmq://localhost/private/SubscriberY/accounteventnotificationsvc binding=netMsmqBindingbindingConfiguration=TransactionalMsmqBinding contract=MyProjectContractsIAccountEventNotification /> </client> </systemserviceModel>

上述配置没什么特别的地方 – 需要关注的是 exactlyOnce=true 设置这是事务队列必须的设置另外就是 netmsmq:// 地址语法这是NetMsmqBinding 协议所需要的私有队列分别为 SubscriberX/accounteventnotificationsvc 和 SubscriberY/accountnotificationsvc为什么我给队列这样愚蠢的命名呢?继续读下面内容

承载和配置订阅者

在过去如果说创建MSMQ客户端是烦人的那么创建MSMQ服务更是噩梦你不得不创建你自己的host(一般而言为Windows Service)或者使用一些灵活的MSMQ触发器功能然后你需要做很多工作确保你的服务没有丢失消息或者没有被poison messages所阻塞因为poison message错误的消息体(malformed payload)会不断导致你的服务失败

就像在客户端一样WCF需要很多工作在服务端 – 但是这些不是直接帮助承载服务和监听队列幸运的是这一问题由Windows Vista和Windows server 中提供的IIS 和Windows Activation Services (WAS) 轻松解决IIS 负责监听MSMQ / tcp / Named Pipes并且激活WCF 服务就像 IIS 监听HTTP一样听起来不错但是需要提醒的是 – 这些需要灵巧的配置

首先你需要在IIS中设置application 指向service包括svc文件和nfig配置文件这与在IIS 通过HTTP部署service一样

接着你需要创建消息队列你可以通过Vista的Computer Management console 或Windows server 的Server Manager配置队列的名称必须匹配application name 加上svc 文件名例如 SubscriberX/accounteventnotificationsvc在创建队列时确保标记队列支持事务因为随后不能改变你也需要设置队列的权限这样运行NetMsmq Listener 服务的帐号(缺省为NETWORK SERVICE)能够接收消息任何运行Client/Publisher 都能够发送消息(缺省为NETWORK SERVICE)

最后你需要配置IIS和WAS 的站点和特定application支持NetMsmq 监听器(在开始操作之前确保你已经安装了WAS和nonHTTP激活windows组件)最简单的办法是使用appcmdexe 命令行(\system\inetsrv目录下)

appcmd set site Default Web Site +bindings[protocol=netmsmqbindingInformation=localhost]

appcmd set app Default Web Site/SubscriberX /enabledProtocols:netmsmq

配置好IIS后接下来是确保service的WCF配置是正确的如同你期望的那样这将与客户端的配置非常相似

<systemserviceModel> <bindings> <netMsmqBinding> <bindingname=TransactionalMsmqBindingexactlyOnce=truedeadLetterQueue=systemreceiveErrorHandling=Move/> </netMsmqBinding> </bindings> <services> <servicename=SubscriberXNotificationService> <endpointcontract=MyProjectContractsIAccountEventNotification bindingConfiguration=TransactionalMsmqBinding binding=netMsmqBinding address=netmsmq://localhost/private/SubscriberX/accounteventnotificationsvc/> </service> </services> </systemserviceModel>

值得说明的一点是receiveErrorHandling=Move这一属性可以帮助节省我们一个月的工作这一属性让WCF转移多次重复处理失败的消息到MSMQ的子队列poison然后继续处理下一个消息而不是阻塞服务这一子队列和远程队列事务性读取等待功能是Vista 和 windows server 中MSMQ 的一些新特性

实现订阅者(Subscribers)

最后一件事是实现订阅者当然最多的代码是特定业务逻辑的实现因为我仅仅描述service interface的实现在我们的系统中确保没有消息丢失是非常重要的既然MSMQ能够确保消息的到达因此消息不会无缘故的消失事实上大多数消息的丢失是在MSMQ成功传递消息到达service 之后有可能service 接收到消息之后在service 成功处理消息之前发生异常导致失败(可能由于bug或配置问题)避免这一问题最好的办法是使用事务跨越从队列接收消息和业务逻辑的处理如果发生失败将回滚事包括从队列中接收的消息如果只是一个临时的小故障消息将再次被成功处理如果问题持续或是错误的消息在经过多次尝试后该消息将被认为是poison 消息如前面提及的该消息将被转移到poison 子队列由管理员手动处理

上述所有的工作非常简单因为MSMQ和WCF支持所有这些特性(假定你使用事务性队列)你需要做的工作是由一些attributes标记你的服务实现声明当消息从队列取出后业务逻辑应该登记事务

public class NotificationService : IAccountEventNotification

{

[OperationBehavior(TransactionScopeRequired = true TransactionAutoComplete = true)]

public void OnEventOccurred(AccountEventLog value)

{

// Businessspecific logic

}

}

如对上述solution 的实现有疑问或改进建议欢迎到我们的论坛( )进行交流

结论

这是我最近最长的blog文章之一这一解决方案非常强大且特别简单去实现这是由于WCF / MSMQ / IIS 技术的先进性在过去许多人(包括我)花了几个月的时间尽力去实现pub/sub模式往往不能达到预期的效果现在使用这些的技术消除了大量的定制代码事实上这篇文章中少量的代码和配置脚本实现了pub/sub模式

译者注

我们是 开源论坛小组负责 / YAF 开源ASPNET/C# 论坛的开发工作免费提供项目源代码下载欢迎您访问下载交流和学习包括Enterprise Message Bus / WCF / MSMQ / BizTalk / SSB / sql server / Net Framework 等等

下载 / YAF 开源论坛 v (ASPNET/C#)

x?g=posts&t=

英文原文

Building a Pub/Sub Message Bus with WCF and MSMQ by Tom Hollander

apubsubmessagebuswithwcfandmsmqaspx

上一篇:避免创建重复对象

下一篇:JBuilder X 初体验