集群通常由多个相同的实例组成,但对于定时任务这类场景,只希望其中一个实例执行任务;如果这个实例挂了,其他实例可以接替它继续工作。
解决这个问题的方案是集群选主(leader election):一个集群中同一时刻只有一个LEADER,由LEADER负责执行定时任务。当LEADER失效或其领导权被撤销时,会在剩下的实例中重新选出LEADER。
持有分布式锁的实例则是LEADER。
Spring Integration JDBC 已提供相关功能:基于数据库的分布式锁(JdbcLockRegistry)以及基于锁的选主组件(LockRegistryLeaderInitiator)。
pom.xml
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
</dependency>
LeaderElectionIntegrationConfig.java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.sql.DataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
import org.springframework.integration.jdbc.lock.LockRepository;
import org.springframework.integration.support.leader.LockRegistryLeaderInitiator;
import com.paul.integration.leader.ControlBusGateway;
import com.paul.integration.leader.MyCandidate;
/**
 * Spring configuration that wires up lock-based leader election on top of a
 * JDBC-backed lock registry.
 *
 * <p>The instance that currently holds the lock is the leader; its
 * {@link MyCandidate} receives the granted/revoked callbacks.
 */
@Configuration
public class LeaderElectionIntegrationConfig {

    /**
     * Shared list of control-bus bean references that {@link MyCandidate}
     * starts when leadership is granted and stops when it is revoked.
     *
     * <p>The list is created empty here; it is presumably populated by other
     * configuration classes (verify against the flows that register adapters).
     *
     * @return an empty, thread-safe list
     */
    @Bean
    public List<String> needToStartupAdapterList() {
        return new CopyOnWriteArrayList<>();
    }

    /**
     * Creates the JDBC lock repository used to persist the leader lock.
     *
     * @param dataSource the data source the lock table lives in
     * @return the lock repository
     */
    @Bean
    public DefaultLockRepository defaultLockRepository(DataSource dataSource) {
        // The lock time-to-live can be tuned here if the default is too short,
        // e.g. via setTimeToLive(60_000) on the repository before returning it.
        return new DefaultLockRepository(dataSource);
    }

    /**
     * Creates the lock registry backed by the given repository.
     *
     * @param lockRepository the repository storing the locks
     * @return the JDBC lock registry
     */
    @Bean
    public JdbcLockRegistry jdbcLockRegistry(LockRepository lockRepository) {
        return new JdbcLockRegistry(lockRepository);
    }

    /**
     * Creates the leadership candidate for this application instance.
     *
     * @param controlBusGateway gateway used to send start/stop commands
     * @param needToStartupAdapterList bean references to start/stop
     * @return the candidate
     */
    @Bean
    public MyCandidate myCandidate(
            ControlBusGateway controlBusGateway,
            List<String> needToStartupAdapterList) {
        return new MyCandidate(controlBusGateway, needToStartupAdapterList);
    }

    /**
     * Creates the initiator that competes for the leader lock on behalf of
     * the candidate.
     *
     * <p>Collaborators are injected as method parameters rather than obtained
     * by calling the sibling {@code @Bean} methods with placeholder
     * {@code null} arguments; that trick only works while this class is
     * CGLIB-proxied and would otherwise build objects from nulls.
     *
     * @param jdbcLockRegistry the registry providing the leader lock
     * @param myCandidate the candidate notified of leadership changes
     * @return the leader initiator
     */
    @Bean
    public LockRegistryLeaderInitiator leaderInitiator(
            JdbcLockRegistry jdbcLockRegistry,
            MyCandidate myCandidate) {
        return new LockRegistryLeaderInitiator(jdbcLockRegistry, myCandidate);
    }
}
MyCandidate.java
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.leader.Context;
import org.springframework.integration.leader.DefaultCandidate;
import com.novacredit.mcra.mcracommon.integration.gateway.ControlBusGateway;
/**
 * Leadership candidate that starts the configured adapters when this instance
 * becomes leader and stops them again when leadership is revoked.
 *
 * <p>Each entry of the adapter list is a control-bus bean reference (for
 * example {@code @'someFlow.someAdapter'}); {@code .start()} or
 * {@code .stop()} is appended to it and the resulting command is sent through
 * the {@link ControlBusGateway}.
 */
public class MyCandidate extends DefaultCandidate {

    private static final Logger LOG = LoggerFactory.getLogger(MyCandidate.class);

    private final List<String> needToStartupAdapterList;
    private final ControlBusGateway controlBusGateway;

    /**
     * @param controlBusGateway gateway used to deliver control-bus commands
     * @param needToStartupAdapterList control-bus bean references to start
     *     on grant and stop on revoke
     */
    public MyCandidate(
            ControlBusGateway controlBusGateway,
            List<String> needToStartupAdapterList) {
        this.controlBusGateway = controlBusGateway;
        this.needToStartupAdapterList = needToStartupAdapterList;
    }

    /**
     * Called when this instance acquires leadership; starts every registered
     * adapter.
     *
     * @param context the leadership context
     */
    @Override
    public void onGranted(Context context) {
        super.onGranted(context);
        LOG.info("*** Leadership granted ***");
        LOG.info("STARTING MONGODB POLLER");
        sendToAllAdapters(".start()");
        LOG.info("STARTUP MESSAGE SENT");
    }

    /**
     * Called when this instance loses leadership; stops every registered
     * adapter so that only the new leader keeps polling.
     *
     * @param context the leadership context
     */
    @Override
    public void onRevoked(Context context) {
        super.onRevoked(context);
        LOG.info("*** Leadership revoked ***");
        LOG.info("STOPPING MONGODB POLLER");
        // The stop command must actually be sent: previously it was only
        // logged, leaving a demoted leader's adapters running.
        sendToAllAdapters(".stop()");
        LOG.info("SHUTDOWN MESSAGE SENT");
    }

    /** Appends the given suffix to every adapter reference and sends it. */
    private void sendToAllAdapters(String commandSuffix) {
        for (String adapterRef : needToStartupAdapterList) {
            String command = adapterRef + commandSuffix;
            LOG.info("-----{}", command);
            controlBusGateway.sendCommand(command);
        }
    }
}
ControlBusIntegrationConfig.java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.gateway.GatewayProxyFactoryBean;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.MessageChannel;
import com.paul.integration.gateway.ControlBusGateway;
/**
 * Spring configuration exposing a control bus and a gateway for sending
 * commands to it.
 */
@Configuration
public class ControlBusIntegrationConfig {

    /**
     * Channel on which control-bus commands are received.
     *
     * @return a direct message channel
     */
    @Bean
    public MessageChannel controlBusChannel() {
        return MessageChannels.direct().get();
    }

    /**
     * Flow that logs every message arriving on {@link #controlBusChannel()}
     * at INFO level and then hands it to the control bus.
     *
     * @return the control-bus integration flow
     */
    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlows.from(controlBusChannel())
                .log(LoggingHandler.Level.INFO, "controlBusChannel")
                .controlBus()
                .get();
    }

    /**
     * Gateway proxy for {@link ControlBusGateway} that sends to
     * {@link #controlBusChannel()}.
     *
     * @return the gateway factory bean
     */
    @Bean
    public GatewayProxyFactoryBean controlBusGateway() {
        GatewayProxyFactoryBean gateway =
                new GatewayProxyFactoryBean(ControlBusGateway.class);
        gateway.setDefaultRequestChannel(controlBusChannel());
        // Upper-case 'L' suffix: a lower-case 'l' is easily misread as the digit 1.
        gateway.setDefaultRequestTimeout(300L);
        gateway.setDefaultReplyTimeout(300L);
        return gateway;
    }
}
ControlBusGateway.java
public interface ControlBusGateway {
public void sendCommand(String command);
}
各个应用实例启动后,其中的LockRegistryLeaderInitiator会自动运行并竞争分布式锁,最终只有一个实例获得锁并成为LEADER,随后该实例会执行MyCandidate中的onGranted回调;当它失去LEADER身份时,则会执行onRevoked回调。