9910

单飞

   :: 首页 :: 联系 :: 聚合  :: 管理
1.jhttp
2.javanetlib

在看源代码的时候,看到一个设计简单的线程池
package cs519.proxy.tpool;

import java.util.*;

/**
 * Thread Pool Manager.
 *
 * Use a fixed number of threads and process requests.
 */
public class ThreadPool
{
    /**
     * @param    nThread
     *        number of threads to create.
     */
    public ThreadPool( int nThread ) {
        for( int i=0; i<nThread; i++ )
            new WorkerThread().start();
    }
    
    /** list of tasks waiting for execution */
    private final List tasks = new LinkedList();
    
    /** list of lower-priority tasks waiting for execution */
    private final List backgroundTasks = new LinkedList();
    
    /**
     * Adds a new 'task' to the pool.
     * Assigned task will be eventually processed by one of the threads
     * in the pool.
     */
    public synchronized void addTask( Runnable task ) {
        tasks.add(task);
        notify();   // notify any thread waiting for a task
    }
    
    /**
     * Adds a new low-priority 'task' to the pool.
     * Assigned task will be eventually processed by one of the threads
     * in the pool.
     */
    public synchronized void addBackgroundTask( Runnable task ) {
        backgroundTasks.add(task);
    }
    
    /**
     * Obtains a task from the queue.
     */
    private synchronized Runnable getTask() {
        while(true) {
            if(!tasks.isEmpty())
                return (Runnable)tasks.remove(0);
            
            if(!backgroundTasks.isEmpty())
                return (Runnable)backgroundTasks.remove(0);
            
            try {
                wait(); // wait for a new task
            } catch( InterruptedException e ) {
                System.err.println("thread interrupted");
                throw new InternalError();
            }
        }
    }

    
    /** destructs the pool. */
    public void dispose() {
        suicideSignal = true;
    }
    private boolean suicideSignal = false;
    
    private class WorkerThread extends Thread {
        public void run() {
            while(!suicideSignal)
                getTask().run();
        }
    }
}

并且提供了缓存管理
package cs519.proxy.cache;

import java.io.*;
import java.util.*;
import java.text.ParseException;
import cs519.proxy.http.*;

public class CacheManagerImpl implements CacheManager
{
    /**
     * Set to non-null to see debug traces.
     */
    private PrintStream debug = System.out;

    /**
     * The number of bytes in the cache.
     * This field is accessed by the Janitor.
     */
    long usedCacheSize = 0;
    
    private Object usedCacheSizeLock = new Object();
        
    /** Atomically modify the value of usedCacheSize. */
    private void modifyUsedCacheSize( long offset ) {
        synchronized(usedCacheSizeLock) {
            usedCacheSize += offset;
        }
    }
        
    private final long cacheLimit;
    protected final TreeMap map=new TreeMap();
    protected final TreeSet fileSet=new TreeSet();
    
    /** The directory in which all cached objects are placed. */
    private final File dir = new File("cache");
    public final File getCacheDir() { return dir; }
    
      
    private final CachePolicy policy;

    /**
     * Constructor:
     *   Load cache file information into CacheFileInfo objects.
     *   Include: filename, lastModified time, file length in byte.
     *   Add these objects to a TreeSet object, make them ordered
     *   by lastModified time.
     *   get current used cache size (in Bytes)
     */
    public CacheManagerImpl(long limit, CachePolicy _policy) throws IOException
    {
        this.policy = _policy;
        this.cacheLimit=limit;  //in bytes

        if(!dir.exists())
            dir.mkdir();
                  
        String[] files=dir.list();

        for(int i=0;i<files.length;i++)
        {
            //System.out.println(files[i].getName());
            File f=new File(dir,files[i]);
            CacheFileInfo cfi = new CacheFileInfo(f);
            fileSet.add(cfi);
            
            map.put( files[i], cfi );
            
            modifyUsedCacheSize(cfi.fsize);
        }
        
        // launch the janitor
        new Thread(new CacheJanitor(this)).start();
    }

  /**
   * Put HttpResponse object into proxy cache.
   * Write HttpResponse to a cache file if it is "toBeCached":
   * 1. use request URL as a base of filename
   * 2. the structure of the cache file contains two parts
   *   a. Request URL (String)
   *   b. HttpResponse object
   */
    public void put( HttpRequest request, HttpResponse response ) {

        long reservedSize;
        
        try {
            
//            if(debug!=null)
//                debug.println("trying to store:"+request.getPath());

            if(policy.toBeCached(request,response))
            {
                
                reservedSize = response.getBodySize()+2000;
                
                // TODO: check if this object fits into the memory
//                if(...)
//                    return;
                
                
                // allocate the space before we actually use it
                modifyUsedCacheSize(reservedSize);
                
                // allocate physical space if it's necessary
                if(!compactCache()) {
            
                    if(debug!=null)
                        debug.println("cache compacting failure:"+request.getPath());
                    
                    // we can't store this object.
                    // cancel this reservation
                    modifyUsedCacheSize(-reservedSize);
                    return;
                }
                
//                if(debug!=null)
//                    debug.println("storing "+request.getPath());

                File f = getCacheFileName(request);
                ObjectOutputStream fileOut=new ObjectOutputStream(
                        new FileOutputStream(f));
                fileOut.writeObject(request.getPath());
                try {
                    fileOut.writeObject(response.getHeaderAsDate("Expires"));
                } catch( java.text.ParseException e ) {
                    fileOut.writeObject(null);  // write a dummy
                }
                fileOut.writeObject(response);
                fileOut.close();
                
                long actualSize = f.length();
                
                synchronized(fileSet) { // use one lock for both objects
                    CacheFileInfo cfi = new CacheFileInfo(f);
                    fileSet.add( cfi );
                    map.put( cfi.fname, cfi );
                }
                
                modifyUsedCacheSize(actualSize-reservedSize);

                if(debug!=null)
                    debug.println("stored  :"+request.getPath());
            }
        } catch( IOException e ) {
            // TODO: possibly return the reservedSize.
            reservedSize=-0;
            modifyUsedCacheSize(reservedSize);
            e.printStackTrace();
            // TODO: remove any corrupted file

        }
    }

    /**
     * Get requested object from proxy cache.
     * if (URL in file)==(request URL), and object not expired, then
     * return object to callee. else return null
     */
    public HttpResponse get( HttpRequest request )
    {
        try {
            File f = getCacheFileName(request);
    
            if(f.exists()) {
                ObjectInputStream oi=new ObjectInputStream(
                                          new FileInputStream(f));
                String fURL=(String)oi.readObject();
                
                if(fURL.equals(request.getPath())) {
                    Date now = new Date();
                    
                    // we won't use it, but we need to read this object
                    // to get to the body
                    Date expires = (Date)oi.readObject();
                
                    // parse the body
                    HttpResponse resp=(HttpResponse)oi.readObject();
                    oi.close();
                    
                    if(debug!=null)
                        debug.println("hit     :"+request.getPath());
                    
                    //check if the object expired
                    try {
                        Date d = resp.getHeaderAsDate("Expires");

                        if(d==null || d.after(now)) {
                            // not expired. use this object.
                            
                            // touch this file for LRU purpose, and
                            // modify fileSet and map content
                            Util.setLastModified(f,now.getTime());
                    
                            // maintain the control data structure
                            synchronized(fileSet) {
                                //remove this object first
                                fileSet.remove(map.get(f.getName()));
                                map.remove(f.getName());
                                
                                //add this object with current attributes
                                CacheFileInfo new_cfi=new CacheFileInfo(f);
                                fileSet.add(new_cfi);
                                map.put(f.getName(),new_cfi);
                            }
                            
                            return resp;
                        }
                    } catch( ParseException e ) { ; }
                    
                    // we'll return null, so the caller will go ahead
                    // and fetch a new one, then store that new object
                    // into the cache.
                    // so we don't need to remove the old item now.
                }
                oi.close();
            }
        } catch( IOException e ) {
            e.printStackTrace();
        } catch( ClassNotFoundException e ) {
            e.printStackTrace();
        }
        if(debug!=null)
            debug.println("miss    :"+request.getPath());
        return null;
    }
    
    public boolean contains( HttpRequest request ) {
        try {
            File f = getCacheFileName(request);
    
            if(!f.exists()) return false;
            
            ObjectInputStream oi=new ObjectInputStream(
                                      new FileInputStream(f));
            String fURL=(String)oi.readObject();
            boolean r = fURL.equals(request.getPath());
            oi.close();
            return r;
        } catch( Exception e ) {
            return false;
        }
    }
    
 
    private File getCacheFileName( HttpRequest msg ) {
        return new File( dir, Integer.toHexString( msg.getPath().hashCode() ) );
    }
 

    /**
     * Compacts the cache so that the total size fell below the limit.
     *
     * @return
     *      true if the operation is successful and our used size is
     *      now lower then the limit.
     */
    public boolean compactCache() {
        synchronized(fileSet) {
            /**
            * Remove LRU cache file until get enough free space
            * LRU strategy
            */
            CacheFileInfo cFile;

            while(cacheLimit<usedCacheSize && !fileSet.isEmpty()) {
                cFile=(CacheFileInfo)fileSet.first();   //the LRU cache file
                
                removeCacheItem(cFile);
            }
            
            return cacheLimit>=usedCacheSize;
        }
    }

    /**
     * Deletes the object represented by a given CacheFileInfo.
     *
     * To remove something from the cache, you need to call this method
     * so that the cache manager can maintain the proper data structure.
     */
    protected void removeCacheItem( CacheFileInfo cfi ) {
        
        synchronized(fileSet) {
            fileSet.remove(cfi);
            map.remove(cfi.fname);
        }
        
        File tmp = new File(dir,"_"+cfi.fname);
        new File(dir,cfi.fname).renameTo(tmp);
        
        if(debug!=null) {
            try {
                // open the file just to print the name of URL
                ObjectInputStream ois = new ObjectInputStream(new FileInputStream(tmp));
                debug.println("delete  :"+ois.readObject());
                ois.close();
            } catch( Exception e ) {
                // it's OK if we fail to print the debug message.
                // so just ignore the error
            }
        }
        
        tmp.delete();
                        
        modifyUsedCacheSize(-cfi.fsize);
    }
}



posted on 2009-11-25 14:09 单飞 阅读(943) 评论(0)  编辑  收藏 所属分类: java

只有注册用户登录后才能发表评论。


网站导航: