posts - 42,comments - 83,trackbacks - 0
        前段时间,有同事跟我说客户那边有很多状态为receive的message,这些message只有在JMS Server或weblogic Server充启之后才能被消费。经过调查后,这个问题可能是weblogic的一个bug,当然也不排除跟具体环境有关的可能。下面我们来看看问题的根本原因是什么,这种分析有助我们更进一步理解weblogic JMS的实现。

        首先我们看一下什么是receive,receive表示一个message已经被consumer消费,但服务端还没有关于这个message的ack,所以消息不能从queue中删除, 由于queue中的消息是point-2-point的,所以某个消息被标为receive后,这个消息自然不能被其他consumer消费。那么这个ack由谁负责发送给Server呢,什么时候发送呢?这些都由我们创建JMS Session时使用的Ack_mode决定,典型的ack-mode有如下两种:
  auto-ack: 自动响应模式,consumer.receive()调用后,如果服务器端发现有可用的message,消息返回到客户端JMS实现层,在消息返回给客户前,由weblogic client(JMSSession.getAsyncMessageForConsumer(),异步接受,比如MessageListener,或JMSSession.receiveMessage(),同步接受)层实现直接调用acknowledge()通知服务器端,服务器端收到ack后,它会负责负责将处于receive的message从物理queue中删除。
        client-ack: 客户响应模式,consumer.receive()调用后,客户端收到消息后,客户端程序决定什么时候发送ack,可以在消息后立即发送,也可以在消息处理成功后发送,ack的发送通过message.acknowledge()实现。后面的过程和auto-ack相同。

  初看这个问题,感觉是ack没有收到,那么什么情况下会出现ack丢失呢?网络问题? 那么客户端或服务器端的server log应该能够看到异常,客户坚持说没有任何异常。有点不可思议,要了客户的代码,他们没有代码,实际上他们的应用是基于Spring Framework的,通过简单的配置来实现他们的业务需要,看了下Spring的相关代码,客户之所以说没有异常,因为Spring catch了服务器端返回的JMSException,并吃掉了这个异常(即异常没有打印出来),这个异常输出是可以通过Spring的配置来实现。客户配置后,给了我具体的异常,如下:
java.lang.IllegalArgumentException: Delay is negative.
        at weblogic.timers.internal.TimerManagerImpl.schedule(TimerManagerImpl.java:388)
        at weblogic.timers.internal.TimerManagerImpl.schedule(TimerManagerImpl.java:340)
        at weblogic.messaging.kernel.internal.ReceiveRequestImpl.<init>(ReceiveRequestImp l.java:98)
        at weblogic.messaging.kernel.internal.QueueImpl.receive(QueueImpl.java:820)
        at weblogic.jms.backend.BEConsumerImpl.blockingReceiveStart(BEConsumerImpl.java:1 172)
        at weblogic.jms.backend.BEConsumerImpl.receive(BEConsumerImpl.java:1383)
        at weblogic.jms.backend.BEConsumerImpl.invoke(BEConsumerImpl.java:1088)
        at weblogic.messaging.dispatcher.Request.wrappedFiniteStateMachine(Request.java:7 59)
        at weblogic.messaging.dispatcher.DispatcherImpl.dispatchAsyncInternal(DispatcherI mpl.java:129)
        at weblogic.messaging.dispatcher.DispatcherImpl.dispatchAsync(DispatcherImpl.java :112)
        at weblogic.messaging.dispatcher.Request.dispatchAsync(Request.java:1046)
        at weblogic.jms.dispatcher.Request.dispatchAsync(Request.java:72)
        at weblogic.jms.frontend.FEConsumer.receive(FEConsumer.java:557)
        at weblogic.jms.frontend.FEConsumer.invoke(FEConsumer.java:806)
        at weblogic.messaging.dispatcher.Request.wrappedFiniteStateMachine(Request.java:7 59)
        at weblogic.messaging.dispatcher.DispatcherServerRef.invoke(DispatcherServerRef.j ava:276)
        at weblogic.messaging.dispatcher.DispatcherServerRef.handleRequest(DispatcherServ erRef.java:141)
        at weblogic.messaging.dispatcher.DispatcherServerRef.access$000(DispatcherServerR ef.java:36)
        at weblogic.messaging.dispatcher.DispatcherServerRef$2.run(DispatcherServerRef.ja va:112)
        at weblogic.work.ExecuteThread.execute(ExecuteThread.java:209)
        at weblogic.work.ExecuteThread.run(ExecuteThread.java:181)

  现在我们看一下Weblogic JMS的receive的基本流程,看看这个exception为什么会被抛出来。
  JMSConsumer.receive(long timewait),客户端发起receive请求,其中timewait可有可无,不做指定的话,说明没有可用消息到达的话,我们会一直等下去。如要不作等待的话,可以使用receiveNoWait()。receive()中会检查timeout值,如果没有指定timeout,那么Long.maxValue会被设定成这个timeout,如果timeout小于0,客户端将会收到Invalid Timeout异常,接下来请求会被delegate到JMSSession。
        |
        JMSSession.receiveMessage(consumer,timeout),这里timeout会被重新计算,然后我们会创建一个FEConsumerReceiveRequest对象。这个对象中包含计算后的timeout,计算后的timeout应该是个非负值(上面的异常就是这里的计算导致的,至于为什么客户指定的timeout为1,计算后的timeout变成了负数,从而导致上面的异常,从代码层面,看不出有什么问题)。FEConsumerReceiveRequest对象创建后,由JMS FrontEnd Dispatcher负责把请求交给后端的JMS Server,Dispatcher是Weblogic JMS中用于负责请求传输的,它依赖于RJVM layer,这里不做赘述。
        |     
        RJVM layer, 负责RMI socket层的数据发送   
        |        
        FEConsumer.receive(invocableRequest),RJVM层处理完socket数据后,请求会被转给JMSConsumer,JMSConsumer通过状态机(state machine)来控制请求处理,没有过多的逻辑,它会基于收到的receive request创建一个BEConsumerReceiveRequest对象,然后把这个请求通过JMS BackEnd Dispatcher转发给BEConsumerImpl。之所以存在FrontEnd /BackEnd Dispatcher,主要考虑到处理请求的server和queue所在的不是同一server。
        |
        BEConsumerImpl.receive(request),request进入BEConsumerImpl后,它也通过state machine来控制请求处理,下面两个方法在调用过程中被顺序调用,
        BEConsumerImpl.blockingReceiveStart(request),这里首先检查timeout值,然后调用QueueImpl.receive(...)从queue中获取message,receive()的具体参数如下,包括timeout, expression(即检查条件,我们定义的message selector就在其中)。
        BEConsumerImpl.blockingReceiveProcessMessage(request)
        BEConsumerImpl.blockingReceiveComplete(request)
        |
        QueueImpl.receive(expression,count,acknowledge,owner,timeout,started,userBlob),这里除了状态检查,没有其他逻辑,它会根据传进来的参数,初始化一个ReceiveRequestImpl对象。
        |
        ReceiveRequestImpl.new(),这个new代表ReceiveRequestImpl的构造函数。
        |
        QueueImpl.get(...),如果timeout = 0,即如果客户调用的是receiveNoWait的话,我们直接去通过QueueImpl.get(...),如果没有match的message,那么直接将新建request的result设定为no result,否则将match的message设定为result。
        QueueImpl.addReader(receiveRequestImpl),如果timeout != 0,我们会在ReceiveRequestImpl.start()中调用QueueImpl.addReader(),addReader()中同样会通过QueueImpl.get()检查是否有match的message,如果找到相应的message,我们会把message reference状态改为receive。
        TimerManagerImpl.schedule(timeout),如果QueueImpl.addReader()中的QueueImpl.get()没有找到相应的message,我们需要等待(依据客户指定的timeout),这个等待通过timer去实现,如下:
                timer = timerManager.schedule(this, timeout);
  指定的timeout到达后,如果和没有可用的message,no result将被返回。从上面的异常堆栈来看,问题就出在这里,如果timeout为负数,timerMangerImpl在启动trigger的时候,会抛出如下的runtimeException,
    java.lang.IllegalArgumentException: Delay is negative.
        
  也许你会疑问,这没什么问题吧,timerTrigger只有在没有message的时候才会被schedule,既然没有message,那有谈何状态receive message?没错,起timerTrigger之前我们的确没有修改message状态,但你注意到没有,我们在起timerTrigger前,把receiveRequestImpl加入到QueueImpl去了,但我们在碰到IllegalArgumentException时并没有把这个receiveRequestImpl从QueueImpl中删除,问题就在这里。

 1   synchronized void addReader(Reader reader) throws KernelException {
 2             
 3     List list = get(..);
 4     int newCount;
 5     if (list != null) {
 6             
 7     } else {
 8       reader.incrementReserveCount(-reservedCount);
 9       newCount = reader.getCount();
10     }
11     if (newCount > 0) {
12       logger.debug("Adding consumer to reader list");
13       readerList.add(reader);
14     }
15   }

  如果我们不把receiveRequestImpl从QueueImpl的readerList中删除,那么如果过一会有message sender发送一条和我们上述请求match的message到这个queue。weblogic收到这个message后,它会检查readerList,如果这个message match某个reader,我们会把message状态改成receive,当由于IllegalArgumentException,客户端收到它的时候,客户端会close JMSSession,也就是说这个消息虽然有reader,但无法deliver到客户端。

  我们再来看看Weblogic JMS sender的相关流程,
  QueueImpl.messageSendComplete(),消息发送过程结束后(比如涉及store的话,消息此时已经被存储),到这一步的话,我们会调整系统接受的消息数,然后通过makeMessageAvailable()把消息标成visiable或deliver给正在等待的reader。
        QueueImpl.makeMessageAvailable(),它会直接调用match()去检查readerList中是否存在正在等待它的reader。
        QueueImpl.match(),它通过finderReader()从readerList中检查reader,如果有符合条件的reader,它会把这个message标志为receive,同时把这个message挪到pending list中去。

  前面我们说了,虽然reader还在,但与之对应的JMSConsumer已经被close,所以这个消息根本就无法deliver出去,自然就不会有ack从客户端返回了,这个消息也只能一直pending了。

  这个问题可能是个bug,目前还在确认之中,但我同时也在和客户沟通,可能跟他的环境有一定关系(比如NTP时间同步问题)。

  虽然这个问题能引发message pending,但并不是所有的message pending问题都是由它应起的。网络问题也能引发类似问题,具体问题具体分析,主要的参考客户端的JMSException。位于receive后的状态是transation,也就是如果发现状态为transaction的message的话,一般而言,是这个消息要么就是发送还没结束,要么就是消息正处于一个delete的事务单元中,这里就不再一一罗列了。

posted on 2009-06-17 09:07 走走停停又三年 阅读(3864) 评论(9)  编辑  收藏 所属分类: Weblogic

FeedBack:
# re: 关于JMS Message Pending的问题
2009-06-16 20:46 | sunnycare
最近项目中正好用到了JMS。
看了你的blog,有两个问题问下:
1.如何重现这个问题?
2.既然没有patch出来,那么有什么别的方式避免这个问题?  回复  更多评论
  
# re: 关于JMS Message Pending的问题
2009-06-16 21:16 | 走走停停又三年
这个问题基本很难重现,原因很可能跟系统环境有关系。weblogic在JMSSession中计算timeout的时候,参考了System.currentTimeMills(),如果系统起了NTP client定期做时间同步的话,可能会在计算的时候引起负值。如果真跟系统时间有关,那么最好的做法就是保证客户端运行期间,不要做系统时间同步。

另外一个客户端回避的方法就是客户端使用receiveNoWait()来代替receive()或receive(long timeout)。  回复  更多评论
  
# re: 关于JMS Message Pending的问题
2009-06-17 18:26 | sunnycare
3x....  回复  更多评论
  
# re: 关于JMS Message Pending的问题
2009-06-19 12:33 | 找个美女做老婆
JBOSS 和 SPRING 的JMS 用过,WEBLOGIC的还没有用过

Java乐园 技术交流社区:http://www.javaly.cn
Java乐园 群号:15651281
验证消息 : Java乐园  回复  更多评论
  
# re: 关于JMS Message Pending的问题
2009-08-03 11:22 | ybb
我这里碰到一个奇诡的问题:

环境 jdk1.6, weblogic 10.3

配置了一个 TestQueue。

当我们使用 MDB方式接收Queue消息时,如果抛出运行时异常,这个消息会自动重发。

如果客户端是采用consumer messageListener 的形式接收,当 listener.onMessage() 抛出运行时异常时,这个消息的status String 一直是 receive 状态,只有重启客户端才会重新收到一次。
代码如下:
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer receiver = session.createConsumer(sq);
receiver.setMessageListener(new Receiver());

难道messageListener形式不能像MDB一样的自动重发?

找了很多资料,一点思路都没了。


  回复  更多评论
  
# re: 关于JMS Message Pending的问题
2009-08-03 11:55 | 走走停停又三年
什么样的runtime exception呢? 有可能是你的acknowledege可能由于网络问题没有发过去,导致message处于receive,所以不会重新发送。建议你用client_acknowledge,等你消息成功处理后直接调用message.acknowledge()。  回复  更多评论
  
# re: 关于JMS Message Pending的问题
2009-08-03 13:26 | ybb
运行时异常,比如抛出空指针异常。
我的网络环境是局域网,acknowledege 肯定能收到的。
所有在客户端onMessage中抛出运行时异常的message 都是 receive状态。

但在ejb MDB中,如果出现运行时异常,这消息会等待10、20秒后重发。

如果我自行处理 acknowledege,如何让message自动重发呢?

  回复  更多评论
  
# re: 关于JMS Message Pending的问题
2009-08-03 13:30 | ybb
能否 交流一下?

我的msn: ybbkd2@hotmail.com , qq:11898620

期待  回复  更多评论
  
# re: 关于JMS Message Pending的问题
2009-09-02 10:00 | 走走停停又三年
对于系统时间问题,我们可以用下面的小程序测试一下,

public class JVMTimeTest {

public static void main(String args[]){
JVMTimeTest test = new JVMTimeTest();
test.retriveTime();
}

private void retriveTime(){
long previousTime = -1;
while(true){
long currentTime = System.currentTimeMillis();
System.out.println("currentTime is: " + currentTime);
if(previousTime > currentTime)
System.out.println("OS time change is detected! and:" +
"previousTime: " + previousTime + " " +
"currentTime: " + currentTime);
previousTime = currentTime;
try{
Thread.currentThread().sleep(100);
}catch(Exception e){}
}
}
}
  回复  更多评论
  

只有注册用户登录后才能发表评论。


网站导航: