1.
目的
主要是要构建一个符合自己需求的的连接池(对象池)。这个连接池中所存放的对象的行为是自定义的并满足项目实际应用需求的。但是这些对象的频繁创建需要很大的代价,比如一些Socket连接。
2. 结构
NetRealConnection: 物理连接相关行为的接口
FTPConnection:NetRealConnection的ftp实现,在apache Commons-net的FTPClient基础上实现。
NetConnection: NetRealConnection的代理,是从pool中get出来的对象,由于从pool中获取的connection并不需要了解物理连接的行为,所以要屏蔽这些行为,而且逻辑连接在使用完毕后需要关闭,增加了close方法返回pool。
CommonsConnectionWrapper:从CommonsConnectionPool得到的NetConnection的实现,它需要把NetRealConnection返回给CommonsConnectionPool。
NetConnectionPool: 一个NetConnection的pool,描述这类pool的行为
CommonsConnectionPool: 在apache commons-pool基础上实现的一个NetConnectionPool
NetConnectioFactory: 用于创建NetRealConnetion的抽象工厂
NetConnectionPoolFactory; 用于创建NetConnectionPool的抽象工厂
基于接口设计, Connection-pool的行为被抽象出来,便于添加不同connecton-pool的具体实现,NetConnection的行为被抽象出来,便于添加各种connection的具体实现,pool和connection之间通过接口建立关系,connection的具体实现可以随意切换。
2.配置
Pool的配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<pl:poolConfig xmlns:pl="http://www.ldd600.com/frm/mif/pl" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.ldd600.com/frm/mif/pl mifPool.xsd">
<pl:pool id="ftpPool" class="com.ldd600.frm.mif.net.pool.commons.impl.CommonsNetConnectionPool">
<pl:connectionFactory refId="ftpConnectionFactory"/>
<pl:params>
<pl:param value="10000" key="idleTime"/>
<pl:param value="5" key="maximalNum"/>
<pl:param value="0" key="minIdle"/>
<pl:param value="3" key="maxIdle"/>
<pl:param value="100000" key="waitTime"/>
<pl:param value="1000" key="cleanInterval"/>
</pl:params>
</pl:pool>
<pl:connectionFactory id="ftpConnectionFactory" class="com.ldd600.frm.mif.net.conn.ftp.FTPConnectionFactory">
<pl:params>
<pl:param value="192.168.0.1" key="hostName"/>
<pl:param value="21" key="port"/>
<pl:param value="luga_test" key="userName"/>
<pl:param value="lugatest" key="passWord"/>
<pl:param value="/home/luga_test" key="pathName"/>
</pl:params>
</pl:connectionFactory>
</pl:poolConfig>
可以配置多个pool和多个connectionFactory,connectionFactory被单独提取出来,1是为了不同的pool重用同一个factory的配置,2是connectionfactory可能会生产各种不同类型的connection,比如安全连接,自动应答连接等等,3是当应用开发人员不需要使用pool的时候,也可以直接通过context获取factory直接创建连接。
Pool的schema文件:
<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema xmlns:pl="http://www.ldd600.com/frm/mif/pl" xmlns:xsd="http://www.w3.org/2001/XMLSchema" targetNamespace="http://www.ldd600.com/frm/mif/pl" elementFormDefault="qualified" attributeFormDefault="unqualified">
<xsd:simpleType name="leafType">
<xsd:restriction base="xsd:string">
<xsd:pattern value="(\s)*"/>
</xsd:restriction>
</xsd:simpleType>
<xsd:simpleType name="notEmptyToken">
<xsd:restriction base="xsd:token">
<xsd:pattern value="(\S+\s*)+"/>
</xsd:restriction>
</xsd:simpleType>
<xsd:simpleType name="classAttributeType">
<xsd:restriction base="xsd:token">
<xsd:pattern value="([a-z|0-9|_]+\.)*[A-Z]([A-Z|a-z|0-9|_])*"/>
</xsd:restriction>
</xsd:simpleType>
<xsd:complexType name="idClassType">
<xsd:all minOccurs="0">
<xsd:element name="params" type="pl:paramsType">
<xsd:key name="outerParamsKey">
<xsd:selector xpath="pl:param"/>
<xsd:field xpath="@key"/>
</xsd:key>
</xsd:element>
</xsd:all>
<xsd:attribute name="id" type="pl:notEmptyToken" use="required"/>
<xsd:attribute name="class" type="pl:classAttributeType" use="required">
<xsd:annotation>
<xsd:documentation>class name</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<xsd:complexType name="refIdClassType">
<xsd:complexContent>
<xsd:extension base="pl:classType">
<xsd:attribute name="refId" type="pl:notEmptyToken"/>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
<xsd:complexType name="paramsType">
<xsd:annotation>
<xsd:documentation> of java class</xsd:documentation>
</xsd:annotation>
<xsd:sequence maxOccurs="unbounded">
<xsd:element name="param">
<xsd:complexType>
<xsd:annotation>
<xsd:documentation>key is property name, value is property value</xsd:documentation>
</xsd:annotation>
<xsd:simpleContent>
<xsd:extension base="pl:leafType">
<xsd:attribute name="key" type="pl:notEmptyToken" use="required"/>
<xsd:attribute name="value" type="pl:notEmptyToken" use="required"/>
</xsd:extension>
</xsd:simpleContent>
</xsd:complexType>
</xsd:element>
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="classType">
<xsd:sequence minOccurs="0">
<xsd:element name="params" type="pl:paramsType">
<xsd:key name="paramsKey">
<xsd:selector xpath="pl:param"/>
<xsd:field xpath="@key"/>
</xsd:key>
</xsd:element>
</xsd:sequence>
<xsd:attribute name="class" type="pl:classAttributeType">
<xsd:annotation>
<xsd:documentation>class name</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<xsd:complexType name="idPoolType">
<xsd:all minOccurs="0" maxOccurs="1">
<xsd:element name="params" type="pl:paramsType">
<xsd:key name="idPoolParamsKey">
<xsd:selector xpath="pl:param"/>
<xsd:field xpath="@key"/>
</xsd:key>
</xsd:element>
<xsd:element name="connectionFactory" type="pl:refIdClassType" minOccurs="1"/>
</xsd:all>
<xsd:attribute name="id" type="pl:notEmptyToken" use="required"/>
<xsd:attribute name="class" type="pl:classAttributeType" use="required"/>
</xsd:complexType>
<xsd:element name="poolConfig">
<xsd:complexType>
<xsd:sequence minOccurs="0" maxOccurs="unbounded">
<xsd:choice>
<xsd:element name="connectionFactory" type="pl:idClassType" minOccurs="0"/>
<xsd:element name="pool" type="pl:idPoolType" minOccurs="0"/>
</xsd:choice>
</xsd:sequence>
</xsd:complexType>
<xsd:key name="poolIdKey">
<xsd:selector xpath="pl:pool"/>
<xsd:field xpath="@id"/>
</xsd:key>
<xsd:key name="connFactoryIdKey">
<xsd:selector xpath="pl:connection-factory"/>
<xsd:field xpath="@id"/>
</xsd:key>
</xsd:element>
</xsd:schema>
我是用apache xmlbeans来进行schema验证和object binding。在parse xml文件时,使用commons-beanutils完成字段值的动态设置。
3.具体实现
CommonsConnectionPooledObjectFactory类: 具体可参见commons-pool的PoolableObjectFactory详细解释。
1package com.ldd600.frm.mif.net.pool.commons.impl;
2
3import com.ldd600.frm.mif.net.conn.NetConnection;
4import com.ldd600.frm.mif.net.conn.NetConnectionFactory;
5import com.ldd600.frm.mif.net.conn.NetRealConnection;
6import com.ldd600.frm.mif.net.pool.commons.ObjectPool;
7import com.ldd600.frm.mif.net.pool.commons.PoolableObjectFactory;
8
9public class CommonsConnectionPooledObjectFactory implements
10 PoolableObjectFactory {
11 private String poolId;
12 private ObjectPool pool;
13 private NetConnectionFactory connFactory;
14 CommonsConnectionPooledObjectFactory(String poolId, NetConnectionFactory connFactory, ObjectPool pool) {
15 this.poolId = poolId;
16 this.pool = pool;
17 this.connFactory = connFactory;
18 }
19
20 public ObjectPool getPool() {
21 return pool;
22 }
23
24 synchronized public void setPool(ObjectPool pool) {
25 if (null != this.pool && this.pool != pool) {
26 try {
27 this.pool.close();
28 } catch (Exception e) {
29 // ignored
30 }
31 }
32 this.pool = pool;
33 }
34
35 public void activateObject(Object obj) throws Exception {
36 if (obj instanceof CommonsConnectionWrapper) {
37 ((CommonsConnectionWrapper) obj).setClosed(false);
38 }
39 }
40
41 public void destroyObject(Object obj) throws Exception {
42 if (obj instanceof CommonsConnectionWrapper) {
43 ((CommonsConnectionWrapper) obj).getConnection().disconnect();
44 }
45 }
46
47 public NetConnection makeObject() throws Exception {
48 NetRealConnection connection = connFactory.createConnection();
49 connection.connect();
50 return new CommonsConnectionWrapper(connection, pool);
51 }
52
53 public void passivateObject(Object obj) throws Exception {
54 if (obj instanceof CommonsConnectionWrapper) {
55 ((CommonsConnectionWrapper) obj).setClosed(true);
56 }
57 }
58
59 public NetConnectionFactory getConnFactory() {
60 return connFactory;
61 }
62
63 public String getPoolId() {
64 return poolId;
65 }
66
67 public boolean validateObject(Object obj) {
68 if (obj instanceof CommonsConnectionWrapper) {
69 return ((CommonsConnectionWrapper) obj).getConnection()
70 .isConnected();
71 } else {
72 return false;
73 }
74 }
75}
76
CommonsNetConnectionPool:在commons-pool的基础上实现的connection pool, 对于一些比较复杂的功能采用了默认设置,并增加了pool的初始化和验证。InitializingBean是仿照spring写的一个接口,用于在parse配置xml文件并设置完bean的属性后,调用InitializingBean的afterpropertiesset方法,弥补一些默认构造函数不能实现的初始化操作,比如对属性值的验证,和属性值的初始化处理。
1package com.ldd600.frm.mif.net.pool.commons.impl;
2
3import com.ldd600.frm.common.utils.StringUtils;
4import com.ldd600.frm.mif.config.bean.InitializingBean;
5import com.ldd600.frm.mif.exception.MIFConnectionException;
6import com.ldd600.frm.mif.net.conn.NetConnection;
7import com.ldd600.frm.mif.net.conn.NetConnectionFactory;
8import com.ldd600.frm.mif.net.pool.NetConnectionPool;
9import com.ldd600.frm.mif.net.pool.commons.ObjectPool;
10
11public class CommonsNetConnectionPool implements
12 NetConnectionPool, InitializingBean{
13
14 /** *//** ***************************************************************************** */
15 public static final long DEFAULT_IDLETIME = GenericObjectPool.DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS;
16 public static final int DEFAULT_MAXIMAL_CONNECTIOM_NUM = 5;
17 public static final int DEFAULT_MINIMAL_IDLE_NUM = 0;
18 public static final int DEFAULT_MAXIMAL_IDLE_NUM = 3;
19 public static final long DEFAULT_WAIT_TIME = GenericObjectPool.DEFAULT_MAX_WAIT;
20 public static final long DEFAULT_CLEAN_INTERVAL = GenericObjectPool.DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
21
22 /** *//** **************************************************************************** */
23 private SubGeneriObjectcPool connectionPool;
24 private long idleTime = DEFAULT_IDLETIME;
25 private int maximalNum = DEFAULT_MAXIMAL_CONNECTIOM_NUM;
26 private int minIdle = DEFAULT_MINIMAL_IDLE_NUM;
27 private int maxIdle = DEFAULT_MAXIMAL_IDLE_NUM;
28 private long waitTime = DEFAULT_WAIT_TIME;
29 private long cleanInterval = DEFAULT_CLEAN_INTERVAL;
30 private NetConnectionFactory connectonFactory;
31 private String poolId;
32
33 public CommonsNetConnectionPool() {
34 connectionPool = new SubGeneriObjectcPool();
35 }
36
37 /** *//** **************************************************************************** */
38
39 /** *//**
40 * @param poolId
41 * @param idleTime
42 * @param maximalNum
43 * @param minIdle
44 * @param timeOut
45 * @param cleanInterval
46 * @throws MIFConnectionException
47 */
48 CommonsNetConnectionPool(String poolId, NetConnectionFactory connFactory, long idleTime, int maximalNum,
49 int minIdle, int maxIdle, long waitTime, long cleanInterval)
50 throws MIFConnectionException {
51 this.idleTime = idleTime;
52 this.maximalNum = maximalNum;
53 this.minIdle = minIdle;
54 this.maxIdle = maxIdle;
55 this.waitTime = waitTime;
56 this.cleanInterval = cleanInterval;
57 connectionPool = new SubGeneriObjectcPool();
58 CommonsConnectionPooledObjectFactory factory = new CommonsConnectionPooledObjectFactory(
59 poolId, connFactory, connectionPool);
60 try {
61 validateConnectionFactory(factory);
62 } catch (Exception ex) {
63 throw new MIFConnectionException(
64 "Validate connection factory failure!", ex);
65 }
66 connectionPool.setFactory(factory);
67 connectionPool.setMaxActive(maximalNum);
68// connectionPool.setWhenExhaustedAction(GenericObjectPool.WHEN_EXHAUSTED_BLOCK);
69 connectionPool.setMaxWait(waitTime);
70 connectionPool.setMaxIdle(maxIdle);
71 connectionPool.setMinIdle(minIdle);
72 connectionPool.setTestOnBorrow(true);
73// connectionPool.setTestOnReturn(GenericObjectPool.DEFAULT_TEST_ON_RETURN);
74 connectionPool.setTimeBetweenEvictionRunsMillis(cleanInterval);
75 connectionPool.setNumTestsPerEvictionRun(maxIdle - minIdle);
76 connectionPool.setMinEvictableIdleTimeMillis(idleTime);
77// connectionPool
78// .setSoftMinEvictableIdleTimeMillis(GenericObjectPool.DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS);
79// connectionPool.setTestWhileIdle(GenericObjectPool.DEFAULT_TEST_WHILE_IDLE);
80// connectionPool.setLifo(GenericObjectPool.DEFAULT_LIFO);
81 //initialize the pool
82 try {
83 for (int i = 0; i < maxIdle; i++) {
84 connectionPool.addObject();
85 }
86 } catch (Exception ex) {
87 throw new MIFConnectionException(
88 "Initialize connection pool failure!", ex);
89 }
90 System.out.println("Connection number: " + connectionPool.getNumIdle());
91 }
92
93
94 public NetConnectionFactory getConnectonFactory() {
95 return connectonFactory;
96 }
97
98 public void setConnectonFactory(NetConnectionFactory connectonFactory) {
99 this.connectonFactory = connectonFactory;
100 }
101
102 public long getIdleTime() {
103 return idleTime;
104 }
105
106 public void setIdleTime(long idleTime) {
107 this.idleTime = idleTime;
108 connectionPool.setMinEvictableIdleTimeMillis(idleTime);
109 }
110
111 public int getMaximalNum() {
112 return maximalNum;
113 }
114
115 public void setMaximalNum(int maximalNum) {
116 this.maximalNum = maximalNum;
117 connectionPool.setMaxActive(maximalNum);
118 }
119
120 public int getMinIdle() {
121 return minIdle;
122 }
123
124 public void setMinIdle(int minIdle) {
125 this.minIdle = minIdle;
126 connectionPool.setMinIdle(minIdle);
127 }
128
129 public int getMaxIdle() {
130 return maxIdle;
131 }
132
133 public void setMaxIdle(int maxIdle) {
134 this.maxIdle = maxIdle;
135 connectionPool.setMaxIdle(maxIdle);
136 }
137
138 public long getWaitTime() {
139 return waitTime;
140 }
141
142 public void setWaitTime(long timeOut) {
143 this.waitTime = timeOut;
144 connectionPool.setMaxWait(timeOut);
145 }
146
147 public long getCleanInterval() {
148 return cleanInterval;
149 }
150
151 public void setCleanInterval(long cleanInterval) {
152 this.cleanInterval = cleanInterval;
153 connectionPool.setTimeBetweenEvictionRunsMillis(cleanInterval);
154 }
155
156 public CommonsConnectionWrapper getConnection() throws MIFConnectionException {
157 try {
158 return (CommonsConnectionWrapper) connectionPool.borrowObject();
159 } catch (Exception ex) {
160 throw new MIFConnectionException(
161 "Exception when get a connection from pool: " + ex.getMessage(), ex);
162 }
163 }
164
165 public void close() throws MIFConnectionException {
166 ObjectPool oldPool = connectionPool;
167 connectionPool = null;
168 if (null != oldPool) {
169 try {
170 oldPool.close();
171 } catch (Exception e) {
172 throw new MIFConnectionException("Close pool failure!", e);
173 }
174 }
175 }
176
177 private void validateConnectionFactory(
178 CommonsConnectionPooledObjectFactory connectionFactory)
179 throws Exception {
180 NetConnection conn = null;
181 try {
182 conn = connectionFactory.makeObject();
183 connectionFactory.activateObject(conn);
184 connectionFactory.validateObject(conn);
185 connectionFactory.passivateObject(conn);
186 } finally {
187 connectionFactory.destroyObject(conn);
188 }
189 }
190
191 public CommonsConnectionWrapper getConnection(long waitTime) throws MIFConnectionException {
192 try {
193 return (CommonsConnectionWrapper) connectionPool.borrowObject(waitTime);
194 }
195 catch (Exception ex) {
196 throw new MIFConnectionException(
197 "Exception when get a connection from pool: " + ex.getMessage(), ex);
198 }
199 }
200
201 public CommonsConnectionWrapper getConnectionWithOutWait() throws MIFConnectionException {
202 try {
203 return (CommonsConnectionWrapper) connectionPool.borrowObjectWithNoWait();
204 } catch (Exception ex) {
205 throw new MIFConnectionException(
206 "Exception when get a connection from pool: " + ex.getMessage(), ex);
207 }
208 }
209
210 public int getNumActive() {
211 return connectionPool.getNumActive();
212 }
213
214 public int getNumIdle() {
215 return connectionPool.getNumIdle();
216 }
217
218 public int getMaxActiveNum() {
219 return this.getMaximalNum();
220 }
221
222
223 public String getPoolId() {
224 return poolId;
225 }
226
227 public void setPoolId(String poolId) {
228 if(StringUtils.isEmpty(poolId)) {
229 throw new IllegalArgumentException("pool id must not be empty!");
230 }
231 this.poolId = poolId;
232 }
233
234 public void clear() {
235 connectionPool.clear();
236 }
237
238 public void afterPropertiesSet() throws MIFConnectionException {
239 CommonsConnectionPooledObjectFactory factory = new CommonsConnectionPooledObjectFactory(
240 poolId, connectonFactory, connectionPool);
241 try {
242 validateConnectionFactory(factory);
243 } catch (Exception ex) {
244 throw new MIFConnectionException(
245 "Validate connection factory failure!", ex);
246 }
247 connectionPool.setFactory(factory);
248// connectionPool.setWhenExhaustedAction(GenericObjectPool.WHEN_EXHAUSTED_BLOCK);
249 connectionPool.setTestOnBorrow(true);
250// connectionPool.setTestOnReturn(GenericObjectPool.DEFAULT_TEST_ON_RETURN);
251 connectionPool.setNumTestsPerEvictionRun(maxIdle - minIdle);
252// connectionPool
253// .setSoftMinEvictableIdleTimeMillis(GenericObjectPool.DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS);
254// connectionPool.setTestWhileIdle(GenericObjectPool.DEFAULT_TEST_WHILE_IDLE);
255// connectionPool.setLifo(GenericObjectPool.DEFAULT_LIFO);
256 //initialize the pool
257 try {
258 for (int i = 0; i < maxIdle; i++) {
259 connectionPool.addObject();
260 }
261 } catch (Exception ex) {
262 throw new MIFConnectionException(
263 "Initialize connection pool failure!", ex);
264 }
265 System.out.println("Connection number: " + connectionPool.getNumIdle());
266 }
267
268}
269
270
CommonsConnectionPoolFactory:pool的工厂,创建NetConnectionPool
package com.ldd600.frm.mif.net.pool.commons.impl;
import com.ldd600.frm.mif.config.exception.MIFConnectionException;
import com.ldd600.frm.mif.context.ContextFactory;
import com.ldd600.frm.mif.net.conn.NetConnectionFactory;
import com.ldd600.frm.mif.net.pool.NetConnectionPoolFactory;
public class CommonsConnectionPoolFactory implements NetConnectionPoolFactory{
private long idleTime = CommonsNetConnectionPool.DEFAULT_IDLETIME;
private int maximalNum = CommonsNetConnectionPool.DEFAULT_MAXIMAL_CONNECTIOM_NUM;
private int minIdle = CommonsNetConnectionPool.DEFAULT_MINIMAL_IDLE_NUM;
private int maxIdle = CommonsNetConnectionPool.DEFAULT_MAXIMAL_IDLE_NUM;
private long waitTime = CommonsNetConnectionPool.DEFAULT_WAIT_TIME;
private long cleanInterval = CommonsNetConnectionPool.DEFAULT_CLEAN_INTERVAL;
private String poolId;
private NetConnectionFactory connectionFactory;
public CommonsConnectionPoolFactory(String poolId,NetConnectionFactory connectionFactory, int idleTime, int maximalNum,
int minIdle, int maxIdle, int waitTime, int cleanInterval) {
this.poolId = poolId;
this.connectionFactory = connectionFactory;
this.idleTime = idleTime;
this.maximalNum = maximalNum;
this.minIdle = minIdle;
this.maxIdle = maxIdle;
this.waitTime = waitTime;
this.cleanInterval = cleanInterval;
}
public CommonsConnectionPoolFactory() {}
public long getIdleTime() {
return idleTime;
}
public void setIdleTime(long idleTime) {
this.idleTime = idleTime;
}
public int getMaximalNum() {
return maximalNum;
}
public void setMaximalNum(int maximalNum) {
this.maximalNum = maximalNum;
}
public long getWaitTime() {
return waitTime;
}
public void setWaitTime(long waitTime) {
this.waitTime = waitTime;
}
public long getCleanInterval() {
return cleanInterval;
}
public void setCleanInterval(long cleanInterval) {
this.cleanInterval = cleanInterval;
}
public String getPoolId() {
return poolId;
}
public void setPoolId(String poolId) {
if(ContextFactory.getPoolConfigContext().containsConnectionPool(poolId)) {
throw new IllegalArgumentException("The pool id esxits, please use another one!");
}
this.poolId = poolId;
}
public int getMinIdle() {
return minIdle;
}
public void setMinIdle(int minimalNum) {
this.minIdle = minimalNum;
}
public int getMaxIdle() {
return maxIdle;
}
public void setMaxIdle(int maxIdle) {
this.maxIdle = maxIdle;
}
public CommonsNetConnectionPool createPool() throws MIFConnectionException {
return new CommonsNetConnectionPool(poolId, connectionFactory, idleTime, maximalNum, minIdle, maxIdle, waitTime, cleanInterval);
}
public void setNetConnectionFactory(NetConnectionFactory connectionFactory) {
if(null == connectionFactory) {
throw new IllegalArgumentException("Connection factory is null!");
}
this.connectionFactory = connectionFactory;
}
public NetConnectionFactory getConnectionFactory() {
return connectionFactory;
}
}
CommonsConnectionWrapper:
1package com.ldd600.frm.mif.net.pool.commons.impl;
2
3import java.io.File;
4import java.io.InputStream;
5import java.io.OutputStream;
6import java.util.ArrayList;
7import java.util.Arrays;
8import java.util.Collection;
9import java.util.Collections;
10import java.util.Iterator;
11
12import com.ldd600.frm.mif.common.FileObject;
13import com.ldd600.frm.mif.config.exception.MIFConnectionException;
14import com.ldd600.frm.mif.config.exception.MIFReceiverException;
15import com.ldd600.frm.mif.config.exception.MIFSenderException;
16import com.ldd600.frm.mif.net.conn.NetConnection;
17import com.ldd600.frm.mif.net.conn.NetRealConnection;
18import com.ldd600.frm.mif.net.pool.commons.ObjectPool;
19
20public class CommonsConnectionWrapper implements NetConnection {
21 protected NetRealConnection connection;
22 protected ObjectPool pool;
23 protected boolean isClosed;
24
25 public CommonsConnectionWrapper(NetRealConnection connection,
26 ObjectPool pool) {
27 this.connection = connection;
28 this.pool = pool;
29 }
30
31 // ************************************************************//
32
33 public boolean isConnected() {
34 return connection.isConnected();
35 }
36
37 public boolean isClosed() {
38 return this.isClosed;
39 }
40
41 public void setClosed(boolean isClosed) {
42 this.isClosed = isClosed;
43 }
44
45 // *************************************************************//
46
47 public boolean sendMessage(FileObject msgObj) throws MIFSenderException {
48 return connection.sendMessage(msgObj.getFileName(), msgObj);
49 }
50
51 public boolean sendMessage(Collection<FileObject> msgObjs)
52 throws MIFSenderException {
53 boolean success = true;
54 for (FileObject msgObj : msgObjs) {
55 if (!this.sendMessage(msgObj) && success) {
56 success = false;
57 }
58 }
59 return success;
60 }
61
62 public boolean sendMessage(FileObject msgObjs) throws MIFSenderException {
63 return this.sendMessage(Arrays.asList(msgObjs));
64 }
65
66 public boolean sendMessage(String fileName, String content)
67 throws MIFSenderException {
68 return connection.sendMessage(fileName, content);
69 }
70
71 public boolean sendMessage(String fileName, InputStream ins)
72 throws MIFSenderException {
73 return connection.sendMessage(fileName, ins);
74 }
75
76 // ************************************************************//
77 public boolean receiveMessage(String fileName, byte[] bytes) throws MIFReceiverException {
78 return connection.receiveMessage(fileName, bytes);
79 }
80
81 public File receiveMessage(FileObject fileObj) throws MIFReceiverException {
82 return connection.receiveMessage(fileObj.getFileName(), fileObj);
83 }
84
85 public boolean receiveMessage(String fileName, OutputStream ous) throws MIFReceiverException {
86 return connection.receiveMessage(fileName, ous);
87 }
88
89 public File[] receiveMessage(FileObject fileObjs) throws MIFReceiverException {
90 Collection<File> fileList = receiveMessage(Arrays.asList(fileObjs));
91 File[] fileArr = new File[fileList.size()];
92 int i = 0;
93 for(Iterator<File> it = fileList.iterator(); it.hasNext(); i++) {
94 File file = it.next();
95 fileArr[i] = file;
96 }
97 return fileArr;
98 }
99
100 public Collection<File> receiveMessage(Collection<FileObject> fileObjs) throws MIFReceiverException {
101 Collection<File> fileList = new ArrayList<File> ();
102 for(FileObject fileObj : fileObjs) {
103 File file = connection.receiveMessage(fileObj.getFileName(), fileObj);
104 fileList.add(file);
105 }
106 return Collections.unmodifiableCollection(fileList);
107 }
108
109 // *************************************************************//
110
111 public void close() throws MIFConnectionException {
112 try {
113 pool.returnObject(this);
114 } catch (Exception ex) {
115 throw new MIFConnectionException(
116 "Exception occurs when close a connection, that means return the connection to the pool",
117 ex);
118 }
119 }
120
121}
122
FTPConnection:
1package com.ldd600.frm.mif.net.conn.ftp;
2
3import java.io.BufferedInputStream;
4import java.io.File;
5import java.io.FileInputStream;
6import java.io.FileOutputStream;
7import java.io.IOException;
8import java.io.InputStream;
9import java.io.OutputStream;
10
11import org.apache.commons.net.ftp.FTPClient;
12import org.apache.commons.net.ftp.FTPFile;
13import org.apache.commons.net.ftp.FTPReply;
14
15import com.ldd600.frm.mif.common.FileObject;
16import com.ldd600.frm.mif.config.exception.MIFConnectionException;
17import com.ldd600.frm.mif.config.exception.MIFReceiverException;
18import com.ldd600.frm.mif.config.exception.MIFSenderException;
19import com.ldd600.frm.mif.constant.BasicConstants;
20import com.ldd600.frm.mif.net.conn.NetRealConnection;
21import com.ldd600.frm.mif.util.StringUtils;
22
23public class FTPConnection implements NetRealConnection {
24 // ------------------------variables------------------------//
25
26 public static final int DEFAULT_FTP_PORT = 21;
27 public static final String DEFAULT_UPLOAD_PATH = "/";
28 public static final String DEFAULT_USERNAME = "";
29 public static final String DEFAULT_PASSWORD = "";
30 private FTPClient ftpClient = new FTPClient();
31 private String hostName;
32 private int port = DEFAULT_FTP_PORT;
33 private String userName = DEFAULT_USERNAME;
34 private String passWord = DEFAULT_PASSWORD;
35 private String pathName = DEFAULT_UPLOAD_PATH;
36
37 // ------------------------------------------------------------//
38
39 // ---------------------------Constructors---------------------//
40 public FTPConnection() {
41 }
42
43 public FTPConnection(String hostName) {
44 if (StringUtils.isEmpty(hostName)) {
45 throw new IllegalArgumentException("Host name must not empty!");
46 } else {
47 this.hostName = hostName;
48 }
49 }
50
51 public FTPConnection(String hostName, int port) {
52 this(hostName);
53 this.port = port;
54 }
55
56 public FTPConnection(String hostName, int port, String userName,
57 String passWord) {
58 this(hostName, port);
59 this.userName = userName;
60 this.passWord = passWord;
61 }
62
63 public FTPConnection(String hostName, int port, String userName,
64 String passWord, String pathName) {
65 this(hostName, port, userName, passWord);
66 this.pathName = pathName;
67 }
68
69 // -----------------------------------------------------------------------//
70
71 // ------------------------------getters and setters--------------------//
72 public String getHostName() {
73 return hostName;
74 }
75
76 public void setHostName(String hostName) {
77 this.hostName = hostName;
78 }
79
80 public int getPort() {
81 return port;
82 }
83
84 public void setPort(int port) {
85 this.port = port;
86 }
87
88 public String getUserName() {
89 return userName;
90 }
91
92 public void setUserName(String userName) {
93 this.userName = userName;
94 }
95
96 public String getPassWord() {
97 return passWord;
98 }
99
100 public void setPassWord(String passWord) {
101 this.passWord = passWord;
102 }
103
104 public FTPClient getFtpClient() {
105 return ftpClient;
106 }
107
108 public String getPathName() {
109 return pathName;
110 }
111
112 public void setPathName(String pathName) {
113 this.pathName = pathName;
114 }
115
116 // ------------------------------------------------------------------------//
117
118 public void connect() throws MIFConnectionException {
119 if (StringUtils.isEmpty(hostName)) {
120 throw new IllegalStateException(
121 "Host name must be initialized before connect!");
122 }
123 try {
124 ftpClient.connect(hostName, port);
125 System.out.println(ftpClient.getReplyCode());
126 if (!login(userName, passWord)) {
127 ftpClient.disconnect();
128 throw new MIFConnectionException(
129 "username or password doesn't match!");
130 }
131 System.out.println(ftpClient.getReplyCode());
132 if (!this.changeToDirectory(pathName)) {
133 disconnect();
134 throw new MIFConnectionException(
135 "path doesn't esxits or have no permission to access, please change one!");
136 }
137
138 // Use passive mode as default because most of us are
139 // behind firewalls these days.
140 ftpClient.enterLocalPassiveMode();
141 } catch (IOException ioex) {
142 throw new MIFConnectionException("Connect to server: " + hostName
143 + " port: " + port + "with username: " + userName
144 + " password: " + passWord
145 + " encounter IO or Socket error!", ioex);
146 }
147
148 }
149
150 public void disconnect() throws MIFConnectionException {
151 try {
152 ftpClient.logout();
153 } catch (IOException ioex) {
154 // swallow the exception
155 } finally {
156 if (ftpClient.isConnected()) {
157 try {
158 ftpClient.disconnect();
159 } catch (IOException e) {
160 throw new MIFConnectionException("Disconnect to server "
161 + hostName + "ecounter io error!", e);
162 }
163 }
164 }
165 }
166
167 public boolean isConnected() {
168 if (!ftpClient.isConnected()) {
169 return false;
170 }
171 try {
172 int code = ftpClient.stat();
173 if (FTPReply.isNegativePermanent(code)) {
174 return false;
175 }
176 } catch (IOException ioex) {
177 return false;
178 }
179 return true;
180 }
181
182 public boolean sendMessage(String fileName, FileObject fileObj) throws MIFSenderException {
183 boolean success = true;
184 String localFolder = fileObj.getFolder();
185 String localFileName = fileObj.getFileName();
186 if (StringUtils.isEmpty(localFolder) || StringUtils.isEmpty(localFileName)) {
187 throw new IllegalArgumentException(
188 "Local folder or file name is an empty string!");
189 }
190 StringBuilder sb = new StringBuilder(localFolder);
191 sb.append(BasicConstants.FILE_SEPARATOR).append(localFileName);
192 File file = new File(sb.toString());
193 if (null == file) {
194 throw new IllegalArgumentException(
195 "Local folder or file name doesn't exsits!");
196 }
197 InputStream ins = null;
198 try {
199 ins = new FileInputStream(file);
200 if (!ftpClient.storeFile(fileName, ins)) {
201 //TODO some log
202 success = false;
203 }
204 // if (FTPReply.isNegativePermanent(ftpClient.getReplyCode())) {
205 // disconnect();
206 // }
207 return success;
208 } catch (IOException ioex) {
209 throw new MIFSenderException("Send file: " + fileObj.getFileName()
210 + " failure!", ioex);
211 }
212 // catch (MIFConnectionException mcex) {
213 // throw new MIFSenderException("Send file: " + msgObj.getFileName()
214 // + " failure!", mcex);
215 // }
216 finally {
217
218 // ftpClient.completePendingCommand();
219 if (null != ins) {
220 try {
221 ins.close();
222 } catch (IOException e) {
223 throw new MIFSenderException("Close Stream failure!", e);
224 }
225 }
226 }
227 }
228
229 public boolean sendMessage(String fileName, String content)
230 throws MIFSenderException {
231 OutputStream os = null;
232 try {
233 System.out.println(ftpClient.getReplyCode());
234 OutputStream retOs = ftpClient.storeFileStream(fileName);
235 os = retOs;
236 os.write(content.getBytes());
237 // os.flush();
238
239 } catch (IOException ioex) {
240 throw new MIFSenderException(
241 "Send file: " + fileName + " failure!", ioex);
242 } finally {
243 if (null != os) {
244 try {
245 os.close();
246
247 } catch (IOException e) {
248 throw new MIFSenderException("Close Stream failure!", e);
249 }
250 // catch (MIFConnectionException mcex) {
251 // throw new MIFSenderException("Send file: " + fileName
252 // + " failure!", mcex);
253 // }
254 }
255 }
256
257 try {
258 if(ftpClient.completePendingCommand()){
259 return true;
260 }else{
261 //TODO some log
262 return false;
263 }
264 } catch (IOException ioex) {
265 throw new MIFSenderException("Complete pending command failure!",
266 ioex);
267 }
268 }
269
270 public boolean sendMessage(String fileName, InputStream ins)
271 throws MIFSenderException {
272 boolean success = true;
273 try {
274 System.out.println(ftpClient.getReplyCode());
275 if (!ftpClient.storeFile(fileName, ins)) {
276 //TODO do some log;
277 System.out.println("send message failed!");
278 success = false;
279 }
280 // if (FTPReply.isNegativePermanent(ftpClient.getReplyCode())) {
281 // disconnect();
282 // }
283 return success;
284 } catch (IOException ioex) {
285 throw new MIFSenderException(
286 "Send file: " + fileName + " failure!", ioex);
287 }
288 // catch (MIFConnectionException mcex) {
289 // throw new MIFSenderException(
290 // "Send file: " + fileName + " failure!", mcex);
291 // }
292 finally {
293 try {
294 ins.close();
295 } catch (IOException e) {
296 throw new MIFSenderException("Close Stream failure!", e);
297 }
298 }
299 }
300
301 public boolean receiveMessage(String fileName, byte[] bytes)
302 throws MIFReceiverException {
303 BufferedInputStream bins = null;
304 try {
305 InputStream ins = ftpClient.retrieveFileStream(fileName);
306 if(ins != null) {
307 bins = new BufferedInputStream(ins);
308 ins.read(bytes);
309 }
310 } catch (IOException ioex) {
311 throw new MIFReceiverException("When receive file: " + fileName
312 + " ecounter an IO error!", ioex);
313 } finally {
314 if (bins != null) {
315 try {
316 bins.close();
317 } catch (IOException e) {
318 throw new MIFReceiverException(
319 "close inputestream failure!", e);
320 }
321 }
322 }
323
324 try {
325 if(ftpClient.completePendingCommand()){
326 return true;
327 }else{
328 return false;
329 }
330 } catch (IOException ioex) {
331 throw new MIFReceiverException("Complete pending command failure!",
332 ioex);
333 }
334 }
335
336 public File receiveMessage(String fileName, FileObject fileObj)
337 throws MIFReceiverException {
338 String localFolder = fileObj.getFolder();
339 String localFileName = fileObj.getFileName();
340 if (StringUtils.isEmpty(localFolder) || StringUtils.isEmpty(fileName)) {
341 throw new IllegalArgumentException(
342 "Local folder or file name is an empty string!");
343 }
344 StringBuilder sb = new StringBuilder(localFolder);
345 sb.append(BasicConstants.FILE_SEPARATOR).append(localFileName);
346 File file = new File(sb.toString());
347 if (null == file) {
348 throw new IllegalArgumentException(
349 "Local folder or file name doesn't exsits!");
350 }
351 OutputStream ous = null;
352 try {
353 ous = new FileOutputStream(file);
354 if (!ftpClient.retrieveFile(fileName, ous)) {
355 //TODO some log
356 file.delete();
357 return null;
358 }
359 return file;
360 } catch (IOException ioex) {
361 if (file.exists()) {
362 file.delete();
363 }
364
365 throw new MIFReceiverException("When receive file: " + fileName
366 + " ecounter an IO error!", ioex);
367 } finally {
368 if (null != ous) {
369 try {
370 ous.close();
371 } catch (IOException e) {
372 throw new MIFReceiverException("When close output stream "
373 + " ecounter an IO error!", e);
374 }
375 }
376 }
377
378 }
379
380 public boolean receiveMessage(String fileName, OutputStream ous)
381 throws MIFReceiverException {
382 try {
383 return ftpClient.retrieveFile(fileName, ous);
384 } catch (IOException ioex) {
385 throw new MIFReceiverException("When receive file: " + fileName
386 + " ecounter an IO error!", ioex);
387 }
388 }
389
390 // ---------------------------------------------------------------------------//
391
392 public boolean login(String userName, String passWord) throws IOException {
393 return ftpClient.login(userName, passWord);
394 }
395
396 public boolean changeToDirectory(String pathName) throws IOException {
397 return ftpClient.changeWorkingDirectory(pathName);
398 }
399
400 public boolean fileExists(String remote) throws IOException {
401 FTPFile[] files = ftpClient.listFiles(this.pathName);
402 for (FTPFile file : files) {
403 if (remote.equalsIgnoreCase(file.getName())) {
404 return true;
405 }
406 }
407 return false;
408 }
409
410}
411
FTPConnectionFactory:
1
2package com.ldd600.frm.mif.net.conn.ftp;
3
4import com.ldd600.frm.mif.net.conn.NetConnectionFactory;
5
6public class FTPConnectionFactory implements NetConnectionFactory {
7 private String hostName;
8 private int port = FTPConnection.DEFAULT_FTP_PORT;
9 private String userName = FTPConnection.DEFAULT_USERNAME;
10 private String passWord = FTPConnection.DEFAULT_PASSWORD;
11 private String pathName = FTPConnection.DEFAULT_UPLOAD_PATH;
12
13 public FTPConnectionFactory() {}
14
15 public String getHostName() {
16 return hostName;
17 }
18
19 public void setHostName(String hostName) {
20 this.hostName = hostName;
21 }
22
23
24 public int getPort() {
25 return port;
26 }
27
28
29 public void setPort(int port) {
30 this.port = port;
31 }
32
33
34 public String getUserName() {
35 return userName;
36 }
37
38
39 public void setUserName(String userName) {
40 this.userName = userName;
41 }
42
43
44 public String getPassWord() {
45 return passWord;
46 }
47
48
49 public void setPassWord(String passWord) {
50 this.passWord = passWord;
51 }
52
53
54 public String getPathName() {
55 return pathName;
56 }
57
58
59 public void setPathName(String pathName) {
60 this.pathName = pathName;
61 }
62
63
64 public FTPConnection createConnection() {
65 return new FTPConnection(hostName, port, userName, passWord, pathName);
66 }
67
68}
69
70