The Cafe Sample(小卖部订餐例子)
小卖部有一个订饮料服务客户可以通过订单来订购所需要饮料小卖部提供两种咖啡饮料LATTE(拿铁咖啡)和MOCHA(摩卡咖啡)每种又都分冷饮和热饮整个流程如下
有一个下订单模块用户可以按要求下一个或多个订单
有一个订单处理模块处理订单中那些是关于订购饮料的
有一个饮料订购处理模块处理拆分订购的具体是那些种类的饮料把具体需要生产的饮料要求发给生产模块有一个生产模块
这个例子利用Spring Integration实现了灵活的可配置化的模式集成了上述这些服务模块
先来看一下配置文件
<beans:beansxmlns=
xmlns:xsi=instance
xmlns:beans=
xmlns:context=
xsi:schemaLocation=
beansxsd
integrationxsd
contextxsd>
<!启动Messagebus消息服务总线支持四个属性
autostartup[boolean是否自动启动default=true]如果设置false则需要手动调用applicationContextstart()方法
autocreatechannels[boolean是否自动注册MessageChanneldefault=false]如果使用的MessagChannle不存在
errorchannel设置错误时信息发送的MessageChannle如果不设置则使用DefaultErrorChannel
dispatcherpoolsize使用的启动线程数默认为>
<messagebus/>
<!启动支持元数据标记>
<annotationdriven/>
<!设置@Component标识的元数据扫描包(package)>
<context:componentscanbasepackage=orgspringframeworkintegrationsamplescafe/>
<!下面启动了四个MessageChannel服务处理接收发送端发过来的消息和把消息流转到消息的消费端>
<!属性说明capacity消息最大容量默认为publishsubscribe是否是发布订阅模式默认为否
idbean的id名称datatype?>
<channelid=orders/><!订单Channel>
<channelid=drinks/><!饮料订单Channel处理饮料的类别>
<channelid=coldDrinks/><!热饮生产Channel>
<channelid=hotDrinks/><!冷饮生产Channel>
<!消息处理终端接收channelcoldDrinks的消息后执行baristaprepareColdDrink方法生产冷饮>
<!属性说明inputchannel接收消息的Channel必须defaultoutputchannel设置默认回复消息Channel
handlerref引用bean的id名称handlermethodHandler处理方法名(参数类型必须与发送消息的payLoad使用的一致)
errorhandler设置错误时信息发送的MessageChannlereplyhandler消息回复的Channel>
<endpointinputchannel=coldDrinkshandlerref=barista
handlermethod=prepareColdDrink/>
<!消息处理终端接收channelhotDrinks的消息后执行baristaprepareHotDrink方法生产热饮>
<endpointinputchannel=hotDrinkshandlerref=barista
handlermethod=prepareHotDrink/>
<!定义一个启动下定单操作的bean它通过channelorders下定单>
<beans:beanid=cafeclass=orgspringframeworkintegrationsamplescafeCafe>
<beans:propertyname=orderChannelref=orders/>
</beans:bean>
</beans:beans>
下面我们来看一下源代码目录
我们来看一下整体服务是怎么启动的
首先我们来看一下CafeDemo这个类它触发下定单操作
publicclassCafeDemo{
publicstaticvoidmain(String[]args){
//加载Spring配置文件
AbstractApplicationContextcontext=null;
if(argslength>){
context=newFileSystemXmlApplicationContext(args);
}
else{
context=newClassPathXmlApplicationContext(cafeDemoxmlCafeDemoclass);
}
//启动Spring容器(启动所有实现orgntextLifecycle接口的实现类的start方法)
contextstart();
//从Spring容器取得cafe实例
Cafecafe=(Cafe)contextgetBean(cafe);
DrinkOrderorder=newDrinkOrder();
//一杯热饮参数说明饮料类型数量是否是冷饮(true表示冷饮)
DrinkhotDoubleLatte=newDrink(DrinkTypeLATTEfalse);
DrinkicedTripleMocha=newDrink(DrinkTypeMOCHAtrue);
orderaddDrink(hotDoubleLatte);
orderaddDrink(icedTripleMocha);
//下个订单
for(inti=;i<;i++){
//调用cafe的placeOrder下订单
cafeplaceOrder(order);
}
}
}
下面是Cafe的源代码
publicclassCafe{
privateMessageChannelorderChannel;
publicvoidsetOrderChannel(MessageChannelorderChannel){
thisorderChannel=orderChannel;
}
//其实下订单操作调用的是orderChannel(orderschannel)的send方法把消息发出去
publicvoidplaceOrder(DrinkOrderorder){
thisorderChannelsend(newGenericMessage<DrinkOrder>(order));
//GenericMessage有三个构建方法参考如下
//newGenericMessage<T>(ObjectidTpayload);
//newGenericMessage<T>(Tpayload);
//newGenericMessage<T>(TpayloadMessageHeaderheaderToCopy)
}
}
下面我们来看一下哪个类标记有@MessageEndpoint(input=orders) 表示它会消费orders Channel的消息我们发现OrderSplitter类标记这个元数据下面是源代码我们来分析
//标记MessageEndpoint元数据input表示设置后所有ordersChannel消息都会被OrderSplitter收到
@MessageEndpoint(input=orders)
publicclassOrderSplitter{
//@Splitter表示接收消息后调用这个类的该方法其的参数类型必须与message的payload属性一致
//即在newGenericMessage<T>的泛型中指定
//元数据设置的 channel属性表示方法执行完成后会把方法返回的结果保存到message的payload属性后发送到指定的channel中去
//这里指定发送到drinkschannel
@Splitter(channel=drinks)
publicList<Drink>split(DrinkOrderorder){
returnordergetDrinks();//方法中是把订单中的饮料订单取出来
}
}
接下来与找OrderSplitter方法相同我们要找哪个类标记有@MessageEndpoint(input=drinks) 表示它会消费drinks Channel的消息找到DrinkRouter这个类
@MessageEndpoint(input=drinks)
publicclassDrinkRouter{
//@Router表示接收消息后调用这个类的该方法其的参数类型必须与message的payload属性一致
//方法执行完毕后其返回值为在容器中定义的channel名称channel名称必须存在
@Router
publicStringresolveDrinkChannel(Drinkdrink){
return(drinkisIced())?coldDrinks:hotDrinks;//方法中是根据处理饮料是否是冷饮送不同的channel处理
}
}
备注
@Router可以把消息路由到多个channel
实现方式如下
@Router
publicMessageChannelroute(Messagemessage){}
@Router
publicList<MessageChannel>route(Messagemessage){}
@Router
publicStringroute(Foopayload){}
@Router
publicList<String>route(Foopayload){}
接下来我们就要找 MessageEndpoint 标记为处理 coldDrinks 和 hotDrinks 的类我们发现这个两个类并不是通过元数据@MessageEndpoint来实现的而是通过容器配置(下面会演示如何用元数据配置但元数据配置有局限性这两种配置方式看大家喜好系统中都是可以使用)
下面是容器配置信息
<!消息处理终端接收channelcoldDrinks的消息后执行baristaprepareColdDrink方法生产冷饮>
<endpointinputchannel=coldDrinkshandlerref=barista
handlermethod=prepareColdDrink/>
<!消息处理终端接收channelhotDrinks的消息后执行baristaprepareHotDrink方法生产热饮>
<endpointinputchannel=hotDrinkshandlerref=barista
handlermethod=prepareHotDrink/>
我们来看一下源代码
@Component//这个必须要有表示是一个消息处理组件
publicclassBarista{
privatelonghotDrinkDelay=;
privatelongcoldDrinkDelay=;
privateAtomicIntegerhotDrinkCounter=newAtomicInteger();
privateAtomicIntegercoldDrinkCounter=newAtomicInteger();
publicvoidsetHotDrinkDelay(longhotDrinkDelay){
thishotDrinkDelay=hotDrinkDelay;
}
publicvoidsetColdDrinkDelay(longcoldDrinkDelay){
ldDrinkDelay=coldDrinkDelay;
}
publicvoidprepareHotDrink(Drinkdrink){
try{
Threadsleep(thishotDrinkDelay);
}catch(InterruptedExceptione){
ThreadcurrentThread()interrupt();
}
Systemoutprintln(preparedhotdrink#+hotDrinkCounterincrementAndGet()+:+drink);
}
publicvoidprepareColdDrink(Drinkdrink){
try{
Threadsleep(ldDrinkDelay);
}catch(InterruptedExceptione){
ThreadcurrentThread()interrupt();
}
Systemoutprintln(preparedcolddrink#+coldDrinkCounterincrementAndGet()+:+drink);
}
}
如果要用元数据标识实现上述方法要用元数据配置它不像容器配置可以在一个类中支持多个不同的Handler方法以处理prepareColdDrink方法为例
@MessageEndpoint(input=coldDrinks)//加了该元数据它会自动扫描并作为@Componet标记处理
publicclassBarista{
privatelonghotDrinkDelay=;
privatelongcoldDrinkDelay=;
privateAtomicIntegerhotDrinkCounter=newAtomicInteger();
privateAtomicIntegercoldDrinkCounter=newAtomicInteger();
publicvoidsetHotDrinkDelay(longhotDrinkDelay){
thishotDrinkDelay=hotDrinkDelay;
}
publicvoidsetColdDrinkDelay(longcoldDrinkDelay){
ldDrinkDelay=coldDrinkDelay;
}
publicvoidprepareHotDrink(Drinkdrink){
try{
Threadsleep(thishotDrinkDelay);
}catch(InterruptedExceptione){
ThreadcurrentThread()interrupt();
}
Systemoutprintln(preparedhotdrink#+hotDrinkCounterincrementAndGet()+:+drink);
}
@Handler//回调处理的方法
publicvoidprepareColdDrink(Drinkdrink){
try{
Threadsleep(ldDrinkDelay);
}catch(InterruptedExceptione){
ThreadcurrentThread()interrupt();
}
Systemoutprintln(preparedcolddrink#+coldDrinkCounterincrementAndGet()+:+drink);
}
}
这样整个流程就执行完了最终我们的饮料产品就按照订单生产出来了累了吧喝咖啡提神着呢!!!
初充下面是针对 Spring Integration adapter扩展的学习笔记
JMS Adapters
jms adapters 目前有两种实现
JmsPollingSourceAdapter 和 JmsMessageDrivenSourceAdapter 前者是使用Srping的JmsTemplate模板类通过轮循的方式接收消息
后者是使用则通过代理Spring的DefaultMessageListenerContainer实例实现消息驱动的方式
xml配置如下
<bean class=orgspringframeworkintegrationadapterjmsJmsPollingSourceAdapter>
<constructorarg ref=jmsTemplate/>
<property name=channel ref=exampleChannel/>
<property name=period value=/> <! 轮循时间间隔 >
<property name=messageMapper ref=/> <! message转换 >
</bean>
<! 备注消息的转换方式如下
收到JMS Message消息后SourceAdapter会调用Spring的MessageConverter实现类把javaxjmsMessage对象转换成普通Java对象再调用Spring Integration的MessageMapper把该对象转成 orgspringframessageMessage对象 >
JmsMessageDrivenSourceAdapter
<bean class=orgspringframeworkintegrationadapterjmsJmsMessageDrivenSourceAdapter>
<property name=connectionFactory ref=connectionFactory/>
<property name=destinationName value=exampleQueue/>
<property name=channel ref=exampleChannel/>
<property name=messageConverter ref=/> <! jms消息对象转换 >
<property name=messageMapper ref= /> <! 普通java对象转换成 Spring Integration Message >
<property name=sessionAcknowledgeMode value= />
<! sesssion回复模式 AUTO_ACKNOWLEDGE= CLIENT_ACKNOWLEDGE= DUPS_OK_ACKNOWLEDGE= SESSION_TRASACTED=>
</bean>
另外还有一个比较有用的类JmsTargetAdapter 它实现了MessageHandler接口它提把Spring Integration Message对象转换成JMS消息并发送到指定的消息队列与JMS服务连接的实现可以通过设定 jmsTemplate属性引用或是connectionFactory和destination或destinationName属性
<bean class=orgspringframeworkintegrationadapterjmsJmsTargetAdapter>
<constructorarg ref=connectionFactory/>
<constructorarg value=examplequeue/>
<!或是以下配置
<property name=connectionFactory ref=connectionFactory/>
<property name=destinationName value=exampleQueue/>
或是
<constructorarg ref=jmsTemplate/> >
</bean>