Walking My Own Road

The road ahead is long and far; I will search high and low.

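1. Purpose

The goal is to build a connection pool (object pool) tailored to our own requirements. The objects stored in the pool have custom behavior that meets the project's practical needs, and they are expensive to create repeatedly; socket connections are a typical example.

2. Structure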

 

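NetRealConnection: the interface that describes the behavior of a physical connection.

FTPConnection: the FTP implementation of NetRealConnection, built on FTPClient from Apache Commons Net.

NetConnection: a proxy for NetRealConnection and the object that callers get from the pool. Code that obtains a connection from the pool does not need to know how the physical connection behaves, so that behavior is hidden. A logical connection must also be closed once it is no longer needed, so a close method is added that returns it to the pool.

CommonsConnectionWrapper: the NetConnection implementation handed out by CommonsConnectionPool; it has to return the NetRealConnection to the CommonsConnectionPool.

NetConnectionPool: a pool of NetConnection objects; the interface describes the behavior of this kind of pool.

CommonsConnectionPool: a NetConnectionPool implemented on top of Apache Commons Pool (the class is named CommonsNetConnectionPool in the code).

NetConnectionFactory: the abstract factory used to create NetRealConnection objects.

NetConnectionPoolFactory: the abstract factory used to create NetConnectionPool objects.

The design is interface-based. The behavior of the connection pool is abstracted so that different pool implementations can be added easily, and the behavior of NetConnection is abstracted so that different connection implementations can be added easily. The pool and the connection are related only through interfaces, so the concrete connection implementation can be swapped freely.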
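The relationship between these interfaces can be sketched with plain JDK classes. This is only an illustration under assumptions: RealConnection, PooledConnection, ConnectionPool, FakeConnection and SimplePool are hypothetical, reduced stand-ins for NetRealConnection, NetConnection, NetConnectionPool and their implementations, and the pool here is a trivial unbounded stack rather than commons-pool.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PoolDesignSketch {
    /** Physical-connection behavior (hypothetical, reduced to three methods). */
    interface RealConnection {
        void connect();
        void disconnect();
        boolean isConnected();
    }

    /** Logical connection handed out by the pool: no connect/disconnect, only close(). */
    interface PooledConnection {
        boolean isConnected();
        void close(); // returns the connection to the pool
    }

    /** Pool behavior, independent of any concrete pool library. */
    interface ConnectionPool {
        PooledConnection getConnection();
        int getNumIdle();
    }

    /** Stand-in for a real transport such as FTP. */
    static class FakeConnection implements RealConnection {
        private boolean connected;
        public void connect() { connected = true; }
        public void disconnect() { connected = false; }
        public boolean isConnected() { return connected; }
    }

    /** Trivial unbounded pool; a real one would delegate to a pooling library. */
    static class SimplePool implements ConnectionPool {
        private final Deque<RealConnection> idle = new ArrayDeque<>();

        public PooledConnection getConnection() {
            RealConnection real = idle.isEmpty() ? newConnection() : idle.pop();
            return new Wrapper(real, this);
        }

        public int getNumIdle() { return idle.size(); }

        private RealConnection newConnection() {
            RealConnection c = new FakeConnection();
            c.connect();
            return c;
        }

        void giveBack(RealConnection real) { idle.push(real); }
    }

    /** Proxy that hides the physical connection and returns it to the pool on close(). */
    static class Wrapper implements PooledConnection {
        private final RealConnection real;
        private final SimplePool pool;
        private boolean closed;

        Wrapper(RealConnection real, SimplePool pool) {
            this.real = real;
            this.pool = pool;
        }

        public boolean isConnected() { return !closed && real.isConnected(); }

        public void close() {
            if (!closed) {          // closing twice must not return the connection twice
                closed = true;
                pool.giveBack(real);
            }
        }
    }

    public static void main(String[] args) {
        ConnectionPool pool = new SimplePool();
        PooledConnection conn = pool.getConnection();
        System.out.println("connected: " + conn.isConnected());
        conn.close();
        System.out.println("idle after close: " + pool.getNumIdle());
    }
}
```

Callers only ever see PooledConnection and ConnectionPool, so either side can be replaced without touching the other.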

 

2. Configuration

The pool configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<pl:poolConfig xmlns:pl="http://www.ldd600.com/frm/mif/pl"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://www.ldd600.com/frm/mif/pl mifPool.xsd">
    <pl:pool id="ftpPool" class="com.ldd600.frm.mif.net.pool.commons.impl.CommonsNetConnectionPool">
        <pl:connectionFactory refId="ftpConnectionFactory"/>
        <pl:params>
            <pl:param value="10000" key="idleTime"/>
            <pl:param value="5" key="maximalNum"/>
            <pl:param value="0" key="minIdle"/>
            <pl:param value="3" key="maxIdle"/>
            <pl:param value="100000" key="waitTime"/>
            <pl:param value="1000" key="cleanInterval"/>
        </pl:params>
    </pl:pool>
    <pl:connectionFactory id="ftpConnectionFactory" class="com.ldd600.frm.mif.net.conn.ftp.FTPConnectionFactory">
        <pl:params>
            <pl:param value="192.168.0.1" key="hostName"/>
            <pl:param value="21" key="port"/>
            <pl:param value="luga_test" key="userName"/>
            <pl:param value="lugatest" key="passWord"/>
            <pl:param value="/home/luga_test" key="pathName"/>
        </pl:params>
    </pl:connectionFactory>
</pl:poolConfig>

   

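Multiple pool elements and multiple connectionFactory elements can be configured. The connectionFactory is defined separately for three reasons: (1) different pools can reuse the configuration of the same factory; (2) a connection factory may produce different kinds of connections, such as secure connections or auto-reply connections; (3) application developers who do not need a pool can obtain the factory from the context and create connections directly.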
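How a context might resolve the refId of a pool to a shared factory can be sketched as follows. This is a minimal illustration, not the actual context class: PoolConfigContextSketch and its methods are hypothetical, a factory is reduced to a Supplier of a string, and the second pool id "ftpBackupPool" is made up for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class PoolConfigContextSketch {
    // connectionFactory id -> factory (reduced here to a Supplier of a description)
    private final Map<String, Supplier<String>> factories = new HashMap<>();
    // pool id -> id of the factory it references through refId
    private final Map<String, String> poolToFactory = new HashMap<>();

    public void registerFactory(String id, Supplier<String> factory) {
        if (factories.putIfAbsent(id, factory) != null) {
            throw new IllegalArgumentException("Duplicate connectionFactory id: " + id);
        }
    }

    public void registerPool(String poolId, String refId) {
        if (!factories.containsKey(refId)) {
            throw new IllegalArgumentException("Unknown connectionFactory refId: " + refId);
        }
        if (poolToFactory.putIfAbsent(poolId, refId) != null) {
            throw new IllegalArgumentException("Duplicate pool id: " + poolId);
        }
    }

    /** Direct access for callers that do not want a pool. */
    public Supplier<String> getFactory(String id) {
        return factories.get(id);
    }

    /** The factory a given pool was configured with; null if the pool is unknown. */
    public Supplier<String> getFactoryOfPool(String poolId) {
        return factories.get(poolToFactory.get(poolId));
    }

    public static void main(String[] args) {
        PoolConfigContextSketch ctx = new PoolConfigContextSketch();
        ctx.registerFactory("ftpConnectionFactory", () -> "ftp connection to 192.168.0.1:21");
        ctx.registerPool("ftpPool", "ftpConnectionFactory");
        ctx.registerPool("ftpBackupPool", "ftpConnectionFactory"); // hypothetical second pool

        // Both pools share one factory definition.
        System.out.println(ctx.getFactoryOfPool("ftpPool") == ctx.getFactoryOfPool("ftpBackupPool"));
        // The factory can also be used without any pool.
        System.out.println(ctx.getFactory("ftpConnectionFactory").get());
    }
}
```

Keeping the factory as its own top-level entry means the host, port and credentials are written once, however many pools refer to them.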

 

The pool schema file:

<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema xmlns:pl="http://www.ldd600.com/frm/mif/pl" xmlns:xsd="http://www.w3.org/2001/XMLSchema" targetNamespace="http://www.ldd600.com/frm/mif/pl" elementFormDefault="qualified" attributeFormDefault="unqualified">
    <xsd:simpleType name="leafType">
        <xsd:restriction base="xsd:string">
            <xsd:pattern value="(\s)*"/>
        </xsd:restriction>
    </xsd:simpleType>
    <xsd:simpleType name="notEmptyToken">
        <xsd:restriction base="xsd:token">
            <xsd:pattern value="(\S+\s*)+"/>
        </xsd:restriction>
    </xsd:simpleType>
    <xsd:simpleType name="classAttributeType">
        <xsd:restriction base="xsd:token">
            <xsd:pattern value="([a-z|0-9|_]+\.)*[A-Z]([A-Z|a-z|0-9|_])*"/>
        </xsd:restriction>
    </xsd:simpleType>
    <xsd:complexType name="idClassType">
        <xsd:all minOccurs="0">
            <xsd:element name="params" type="pl:paramsType">
                <xsd:key name="outerParamsKey">
                    <xsd:selector xpath="pl:param"/>
                    <xsd:field xpath="@key"/>
                </xsd:key>
            </xsd:element>
        </xsd:all>
        <xsd:attribute name="id" type="pl:notEmptyToken" use="required"/>
        <xsd:attribute name="class" type="pl:classAttributeType" use="required">
            <xsd:annotation>
                <xsd:documentation>class name</xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
    </xsd:complexType>
    <xsd:complexType name="refIdClassType">
        <xsd:complexContent>
            <xsd:extension base="pl:classType">
                <xsd:attribute name="refId" type="pl:notEmptyToken"/>
            </xsd:extension>
        </xsd:complexContent>
    </xsd:complexType>
    <xsd:complexType name="paramsType">
        <xsd:annotation>
            <xsd:documentation> of java class</xsd:documentation>
        </xsd:annotation>
        <xsd:sequence maxOccurs="unbounded">
            <xsd:element name="param">
                <xsd:complexType>
                    <xsd:annotation>
                        <xsd:documentation>key is property name, value is property value</xsd:documentation>
                    </xsd:annotation>
                    <xsd:simpleContent>
                        <xsd:extension base="pl:leafType">
                            <xsd:attribute name="key" type="pl:notEmptyToken" use="required"/>
                            <xsd:attribute name="value" type="pl:notEmptyToken" use="required"/>
                        </xsd:extension>
                    </xsd:simpleContent>
                </xsd:complexType>
            </xsd:element>
        </xsd:sequence>
    </xsd:complexType>
    <xsd:complexType name="classType">
        <xsd:sequence minOccurs="0">
            <xsd:element name="params" type="pl:paramsType">
                <xsd:key name="paramsKey">
                    <xsd:selector xpath="pl:param"/>
                    <xsd:field xpath="@key"/>
                </xsd:key>
            </xsd:element>
        </xsd:sequence>
        <xsd:attribute name="class" type="pl:classAttributeType">
            <xsd:annotation>
                <xsd:documentation>class name</xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
    </xsd:complexType>
    <xsd:complexType name="idPoolType">
        <xsd:all minOccurs="0" maxOccurs="1">
            <xsd:element name="params" type="pl:paramsType">
                <xsd:key name="idPoolParamsKey">
                    <xsd:selector xpath="pl:param"/>
                    <xsd:field xpath="@key"/>
                </xsd:key>
            </xsd:element>
            <xsd:element name="connectionFactory" type="pl:refIdClassType" minOccurs="1"/>
        </xsd:all>
        <xsd:attribute name="id" type="pl:notEmptyToken" use="required"/>
        <xsd:attribute name="class" type="pl:classAttributeType" use="required"/>
    </xsd:complexType>
    <xsd:element name="poolConfig">
        <xsd:complexType>
            <xsd:sequence minOccurs="0" maxOccurs="unbounded">
                <xsd:choice>
                    <xsd:element name="connectionFactory" type="pl:idClassType" minOccurs="0"/>
                    <xsd:element name="pool" type="pl:idPoolType" minOccurs="0"/>
                </xsd:choice>
            </xsd:sequence>
        </xsd:complexType>
        <xsd:key name="poolIdKey">
            <xsd:selector xpath="pl:pool"/>
            <xsd:field xpath="@id"/>
        </xsd:key>
        <xsd:key name="connFactoryIdKey">
            <!-- The selector must match the element name declared above (connectionFactory). -->
            <xsd:selector xpath="pl:connectionFactory"/>
            <xsd:field xpath="@id"/>
        </xsd:key>
    </xsd:element>
</xsd:schema>

 

I use Apache XMLBeans for schema validation and object binding. While the XML file is being parsed, commons-beanutils sets the field values dynamically.
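The dynamic property setting can be illustrated without any third-party library. The sketch below is an assumption-laden stand-in: it uses plain java.lang.reflect instead of commons-beanutils, PoolSettings is a hypothetical bean whose property names mirror some keys of the pool configuration, and the InitializingBean interface here is a simplified local version with an unchecked afterPropertiesSet.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

public class ParamBinderSketch {
    /** Hook invoked once all properties have been set (simplified, hypothetical). */
    public interface InitializingBean {
        void afterPropertiesSet();
    }

    /** Hypothetical bean whose property names match keys from the pool config. */
    public static class PoolSettings implements InitializingBean {
        private int maxIdle;
        private int minIdle;
        private long waitTime;

        public void setMaxIdle(int maxIdle) { this.maxIdle = maxIdle; }
        public void setMinIdle(int minIdle) { this.minIdle = minIdle; }
        public void setWaitTime(long waitTime) { this.waitTime = waitTime; }
        public int getMaxIdle() { return maxIdle; }
        public int getMinIdle() { return minIdle; }
        public long getWaitTime() { return waitTime; }

        @Override
        public void afterPropertiesSet() {
            // Validation that a no-argument constructor cannot perform.
            if (minIdle > maxIdle) {
                throw new IllegalStateException("minIdle must not exceed maxIdle");
            }
        }
    }

    /** Sets each key/value pair through the matching setter, then runs the hook. */
    public static void bind(Object bean, Map<String, String> params) {
        for (Map.Entry<String, String> e : params.entrySet()) {
            Method setter = findSetter(bean.getClass(), e.getKey());
            try {
                setter.invoke(bean, convert(e.getValue(), setter.getParameterTypes()[0]));
            } catch (IllegalAccessException | InvocationTargetException ex) {
                throw new IllegalStateException("Cannot set property " + e.getKey(), ex);
            }
        }
        if (bean instanceof InitializingBean) {
            ((InitializingBean) bean).afterPropertiesSet();
        }
    }

    private static Method findSetter(Class<?> type, String property) {
        String name = "set" + Character.toUpperCase(property.charAt(0)) + property.substring(1);
        for (Method m : type.getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == 1) {
                return m;
            }
        }
        throw new IllegalArgumentException("No setter for property: " + property);
    }

    /** Converts the string from the XML attribute to the setter's parameter type. */
    private static Object convert(String raw, Class<?> target) {
        if (target == int.class || target == Integer.class) return Integer.valueOf(raw);
        if (target == long.class || target == Long.class) return Long.valueOf(raw);
        if (target == boolean.class || target == Boolean.class) return Boolean.valueOf(raw);
        return raw;
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("maxIdle", "3");
        params.put("minIdle", "0");
        params.put("waitTime", "100000");
        PoolSettings settings = new PoolSettings();
        bind(settings, params);
        System.out.println(settings.getMaxIdle() + " " + settings.getWaitTime());
    }
}
```

Running the hook only after every setter has been called is what lets it check properties against each other, which individual setters cannot do reliably.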


 

3. Implementation

 

CommonsConnectionPooledObjectFactory: see the commons-pool documentation of PoolableObjectFactory for a detailed explanation of each callback.

package com.ldd600.frm.mif.net.pool.commons.impl;

import com.ldd600.frm.mif.net.conn.NetConnection;
import com.ldd600.frm.mif.net.conn.NetConnectionFactory;
import com.ldd600.frm.mif.net.conn.NetRealConnection;
import com.ldd600.frm.mif.net.pool.commons.ObjectPool;
import com.ldd600.frm.mif.net.pool.commons.PoolableObjectFactory;

/** Creates, activates, validates, passivates and destroys the pooled wrappers. */
public class CommonsConnectionPooledObjectFactory implements
        PoolableObjectFactory {
    private String poolId;
    private ObjectPool pool;
    private NetConnectionFactory connFactory;

    CommonsConnectionPooledObjectFactory(String poolId, NetConnectionFactory connFactory, ObjectPool pool) {
        this.poolId = poolId;
        this.pool = pool;
        this.connFactory = connFactory;
    }

    public ObjectPool getPool() {
        return pool;
    }

    /** Replaces the pool; a previously set, different pool is closed first. */
    public synchronized void setPool(ObjectPool pool) {
        if (null != this.pool && this.pool != pool) {
            try {
                this.pool.close();
            } catch (Exception e) {
                // ignored: the old pool is being discarded anyway
            }
        }
        this.pool = pool;
    }

    /** Called when an object is borrowed: mark the wrapper as open. */
    public void activateObject(Object obj) throws Exception {
        if (obj instanceof CommonsConnectionWrapper) {
            ((CommonsConnectionWrapper) obj).setClosed(false);
        }
    }

    /** Called when an object is dropped from the pool: close the physical connection. */
    public void destroyObject(Object obj) throws Exception {
        if (obj instanceof CommonsConnectionWrapper) {
            ((CommonsConnectionWrapper) obj).getConnection().disconnect();
        }
    }

    /** Creates and connects a physical connection, then wraps it for the pool. */
    public NetConnection makeObject() throws Exception {
        NetRealConnection connection = connFactory.createConnection();
        connection.connect();
        return new CommonsConnectionWrapper(connection, pool);
    }

    /** Called when an object is returned: mark the wrapper as closed. */
    public void passivateObject(Object obj) throws Exception {
        if (obj instanceof CommonsConnectionWrapper) {
            ((CommonsConnectionWrapper) obj).setClosed(true);
        }
    }

    public NetConnectionFactory getConnFactory() {
        return connFactory;
    }

    public String getPoolId() {
        return poolId;
    }

    /** An object is valid only if its physical connection is still connected. */
    public boolean validateObject(Object obj) {
        if (obj instanceof CommonsConnectionWrapper) {
            return ((CommonsConnectionWrapper) obj).getConnection()
                    .isConnected();
        } else {
            return false;
        }
    }
}
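The order in which a pool drives these callbacks can be sketched with a reduced, JDK-only model. The names below are hypothetical: ObjectFactory is a cut-down generic version of the five-callback contract, and borrow, giveBack and evict imitate, in simplified form, what a pool configured with testOnBorrow=true does. Real pools add retries, eviction threads and synchronization that are left out here.

```java
import java.util.ArrayList;
import java.util.List;

public class FactoryLifecycleSketch {
    /** Reduced, hypothetical version of the five-callback factory contract. */
    interface ObjectFactory<T> {
        T makeObject();
        void activateObject(T obj);
        boolean validateObject(T obj);
        void passivateObject(T obj);
        void destroyObject(T obj);
    }

    /** Factory that only records which callbacks were invoked, in order. */
    static class RecordingFactory implements ObjectFactory<String> {
        final List<String> calls = new ArrayList<>();
        public String makeObject() { calls.add("make"); return "conn"; }
        public void activateObject(String obj) { calls.add("activate"); }
        public boolean validateObject(String obj) { calls.add("validate"); return true; }
        public void passivateObject(String obj) { calls.add("passivate"); }
        public void destroyObject(String obj) { calls.add("destroy"); }
    }

    /** Borrow: reuse an idle object or make one, then activate and validate it. */
    static <T> T borrow(ObjectFactory<T> factory, T idle) {
        T obj = (idle != null) ? idle : factory.makeObject();
        factory.activateObject(obj);
        if (!factory.validateObject(obj)) {
            factory.destroyObject(obj);
            throw new IllegalStateException("validation failed");
        }
        return obj;
    }

    /** Return: the object is passivated before it goes back to the idle set. */
    static <T> void giveBack(ObjectFactory<T> factory, T obj) {
        factory.passivateObject(obj);
    }

    /** Eviction or pool shutdown: the object is destroyed. */
    static <T> void evict(ObjectFactory<T> factory, T obj) {
        factory.destroyObject(obj);
    }

    public static void main(String[] args) {
        RecordingFactory factory = new RecordingFactory();
        String conn = borrow(factory, null);   // first borrow creates the object
        giveBack(factory, conn);
        conn = borrow(factory, conn);          // second borrow reuses the idle object
        giveBack(factory, conn);
        evict(factory, conn);
        System.out.println(factory.calls);
        // prints [make, activate, validate, passivate, activate, validate, passivate, destroy]
    }
}
```

In the factory above, activate and passivate simply flip the wrapper's closed flag, validate checks the physical connection, and destroy disconnects it.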

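CommonsNetConnectionPool: a connection pool built on commons-pool. It keeps the default settings for some of the more complex features and adds pool initialization and validation. InitializingBean is an interface modeled on Spring's. After the configuration XML file has been parsed and the bean's properties have been set, the afterPropertiesSet method of InitializingBean is called to perform initialization that a default constructor cannot, such as validating property values and post-processing them.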
 
package com.ldd600.frm.mif.net.pool.commons.impl;

import com.ldd600.frm.common.utils.StringUtils;
import com.ldd600.frm.mif.config.bean.InitializingBean;
import com.ldd600.frm.mif.exception.MIFConnectionException;
import com.ldd600.frm.mif.net.conn.NetConnection;
import com.ldd600.frm.mif.net.conn.NetConnectionFactory;
import com.ldd600.frm.mif.net.pool.NetConnectionPool;
import com.ldd600.frm.mif.net.pool.commons.ObjectPool;

// GenericObjectPool and SubGeneriObjectcPool are not imported in the original
// listing, so they are assumed to be visible from this package.
public class CommonsNetConnectionPool implements NetConnectionPool, InitializingBean {

    // ------------------------------ defaults ------------------------------ //
    public static final long DEFAULT_IDLETIME = GenericObjectPool.DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS;
    public static final int DEFAULT_MAXIMAL_CONNECTIOM_NUM = 5;
    public static final int DEFAULT_MINIMAL_IDLE_NUM = 0;
    public static final int DEFAULT_MAXIMAL_IDLE_NUM = 3;
    public static final long DEFAULT_WAIT_TIME = GenericObjectPool.DEFAULT_MAX_WAIT;
    public static final long DEFAULT_CLEAN_INTERVAL = GenericObjectPool.DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;

    // ------------------------------- fields ------------------------------- //
    private SubGeneriObjectcPool connectionPool;
    private long idleTime = DEFAULT_IDLETIME;
    private int maximalNum = DEFAULT_MAXIMAL_CONNECTIOM_NUM;
    private int minIdle = DEFAULT_MINIMAL_IDLE_NUM;
    private int maxIdle = DEFAULT_MAXIMAL_IDLE_NUM;
    private long waitTime = DEFAULT_WAIT_TIME;
    private long cleanInterval = DEFAULT_CLEAN_INTERVAL;
    private NetConnectionFactory connectonFactory;
    private String poolId;

    /** Used by the XML configuration; afterPropertiesSet() completes the setup. */
    public CommonsNetConnectionPool() {
        connectionPool = new SubGeneriObjectcPool();
    }

    /**
     * @param poolId id of this pool
     * @param connFactory factory that creates the physical connections
     * @param idleTime minimum time in milliseconds a connection stays idle before it may be evicted
     * @param maximalNum maximum number of active connections
     * @param minIdle minimum number of idle connections
     * @param maxIdle maximum number of idle connections
     * @param waitTime maximum time in milliseconds to wait for a connection
     * @param cleanInterval time in milliseconds between eviction runs
     * @throws MIFConnectionException if the factory cannot be validated or the pool cannot be filled
     */
    CommonsNetConnectionPool(String poolId, NetConnectionFactory connFactory, long idleTime, int maximalNum,
            int minIdle, int maxIdle, long waitTime, long cleanInterval)
            throws MIFConnectionException {
        // The original listing never stored poolId and connFactory in the fields.
        this.poolId = poolId;
        this.connectonFactory = connFactory;
        this.idleTime = idleTime;
        this.maximalNum = maximalNum;
        this.minIdle = minIdle;
        this.maxIdle = maxIdle;
        this.waitTime = waitTime;
        this.cleanInterval = cleanInterval;
        connectionPool = new SubGeneriObjectcPool();
        connectionPool.setMaxActive(maximalNum);
        connectionPool.setMaxWait(waitTime);
        connectionPool.setMaxIdle(maxIdle);
        connectionPool.setMinIdle(minIdle);
        connectionPool.setTimeBetweenEvictionRunsMillis(cleanInterval);
        connectionPool.setMinEvictableIdleTimeMillis(idleTime);
        initializePool();
    }

    public NetConnectionFactory getConnectonFactory() {
        return connectonFactory;
    }

    public void setConnectonFactory(NetConnectionFactory connectonFactory) {
        this.connectonFactory = connectonFactory;
    }

    public long getIdleTime() {
        return idleTime;
    }

    public void setIdleTime(long idleTime) {
        this.idleTime = idleTime;
        connectionPool.setMinEvictableIdleTimeMillis(idleTime);
    }

    public int getMaximalNum() {
        return maximalNum;
    }

    public void setMaximalNum(int maximalNum) {
        this.maximalNum = maximalNum;
        connectionPool.setMaxActive(maximalNum);
    }

    public int getMinIdle() {
        return minIdle;
    }

    public void setMinIdle(int minIdle) {
        this.minIdle = minIdle;
        connectionPool.setMinIdle(minIdle);
    }

    public int getMaxIdle() {
        return maxIdle;
    }

    public void setMaxIdle(int maxIdle) {
        this.maxIdle = maxIdle;
        connectionPool.setMaxIdle(maxIdle);
    }

    public long getWaitTime() {
        return waitTime;
    }

    public void setWaitTime(long timeOut) {
        this.waitTime = timeOut;
        connectionPool.setMaxWait(timeOut);
    }

    public long getCleanInterval() {
        return cleanInterval;
    }

    public void setCleanInterval(long cleanInterval) {
        this.cleanInterval = cleanInterval;
        connectionPool.setTimeBetweenEvictionRunsMillis(cleanInterval);
    }

    /** Borrows a connection, waiting up to the configured waitTime. */
    public CommonsConnectionWrapper getConnection() throws MIFConnectionException {
        try {
            return (CommonsConnectionWrapper) connectionPool.borrowObject();
        } catch (Exception ex) {
            throw new MIFConnectionException(
                    "Exception when get a connection from pool: " + ex.getMessage(), ex);
        }
    }

    /** Borrows a connection, waiting up to the given time. */
    public CommonsConnectionWrapper getConnection(long waitTime) throws MIFConnectionException {
        try {
            return (CommonsConnectionWrapper) connectionPool.borrowObject(waitTime);
        } catch (Exception ex) {
            throw new MIFConnectionException(
                    "Exception when get a connection from pool: " + ex.getMessage(), ex);
        }
    }

    /** Borrows a connection without waiting. */
    public CommonsConnectionWrapper getConnectionWithOutWait() throws MIFConnectionException {
        try {
            return (CommonsConnectionWrapper) connectionPool.borrowObjectWithNoWait();
        } catch (Exception ex) {
            throw new MIFConnectionException(
                    "Exception when get a connection from pool: " + ex.getMessage(), ex);
        }
    }

    /** Closes the underlying pool; this object must not be used afterwards. */
    public void close() throws MIFConnectionException {
        ObjectPool oldPool = connectionPool;
        connectionPool = null;
        if (null != oldPool) {
            try {
                oldPool.close();
            } catch (Exception e) {
                throw new MIFConnectionException("Close pool failure!", e);
            }
        }
    }

    /** Runs one connection through the whole factory lifecycle as a smoke test. */
    private void validateConnectionFactory(
            CommonsConnectionPooledObjectFactory connectionFactory)
            throws Exception {
        NetConnection conn = null;
        try {
            conn = connectionFactory.makeObject();
            connectionFactory.activateObject(conn);
            // The original listing ignored the result of validateObject.
            if (!connectionFactory.validateObject(conn)) {
                throw new IllegalStateException("Test connection is not valid!");
            }
            connectionFactory.passivateObject(conn);
        } finally {
            connectionFactory.destroyObject(conn);
        }
    }

    public int getNumActive() {
        return connectionPool.getNumActive();
    }

    public int getNumIdle() {
        return connectionPool.getNumIdle();
    }

    public int getMaxActiveNum() {
        return this.getMaximalNum();
    }

    public String getPoolId() {
        return poolId;
    }

    public void setPoolId(String poolId) {
        if (StringUtils.isEmpty(poolId)) {
            throw new IllegalArgumentException("pool id must not be empty!");
        }
        this.poolId = poolId;
    }

    public void clear() {
        connectionPool.clear();
    }

    /** Called after the XML configuration has set all properties. */
    public void afterPropertiesSet() throws MIFConnectionException {
        initializePool();
    }

    /** Shared by the constructor and afterPropertiesSet(). */
    private void initializePool() throws MIFConnectionException {
        CommonsConnectionPooledObjectFactory factory = new CommonsConnectionPooledObjectFactory(
                poolId, connectonFactory, connectionPool);
        try {
            validateConnectionFactory(factory);
        } catch (Exception ex) {
            throw new MIFConnectionException(
                    "Validate connection factory failure!", ex);
        }
        connectionPool.setFactory(factory);
        connectionPool.setTestOnBorrow(true);
        connectionPool.setNumTestsPerEvictionRun(maxIdle - minIdle);
        // whenExhaustedAction, testOnReturn, softMinEvictableIdleTimeMillis,
        // testWhileIdle and lifo keep the GenericObjectPool defaults.

        // Pre-fill the pool with maxIdle connections.
        try {
            for (int i = 0; i < maxIdle; i++) {
                connectionPool.addObject();
            }
        } catch (Exception ex) {
            throw new MIFConnectionException(
                    "Initialize connection pool failure!", ex);
        }
        System.out.println("Connection number: " + connectionPool.getNumIdle());
    }
}
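What the maximalNum, maxIdle and waitTime settings mean in practice can be shown with a small JDK-only model. This is a sketch under assumptions, not how commons-pool is implemented: BoundedPoolSketch is hypothetical, it bounds the number of active objects with a Semaphore, and its idle-count check is deliberately simple and not strictly atomic.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class BoundedPoolSketch<T> {
    private final Semaphore permits;   // one permit per object that may be active
    private final ConcurrentLinkedDeque<T> idle = new ConcurrentLinkedDeque<>();
    private final Supplier<T> factory;
    private final int maximalNum;
    private final int maxIdle;

    public BoundedPoolSketch(Supplier<T> factory, int maximalNum, int maxIdle) {
        this.factory = factory;
        this.maximalNum = maximalNum;
        this.maxIdle = maxIdle;
        this.permits = new Semaphore(maximalNum);
    }

    /** Waits up to waitMillis for a free slot, mirroring the waitTime setting. */
    public T borrow(long waitMillis) {
        boolean acquired;
        try {
            acquired = permits.tryAcquire(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for an object", e);
        }
        if (!acquired) {
            throw new NoSuchElementException("Timeout waiting for idle object");
        }
        T obj = idle.pollFirst();
        if (obj != null) {
            return obj;
        }
        try {
            return factory.get();
        } catch (RuntimeException e) {
            permits.release();     // do not leak the slot if creation fails
            throw e;
        }
    }

    /** Returns an object; it is kept only while fewer than maxIdle objects are idle. */
    public void giveBack(T obj) {
        if (idle.size() < maxIdle) {
            idle.addFirst(obj);
        }                          // otherwise dropped; a real pool would destroy it
        permits.release();
    }

    public int getNumActive() { return maximalNum - permits.availablePermits(); }

    public int getNumIdle() { return idle.size(); }

    public static void main(String[] args) {
        BoundedPoolSketch<String> pool = new BoundedPoolSketch<>(() -> "conn", 2, 1);
        String a = pool.borrow(10);
        String b = pool.borrow(10);
        try {
            pool.borrow(50);       // both slots are taken, so this times out
        } catch (NoSuchElementException e) {
            System.out.println("exhausted: " + e.getMessage());
        }
        pool.giveBack(a);
        pool.giveBack(b);
        System.out.println("active=" + pool.getNumActive() + " idle=" + pool.getNumIdle());
    }
}
```

The three getConnection variants of CommonsNetConnectionPool correspond to calling borrow with the configured wait time, with an explicit wait time, or with no wait at all.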

 

CommonsConnectionPoolFactory: the pool factory, which creates NetConnectionPool instances.

package com.ldd600.frm.mif.net.pool.commons.impl;

import com.ldd600.frm.mif.config.exception.MIFConnectionException;
import com.ldd600.frm.mif.context.ContextFactory;
import com.ldd600.frm.mif.net.conn.NetConnectionFactory;
import com.ldd600.frm.mif.net.pool.NetConnectionPoolFactory;

/** Collects the pool settings and creates CommonsNetConnectionPool instances. */
public class CommonsConnectionPoolFactory implements NetConnectionPoolFactory {
    private long idleTime = CommonsNetConnectionPool.DEFAULT_IDLETIME;
    private int maximalNum = CommonsNetConnectionPool.DEFAULT_MAXIMAL_CONNECTIOM_NUM;
    private int minIdle = CommonsNetConnectionPool.DEFAULT_MINIMAL_IDLE_NUM;
    private int maxIdle = CommonsNetConnectionPool.DEFAULT_MAXIMAL_IDLE_NUM;
    private long waitTime = CommonsNetConnectionPool.DEFAULT_WAIT_TIME;
    private long cleanInterval = CommonsNetConnectionPool.DEFAULT_CLEAN_INTERVAL;
    private String poolId;
    private NetConnectionFactory connectionFactory;

    // The time parameters are long here to match the fields and setters.
    public CommonsConnectionPoolFactory(String poolId, NetConnectionFactory connectionFactory, long idleTime, int maximalNum,
            int minIdle, int maxIdle, long waitTime, long cleanInterval) {
        this.poolId = poolId;
        this.connectionFactory = connectionFactory;
        this.idleTime = idleTime;
        this.maximalNum = maximalNum;
        this.minIdle = minIdle;
        this.maxIdle = maxIdle;
        this.waitTime = waitTime;
        this.cleanInterval = cleanInterval;
    }

    public CommonsConnectionPoolFactory() {}

    public long getIdleTime() {
        return idleTime;
    }

    public void setIdleTime(long idleTime) {
        this.idleTime = idleTime;
    }

    public int getMaximalNum() {
        return maximalNum;
    }

    public void setMaximalNum(int maximalNum) {
        this.maximalNum = maximalNum;
    }

    public long getWaitTime() {
        return waitTime;
    }

    public void setWaitTime(long waitTime) {
        this.waitTime = waitTime;
    }

    public long getCleanInterval() {
        return cleanInterval;
    }

    public void setCleanInterval(long cleanInterval) {
        this.cleanInterval = cleanInterval;
    }

    public String getPoolId() {
        return poolId;
    }

    /** Rejects an id that is already registered in the pool configuration context. */
    public void setPoolId(String poolId) {
        if (ContextFactory.getPoolConfigContext().containsConnectionPool(poolId)) {
            throw new IllegalArgumentException("The pool id exists, please use another one!");
        }
        this.poolId = poolId;
    }

    public int getMinIdle() {
        return minIdle;
    }

    public void setMinIdle(int minimalNum) {
        this.minIdle = minimalNum;
    }

    public int getMaxIdle() {
        return maxIdle;
    }

    public void setMaxIdle(int maxIdle) {
        this.maxIdle = maxIdle;
    }

    public CommonsNetConnectionPool createPool() throws MIFConnectionException {
        return new CommonsNetConnectionPool(poolId, connectionFactory, idleTime, maximalNum, minIdle, maxIdle, waitTime, cleanInterval);
    }

    public void setNetConnectionFactory(NetConnectionFactory connectionFactory) {
        if (null == connectionFactory) {
            throw new IllegalArgumentException("Connection factory is null!");
        }
        this.connectionFactory = connectionFactory;
    }

    public NetConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }
}



 

CommonsConnectionWrapper

package com.ldd600.frm.mif.net.pool.commons.impl;

import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

import com.ldd600.frm.mif.common.FileObject;
import com.ldd600.frm.mif.config.exception.MIFConnectionException;
import com.ldd600.frm.mif.config.exception.MIFReceiverException;
import com.ldd600.frm.mif.config.exception.MIFSenderException;
import com.ldd600.frm.mif.net.conn.NetConnection;
import com.ldd600.frm.mif.net.conn.NetRealConnection;
import com.ldd600.frm.mif.net.pool.commons.ObjectPool;

/** Logical connection handed out by the pool; close() returns it to the pool. */
public class CommonsConnectionWrapper implements NetConnection {
    protected NetRealConnection connection;
    protected ObjectPool pool;
    protected boolean isClosed;

    public CommonsConnectionWrapper(NetRealConnection connection,
            ObjectPool pool) {
        this.connection = connection;
        this.pool = pool;
    }

    // Called by CommonsConnectionPooledObjectFactory but missing from the
    // original listing; package-private so callers outside the pool cannot
    // reach the physical connection.
    NetRealConnection getConnection() {
        return connection;
    }

    // ------------------------------ state ------------------------------ //

    public boolean isConnected() {
        return connection.isConnected();
    }

    public boolean isClosed() {
        return this.isClosed;
    }

    public void setClosed(boolean isClosed) {
        this.isClosed = isClosed;
    }

    // ------------------------------ send ------------------------------- //

    public boolean sendMessage(FileObject msgObj) throws MIFSenderException {
        return connection.sendMessage(msgObj.getFileName(), msgObj);
    }

    /** Sends every object; returns false if any single send failed. */
    public boolean sendMessage(Collection<FileObject> msgObjs)
            throws MIFSenderException {
        boolean success = true;
        for (FileObject msgObj : msgObjs) {
            if (!this.sendMessage(msgObj)) {
                success = false;
            }
        }
        return success;
    }

    // Array overload: the original listing repeated the single-object
    // signature here, which does not compile; the use of Arrays.asList
    // suggests the array marker was lost.
    public boolean sendMessage(FileObject[] msgObjs) throws MIFSenderException {
        return this.sendMessage(Arrays.asList(msgObjs));
    }

    public boolean sendMessage(String fileName, String content)
            throws MIFSenderException {
        return connection.sendMessage(fileName, content);
    }

    public boolean sendMessage(String fileName, InputStream ins)
            throws MIFSenderException {
        return connection.sendMessage(fileName, ins);
    }

    // ----------------------------- receive ----------------------------- //

    public boolean receiveMessage(String fileName, byte[] bytes) throws MIFReceiverException {
        return connection.receiveMessage(fileName, bytes);
    }

    public File receiveMessage(FileObject fileObj) throws MIFReceiverException {
        return connection.receiveMessage(fileObj.getFileName(), fileObj);
    }

    public boolean receiveMessage(String fileName, OutputStream ous) throws MIFReceiverException {
        return connection.receiveMessage(fileName, ous);
    }

    // Array overload, restored for the same reason as sendMessage(FileObject[]).
    public File[] receiveMessage(FileObject[] fileObjs) throws MIFReceiverException {
        Collection<File> fileList = receiveMessage(Arrays.asList(fileObjs));
        return fileList.toArray(new File[fileList.size()]);
    }

    public Collection<File> receiveMessage(Collection<FileObject> fileObjs) throws MIFReceiverException {
        Collection<File> fileList = new ArrayList<File>();
        for (FileObject fileObj : fileObjs) {
            File file = connection.receiveMessage(fileObj.getFileName(), fileObj);
            fileList.add(file);
        }
        return Collections.unmodifiableCollection(fileList);
    }

    // ------------------------------ close ------------------------------ //

    /** Closing a logical connection means returning it to the pool. */
    public void close() throws MIFConnectionException {
        try {
            pool.returnObject(this);
        } catch (Exception ex) {
            throw new MIFConnectionException(
                    "Exception occurred while closing the connection, that is, while returning it to the pool",
                    ex);
        }
    }
}



 

FTPConnection


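package com.ldd600.frm.mif.net.conn.ftp;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;

import com.ldd600.frm.mif.common.FileObject;
import com.ldd600.frm.mif.config.exception.MIFConnectionException;
import com.ldd600.frm.mif.config.exception.MIFReceiverException;
import com.ldd600.frm.mif.config.exception.MIFSenderException;
import com.ldd600.frm.mif.constant.BasicConstants;
import com.ldd600.frm.mif.net.conn.NetRealConnection;
import com.ldd600.frm.mif.util.StringUtils;

public class FTPConnection implements NetRealConnection {
    // ------------------------variables------------------------//

    public static final int DEFAULT_FTP_PORT = 21;
    public static final String DEFAULT_UPLOAD_PATH = "/";
    public static final String DEFAULT_USERNAME = "";
    public static final String DEFAULT_PASSWORD = "";
    private FTPClient ftpClient = new FTPClient();
    private String hostName;
    private int port = DEFAULT_FTP_PORT;
    private String userName = DEFAULT_USERNAME;
    private String passWord = DEFAULT_PASSWORD;
    private String pathName = DEFAULT_UPLOAD_PATH;

    // ---------------------------Constructors---------------------//
    public FTPConnection() {
    }

    public FTPConnection(String hostName) {
        if (StringUtils.isEmpty(hostName)) {
            throw new IllegalArgumentException("Host name must not be empty!");
        }
        this.hostName = hostName;
    }

    public FTPConnection(String hostName, int port) {
        this(hostName);
        this.port = port;
    }

    public FTPConnection(String hostName, int port, String userName,
            String passWord) {
        this(hostName, port);
        this.userName = userName;
        this.passWord = passWord;
    }

    public FTPConnection(String hostName, int port, String userName,
            String passWord, String pathName) {
        this(hostName, port, userName, passWord);
        this.pathName = pathName;
    }

    // ------------------------------getters and setters--------------------//
    public String getHostName() {
        return hostName;
    }

    public void setHostName(String hostName) {
        this.hostName = hostName;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassWord() {
        return passWord;
    }

    public void setPassWord(String passWord) {
        this.passWord = passWord;
    }

    public FTPClient getFtpClient() {
        return ftpClient;
    }

    public String getPathName() {
        return pathName;
    }

    public void setPathName(String pathName) {
        this.pathName = pathName;
    }

    // ------------------------------------------------------------------------//

    /** Connects, logs in and changes to the configured working directory. */
    public void connect() throws MIFConnectionException {
        if (StringUtils.isEmpty(hostName)) {
            throw new IllegalStateException(
                    "Host name must be initialized before connect!");
        }
        try {
            ftpClient.connect(hostName, port);
            System.out.println(ftpClient.getReplyCode());
            if (!login(userName, passWord)) {
                ftpClient.disconnect();
                throw new MIFConnectionException(
                        "username or password doesn't match!");
            }
            System.out.println(ftpClient.getReplyCode());
            if (!this.changeToDirectory(pathName)) {
                disconnect();
                throw new MIFConnectionException(
                        "path doesn't exist or there is no permission to access it, please use another one!");
            }

            // Use passive mode as default because most of us are
            // behind firewalls these days.
            ftpClient.enterLocalPassiveMode();
        } catch (IOException ioex) {
            // The password is deliberately left out of the message so it
            // does not end up in logs.
            throw new MIFConnectionException("Connect to server: " + hostName
                    + " port: " + port + " with username: " + userName
                    + " encountered an IO or socket error!", ioex);
        }
    }

    public void disconnect() throws MIFConnectionException {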
 60        this.passWord = passWord;
 61    }

 62
 63    public FTPConnection(String hostName, int port, String userName,
 64            String passWord, String pathName) {
 65        this(hostName, port, userName, passWord);
 66        this.pathName = pathName;
 67    }

 68
 69    // -----------------------------------------------------------------------//
 70
    // ------------------------------getters and setters--------------------//
    public String getHostName() {
        return hostName;
    }

    public void setHostName(String hostName) {
        this.hostName = hostName;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassWord() {
        return passWord;
    }

    public void setPassWord(String passWord) {
        this.passWord = passWord;
    }

    public FTPClient getFtpClient() {
        return ftpClient;
    }

    public String getPathName() {
        return pathName;
    }

    public void setPathName(String pathName) {
        this.pathName = pathName;
    }

    // ------------------------------------------------------------------------//

    public void connect() throws MIFConnectionException {
        if (StringUtils.isEmpty(hostName)) {
            throw new IllegalStateException(
                    "Host name must be initialized before connect!");
        }
        try {
            ftpClient.connect(hostName, port);
            System.out.println(ftpClient.getReplyCode());
            if (!login(userName, passWord)) {
                ftpClient.disconnect();
                throw new MIFConnectionException(
                        "username or password doesn't match!");
            }
            System.out.println(ftpClient.getReplyCode());
            if (!this.changeToDirectory(pathName)) {
                disconnect();
                throw new MIFConnectionException(
                        "path doesn't exist or you have no permission to access it, please change it!");
            }

            // Use passive mode as default because most of us are
            // behind firewalls these days.
            ftpClient.enterLocalPassiveMode();
        } catch (IOException ioex) {
            // The password is deliberately left out of the message so that
            // it does not end up in logs or stack traces.
            throw new MIFConnectionException("Connect to server: " + hostName
                    + " port: " + port + " with username: " + userName
                    + " encountered an IO or socket error!", ioex);
        }
    }

    public void disconnect() throws MIFConnectionException {
        try {
            ftpClient.logout();
        } catch (IOException ioex) {
            // swallow the exception: the socket is closed below anyway
        } finally {
            if (ftpClient.isConnected()) {
                try {
                    ftpClient.disconnect();
                } catch (IOException e) {
                    throw new MIFConnectionException("Disconnect from server "
                            + hostName + " encountered an IO error!", e);
                }
            }
        }
    }

    public boolean isConnected() {
        if (!ftpClient.isConnected()) {
            return false;
        }
        // The socket may look open while the server has already dropped the
        // session, so send a STAT command as a liveness check.
        try {
            int code = ftpClient.stat();
            if (FTPReply.isNegativePermanent(code)) {
                return false;
            }
        } catch (IOException ioex) {
            return false;
        }
        return true;
    }

    public boolean sendMessage(String fileName, FileObject fileObj) throws MIFSenderException {
        boolean success = true;
        String localFolder = fileObj.getFolder();
        String localFileName = fileObj.getFileName();
        if (StringUtils.isEmpty(localFolder) || StringUtils.isEmpty(localFileName)) {
            throw new IllegalArgumentException(
                    "Local folder or file name is an empty string!");
        }
        StringBuilder sb = new StringBuilder(localFolder);
        sb.append(BasicConstants.FILE_SEPARATOR).append(localFileName);
        File file = new File(sb.toString());
        // "new File(...)" never returns null, so check the file system instead.
        if (!file.isFile()) {
            throw new IllegalArgumentException(
                    "Local folder or file doesn't exist!");
        }
        InputStream ins = null;
        try {
            ins = new FileInputStream(file);
            if (!ftpClient.storeFile(fileName, ins)) {
                //TODO some log
                success = false;
            }
            // if (FTPReply.isNegativePermanent(ftpClient.getReplyCode())) {
            // disconnect();
            // }
            return success;
        } catch (IOException ioex) {
            throw new MIFSenderException("Send file: " + fileObj.getFileName()
                    + " failure!", ioex);
        }
        // catch (MIFConnectionException mcex) {
        // throw new MIFSenderException("Send file: " + msgObj.getFileName()
        // + " failure!", mcex);
        // }
        finally {
            // ftpClient.completePendingCommand();
            if (null != ins) {
                try {
                    ins.close();
                } catch (IOException e) {
                    throw new MIFSenderException("Close Stream failure!", e);
                }
            }
        }
    }

    public boolean sendMessage(String fileName, String content)
            throws MIFSenderException {
        OutputStream os = null;
        try {
            System.out.println(ftpClient.getReplyCode());
            os = ftpClient.storeFileStream(fileName);
            if (os == null) {
                // storeFileStream returns null when the data connection
                // cannot be opened; there is nothing to complete in that case.
                //TODO some log
                return false;
            }
            os.write(content.getBytes());
        } catch (IOException ioex) {
            throw new MIFSenderException(
                    "Send file: " + fileName + " failure!", ioex);
        } finally {
            if (null != os) {
                try {
                    os.close();
                } catch (IOException e) {
                    throw new MIFSenderException("Close Stream failure!", e);
                }
            }
        }

        // The stream must be closed before the transfer is completed.
        try {
            //TODO some log when this returns false
            return ftpClient.completePendingCommand();
        } catch (IOException ioex) {
            throw new MIFSenderException("Complete pending command failure!",
                    ioex);
        }
    }

    public boolean sendMessage(String fileName, InputStream ins)
            throws MIFSenderException {
        boolean success = true;
        try {
            System.out.println(ftpClient.getReplyCode());
            if (!ftpClient.storeFile(fileName, ins)) {
                //TODO do some log;
                System.out.println("send message failed!");
                success = false;
            }
            // if (FTPReply.isNegativePermanent(ftpClient.getReplyCode())) {
            // disconnect();
            // }
            return success;
        } catch (IOException ioex) {
            throw new MIFSenderException(
                    "Send file: " + fileName + " failure!", ioex);
        }
        // catch (MIFConnectionException mcex) {
        // throw new MIFSenderException(
        // "Send file: " + fileName + " failure!", mcex);
        // }
        finally {
            try {
                ins.close();
            } catch (IOException e) {
                throw new MIFSenderException("Close Stream failure!", e);
            }
        }
    }

    public boolean receiveMessage(String fileName, byte[] bytes)
            throws MIFReceiverException {
        BufferedInputStream bins = null;
        try {
            InputStream ins = ftpClient.retrieveFileStream(fileName);
            if (ins == null) {
                // No data connection was opened, so there is no pending
                // command to complete.
                return false;
            }
            bins = new BufferedInputStream(ins);
            // A single read() may return fewer bytes than requested, so keep
            // reading until the buffer is full or the stream ends.
            int offset = 0;
            int read;
            while (offset < bytes.length
                    && (read = bins.read(bytes, offset, bytes.length - offset)) != -1) {
                offset += read;
            }
        } catch (IOException ioex) {
            throw new MIFReceiverException("When receive file: " + fileName
                    + " encountered an IO error!", ioex);
        } finally {
            if (bins != null) {
                try {
                    bins.close();
                } catch (IOException e) {
                    throw new MIFReceiverException(
                            "close input stream failure!", e);
                }
            }
        }

        try {
            return ftpClient.completePendingCommand();
        } catch (IOException ioex) {
            throw new MIFReceiverException("Complete pending command failure!",
                    ioex);
        }
    }

    public File receiveMessage(String fileName, FileObject fileObj)
            throws MIFReceiverException {
        String localFolder = fileObj.getFolder();
        String localFileName = fileObj.getFileName();
        if (StringUtils.isEmpty(localFolder) || StringUtils.isEmpty(localFileName)) {
            throw new IllegalArgumentException(
                    "Local folder or file name is an empty string!");
        }
        // "new File(...)" never returns null, so verify that the target
        // folder really exists before opening the output stream.
        if (!new File(localFolder).isDirectory()) {
            throw new IllegalArgumentException(
                    "Local folder doesn't exist!");
        }
        StringBuilder sb = new StringBuilder(localFolder);
        sb.append(BasicConstants.FILE_SEPARATOR).append(localFileName);
        File file = new File(sb.toString());
        OutputStream ous = null;
        try {
            ous = new FileOutputStream(file);
            if (!ftpClient.retrieveFile(fileName, ous)) {
                //TODO some log
                file.delete();
                return null;
            }
            return file;
        } catch (IOException ioex) {
            if (file.exists()) {
                file.delete();
            }
            throw new MIFReceiverException("When receive file: " + fileName
                    + " encountered an IO error!", ioex);
        } finally {
            if (null != ous) {
                try {
                    ous.close();
                } catch (IOException e) {
                    throw new MIFReceiverException("When close output stream "
                            + "encountered an IO error!", e);
                }
            }
        }
    }

    public boolean receiveMessage(String fileName, OutputStream ous)
            throws MIFReceiverException {
        try {
            return ftpClient.retrieveFile(fileName, ous);
        } catch (IOException ioex) {
            throw new MIFReceiverException("When receive file: " + fileName
                    + " encountered an IO error!", ioex);
        }
    }

    // ---------------------------------------------------------------------------//

    public boolean login(String userName, String passWord) throws IOException {
        return ftpClient.login(userName, passWord);
    }

    public boolean changeToDirectory(String pathName) throws IOException {
        return ftpClient.changeWorkingDirectory(pathName);
    }

    // Lists the configured remote directory and looks for a file with the
    // given name (case-insensitive).
    public boolean fileExists(String remote) throws IOException {
        FTPFile[] files = ftpClient.listFiles(this.pathName);
        for (FTPFile file : files) {
            if (remote.equalsIgnoreCase(file.getName())) {
                return true;
            }
        }
        return false;
    }

}


 

FTPConnectionFactory

package com.ldd600.frm.mif.net.conn.ftp;

import com.ldd600.frm.mif.net.conn.NetConnectionFactory;

public class FTPConnectionFactory implements NetConnectionFactory {
    private String hostName;
    private int port = FTPConnection.DEFAULT_FTP_PORT;
    private String userName = FTPConnection.DEFAULT_USERNAME;
    private String passWord = FTPConnection.DEFAULT_PASSWORD;
    private String pathName = FTPConnection.DEFAULT_UPLOAD_PATH;

    public FTPConnectionFactory() {}

    public String getHostName() {
        return hostName;
    }

    public void setHostName(String hostName) {
        this.hostName = hostName;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassWord() {
        return passWord;
    }

    public void setPassWord(String passWord) {
        this.passWord = passWord;
    }

    public String getPathName() {
        return pathName;
    }

    public void setPathName(String pathName) {
        this.pathName = pathName;
    }

    // Every call creates a new, not yet connected FTPConnection that
    // carries the settings configured on this factory.
    public FTPConnection createConnection() {
        return new FTPConnection(hostName, port, userName, passWord, pathName);
    }

}


posted on 2008-08-10 16:52 by 叱咤红人. Views (2713), Comments (1). Categories: Other Java and J2EE frameworks, XML

Comments

# re: Implementing a custom connection pool 2008-08-10 18:50 lvq810
The code blocks won't expand.
  

