庄周梦蝶

生活、程序、未来
   :: 首页 ::  ::  :: 聚合  :: 管理

线程池池

Posted on 2008-09-01 19:38 dennis 阅读(2394) 评论(2)  编辑  收藏 所属分类: java
    这个题目比较怪,听俺道来。俺一直在负责公司游戏服务器的开发和维护,日积月累下来终于将原本混乱的代码和结构重构的比较清晰了,在此过程中的体会就是,重构啊,不仅仅是技术活,更多是要克服不情愿的、得过且过的心理去做,去做了才发现麻烦并没有想象中的大。
    改造过程中遇到这么个问题,我想将对某个创建的游戏的操作都固定在一个线程执行,与其他游戏可以并发地处理;或者说依据游戏id派发到某个固定的线程处理,对此游戏的操作都是串行化。不是俺不想彻底并行化,但是要将现有的代码改造成适应并行化相当困难,俺尝试的结果是问题百出,因此就想了这么个折中策略,不同游戏之间的操作可以并行,单个游戏内操作串行。怎么派发呢?很简单的机制,根据id%size结果来处理就好,size就是你准备开的线程数。因此可以很容易地模拟一个生产者消费者模型的线程池,根据游戏id%size的结果将任务塞到队列中,让生产者线程顺序处理。已经有部分代码是这样处理的,不过是自己实现的模型(BlockingQueue),比较不适合俺想要的任务式的处理过程,灵机一动,jdk5引入的线程池不是有个单线程的版本吗?俺将这个线程池再做个池不就OK了?说起来不好理解,看代码:
public interface Task extends Runnable {
    
public int getCode();
}
    嗯,定义一个Task接口,继承Runnable,多了个getCode方法用于决定派发任务到哪个ExecutorService执行。线程池池登场:
public class SingleThreadPoolPool {
    
private Map<Integer, ExecutorService> threadPoolMap = new HashMap<Integer, ExecutorService>();

    
private int size;

    
public SingleThreadPoolPool(int size) {
        
this.size = size;
        
for (int i = 0; i < size; i++) {
            ExecutorService executor 
= Executors.newSingleThreadExecutor();
            threadPoolMap.put(i, executor);
        }
    }

    
public void execute(Task task) {
        
if (task == null)
            
return;
        threadPoolMap.get(getIndex(task.getCode())).execute(task);
    }

    
public void execute(int code, Runnable r) {
        
if (r == null)
            
return;
        threadPoolMap.get(getIndex(code)).execute(r);
    }

    
private int getIndex(int code) {
        
int index = -1;
        
if (code < 0)
            index 
= 0;
        
else
            index 
= code % this.size;
        
return index;
    }

    
public void shutdown() {
        
for (int i = 0; i < size; i++) {
            threadPoolMap.get(i).shutdown();
        }
        threadPoolMap.clear();
    }

    
public int size() {
        
return this.size;
    }
}

    哇靠,这也太简单了,这就能保证code相同的任务会被排队顺序执行。是啊,很简单,不是啥高科技,但简单明了地实现了俺的需求。需要注意的是,只有通过Executor的execute方法提交的任务才会被排到队列中哦。
    补充一个线程安全测试:
import java.util.concurrent.CountDownLatch;
import com.xlands.game.lobby.util.SingleThreadPoolPool;

import junit.framework.TestCase;

class Counter {
    
int i;

    
public void incr() {
        i
++;
    }
}

class IncrTask implements Runnable {
    Counter counter;
    CountDownLatch latch;

    
public IncrTask(Counter counter, CountDownLatch latch) {
        
this.counter = counter;
        
this.latch = latch;
    }

    
public void run() {
        
try {
            counter.incr();

        } 
finally {
            latch.countDown();
        }
    }
}

public class SingleThreadPoolPoolTest extends TestCase {
    
static final int NUM = 10000;
    SingleThreadPoolPool singleThreadPoolPool;

    @Override
    
protected void setUp() throws Exception {
        singleThreadPoolPool 
= new SingleThreadPoolPool(2);
        
super.setUp();
    }

    @Override
    
protected void tearDown() throws Exception {
        singleThreadPoolPool.shutdown();
        
super.tearDown();
    }

    
public void testThreadSafe() throws Exception {
        Counter c1 
= new Counter();
        Counter c2 
= new Counter();
        assertEquals(singleThreadPoolPool.size(), 
2);
        CountDownLatch latch 
= new CountDownLatch(NUM * 2);
        
for (int i = 0; i < NUM; i++) {
            singleThreadPoolPool.execute(
0new IncrTask(c1, latch));
            singleThreadPoolPool.execute(
1new IncrTask(c2, latch));
        }
        latch.await();
        assertEquals(NUM, c1.i);
        assertEquals(NUM, c2.i);
    }
}



评论

# re: 线程池池  回复  更多评论   

2008-09-01 20:26 by 山风小子
旧酒装新瓶,呵呵 :)

# re: 线程池池  回复  更多评论   

2008-09-01 21:21 by YZ
还不如你写个简单的游戏例子,教教我们得了!那样来得更直观,更有成就感!

只有注册用户登录后才能发表评论。


网站导航: