Dict.CN 在线词典, 英语学习, 在线翻译

都市淘沙者

荔枝FM Everyone can be host

统计

留言簿(23)

积分与排名

优秀学习网站

友情连接

阅读排行榜

评论排行榜

java线程控制器实现(转)

java线程控制器代码分享-根据cpu情况决定线程运行数量和情况

原文地址:请点击标题即可。


在人人网海量存储系统的存储引擎部分,为了提高CPU和网络的使用情况,使用了java多线程管理并行操作的方式。

在java中控制线程是一件很简单的事情,jdk提供了诸多的方法,其中比常用的两个是notify()和wait(),一个是唤醒,一个等待线程,在下面的代码中,将看到一个线程分配器,根据cpu的负载情况,自动完成对应线程的唤醒或者是等待操作。整个过程是一个平滑的过程,不会因为线程的切换而导致机器负载出线锯齿。

先看一个类,读取Linux系统TOP等指令拿到系统当前负载:

    
import java.io.BufferedReader;
    
import java.io.InputStreamReader;

    
/**
    * 节点的cpu 内存 磁盘空间 情况
    *
    * 
@author zhen.chen
    *
    
*/
    
public class NodeLoadView {

    
/**
    * 获取cpu使用情况
    *
    * 
@return
    * 
@throws Exception
    
*/
    
public double getCpuUsage() throws Exception {
    
double cpuUsed = 0;

    Runtime rt 
= Runtime.getRuntime();
    Process p 
= rt.exec(“/usr/bin/uptime”);// 调用系统的“top”命令
    String[] strArray = null;
    BufferedReader in 
= null;
    
try {
    in 
= new BufferedReader(new InputStreamReader(p.getInputStream()));
    String str 
= null;
    
while ((str = in.readLine()) != null) {
    strArray 
= str.split(“load average: “);
    strArray 
= strArray[1].split(“,”);
    cpuUsed 
= Double.parseDouble(strArray[0]);
    }
    } 
catch (Exception e) {
    e.printStackTrace();
    } 
finally {
    in.close();
    }
    
return cpuUsed;
    }

    
/**
    * 内存监控
    *
    * 
@return
    * 
@throws Exception
    
*/
    
public double getMemUsage() throws Exception {

    
double menUsed = 0;
    Runtime rt 
= Runtime.getRuntime();
    Process p 
= rt.exec(“top --1″);// 调用系统的“top”命令

    BufferedReader in 
= null;
    
try {
    in 
= new BufferedReader(new InputStreamReader(p.getInputStream()));
    String str 
= null;
    String[] strArray 
= null;

    
while ((str = in.readLine()) != null) {
    
int m = 0;

    
if (str.indexOf(” R “) != -1) {// 只分析正在运行的进程,top进程本身除外 &&
    
//
    
// System.out.println(“——————3—————–”);
    strArray = str.split(” “);
    
for (String tmp : strArray) {
    
if (tmp.trim().length() == 0)
    
continue;

    
if (++== 10) {
    
// 9)–第10列为mem的使用百分比(RedHat 9)

    menUsed 
+= Double.parseDouble(tmp);

    }
    }

    }
    }
    } 
catch (Exception e) {
    e.printStackTrace();
    } 
finally {
    in.close();
    }
    
return menUsed;
    }

    
/**
    * 获取磁盘空间大小
    *
    * 
@return
    * 
@throws Exception
    
*/
    
public double getDeskUsage() throws Exception {
    
double totalHD = 0;
    
double usedHD = 0;
    Runtime rt 
= Runtime.getRuntime();
    Process p 
= rt.exec(“df -hl”);// df -hl 查看硬盘空间

    BufferedReader in 
= null;
    
try {
    in 
= new BufferedReader(new InputStreamReader(p.getInputStream()));
    String str 
= null;
    String[] strArray 
= null;
    
while ((str = in.readLine()) != null) {
    
int m = 0;
    
// if (flag > 0) {
    
// flag++;
    strArray = str.split(” “);
    
for (String tmp : strArray) {
    
if (tmp.trim().length() == 0)
    
continue;
    
++m;
    
// System.out.println(“—-tmp—-” + tmp);
    if (tmp.indexOf(“G”) != -1) {
    
if (m == 2) {
    
// System.out.println(“—G—-” + tmp);
    if (!tmp.equals(“”) && !tmp.equals(“0″))
    totalHD 
+= Double.parseDouble(tmp.substring(0,
    tmp.length() – 
1)) * 1024;

    }
    
if (m == 3) {
    
// System.out.println(“—G—-” + tmp);
    if (!tmp.equals(“none”) && !tmp.equals(“0″))
    usedHD 
+= Double.parseDouble(tmp.substring(0,
    tmp.length() – 
1)) * 1024;

    }
    }
    
if (tmp.indexOf(“M”) != -1) {
    
if (m == 2) {
    
// System.out.println(“—M—” + tmp);
    if (!tmp.equals(“”) && !tmp.equals(“0″))
    totalHD 
+= Double.parseDouble(tmp.substring(0,
    tmp.length() – 
1));

    }
    
if (m == 3) {
    
// System.out.println(“—M—” + tmp);
    if (!tmp.equals(“none”) && !tmp.equals(“0″))
    usedHD 
+= Double.parseDouble(tmp.substring(0,
    tmp.length() – 
1));
    
// System.out.println(“—-3—-” + usedHD);
    }
    }

    }

    
// }
    }
    } 
catch (Exception e) {
    e.printStackTrace();
    } 
finally {
    in.close();
    }
    
return (usedHD / totalHD) * 100;
    }
    
//
    
//    public static void main(String[] args) throws Exception {
    
//        NodeLoadView cpu = new NodeLoadView();
    
//        System.out
    
//                .println(“—————cpu used:” + cpu.getCpuUsage() + “%”);
    
//        System.out
    
//                .println(“—————mem used:” + cpu.getMemUsage() + “%”);
    
//        System.out
    
//                .println(“—————HD used:” + cpu.getDeskUsage() + “%”);
    
//        System.out.println(“————jvm监控———————-”);
    
//        Runtime lRuntime = Runtime.getRuntime();
    
//        System.out.println(“————–Free Momery:” + lRuntime.freeMemory()
    
//                + “K”);
    
//        System.out.println(“————–Max Momery:” + lRuntime.maxMemory()
    
//                + “K”);
    
//        System.out.println(“————–Total Momery:”
    
//                + lRuntime.totalMemory() + “K”);
    
//        System.out.println(“—————Available Processors :”
    
//                + lRuntime.availableProcessors());
    
//    }
    }

再来看关键的一个类,THreadScheduler:

    
import java.util.Map;

    
import org.apache.log4j.Logger;

    
import test.NodeLoadView;
    
public class ThreadScheduler {
    
private static Logger logger = Logger.getLogger(ThreadScheduler.class.getName());
    
private Map<String, Thread> runningThreadMap;
    
private Map<String, Thread> waitingThreadMap;
    
private boolean isFinished = false;
    
private int runningSize;

    
public ThreadScheduler (Map<String, Thread> runningThreadMap, Map<String, Thread> waitingThreadMap) {
    
this.runningThreadMap = runningThreadMap;
    
this.waitingThreadMap = waitingThreadMap;
    
this.runningSize = waitingThreadMap.size();
    }

    
/**
    * 开始调度线程
    * 
@author zhen.chen
    * @createTime 2010-1-28 上午11:04:52
    
*/
    
public void schedule(){
    
long sleepMilliSecond = 1 * 1000;
    
int allowRunThreads = 15;
    
// 一次启动的线程数,cpuLoad变大时以此值为参考递减
    int allowRunThreadsRef = 15;
    
double cpuLoad = 0;// 0-15
    NodeLoadView load = new NodeLoadView();

    
while (true) {
    
try {
    cpuLoad 
= load.getCpuUsage();
    } 
catch (Exception e1) {
    e1.printStackTrace();
    }
    
// cpuLoad低 启动的线程多
    allowRunThreads = (int) Math.floor(allowRunThreadsRef – cpuLoad);
    
// threads不能为0
    if (allowRunThreads < 1) {
    allowRunThreads 
= 1;
    }
    
if (allowRunThreads > allowRunThreadsRef) {
    allowRunThreads 
= allowRunThreadsRef;
    }
    
if (logger.isDebugEnabled()) {
    logger.debug(“[ThreadScheduler]running Thread:” 
+ runningThreadMap.size() + “; waiting Thread:” + waitingThreadMap.size() + “; cpu:” + cpuLoad + ” allowRunThreads:” + allowRunThreads);
    }

    
// 检查runningSize个线程的情况,满足条件则启动
    for (int x = 0; x < runningSize; x++) {
    
if (waitingThreadMap.get(x+") != null) {
    if (allowRunThreadsRef <= runningThreadMap.size()) {
    
break;
    }
    
synchronized (waitingThreadMap.get(x+")) {
    if (!waitingThreadMap.get(x+").isAlive()) {
    waitingThreadMap.get(x+").start();
    }else{
    waitingThreadMap.get(x
+").notify();
    }
    }
    runningThreadMap.put(x
+", waitingThreadMap.get(x+”"));
    waitingThreadMap.remove(x
+");
    }
    }
    
// 检查runningSize个线程的情况,满足条件则暂停
    for (int x = 0; x < runningSize; x++) {
    
if (runningThreadMap.size() <= allowRunThreads) {
    
break;
    }
    
if (runningThreadMap.get(x+") != null) {
    synchronized (runningThreadMap.get(x+")) {
    try {
    
if (runningThreadMap.get(x+").isAlive()) {
    runningThreadMap.get(x+").wait();
    }else{
    
continue;
    }
    } 
catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    waitingThreadMap.put(x
+", runningThreadMap.get(x));
    runningThreadMap.remove(x+");
    }
    }
    
// 全部跑完,返回
    if (waitingThreadMap.size() == 0 && runningThreadMap.size() == 0) {
    
if (logger.isDebugEnabled()) {
    logger.debug(“[ThreadScheduler] over.total Threads size:” 
+ runningSize);
    }
    
this.isFinished = true;
    
return;
    }
    
// 使主while循环慢一点
    try {
    Thread.sleep(sleepMilliSecond);
    } 
catch (InterruptedException e1) {
    e1.printStackTrace();
    }
    }

    }

    
public boolean isFinished() {
    
return isFinished;
    }
    }

这个类的作用:

1.接收runningThreadMap和waitingThreadMap两个map,里面对应存了运行中的线程实例和等待中的线程实例。

2.读cpu情况,自动判断要notify等待中的线程还是wait运行中的线程。

3.两个map都结束,退出。(必须runningThreadMap内部的Thread自己将runningThreadMap对应的Thread remove掉)

如何使用:

    
public class TestThread {
    
public static class Runner extends Thread {
    
public Runner(int j, Map<String, Thread> threadMap) {

    }
    
public void run() {
    
// TODO 你的逻辑 完成后需要从threadMap中remove掉
    }
    }

    
public static void main(String[] args) {
    
// 运行中的线程
    Map<String, Thread> threadMap = new HashMap<String, Thread>();
    
// 正在等待中的线程
    Map<String, Thread> waitThreadMap = new HashMap<String, Thread>();
    
for (int j = 0; j < args.length; j++) {
    Thread t 
= new Runner(j, threadMap);
    waitThreadMap.put(j 
+ “”, t);
    }

    ThreadScheduler threadScheduler 
= new ThreadScheduler(threadMap, waitThreadMap);
    threadScheduler.schedule();
    
if (threadScheduler.isFinished() == false) {
    
//没能正常结束
    }
    }
    }


posted on 2010-08-18 07:59 都市淘沙者 阅读(1149) 评论(0)  编辑  收藏 所属分类: 多线程并发编程


只有注册用户登录后才能发表评论。


网站导航: