2010年11月1日
这里LZ再做最后一次冲刺,如果能找到线索,就继续,找不到,就找华生。
今天问了Leader,To be Cruel
因为populateMap作用域是private,所以它只能在本地被调用。
我们看看都有谁调用了它!
1, public void setProcessMap(Map map) {
processMap.clear();
populateMap(map, processMap);
}
2, public void setDiscardMap(Map map) {
discardMap.clear();
populateMap(map, discardMap);
}
从前面大家知道这个populateMap是将它的第一个参数里面值传给它第二个参数里面的Map里面。
我们的线索就是我们的数据传到了哪里,我们就找哪里。
现在传到了Map里,那我们就找Map看谁和它有联系。
有三个方法和processMap有关系:
1,public List getProcessDestinations(IMessageProcessor processor) {
List l = (List) processMap.get(processor);
return l != null ? l : Collections.EMPTY_LIST;
}
注意它返回的是一个list,它将传进来的参数作为key,然后返回一个MessageProcessor的List。
2, protected IMessageProcessor getIfAlreadyAutoboxed(Object processor){
Object boxed = null;
Iterator ite = processMap.keySet().iterator();
while(ite.hasNext()){
Object key = ite.next();
if(! (key instanceof Node)) continue;
Node node = (Node) key;
if(node.getProcessor().equals(processor)){
boxed = node;
break;
}
}
return (IMessageProcessor) boxed;
}
作用是得到如果已经封装过了。
得到processMap的key集合,然后迭代,如果key是Node(第一次出场)的实例,就继续下一次迭代,否则
将key(强制)转化为Node。如果node.getProcessor()==processor,那就把node赋值给boxed。中断迭代
。返回的是一个IMessageProcessor。
3, public Object clone() throws CloneNotSupportedException {
RoutingMap obj = (RoutingMap) super.clone();
obj.processors = Collections.unmodifiableSet(processors);
obj.processMap = Collections.unmodifiableMap(processMap);
obj.discardMap = Collections.unmodifiableMap(discardMap);
obj.exceptionMap = Collections.unmodifiableMap(exceptionMap);
return obj;
}
这个先不详细描述了
我们可以试着把重点放在第一个方法上,选中第一个方法,CTRL+ALT+H
发现出现了一个新的view,call hierarchy.
一共有4个调用。
先看第一个方法 在新类TopologyAnalyser中
private void buildTopology(IRoutingMap map) {
topology=new HashMap();
Collection processors=map.getMessageProcessors();
Iterator it=processors.iterator();
//Get the direct connections
while (it.hasNext()) {
IMessageProcessor processor=(IMessageProcessor)it.next();
link(processor,map.getProcessDestinations(processor));
link(processor,map.getDiscardDestinations(processor));
}
//Now get the indirect ones
it=processors.iterator();
}
调用了RoutingMap里面的
public Collection getMessageProcessors() {
return Collections.unmodifiableCollection(processors);
}
这里面的
Collections.unmodifiableCollection返回指定 collection 的不可修改视图,此方法允许模块为用户提
供对内部 collection 的“只读”访问。在返回的 collection 上执行的查询操作将“读完”指定的
collection。试图修改返回的 collection(不管是直接修改还是通过其迭代器进行修改)将导致抛出
UnsupportedOperationException
processors值是xml传来的所有的IMessageProcessor。
迭代出所有的IMessageProcessor,
根据提供的IMessageProcessor key得到value List<IMessageProcessor>。
link(processor,map.getProcessDestinations(processor));将源处理器和目标处理器连接起来。
查找link()的源
private void link(IMessageProcessor src,List destinations) {
TopologyInfo srcTi=getTopologyInfo(src);
srcTi.addOutputs(destinations);
Iterator it=destinations.iterator();
while(it.hasNext()) {
IMessageProcessor destination=(IMessageProcessor)it.next();
TopologyInfo dstTi=getTopologyInfo(destination);
dstTi.addInput(src);
}
}
TopololyInfo 是类TopologyAnalyser中一个内部类,作用就是得到输入和输出,并将输入和输出
的IMessageProcessor放入list中。
class TopologyInfo {
private List inputs=new ArrayList();
private List outputs=new ArrayList();
public List getInputs() {
return inputs;
}
public List getOutputs(){return outputs;}
public void addInput(IMessageProcessor input) {
inputs.add(input);
}
public void addOutput(IMessageProcessor output) {outputs.add(output);}
//public void addInputs(Collection inputs) {this.inputs.addAll(inputs);}
public void addOutputs(Collection outputs) {this.outputs.addAll(outputs);}
}
我们再回头看link()的源,将目的处理器给了srcTi的outputs List.源处理器给了inputs List。
在这里源和目的终于连接上了。
那么这个类TopologyAnalyser,在link()中要初始化,
首先调用
private TopologyInfo getTopologyInfo(IMessageProcessor processor) {
TopologyInfo topologyInfo;
if (!topology.containsKey(processor)) {
topologyInfo=new TopologyInfo();
topology.put(processor, topologyInfo);
}
else {
topologyInfo=(TopologyInfo)topology.get(processor);
}
return topologyInfo;
}
如果在Map topology中包含源IMessageProcessor,则将topology中的目标IMessageProcessor放
入topologyInfo中,如果不包含IMessageProcessor,则创建一个新的TopologyInfo实例,然后将空
的topologyInfo和源ImessageProcessor一起放入topology Map。返回topologyInfo.
posted @
2010-11-03 13:54 Vigoss 阅读(179) |
评论 (0) |
编辑 收藏
好了,老子不废话了,真金不怕卖钱。
Router bean下有property 名叫processors,所以应该也有个set方法,不然没有办法设置值。我们找到
org.openadaptor.core.router.Router类。
不负众望
public void setProcessors(List processorList){
// create process map from list of processors
if ((processorList==null) || (processorList.isEmpty())) {
throw new RuntimeException("Null or empty processorList is not permitted");
}
Object[] processors=processorList.toArray(new Object[processorList.size()]);
Map processMap = new HashMap();
for (int i=1;i<processors.length;i++){
processMap.put(processors[i-1],processors[i]);
}
setProcessMap(processMap);
}
果然有,大家通过简单阅读代码可以看到这就是个将List设置到Map的过程,那么Map有什么用呢?大家再
仔细看看,不要着急,对了,看到了吧,那个for(;;),它是将一个处理器接着一个处理器塞
到HashMap里面,前面的是顺序用个鲜明的矩阵图来形容吧
1 2
2 3
3 4
4 5
发现什么规律?右边的减左边的等于1,- -,你真不是盖的。
大家想想HashMap的结构是什么样的?<key,value>!
对!
当它循着这个Map的key找的时候会找到value,然后又以这个value为key接着找下一个value,循环直到这
个Map的最后一个值。这样就会把每个processor都执行一遍。
那么有人会问- -!为什么说会这样执行,你怎么知道啊?
好的,来看
setProcessors()里最后一行是什么?
setProcessMap()
好,继续找它,对,还在Router里
public void setProcessMap(Map map) {
if (processMapConfigured) {
throw new RuntimeException("Only one of processMap and processors properties may be
configured");
}
processMapConfigured=true;
routingMap.setProcessMap(map);
}
对,继续
这里设置了processMapConfigured=true;这是为什么呢?LZ下次为你揭开这个神秘的擦脚布。
先把注意力放在routingMap里,它的本类是RoutingMap
好,找到
RoutingMap,找到setProcessMap()
public void setProcessMap(Map map) {
processMap.clear();
populateMap(map, processMap);
}
注意populateMap(map,processMap)在执行前,processMap是空的
在这里我们发现了processMap的定义,Sets the processMap which defines how to route output from
one adaptor component to anothers.
是一个定义怎样将一个适配器组件的输出转发到另外一个的集合
首先它把processMap清空了,然后populateMap(map,processMap)
哇擦,天长地久有时尽 程序绵绵无绝期
找到populateMap(map,processMap)
private void populateMap(Map map, Map checkedMap) {
map = autoboxer.autobox(map);
for (Iterator iter = map.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry) iter.next();
verifyEntryKeyIsIMessageProcessor(entry);
IMessageProcessor fromProcessor = (IMessageProcessor) entry.getKey();
List processorList = autoboxIMessageProcessorList(entry.getValue());
processors.add(fromProcessor);
processors.addAll(processorList);
checkedMap.put(fromProcessor, processorList);
}
}
大家仔细阅读代码发现这就是个将map中的数据先verifyEntryKeyIsIMessageProcessor(entry);
确定map的key是否是MessageProcessor,不是就报错退出了,是就将他强制转换为IMessageProcessor,
然后自动封装,这里真j8操淡,TMD前面设置到Map里面的时候就可以了,现在还要设置到set中,还来个
什么自动封箱,封你妈啊,不过他们也有道理,如果list里面嵌套了另外一个list就把第二
个list的MessageProcessor给漏掉了。现在checkMap里面已经有了所有的MessageProcessor,LZ就要看看
他们要干什吗?
posted @
2010-11-02 17:39 Vigoss 阅读(201) |
评论 (0) |
编辑 收藏
OpenAdaptor是为达到快速构建数据集成功能的适配器框架,用Java实现,抽象数据迁移的过程,用3类组
件完成数据迁移,通过对组件的连接和配置达到快速构建、高度复用的目的,快速地满足企业数据集成的
要求,具有标准化、快速开发、灵活、易于定制,高度复用特点。
OpenAdaptor原理
在2个不同的业务数据库之间进行数据迁移的时候,经历三个过程:数据抽象,数据转换,数据加
载。OpenAdaptor框架将3个阶段抽象为3类不同的组件:sources,pipes,sinks,通过这3类内置的组件
分别完成各阶段工作。
在数据抽取阶段,对应sources组件,完成与源端的协议连接,读取数据并将数据转化为DataObject数组
,根据需要对数据加密/压缩;
数据转化阶段,对应pipes组件,完成数据过滤、数据转化、异常处理等作业;
数据加载对应sinks组件,完成建立与目的端的协议连接,将数据转化为目的端的数据格式,数据解压/解
密,将数据写入目的端。
source类型有3种:
callback类型是指数据到来时间触发source读取数据;
listen类型表示source监听数据,例如source从socket读数据。
poll类型表示每隔一段时间久读取数据。
在run()方法中不同类型的source分别调用runCallback()、runListen()、runPoll。
sourceProcess()方法对读取到的DataObject[]进行处理,并调用processMessage(),该方法会循环
调用管道线里下游组件的processMessage()。
AbstractWriter将DataObject[]转化为字符串的一个Writer类,如果sink制定了一个DoStringWriter,则
用改writer的实例根据DataObject[]产生字符串,否则产生默认的XML格式字符串。
OpenAdaptor采用的是Spring的技术,所以建议你在学习OpenAdaptor前,先学习Spring,至少要了
解Spring的AOP和IOC,知道Spring的工作原理,你就明白了一半OpenAdaptor的调用过程。
受限我们先看一个OpeanAdaptor的一个XML文件,为什么它要使用XML文件,而不是txt或者doc呢?这是因
为,XML文件是用来存储数据的,重在数据本身。而且她使用标签来存储数据,每个标签必须同时有结束
可开始标签。并且标签必须按照合适的顺序进行嵌套,这就使得它可以用来表示一些有层次的数据类型,
例如java中的类。还有一个重要原因是,Spring的特性(Service层,DAO层独立出现,在编码时既没有实
例化对象,也没有设置依赖关系,而把它交给Spring,由Spring在运行阶段实例化、组装对象)决定它必
须要使用到XML文件,在XML文件中配置各个组件。
首先,我们看一下OpenAdaptor的XML文件中的内容:
先看<beans>,yes,你很聪明,那是一个标准的Spring XML文件开头,好了这下你更确信了,所
谓OpenAdaptor不过是建立在Spring基础上的一个有特定功能的Java功能组件包。
好了,我们接着看,大家发现了么?LZ是个天才。
下面第一个组件式Adaptor 注意有很多Adaptor,这个是org.openadaptor.core.adaptor.Adaptor,那么我
们可以发现(当然LZ是在看了很多个XML文件后发现的),每个XML文件都有这个Adaptor,那么这
个Adaptor究竟是个神马东东?
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router" />
</bean>
1。LZ发现这个Adaptor里面有个property属性,那么我们就想吧,什么里面有属性这一说,大家不知道
学习Java有多深入,但是我可以告诉大家,属性绝对是个抽象的东西,因为具体的不叫属性,叫 值
。(⊙o⊙)哦!到这里你发现原来是这啊,那不就是个类吗?类里面有属性,当类生成具体对象 的
时候,可以给属性赋值,对头,LZ说。
那么既然是属性,肯定有设置它的方法啊?楼主决定给这么问的童鞋奖励一个臭豆腐,属性如果不
能设置,就好比你只是说我有臭豆腐,可是没法吃。LZ郁闷。
好,我们就打开这个类,看看谁掉泪。注意这个臭豆腐名叫messageProcessor。
当当当当
public void setMessageProcessor(final IMessageProcessor processor) {
if (this.processor != null) {
throw new RuntimeException("message processor has already been set");
}
this.processor = processor;
/* if the processor has metrics treat them as 'global' Adaptor ones */
if(processor instanceof IRecordableComponent){
this.metrics = ((IRecordableComponent) processor).getMetrics();
String enabled = metrics.isMetricsEnabled() ? "on" : "off";
log.info("Metrics are " + enabled);
}
}
瓦擦
这不是我编的。真的有这个嘞。好,如果你翻开看这个setMessageProcessor,行了,别吐。所
以我们不看丫的。那么我们知道就行了,它初始化的第一
个Bean是org.openadaptor.core.adaptor.Adaptor,它的一个属性是messageProcessor,它……等等
,有值,是的有值。值是多少?Router!Router?好的,Router是什么?我靠,Ruter是另外一 个Bean
,傻狗答道。
2。Router
介于傻狗同学的成就,我们今天请他作为特邀嘉宾,你好,傻狗!你好,楼猪!- -,啊,那个你向大家
介绍一下自己吧!(⊙_⊙),大家好,我是Router。大家 ⊙﹏⊙b汗
- <bean id="Router" class="org.openadaptor.core.router.Router">
- <property name="processors">
- <list>
<ref bean="Servlet" />
<ref bean="TickerFilter" />
<ref bean="XMLConverter" />
<ref bean="Validator" />
<ref bean="Writer" />
</list>
</property>
<property name="exceptionProcessor" ref="ExceptionHandler" />
</bean>
哇,大家一阵狂吐!
待续
posted @
2010-11-01 18:00 Vigoss 阅读(302) |
评论 (0) |
编辑 收藏