paulwong

spring integration同步数据库数据

需求为:当客户已有系统的数据被同步到我方数据库后,若再有新数据,只同步新数据到我方数据库。解决:因为客户的业务表是不能变动的,我方在客户数据库中新建一状态表,记录哪些数据被更新过。
当客户业务表有新数据插入时,用触发器将新数据id插入到状态表。

为方便实例:业务表pp,状态表status
结构为:
pp:
CREATE TABLE `pp` (
  `name` 
varchar(255default NULL,
  `address` 
varchar(255default NULL,
  `id` 
int(11NOT NULL auto_increment,
  
PRIMARY KEY  (`id`)
) ENGINE
=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

status:

CREATE TABLE `status` (
  `id` 
int(11NOT NULL auto_increment,
  `status` 
varchar(255default 'new',
  `ppid` 
int(11NOT NULL,
  
PRIMARY KEY  (`id`)
) ENGINE
=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8;


触发器:
DROP TRIGGER if EXISTS mytrigger
CREATE TRIGGER mytrigger after INSERT on pp
for EACH ROW
BEGIN
 
INSERT into `status`(ppid) values(new.id);
END;


核心配置:jdbc-inbound-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
       xmlns:xsi
="http://www.w3.org/2001/XMLSchema-instance" 
       xmlns:context
="http://www.springframework.org/schema/context" 
       xmlns:int
="http://www.springframework.org/schema/integration" 
       xmlns:int-jdbc
="http://www.springframework.org/schema/integration/jdbc"    
       xmlns:int-jms
="http://www.springframework.org/schema/integration/jms" 
       xmlns:jdbc
="http://www.springframework.org/schema/jdbc" 
       xsi:schemaLocation
="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context-3.0.xsd 
    http://www.springframework.org/schema/integration 
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd 
    http://www.springframework.org/schema/integration/jdbc 
    http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd 
    http://www.springframework.org/schema/jdbc 
    http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd
     http://www.springframework.org/schema/integration/jms 
    http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd"
>
    
<context:component-scan base-package="com.wisely.inbound"/>
     
    
<int:channel id="target"/>
    
    
<int-jdbc:inbound-channel-adapter channel="target" 
                    data-source
="dataSource"
                    query
="select p.id as ppid,p.name as ppname from pp p,status s where p.id=s.ppid and s.status='new'"
                    update
="update status as st set st.status='old' where ppid in (:ppid)"
                                       
>
        
<!-- 每隔多少毫秒去抓取 -->
        
<int:poller fixed-rate="5000" >
            
<int:transactional/>
        
</int:poller>
        
<!--  指定时刻抓取
        <int:poller max-messages-per-poll="1">
            <int:transactional/>
            <int:cron-trigger expression="0 0 3 * * ?"/>
        </int:poller>
        
-->
    
</int-jdbc:inbound-channel-adapter>
    
<int:service-activator input-channel="target" ref="jdbcMessageHandler"/>     
     
<context:property-placeholder location="classpath*:META-INF/spring/*.properties"/>
     
<bean class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" id="dataSource">
        
<property name="driverClassName" value="${database.driverClassName}"/>
        
<property name="url" value="${database.url}"/>
        
<property name="username" value="${database.username}"/>
        
<property name="password" value="${database.password}"/>
    
</bean>   
    
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        
<property name="dataSource" ref="dataSource"/>
    
</bean>    
   
</beans>


JdbcMessageHandler:
package com.wisely.inbound.jdbc;

import java.util.List;
import java.util.Map;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Component
public class JdbcMessageHandler {
    @ServiceActivator
    
public void handleJdbcMessage(List<Map<String ,Object>> message){
        
for(Map<String,Object> resultMap:message){
            System.out.println(
"组:");
            
for(String column:resultMap.keySet()){
                System.out.println(
"字段:"+column+" 值:"+resultMap.get(column));
            }

        }

    }

}


测试类:
package com.wisely.inbound.jdbc;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class JdbcInbound {

    
/**
     * 
@param args
     
*/

    
public static void main(String[] args) {
          ApplicationContext context 
= 
                    
new ClassPathXmlApplicationContext("/META-INF/spring/jdbc-inbound-context.xml");
    }


}


若将channel改为jms的通道。配置文件做以下修改:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
       xmlns:xsi
="http://www.w3.org/2001/XMLSchema-instance" 
       xmlns:context
="http://www.springframework.org/schema/context" 
       xmlns:int
="http://www.springframework.org/schema/integration" 
       xmlns:int-jdbc
="http://www.springframework.org/schema/integration/jdbc"    
       xmlns:int-jms
="http://www.springframework.org/schema/integration/jms" 
       xmlns:jdbc
="http://www.springframework.org/schema/jdbc" 
       xsi:schemaLocation
="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context-3.0.xsd 
    http://www.springframework.org/schema/integration 
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd 
    http://www.springframework.org/schema/integration/jdbc 
    http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd 
    http://www.springframework.org/schema/jdbc 
    http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd
     http://www.springframework.org/schema/integration/jms 
    http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd"
>
    
<context:component-scan base-package="com.wisely.inbound"/>
     
    
<int-jms:channel id="target"  queue-name="jdbc.queue" connection-factory="connectionFactory"/>
    
    
<int-jdbc:inbound-channel-adapter channel="target" 
                                      data-source
="dataSource"
                                      query
="select p.id as ppid,p.name as ppname from pp p,status s where p.id=s.ppid and s.status='new'"
                                      update
="update status as st set st.status='old' where ppid in (:ppid)"
                                       
>
        
<!-- 每隔多少毫秒去抓取 -->
        
<int:poller fixed-rate="5000" >
            
<int:transactional/>
        
</int:poller>
        
<!--  指定时刻抓取
        <int:poller max-messages-per-poll="1">
            <int:transactional/>
            <int:cron-trigger expression="0 0 3 * * ?"/>
        </int:poller>
        
-->
    
</int-jdbc:inbound-channel-adapter>
    
<!--  
    <int-jms:message-driven-channel-adapter id="queInbound" destination-name="jmsQueue" channel="target"/>
    
-->
    
<int:service-activator input-channel="target" ref="jdbcMessageHandler"/>
     
     
<context:property-placeholder location="classpath*:META-INF/spring/*.properties"/>
     
<bean class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" id="dataSource">
        
<property name="driverClassName" value="${database.driverClassName}"/>
        
<property name="url" value="${database.url}"/>
        
<property name="username" value="${database.username}"/>
        
<property name="password" value="${database.password}"/>
    
</bean>
    
    
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        
<property name="dataSource" ref="dataSource"/>
    
</bean>
    
    
    
<bean id="activeMqConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        
<property name="brokerURL" value="vm://localhost" />
    
</bean>
    
    
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        
<property name="sessionCacheSize" value="10" />
        
<property name="cacheProducers" value="false"/>
        
<property name="targetConnectionFactory" ref="activeMqConnectionFactory"/>
    
</bean>
    
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        
<property name="connectionFactory" ref="connectionFactory"/>
        
<property name="defaultDestinationName" value="jmsQueue" />
    
</bean>
</beans>

转:http://wiselyman.iteye.com/blog/1150495

posted on 2012-10-17 11:50 paulwong 阅读(2320) 评论(1)  编辑  收藏 所属分类: SPRING INTERGRATION

Feedback

# re: spring integration同步数据库数据 2014-07-24 15:18 石因

没有将数据插入到数据库的例子啊!  回复  更多评论   



只有注册用户登录后才能发表评论。


网站导航: