Posted on 2008-09-01 19:38
dennis 阅读(2394)
评论(2) 编辑 收藏 所属分类:
java
这个题目比较怪,听俺道来。俺一直在负责公司游戏服务器的开发和维护,日积月累下来终于将原本混乱的代码和结构重构的比较清晰了,在此过程中的体会就是,重构啊,不仅仅是技术活,更多是要克服不情愿的、得过且过的心理去做,去做了才发现麻烦并没有想象中的大。
改造过程中遇到这么个问题,我想将对某个创建的游戏的操作都固定在一个线程执行,与其他游戏可以并发地处理;或者说依据游戏id派发到某个固定的线程处理,对此游戏的操作都是串行化。不是俺不想彻底并行化,但是要将现有的代码改造成适应并行化相当困难,俺尝试的结果是问题百出,因此就想了这么个折中策略,不同游戏之间的操作可以并行,单个游戏内操作串行。怎么派发呢?很简单的机制,根据id%size结果来处理就好,size就是你准备开的线程数。因此可以很容易地模拟一个生产者消费者模型的线程池,根据游戏id%size的结果将任务塞到队列中,让生产者线程顺序处理。已经有部分代码是这样处理的,不过是自己实现的模型(BlockingQueue),比较不适合俺想要的任务式的处理过程,灵机一动,jdk5引入的线程池不是有个单线程的版本吗?俺将这个线程池再做个池不就OK了?说起来不好理解,看代码:
public interface Task extends Runnable {
public int getCode();
}
嗯,定义一个Task接口,继承Runnable,多了个getCode方法用于决定派发任务到哪个ExecutorService执行。线程池池登场:
public class SingleThreadPoolPool {
private Map<Integer, ExecutorService> threadPoolMap = new HashMap<Integer, ExecutorService>();
private int size;
public SingleThreadPoolPool(int size) {
this.size = size;
for (int i = 0; i < size; i++) {
ExecutorService executor = Executors.newSingleThreadExecutor();
threadPoolMap.put(i, executor);
}
}
public void execute(Task task) {
if (task == null)
return;
threadPoolMap.get(getIndex(task.getCode())).execute(task);
}
public void execute(int code, Runnable r) {
if (r == null)
return;
threadPoolMap.get(getIndex(code)).execute(r);
}
private int getIndex(int code) {
int index = -1;
if (code < 0)
index = 0;
else
index = code % this.size;
return index;
}
public void shutdown() {
for (int i = 0; i < size; i++) {
threadPoolMap.get(i).shutdown();
}
threadPoolMap.clear();
}
public int size() {
return this.size;
}
}
哇靠,这也太简单了,这就能保证code相同的任务会被排队顺序执行。是啊,很简单,不是啥高科技,但简单明了地实现了俺的需求。需要注意的是,只有通过Executor的execute方法提交的任务才会被排到队列中哦。
补充一个线程安全测试:
import java.util.concurrent.CountDownLatch;
import com.xlands.game.lobby.util.SingleThreadPoolPool;
import junit.framework.TestCase;
class Counter {
int i;
public void incr() {
i++;
}
}
class IncrTask implements Runnable {
Counter counter;
CountDownLatch latch;
public IncrTask(Counter counter, CountDownLatch latch) {
this.counter = counter;
this.latch = latch;
}
public void run() {
try {
counter.incr();
} finally {
latch.countDown();
}
}
}
public class SingleThreadPoolPoolTest extends TestCase {
static final int NUM = 10000;
SingleThreadPoolPool singleThreadPoolPool;
@Override
protected void setUp() throws Exception {
singleThreadPoolPool = new SingleThreadPoolPool(2);
super.setUp();
}
@Override
protected void tearDown() throws Exception {
singleThreadPoolPool.shutdown();
super.tearDown();
}
public void testThreadSafe() throws Exception {
Counter c1 = new Counter();
Counter c2 = new Counter();
assertEquals(singleThreadPoolPool.size(), 2);
CountDownLatch latch = new CountDownLatch(NUM * 2);
for (int i = 0; i < NUM; i++) {
singleThreadPoolPool.execute(0, new IncrTask(c1, latch));
singleThreadPoolPool.execute(1, new IncrTask(c2, latch));
}
latch.await();
assertEquals(NUM, c1.i);
assertEquals(NUM, c2.i);
}
}