前段时间,有同事跟我说客户那边有很多状态为receive的message,这些message只有在JMS Server或weblogic Server充启之后才能被消费。经过调查后,这个问题可能是weblogic的一个bug,当然也不排除跟具体环境有关的可能。下面我们来看看问题的根本原因是什么,这种分析有助我们更进一步理解weblogic JMS的实现。
首先我们看一下什么是receive,receive表示一个message已经被consumer消费,但服务端还没有关于这个message的ack,所以消息不能从queue中删除, 由于queue中的消息是point-2-point的,所以某个消息被标为receive后,这个消息自然不能被其他consumer消费。那么这个ack由谁负责发送给Server呢,什么时候发送呢?这些都由我们创建JMS Session时使用的Ack_mode决定,典型的ack-mode有如下两种:
auto-ack: 自动响应模式,consumer.receive()调用后,如果服务器端发现有可用的message,消息返回到客户端JMS实现层,在消息返回给客户前,由weblogic client(JMSSession.getAsyncMessageForConsumer(),异步接受,比如MessageListener,或JMSSession.receiveMessage(),同步接受)层实现直接调用acknowledge()通知服务器端,服务器端收到ack后,它会负责负责将处于receive的message从物理queue中删除。
client-ack: 客户响应模式,consumer.receive()调用后,客户端收到消息后,客户端程序决定什么时候发送ack,可以在消息后立即发送,也可以在消息处理成功后发送,ack的发送通过message.acknowledge()实现。后面的过程和auto-ack相同。
初看这个问题,感觉是ack没有收到,那么什么情况下会出现ack丢失呢?网络问题? 那么客户端或服务器端的server log应该能够看到异常,客户坚持说没有任何异常。有点不可思议,要了客户的代码,他们没有代码,实际上他们的应用是基于Spring Framework的,通过简单的配置来实现他们的业务需要,看了下Spring的相关代码,客户之所以说没有异常,因为Spring catch了服务器端返回的JMSException,并吃掉了这个异常(即异常没有打印出来),这个异常输出是可以通过Spring的配置来实现。客户配置后,给了我具体的异常,如下:
java.lang.IllegalArgumentException: Delay is negative.
at weblogic.timers.internal.TimerManagerImpl.schedule(TimerManagerImpl.java:388)
at weblogic.timers.internal.TimerManagerImpl.schedule(TimerManagerImpl.java:340)
at weblogic.messaging.kernel.internal.ReceiveRequestImpl.<init>(ReceiveRequestImp l.java:98)
at weblogic.messaging.kernel.internal.QueueImpl.receive(QueueImpl.java:820)
at weblogic.jms.backend.BEConsumerImpl.blockingReceiveStart(BEConsumerImpl.java:1 172)
at weblogic.jms.backend.BEConsumerImpl.receive(BEConsumerImpl.java:1383)
at weblogic.jms.backend.BEConsumerImpl.invoke(BEConsumerImpl.java:1088)
at weblogic.messaging.dispatcher.Request.wrappedFiniteStateMachine(Request.java:7 59)
at weblogic.messaging.dispatcher.DispatcherImpl.dispatchAsyncInternal(DispatcherI mpl.java:129)
at weblogic.messaging.dispatcher.DispatcherImpl.dispatchAsync(DispatcherImpl.java :112)
at weblogic.messaging.dispatcher.Request.dispatchAsync(Request.java:1046)
at weblogic.jms.dispatcher.Request.dispatchAsync(Request.java:72)
at weblogic.jms.frontend.FEConsumer.receive(FEConsumer.java:557)
at weblogic.jms.frontend.FEConsumer.invoke(FEConsumer.java:806)
at weblogic.messaging.dispatcher.Request.wrappedFiniteStateMachine(Request.java:7 59)
at weblogic.messaging.dispatcher.DispatcherServerRef.invoke(DispatcherServerRef.j ava:276)
at weblogic.messaging.dispatcher.DispatcherServerRef.handleRequest(DispatcherServ erRef.java:141)
at weblogic.messaging.dispatcher.DispatcherServerRef.access$000(DispatcherServerR ef.java:36)
at weblogic.messaging.dispatcher.DispatcherServerRef$2.run(DispatcherServerRef.ja va:112)
at weblogic.work.ExecuteThread.execute(ExecuteThread.java:209)
at weblogic.work.ExecuteThread.run(ExecuteThread.java:181)
现在我们看一下Weblogic JMS的receive的基本流程,看看这个exception为什么会被抛出来。
JMSConsumer.receive(long timewait),客户端发起receive请求,其中timewait可有可无,不做指定的话,说明没有可用消息到达的话,我们会一直等下去。如要不作等待的话,可以使用receiveNoWait()。receive()中会检查timeout值,如果没有指定timeout,那么Long.maxValue会被设定成这个timeout,如果timeout小于0,客户端将会收到Invalid Timeout异常,接下来请求会被delegate到JMSSession。
|
JMSSession.receiveMessage(consumer,timeout),这里timeout会被重新计算,然后我们会创建一个FEConsumerReceiveRequest对象。这个对象中包含计算后的timeout,计算后的timeout应该是个非负值(上面的异常就是这里的计算导致的,至于为什么客户指定的timeout为1,计算后的timeout变成了负数,从而导致上面的异常,从代码层面,看不出有什么问题)。FEConsumerReceiveRequest对象创建后,由JMS FrontEnd Dispatcher负责把请求交给后端的JMS Server,Dispatcher是Weblogic JMS中用于负责请求传输的,它依赖于RJVM layer,这里不做赘述。
|
RJVM layer, 负责RMI socket层的数据发送
|
FEConsumer.receive(invocableRequest),RJVM层处理完socket数据后,请求会被转给JMSConsumer,JMSConsumer通过状态机(state machine)来控制请求处理,没有过多的逻辑,它会基于收到的receive request创建一个BEConsumerReceiveRequest对象,然后把这个请求通过JMS BackEnd Dispatcher转发给BEConsumerImpl。之所以存在FrontEnd /BackEnd Dispatcher,主要考虑到处理请求的server和queue所在的不是同一server。
|
BEConsumerImpl.receive(request),request进入BEConsumerImpl后,它也通过state machine来控制请求处理,下面两个方法在调用过程中被顺序调用,
BEConsumerImpl.blockingReceiveStart(request),这里首先检查timeout值,然后调用QueueImpl.receive(...)从queue中获取message,receive()的具体参数如下,包括timeout, expression(即检查条件,我们定义的message selector就在其中)。
BEConsumerImpl.blockingReceiveProcessMessage(request)
BEConsumerImpl.blockingReceiveComplete(request)
|
QueueImpl.receive(expression,count,acknowledge,owner,timeout,started,userBlob),这里除了状态检查,没有其他逻辑,它会根据传进来的参数,初始化一个ReceiveRequestImpl对象。
|
ReceiveRequestImpl.new(),这个new代表ReceiveRequestImpl的构造函数。
|
QueueImpl.get(...),如果timeout = 0,即如果客户调用的是receiveNoWait的话,我们直接去通过QueueImpl.get(...),如果没有match的message,那么直接将新建request的result设定为no result,否则将match的message设定为result。
QueueImpl.addReader(receiveRequestImpl),如果timeout != 0,我们会在ReceiveRequestImpl.start()中调用QueueImpl.addReader(),addReader()中同样会通过QueueImpl.get()检查是否有match的message,如果找到相应的message,我们会把message reference状态改为receive。
TimerManagerImpl.schedule(timeout),如果QueueImpl.addReader()中的QueueImpl.get()没有找到相应的message,我们需要等待(依据客户指定的timeout),这个等待通过timer去实现,如下:
timer = timerManager.schedule(this, timeout);
指定的timeout到达后,如果和没有可用的message,no result将被返回。从上面的异常堆栈来看,问题就出在这里,如果timeout为负数,timerMangerImpl在启动trigger的时候,会抛出如下的runtimeException,
java.lang.IllegalArgumentException: Delay is negative.
也许你会疑问,这没什么问题吧,timerTrigger只有在没有message的时候才会被schedule,既然没有message,那有谈何状态receive message?没错,起timerTrigger之前我们的确没有修改message状态,但你注意到没有,我们在起timerTrigger前,把receiveRequestImpl加入到QueueImpl去了,但我们在碰到IllegalArgumentException时并没有把这个receiveRequestImpl从QueueImpl中删除,问题就在这里。
1 synchronized void addReader(Reader reader) throws KernelException {
2
3 List list = get(..);
4 int newCount;
5 if (list != null) {
6
7 } else {
8 reader.incrementReserveCount(-reservedCount);
9 newCount = reader.getCount();
10 }
11 if (newCount > 0) {
12 logger.debug("Adding consumer to reader list");
13 readerList.add(reader);
14 }
15 }
如果我们不把receiveRequestImpl从QueueImpl的readerList中删除,那么如果过一会有message sender发送一条和我们上述请求match的message到这个queue。weblogic收到这个message后,它会检查readerList,如果这个message match某个reader,我们会把message状态改成receive,当由于IllegalArgumentException,客户端收到它的时候,客户端会close JMSSession,也就是说这个消息虽然有reader,但无法deliver到客户端。
我们再来看看Weblogic JMS sender的相关流程,
QueueImpl.messageSendComplete(),消息发送过程结束后(比如涉及store的话,消息此时已经被存储),到这一步的话,我们会调整系统接受的消息数,然后通过makeMessageAvailable()把消息标成visiable或deliver给正在等待的reader。
QueueImpl.makeMessageAvailable(),它会直接调用match()去检查readerList中是否存在正在等待它的reader。
QueueImpl.match(),它通过finderReader()从readerList中检查reader,如果有符合条件的reader,它会把这个message标志为receive,同时把这个message挪到pending list中去。
前面我们说了,虽然reader还在,但与之对应的JMSConsumer已经被close,所以这个消息根本就无法deliver出去,自然就不会有ack从客户端返回了,这个消息也只能一直pending了。
这个问题可能是个bug,目前还在确认之中,但我同时也在和客户沟通,可能跟他的环境有一定关系(比如NTP时间同步问题)。
虽然这个问题能引发message pending,但并不是所有的message pending问题都是由它应起的。网络问题也能引发类似问题,具体问题具体分析,主要的参考客户端的JMSException。位于receive后的状态是transation,也就是如果发现状态为transaction的message的话,一般而言,是这个消息要么就是发送还没结束,要么就是消息正处于一个delete的事务单元中,这里就不再一一罗列了。
posted on 2009-06-17 09:07
走走停停又三年 阅读(3864)
评论(9) 编辑 收藏 所属分类:
Weblogic