2010年11月3日
这里LZ再做最后一次冲刺,如果能找到线索,就继续,找不到,就找华生。
今天问了Leader,To be Cruel
因为populateMap作用域是private,所以它只能在本地被调用。
我们看看都有谁调用了它!
1, public void setProcessMap(Map map) {
processMap.clear();
populateMap(map, processMap);
}
2, public void setDiscardMap(Map map) {
discardMap.clear();
populateMap(map, discardMap);
}
从前面大家知道这个populateMap是将它的第一个参数里面值传给它第二个参数里面的Map里面。
我们的线索就是我们的数据传到了哪里,我们就找哪里。
现在传到了Map里,那我们就找Map看谁和它有联系。
有三个方法和processMap有关系:
1,public List getProcessDestinations(IMessageProcessor processor) {
List l = (List) processMap.get(processor);
return l != null ? l : Collections.EMPTY_LIST;
}
注意它返回的是一个list,它将传进来的参数作为key,然后返回一个MessageProcessor的List。
2, protected IMessageProcessor getIfAlreadyAutoboxed(Object processor){
Object boxed = null;
Iterator ite = processMap.keySet().iterator();
while(ite.hasNext()){
Object key = ite.next();
if(! (key instanceof Node)) continue;
Node node = (Node) key;
if(node.getProcessor().equals(processor)){
boxed = node;
break;
}
}
return (IMessageProcessor) boxed;
}
作用是得到如果已经封装过了。
得到processMap的key集合,然后迭代,如果key是Node(第一次出场)的实例,就继续下一次迭代,否则
将key(强制)转化为Node。如果node.getProcessor()==processor,那就把node赋值给boxed。中断迭代
。返回的是一个IMessageProcessor。
3, public Object clone() throws CloneNotSupportedException {
RoutingMap obj = (RoutingMap) super.clone();
obj.processors = Collections.unmodifiableSet(processors);
obj.processMap = Collections.unmodifiableMap(processMap);
obj.discardMap = Collections.unmodifiableMap(discardMap);
obj.exceptionMap = Collections.unmodifiableMap(exceptionMap);
return obj;
}
这个先不详细描述了
我们可以试着把重点放在第一个方法上,选中第一个方法,CTRL+ALT+H
发现出现了一个新的view,call hierarchy.
一共有4个调用。
先看第一个方法 在新类TopologyAnalyser中
private void buildTopology(IRoutingMap map) {
topology=new HashMap();
Collection processors=map.getMessageProcessors();
Iterator it=processors.iterator();
//Get the direct connections
while (it.hasNext()) {
IMessageProcessor processor=(IMessageProcessor)it.next();
link(processor,map.getProcessDestinations(processor));
link(processor,map.getDiscardDestinations(processor));
}
//Now get the indirect ones
it=processors.iterator();
}
调用了RoutingMap里面的
public Collection getMessageProcessors() {
return Collections.unmodifiableCollection(processors);
}
这里面的
Collections.unmodifiableCollection返回指定 collection 的不可修改视图,此方法允许模块为用户提
供对内部 collection 的“只读”访问。在返回的 collection 上执行的查询操作将“读完”指定的
collection。试图修改返回的 collection(不管是直接修改还是通过其迭代器进行修改)将导致抛出
UnsupportedOperationException
processors值是xml传来的所有的IMessageProcessor。
迭代出所有的IMessageProcessor,
根据提供的IMessageProcessor key得到value List<IMessageProcessor>。
link(processor,map.getProcessDestinations(processor));将源处理器和目标处理器连接起来。
查找link()的源
private void link(IMessageProcessor src,List destinations) {
TopologyInfo srcTi=getTopologyInfo(src);
srcTi.addOutputs(destinations);
Iterator it=destinations.iterator();
while(it.hasNext()) {
IMessageProcessor destination=(IMessageProcessor)it.next();
TopologyInfo dstTi=getTopologyInfo(destination);
dstTi.addInput(src);
}
}
TopololyInfo 是类TopologyAnalyser中一个内部类,作用就是得到输入和输出,并将输入和输出
的IMessageProcessor放入list中。
class TopologyInfo {
private List inputs=new ArrayList();
private List outputs=new ArrayList();
public List getInputs() {
return inputs;
}
public List getOutputs(){return outputs;}
public void addInput(IMessageProcessor input) {
inputs.add(input);
}
public void addOutput(IMessageProcessor output) {outputs.add(output);}
//public void addInputs(Collection inputs) {this.inputs.addAll(inputs);}
public void addOutputs(Collection outputs) {this.outputs.addAll(outputs);}
}
我们再回头看link()的源,将目的处理器给了srcTi的outputs List.源处理器给了inputs List。
在这里源和目的终于连接上了。
那么这个类TopologyAnalyser,在link()中要初始化,
首先调用
private TopologyInfo getTopologyInfo(IMessageProcessor processor) {
TopologyInfo topologyInfo;
if (!topology.containsKey(processor)) {
topologyInfo=new TopologyInfo();
topology.put(processor, topologyInfo);
}
else {
topologyInfo=(TopologyInfo)topology.get(processor);
}
return topologyInfo;
}
如果在Map topology中包含源IMessageProcessor,则将topology中的目标IMessageProcessor放
入topologyInfo中,如果不包含IMessageProcessor,则创建一个新的TopologyInfo实例,然后将空
的topologyInfo和源ImessageProcessor一起放入topology Map。返回topologyInfo.
posted @
2010-11-03 13:54 Vigoss 阅读(177) |
评论 (0) |
编辑 收藏
2010年11月2日
好了,老子不废话了,真金不怕卖钱。
Router bean下有property 名叫processors,所以应该也有个set方法,不然没有办法设置值。我们找到
org.openadaptor.core.router.Router类。
不负众望
public void setProcessors(List processorList){
// create process map from list of processors
if ((processorList==null) || (processorList.isEmpty())) {
throw new RuntimeException("Null or empty processorList is not permitted");
}
Object[] processors=processorList.toArray(new Object[processorList.size()]);
Map processMap = new HashMap();
for (int i=1;i<processors.length;i++){
processMap.put(processors[i-1],processors[i]);
}
setProcessMap(processMap);
}
果然有,大家通过简单阅读代码可以看到这就是个将List设置到Map的过程,那么Map有什么用呢?大家再
仔细看看,不要着急,对了,看到了吧,那个for(;;),它是将一个处理器接着一个处理器塞
到HashMap里面,前面的是顺序用个鲜明的矩阵图来形容吧
1 2
2 3
3 4
4 5
发现什么规律?右边的减左边的等于1,- -,你真不是盖的。
大家想想HashMap的结构是什么样的?<key,value>!
对!
当它循着这个Map的key找的时候会找到value,然后又以这个value为key接着找下一个value,循环直到这
个Map的最后一个值。这样就会把每个processor都执行一遍。
那么有人会问- -!为什么说会这样执行,你怎么知道啊?
好的,来看
setProcessors()里最后一行是什么?
setProcessMap()
好,继续找它,对,还在Router里
public void setProcessMap(Map map) {
if (processMapConfigured) {
throw new RuntimeException("Only one of processMap and processors properties may be
configured");
}
processMapConfigured=true;
routingMap.setProcessMap(map);
}
对,继续
这里设置了processMapConfigured=true;这是为什么呢?LZ下次为你揭开这个神秘的擦脚布。
先把注意力放在routingMap里,它的本类是RoutingMap
好,找到
RoutingMap,找到setProcessMap()
public void setProcessMap(Map map) {
processMap.clear();
populateMap(map, processMap);
}
注意populateMap(map,processMap)在执行前,processMap是空的
在这里我们发现了processMap的定义,Sets the processMap which defines how to route output from
one adaptor component to anothers.
是一个定义怎样将一个适配器组件的输出转发到另外一个的集合
首先它把processMap清空了,然后populateMap(map,processMap)
哇擦,天长地久有时尽 程序绵绵无绝期
找到populateMap(map,processMap)
private void populateMap(Map map, Map checkedMap) {
map = autoboxer.autobox(map);
for (Iterator iter = map.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry) iter.next();
verifyEntryKeyIsIMessageProcessor(entry);
IMessageProcessor fromProcessor = (IMessageProcessor) entry.getKey();
List processorList = autoboxIMessageProcessorList(entry.getValue());
processors.add(fromProcessor);
processors.addAll(processorList);
checkedMap.put(fromProcessor, processorList);
}
}
大家仔细阅读代码发现这就是个将map中的数据先verifyEntryKeyIsIMessageProcessor(entry);
确定map的key是否是MessageProcessor,不是就报错退出了,是就将他强制转换为IMessageProcessor,
然后自动封装,这里真j8操淡,TMD前面设置到Map里面的时候就可以了,现在还要设置到set中,还来个
什么自动封箱,封你妈啊,不过他们也有道理,如果list里面嵌套了另外一个list就把第二
个list的MessageProcessor给漏掉了。现在checkMap里面已经有了所有的MessageProcessor,LZ就要看看
他们要干什吗?
posted @
2010-11-02 17:39 Vigoss 阅读(199) |
评论 (0) |
编辑 收藏
2010年11月1日
OpenAdaptor是为达到快速构建数据集成功能的适配器框架,用Java实现,抽象数据迁移的过程,用3类组
件完成数据迁移,通过对组件的连接和配置达到快速构建、高度复用的目的,快速地满足企业数据集成的
要求,具有标准化、快速开发、灵活、易于定制,高度复用特点。
OpenAdaptor原理
在2个不同的业务数据库之间进行数据迁移的时候,经历三个过程:数据抽象,数据转换,数据加
载。OpenAdaptor框架将3个阶段抽象为3类不同的组件:sources,pipes,sinks,通过这3类内置的组件
分别完成各阶段工作。
在数据抽取阶段,对应sources组件,完成与源端的协议连接,读取数据并将数据转化为DataObject数组
,根据需要对数据加密/压缩;
数据转化阶段,对应pipes组件,完成数据过滤、数据转化、异常处理等作业;
数据加载对应sinks组件,完成建立与目的端的协议连接,将数据转化为目的端的数据格式,数据解压/解
密,将数据写入目的端。
source类型有3种:
callback类型是指数据到来时间触发source读取数据;
listen类型表示source监听数据,例如source从socket读数据。
poll类型表示每隔一段时间久读取数据。
在run()方法中不同类型的source分别调用runCallback()、runListen()、runPoll。
sourceProcess()方法对读取到的DataObject[]进行处理,并调用processMessage(),该方法会循环
调用管道线里下游组件的processMessage()。
AbstractWriter将DataObject[]转化为字符串的一个Writer类,如果sink制定了一个DoStringWriter,则
用改writer的实例根据DataObject[]产生字符串,否则产生默认的XML格式字符串。
OpenAdaptor采用的是Spring的技术,所以建议你在学习OpenAdaptor前,先学习Spring,至少要了
解Spring的AOP和IOC,知道Spring的工作原理,你就明白了一半OpenAdaptor的调用过程。
受限我们先看一个OpeanAdaptor的一个XML文件,为什么它要使用XML文件,而不是txt或者doc呢?这是因
为,XML文件是用来存储数据的,重在数据本身。而且她使用标签来存储数据,每个标签必须同时有结束
可开始标签。并且标签必须按照合适的顺序进行嵌套,这就使得它可以用来表示一些有层次的数据类型,
例如java中的类。还有一个重要原因是,Spring的特性(Service层,DAO层独立出现,在编码时既没有实
例化对象,也没有设置依赖关系,而把它交给Spring,由Spring在运行阶段实例化、组装对象)决定它必
须要使用到XML文件,在XML文件中配置各个组件。
首先,我们看一下OpenAdaptor的XML文件中的内容:
先看<beans>,yes,你很聪明,那是一个标准的Spring XML文件开头,好了这下你更确信了,所
谓OpenAdaptor不过是建立在Spring基础上的一个有特定功能的Java功能组件包。
好了,我们接着看,大家发现了么?LZ是个天才。
下面第一个组件式Adaptor 注意有很多Adaptor,这个是org.openadaptor.core.adaptor.Adaptor,那么我
们可以发现(当然LZ是在看了很多个XML文件后发现的),每个XML文件都有这个Adaptor,那么这
个Adaptor究竟是个神马东东?
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router" />
</bean>
1。LZ发现这个Adaptor里面有个property属性,那么我们就想吧,什么里面有属性这一说,大家不知道
学习Java有多深入,但是我可以告诉大家,属性绝对是个抽象的东西,因为具体的不叫属性,叫 值
。(⊙o⊙)哦!到这里你发现原来是这啊,那不就是个类吗?类里面有属性,当类生成具体对象 的
时候,可以给属性赋值,对头,LZ说。
那么既然是属性,肯定有设置它的方法啊?楼主决定给这么问的童鞋奖励一个臭豆腐,属性如果不
能设置,就好比你只是说我有臭豆腐,可是没法吃。LZ郁闷。
好,我们就打开这个类,看看谁掉泪。注意这个臭豆腐名叫messageProcessor。
当当当当
public void setMessageProcessor(final IMessageProcessor processor) {
if (this.processor != null) {
throw new RuntimeException("message processor has already been set");
}
this.processor = processor;
/* if the processor has metrics treat them as 'global' Adaptor ones */
if(processor instanceof IRecordableComponent){
this.metrics = ((IRecordableComponent) processor).getMetrics();
String enabled = metrics.isMetricsEnabled() ? "on" : "off";
log.info("Metrics are " + enabled);
}
}
瓦擦
这不是我编的。真的有这个嘞。好,如果你翻开看这个setMessageProcessor,行了,别吐。所
以我们不看丫的。那么我们知道就行了,它初始化的第一
个Bean是org.openadaptor.core.adaptor.Adaptor,它的一个属性是messageProcessor,它……等等
,有值,是的有值。值是多少?Router!Router?好的,Router是什么?我靠,Ruter是另外一 个Bean
,傻狗答道。
2。Router
介于傻狗同学的成就,我们今天请他作为特邀嘉宾,你好,傻狗!你好,楼猪!- -,啊,那个你向大家
介绍一下自己吧!(⊙_⊙),大家好,我是Router。大家 ⊙﹏⊙b汗
- <bean id="Router" class="org.openadaptor.core.router.Router">
- <property name="processors">
- <list>
<ref bean="Servlet" />
<ref bean="TickerFilter" />
<ref bean="XMLConverter" />
<ref bean="Validator" />
<ref bean="Writer" />
</list>
</property>
<property name="exceptionProcessor" ref="ExceptionHandler" />
</bean>
哇,大家一阵狂吐!
待续
posted @
2010-11-01 18:00 Vigoss 阅读(301) |
评论 (0) |
编辑 收藏
2010年10月13日
OpenAdaptor是一个基于Java/XML的软件平台,允许用少量的或非常规的编程进行快速地商业系统集成。它具有高扩展性并已经为JMS,LDAP,Mail,MQSeries,Oracle,Sybase,MSSQL Server和作为数据交换格式的XML构建接口组件。OpenAdaptor为开发人员提供了大量接口组件,通过Source、Pipe、Sink组件链的形式实现系统和系统间的连接,从而完成数据的交换。
OpenAdaptor是Dresdner Kleinwort Wasserstein 开发的一款open source的产品,主要面向不同系统中的数据传递。 它可以很富方便地将一个系统中的数据输出(Source),经过一定的转化(Pipe),输出到其他系统(Sink)。
Source 数据获取对象,是一个接口,可以根据数据获取来源不同,实现很多子类,比如:FileSource、JMSSource等。它本身并没有定义任何方法,但是扩展了线程,所以它的核心方法应该就是线程的run方法。
Pipe 对source获取的数据进行处理,比如过滤、重新组织等,比如:FilterPipe。最上层是一个抽象类,核心方法是processMessage()
Sink 数据发布,将整合后的数据,根据配置文件,发布出去,比如:FileSink、JMSSink等。定义了一个接口,核心方法是processMessage()
Controller 整个消息传递的控制类,是一个线程,负责对上面所说的三个对象的调用,完成一次消息的传递,每一次消息的传递都会新建一个线程实例。定义了一个接口。
DataObject 传递消息数据的对象,定义了一个接口。
Message 对DataObject对象集合的封装,用来传递消息。
LDAP是轻量目录访问协议,英文全称是Lightweight Directory Access Protocol,一般都简称为LDAP。它是基于X.500标准的,但是简单多了并且可以根据需要定制。与X.500不同,LDAP支持TCP/IP,这对访问Internet是必须的。LDAP的核心规范在RFC中都有定义,所有与LDAP相关的RFC都可以在LDAPman RFC网页中找到。
详细简单说来,LDAP是一个得到关于人或者资源的集中、静态数据的快速方式、LDAP是一个用来发布目录信息到许多不同资源的协议。通常它都作为一个集中的地址本使用,不过根据组织者的需要,它可以做得更加强大。详细信息参照http://baike.baidu.com/view/159263.htm
下面是几个例子
eg1:csv to database
<?xml version="1.0" encoding="UTF-8"?>
<!--
$Id: step07.xml 1285 2008-03-05 14:31:22Z higginse $
$HeadURL: https://openadaptor3.openadaptor.org/svn/openadaptor3/tags/3.4.5/example/tutorial/step07.xml $
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<description>
Adaptor for csv to database.
</description>
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router"/>
</bean>
<bean id="Router" class="org.openadaptor.core.router.Router">
<property name="processMap">
<map>
<entry key-ref="ReaderA" value-ref="MapConverter"/>
<entry key-ref="MapConverter" value-ref="Writer"/>
</map>
</property>
</bean>
<bean id="ReaderA" class="org.openadaptor.auxil.connector.iostream.reader.FileReadConnector">
<property name="filename" value="input/inputA.csv" />
</bean>
<bean id="MapConverter" class="org.openadaptor.auxil.convertor.delimited.DelimitedStringToOrderedMapConvertor"> <description>将读取csv文件的数据转换成map类型;以下对应的行属性值,是和数据库中表trade1中相对应的。 以下三个属性name,number,text分别和数据库中的字段一致,且和下面的outputColumns的数值一致,反之,报错! </description>
<property name="fieldNames">
<list>
<value>SIDE</value>
<value>STOCK</value>
<value>PRICE</value>
</list>
</property>
</bean>
<bean id="JdbcConnection" class="org.openadaptor.auxil.connector.jdbc.JDBCConnection">
<property name="driver" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@192.168.1.121:1521:orcl"/>
<property name="username" value="ti1_user"/>
<property name="password" value="ti1_user"/>
</bean>
<bean id="Writer" class="org.openadaptor.auxil.connector.jdbc.writer.JDBCWriteConnector">
<property name="jdbcConnection" ref="JdbcConnection" />
<property name="writer">
<bean class="org.openadaptor.auxil.connector.jdbc.writer.map.MapTableWriter">
<property name="tableName" value="TRADE" />
<property name="outputColumns">
<description>以下对应的行属性值,是和数据库中表trade1中相对应的。以下三个属性name,number,text分别和数据库中的字段一致,且和上面的map的key值一致,反之,报错!
</description>
<list>
<value>SIDE</value>
<value>STOCK</value>
<value>PRICE</value>
</list>
</property>
</bean>
</property>
</bean>
</beans>
eg2:database to database
<?xml version="1.0" encoding="UTF-8"?>
<!--
$Id: step09.xml 1508 2008-06-02 14:06:48Z cawthorng $
$HeadURL: https://openadaptor3.openadaptor.org/svn/openadaptor3/tags/3.4.5/example/tutorial/step09.xml $
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<description>
Adaptor for database1 to database2
</description>
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router"/>
</bean>
<bean id="Router" class="org.openadaptor.core.router.Router">
<property name="processors">
<list>
<ref bean="readerConnector"/>
<ref bean="writerConnector"/>
</list>
</property>
</bean>
<bean id="sourceJdbcConnection" class="org.openadaptor.auxil.connector.jdbc.JDBCConnection">
<property name="driver" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@192.168.1.121:1521:orcl"/>
<property name="username" value="ti1_user"/>
<property name="password" value="ti1_user"/>
</bean>
<bean id="targetJdbcConnection" class="org.openadaptor.auxil.connector.jdbc.JDBCConnection">
<property name="driver" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@192.168.1.121:1521:orcl"/>
<property name="username" value="ti1_user"/>
<property name="password" value="ti1_user"/>
</bean>
<bean id="readerConnector" class="org.openadaptor.auxil.connector.jdbc.reader.JDBCReadConnector">
<description>Reader which polls database using configured SQL.</description>
<property name="jdbcConnection" ref="sourceJdbcConnection"/>
<property name="sql">
<value>
SELECT side as BUYSELL,stock as TICKER,price as PRICE
FROM TRADE
</value>
</property>
</bean>
<bean id="writerConnector" class="org.openadaptor.auxil.connector.jdbc.writer.JDBCWriteConnector">
<property name="jdbcConnection" ref="targetJdbcConnection" />
<property name="writer">
<bean class="org.openadaptor.auxil.connector.jdbc.writer.map.MapTableWriter">
<property name="tableName" value="TRADE2" />
</bean>
</property>
</bean>
<bean id="writer" class="org.openadaptor.auxil.connector.jdbc.writer.map.MapTableWriter">
<property name="tableName" value="trade2"/> <!-- -->
<property name="outputColumns">
<list>
<value>side</value><!--ݿֶ -->
<value>price</value>
<value>stock</value>
</list>
</property>
</bean>
<bean id="ExceptionHandler" class="org.openadaptor.auxil.connector.iostream.writer.FileWriteConnector"> <property name="filename" value="output/print.txt"/>
</bean>
</beans>
eg3:xml to csv
<?xml version="1.0" encoding="UTF-8"?>
<!--
$Id: step07.xml 1285 2008-03-05 14:31:22Z higginse $
$HeadURL: https://openadaptor3.openadaptor.org/svn/openadaptor3/tags/3.4.5/example/tutorial/step07.xml $
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<description>
Adaptor for xml to csv
</description>
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router">
</property>
</bean>
<bean id="Router" class="org.openadaptor.core.router.Router">
<property name="processors">
<list>
<ref bean="Reader"/>
<ref bean="XmlToMap"/>
<ref bean="FormatterScript"/>
<ref bean="maptostring"/>
<ref bean="Writer"/>
</list>
</property>
</bean>
<bean id="Reader" class="org.openadaptor.auxil.connector.iostream.reader.FileReadConnector">
<property name="filename" value="input/trades.xml"/>
<property name="dataReader">
<bean class="org.openadaptor.auxil.connector.iostream.reader.string.StringReader"/>
</property>
</bean>
<bean id="XmlToMap" class="org.openadaptor.auxil.convertor.xml.XmlToOrderedMapConvertor"/>
<bean id="maptostring" class="org.openadaptor.auxil.convertor.delimited.OrderedMapToDelimitedStringConvertor"/>
<bean id="FormatterScript" class="org.openadaptor.auxil.processor.script.ScriptProcessor">
<description>Convert Dom4j Document inton org.w3c.dom.Document</description>
<property name="script">
<value>
var trade=oa_data.get('Trade'); oa_data.put('SIDE',trade.get('buySell')); oa_data.put('STOCK',trade.get('ticker'));
oa_data.put('PRICE',trade.get('price'));
oa_data.remove('Trade'); //No longer needed
</value>
</property>
</bean>
<bean id="Writer" class="org.openadaptor.auxil.connector.iostream.writer.FileWriteConnector">
<property name="filename" value="output/dengjianli.csv"/>
</bean>
</beans>
eg4:xml to database
<?xml version="1.0" encoding="UTF-8"?>
<!--
$Id: step07.xml 1285 2008-03-05 14:31:22Z higginse $
$HeadURL: https://openadaptor3.openadaptor.org/svn/openadaptor3/tags/3.4.5/example/tutorial/step07.xml $
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<description>
Adaptor for xml to database
</description>
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router">
</property>
</bean>
<bean id="Router" class="org.openadaptor.core.router.Router">
<property name="processors">
<list>
<ref bean="Reader" />
<ref bean="xmltomap" />
<ref bean="FormatterScript" />
<ref bean="writer" />
</list>
</property>
</bean>
<bean id="Reader" class="org.openadaptor.auxil.connector.iostream.reader.FileReadConnector">
<description>Read entire contents of input file</description>
<property name="filename" value="input/trades.xml" />
<property name="dataReader">
<bean class="org.openadaptor.auxil.connector.iostream.reader.string.StringReader" />
</property>
</bean>
<bean id="xmltomap" class="org.openadaptor.auxil.convertor.xml.XmlToOrderedMapConvertor" />
<bean id="maptostring" class="org.openadaptor.auxil.convertor.delimited.OrderedMapToDelimitedStringConvertor" />
<bean id="FormatterScript" class="org.openadaptor.auxil.processor.script.ScriptProcessor">
<property name="script">
<value>
var trade=oa_data.get('Trade');
oa_data.put('SIDE',trade.get('buySell'));
oa_data.put('STOCK',trade.get('ticker'));
oa_data.put('PRICE',trade.get('price'));
oa_data.remove('Trade');
</value>
</property>
</bean>
<bean id="JdbcConnection" class="org.openadaptor.auxil.connector.jdbc.JDBCConnection">
<property name="driver" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@192.168.1.121:1521:orcl"/>
<property name="username" value="ti1_user"/>
<property name="password" value="ti1_user"/>
</bean>
<bean id="writer" class="org.openadaptor.auxil.connector.jdbc.writer.JDBCWriteConnector">
<property name="jdbcConnection" ref="JdbcConnection" />
<property name="writer">
<bean class="org.openadaptor.auxil.connector.jdbc.writer.map.MapTableWriter">
<property name="tableName" value="TRADE" />
</bean>
</property>
</bean>
</beans>
eg5:txt to database
<?xml version="1.0" encoding="UTF-8"?>
<!--
$Id: step07.xml 1285 2008-03-05 14:31:22Z higginse $
$HeadURL: https://openadaptor3.openadaptor.org/svn/openadaptor3/tags/3.4.5/example/tutorial/step07.xml $
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<description>
Adaptor for xml to database
</description>
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router">
</property>
</bean>
<bean id="Router" class="org.openadaptor.core.router.Router">
<property name="processors">
<list>
<ref bean="FileReader" />
<ref bean="writerConnector" />
</list>
</property>
<property name="exceptionProcessor" ref="ExceptionHandler" />
</bean>
<bean id="FileReader" class="org.openadaptor.core.node.ReadNode">
<property name="connector">
<ref bean="readerConnector" />
</property>
<property name="processor">
<ref bean="mapconverter" />
</property>
</bean>
<bean id="readerConnector" class="org.openadaptor.auxil.connector.iostream.reader.FileReadConnector">
<property name="filename" value="input/data.txt" />
<property name="dataReader">
<bean class="org.openadaptor.auxil.connector.iostream.reader.string.LineReader" />
</property>
</bean>
<bean id="mapconverter" class="org.openadaptor.auxil.convertor.delimited.DelimitedStringToOrderedMapConvertor"> <property name="delimiter" value="," />
<property name="fieldNames">
<list>
<value>SIDE</value>
<value>STOCK</value>
<value>PRICE</value>
</list>
</property>
</bean>
<bean id="targetJdbcConnection" class="org.openadaptor.auxil.connector.jdbc.JDBCConnection">
<property name="driver" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@192.168.1.121:1521:orcl"/>
<property name="username" value="ti1_user"/>
<property name="password" value="ti1_user"/>
</bean>
<bean id="writerConnector" class="org.openadaptor.auxil.connector.jdbc.writer.JDBCWriteConnector">
<property name="jdbcConnection" ref="targetJdbcConnection" />
<property name="writer">
<ref bean="writer" />
</property>
</bean>
<bean id="writer" class="org.openadaptor.auxil.connector.jdbc.writer.map.MapTableWriter">
<property name="tableName" value="TRADE1" />
</bean>
<bean id="ExceptionHandler" class="org.openadaptor.auxil.connector.iostream.writer.FileWriteConnector">
<property name="filename" value="output/puberror.txt" />
</bean>
</beans>
以上例子 都可以选择同一个用户 在同一个用户下创建3个结构相同的表 sql如下
create table TRADE
(
SIDE VARCHAR2(50),
STOCK VARCHAR2(50),
PRICE NUMBER(20)
)
tablespace TI_DATA
pctfree 10
initrans 1
maxtrans 255
storage
(
initial 64K
minextents 1
maxextents unlimited
);
在运行以上实例的时候请参照如下目录下的 说明
D:\openadaptor-3.4.5-bin\openadaptor-3.4.5\example\tutorial\index.htm只需创建需要的xml文件并以 index.htm里面说的创建好环境后使用 java org.openadaptor.spring.SpringAdaptor -config step01.xml 在命令行模式下运行,注意以上都与队列无关,所以如果要使用队列的话,提供下面一个实例,
队列牵涉到发送端和接收端,所以需要配置两个xml文件,开两个cmd窗口运行,值得注意的是要在setclasspath.bat中添加一些需要的jar包,并且在运行前,在命令行下运行 setclasspath.bat文件,我的是这样的
@echo off
set CLASSPATH=.
set CLASSPATH=%CLASSPATH%;..\..\lib\openadaptor.jar
set CLASSPATH=%CLASSPATH%;..\..\lib\openadaptor-spring.jar
set CLASSPATH=%CLASSPATH%;..\..\lib\openadaptor-depends.jar
set CLASSPATH=%CLASSPATH%;..\..\lib\3rdparty\hsqldb.jar
set CLASSPATH=%CLASSPATH%;..\..\lib\3rdparty\jbossall-client.jar
set CLASSPATH=%CLASSPATH%;..\..\lib\3rdparty\ojdbc14.jar
set CLASSPATH=%CLASSPATH%;..\..\lib\3rdparty\sqljdbc.jar
@echo on
使用两个xml文件从 database to database
,xml文件名自己取
<?xml version="1.0" encoding="UTF-8"?>
<!--
$Id: step07.xml 1285 2008-03-05 14:31:22Z higginse $
$HeadURL: https://openadaptor3.openadaptor.org/svn/openadaptor3/tags/3.4.5/example/tutorial/step07.xml $
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<description>
Adaptor for step 7 of the tutorial.
</description>
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router"/>
</bean>
<bean id="Router" class="org.openadaptor.core.router.Router">
<property name="processors">
<list>
<ref bean="Reader"/>
<ref bean="Writer"/>
</list>
</property>
</bean>
<bean id="JndiConnection" class="org.openadaptor.auxil.connector.jndi.JNDIConnection">
<property name="initialContextFactory" value="org.jnp.interfaces.NamingContextFactory"/>
<property name="providerUrl" value="jnp://localhost:1099"/>
</bean>
<bean id="JmsConnection" class="org.openadaptor.auxil.connector.jms.JMSConnection">
<property name="jndiConnection" ref="JndiConnection"/>
<property name="connectionFactoryName" value="ConnectionFactory"/>
</bean>
<bean id="JdbcConnection" class="org.openadaptor.auxil.connector.jdbc.JDBCConnection">
<property name="driver" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@192.168.1.121:1521:orcl"/>
<property name="username" value="ti1_user"/>
<property name="password" value="ti1_user"/>
</bean>
<!--<bean id="Reader" class="org.openadaptor.auxil.connector.jms.JMSReadConnector">
<property name="destinationName" value="queue/testQueue"/>
<property name="jmsConnection" ref="JmsConnection"/>
<property name="transacted" value="true"/>
</bean>-->
<bean id="Reader" class="org.openadaptor.auxil.connector.jdbc.reader.JDBCReadConnector">
<description>Reader which polls database using configured SQL.</description>
<property name="jdbcConnection" ref="JdbcConnection"/>
<!-- batch size of 0 or less means process all rows in one message batch. -->
<!-- batch size of one means process one row per message and so on -->
<property name="batchSize" value="0"/>
<property name="resultSetConverter">
<bean class="org.openadaptor.auxil.connector.jdbc.reader.xml.ResultSetToXMLConverter"/>
</property>
<property name="sql">
<value>
SELECT side as BUYSELL,stock as TICKER,price as PRICE
FROM TRADE
</value>
</property>
</bean>
<bean id="Writer" class="org.openadaptor.auxil.connector.jms.JMSWriteConnector">
<property name="destinationName" value="queue/testQueue"/>
<property name="jmsConnection" ref="JmsConnection"/>
</bean>
<!--<bean id="Writer" class="org.openadaptor.auxil.connector.iostream.writer.FileWriteConnector"/>-->
</beans>
另外一个xml文件
<?xml version="1.0" encoding="UTF-8"?>
<!--
$Id: step09.xml 1508 2008-06-02 14:06:48Z cawthorng $
$HeadURL: https://openadaptor3.openadaptor.org/svn/openadaptor3/tags/3.4.5/example/tutorial/step09.xml $
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<description>
Adaptor for step 9 of the tutorial.
</description>
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router"/>
</bean>
<bean id="Router" class="org.openadaptor.core.router.Router">
<property name="processors">
<list>
<ref bean="Reader"/>
<ref bean="XmlToMap"/>
<ref bean="FormatterScript"/>
<ref bean="Writer"/>
</list>
</property>
</bean>
<bean id="JndiConnection" class="org.openadaptor.auxil.connector.jndi.JNDIConnection">
<property name="initialContextFactory" value="org.jnp.interfaces.NamingContextFactory"/>
<property name="providerUrl" value="jnp://localhost:1099"/>
</bean>
<bean id="JmsConnection" class="org.openadaptor.auxil.connector.jms.JMSConnection">
<property name="jndiConnection" ref="JndiConnection"/>
<property name="connectionFactoryName" value="ConnectionFactory"/>
</bean>
<bean id="Reader" class="org.openadaptor.auxil.connector.jms.JMSReadConnector">
<property name="destinationName" value="queue/testQueue"/>
<property name="jmsConnection" ref="JmsConnection"/>
<property name="transacted" value="true"/>
</bean>
<bean id="XmlToMap" class="org.openadaptor.auxil.convertor.xml.XmlToOrderedMapConvertor"/>
<bean id="FormatterScript" class="org.openadaptor.auxil.processor.script.ScriptProcessor">
<property name="script">
<value>
<![CDATA[
var trade=oa_data.get('row');
oa_data.put('SIDE',trade.get('BUYSELL'));
oa_data.put('STOCK',trade.get('TICKER'));
oa_data.put('PRICE',trade.get('PRICE'));
oa_data.remove('row'); //No longer needed
]]>
</value>
</property>
</bean>
<bean id="JdbcConnection" class="org.openadaptor.auxil.connector.jdbc.JDBCConnection">
<property name="driver" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@192.168.1.121:1521:orcl"/>
<property name="username" value="ti2_user"/>
<property name="password" value="ti2_user"/>
</bean>
<bean id="Writer" class="org.openadaptor.auxil.connector.jdbc.writer.JDBCWriteConnector">
<property name="jdbcConnection" ref="JdbcConnection" />
<property name="writer">
<bean class="org.openadaptor.auxil.connector.jdbc.writer.map.MapTableWriter">
<property name="tableName" value="TRADE" />
</bean>
</property>
</bean>
</beans>
有问题欢迎讨论
posted @
2010-10-13 10:44 Vigoss 阅读(361) |
评论 (0) |
编辑 收藏