1.jhttp
2.javanetlib
在阅读源代码的时候，看到了一个设计简单的线程池：
package cs519.proxy.tpool;
import java.util.*;
/**
 * Thread Pool Manager.
 *
 * Uses a fixed number of worker threads to process submitted tasks.
 * Regular tasks are always taken before background tasks. Both queues
 * and the shutdown flag are guarded by this object's monitor.
 */
public class ThreadPool
{
    /** list of tasks waiting for execution */
    private final List tasks = new LinkedList();

    /** list of lower-priority tasks waiting for execution */
    private final List backgroundTasks = new LinkedList();

    /**
     * Set by {@link #dispose()}. Read and written only while holding
     * this object's monitor, so no volatile is needed.
     */
    private boolean suicideSignal = false;

    /**
     * @param nThread
     *      number of threads to create.
     */
    public ThreadPool( int nThread ) {
        for( int i=0; i<nThread; i++ )
            new WorkerThread().start();
    }

    /**
     * Adds a new 'task' to the pool.
     * Assigned task will be eventually processed by one of the threads
     * in the pool.
     *
     * @param task
     *      the task to run; must not be null.
     * @throws NullPointerException
     *      if task is null (a null task would otherwise kill a worker
     *      later, far away from the faulty caller).
     */
    public synchronized void addTask( Runnable task ) {
        if( task==null )
            throw new NullPointerException("task");
        tasks.add(task);
        notify();   // wake one worker waiting for a task
    }

    /**
     * Adds a new low-priority 'task' to the pool.
     * Assigned task will be eventually processed by one of the threads
     * in the pool, once no regular task is pending.
     *
     * @param task
     *      the task to run; must not be null.
     * @throws NullPointerException
     *      if task is null.
     */
    public synchronized void addBackgroundTask( Runnable task ) {
        if( task==null )
            throw new NullPointerException("task");
        backgroundTasks.add(task);
        // Without this wake-up a background task queued while every
        // worker is parked in wait() would sit there until some
        // unrelated regular task happened to arrive.
        notify();
    }

    /**
     * Obtains a task from the queue, blocking until one is available.
     *
     * @return
     *      the next task, or null if the pool has been disposed or the
     *      calling thread was interrupted; in both cases the worker
     *      should terminate.
     */
    private synchronized Runnable getTask() {
        while( !suicideSignal ) {
            if( !tasks.isEmpty() )
                return (Runnable)tasks.remove(0);
            if( !backgroundTasks.isEmpty() )
                return (Runnable)backgroundTasks.remove(0);
            try {
                wait(); // wait for a new task or for dispose()
            } catch( InterruptedException e ) {
                // Treat interruption as a request to stop this worker
                // and keep the interrupt status visible to the caller.
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return null;
    }

    /**
     * Destructs the pool.
     *
     * Workers finish the task they are currently running and then exit;
     * tasks still waiting in the queues are not executed. Idle workers
     * are woken up so that they can terminate.
     */
    public synchronized void dispose() {
        suicideSignal = true;
        notifyAll();
    }

    /** Worker that keeps pulling tasks until the pool is disposed. */
    private class WorkerThread extends Thread {
        public void run() {
            Runnable task;
            while( (task=getTask())!=null ) {
                try {
                    task.run();
                } catch( RuntimeException e ) {
                    // A failing task must not shrink the pool: report
                    // the problem and keep serving the queue.
                    e.printStackTrace();
                }
            }
        }
    }
}
此外，它还提供了缓存管理：
package cs519.proxy.cache;
import java.io.*;
import java.util.*;
import java.text.ParseException;
import cs519.proxy.http.*;
public class CacheManagerImpl implements CacheManager
{
/**
 * Destination for debug traces; set to non-null to see them.
 * Initialized to System.out, so tracing is on by default. Code in this
 * class checks the field against null before printing to it.
 */
private PrintStream debug = System.out;
/**
 * The number of bytes in the cache.
 * This field is accessed by the Janitor.
 *
 * Every update goes through {@link #modifyUsedCacheSize(long)}, which
 * holds usedCacheSizeLock. The field is volatile because it is also
 * read without that lock (for example while compacting the cache):
 * a plain long may be read stale, or even half-updated, by another
 * thread.
 */
volatile long usedCacheSize = 0;

/** Monitor that serializes all updates of usedCacheSize. */
private final Object usedCacheSizeLock = new Object();

/**
 * Atomically modify the value of usedCacheSize.
 *
 * @param offset
 *      number of bytes to add; pass a negative value to release space.
 */
private void modifyUsedCacheSize( long offset ) {
    synchronized(usedCacheSizeLock) {
        usedCacheSize += offset;
    }
}
/** Upper bound, in bytes, that usedCacheSize is compared against. */
private final long cacheLimit;

/** Cache entries keyed by cache file name; values are CacheFileInfo objects. */
protected final TreeMap map = new TreeMap();

/**
 * The same CacheFileInfo objects as in map, kept in sorted order.
 * Also used as the lock that guards both map and fileSet.
 */
protected final TreeSet fileSet = new TreeSet();

/** The directory in which all cached objects are placed. */
private final File dir = new File("cache");

/**
 * @return
 *      the directory in which all cached objects are placed.
 */
public final File getCacheDir() {
    return dir;
}

/** Policy consulted to decide whether a response is to be cached. */
private final CachePolicy policy;
/**
 * Constructor:
 * Load cache file information into CacheFileInfo objects.
 * Include: filename, lastModified time, file length in byte.
 * Add these objects to a TreeSet object, make them ordered
 * by lastModified time.
 * get current used cache size (in Bytes)
 *
 * Finally the cache janitor is started on its own thread.
 *
 * @param limit
 *      maximum size of the cache, in bytes.
 * @param _policy
 *      policy that decides which responses are cached.
 * @throws IOException
 *      if the cache directory cannot be created or cannot be listed.
 */
public CacheManagerImpl(long limit, CachePolicy _policy) throws IOException
{
    this.policy = _policy;
    this.cacheLimit = limit; //in bytes

    // mkdir() also returns false when somebody else created the
    // directory in the meantime, hence the final isDirectory() check.
    if(!dir.exists() && !dir.mkdir() && !dir.isDirectory())
        throw new IOException("cannot create cache directory: "+dir.getAbsolutePath());

    // list() returns null (not an empty array) when 'dir' is not a
    // directory or cannot be read; report that instead of failing
    // with a NullPointerException below.
    String[] files = dir.list();
    if(files==null)
        throw new IOException("cannot list cache directory: "+dir.getAbsolutePath());

    for(int i=0;i<files.length;i++)
    {
        File f = new File(dir,files[i]);
        CacheFileInfo cfi = new CacheFileInfo(f);
        fileSet.add(cfi);
        map.put( files[i], cfi );
        modifyUsedCacheSize(cfi.fsize);
    }

    // launch the janitor
    new Thread(new CacheJanitor(this)).start();
}
/**
 * Put HttpResponse object into proxy cache.
 * Write HttpResponse to a cache file if it is "toBeCached":
 * 1. use request URL as a base of filename
 * 2. the structure of the cache file contains three serialized objects
 *    a. Request URL (String)
 *    b. value of the "Expires" header (Date, or null if unparseable)
 *    c. HttpResponse object
 *
 * Space is reserved in usedCacheSize before the file is written and
 * corrected to the real file size afterwards. If anything goes wrong
 * the reservation is returned and the partially written file is
 * deleted, so a failed store leaves the cache unchanged apart from an
 * older entry of the same file name, which is dropped before writing.
 *
 * @param request
 *      the request whose path identifies the cached object.
 * @param response
 *      the response to store.
 */
public void put( HttpRequest request, HttpResponse response ) {
    long reservedSize = 0;  // bytes added to usedCacheSize but not yet backed by a registered file
    File f = null;          // non-null while a partially written file may exist on disk
    try {
        if(!policy.toBeCached(request,response))
            return;

        reservedSize = response.getBodySize()+2000;
        // TODO: check if this object fits into the memory

        // allocate the space before we actually use it
        modifyUsedCacheSize(reservedSize);
        // allocate physical space if it's necessary
        if(!compactCache()) {
            if(debug!=null)
                debug.println("cache compacting failure:"+request.getPath());
            // we can't store this object.
            // cancel this reservation
            modifyUsedCacheSize(-reservedSize);
            return;
        }

        File target = getCacheFileName(request);

        // An older entry stored under the same file name is about to be
        // overwritten. Drop it through removeCacheItem() first, so that
        // fileSet does not keep a stale CacheFileInfo and its size is
        // not counted twice.
        CacheFileInfo stale;
        synchronized(fileSet) {
            stale = (CacheFileInfo)map.get(target.getName());
        }
        if(stale!=null)
            removeCacheItem(stale);

        f = target;
        FileOutputStream raw = new FileOutputStream(f);
        try {
            ObjectOutputStream fileOut = new ObjectOutputStream(raw);
            fileOut.writeObject(request.getPath());
            try {
                fileOut.writeObject(response.getHeaderAsDate("Expires"));
            } catch( java.text.ParseException e ) {
                fileOut.writeObject(null); // write a dummy
            }
            fileOut.writeObject(response);
            fileOut.flush();    // push buffered object data down to the file
        } finally {
            raw.close();        // release the file handle on success and on failure
        }

        long actualSize = f.length();
        synchronized(fileSet) { // use one lock for both objects
            CacheFileInfo cfi = new CacheFileInfo(f);
            fileSet.add( cfi );
            map.put( cfi.fname, cfi );
        }
        // replace the estimate by the real size
        modifyUsedCacheSize(actualSize-reservedSize);
        // the object is committed: nothing left to roll back
        reservedSize = 0;
        f = null;

        if(debug!=null)
            debug.println("stored :"+request.getPath());
    } catch( IOException e ) {
        // give the reserved space back and remove any corrupted file
        modifyUsedCacheSize(-reservedSize);
        if(f!=null)
            f.delete();
        e.printStackTrace();
    }
}
/**
 * Get requested object from proxy cache.
 * if (URL in file)==(request URL), and object not expired, then
 * return object to callee. else return null
 *
 * On a hit the cache file is touched and its bookkeeping entry is
 * refreshed, so that the file counts as recently used.
 *
 * @param request
 *      the request whose path identifies the cached object.
 * @return
 *      the cached response, or null on a miss, on an expired or
 *      unreadable entry.
 */
public HttpResponse get( HttpRequest request )
{
    try {
        File f = getCacheFileName(request);
        if(f.exists()) {
            HttpResponse resp = null;
            Date now = null;

            FileInputStream raw = new FileInputStream(f);
            try {
                ObjectInputStream oi = new ObjectInputStream(raw);
                String fURL = (String)oi.readObject();
                if(fURL.equals(request.getPath())) {
                    now = new Date();
                    // we won't use it, but we need to read this object
                    // to get to the body
                    oi.readObject();
                    // parse the body
                    resp = (HttpResponse)oi.readObject();
                }
            } finally {
                // always release the file handle, also when reading fails
                raw.close();
            }

            if(resp!=null) {
                if(debug!=null)
                    debug.println("hit :"+request.getPath());
                //check if the object expired
                try {
                    Date d = resp.getHeaderAsDate("Expires");
                    if(d==null || d.after(now)) {
                        // not expired. use this object.
                        // touch this file for LRU purpose, and
                        // modify fileSet and map content
                        Util.setLastModified(f,now.getTime());
                        // maintain the control data structure
                        synchronized(fileSet) {
                            CacheFileInfo old = (CacheFileInfo)map.get(f.getName());
                            // The entry may have been removed by another
                            // thread while we were reading the file. Then
                            // there is nothing to refresh, and passing
                            // null to fileSet.remove() would throw.
                            if(old!=null) {
                                CacheFileInfo new_cfi = new CacheFileInfo(f);
                                //remove this object first
                                fileSet.remove(old);
                                map.remove(f.getName());
                                //add this object with current attributes
                                fileSet.add(new_cfi);
                                map.put(f.getName(),new_cfi);
                            }
                        }
                        return resp;
                    }
                } catch( ParseException e ) {
                    // unparseable "Expires" header: treat the entry as
                    // unusable and report a miss below
                }
                // we'll return null, so the caller will go ahead
                // and fetch a new one, then store that new object
                // into the cache.
                // so we don't need to remove the old item now.
            }
        }
    } catch( IOException e ) {
        e.printStackTrace();
    } catch( ClassNotFoundException e ) {
        e.printStackTrace();
    }
    if(debug!=null)
        debug.println("miss :"+request.getPath());
    return null;
}
/**
 * Checks whether a cache file exists for the request and was stored
 * for exactly the same path. The stored response itself is not read,
 * so expiry is not taken into account.
 *
 * @param request
 *      the request whose path identifies the cached object.
 * @return
 *      true if a matching cache file exists; false if there is none or
 *      if it cannot be read.
 */
public boolean contains( HttpRequest request ) {
    try {
        File f = getCacheFileName(request);
        if(!f.exists()) return false;
        FileInputStream raw = new FileInputStream(f);
        try {
            ObjectInputStream oi = new ObjectInputStream(raw);
            String fURL = (String)oi.readObject();
            return fURL.equals(request.getPath());
        } finally {
            // close the file even when the header cannot be deserialized
            raw.close();
        }
    } catch( Exception e ) {
        // best effort: any failure to read the entry counts as "not cached"
        return false;
    }
}
/**
 * Maps a request to its cache file: the file lives in the cache
 * directory and is named after the hexadecimal hash code of the
 * request path.
 */
private File getCacheFileName( HttpRequest msg ) {
    int pathHash = msg.getPath().hashCode();
    String hexName = Integer.toHexString(pathHash);
    return new File(dir, hexName);
}
/**
 * Shrinks the cache until the used size no longer exceeds the limit.
 *
 * Entries are evicted one at a time, always taking the first element
 * of fileSet (the least recently used file, according to the original
 * comments), until enough space is free or no entry is left.
 *
 * @return
 *      true if the used size is within the limit when the method
 *      returns, false if the cache ran out of entries before that.
 */
public boolean compactCache() {
    synchronized(fileSet) {
        while(usedCacheSize>cacheLimit) {
            if(fileSet.isEmpty())
                break;  // nothing left to evict
            CacheFileInfo victim = (CacheFileInfo)fileSet.first();
            removeCacheItem(victim);
        }
        return usedCacheSize<=cacheLimit;
    }
}
/**
 * Deletes the object represented by a given CacheFileInfo.
 *
 * To remove something from the cache, you need to call this method
 * so that the cache manager can maintain the proper data structure.
 *
 * The entry is unregistered, its file is renamed to a "_"-prefixed
 * name and then deleted, and the entry's size is subtracted from the
 * used cache size. If the rename fails the file is deleted under its
 * original name instead of being left behind.
 *
 * @param cfi
 *      the entry to remove.
 */
protected void removeCacheItem( CacheFileInfo cfi ) {
    synchronized(fileSet) {
        fileSet.remove(cfi);
        map.remove(cfi.fname);
    }

    File original = new File(dir,cfi.fname);
    File tmp = new File(dir,"_"+cfi.fname);
    // renameTo() reports failure through its return value; fall back to
    // the original file so that it is still deleted below.
    File doomed = original.renameTo(tmp) ? tmp : original;

    if(debug!=null) {
        FileInputStream raw = null;
        try {
            // open the file just to print the name of URL
            raw = new FileInputStream(doomed);
            debug.println("delete :"+new ObjectInputStream(raw).readObject());
        } catch( Exception e ) {
            // it's OK if we fail to print the debug message.
            // so just ignore the error
        } finally {
            // an open handle can prevent the delete below on some platforms
            if(raw!=null) {
                try {
                    raw.close();
                } catch( IOException e ) {
                    // nothing useful can be done about a failed close here
                }
            }
        }
    }

    doomed.delete();
    modifyUsedCacheSize(-cfi.fsize);
}
}