|
2016年10月12日
在Spring cloud config出来之前, 自己实现了基于ZK的配置中心, 杜绝了本地properties配置文件, 原理很简单, 只是重载了PropertyPlaceholderConfigurer的mergeProperties(): /** * 重载合并属性实现 * 先加载file properties, 然后并入ZK配置中心读取的properties * * @return 合并后的属性集合 * @throws IOException 异常 */ @Override protected Properties mergeProperties() throws IOException { Properties result = new Properties(); // 加载父类的配置 Properties mergeProperties = super.mergeProperties(); result.putAll(mergeProperties); // 加载从zk中读取到的配置 Map<String, String> configs = loadZkConfigs(); result.putAll(configs); return result; } 这个实现在spring项目里用起来还是挺顺手的, 但是近期部分spring-boot项目里发现这种placeholder的实现跟spring boot的@ConfigurationProperties(prefix = "xxx") 不能很好的配合工作, 也就是属性没有被resolve处理, 用@Value的方式确可以读到, 但是@Value配置起来如果属性多的话还是挺繁琐的, 还是倾向用@ConfigurationProperties的prefix, 于是看了下spring boot的文档发现 PropertySource order: * Devtools global settings properties on your home directory (~/.spring-boot-devtools.properties when devtools is active). * @TestPropertySource annotations on your tests. * @SpringBootTest#properties annotation attribute on your tests. * Command line arguments. * Properties from SPRING_APPLICATION_JSON (inline JSON embedded in an environment variable or system property) * ServletConfig init parameters. * ServletContext init parameters. * JNDI attributes from java:comp/env. * Java System properties (System.getProperties()). * OS environment variables. * A RandomValuePropertySource that only has properties in random.*. * Profile-specific application properties outside of your packaged jar (application-{profile}.properties and YAML variants) * Profile-specific application properties packaged inside your jar (application-{profile}.properties and YAML variants) * Application properties outside of your packaged jar (application.properties and YAML variants). * Application properties packaged inside your jar (application.properties and YAML variants). * @PropertySource annotations on your @Configuration classes. * Default properties (specified using SpringApplication.setDefaultProperties). 不难发现其会检查Java system propeties里的属性, 也就是说, 只要把mergerProperties读到的属性写入Java system props里即可, 看了下源码, 找到个切入点 /** * 重载处理属性实现 * 根据选项, 决定是否将合并后的props写入系统属性, Spring boot需要 * * @param beanFactoryToProcess * @param props 合并后的属性 * @throws BeansException */ @Override protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props) throws BeansException { // 原有逻辑 super.processProperties(beanFactoryToProcess, props); // 写入到系统属性 if (writePropsToSystem) { // write all properties to system for spring boot Enumeration<?> propertyNames = props.propertyNames(); while (propertyNames.hasMoreElements()) { String propertyName = (String) propertyNames.nextElement(); String propertyValue = props.getProperty(propertyName); System.setProperty(propertyName, propertyValue); } } } 为避免影响过大, 设置了个开关, 是否写入系统属性, 如果是spring boot的项目, 就开启, 这样对线上非spring boot项目做到影响最小, 然后spring boot的@ConfigurationProperties完美读到属性; 具体代码见: org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { ConfigurationProperties annotation = AnnotationUtils .findAnnotation(bean.getClass(), ConfigurationProperties.class); if (annotation != null) { postProcessBeforeInitialization(bean, beanName, annotation); } annotation = this.beans.findFactoryAnnotation(beanName, ConfigurationProperties.class); if (annotation != null) { postProcessBeforeInitialization(bean, beanName, annotation); } return bean; }
Spring默认不允许对类的变量, 也就是静态变量进行注入操作, 但是在某些场景比如单元测试的@AfterClass要访问注入对象, 而Junit的这个方法必须是静态的, 也就产生了悖论; 解决思路有两个: 思路1: 想办法对静态变量注入, 也就是绕过Spring只能运行非静态变量才能注入依赖的壁垒 思路2: 想办法@AfterClass改造为非静态 实现Junit RunListener, 覆盖testRunFinished方法, 这里去实现类似@AfterClass的功能, 这个方法是非静态的 不要用Junit, 改用TestNG, TestNG里的AfterClass是非静态的 用Spring的TestExecutionListeners, 实现个Listener, 里面也有个类似非静态的AfterClass的实现, 覆盖实现就行
思路2的几个方法都可以实现, 但是单元测试Runner需要用 而且改用TestNG工程浩大, 只能放弃掉这个思路 继续走思路1, 只能去绕过Spring的依赖注入的static壁垒了, 具体代码如下: @Autowired private Destination dfsOperationQueue; private static Destination dfsOperationQueueStatic; // static version @Autowired private MessageQueueAPI messageQueueAPI; private static MessageQueueAPI messageQueueAPIStatic; // static version
@PostConstruct public void init() { dfsOperationQueueStatic = this.dfsOperationQueue; messageQueueAPIStatic = this.messageQueueAPI; }
@AfterClass public static void afterClass() { MessageVO messageVO = messageQueueAPIStatic.removeDestination(dfsOperationQueueStatic); System.out.println(messageVO); }
其实就是用了@PostConstruct 来个偷梁换柱而已, 多声明个静态成员指向非静态对象, 两者其实是一个对象
知道activemq现在已经支持了rest api, 但是官方对这部分的介绍一笔带过 (http://activemq.apache.org/rest.html),
通过google居然也没搜到一些有用的, 比如像删除一个destination, 都是问的多,然后没下文. 于是花了一些心思研究了一下:
首先通过rest api获取当前版本所有已支持的协议 http://172.30.43.206:8161/api/jolokia/list
然后根据json输出关于removeTopic, removeQueue的mbean实现通过rest api删除destination的方法, 注意到用GET请求而不是POST,不然会报错 (官网的例子里用的wget给的灵感, 开始用了POST老报错)
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.DefaultHttpClient; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.http.client.ClientHttpRequestFactory; import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; import org.springframework.web.client.RestTemplate;
import javax.jms.Destination; import javax.jms.JMSException; import java.util.Arrays;
public class MessageQueueAdmin { private static final RestTemplate restTemplate = getRestTemplate("admin", "admin");
private static String brokerHost = "172.30.43.206"; private static String adminConsolePort = "8161"; private static String protocol = "http";
public static void removeDestination(Destination destination) throws JMSException { String destName, destType; if (destination instanceof ActiveMQQueue) { destName = ((ActiveMQQueue) destination).getQueueName(); destType = "Queue"; } else { destName = ((ActiveMQTopic) destination).getTopicName(); destType = "Topic"; }
// build urls String url = String.format("%s://%s:%s/api/jolokia/exec/org.apache.activemq:" + "brokerName=localhost,type=Broker/remove%s/%s", protocol, brokerHost, adminConsolePort, destType, destName); System.out.println(url); // do operation HttpHeaders headers = new HttpHeaders(); headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON)); HttpEntity<String> entity = new HttpEntity<String>("parameters", headers); ResponseEntity response = restTemplate.exchange(url, HttpMethod.GET, entity, String.class); System.out.println(response.getBody()); }
public static void main(String[] args) throws JMSException { ActiveMQTopic topic = new ActiveMQTopic("test-activemq-topic"); removeDestination(topic); }
private static RestTemplate getRestTemplate(String user, String password) { DefaultHttpClient httpClient = new DefaultHttpClient(); BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); httpClient.setCredentialsProvider(credentialsProvider); ClientHttpRequestFactory rf = new HttpComponentsClientHttpRequestFactory(httpClient);
return new RestTemplate(rf); } }
其他的请求,应该都是类似jolokia的exec get request的格式:
https://jolokia.org/reference/html/protocol.html#exec
<base url>/exec/<mbean name>/<operation name>/<arg1>/<arg2>/.
用Spring JMS 的JmsTemplate从消息队列消费消息时发现,使用了CLIENT_ACKNOWLEDGE模式,消息返回后总是自动被ack,也就是被broker "Dequeued" protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException { try { // Use transaction timeout (if available). long timeout = getReceiveTimeout(); JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory()); if (resourceHolder != null && resourceHolder.hasTimeout()) { timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis()); } Message message = doReceive(consumer, timeout); if (session.getTransacted()) { // Commit necessary - but avoid commit call within a JTA transaction. if (isSessionLocallyTransacted(session)) { // Transacted session created by this template -> commit. JmsUtils.commitIfNecessary(session); } } else if (isClientAcknowledge(session)) { // Manually acknowledge message, if any. if (message != null) { message.acknowledge(); } } return message; } finally { JmsUtils.closeMessageConsumer(consumer); } }
但是使用异步listener 就不会出现这个情况,搜了下google,发现果然存在这个问题 https://jira.spring.io/browse/SPR-12995 https://jira.spring.io/browse/SPR-13255 http://louisling.iteye.com/blog/241073 同步方式拉取消息,暂时没找到好的封装,只能暂时用这。或者尽量用listener, 这个问题暂时标记下,或者谁有更好的解决方案可以comment我
|