置顶随笔

[置顶]线程池代码

ThreadPoolManager

package com.threadpool.test;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.omg.CORBA.TIMEOUT;

import com.nio.test.ReadFileThread;

public class ThreadPoolManager {

    private static ThreadPoolManager tpm = new ThreadPoolManager();

    // 线程池最小线程数
    private final static int MIN_SIZE = 4;
    // 线程池最大线程数
    private final static int MAX_SIZE = 10;
    // 线程池维护线程允许的空闲限制
    private final static int KEEP_ACTIVE_TIME = 0;
    // 线程池用的缓冲队列大小
    private final static int WORK_QUEUE_SIZE = 10;
    // 消息缓冲队列
    Queue queue = new LinkedList();
    
    final Runnable accessBuffeThread = new Runnable()
    {

        public void run() {
            
            if( hasMoreAcquire() ){
                
                String msg = ( String ) queue.poll();
                Runnable task = new AccessDBThread( msg );
                threadpool.execute( task );
            }
        }
        
    };
    // 无法由 ThreadPoolExecutor 执行的任务的处理程序
    final RejectedExecutionHandler handler = new RejectedExecutionHandler()
    {

        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // TODO Auto-generated method stub
            System.out.println(((AccessDBThread )r).getMsg()+"消息放入队列中重新等待执行");
            queue.offer((( AccessDBThread ) r ).getMsg() );
        }
        
    };
    
    final ThreadPoolExecutor threadpool = new ThreadPoolExecutor(MIN_SIZE, MAX_SIZE, KEEP_ACTIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE),this.handler);
    
    // 调度线程池

    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 1 );

    final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(accessBuffeThread, 0, 1, TimeUnit.SECONDS);
    
    public static ThreadPoolManager newinstance()
    {
        return tpm;
    }
    
    private ThreadPoolManager (){}
    
    
    private boolean hasMoreAcquire()
    {
        return !queue.isEmpty();
    }
    
    public void addLogMsg(String msg)
    {
     Runnable task = new AccessDBThread(msg);
     threadpool.execute(task);
    }
}
AccessDBThread
package com.threadpool.test;

public class AccessDBThread implements Runnable{

    private String msg;
    public String getMsg() {
        return msg;
    }
    public void setMsg(String msg) {
        this.msg = msg;
    }
    
    public AccessDBThread(){
        super();
    }
    
    public AccessDBThread(String msg)
    {
        this.msg=msg;
    }
    public void run() {
        // TODO Auto-generated method stub
        System.out.println("Added the message: "+msg+" into the Database");
    }

    
}

测试类:
package com.threadpool.test;


public class TestDriver {

    ThreadPoolManager tpm = ThreadPoolManager.newinstance();
    
    public void addMsg(String msg)
    {
        tpm.addLogMsg(msg);
    }
    public static void main(String[] args) {
        for(int i=0;i<100;i++)
        {
            new TestDriver().addMsg(Integer.toString(i));
        }
    }
}



posted @ 2011-09-02 15:20 crazy-李阳 阅读(412) | 评论 (0)编辑 收藏

2011年9月14日

ibatis分页<一>

package com.lxy.dialect;
public abstract interface Dialect {
public boolean supportLimit();
public String getLimitString(String sql,boolean hasoffset);
public String getLimitString(String sql,int offset,int limit);
}


package com.lxy.dialect;
public class MyDialect implements Dialect {
protected static final String SQL_END_DELIMITER = ";";
public String getLimitString(String sql, boolean hasoffset) {
return new StringBuffer(sql.length() + 20).append(trim(sql)).append(
hasoffset ? "limit ?,?" : "limit ?").append(SQL_END_DELIMITER)
.toString();
}
public String getLimitString(String sql, int offset, int limit) {
sql = trim(sql);
StringBuffer sb = new StringBuffer(sql.length() + 20);
sb.append(sql);
if (offset > 0) {
sb.append("limit").append(offset).append(',').append(limit).append(
";");
} else {
sb.append("limit").append(limit).append(";");
}
return sb.toString();
}
public boolean supportLimit() {
return true;
}
private String trim(String sql) {
sql = sql.trim();
if (sql.endsWith(";")) {
sql = sql.substring(0, sql.length() - 1 - ";".length());
}
return sql;
}
}

posted @ 2011-09-14 19:11 crazy-李阳 阅读(205) | 评论 (0)编辑 收藏

2011年9月2日

线程池代码

ThreadPoolManager

package com.threadpool.test;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.omg.CORBA.TIMEOUT;

import com.nio.test.ReadFileThread;

public class ThreadPoolManager {

    private static ThreadPoolManager tpm = new ThreadPoolManager();

    // 线程池最小线程数
    private final static int MIN_SIZE = 4;
    // 线程池最大线程数
    private final static int MAX_SIZE = 10;
    // 线程池维护线程允许的空闲限制
    private final static int KEEP_ACTIVE_TIME = 0;
    // 线程池用的缓冲队列大小
    private final static int WORK_QUEUE_SIZE = 10;
    // 消息缓冲队列
    Queue queue = new LinkedList();
    
    final Runnable accessBuffeThread = new Runnable()
    {

        public void run() {
            
            if( hasMoreAcquire() ){
                
                String msg = ( String ) queue.poll();
                Runnable task = new AccessDBThread( msg );
                threadpool.execute( task );
            }
        }
        
    };
    // 无法由 ThreadPoolExecutor 执行的任务的处理程序
    final RejectedExecutionHandler handler = new RejectedExecutionHandler()
    {

        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // TODO Auto-generated method stub
            System.out.println(((AccessDBThread )r).getMsg()+"消息放入队列中重新等待执行");
            queue.offer((( AccessDBThread ) r ).getMsg() );
        }
        
    };
    
    final ThreadPoolExecutor threadpool = new ThreadPoolExecutor(MIN_SIZE, MAX_SIZE, KEEP_ACTIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE),this.handler);
    
    // 调度线程池

    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 1 );

    final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(accessBuffeThread, 0, 1, TimeUnit.SECONDS);
    
    public static ThreadPoolManager newinstance()
    {
        return tpm;
    }
    
    private ThreadPoolManager (){}
    
    
    private boolean hasMoreAcquire()
    {
        return !queue.isEmpty();
    }
    
    public void addLogMsg(String msg)
    {
     Runnable task = new AccessDBThread(msg);
     threadpool.execute(task);
    }
}
AccessDBThread
package com.threadpool.test;

public class AccessDBThread implements Runnable{

    private String msg;
    public String getMsg() {
        return msg;
    }
    public void setMsg(String msg) {
        this.msg = msg;
    }
    
    public AccessDBThread(){
        super();
    }
    
    public AccessDBThread(String msg)
    {
        this.msg=msg;
    }
    public void run() {
        // TODO Auto-generated method stub
        System.out.println("Added the message: "+msg+" into the Database");
    }

    
}

测试类:
package com.threadpool.test;


public class TestDriver {

    ThreadPoolManager tpm = ThreadPoolManager.newinstance();
    
    public void addMsg(String msg)
    {
        tpm.addLogMsg(msg);
    }
    public static void main(String[] args) {
        for(int i=0;i<100;i++)
        {
            new TestDriver().addMsg(Integer.toString(i));
        }
    }
}



posted @ 2011-09-02 15:20 crazy-李阳 阅读(412) | 评论 (0)编辑 收藏

2011年9月1日

线程池

为什么要用线程池?

诸如 Web 服务器、数据库服务器、文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务。请求以某种方式到达服务器,这种方 式可能是通过网络协议(例如 HTTP、FTP 或 POP)、通过 JMS 队列或者可能通过轮询数据库。不管请求如何到达,服务器应用程序中经常出现的情况是:单个任务处理的时间很短而请求的数目却是巨大的。

构建服务器应用程序的一个过于简单的模型应该是:每当一个请求到达就创建一个新线程,然后在新线程中为请求服务。实际上,对于原型开发这种方法工作得很 好,但如果试图部署以这种方式运行的服务器应用程序,那么这种方法的严重不足就很明显。每个请求对应一个线程(thread-per-request)方 法的不足之一是:为每个请求创建一个新线程的开销很大;为每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源要比花在处理实际的用 户请求的时间和资源更多。

除了创建和销毁线程的开销之外,活动的线程也消耗系统资源。在一个 JVM 里创建太多的线程可能会导致系统由于过度消耗内存而用完内存或“切换过度”。为了防止资源不足,服务器应用程序需要一些办法来限制任何给定时刻处理的请求数目。

线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。其好处是,因为在请求到达时 线程已经存在,所以无意中也消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使应用程序响应更快。而且,通过适当地调整线程池中的线程数目,也 就是当请求的数目超过某个阈值时,就强制其它任何新到的请求一直等待,直到获得一个线程来处理为止,从而可以防止资源不足。

 

使用线程池的风险

虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。用线程池构建的应用程序容易遭受任何其它多线程应用程序容易遭受的所有并发风险,诸如同步错误和死锁,它还容易遭受特定于线程池的少数其它风险,诸如与池有关的死锁、资源不足和线程泄漏。

死锁

任何多线程应用程序都有死锁风险。当一组进程或线程中的每一个都在等待一个只有该组中另一个进程才能引起的事件时,我们就说这组进程或线程 死锁了。死锁的最简单情形是:线程 A 持有对象 X 的独占锁,并且在等待对象 Y 的锁,而线程 B 持有对象 Y 的独占锁,却在等待对象 X 的锁。除非有某种方法来打破对锁的等待(Java 锁定不支持这种方法),否则死锁的线程将永远等下去。

虽然任何多线程程序中都有死锁的风险,但线程池却引入了另一种死锁可能,在那种情况下,所有池线程都在执行已阻塞的等待队列中另一任务的执行结果的任务, 但这一任务却因为没有未被占用的线程而不能运行。当线程池被用来实现涉及许多交互对象的模拟,被模拟的对象可以相互发送查询,这些查询接下来作为排队的任 务执行,查询对象又同步等待着响应时,会发生这种情况。

资源不足

线程池的一个优点在于:相对于其它替代调度机制(有些我们已经讨论过)而言,它们通常执行得很好。但只有恰当地调整了线程池大小时才是这样的。线程消耗包括内存和其它系统资源在内的大量资源。除了 Thread 对象所需的内存之外,每个线程都需要两个可能很大的执行调用堆栈。除此以外,JVM 可能会为每个 Java 线程创建一个本机线程,这些本机线程将消耗额外的系统资源。最后,虽然线程之间切换的调度开销很小,但如果有很多线程,环境切换也可能严重地影响程序的性能。

如果线程池太大,那么被那些线程消耗的资源可能严重地影响系统性能。在线程之间进行切换将会浪费时间,而且使用超出比您实际需要的线程可能会引起资源匮乏 问题,因为池线程正在消耗一些资源,而这些资源可能会被其它任务更有效地利用。除了线程自身所使用的资源以外,服务请求时所做的工作可能需要其它资源,例 如 JDBC 连接、套接字或文件。这些也都是有限资源,有太多的并发请求也可能引起失效,例如不能分配 JDBC 连接。

并发错误

线程池和其它排队机制依靠使用 wait()notify() 方法,这两个方法都难于使用。如果编码不正确,那么可能丢失通知,导致线程保持空闲状态,尽管队列中有工作要处理。使用这些方法时,必须格外小心;即便是专家也可能在它们上面出错。而最好使用现有的、已经知道能工作的实现,例如在下面的 无须编写您自己的池中讨论的 util.concurrent 包。

线程泄漏

各种类型的线程池中一个严重的风险是线程泄漏,当从池中除去一个线程以执行一项任务,而在任务完成后该线程却没有返回池时,会发生这种情况。发生线程泄漏的一种情形出现在任务抛出一个 RuntimeException 或一个 Error 时。如果池类没有捕捉到它们,那么线程只会退出而线程池的大小将会永久减少一个。当这种情况发生的次数足够多时,线程池最终就为空,而且系统将停止,因为没有可用的线程来处理任务。

有些任务可能会永远等待某些资源或来自用户的输入,而这些资源又不能保证变得可用,用户可能也已经回家了,诸如此类的任务会永久停止,而这些停止的任务也 会引起和线程泄漏同样的问题。如果某个线程被这样一个任务永久地消耗着,那么它实际上就被从池除去了。对于这样的任务,应该要么只给予它们自己的线程,要 么只让它们等待有限的时间。

请求过载

仅仅是请求就压垮了服务器,这种情况是可能的。在这种情形下,我们可能不想将每个到来的请求都排队到我们的工作队列,因为排在队列中等待执行的任务可能会 消耗太多的系统资源并引起资源缺乏。在这种情形下决定如何做取决于您自己;在某些情况下,您可以简单地抛弃请求,依靠更高级别的协议稍后重试请求,您也可 以用一个指出服务器暂时很忙的响应来拒绝请求。


有效使用线程池的准则

只要您遵循几条简单的准则,线程池可以成为构建服务器应用程序的极其有效的方法:

  • 不要对那些同步等待其它任务结果的任务排队。这可能会导致上面所描述的那种形式的死锁,在那种死锁中,所有线程都被一些任务所占用,这些任务依次等待排队任务的结果,而这些任务又无法执行,因为所有的线程都很忙。
  • 在为时间可能很长的操作使用合用的线程时要小心。如果程序必须等待诸如 I/O 完成这样的某个资源,那么请指定最长的等待时间,以及随后是失效还是将任务重新排队以便稍后执行。这样做保证了:通过将某个线程释放给某个可能成功完成的任务,从而将最终取得 某些进展。
  • 理解任务。要有效地调整线程池大小,您需要理解正在排队的任务以及它们正在做什么。它们是 CPU 限制的(CPU-bound)吗?它们是 I/O 限制的(I/O-bound)吗?您的答案将影响您如何调整应用程序。如果您有不同的任务类,这些类有着截然不同的特征,那么为不同任务类设置多个工作队 列可能会有意义,这样可以相应地调整每个池。

 

posted @ 2011-09-01 14:01 crazy-李阳 阅读(233) | 评论 (0)编辑 收藏

仅列出标题  
<2024年12月>
24252627282930
1234567
891011121314
15161718192021
22232425262728
2930311234

导航

统计

常用链接

留言簿

随笔档案

搜索

最新评论

阅读排行榜

评论排行榜