标题党,哈哈,OK,言归正传。
最近两天被一个故障搞死了,因为原来一个报警脚本是shell写的,从一个java程序的jmx接口抓取信息并汇总发到监控系统,个人shell脚本能力比较稀松,在python牛人建议下,使用python写了一个脚本,发觉非常爽,我这个菜鸟(学习经历1天)简单就能编写出来一个复杂的脚本,比shell的简化将近5倍,晚上就得意洋洋的向一个同学吹嘘ing,结果被BS:语法甜点而已,java也可以轻松做出来,而且也能做的那么精干。。
回来看了一下java如果要实现这些功能,代码可能比shell还要多,看来这个领域实在不是java的专长。
我以前也认为语法甜点确认只是锦上添花,但是使用了python之后,发现自己以前还是偏见啊,在特定环境下,如脚本、页面等情况下,语法甜点可以大大减少输入量和代码出错可能性,动态语言还是我们工具箱不可缺少的工具。
posted @
2010-07-14 19:18 井底青蛙,常望天空 阅读(171) |
评论 (0) |
编辑 收藏
最近在做一个项目,综合使用了jboss的microcontainer,jetty和自己定义的war包,war包中还会用到spring,因为牵涉到多个容器,同时又有自己的自定义类,所以classloader环境异常复杂,ClassNotFound问题搞得头都大了,最后综合各种因素,设计了如下的一个classloader层次:
其中,红色部分是系统(也就是启动java程序加载Main函数的classloader),主要的设计考量有以下几点:
1、使用自定义的ExtClassLoader(加载java的ext目录下的jar包)把程序加载的class完全和系统加载的class隔离开,这样即使在eclipse容器中启动都不会有类冲突。
为什么不从系统的ExtClassLoader作为自定义classloader数的根有两个考虑,第一个是系统ExtClassLoader有可能不存在,第二个就是如果使用同一个ExtClassLoader中,在处理JNDI、XML和URL解析等java扩展功能时会遇到后加载的handler部分导致不同classloader树加载的同一个类的ClassCastException,具体参见这些模块的源代码。
2、WarClassLoader除了系统类和Common类(目前只有log相关类)以外的类都从war包的WEB-INFO和classes下加载。
3、所有执行War包中代码的线程ThreadContextClassLoader都设置为WarClassLoader,以供Spring和Webx中的相关工具类使用这个classloader结构的后门来加载war包中的类,典型例子是Webx中ResourceLoaderService就是使用ContextClassLoader来加载类的。
4、RialtoClassLoader也就是这个项目的容器加载器和WarClassLoader不在同一个树路径上,可以避免程序使用类和war使用类的class冲突,典型的是Spring容器相关代码。
5、CommonClassLoader加载的类需要严格控制,否则可能会导致运行期类冲突,例如Spring的相关jar包绝对不可以出现在这个classloader作用范围内。
总之,ClassLoader采用父分派机制,后来增加的Thread ContextClassLoader在这个体系上增加了一个后门,带来了灵活性,也带来了很多令人困扰的问题,在做容器类的项目时难免会遇到class loader层次设计的问题,这里抛砖引玉,欢迎达人拍砖。
posted @
2010-07-14 19:18 井底青蛙,常望天空 阅读(1061) |
评论 (0) |
编辑 收藏
ActiveMQ是一个流行的开源MQ,我们也大规模应用在网站的方方面面,每天处理上亿消息,取得了较好效果。ActiveMQ有一个很好很强大的插件体系,提供了很强的扩展能力,ActiveMQ本身就是使用这一套插件体系实现了很多扩展功能,包括他的权限管理,日志管理,事务等模块都是作为一个插件集成的,我们自己也在消息路由、补偿式事务方面使用了它的插件功能,确实非常方便。
在ActiveMQ中,Broker代表一个运行的MQ节点,ActiveMQ的插件实际上是基于Broker的一个Filter链,整个设计类似于servlet的Filter结构,所有的Plugin构成一个链式结构,每个插件实际上都是一个"Interceptor",类结构图如下:
其中Broker接口封装了一个AMQ节点的方方面面的方法,包括连接管理、session管理、消息的发送和接收以及其它的一些功能,BrokerFilter实现这个接口,并提供了链式结构支持,可以拦截所有Broker方法的实现并传递结果给链式结构的下一个,形成了一个完整的"职责链"模式,具体层次关系如下,其中,"System Plugin"是指AMQ内部使用Plugin机制实现的一些系统功能,用户不能定制,"AMQ Plugin"指的是ActiveMQ已经实现好了,可以在配置文件中自由选择的一些插件,例如简单的安全插件,JAAS安全插件和DLQ插件等等,用户插件就是指用户自己实现的amq插件,需要用户把相关jar包放入到amq的启动classpath中,并在配置文件中进行配置才能正确加载的插件。
在上面这个层次结构中,最下面的RegionBroker是核心组件,在其之上的都是Broker的插件,继承之于BrokerFilter,和Broker保持接口兼容但是扩展Broker的功能。
下面举一个简单的例子,具体说明一下AMQ的插件是如何工作的。
我们在使用AMQ的过程中发现,在测试环境维护方面有很大的麻烦,具体表现在很多同学在测试项目的时候往往只关注自己项目牵涉的队列,不会去消费其他"不相关"的队列,这样导致的一个问题就是ActiveMQ经常发生大量数据阻塞,导致测试环境不可用,影响相关项目的测试工作。为了避免这个问题,我们假定在测试环境可以定义以下一些限制条件:
1、 所有队列堆积消息不超过1000条,超过之后立即清除。
2、 消息超过1个小时没有消费,就直接过期。
我们可以编写一个简单的amq插件来完成这两个限制条件:
首先,编写一个插件安装类:
package com.alibaba.napoli.plugins;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MessageControlBrokerPlugin implements BrokerPlugin {
private static Log log = LogFactory.getLog(StatisticsBrokerPlugin.class);
public Broker installPlugin(Broker broker) throws Exception {
log.info("install MessageControlBrokerPlugin");
return new MessageControlBroker(broker);
}
}
其次,编写真正的插件实现:
package com.alibaba.napoli.plugins;
import java.io.IOException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* 开发环境管理插件,符合两个条件进行消息清理:<br>
* 1 消息累积超过1000条
* 2 消息超过1个小时无人消费
* @author guolin.zhuanggl
*
*/
public class MessageControlBroker extends BrokerFilter {
public static Log log = LogFactory.getLog(DiscardingDLQBroker.class);
private static final long DEFAULT_EXPIRATION = 3600*1000;
private static final long DEFAULT_PURGE_COUNT = 1000;
public MessageControlBroker(Broker next) {
super(next);
}
@Override
public void messageExpired(ConnectionContext context,
MessageReference message) {
Message msg = null;
try {
msg = message.getMessage();
} catch (IOException e) {
log.error("failed to fetch content: ",e);
}
purgeMessage(msg);
// TODO Auto-generated method stub
super.messageExpired(context, message);
}
/**
* 清除队列中的所有消息
*/
private void purgeMessage(Message message){
Destination r = message.getRegionDestination();
if(r instanceof Queue){
try {
//如果累积消息超过1000个,清除队列消息
if(((Queue) r).getMessages().size() > DEFAULT_PURGE_COUNT){
((Queue) r).purge();
}
} catch (Exception e) {
// TODO Auto-generated catch block
log.error("failed to purge queue "+r.getName(),e);
}
}
}
/**
* 当消息发送时,全部设置过期时间1个小时,测试环境专用!!!
*/
@Override
public void send(ProducerBrokerExchange producerExchange,Message messageSend) throws Exception {
long oldExp = messageSend.getExpiration();
messageSend.setExpiration(oldExp < DEFAULT_EXPIRATION && oldExp > 0 ? oldExp : DEFAULT_EXPIRATION );
purgeMessage(messageSend);
super.send(producerExchange, messageSend);
}
}
然后,将这两个类打包为myplugin.jar,并放在activemq启动目录下的lib目录下
最后,在activemq.xml文件中增加一个简单的spring配置项:
<bean xmlns="http://www.springframework.org/schema/beans"
id="purgePlugin"
class="com.alibaba.napoli.plugins.MessageControlBrokerPlugin">
</bean>
然后,重启activemq,就会发现这个插件已经被加载。
posted @
2010-07-14 19:18 井底青蛙,常望天空 阅读(1971) |
评论 (3) |
编辑 收藏
【
BTrace
说明】
http://kenai.com/projects/btrace
是一个实时监控工具,使用了
java agent
和
jvm attach
技术,可以在不停机的情况下实时监控线上程序的运行情况,另外,对
btrace
脚本(实际上就是
java
程序)做了非常严格的安全限制,安全性很高,对应用程序基本没有影响。在性能方面,
cobar
进行过测试,对方法进行调用耗时统计的时候,基本消费在微秒级别,可以说微不足道。
【背景】
在中文站
napoli
上线过程后,发现了一个奇怪的现象,尽管"已知"的
offer
发送端都已经迁移到
napoli
系统中,但是老的
mq
系统仍然有新的
offer
消息进来,因为连接
mq
的服务器非常多,定位消息来源成了一个非常大的问题。这种情况,想到了使用
BTrace
在某一台服务器进行线上监控进而期望发现这个幽灵。
【过程】
首先,我们需要知道两个基本信息:消息类型和来源
ip
,这样才可以定位
offer
消息的来源。
要知道来源
ip
,需要找到服务器端
管理的类,只有在建立
socket
的地方,才可以抓到具体
ip
,经过分析
amq
代码,发现
tcp
连接基本是由下面这个类来服务所有消息的接收的:
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
private static final Log LOG = LogFactory.getLog(TcpTransport.class);
private static final ThreadPoolExecutor SOCKET_CLOSE;
protected final URI remoteLocation;
protected final URI localLocation;
protected final WireFormat wireFormat;
protected int connectionTimeout = 30000;
protected int soTimeout;
protected int socketBufferSize = 64 * 1024;
protected int ioBufferSize = 8 * 1024;
protected boolean closeAsync=true;
protected Socket socket;
|
这个类中包含一个
socket
对象的成员变量,所有我们只要监控
readCommand
方法,这个方法的返回值实际上就是一个
ActivemqObjectMessage
对象,这样就可以在一个方法上加拦截器就可以同时捕获到
ip
和消息对象,两全其美!!!
protected Object readCommand() throws IOException {
return wireFormat.unmarshal(dataIn);
}
|
因为原有
ESB
消息通道都是一个队列
ESBQueue
,所以无法通过队列名称来确定消息类型,必须通过
ESBTransferObject
对象来取得消息类型:
destType
,
offer
的区间是
1000-1008
public class ESBTransferObject implements Serializable {
private static final long serialVersionUID = -5975115234845303878L;
/**
* 消息体,原则上对象序列化后的XML数据(String) 注意使用XML1.1规范。
*/
private Object content;
/**
* 用户自定义数据
*/
private Object userDefineData;
/**
* 目的消息类型
*/
private int destType = -1;
|
但是,在服务器端并没有
ESBTransferObject
对象,无法反序列化(
BTrace
也不支持反序列化操作),所以没有方法简单取得消息类型信息!!!
OK
,我不反序列化,直接拿二进制
byte[]
,类型信息应该是在固定位置的吧?但是发现这个对象
content
变长字符串定义在类型之前,类型位置不确定了,晕倒啊
不死心,输出二进制数据,柳暗花明啊,原来对象序列化的时候,
primitive
的
field
都是紧接着类型信息写入的,所以,类型信息是在固定位置的
,类型信息始终是
255
,
256
两个字节(实际上是
4
个字节,但是目前我们只占有
2
个)
Ok
,编写代码,测试环境运行一下,晕倒,竟然有数组溢出!
使用
BTrace
,把这个数组打印下来(这个需要点技巧,
btrace
连
for
都不允许),竟然发现
位置偏移到
205
,
206
位置
,这个真的不知道什么原因,估计是客户端发送的时候压缩了,简单修改偏移量,测试运行,
ok
,所有的消息类型和
ip
的对照表打印出来了。
package com.alibaba.btrace.script;
import static com.sun.btrace.BTraceUtils.*;
import com.sun.btrace.annotations.*;
@BTrace
public class AMQQueue2IP {
@OnMethod(clazz = "org.apache.activemq.transport.tcp.TcpTransport", //需要拦截的类名
method = "readCommand", //需要拦截的方法名
location = @Location(Kind.RETURN)) //拦截位置,方法返回时
public static void onTransportCommandExit(@Self Object transport, @Return Object command) { //捕获调用对象和返回值
String commandName = str(command);
boolean isObjectMessage = (indexOf(commandName, "org.apache.activemq.command.ActiveMQObjectMessage") >= 0);
if (isObjectMessage) {
Object msg = command;
Object content = get(field(getSuperclass(getSuperclass(classOf(msg))), "content", false), msg);//捕获消息内容byte[]
byte[] bs = (byte[]) get(field(classOf(content), "data", false), content);
if (bs.length >= 206) {
int off = getInt(field(classOf(content), "offset", false), content);
int code = (0xff00&bs[205]<<8)+(0xff&bs[206]); //转换205,206字节为消息类型
//println(str(code));
Object socket = get(field(classOf(transport), "socket"), transport);
String address = str(socket); //截取ip地址
int s = indexOf(address, "/");
int e = indexOf(address, ",");
int len = e - s;
String ip = substr(address, s + 1, e);
print(strcat(timestamp(),"---"));
println(strcat(strcat("ip: ", ip), strcat(" queueName: ", str(code))));
}
}
}
}
打印结果:
2/3/10 12:38 PM---ip: 172.22.2.34 queueName: 2001
2/3/10 12:38 PM---ip: 172.22.2.41 queueName: 5001
2/3/10 12:38 PM---ip: 172.22.2.22 queueName: 5001
2/3/10 12:38 PM---ip: 172.22.2.47 queueName: 2001
2/3/10 12:38 PM---ip: 172.22.2.31 queueName: 2001
2/3/10 12:38 PM---ip: 172.22.2.13 queueName: 5001
2/3/10 12:38 PM---ip: 172.22.2.6 queueName: 5001
2/3/10 12:38 PM---ip: 172.22.2.48 queueName: 2001
2/3/10 12:38 PM---ip: 172.22.2.39 queueName: 2001
【补充】
BTrace
是一个强大的工具,但是,在线上检测的时候考虑时效性和安全性,必须有一个经过检验的脚本库才可以安全及时的定位系统问题.
posted @
2010-07-14 19:18 井底青蛙,常望天空 阅读(365) |
评论 (0) |
编辑 收藏
今天公司DNS切换,结果napoli这边收到大量报警,这就奇怪了,数据都是正确的,报警的结果确都是错误的,调试了一些脚本,发现有这个奇怪的文本"
Binary file (standard input) matches
"原来grep把输入数据临时文件当成是二进制文件了,这里加 -a 就可以解决这个问题。
但是,为什么文本文件会被当成是二进制文件?和今天的DNS切换有什么关系?分析后发现,因为dns的问题,抓数据的脚本执行时间明显变长,这样,在文件还在写入的时候,监控脚本就开始读取数据文件,在这样的并发访问下,grep会认为自己正在访问一个binary文件,导致监控误报警。
posted @
2010-07-14 19:17 井底青蛙,常望天空 阅读(163) |
评论 (0) |
编辑 收藏
最近在线上部署的ActiveMQ发生一次故障,因为一台ActiveMQ故障将前台的关键应用全部连接挂住,根本原因有两条:session的timeout设置不合理以及session池没有限制大小。这里说的不是这个问题,而是在后续设置client的timeout过程中,有同学发现AMQ有一个严重的bug,timeout根本不起作用!!!
调试代码发现:
在Activemq的send response处理中,使用了一个BlockingQueue,在有timeout的方法里,使用了poll方法,这个方法的api说明中指出,当timeout发生时,这个方法返回null!!!
我们在看AMQ经过层层调用后,在ActiveMQConnection方法中如何处理这个返回值:
对返回值为空的情况没有做任何处理,即使消息发送超时,amq也认为这个消息发送成功!估计这哥们理解poll在timeout的时候会抛出异常吧。
解决办法很简单,在response为空的时候,抛出JMSException,告知发生Timeout错误。
posted @
2010-07-14 19:17 井底青蛙,常望天空 阅读(175) |
评论 (0) |
编辑 收藏