paulwong

在SPRING BOOT中使用多JMS CONNECTION

使用自定义的 ConnectionFactory Bean,这样会覆盖 Spring Boot 的自动配置(auto-configuration)。

ActiveMQConnectionFactoryFactory.java
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.List;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryCustomizer;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQProperties;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQProperties.Packages;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;


/**
 * Factory to create a {
@link ActiveMQConnectionFactory} instance from properties defined
 * in {
@link SecondaryActiveMQProperties}.
 *
 * 
@author Phillip Webb
 * 
@author Venil Noronha
 
*/
class ActiveMQConnectionFactoryFactory {

    private static final String DEFAULT_EMBEDDED_BROKER_URL = "vm://localhost?broker.persistent=false";

    private static final String DEFAULT_NETWORK_BROKER_URL = "tcp://localhost:61616";

    private final ActiveMQProperties properties;

    private final List<ActiveMQConnectionFactoryCustomizer> factoryCustomizers;

    ActiveMQConnectionFactoryFactory(ActiveMQProperties properties,
            List<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
        Assert.notNull(properties, "Properties must not be null");
        this.properties = properties;
        this.factoryCustomizers = (factoryCustomizers != null) ? factoryCustomizers : Collections.emptyList();
    }

    public <T extends ActiveMQConnectionFactory> T createConnectionFactory(Class<T> factoryClass) {
        try {
            return doCreateConnectionFactory(factoryClass);
        }
        catch (Exception ex) {
            throw new IllegalStateException("Unable to create " + "ActiveMQConnectionFactory", ex);
        }
    }

    private <T extends ActiveMQConnectionFactory> T doCreateConnectionFactory(Class<T> factoryClass) throws Exception {
        T factory = createConnectionFactoryInstance(factoryClass);
        if (this.properties.getCloseTimeout() != null) {
            factory.setCloseTimeout((intthis.properties.getCloseTimeout().toMillis());
        }
        factory.setNonBlockingRedelivery(this.properties.isNonBlockingRedelivery());
        if (this.properties.getSendTimeout() != null) {
            factory.setSendTimeout((intthis.properties.getSendTimeout().toMillis());
        }
        Packages packages = this.properties.getPackages();
        if (packages.getTrustAll() != null) {
            factory.setTrustAllPackages(packages.getTrustAll());
        }
        if (!packages.getTrusted().isEmpty()) {
            factory.setTrustedPackages(packages.getTrusted());
        }
        customize(factory);
        return factory;
    }

    private <T extends ActiveMQConnectionFactory> T createConnectionFactoryInstance(Class<T> factoryClass)
            throws InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
        String brokerUrl = determineBrokerUrl();
        String user = this.properties.getUser();
        String password = this.properties.getPassword();
        if (StringUtils.hasLength(user) && StringUtils.hasLength(password)) {
            return factoryClass.getConstructor(String.class, String.class, String.class).newInstance(user, password,
                    brokerUrl);
        }
        return factoryClass.getConstructor(String.class).newInstance(brokerUrl);
    }

    private void customize(ActiveMQConnectionFactory connectionFactory) {
        for (ActiveMQConnectionFactoryCustomizer factoryCustomizer : this.factoryCustomizers) {
            factoryCustomizer.customize(connectionFactory);
        }
    }

    String determineBrokerUrl() {
        if (this.properties.getBrokerUrl() != null) {
            return this.properties.getBrokerUrl();
        }
        if (this.properties.isInMemory()) {
            return DEFAULT_EMBEDDED_BROKER_URL;
        }
        return DEFAULT_NETWORK_BROKER_URL;
    }
}

TwinJmsConnectionFactoryConfiguration.java
import java.util.stream.Collectors;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.messaginghub.pooled.jms.JmsPoolConnectionFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.jms.JmsPoolConnectionFactoryFactory;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryCustomizer;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Profile;


@Configuration
@Profile({"local"})
public class TwinJmsConnectionFactoryConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "spring.activemq.primary")
    public ActiveMQProperties primaryActiveMQProperties() {
        return new ActiveMQProperties();
    }
    
    @Bean(destroyMethod = "stop")
    @Primary
    @ConditionalOnProperty(prefix = "spring.activemq.pool", name = "enabled", havingValue = "true")
    public JmsPoolConnectionFactory connectionFactory(ActiveMQProperties primaryActiveMQProperties,
            ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactoryFactory(primaryActiveMQProperties,
                factoryCustomizers.orderedStream().collect(Collectors.toList()))
                .createConnectionFactory(ActiveMQConnectionFactory.class);
        return new JmsPoolConnectionFactoryFactory(primaryActiveMQProperties.getPool())
                .createPooledConnectionFactory(connectionFactory);
    }
    
    ////////////////////////////////////////////////////////////////////////////////
    @Bean
    @ConfigurationProperties(prefix = "spring.activemq.sescond")
    public ActiveMQProperties sescondActiveMQProperties() {
        return new ActiveMQProperties();
    }
    
    @Bean(destroyMethod = "stop")
    @ConditionalOnProperty(prefix = "spring.activemq.pool", name = "enabled", havingValue = "true")
    public JmsPoolConnectionFactory sescondPooledJmsConnectionFactory(ActiveMQProperties sescondActiveMQProperties,
            ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactoryFactory(sescondActiveMQProperties,
                factoryCustomizers.orderedStream().collect(Collectors.toList()))
                .createConnectionFactory(ActiveMQConnectionFactory.class);
        return new JmsPoolConnectionFactoryFactory(sescondActiveMQProperties.getPool())
                .createPooledConnectionFactory(connectionFactory);
    }
    
}


posted on 2020-03-19 09:45 paulwong 阅读(600) 评论(0)  编辑  收藏 所属分类: JMS


只有注册用户登录后才能发表评论。


网站导航: