顾名思义微软消息队列(MSMQ)是一种给队列发送消息以便稍后进行处理的方法消息由一个Producer(生产者)应用程序发送出去再由一个Consumer(消费者)应用程序返回
这两个应用程序可以在同一台机器上在整个网络中或甚至是位于并不总是连接在一起的不同机器上MSMQ具有故障保险特性因为如果第一次传送失败它会重新发送消息这样可保证你的应用程序消息到达它们的目的地
我将应用一个叫做TechRepublic的队列当你运行本文下载版本中的样本实例时如果这个队列不存在它会自动建立
在前面的一篇文章中Zach Smith说明了如何使用IPC通道在同一台机器上的两个进程间通信他将在本文中说明如何在同一台机器或网络上的应用程序间实现进程间通信
访问MSMQ
通过NET访问队列由SystemMessagingMessageQueue对象完成列表A说明了如何在一台名为SRVMESSAGING的计算机上访问TechRepublic队列
列表A
MessageQueue queue = new MessageQueue(SRVMESSAGINGTechRepublic);
注要应用这个对象你必须在你的项目中添加一个参考
现在我们有了一个MessageQueue对象这个对象为你提供与队列交互需要的所有功能
如果队列不存在你可以调用MessageQueue对象的静态Create方法编程建立队列列表B中的代码说明如何检查队列是否存在建立队列或给队列添加一个参考
列表B
MessageQueue queue = null;
string queueName = SRVMESSAGINGTechRepublic;
if (MessageQueueExists(queueName))
queue = newMessageQueue(queueName);
else
queue = MessageQueueCreate(queueName false);
改写队列
改写队列时用到MessageQueueSend方法列表C举例说明如何向TechRepublic队列发送一条消息
列表C
queueSend(My message body Message Label);
在这个例子中我们给TechRepublic队列发送一条正文为My message body的消息并对这个消息应用了一个Message Label标签消息标签允许你不需阅读消息正文就可以分割消息如果从计算机管理控制台中查看队列还可在队列消息部分看到这些标签
读取队列
可以使用几种方法从队列中读取消息最常见的情况是从队列中取出所有消息然后一次性处理它们这时要调用MessageQueueGetAllMessages方法列表D举例说明如何应用这个方法
列表D SystemMessagingMessage[] messages = queueGetAllMessages();
foreach (SystemMessagingMessage message in messages)
{
//Do something with the message
}
你也可以用GetMessageEnumerator方法代替上面的MessageQueueGetAllMessages方法虽然这两个方法的用法类似但GetMessageEnumerator只能向前(forwardonly)对于非常庞大的队列则应用使用这个方法而不是MessageQueueGetAllMessages方法
这是因为GetAllMessages方法领取所有消息把它们保存在当地内存中而GetMessageEnumerator方法只领取当前消息在本地保存在调用MoveNext时才领取下一条消息列表E举例说明了GetMessageEnumerator方法的用法这段代码检查队列中的每一条消息再删除它
列表E
MessageEnumerator enumerator = queueGetMessageEnumerator();
while (enumeratorMoveNext())
enumeratorRemoveCurrent();
在使用GetMessageEnumerator方法时还要考虑另外一个问题即你要访问队列中增加的任何新消息即使它们是在你调用GetMessageEnumerator后再增加的这假定新消息被添加到队列末尾
如果你只希望返回队列中的第一条消息你应该使用MessageQueueReceive方法这个方法会领取队列中的第一条消息在这个过程中将它从队列中删除由于消息在读取的时候被删除你可以确保你的进程是唯一收到消息的进程Receive方法的应用实例如列表F所示
列表F
SystemMessagingMessage message = queueReceive();
可以用Peek方法代替Receive方法Peek方法像Receive方法一样领取队列中的第一条消息但是它在队列中保留消息备份这允许你从队列中删除消息之前检查消息内容Peek的语法与Receive类似
列表G
SystemMessagingMessage message = queuePeek();
发送/接收序列化对象
虽然给队列发送文本的功能非常有用但队列还允许你发送可序列化对象这意味着你可以建立一个自定义的NET类实例化它的一个实例将其发送给队列以便其它应用程序使用要完成这个过程首先得使用XML Serializer序列化被发送的对象然后对序列化对象放到消息的正文中
例如假设我们希望给TechRepublic消息队列发送以下对象(列表H)
列表H
[Serializable()]
publicclassMessageContent
{
privateDateTime _creationDate = DateTimeNow;
privatestring _messageText;
public MessageContent()
{
}
public MessageContent(string messageText)
{
_messageText = messageText;
}
publicstring MessageText
{
get { return _messageText; }
set { _messageText = value; }
}
publicDateTime CreationDate
{
get { return _creationDate; }
set { _creationDate = value; }
}
}
给队列发送这个对象的一个实例只需简单调用MessageQueueSend方法并把一个对象实例作为参数提交给这个方法列表I说明了这种情况
列表I
MessageContent message = newMessageContent(Hello world!);
queueSend(message Sample Message);
如你所见上面的代码类似于我们前面发送正文为一个字符串的消息时使用的代码接收一个包含序列化对象的消息更加困难一些我们需要告诉消息它包含哪种对象
为向消息指出它包含哪种对象我们必须建立消息的格式化器(formatter)给消息的Formatter属性指定一个SystemMessagingXmlMessageFormatter对象即可建立格式化器由于我们的消息包含一个MessageContent对象我们希望为它配置XmlMessageFormatter
列表J
messageFormatter = new SystemMessagingXmlMessageFormatter(
newType[] { typeof(MessageContent) }
);
既然我们已经给消息指定了一个格式化器我们可以从消息中提取MessageContent对象但在这之前我们需要把messageBody属性的返回值分配给一个MessageContent对象
列表K
MessageContent content = (MessageContent)messageBody;
在这个例子中content变量是我们向队列发送的原始MessageContent对象的序列化版本我们可以访问原始对象的所有属性和值
设定消息优先级别
在正常情况下队列中的消息以先进先出的形式被访问这表示如何你先发送消息A再发送消息B那么队列将首先返回消息A然后才是消息B在多数情况下这样处理没有问题但是有时由于一条消息比其它消息更加重要你希望将它提到队列前面要实现这种功能你就需要设定消息优先级别
一条消息的优先级别由它的MessagePriority属性值决定下面是这个属性的所有有效值(全部来自MessagePriority的列举类型)
·最高(Highest)
·非常高(VeryHigh)
·高(High)
·高于正常级别(AboveNormal)
·正常(Normal)
·低(Low)
·非常低(VeryLow)
·最低(Lowest)
消息在队列中的位置由它的优先级别决定——例如假如队列中有四条消息两条消息的优先级别为正常(Normal)另两条为高(High)则队列中消息排列如下
·High Priority A——这是发送给队列的第一条高优先级消息
·High Priority B——这是发送给队列的第二条高优先级消息
·Normal Priority A——这是发送队列的第一条正常优先级消息
·Normal Priority B——这是发送队列的第二条正常优先级消息
根据这个顺序如果我们给队列发送另一条最高优先级的消息它将位于队列的顶部
如果需要使用消息优先级功能你必须修改发送消息的代码因为Message对象的构造器没有指定消息优先级别的功能你必须实例化一个Message对象并在将它发送给队列之前给它设定相应的属性列表L中的代码说明如何设定优先级别并给队列发送一条最高优先级别的消息
列表L
//Instantiate the queue
MessageQueue queue = newMessageQueue(queueName);
//Create a XmlSerializer for the object type were sending
XmlSerializer serializer = new
XmlSerializer(typeof(MessageContent));
//Instantiate a new message
SystemMessagingMessage queueMessage =
new SystemMessagingMessage();
//Set the priority to Highest
queueMessagePriority = MessagePriorityHighest;
//Create our MessageContent object
MessageContent messageContent =
newMessageContent(Hello world IMPORTANT!);
//Serialize the MessageContent object into the queueMessage
serializerSerialize(queueMessageBodyStream messageContent);
//Send the message
queueSend(queueMessage HIGH PRIORITY);
这段代码和上面代码的最明显区别在于它使用了XmlFormatter它实际是可选的列表L中的代码也可用列表M中的代码代替
列表M
//Instantiate a new message
SystemMessagingMessage queueMessage =
new SystemMessagingMessage();
//Set the priority to Highest
queueMessagePriority = MessagePriorityHighest;
//Create our MessageContent object
MessageContent messageContent =
newMessageContent(Hello world IMPORTANT!);
//Set the body as the messageContent object
queueMessageBody = messageContent;
//Send the message
queueSend(queueMessage HIGH PRIORITY);
这段代码执行和列表L中的代码相同的任务但代码更少
应用
输入消费者请求是MSMQ功能的一个简单实例消费者提出一个请求由一个面向消费者的应用程序将它送交给消息队列向队列发送请求后它会向消费者送出一个确认(acknowledgement)
然后一个独立的进程从队列中提取消息并运行任何所需的业务逻辑(business logic)完成业务逻辑后处理系统将向另一个队列提交一个响应接下来面向消费者的应用程序从队列中提取这个响应并给消费者返回一个响应
这种类型的配置能够加快面向消费者的应用程序的速度使其迅速做出反应同时在一个内部系统中完成大量处理工作这样还可以将请求处理分散到多台内部机器上提供可扩展性