需求:客户已有系统的数据同步到我方数据库之后,若客户系统中又产生了新数据,只需把新增的数据同步到我方数据库。
解决方案:由于客户的业务表结构不能改动,我方在客户数据库中新建一张状态表,用来记录哪些数据是新增的、是否已被同步。
当客户业务表有新数据插入时,通过触发器将新数据的 id 插入到状态表。
为方便举例:业务表为 pp,状态表为 status。
结构为:
pp:
-- Business table owned by the customer; per the accompanying notes its structure must not be changed.
CREATE TABLE `pp` (
    `name`    VARCHAR(255) DEFAULT NULL,
    `address` VARCHAR(255) DEFAULT NULL,
    `id`      INT(11)      NOT NULL AUTO_INCREMENT,
    PRIMARY KEY (`id`)
)
    ENGINE = InnoDB
    AUTO_INCREMENT = 9
    DEFAULT CHARSET = utf8;
status:
-- Sync-state table: one row per tracked pp row. `status` defaults to 'new';
-- the poller's update statement later sets it to 'old'.
-- `ppid` is indexed because the polling query joins on it and the update filters by it.
CREATE TABLE `status` (
    `id`     INT(11)      NOT NULL AUTO_INCREMENT,
    `status` VARCHAR(255) DEFAULT 'new',
    `ppid`   INT(11)      NOT NULL,
    PRIMARY KEY (`id`),
    KEY `idx_status_ppid` (`ppid`)
) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8;

触发器:
-- Recreate the trigger from scratch; the terminating semicolon keeps this
-- statement from running into the CREATE TRIGGER that follows.
DROP TRIGGER IF EXISTS mytrigger;

-- After each insert into pp, record the new row's id in the status table
-- (the status column is left to its default value).
-- The body is a single statement, so no BEGIN ... END block is needed and the
-- script works without changing the client DELIMITER.
CREATE TRIGGER mytrigger AFTER INSERT ON pp
FOR EACH ROW
    INSERT INTO `status` (ppid) VALUES (NEW.id);

核心配置:jdbc-inbound-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/integration/jdbc
http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd
http://www.springframework.org/schema/jdbc
http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd">
<!-- Register annotated components found under com.wisely.inbound (e.g. the jdbcMessageHandler bean). -->
<context:component-scan base-package="com.wisely.inbound"/>

<!-- Channel that carries the polled rows from the JDBC adapter to the service activator. -->
<int:channel id="target"/>

<!--
    Polls for pp rows whose status row is still 'new' and sends them to "target".
    The update statement then flags those status rows as 'old'; the :ppid parameter
    is expected to be resolved from the ppid column of the polled rows
    (see the Spring Integration JDBC inbound-channel-adapter documentation).
-->
<int-jdbc:inbound-channel-adapter channel="target"
    data-source="dataSource"
    query="select p.id as ppid,p.name as ppname from pp p,status s where p.id=s.ppid and s.status='new'"
    update="update status as st set st.status='old' where ppid in (:ppid)">
    <!-- Poll every 5000 milliseconds, inside a transaction. -->
    <int:poller fixed-rate="5000">
        <int:transactional/>
    </int:poller>
    <!-- Alternative: poll at a fixed time of day (03:00:00) using the poller's cron attribute.
    <int:poller cron="0 0 3 * * ?" max-messages-per-poll="1">
        <int:transactional/>
    </int:poller>
    -->
</int-jdbc:inbound-channel-adapter>

<!-- Hand every message arriving on "target" to the jdbcMessageHandler bean. -->
<int:service-activator input-channel="target" ref="jdbcMessageHandler"/>

<!-- Resolve the ${database.*} placeholders from the properties files under META-INF/spring. -->
<context:property-placeholder location="classpath*:META-INF/spring/*.properties"/>

<!-- Pooled data source; close() is invoked when the application context is shut down. -->
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="${database.driverClassName}"/>
    <property name="url" value="${database.url}"/>
    <property name="username" value="${database.username}"/>
    <property name="password" value="${database.password}"/>
</bean>

<!-- Transaction manager for the data source above (presumably the one picked up by <int:transactional/>). -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource"/>
</bean>
</beans>

JdbcMessageHandler:
package com.wisely.inbound.jdbc;
import java.util.List;
import java.util.Map;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;
@Component
public class JdbcMessageHandler
{
@ServiceActivator
data:image/s3,"s3://crabby-images/8d7d9/8d7d99ac571b1efcbf1f7e7a4120707c8e90d1fd" alt=""
public void handleJdbcMessage(List<Map<String ,Object>> message)
{
data:image/s3,"s3://crabby-images/8d7d9/8d7d99ac571b1efcbf1f7e7a4120707c8e90d1fd" alt=""
for(Map<String,Object> resultMap:message)
{
System.out.println("组:");
data:image/s3,"s3://crabby-images/8d7d9/8d7d99ac571b1efcbf1f7e7a4120707c8e90d1fd" alt=""
for(String column:resultMap.keySet())
{
System.out.println("字段:"+column+" 值:"+resultMap.get(column));
}
}
}
}

测试类:
package com.wisely.inbound.jdbc;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class JdbcInbound
{
/** *//**
* @param args
*/
data:image/s3,"s3://crabby-images/8d7d9/8d7d99ac571b1efcbf1f7e7a4120707c8e90d1fd" alt=""
public static void main(String[] args)
{
ApplicationContext context =
new ClassPathXmlApplicationContext("/META-INF/spring/jdbc-inbound-context.xml");
}
}

若将 channel 改为 JMS 通道,配置文件需做以下修改:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/integration/jdbc
http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd
http://www.springframework.org/schema/jdbc
http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd">
<!-- Register annotated components found under com.wisely.inbound (e.g. the jdbcMessageHandler bean). -->
<context:component-scan base-package="com.wisely.inbound"/>

<!-- JMS-backed channel: "target" is bound to the queue jdbc.queue through connectionFactory. -->
<int-jms:channel id="target" queue-name="jdbc.queue" connection-factory="connectionFactory"/>

<!--
    Polls for pp rows whose status row is still 'new' and sends them to "target".
    The update statement then flags those status rows as 'old'; the :ppid parameter
    is expected to be resolved from the ppid column of the polled rows
    (see the Spring Integration JDBC inbound-channel-adapter documentation).
-->
<int-jdbc:inbound-channel-adapter channel="target"
    data-source="dataSource"
    query="select p.id as ppid,p.name as ppname from pp p,status s where p.id=s.ppid and s.status='new'"
    update="update status as st set st.status='old' where ppid in (:ppid)">
    <!-- Poll every 5000 milliseconds, inside a transaction. -->
    <int:poller fixed-rate="5000">
        <int:transactional/>
    </int:poller>
    <!-- Alternative: poll at a fixed time of day (03:00:00) using the poller's cron attribute.
    <int:poller cron="0 0 3 * * ?" max-messages-per-poll="1">
        <int:transactional/>
    </int:poller>
    -->
</int-jdbc:inbound-channel-adapter>

<!-- Disabled alternative: feed "target" from the jmsQueue destination.
<int-jms:message-driven-channel-adapter id="queInbound" destination-name="jmsQueue" channel="target"/>
-->

<!-- Hand every message arriving on "target" to the jdbcMessageHandler bean. -->
<int:service-activator input-channel="target" ref="jdbcMessageHandler"/>

<!-- Resolve the ${database.*} placeholders from the properties files under META-INF/spring. -->
<context:property-placeholder location="classpath*:META-INF/spring/*.properties"/>

<!-- Pooled data source; close() is invoked when the application context is shut down. -->
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="${database.driverClassName}"/>
    <property name="url" value="${database.url}"/>
    <property name="username" value="${database.username}"/>
    <property name="password" value="${database.password}"/>
</bean>

<!-- Transaction manager for the data source above (presumably the one picked up by <int:transactional/>). -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource"/>
</bean>

<!-- ActiveMQ connection factory pointing at the in-VM broker URL. -->
<bean id="activeMqConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://localhost"/>
</bean>

<!-- Caching wrapper around the ActiveMQ factory; this is the factory the JMS channel uses. -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="sessionCacheSize" value="10"/>
    <property name="cacheProducers" value="false"/>
    <property name="targetConnectionFactory" ref="activeMqConnectionFactory"/>
</bean>

<!-- NOTE(review): no other element in this configuration references jmsTemplate; confirm it is still needed. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="defaultDestinationName" value="jmsQueue"/>
</bean>
</beans>

转自:
http://wiselyman.iteye.com/blog/1150495