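1. Purpose
The goal is to build a connection pool (object pool) tailored to our own needs. The objects held in the pool have custom behavior that meets the project's actual requirements, and they are expensive to create repeatedly, for example socket connections.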
2. Structure

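NetRealConnection: the interface that describes the behavior of a physical connection.
FTPConnection: the FTP implementation of NetRealConnection, built on the FTPClient from Apache Commons Net.
NetConnection: a proxy for NetRealConnection and the object handed out by the pool. Code that gets a connection from the pool does not need to know about physical-connection behavior, so those operations are hidden. A logical connection must also be closed after use, so a close method is added that returns it to the pool.
CommonsConnectionWrapper: the NetConnection implementation handed out by CommonsNetConnectionPool. It is responsible for returning the NetRealConnection to the CommonsNetConnectionPool.
NetConnectionPool: a pool of NetConnection objects. This interface describes the behavior of such a pool.
CommonsNetConnectionPool: a NetConnectionPool implemented on top of Apache Commons Pool.
NetConnectionFactory: the abstract factory that creates NetRealConnection instances.
NetConnectionPoolFactory: the abstract factory that creates NetConnectionPool instances.
The design is interface-based. The behavior of the connection pool is abstracted so that different pool implementations can be added easily, and the behavior of NetConnection is abstracted so that different kinds of connections can be added easily. The pool and the connection are related only through interfaces, so the concrete connection implementation can be swapped freely.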
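The relationship between the physical connection, the logical proxy and the pool can be sketched as follows. This is a minimal illustration, not the framework's code: the method sets on the three interfaces are reduced assumptions, and FakeRealConnection and SimplePool are hypothetical stand-ins for FTPConnection and the commons-pool based implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Physical connection: knows how to connect and disconnect.
interface NetRealConnection {
    void connect();
    void disconnect();
    boolean isConnected();
}

// Logical connection handed to callers: connect/disconnect are hidden,
// and close() gives the connection back to its pool.
interface NetConnection {
    boolean isConnected();
    void close();
}

interface NetConnectionPool {
    NetConnection getConnection();
    int getNumIdle();
}

// Hypothetical in-memory stand-in for a real connection such as FTPConnection.
class FakeRealConnection implements NetRealConnection {
    private boolean connected;
    public void connect() { connected = true; }
    public void disconnect() { connected = false; }
    public boolean isConnected() { return connected; }
}

// Hypothetical minimal pool (single-threaded), only to show the proxy idea.
class SimplePool implements NetConnectionPool {
    private final Deque<NetRealConnection> idle = new ArrayDeque<>();

    SimplePool(int size) {
        for (int i = 0; i < size; i++) {
            NetRealConnection real = new FakeRealConnection();
            real.connect();
            idle.push(real);
        }
    }

    public NetConnection getConnection() {
        final NetRealConnection real = idle.pop(); // throws if the pool is empty
        return new NetConnection() {
            private boolean closed;
            public boolean isConnected() { return real.isConnected(); }
            public void close() {
                if (!closed) {       // ignore repeated close() calls
                    closed = true;
                    idle.push(real); // hand the physical connection back
                }
            }
        };
    }

    public int getNumIdle() { return idle.size(); }
}
```

The caller only ever sees NetConnection, so it cannot disconnect the underlying socket by accident; closing the proxy simply makes the physical connection available again.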
3. Configuration
Pool configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<pl:poolConfig xmlns:pl="http://www.ldd600.com/frm/mif/pl"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.ldd600.com/frm/mif/pl mifPool.xsd">
  <pl:pool id="ftpPool" class="com.ldd600.frm.mif.net.pool.commons.impl.CommonsNetConnectionPool">
    <pl:connectionFactory refId="ftpConnectionFactory"/>
    <pl:params>
      <pl:param value="10000" key="idleTime"/>
      <pl:param value="5" key="maximalNum"/>
      <pl:param value="0" key="minIdle"/>
      <pl:param value="3" key="maxIdle"/>
      <pl:param value="100000" key="waitTime"/>
      <pl:param value="1000" key="cleanInterval"/>
    </pl:params>
  </pl:pool>
  <pl:connectionFactory id="ftpConnectionFactory" class="com.ldd600.frm.mif.net.conn.ftp.FTPConnectionFactory">
    <pl:params>
      <pl:param value="192.168.0.1" key="hostName"/>
      <pl:param value="21" key="port"/>
      <pl:param value="luga_test" key="userName"/>
      <pl:param value="lugatest" key="passWord"/>
      <pl:param value="/home/luga_test" key="pathName"/>
    </pl:params>
  </pl:connectionFactory>
</pl:poolConfig>

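Multiple pools and multiple connectionFactory elements can be configured. The connectionFactory is declared separately for three reasons: (1) different pools can reuse the same factory configuration; (2) a connection factory may produce different kinds of connections, such as secure connections or auto-reply connections; (3) application developers who do not need a pool can obtain the factory from the context and create connections directly.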
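The effect of declaring the factory separately can be sketched with a small registry. Everything here is hypothetical and only mirrors the XML above: ConnFactory stands in for NetConnectionFactory, and ConfigRegistry is an assumed stand-in for whatever context object the framework builds from the configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for NetConnectionFactory; returns a label instead of a socket.
interface ConnFactory {
    String createConnection();
}

// Hypothetical pool definition that only remembers which factory it uses.
class PoolDef {
    final String id;
    final ConnFactory factory;

    PoolDef(String id, ConnFactory factory) {
        this.id = id;
        this.factory = factory;
    }
}

// Hypothetical registry mirroring the pool and connectionFactory elements.
class ConfigRegistry {
    private final Map<String, ConnFactory> factories = new HashMap<>();
    private final Map<String, PoolDef> pools = new HashMap<>();

    void addFactory(String id, ConnFactory factory) {
        if (factories.containsKey(id)) {
            throw new IllegalArgumentException("Duplicate factory id: " + id);
        }
        factories.put(id, factory);
    }

    // A pool refers to its factory by id, like the refId attribute.
    void addPool(String poolId, String factoryRefId) {
        ConnFactory factory = factories.get(factoryRefId);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown factory refId: " + factoryRefId);
        }
        if (pools.containsKey(poolId)) {
            throw new IllegalArgumentException("Duplicate pool id: " + poolId);
        }
        pools.put(poolId, new PoolDef(poolId, factory));
    }

    ConnFactory getFactory(String id) { return factories.get(id); }

    PoolDef getPool(String id) { return pools.get(id); }
}
```

Two pools registered against the same refId share one factory instance, and code that does not want pooling can call getFactory(...).createConnection() directly.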
Pool schema file:
<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema xmlns:pl="http://www.ldd600.com/frm/mif/pl"
    xmlns:xsd="http://www.w3.org/2001/XMLSchema"
    targetNamespace="http://www.ldd600.com/frm/mif/pl"
    elementFormDefault="qualified" attributeFormDefault="unqualified">
  <xsd:simpleType name="leafType">
    <xsd:restriction base="xsd:string">
      <xsd:pattern value="(\s)*"/>
    </xsd:restriction>
  </xsd:simpleType>
  <xsd:simpleType name="notEmptyToken">
    <xsd:restriction base="xsd:token">
      <xsd:pattern value="(\S+\s*)+"/>
    </xsd:restriction>
  </xsd:simpleType>
  <xsd:simpleType name="classAttributeType">
    <xsd:restriction base="xsd:token">
      <xsd:pattern value="([a-z0-9_]+\.)*[A-Z]([A-Za-z0-9_])*"/>
    </xsd:restriction>
  </xsd:simpleType>
  <xsd:complexType name="idClassType">
    <xsd:all minOccurs="0">
      <xsd:element name="params" type="pl:paramsType">
        <xsd:key name="outerParamsKey">
          <xsd:selector xpath="pl:param"/>
          <xsd:field xpath="@key"/>
        </xsd:key>
      </xsd:element>
    </xsd:all>
    <xsd:attribute name="id" type="pl:notEmptyToken" use="required"/>
    <xsd:attribute name="class" type="pl:classAttributeType" use="required">
      <xsd:annotation>
        <xsd:documentation>class name</xsd:documentation>
      </xsd:annotation>
    </xsd:attribute>
  </xsd:complexType>
  <xsd:complexType name="refIdClassType">
    <xsd:complexContent>
      <xsd:extension base="pl:classType">
        <xsd:attribute name="refId" type="pl:notEmptyToken"/>
      </xsd:extension>
    </xsd:complexContent>
  </xsd:complexType>
  <xsd:complexType name="paramsType">
    <xsd:annotation>
      <xsd:documentation> of java class</xsd:documentation>
    </xsd:annotation>
    <xsd:sequence maxOccurs="unbounded">
      <xsd:element name="param">
        <xsd:complexType>
          <xsd:annotation>
            <xsd:documentation>key is property name, value is property value</xsd:documentation>
          </xsd:annotation>
          <xsd:simpleContent>
            <xsd:extension base="pl:leafType">
              <xsd:attribute name="key" type="pl:notEmptyToken" use="required"/>
              <xsd:attribute name="value" type="pl:notEmptyToken" use="required"/>
            </xsd:extension>
          </xsd:simpleContent>
        </xsd:complexType>
      </xsd:element>
    </xsd:sequence>
  </xsd:complexType>
  <xsd:complexType name="classType">
    <xsd:sequence minOccurs="0">
      <xsd:element name="params" type="pl:paramsType">
        <xsd:key name="paramsKey">
          <xsd:selector xpath="pl:param"/>
          <xsd:field xpath="@key"/>
        </xsd:key>
      </xsd:element>
    </xsd:sequence>
    <xsd:attribute name="class" type="pl:classAttributeType">
      <xsd:annotation>
        <xsd:documentation>class name</xsd:documentation>
      </xsd:annotation>
    </xsd:attribute>
  </xsd:complexType>
  <xsd:complexType name="idPoolType">
    <xsd:all minOccurs="0" maxOccurs="1">
      <xsd:element name="params" type="pl:paramsType">
        <xsd:key name="idPoolParamsKey">
          <xsd:selector xpath="pl:param"/>
          <xsd:field xpath="@key"/>
        </xsd:key>
      </xsd:element>
      <xsd:element name="connectionFactory" type="pl:refIdClassType" minOccurs="1"/>
    </xsd:all>
    <xsd:attribute name="id" type="pl:notEmptyToken" use="required"/>
    <xsd:attribute name="class" type="pl:classAttributeType" use="required"/>
  </xsd:complexType>
  <xsd:element name="poolConfig">
    <xsd:complexType>
      <xsd:sequence minOccurs="0" maxOccurs="unbounded">
        <xsd:choice>
          <xsd:element name="connectionFactory" type="pl:idClassType" minOccurs="0"/>
          <xsd:element name="pool" type="pl:idPoolType" minOccurs="0"/>
        </xsd:choice>
      </xsd:sequence>
    </xsd:complexType>
    <xsd:key name="poolIdKey">
      <xsd:selector xpath="pl:pool"/>
      <xsd:field xpath="@id"/>
    </xsd:key>
    <xsd:key name="connFactoryIdKey">
      <xsd:selector xpath="pl:connectionFactory"/>
      <xsd:field xpath="@id"/>
    </xsd:key>
  </xsd:element>
</xsd:schema>

I use Apache XMLBeans for schema validation and object binding. While the XML file is parsed, commons-beanutils sets the field values dynamically.
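The idea of applying the key/value params to a bean can be sketched with plain reflection. This is an illustration only: the article uses commons-beanutils for this step, whereas the ParamBinder helper and the PoolSettings bean below are hypothetical and rely on java.lang.reflect so that the example is self-contained.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

// Hypothetical helper: copies key/value params onto matching JavaBean setters.
class ParamBinder {
    static void apply(Object bean, Map<String, String> params) {
        for (Map.Entry<String, String> param : params.entrySet()) {
            String key = param.getKey();
            String setterName = "set" + Character.toUpperCase(key.charAt(0)) + key.substring(1);
            Method setter = findSetter(bean.getClass(), setterName);
            if (setter == null) {
                throw new IllegalArgumentException("No setter for param: " + key);
            }
            try {
                setter.invoke(bean, convert(param.getValue(), setter.getParameterTypes()[0]));
            } catch (IllegalAccessException | InvocationTargetException ex) {
                throw new IllegalStateException("Cannot set param: " + key, ex);
            }
        }
    }

    // Returns the first public single-argument method with the given name, or null.
    private static Method findSetter(Class<?> type, String name) {
        for (Method m : type.getMethods()) {
            if (m.getName().equals(name) && m.getParameterTypes().length == 1) {
                return m;
            }
        }
        return null;
    }

    // Converts the XML attribute string to the setter's parameter type.
    private static Object convert(String raw, Class<?> target) {
        if (target == int.class || target == Integer.class) return Integer.valueOf(raw);
        if (target == long.class || target == Long.class) return Long.valueOf(raw);
        if (target == String.class) return raw;
        throw new IllegalArgumentException("Unsupported param type: " + target.getName());
    }
}

// Hypothetical bean standing in for a pool or connection factory.
class PoolSettings {
    private int maximalNum;
    private long waitTime;
    private String hostName;

    public void setMaximalNum(int maximalNum) { this.maximalNum = maximalNum; }
    public void setWaitTime(long waitTime) { this.waitTime = waitTime; }
    public void setHostName(String hostName) { this.hostName = hostName; }
    public int getMaximalNum() { return maximalNum; }
    public long getWaitTime() { return waitTime; }
    public String getHostName() { return hostName; }
}
```

Each param key is matched to a setter by name, which is why the keys in the configuration file (maximalNum, waitTime, hostName and so on) have to match the bean property names exactly.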
4. Implementation
CommonsConnectionPooledObjectFactory: see the documentation of commons-pool's PoolableObjectFactory for a detailed explanation of each method.
package com.ldd600.frm.mif.net.pool.commons.impl;

import com.ldd600.frm.mif.net.conn.NetConnection;
import com.ldd600.frm.mif.net.conn.NetConnectionFactory;
import com.ldd600.frm.mif.net.conn.NetRealConnection;
import com.ldd600.frm.mif.net.pool.commons.ObjectPool;
import com.ldd600.frm.mif.net.pool.commons.PoolableObjectFactory;

public class CommonsConnectionPooledObjectFactory implements PoolableObjectFactory
{
    private String poolId;
    private ObjectPool pool;
    private NetConnectionFactory connFactory;

    CommonsConnectionPooledObjectFactory(String poolId, NetConnectionFactory connFactory, ObjectPool pool)
    {
        this.poolId = poolId;
        this.pool = pool;
        this.connFactory = connFactory;
    }

    public ObjectPool getPool()
    {
        return pool;
    }

    // Replaces the pool and closes the previous one if it is a different instance.
    public synchronized void setPool(ObjectPool pool)
    {
        if (null != this.pool && this.pool != pool)
        {
            try
            {
                this.pool.close();
            } catch (Exception e)
            {
                // ignored
            }
        }
        this.pool = pool;
    }

    // Called when a connection is borrowed: mark the wrapper as open.
    public void activateObject(Object obj) throws Exception
    {
        if (obj instanceof CommonsConnectionWrapper)
        {
            ((CommonsConnectionWrapper) obj).setClosed(false);
        }
    }

    // Called when the pool drops a connection: close the physical connection.
    public void destroyObject(Object obj) throws Exception
    {
        if (obj instanceof CommonsConnectionWrapper)
        {
            ((CommonsConnectionWrapper) obj).getConnection().disconnect();
        }
    }

    // Creates and connects a physical connection, then wraps it for the pool.
    public NetConnection makeObject() throws Exception
    {
        NetRealConnection connection = connFactory.createConnection();
        connection.connect();
        return new CommonsConnectionWrapper(connection, pool);
    }

    // Called when a connection is returned: mark the wrapper as closed.
    public void passivateObject(Object obj) throws Exception
    {
        if (obj instanceof CommonsConnectionWrapper)
        {
            ((CommonsConnectionWrapper) obj).setClosed(true);
        }
    }

    public NetConnectionFactory getConnFactory()
    {
        return connFactory;
    }

    public String getPoolId()
    {
        return poolId;
    }

    // A pooled object is valid only if its physical connection is still connected.
    public boolean validateObject(Object obj)
    {
        if (obj instanceof CommonsConnectionWrapper)
        {
            return ((CommonsConnectionWrapper) obj).getConnection()
                    .isConnected();
        } else
        {
            return false;
        }
    }
}
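The order in which a pool calls the factory methods above can be illustrated with a toy pool. This is a simplified sketch under stated assumptions: Lifecycle is a hypothetical generic mirror of PoolableObjectFactory, TinyPool is not commons-pool, and eviction, limits and thread safety are all left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified mirror of the PoolableObjectFactory callbacks.
interface Lifecycle<T> {
    T makeObject();
    void activateObject(T obj);
    boolean validateObject(T obj);
    void passivateObject(T obj);
    void destroyObject(T obj);
}

// Toy pool that validates on borrow, similar in spirit to setTestOnBorrow(true).
class TinyPool<T> {
    private final Lifecycle<T> factory;
    private final Deque<T> idle = new ArrayDeque<>();

    TinyPool(Lifecycle<T> factory) {
        this.factory = factory;
    }

    T borrow() {
        while (true) {
            boolean fresh = idle.isEmpty();
            T obj = fresh ? factory.makeObject() : idle.pop();
            factory.activateObject(obj);
            if (factory.validateObject(obj)) {
                return obj;
            }
            factory.destroyObject(obj); // stale object: drop it and try again
            if (fresh) {
                throw new IllegalStateException("Newly created object failed validation");
            }
        }
    }

    void giveBack(T obj) {
        factory.passivateObject(obj);
        idle.push(obj);
    }
}

// Records every callback so the call order can be inspected.
class RecordingFactory implements Lifecycle<String> {
    final List<String> calls = new ArrayList<>();

    public String makeObject() { calls.add("make"); return "conn"; }
    public void activateObject(String obj) { calls.add("activate"); }
    public boolean validateObject(String obj) { calls.add("validate"); return true; }
    public void passivateObject(String obj) { calls.add("passivate"); }
    public void destroyObject(String obj) { calls.add("destroy"); }
}
```

Borrowing, returning and borrowing again records make, activate, validate, passivate, activate, validate: the physical connection is created once and then only reactivated and revalidated.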
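CommonsNetConnectionPool: a connection pool built on commons-pool. The more advanced options are left at their default settings, and pool initialization and validation are added. InitializingBean is an interface modeled on Spring's: after the configuration XML has been parsed and the bean properties have been set, its afterPropertiesSet method is called to perform initialization that a default constructor cannot, such as validating property values and post-processing them.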
package com.ldd600.frm.mif.net.pool.commons.impl;

import com.ldd600.frm.common.utils.StringUtils;
import com.ldd600.frm.mif.config.bean.InitializingBean;
import com.ldd600.frm.mif.exception.MIFConnectionException;
import com.ldd600.frm.mif.net.conn.NetConnection;
import com.ldd600.frm.mif.net.conn.NetConnectionFactory;
import com.ldd600.frm.mif.net.pool.NetConnectionPool;
import com.ldd600.frm.mif.net.pool.commons.ObjectPool;

public class CommonsNetConnectionPool implements NetConnectionPool, InitializingBean
{

    /* ***************************************************************************** */
    public static final long DEFAULT_IDLETIME = GenericObjectPool.DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS;
    public static final int DEFAULT_MAXIMAL_CONNECTIOM_NUM = 5;
    public static final int DEFAULT_MINIMAL_IDLE_NUM = 0;
    public static final int DEFAULT_MAXIMAL_IDLE_NUM = 3;
    public static final long DEFAULT_WAIT_TIME = GenericObjectPool.DEFAULT_MAX_WAIT;
    public static final long DEFAULT_CLEAN_INTERVAL = GenericObjectPool.DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;

    /* ***************************************************************************** */
    private SubGeneriObjectcPool connectionPool;
    private long idleTime = DEFAULT_IDLETIME;
    private int maximalNum = DEFAULT_MAXIMAL_CONNECTIOM_NUM;
    private int minIdle = DEFAULT_MINIMAL_IDLE_NUM;
    private int maxIdle = DEFAULT_MAXIMAL_IDLE_NUM;
    private long waitTime = DEFAULT_WAIT_TIME;
    private long cleanInterval = DEFAULT_CLEAN_INTERVAL;
    private NetConnectionFactory connectonFactory;
    private String poolId;

    public CommonsNetConnectionPool()
    {
        connectionPool = new SubGeneriObjectcPool();
    }

    /* ***************************************************************************** */

    /**
     * @param poolId        id of this pool
     * @param connFactory   factory that creates the physical connections
     * @param idleTime      minimum time a connection may sit idle before it can be evicted
     * @param maximalNum    maximum number of active connections
     * @param minIdle       minimum number of idle connections
     * @param maxIdle       maximum number of idle connections
     * @param waitTime      maximum time to wait when borrowing a connection
     * @param cleanInterval time between eviction runs
     * @throws MIFConnectionException if the factory fails validation or the pool cannot be initialized
     */
    CommonsNetConnectionPool(String poolId, NetConnectionFactory connFactory, long idleTime, int maximalNum,
            int minIdle, int maxIdle, long waitTime, long cleanInterval)
            throws MIFConnectionException
    {
        // Keep the id and factory so that getPoolId() and getConnectonFactory() do not return null.
        this.poolId = poolId;
        this.connectonFactory = connFactory;
        this.idleTime = idleTime;
        this.maximalNum = maximalNum;
        this.minIdle = minIdle;
        this.maxIdle = maxIdle;
        this.waitTime = waitTime;
        this.cleanInterval = cleanInterval;
        connectionPool = new SubGeneriObjectcPool();
        CommonsConnectionPooledObjectFactory factory = new CommonsConnectionPooledObjectFactory(
                poolId, connFactory, connectionPool);
        try
        {
            validateConnectionFactory(factory);
        } catch (Exception ex)
        {
            throw new MIFConnectionException(
                    "Validate connection factory failure!", ex);
        }
        connectionPool.setFactory(factory);
        connectionPool.setMaxActive(maximalNum);
        // connectionPool.setWhenExhaustedAction(GenericObjectPool.WHEN_EXHAUSTED_BLOCK);
        connectionPool.setMaxWait(waitTime);
        connectionPool.setMaxIdle(maxIdle);
        connectionPool.setMinIdle(minIdle);
        connectionPool.setTestOnBorrow(true);
        // connectionPool.setTestOnReturn(GenericObjectPool.DEFAULT_TEST_ON_RETURN);
        connectionPool.setTimeBetweenEvictionRunsMillis(cleanInterval);
        connectionPool.setNumTestsPerEvictionRun(maxIdle - minIdle);
        connectionPool.setMinEvictableIdleTimeMillis(idleTime);
        // connectionPool
        //         .setSoftMinEvictableIdleTimeMillis(GenericObjectPool.DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS);
        // connectionPool.setTestWhileIdle(GenericObjectPool.DEFAULT_TEST_WHILE_IDLE);
        // connectionPool.setLifo(GenericObjectPool.DEFAULT_LIFO);

        // initialize the pool with maxIdle ready-to-use connections
        try
        {
            for (int i = 0; i < maxIdle; i++)
            {
                connectionPool.addObject();
            }
        } catch (Exception ex)
        {
            throw new MIFConnectionException(
                    "Initialize connection pool failure!", ex);
        }
        System.out.println("Connection number: " + connectionPool.getNumIdle());
    }

    public NetConnectionFactory getConnectonFactory()
    {
        return connectonFactory;
    }

    public void setConnectonFactory(NetConnectionFactory connectonFactory)
    {
        this.connectonFactory = connectonFactory;
    }

    public long getIdleTime()
    {
        return idleTime;
    }

    public void setIdleTime(long idleTime)
    {
        this.idleTime = idleTime;
        connectionPool.setMinEvictableIdleTimeMillis(idleTime);
    }

    public int getMaximalNum()
    {
        return maximalNum;
    }

    public void setMaximalNum(int maximalNum)
    {
        this.maximalNum = maximalNum;
        connectionPool.setMaxActive(maximalNum);
    }

    public int getMinIdle()
    {
        return minIdle;
    }

    public void setMinIdle(int minIdle)
    {
        this.minIdle = minIdle;
        connectionPool.setMinIdle(minIdle);
    }

    public int getMaxIdle()
    {
        return maxIdle;
    }

    public void setMaxIdle(int maxIdle)
    {
        this.maxIdle = maxIdle;
        connectionPool.setMaxIdle(maxIdle);
    }

    public long getWaitTime()
    {
        return waitTime;
    }

    public void setWaitTime(long timeOut)
    {
        this.waitTime = timeOut;
        connectionPool.setMaxWait(timeOut);
    }

    public long getCleanInterval()
    {
        return cleanInterval;
    }

    public void setCleanInterval(long cleanInterval)
    {
        this.cleanInterval = cleanInterval;
        connectionPool.setTimeBetweenEvictionRunsMillis(cleanInterval);
    }

    public CommonsConnectionWrapper getConnection() throws MIFConnectionException
    {
        try
        {
            return (CommonsConnectionWrapper) connectionPool.borrowObject();
        } catch (Exception ex)
        {
            throw new MIFConnectionException(
                    "Exception when getting a connection from pool: " + ex.getMessage(), ex);
        }
    }

    public void close() throws MIFConnectionException
    {
        ObjectPool oldPool = connectionPool;
        connectionPool = null;
        if (null != oldPool)
        {
            try
            {
                oldPool.close();
            } catch (Exception e)
            {
                throw new MIFConnectionException("Close pool failure!", e);
            }
        }
    }

    // Runs one connection through its whole lifecycle to prove the factory works.
    private void validateConnectionFactory(
            CommonsConnectionPooledObjectFactory connectionFactory)
            throws Exception
    {
        NetConnection conn = null;
        try
        {
            conn = connectionFactory.makeObject();
            connectionFactory.activateObject(conn);
            // Fail if the test connection is not usable instead of ignoring the result.
            if (!connectionFactory.validateObject(conn))
            {
                throw new MIFConnectionException(
                        "Connection created by the factory is not valid!");
            }
            connectionFactory.passivateObject(conn);
        } finally
        {
            connectionFactory.destroyObject(conn);
        }
    }

    public CommonsConnectionWrapper getConnection(long waitTime) throws MIFConnectionException
    {
        try
        {
            return (CommonsConnectionWrapper) connectionPool.borrowObject(waitTime);
        } catch (Exception ex)
        {
            throw new MIFConnectionException(
                    "Exception when getting a connection from pool: " + ex.getMessage(), ex);
        }
    }

    public CommonsConnectionWrapper getConnectionWithOutWait() throws MIFConnectionException
    {
        try
        {
            return (CommonsConnectionWrapper) connectionPool.borrowObjectWithNoWait();
        } catch (Exception ex)
        {
            throw new MIFConnectionException(
                    "Exception when getting a connection from pool: " + ex.getMessage(), ex);
        }
    }

    public int getNumActive()
    {
        return connectionPool.getNumActive();
    }

    public int getNumIdle()
    {
        return connectionPool.getNumIdle();
    }

    public int getMaxActiveNum()
    {
        return this.getMaximalNum();
    }

    public String getPoolId()
    {
        return poolId;
    }

    public void setPoolId(String poolId)
    {
        if (StringUtils.isEmpty(poolId))
        {
            throw new IllegalArgumentException("pool id must not be empty!");
        }
        this.poolId = poolId;
    }

    public void clear()
    {
        connectionPool.clear();
    }

    // Called after all properties from the XML configuration have been set.
    public void afterPropertiesSet() throws MIFConnectionException
    {
        if (null == connectonFactory)
        {
            throw new MIFConnectionException("Connection factory must be set!");
        }
        CommonsConnectionPooledObjectFactory factory = new CommonsConnectionPooledObjectFactory(
                poolId, connectonFactory, connectionPool);
        try
        {
            validateConnectionFactory(factory);
        } catch (Exception ex)
        {
            throw new MIFConnectionException(
                    "Validate connection factory failure!", ex);
        }
        connectionPool.setFactory(factory);
        // connectionPool.setWhenExhaustedAction(GenericObjectPool.WHEN_EXHAUSTED_BLOCK);
        connectionPool.setTestOnBorrow(true);
        // connectionPool.setTestOnReturn(GenericObjectPool.DEFAULT_TEST_ON_RETURN);
        connectionPool.setNumTestsPerEvictionRun(maxIdle - minIdle);
        // connectionPool
        //         .setSoftMinEvictableIdleTimeMillis(GenericObjectPool.DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS);
        // connectionPool.setTestWhileIdle(GenericObjectPool.DEFAULT_TEST_WHILE_IDLE);
        // connectionPool.setLifo(GenericObjectPool.DEFAULT_LIFO);

        // initialize the pool with maxIdle ready-to-use connections
        try
        {
            for (int i = 0; i < maxIdle; i++)
            {
                connectionPool.addObject();
            }
        } catch (Exception ex)
        {
            throw new MIFConnectionException(
                    "Initialize connection pool failure!", ex);
        }
        System.out.println("Connection number: " + connectionPool.getNumIdle());
    }

}

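The InitializingBean hook described for CommonsNetConnectionPool can be sketched like this. It is a minimal illustration under assumptions: the article's real interface declares MIFConnectionException on afterPropertiesSet, while this version throws an unchecked exception, and PoolBean and BeanLoader are hypothetical names for a configured bean and the loader that finishes it.

```java
// Simplified version of the hook; the real one may declare a checked exception.
interface InitializingBean {
    void afterPropertiesSet();
}

// Hypothetical bean configured through setters, then validated in one place.
class PoolBean implements InitializingBean {
    private int minIdle;
    private int maxIdle = 3;
    private boolean initialized;

    public void setMinIdle(int minIdle) { this.minIdle = minIdle; }
    public void setMaxIdle(int maxIdle) { this.maxIdle = maxIdle; }
    public boolean isInitialized() { return initialized; }

    // Runs once all setters have been called, so cross-field checks are possible.
    public void afterPropertiesSet() {
        if (minIdle < 0 || maxIdle < minIdle) {
            throw new IllegalStateException(
                    "Invalid idle settings: minIdle=" + minIdle + ", maxIdle=" + maxIdle);
        }
        initialized = true;
    }
}

// Hypothetical loader step: after setting properties, give the bean a chance to initialize.
class BeanLoader {
    static <T> T finish(T bean) {
        if (bean instanceof InitializingBean) {
            ((InitializingBean) bean).afterPropertiesSet();
        }
        return bean;
    }
}
```

A no-argument constructor cannot check minIdle against maxIdle because neither has been set yet; the hook runs after both setters, which is the gap the article describes.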
CommonsConnectionPoolFactory: the pool factory, which creates NetConnectionPool instances.
package com.ldd600.frm.mif.net.pool.commons.impl;

import com.ldd600.frm.mif.config.exception.MIFConnectionException;
import com.ldd600.frm.mif.context.ContextFactory;
import com.ldd600.frm.mif.net.conn.NetConnectionFactory;
import com.ldd600.frm.mif.net.pool.NetConnectionPoolFactory;



public class CommonsConnectionPoolFactory implements NetConnectionPoolFactory
{
private long idleTime = CommonsNetConnectionPool.DEFAULT_IDLETIME;
private int maximalNum = CommonsNetConnectionPool.DEFAULT_MAXIMAL_CONNECTIOM_NUM;
private int minIdle = CommonsNetConnectionPool.DEFAULT_MINIMAL_IDLE_NUM;
private int maxIdle = CommonsNetConnectionPool.DEFAULT_MAXIMAL_IDLE_NUM;
private long waitTime = CommonsNetConnectionPool.DEFAULT_WAIT_TIME;
private long cleanInterval = CommonsNetConnectionPool.DEFAULT_CLEAN_INTERVAL;
private String poolId;
private NetConnectionFactory connectionFactory;

public CommonsConnectionPoolFactory(String poolId,NetConnectionFactory connectionFactory, int idleTime, int maximalNum,

int minIdle, int maxIdle, int waitTime, int cleanInterval)
{
this.poolId = poolId;
this.connectionFactory = connectionFactory;
this.idleTime = idleTime;
this.maximalNum = maximalNum;
this.minIdle = minIdle;
this.maxIdle = maxIdle;
this.waitTime = waitTime;
this.cleanInterval = cleanInterval;
}

public CommonsConnectionPoolFactory()
{}


public long getIdleTime()
{
return idleTime;
}


public void setIdleTime(long idleTime)
{
this.idleTime = idleTime;
}


public int getMaximalNum()
{
return maximalNum;
}


public void setMaximalNum(int maximalNum)
{
this.maximalNum = maximalNum;
}


public long getWaitTime()
{
return waitTime;
}


public void setWaitTime(long waitTime)
{
this.waitTime = waitTime;
}


public long getCleanInterval()
{
return cleanInterval;
}


public void setCleanInterval(long cleanInterval)
{
this.cleanInterval = cleanInterval;
}


public String getPoolId()
{
return poolId;
}


public void setPoolId(String poolId)
{

if(ContextFactory.getPoolConfigContext().containsConnectionPool(poolId))
{
throw new IllegalArgumentException("The pool id exists, please use another one!");
}
this.poolId = poolId;
}


public int getMinIdle()
{
return minIdle;
}


public void setMinIdle(int minimalNum)
{
this.minIdle = minimalNum;
}


public int getMaxIdle()
{
return maxIdle;
}


public void setMaxIdle(int maxIdle)
{
this.maxIdle = maxIdle;
}


public CommonsNetConnectionPool createPool() throws MIFConnectionException
{
return new CommonsNetConnectionPool(poolId, connectionFactory, idleTime, maximalNum, minIdle, maxIdle, waitTime, cleanInterval);
}


public void setNetConnectionFactory(NetConnectionFactory connectionFactory)
{

if(null == connectionFactory)
{
throw new IllegalArgumentException("Connection factory is null!");
}
this.connectionFactory = connectionFactory;
}


public NetConnectionFactory getConnectionFactory()
{
return connectionFactory;
}
}

CommonsConnectionWrapper:
package com.ldd600.frm.mif.net.pool.commons.impl;

import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;

import com.ldd600.frm.mif.common.FileObject;
import com.ldd600.frm.mif.config.exception.MIFConnectionException;
import com.ldd600.frm.mif.config.exception.MIFReceiverException;
import com.ldd600.frm.mif.config.exception.MIFSenderException;
import com.ldd600.frm.mif.net.conn.NetConnection;
import com.ldd600.frm.mif.net.conn.NetRealConnection;
import com.ldd600.frm.mif.net.pool.commons.ObjectPool;

public class CommonsConnectionWrapper implements NetConnection
{
    protected NetRealConnection connection;
    protected ObjectPool pool;
    protected boolean isClosed;

    public CommonsConnectionWrapper(NetRealConnection connection,
            ObjectPool pool)
    {
        this.connection = connection;
        this.pool = pool;
    }

    // Package-private accessor used by CommonsConnectionPooledObjectFactory,
    // which calls getConnection() to validate and destroy the physical connection.
    NetRealConnection getConnection()
    {
        return connection;
    }

    // ************************************************************//

    public boolean isConnected()
    {
        return connection.isConnected();
    }

    public boolean isClosed()
    {
        return this.isClosed;
    }

    public void setClosed(boolean isClosed)
    {
        this.isClosed = isClosed;
    }

    // *************************************************************//

    public boolean sendMessage(FileObject msgObj) throws MIFSenderException
    {
        return connection.sendMessage(msgObj.getFileName(), msgObj);
    }

    // Sends every file and reports false if at least one of them failed.
    public boolean sendMessage(Collection<FileObject> msgObjs)
            throws MIFSenderException
    {
        boolean success = true;
        for (FileObject msgObj : msgObjs)
        {
            if (!this.sendMessage(msgObj))
            {
                success = false;
            }
        }
        return success;
    }

    // Varargs overload; a single argument still resolves to sendMessage(FileObject).
    public boolean sendMessage(FileObject... msgObjs) throws MIFSenderException
    {
        return this.sendMessage(Arrays.asList(msgObjs));
    }

    public boolean sendMessage(String fileName, String content)
            throws MIFSenderException
    {
        return connection.sendMessage(fileName, content);
    }

    public boolean sendMessage(String fileName, InputStream ins)
            throws MIFSenderException
    {
        return connection.sendMessage(fileName, ins);
    }

    // ************************************************************//
    public boolean receiveMessage(String fileName, byte[] bytes) throws MIFReceiverException
    {
        return connection.receiveMessage(fileName, bytes);
    }

    public File receiveMessage(FileObject fileObj) throws MIFReceiverException
    {
        return connection.receiveMessage(fileObj.getFileName(), fileObj);
    }

    public boolean receiveMessage(String fileName, OutputStream ous) throws MIFReceiverException
    {
        return connection.receiveMessage(fileName, ous);
    }

    // Varargs overload; a single argument still resolves to receiveMessage(FileObject).
    public File[] receiveMessage(FileObject... fileObjs) throws MIFReceiverException
    {
        Collection<File> fileList = receiveMessage(Arrays.asList(fileObjs));
        return fileList.toArray(new File[fileList.size()]);
    }

    public Collection<File> receiveMessage(Collection<FileObject> fileObjs) throws MIFReceiverException
    {
        Collection<File> fileList = new ArrayList<File>();
        for (FileObject fileObj : fileObjs)
        {
            File file = connection.receiveMessage(fileObj.getFileName(), fileObj);
            fileList.add(file);
        }
        return Collections.unmodifiableCollection(fileList);
    }

    // *************************************************************//

    // Closing a logical connection returns it to the pool; the physical connection stays open.
    public void close() throws MIFConnectionException
    {
        try
        {
            pool.returnObject(this);
        } catch (Exception ex)
        {
            throw new MIFConnectionException(
                    "Exception occurred while closing the connection, that is, while returning it to the pool",
                    ex);
        }
    }

}
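On the caller's side, the important rule is that close() runs even when a transfer fails, otherwise the connection never goes back to the pool. The sketch below shows that pattern with try-with-resources. It is an assumption-laden illustration: CountingPool, PooledConn and Uploader are hypothetical, single-threaded stand-ins, and the article's NetConnection is not shown to implement AutoCloseable, so with the real classes an equivalent try/finally that calls close() would be needed.

```java
// Hypothetical pool that only counts idle connections (not thread-safe).
class CountingPool {
    private int idle;

    CountingPool(int size) {
        this.idle = size;
    }

    PooledConn borrow() {
        if (idle == 0) {
            throw new IllegalStateException("Pool exhausted");
        }
        idle--;
        return new PooledConn(this);
    }

    void giveBack() { idle++; }

    int getNumIdle() { return idle; }
}

// Hypothetical logical connection; close() returns it to the pool.
class PooledConn implements AutoCloseable {
    private final CountingPool pool;
    private boolean closed;

    PooledConn(CountingPool pool) {
        this.pool = pool;
    }

    boolean sendMessage(String fileName, String content) {
        if (content == null) {
            throw new IllegalArgumentException("content must not be null");
        }
        return true; // pretend the upload succeeded
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            pool.giveBack();
        }
    }
}

class Uploader {
    // The connection is returned to the pool whether sendMessage succeeds or throws.
    static boolean upload(CountingPool pool, String fileName, String content) {
        try (PooledConn conn = pool.borrow()) {
            return conn.sendMessage(fileName, content);
        }
    }
}
```

Calling Uploader.upload with valid content and then with null content leaves the idle count unchanged in both cases, because the connection is released on the success path and on the exception path.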
FTPConnection:
package com.ldd600.frm.mif.net.conn.ftp;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;

import com.ldd600.frm.mif.common.FileObject;
import com.ldd600.frm.mif.config.exception.MIFConnectionException;
import com.ldd600.frm.mif.config.exception.MIFReceiverException;
import com.ldd600.frm.mif.config.exception.MIFSenderException;
import com.ldd600.frm.mif.constant.BasicConstants;
import com.ldd600.frm.mif.net.conn.NetRealConnection;
import com.ldd600.frm.mif.util.StringUtils;

public class FTPConnection implements NetRealConnection
{
    // ------------------------variables------------------------//

    public static final int DEFAULT_FTP_PORT = 21;
    public static final String DEFAULT_UPLOAD_PATH = "/";
    public static final String DEFAULT_USERNAME = "";
    public static final String DEFAULT_PASSWORD = "";
    private FTPClient ftpClient = new FTPClient();
    private String hostName;
    private int port = DEFAULT_FTP_PORT;
    private String userName = DEFAULT_USERNAME;
    private String passWord = DEFAULT_PASSWORD;
    private String pathName = DEFAULT_UPLOAD_PATH;

    // ------------------------------------------------------------//

    // ---------------------------Constructors---------------------//
    public FTPConnection()
    {
    }

    public FTPConnection(String hostName)
    {
        if (StringUtils.isEmpty(hostName))
        {
            throw new IllegalArgumentException("Host name must not be empty!");
        } else
        {
            this.hostName = hostName;
        }
    }

    public FTPConnection(String hostName, int port)
    {
        this(hostName);
        this.port = port;
    }

    public FTPConnection(String hostName, int port, String userName,
            String passWord)
    {
        this(hostName, port);
        this.userName = userName;
        this.passWord = passWord;
    }

    public FTPConnection(String hostName, int port, String userName,
            String passWord, String pathName)
    {
        this(hostName, port, userName, passWord);
        this.pathName = pathName;
    }

    // -----------------------------------------------------------------------//

    // ------------------------------getters and setters--------------------//
    public String getHostName()
    {
        return hostName;
    }

    public void setHostName(String hostName)
    {
        this.hostName = hostName;
    }

    public int getPort()
    {
        return port;
    }

    public void setPort(int port)
    {
        this.port = port;
    }

    public String getUserName()
    {
        return userName;
    }

    public void setUserName(String userName)
    {
        this.userName = userName;
    }

    public String getPassWord()
    {
        return passWord;
    }

    public void setPassWord(String passWord)
    {
        this.passWord = passWord;
    }

    public FTPClient getFtpClient()
    {
        return ftpClient;
    }

    public String getPathName()
    {
        return pathName;
    }

    public void setPathName(String pathName)
    {
        this.pathName = pathName;
    }

    // ------------------------------------------------------------------------//

    public void connect() throws MIFConnectionException
    {
        if (StringUtils.isEmpty(hostName))
        {
            throw new IllegalStateException(
                    "Host name must be initialized before connect!");
        }
        try
        {
            ftpClient.connect(hostName, port);
            System.out.println(ftpClient.getReplyCode());
            if (!login(userName, passWord))
            {
                ftpClient.disconnect();
                throw new MIFConnectionException(
                        "username or password doesn't match!");
            }
            System.out.println(ftpClient.getReplyCode());
            if (!this.changeToDirectory(pathName))
            {
                disconnect();
                throw new MIFConnectionException(
                        "path doesn't exist or there is no permission to access it, please choose another one!");
            }

            // Use passive mode as default because most of us are
            // behind firewalls these days.
            ftpClient.enterLocalPassiveMode();
        } catch (IOException ioex)
        {
            // The password is deliberately left out of the error message.
            throw new MIFConnectionException("Connect to server: " + hostName
                    + " port: " + port + " with username: " + userName
                    + " encountered an IO or socket error!", ioex);
        }
    }

    public void disconnect() throws MIFConnectionException
    {
        try
        {
            ftpClient.logout();
        } catch (IOException ioex)
        {
            // swallow the exception; the socket is closed in the finally block
        } finally
        {
            if (ftpClient.isConnected())
            {
                try
                {
                    ftpClient.disconnect();
                } catch (IOException e)
                {
                    throw new MIFConnectionException("Disconnect from server "
                            + hostName + " encountered an IO error!", e);
                }
            }
        }
    }

    // Checks the socket state first, then asks the server for its status to detect stale connections.
    public boolean isConnected()
    {
        if (!ftpClient.isConnected())
        {
            return false;
        }
        try
        {
            int code = ftpClient.stat();
            if (FTPReply.isNegativePermanent(code))
            {
                return false;
            }
        } catch (IOException ioex)
        {
            return false;
        }
        return true;
    }

    public boolean sendMessage(String fileName, FileObject fileObj) throws MIFSenderException
    {
        boolean success = true;
        String localFolder = fileObj.getFolder();
        String localFileName = fileObj.getFileName();
        if (StringUtils.isEmpty(localFolder) || StringUtils.isEmpty(localFileName))
        {
            throw new IllegalArgumentException(
                    "Local folder or file name is an empty string!");
        }
        StringBuilder sb = new StringBuilder(localFolder);
        sb.append(BasicConstants.FILE_SEPARATOR).append(localFileName);
        File file = new File(sb.toString());
        // new File(...) never returns null, so check that the file really exists.
        if (!file.exists())
        {
            throw new IllegalArgumentException(
                    "Local folder or file name doesn't exist!");
        }
        InputStream ins = null;
        try
        {
            ins = new FileInputStream(file);
            if (!ftpClient.storeFile(fileName, ins))
            {
                //TODO some log
                success = false;
            }
            // if (FTPReply.isNegativePermanent(ftpClient.getReplyCode())) {
            // disconnect();
            // }
            return success;
        } catch (IOException ioex)
        {
            throw new MIFSenderException("Send file: " + fileObj.getFileName()
                    + " failure!", ioex);
        }
        // catch (MIFConnectionException mcex) {
        // throw new MIFSenderException("Send file: " + msgObj.getFileName()
        // + " failure!", mcex);
        // }
        finally
        {
            // ftpClient.completePendingCommand();
            if (null != ins)
            {
                try
                {
                    ins.close();
                } catch (IOException e)
                {
                    throw new MIFSenderException("Close Stream failure!", e);
                }
            }
        }
    }

    public boolean sendMessage(String fileName, String content)
            throws MIFSenderException
    {
        OutputStream os = null;
        try
        {
            System.out.println(ftpClient.getReplyCode());
            os = ftpClient.storeFileStream(fileName);
            if (null == os)
            {
                // storeFileStream returns null when the data connection cannot be opened,
                // so there is nothing to write and no pending command to complete.
                return false;
            }
            os.write(content.getBytes());
            // os.flush();
        } catch (IOException ioex)
        {
            throw new MIFSenderException(
                    "Send file: " + fileName + " failure!", ioex);
        } finally
        {
            if (null != os)
            {
                try
                {
                    os.close();
                } catch (IOException e)
                {
                    throw new MIFSenderException("Close Stream failure!", e);
                }
                // catch (MIFConnectionException mcex) {
                // throw new MIFSenderException("Send file: " + fileName
                // + " failure!", mcex);
                // }
            }
        }

        // The stream must be closed before the transfer is finalized.
        try
        {
            if (ftpClient.completePendingCommand())
            {
                return true;
            } else
            {
                //TODO some log
                return false;
            }
        } catch (IOException ioex)
        {
            throw new MIFSenderException("Complete pending command failure!",
                    ioex);
        }
    }

public boolean sendMessage(String fileName, InputStream ins)
271
throws MIFSenderException
{
272
boolean success = true;
273
try
{
274
System.out.println(ftpClient.getReplyCode());
275
if (!ftpClient.storeFile(fileName, ins))
{
276
//TODO do some log;
277
System.out.println("send message failed!");
278
success = false;
279
}
280
// if (FTPReply.isNegativePermanent(ftpClient.getReplyCode())) {
281
// disconnect();
282
// }
283
return success;
284
} catch (IOException ioex)
{
285
throw new MIFSenderException(
286
"Send file: " + fileName + " failure!", ioex);
287
}
288
// catch (MIFConnectionException mcex) {
289
// throw new MIFSenderException(
290
// "Send file: " + fileName + " failure!", mcex);
291
// }
292
finally
{
293
try
{
294
ins.close();
295
} catch (IOException e)
{
296
throw new MIFSenderException("Close Stream failure!", e);
297
}
298
}
299
}
300
    public boolean receiveMessage(String fileName, byte[] bytes)
            throws MIFReceiverException {
        BufferedInputStream bins = null;
        try {
            // retrieveFileStream returns null when the data connection cannot
            // be opened; in that case there is no pending command to complete
            InputStream ins = ftpClient.retrieveFileStream(fileName);
            if (ins == null) {
                return false;
            }
            bins = new BufferedInputStream(ins);
            // A single read() may return fewer bytes than requested, so keep
            // reading until the buffer is full or the stream ends
            int offset = 0;
            int count;
            while (offset < bytes.length
                    && (count = bins.read(bytes, offset, bytes.length - offset)) != -1) {
                offset += count;
            }
        } catch (IOException ioex) {
            throw new MIFReceiverException("When receiving file: " + fileName
                    + " encountered an IO error!", ioex);
        } finally {
            if (bins != null) {
                try {
                    bins.close();
                } catch (IOException e) {
                    throw new MIFReceiverException(
                            "Close input stream failure!", e);
                }
            }
        }

        try {
            if (ftpClient.completePendingCommand()) {
                return true;
            } else {
                return false;
            }
        } catch (IOException ioex) {
            throw new MIFReceiverException("Complete pending command failure!",
                    ioex);
        }
    }

    public File receiveMessage(String fileName, FileObject fileObj)
            throws MIFReceiverException {
        String localFolder = fileObj.getFolder();
        String localFileName = fileObj.getFileName();
        // Validate the local target as well as the remote file name
        if (StringUtils.isEmpty(localFolder) || StringUtils.isEmpty(localFileName)
                || StringUtils.isEmpty(fileName)) {
            throw new IllegalArgumentException(
                    "Local folder, local file name or remote file name is an empty string!");
        }
        StringBuilder sb = new StringBuilder(localFolder);
        sb.append(BasicConstants.FILE_SEPARATOR).append(localFileName);
        File file = new File(sb.toString());
        OutputStream ous = null;
        try {
            ous = new FileOutputStream(file);
            if (!ftpClient.retrieveFile(fileName, ous)) {
                // TODO some log
                file.delete();
                return null;
            }
            return file;
        } catch (IOException ioex) {
            if (file.exists()) {
                file.delete();
            }

            throw new MIFReceiverException("When receiving file: " + fileName
                    + " encountered an IO error!", ioex);
        } finally {
            if (null != ous) {
                try {
                    ous.close();
                } catch (IOException e) {
                    throw new MIFReceiverException("When closing output stream"
                            + " encountered an IO error!", e);
                }
            }
        }

    }

    public boolean receiveMessage(String fileName, OutputStream ous)
            throws MIFReceiverException {
        try {
            return ftpClient.retrieveFile(fileName, ous);
        } catch (IOException ioex) {
            throw new MIFReceiverException("When receiving file: " + fileName
                    + " encountered an IO error!", ioex);
        }
    }

    // ---------------------------------------------------------------------------//

    public boolean login(String userName, String passWord) throws IOException {
        return ftpClient.login(userName, passWord);
    }

    public boolean changeToDirectory(String pathName) throws IOException {
        return ftpClient.changeWorkingDirectory(pathName);
    }

    public boolean fileExists(String remote) throws IOException {
        FTPFile[] files = ftpClient.listFiles(this.pathName);
        for (FTPFile file : files) {
            if (remote.equalsIgnoreCase(file.getName())) {
                return true;
            }
        }
        return false;
    }

}

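The byte-array variant of receiveMessage fills a caller-supplied buffer from a network stream. InputStream.read is allowed to return fewer bytes than requested, so a buffer is only reliably filled by reading in a loop. The following is a minimal, self-contained sketch of that loop. The names StreamReads, readFully and trickle are hypothetical and not part of the original class, and an in-memory stream that hands out at most three bytes per call stands in for the FTP data connection.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class StreamReads {

    /**
     * Reads from the stream until the buffer is full or the stream ends.
     * Returns the number of bytes actually stored in the buffer.
     */
    public static int readFully(InputStream in, byte[] buffer) throws IOException {
        int offset = 0;
        while (offset < buffer.length) {
            int count = in.read(buffer, offset, buffer.length - offset);
            if (count == -1) {
                break; // end of stream reached before the buffer was full
            }
            offset += count;
        }
        return offset;
    }

    /** Hypothetical test stream: returns at most 3 bytes per bulk read, like a slow socket. */
    public static InputStream trickle(byte[] data) {
        final ByteArrayInputStream source = new ByteArrayInputStream(data);
        return new InputStream() {
            @Override
            public int read() {
                return source.read();
            }

            @Override
            public int read(byte[] b, int off, int len) {
                return source.read(b, off, Math.min(len, 3));
            }
        };
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "hello, ftp".getBytes(StandardCharsets.US_ASCII);
        byte[] buffer = new byte[data.length];
        int stored = readFully(trickle(data), buffer);
        System.out.println(stored + " bytes: "
                + new String(buffer, 0, stored, StandardCharsets.US_ASCII));
    }
}
```

A single read call against the trickle stream would store only three bytes, while the loop keeps going until all ten have arrived. The return value also tells the caller how much of the buffer is valid when the remote file is shorter than the buffer.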
FTPConnectionFactory:

package com.ldd600.frm.mif.net.conn.ftp;

import com.ldd600.frm.mif.net.conn.NetConnectionFactory;

public class FTPConnectionFactory implements NetConnectionFactory {
    private String hostName;
    private int port = FTPConnection.DEFAULT_FTP_PORT;
    private String userName = FTPConnection.DEFAULT_USERNAME;
    private String passWord = FTPConnection.DEFAULT_PASSWORD;
    private String pathName = FTPConnection.DEFAULT_UPLOAD_PATH;

    public FTPConnectionFactory() {}

    public String getHostName() {
        return hostName;
    }

    public void setHostName(String hostName) {
        this.hostName = hostName;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassWord() {
        return passWord;
    }

    public void setPassWord(String passWord) {
        this.passWord = passWord;
    }

    public String getPathName() {
        return pathName;
    }

    public void setPathName(String pathName) {
        this.pathName = pathName;
    }

    public FTPConnection createConnection() {
        return new FTPConnection(hostName, port, userName, passWord, pathName);
    }

}

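The factory exposes its settings only through JavaBean-style getters and setters, and the property names match the param keys in the pool configuration file (hostName, port, userName, passWord, pathName). How the framework actually binds those params is not shown in this listing. The following is a minimal sketch of one possible approach based on java.beans introspection. ParamBinder, bind and the stand-in DemoFactory are hypothetical names, and only String and int properties are handled.

```java
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

public class ParamBinder {

    /** Hypothetical stand-in for a connection factory; two properties for brevity. */
    public static class DemoFactory {
        private String hostName;
        private int port = 21;

        public String getHostName() { return hostName; }
        public void setHostName(String hostName) { this.hostName = hostName; }
        public int getPort() { return port; }
        public void setPort(int port) { this.port = port; }
    }

    /**
     * Copies each configured param onto the setter of the same name.
     * Assumption: properties are either String or int, as in this sketch.
     */
    public static void bind(Object target, Map<String, String> params) throws Exception {
        PropertyDescriptor[] properties =
                Introspector.getBeanInfo(target.getClass()).getPropertyDescriptors();
        for (PropertyDescriptor property : properties) {
            Method setter = property.getWriteMethod();
            String raw = params.get(property.getName());
            if (setter == null || raw == null) {
                continue; // read-only property or no param configured: keep the default
            }
            Class<?> type = property.getPropertyType();
            Object value = (type == int.class || type == Integer.class)
                    ? Integer.valueOf(raw)
                    : raw;
            setter.invoke(target, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("hostName", "192.168.0.1");
        params.put("port", "2121");

        DemoFactory factory = new DemoFactory();
        bind(factory, params);
        System.out.println(factory.getHostName() + ":" + factory.getPort());
    }
}
```

Properties without a matching param keep the defaults assigned in the field initializers, which is the same role the DEFAULT_* constants play in FTPConnectionFactory.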