c#

位置:IT落伍者 >> c# >> 浏览文章

基于消息与.Net Remoting的分布式处理架构


发布日期:2021年04月05日
 
基于消息与.Net Remoting的分布式处理架构

消息的定义

要实现进程间的通信则通信内容的载体——消息就必须在服务两端具有统一的消息标准定义从通信的角度来看消息可以分为两类Request Messge和Reply Message为简便起见这两类消息可以采用同样的结构

消息的主体包括IDName和Body我们可以定义如下的接口方法来获得消息主体的相关属性

public interface IMessage:ICloneable

{

IMessageItemSequence GetMessageBody();

string GetMessageID();

string GetMessageName();

void SetMessageBody(IMessageItemSequence aMessageBody);

void SetMessageID(string aID);

void SetMessageName(string aName);

}

消息主体类Message实现了IMessage接口在该类中消息体Body为IMessageItemSequence类型这个类型用于Get和Set消息的内容Value和Item

public interface IMessageItemSequence:ICloneable

{

IMessageItem GetItem(string aName);

void SetItem(string aNameIMessageItem aMessageItem);

string GetValue(string aName);

void SetValue(string aNamestring aValue);

}

Value为string类型并利用HashTable来存储Key和Value的键值对而Item则为IMessageItem类型同样的在IMessageItemSequence的实现类中利用HashTable存储了Key和Item的键值对

IMessageItem支持了消息体的嵌套它包含了两部分SubValue和SubItem实现的方式和IMessageItemSequence相似定义这样的嵌套结构使得消息的扩展成为可能一般的结构如下

IMessage——Name

——ID

——Body(IMessageItemSequence)

——Value

——Item(IMessageItem)

——SubValue

——SubItem(IMessageItem)

——……

各个消息对象之间的关系如下

在实现服务进程通信之前我们必须定义好各个服务或各个业务的消息格式通过消息体的方法在服务的一端设置消息的值然后发送并在服务的另一端获得这些值例如发送消息端定义如下的消息体

IMessageFactory factory = new MessageFactory();

IMessageItemSequence body = factoryCreateMessageItemSequence();

bodySetValue(namevalue);

bodySetValue(namevalue);

IMessageItem item = factoryCreateMessageItem();

itemSetSubValue(subnamesubvalue);

itemSetSubValue(subnamesubvalue);

IMessageItem subItem = factoryCreateMessageItem();

subItemSetSubValue(subsubnamesubsubvalue);

subItemSetSubValue(subsubnamesubsubvalue);

IMessageItem subItem = factoryCreateMessageItem();

subItemSetSubValue(subsubnamesubsubvalue);

subItemSetSubValue(subsubnamesubsubvalue);

itemSetSubItem(subitemsubItem);

itemSetSubItem(subitemsubItem);

bodySetItem(itemitem);

//Send Request Message

MyServiceClient service = new MyServiceClient(Client);

IMessageItemSequence reply = serviceSendRequest(TestServiceTestbody);

在接收消息端就可以通过获得body的消息体内容进行相关业务的处理

Net Remoting服务

Net中要实现进程间的通信主要是应用Remoting技术根据前面对消息的定义可知实际上服务的实现可以认为是对消息的处理因此我们可以对服务进行抽象定义接口IService:

public interface IService

{

IMessage Execute(IMessage aMessage);

}

Execute()方法接受一条Request Message对其进行处理后返回一条Reply Message在整个分布式处理架构中可以认为所有的服务均实现该接口但受到Remoting技术的限制如果要实现服务则该服务类必须继承自MarshalByRefObject同时必须在服务端被Marshal随着服务类的增多必然要在服务两端都要对这些服务的信息进行管理这加大了系统实现的难度与管理的开销如果我们从另外一个角度来分析服务的性质基于消息处理而言所有服务均是对Request Message的处理我们完全可以定义一个Request服务负责此消息的处理

然而Request服务处理消息的方式虽然一致但毕竟服务实现的业务即对消息处理的具体实现却是不相同的对我们要实现的服务可以分为两大类业务服务与Request服务实现的过程为首先具体的业务服务向Request服务发出Request请求Request服务侦听到该请求然后交由其侦听的服务来具体处理

业务服务均具有发出Request请求的能力且这些服务均被Request服务所侦听因此我们可以为业务服务抽象出接口IListenService

public interface IListenService

{

IMessage OnRequest(IMessage aMessage);

}

Request服务实现了IService接口并包含IListenService类型对象的委派以执行OnRequest()方法

public class RequestListener:MarshalByRefObjectIService

{

public RequestListener(IListenService listenService)

{

m_ListenService = listenService;

}

private IListenService m_ListenService;

#region IService Members

public IMessage Execute(IMessage aMessage)

{

return thism_ListenServiceOnRequest(aMessage);

}

#endregion

public override object InitializeLifetimeService()

{

return null;

}

}

在RequestListener服务中继承了MarshalByRefObject类同时实现了IService接口通过该类的构造函数接收IListService对象

由于Request消息均由Request服务即RequestListener处理因此业务服务的类均应包含一个RequestListener的委派唯一的区别是其服务名不相同业务服务类实现IListenService接口但不需要继承MarshalByRefObject因为被Marshal的是该业务服务内部的RequestListener对象而非业务服务本身

public abstract class Service:IListenService

{

public Service(string serviceName)

{

m_ServiceName = serviceName;

m_RequestListener = new RequestListener(this);

}

#region IListenService Members

public IMessage OnRequest(IMessage aMessage)

{

//……

}

#endregion

private string m_ServiceName;

private RequestListener m_RequestListener;

}

Service类是一个抽象类所有的业务服务均继承自该类最后的服务架构如下

我们还需要在Service类中定义发送Request消息的行为通过它才能使业务服务被RequestListener所侦听

public IMessageItemSequence SendRequest(string aServiceNamestring aMessageNameIMessageItemSequence aMessageBody)

{

IMessage message = m_FactoryCreateMessage();

messageSetMessageName(aMessageName);

messageSetMessageID();

messageSetMessageBody(aMessageBody);

IService service = FindService(aServiceName);

IMessageItemSequence replyBody = m_FactoryCreateMessageItemSequence();

if (service != null)

{

IMessage replyMessage = serviceExecute(message);

replyBody = replyMessageGetMessageBody();

}

else

{

replyBodySetValue(resultFailure);

}

return replyBody;

}

注意SendRequest()方法的定义其参数包括服务名消息名和被发送的消息主体而在实现中最关键的一点是FindService()方法我们要查找的服务正是与之对应的RequestListener服务不过在此之前我们还需要先将服务Marshal

public void Initialize()

{

RemotingServicesMarshal(thism_RequestListenerthism_ServiceName + RequestListener);

}

我们Marshal的对象是业务服务中的Request服务对象m_RequestListener这个对象在Service的构造函数中被实例化

m_RequestListener = new RequestListener(this);

注意在实例化的时候是将this作为IListenService对象传递给RequestListener因此此时被Marshal的服务对象保留了业务服务本身即Service的指引可以看出在Service和RequestListener之间采用了双重委派的机制

通过调用Initialize()方法初始化了一个服务对象其类型为RequestListener(或IService)其服务名为Service的服务名 + RequestListener而该服务正是我们在SendRequest()方法中要查找的Service

IService service = FindService(aServiceName);

下面我们来看看FindService()方法的实现

protected IService FindService(string aServiceName)

{

lock (thism_Services)

{

IService service = (IService)m_Services[aServiceName];

if (service != null)

{

return service;

}

else

{

IService tmpService = GetService(aServiceName);

AddService(aServiceNametmpService);

return tmpService;

}

}

}

可以看到这个服务是被添加到m_Service对象中该对象为SortedList类型服务名为KeyIService对象为Value如果没有找到则通过私有方法GetService()来获得

private IService GetService(string aServiceName)

{

IService service = (IService)ActivatorGetObject(typeof(RequestListener)

tcp://localhost:/ + aServiceName + RequestListener);

return service;

}

在这里ChannelIPPort应该从配置文件中获取为简便起见这里直接赋为常量

再分析SendRequest方法在找到对应的服务后执行了IService的Execute()方法此时的IService为RequestListener而从前面对RequestListener的定义可知Execute()方法执行的其实是其侦听的业务服务的OnRequest()方法

我们可以定义一个具体的业务服务类来分析整个消息传递的过程该类继承于Service抽象类

public class MyService:Service

{

public MyService(string aServiceName):base(aServiceName)

{}

}

假设把进程分为服务端和客户端那么对消息处理的步骤如下

在客户端调用MyService的SendRequest()方法发送Request消息

查找被Marshal的服务即RequestListener对象此时该对象应包含对应的业务服务对象MyService

在服务端调用RequestListener的Execute()方法该方法则调用业务服务MyService的OnRequest()方法

在这些步骤中除了第一步在客户端执行外其他的步骤均是在服务端进行

业务服务对于消息的处理

前面实现的服务架构已经较为完整地实现了分布式的服务处理但目前的实现并未体现对消息的处理我认为对消息的处理等价与具体的业务处理这些业务逻辑必然是在服务端完成每个服务可能会处理单个业务也可能会处理多个业务并且服务与服务之间仍然存在通信某个服务在处理业务时可能需要另一个服务的业务行为也就是说每一种类的消息处理的方式均有所不同而这些消息的唯一标识则是在SendRequest()方法已经有所体现的aMessageName

虽然处理的消息不同所需要的服务不同但是根据我们对消息的定义我们仍然可以将这些消息处理机制抽象为一个统一的格式Net中体现这种机制的莫过于委托delegate我们可以定义这样的一个委托

public delegate void RequestHandler(string aMessageNameIMessageItemSequence aMessageBodyref IMessageItemSequence aReplyMessageBody);

在RequestHandler委托中它代表了这样一族方法接收三个入参aMessageNameaMessageBodyaReplyMessageBody返回值为void其中aMessageName代表了消息名它是消息的唯一标识aMessageBody是待处理消息的主体业务所需要的所有数据都存储在aMessageBody对象中aReplyMessageBody是一个引用对象它存储了消息处理后的返回结果通常情况下我们可以用<resultSuccess>或<result Failure>来代表处理的结果是成功还是失败

这些委托均在服务初始化时被添加到服务类的SortedList对象中键值为aMessageName所以我们可以在抽象类中定义如下方法

protected abstract void AddRequestHandlers();

protected void AddRequestHandler(string aMessageNameRequestHandler handler)

{

lock (thism_EventHandlers)

{

if (!thism_EventHandlersContains(aMessageName))

{

thism_EventHandlersAdd(aMessageNamehandler);

}

}

}

protected RequestHandler FindRequestHandler(string aMessageName)

{

lock (thism_EventHandlers)

{

RequestHandler handler = (RequestHandler)m_EventHandlers[aMessageName];

return handler;

}

}

AddRequestHandler()用于添加委托对象与aMessageName的键值对而FindRequestHandler()方法用于查找该委托对象而抽象方法AddRequestHandlers()则留给Service的子类实现简单的实现如MyService的AddRequestHandlers()方法

public class MyService:Service

{

public MyService(string aServiceName):base(aServiceName)

{}

protected override void AddRequestHandlers()

{

thisAddRequestHandler(Testnew RequestHandler(Test));

thisAddRequestHandler(Testnew RequestHandler(Test));

}

private void Test(string aMessageNameIMessageItemSequence aMessageBodyref IMessageItemSequence aReplyMessageBody)

{

ConsoleWriteLine(MessageName:{}\naMessageName);

ConsoleWriteLine(MessageBody:{}\naMessageBody);

aReplyMessageBodySetValue(resultSuccess);

}

private void Test(string aMessageNameIMessageItemSequence aMessageBodyref IMessageItemSequence aReplyMessageBody)

{

ConsoleWriteLine(Test + aMessageBodyToString());

}

}

Test和Test方法均为匹配RequestHandler委托签名的方法然后在AddRequestHandlers()方法中通过调用AddRequestHandler()方法将这些方法与MessageName对应起来添加到m_EventHandlers中

需要注意的是本文为了简要的说明这种处理方式所以简化了Test和Test方法的实现而在实际开发中它们才是实现具体业务的重要方法而利用这种方式则解除了服务之间依赖的耦合度我们随时可以为服务添加新的业务逻辑也可以方便的增加服务

通过这样的设计Service的OnRequest()方法的最终实现如下所示

public IMessage OnRequest(IMessage aMessage)

{

string messageName = aMessageGetMessageName();

string messageID = aMessageGetMessageID();

IMessage message = m_FactoryCreateMessage();

IMessageItemSequence replyMessage = m_FactoryCreateMessageItemSequence();

RequestHandler handler = FindRequestHandler(messageName);

handler(messageNameaMessageGetMessageBody()ref replyMessage);

messageSetMessageName(messageName);

messageSetMessageID(messageID);

messageSetMessageBody(replyMessage);

return message;

}

利用这种方式我们可以非常方便的实现服务间通信以及客户端与服务端间的通信例如我们分别在服务端定义MyService(如前所示)和TestService

public class TestService:Service

{

public TestService(string aServiceName):base(aServiceName)

{}

protected override void AddRequestHandlers()

{

thisAddRequestHandler(Testnew RequestHandler(Test));

}

private void Test(string aMessageNameIMessageItemSequence aMessageBodyref IMessageItemSequence aReplyMessageBody)

{

aReplyMessageBody = SendRequest(MyServiceaMessageNameaMessageBody);

aReplyMessageBodySetValue(resultSuccess);

}

}

注意在TestService中的Test方法它并未直接处理消息aMessageBody而是通过调用SendRequest()方法将其传递到MyService中

对于客户端而言情况比较特殊根据前面的分析我们知道除了发送消息的操作是在客户端完成外其他的具体执行都在服务端实现所以诸如MyService和TestService等服务类只需要部署在服务端即可而客户端则只需要定义一个实现Service的空类即可

public class MyServiceClient:Service

{

public MyServiceClient(string aServiceName):base(aServiceName)

{}

protected override void AddRequestHandlers()

{}

}

MyServiceClient类即为客户端定义的服务类在AddRequestHandlers()方法中并不需要实现任何代码如果我们在Service抽象类中将AddRequestHandlers()方法定义为virtual而非abstract方法则这段代码在客户端服务中也可以省去另外客户端服务类中的aServiceName可以任意赋值它与服务端的服务名并无实际联系至于客户端具体会调用哪个服务则由SendRequest()方法中的aServiceName决定

IMessageFactory factory = new MessageFactory();

IMessageItemSequence body = factoryCreateMessageItemSequence();

//……

MyServiceClient service = new MyServiceClient(Client);

IMessageItemSequence reply = serviceSendRequest(TestServiceTestbody);

对于serviceSendRequest()的执行而言会先调用TestService的Test方法然后再通过该方法向MyService发送最终调用MyService的Test方法

我们还需要另外定义一个类负责添加服务并初始化这些服务

public class Server

{

public Server()

{

m_Services = new ArrayList();

}

private ArrayList m_Services;

public void AddService(IListenService service)

{

thism_ServicesAdd(service);

}

public void Initialize()

{

IDictionary tcpProp = new Hashtable();

tcpProp[name] = tcp;

tcpProp[port] = ;

TcpChannel channel = new TcpChannel(tcpPropnew BinaryClientFormatterSinkProvider()new BinaryServerFormatterSinkProvider());

ChannelServicesRegisterChannel(channel);

foreach (Service service in m_Services)

{

serviceInitialize();

}

}

}

同理这里的ChannelIP和Port均应通过配置文件读取最终的类图如下所示

在服务端可以调用Server类来初始化这些服务

  static void Main(string[] args)

{

MyService service = new MyService(MyService);

TestService service = new TestService(TestService);

Server server = new Server();

serverAddService(service);

serverAddService(service);

serverInitialize();

ConsoleReadLine();

}

结论

利用这个基于消息与Net Remoting技术的分布式架构可以将企业的业务逻辑转换为对消息的定义和处理要增加和修改业务就体现在对消息的修改上服务间的通信机制则完全交给整个架构来处理如果我们将每一个委托所实现的业务(或者消息)理解为Contract则该结构已经具备了SOA的雏形当然该架构仅仅处理了消息的传递而忽略了对底层事件的处理(类似于Corba的Event Service)这个功能我想留待后面实现

唯一遗憾的是我缺乏验证这个架构稳定性和效率的环境应该说这个架构是我们在企业项目解决方案中的一个实践但是解决方案则是利用了CORBA中间件在Unix环境下实现并运行本架构仅仅是借鑒了核心的实现思想和设计理念从而完成的在Net平台下的移植由于Unix与Windows Server的区别其实际的优势还有待验证

               

上一篇:C#约瑟夫环问题

下一篇:对C#泛型中的new()约束的一点思考