badqiu

XPer
随笔 - 46, 文章 - 3, 评论 - 195, 引用 - 0
数据加载中……

分布式应用上下文(Distributed ThreadLocal)

1.问题

单机应用内,在进程内部,我们可以使用ThreadLocal传递应用上下文的方式. 当前的 Spring Secrucity , Spring TransactionManager, Log4J MDC, Struts2 ActionContext等等应用场景随处可见.

但在是分布式系统下,由于不是在同一个进程内,所以无法使用ThreadLocal. 那么什么是分布式ThreadLocal呢?就是将一个系统中的ThreadLocal信息可以传递至下一个系统,将两者的调用可以关联起来。如对应用有一个调用,我们生成一个请求ID (traceId),在后面所有分布式系统调用中,可以通过这个traceId将所有调用关联起来,这样查找调用日志都将十分方便.

2.实现方式

我们现在使用的通讯协议,一般都包含两部分:Header,Body. 如 Soap Header,Http Header. 通过自定义Header,可以带上我们的自定义信息。 然后在服务器端解析Header,再得到自定义信息。那么就可以完成Distributed ThreadLocal的功能。

如上图,通过两个拦截器,client在调用之前,将DistrbiutedThreadLocal中的信息放在soap header中,在服务端方法调用之前,从soap header中取回 DistrbiutedThreadLocal信息。

3. 实现代码.

以下为CXF webservice的实现代码,一个DistributedThreadLocal及增加了两个拦截器. hessian 也可以自定义Header,完成传递.

DistributedThreadLocal

/**
 * 分布式 ThreadLocal, 存放在ThreadLocal中的数据可以传输至另外一台机器上
 * 
@author badqiu
 
*/
public class DistributedThreadLocal {
    
public static String DISTRIBUTED_THREAD_LOCAL_KEY_PREFIX = "tl_";
    
    
public static ThreadLocal<Map<String, String>> threadLocal = new ThreadLocal<Map<String, String>>();

    
public static void putAll(Map<String, String> map) {
        getMap().putAll(map);
    }
    
    
public static void put(String key, String value) {
        getMap().put(key, value);
    }

    
public static String get(String key) {
        Map
<String, String> map = threadLocal.get();
        
if (map == null)
            
return null;
        
return (String) map.get(key);
    }

    
public static Map<String, String> getMap() {
        Map
<String, String> map = threadLocal.get();
        
if (map == null) {
            map 
= new HashMap();
            threadLocal.set(map);
        }
        
return map;
    }

    
public static void clear() {
        threadLocal.set(
null);
    }

}

DistributedThreadLocalInSOAPHeaderInterceptor

/**
 * 输入(In)拦截器,用于从 WebService SOAP 的Header中取回DistributedThreadLocal中的信息,并存放在DistributedThreadLocal中
 * 
 * 
@author badqiu
 
*/
public class DistributedThreadLocalInSOAPHeaderInterceptor extends AbstractSoapInterceptor {
    
    
private SAAJInInterceptor saajIn = new SAAJInInterceptor();  
    
    
public DistributedThreadLocalInSOAPHeaderInterceptor() {  
        
super(Phase.PRE_PROTOCOL);  
        getAfter().add(SAAJInInterceptor.
class.getName());  
    }  

    
public void handleMessage(SoapMessage message) throws Fault {
        SOAPMessage doc 
= message.getContent(SOAPMessage.class);  
        
if (doc == null) {  
            saajIn.handleMessage(message);  
            doc 
= message.getContent(SOAPMessage.class);  
        }  
        
        Map
<String,String> headers = toHeadersMap(doc);  
        DistributedThreadLocal.putAll(headers);
        
    }

    
private Map toHeadersMap(SOAPMessage doc) {
        SOAPHeader header 
= getSOAPHeader(doc);  
        
if (header == null) {  
            
return new HashMap(0);  
        } 
        
        Map
<String,String> headersMap = new HashMap();
        NodeList nodes 
= header.getChildNodes();
        
for(int i=0; i<nodes.getLength(); i++) {  
            Node item 
= nodes.item(i);
            
if(item.hasChildNodes()) {
                headersMap.put(item.getLocalName(), item.getFirstChild().getNodeValue());
            }
        }
        
return headersMap;
    }

    
private SOAPHeader getSOAPHeader(SOAPMessage doc) {
        SOAPHeader header;
        
try {
            header 
= doc.getSOAPHeader();
        } 
catch (SOAPException e) {
            
throw new RuntimeException(e);
        }
        
return header;
    }

}

DistributedThreadLocalOutSOAPHeaderInterceptor

/**
 * 输出(Out)拦截器,用于将DistributedThreadLocal中的信息存放在 WebService SOAP 的Header中
 * 
 * 
@author badqiu
 
*/
public class DistributedThreadLocalOutSOAPHeaderInterceptor extends AbstractSoapInterceptor {
    
    
public DistributedThreadLocalOutSOAPHeaderInterceptor() {
        
super(Phase.WRITE);
    }

    
public void handleMessage(SoapMessage message) throws Fault {
        
        List
<Header> headers = message.getHeaders();
        Map
<String,String> threadlocalMap = DistributedThreadLocal.getMap();
        
        
for(Map.Entry<String, String> entry : threadlocalMap.entrySet()) {
            headers.add(getHeader(entry.getKey(), entry.getValue()));
        }
    }

    
private Header getHeader(String key, String value) {
        QName qName 
= new QName(key);
        Document document 
= DOMUtils.createDocument();
        Element element 
= document.createElement(key);
        element.appendChild(document.createTextNode(value));
        SoapHeader header 
= new SoapHeader(qName, element);
        
return (header);
    }
}

CXF spring配置文件:

server端:

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:jaxws
="http://cxf.apache.org/jaxws" xmlns:cxf="http://cxf.apache.org/core"
    xsi:schemaLocation
="http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
    
default-lazy-init="true">

    
<description>Apache CXF的Web Service配置</description>

    
<import resource="classpath:META-INF/cxf/cxf.xml" />
    
<import resource="classpath:META-INF/cxf/cxf-servlet.xml" />
    
<import resource="classpath:META-INF/cxf/cxf-extension-soap.xml" />

    
<!-- jax-ws endpoint定义  -->
    
<jaxws:endpoint address="/hello" >
        
<jaxws:implementor ref="hello" />
        
<jaxws:inInterceptors>
            
<bean class="cn.org.rapid_framework.distributed.threadlocal.cfx.TraceIdInSOAPHeaderInterceptor"/>
        
</jaxws:inInterceptors>
    
</jaxws:endpoint>
    
    
<!-- WebService的实现Bean定义 -->
    
<bean id="hello" class="cn.org.rapid_framework.hessian.HessianTest.HelloImpl" />
</beans>

client端:

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:jaxws
="http://cxf.apache.org/jaxws" xmlns:cxf="http://cxf.apache.org/core"
    xsi:schemaLocation
="http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
    
default-lazy-init="true">
    
<description>Apache CXF Web Service Client端配置</description>

    
<jaxws:client id="hello" serviceClass="cn.org.rapid_framework.hessian.HessianTest.Hello"
        address
="http://localhost:8080/service/hello" >
        
<jaxws:outInterceptors>
            
<bean class="cn.org.rapid_framework.distributed.threadlocal.cfx.TraceIdOutSOAPHeaderInterceptor"/>
        
</jaxws:outInterceptors>
    
</jaxws:client>

</beans>

4. 应用场景.

通过分布式应用上下文,暂时想到的几个应用场景.

1. Log4j MDC traceId传递. 通过一个traceId,将所有相关的 操作所有的日志信息关联起来。

2. sessionId 传递, 让我们的应用也有状态,可以使用session什么的

3. Security(username,password)传递. 在需要安全调用的地方,避免污染接口,需要显式的在接口传递username,password. 相对应的 WSSecurity也可以走这个通道

分布式应用上下文的概念,全球首创,欢迎转载(因为google 搜索不到相关文章,或许早已经有相同的概念了,欢迎提醒我)。

posted on 2011-01-04 19:56 badqiu 阅读(2285) 评论(3)  编辑  收藏

评论

# re: 分布式应用上下文(Distributed ThreadLocal)[未登录]  回复  更多评论   

就是报文传输过来,待了threadlocal的信息,然后拦截,再写入当前threadlocal中,其实很多分布式的概率类似,就是信息共享,但是楼主就是将threadlocal信息共享?可以这样说?

恩,拦截器代入threadlocal,思路不错,有点像servlet用filter处理threadloacal,这里利用了webservice实现了分布式,然后拦截器有点filter的意思。
2011-01-05 11:24 | garfield

# re: 分布式应用上下文(Distributed ThreadLocal)  回复  更多评论   

@garfield

我另外一个 hassian的实现就是如你所有通过一个 Filter来实现的。
主要是对应用来说,可以透明的传递应用上下文。
2011-01-05 11:37 | badqiu

# re: 分布式应用上下文(Distributed ThreadLocal)[未登录]  回复  更多评论   

@badqiu
恩,这样主要是保证了开发两端的可以一致的使用,保证来开发的一致性,省去了很多不必要的东西
2011-01-05 15:35 | garfield

只有注册用户登录后才能发表评论。


网站导航: