环境是这样的:服务器是用Java做的, 数据库是MongoDB

 

需求是这样的:我们的系统里要生成一个唯一ID,前面的部分有一定的格式,并和时间关联, 精确到微秒,考虑到同一微秒内有可能存在并发情况, 所以后面在加两位序列号, 系统需要定义为1毫秒内的并发小于100个,所以后面两位就够用了。 Java服务器端有多台机器都可以用来生成这个唯一ID,所以需要在不同的机器上不能生成相同的序列号,所以需要在某一点上做全局的范围同步来保存这序列 号的唯一性。 其实如果不考虑需求里的唯一ID是有一定意义的格式的, 用UUID或MongoDB的ObjectId都是更好的选择,完全不需要在某一点上进行同步,性能会更好。

 

这个可以生成序列号的点, 我们可以做一个序列号生成服务器来对应, 也可以用数据库来对应。 单单为这个简单的功能准备一个服务器来做显然不合适。 但是我们用的MongoDB并没有类似于MySQL或Oracle中的SELECT FOR UPDATE这样的锁机制。 所以没有办法简单的对这个序列号做原子操作。 但是MongoDB的对单个document进行update操作中有很是具有原子性的, 例如

  • $set
  • $unset
  • $inc
  • $push
  • $pushAll
  • $pull
  • $pullAll

我们可以利用这些原子操作,在数据库层以乐观锁的形式来实现循环序列字段。为了方便调用我把这段逻辑做成数据库中的Javascript函数。 类似与MySQL中的存储过程。

 

首先我们需要一个collection来存放序列号,并对需要的需要的序列号进行初始化。我们叫它counters。

Js代码  收藏代码
  1. db.counters.save({_id:"SerialNo1", val:0, maxval:99})  

 

然后我们想system.js里添加一个Javascript函数

Js代码  收藏代码
  1. db.system.js.save({_id:"getNextUniqueSeq",  
  2. value:function (keyName) {  
  3.     var seqObj = db.counters.findOne({_id:keyName});  
  4.     if (seqObj == null) {  
  5.         print("can not find record with key: " + keyName);  
  6.         return -1;  
  7.     }  
  8.       
  9.     // the max value of sequence  
  10.     var maxVal = seqObj.maxval;  
  11.     // the current value of sequence  
  12.     var curVal = seqObj.val;  
  13.       
  14.     while(true){  
  15.         // if curVal reach max, reset it  
  16.         if(curVal >= maxVal){  
  17.             db.counters.update({_id : keyName, val : curVal}, { $set : { val : 0 }}, falsefalse);  
  18.             var err = db.getLastErrorObj();  
  19.             if( err && err.code ) {  
  20.                 print( "unexpected error reset data: " + tojson( err ) );  
  21.                 return -2;  
  22.             } else if (err.n == 0){  
  23.                 // fail to reset value, may be reseted by others  
  24.                 print("fail to reset value: ");  
  25.             }   
  26.   
  27.             // get current value again.  
  28.             seqObj = db.counters.findOne({_id:keyName});  
  29.             maxVal = seqObj.maxval;  
  30.             curVal = seqObj.val;  
  31.             continue;  
  32.         }   
  33.           
  34.         // if curVal not reach the max, inc it;  
  35.         // increase   
  36.         db.counters.update({_id : keyName, val : curVal}, { $inc : { val : 1 }}, falsefalse);  
  37.         var err = db.getLastErrorObj();  
  38.         if( err && err.code ) {  
  39.             print( "unexpected error inc val: " + tojson( err ) );  
  40.                return -3;  
  41.         } else if (err.n == 0){  
  42.             // fail to reset value, may be increased by others  
  43.             print("fail to inc value: ");  
  44.               
  45.             // get current value again.  
  46.             seqObj = db.counters.findOne({_id:keyName});  
  47.             maxVal = seqObj.maxval;  
  48.             curVal = seqObj.val;  
  49.             continue;  
  50.         } else {  
  51.             var retVal = curVal + 1;  
  52.             print("success to get seq : " + retVal);  
  53.             // increase successful  
  54.             return retVal;  
  55.         }  
  56.     }  
  57. }  
  58. });  

上面这段会把指定的序列号的val值+1,如果val达到上限则从0开始。所以叫循环序列。

 

其实上面的实现在原理上和Java里的AtomicInteger系列的功能实现是类似的,利用循环重试和原子性的CAS来实现。这种实现方式在多线程的环境里由于锁(Monitor)的范围很小,所以并发性上比排他锁要好一些。

 

下面我们用Java来测试一下这个函数的正确性。 即在多线程的情况下会不会得到重复的序列号。

 

第一个测试,val=0, maxval=2000, Java端20个线程每个线程循环调用100次。 共2000次。 所以正确的情况下,从0到1999应该每个数字只出现一次。

 

Java代码  收藏代码
  1. @Test  
  2. public void testGetNextUniqueSeq1() throws Exception {  
  3.   
  4.     final int THREAD_COUNT = 20;  
  5.     final int LOOP_COUNT = 100;  
  6.   
  7.     Mongo mongoClient = new Mongo("172.17.2.100"27017);  
  8.     DB db = mongoClient.getDB("im");  
  9.     db.authenticate("imadmin""imadmin".toCharArray());  
  10.     BasicDBObject q = new BasicDBObject();  
  11.     q.put("_id""UNIQUE_KEY");  
  12.   
  13.     BasicDBObject upd = new BasicDBObject();  
  14.     BasicDBObject set = new BasicDBObject();  
  15.     set.put("val"0);  
  16.     set.put("maxval", THREAD_COUNT * LOOP_COUNT);  
  17.     upd.put("$set", set);  
  18.   
  19.     db.getCollection("counters").update(q, upd);  
  20.   
  21.     Thread[] threads = new Thread[THREAD_COUNT];  
  22.     final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];  
  23.     for (int i = 0; i < THREAD_COUNT; i++) {  
  24.         final int temp_i = i;  
  25.         threads[i] = new Thread("" + i) {  
  26.             @Override  
  27.             public void run() {  
  28.                 try {  
  29.                     Mongo mongoClient = new Mongo("172.17.2.100"27017);  
  30.                     DB db = mongoClient.getDB("im");  
  31.                     db.authenticate("imadmin""imadmin".toCharArray());  
  32.                     for (int j = 0; j < LOOP_COUNT; j++) {  
  33.                         Object result = db.eval("getNextUniqueSeq(\"UNIQUE_KEY\")");  
  34.                         System.out.printf("Thread %s, seq=%d\n", Thread.currentThread().getName(), ((Double) result).intValue());  
  35.                         results[temp_i][j] = ((Double) result).intValue();  
  36.                     }  
  37.                 } catch (UnknownHostException e) {  
  38.                     e.printStackTrace();  
  39.                 }  
  40.             }  
  41.         };  
  42.     }  
  43.   
  44.     for (Thread thread : threads) {  
  45.         thread.start();  
  46.     }  
  47.   
  48.     for (Thread thread : threads) {  
  49.         thread.join();  
  50.     }  
  51.   
  52.     for (int num = 1; num <= LOOP_COUNT * THREAD_COUNT; num++) {  
  53.         // every number appear 1 times only!  
  54.         int times = 0;  
  55.         for (int j = 0; j < THREAD_COUNT; j++) {  
  56.             for (int k = 0; k < LOOP_COUNT; k++) {  
  57.                 if (results[j][k] == num)  
  58.                     times++;  
  59.             }  
  60.         }  
  61.   
  62.         assertEquals(1, times);  
  63.     }  
  64. }  

 

然后我们再测试一下循环的情况。 val=0, maxval=99。 同样是Java端20个线程每个线程循环调用100次。 共2000次。这次从0到99的数字每个应该取得20次。

 

Java代码  收藏代码
  1. @Test  
  2. public void testGetNextUniqueSeq2() throws Exception {  
  3.   
  4.     final int THREAD_COUNT = 20;  
  5.     final int LOOP_COUNT = 100;  
  6.   
  7.     Mongo mongoClient = new Mongo("172.17.2.100"27017);  
  8.     DB db = mongoClient.getDB("im");  
  9.     db.authenticate("imadmin""imadmin".toCharArray());  
  10.     BasicDBObject q = new BasicDBObject();  
  11.     q.put("_id""UNIQUE_KEY");  
  12.   
  13.     BasicDBObject upd = new BasicDBObject();  
  14.     BasicDBObject set = new BasicDBObject();  
  15.     set.put("val"0);  
  16.     set.put("maxval", LOOP_COUNT);  
  17.     upd.put("$set", set);  
  18.   
  19.     db.getCollection("counters").update(q, upd);  
  20.   
  21.     Thread[] threads = new Thread[THREAD_COUNT];  
  22.     final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];  
  23.     for (int i = 0; i < THREAD_COUNT; i++) {  
  24.         final int temp_i = i;  
  25.         threads[i] = new Thread("" + i) {  
  26.             @Override  
  27.             public void run() {  
  28.                 try {  
  29.                     Mongo mongoClient = new Mongo("172.17.2.100"27017);  
  30.                     DB db = mongoClient.getDB("im");  
  31.                     db.authenticate("imadmin""imadmin".toCharArray());  
  32.                     for (int j = 0; j < LOOP_COUNT; j++) {  
  33.                         Object result = db.eval("getNextUniqueSeq(\"UNIQUE_KEY\")");  
  34.                         System.out.printf("Thread %s, seq=%d\n", Thread.currentThread().getName(), ((Double) result).intValue());  
  35.                         results[temp_i][j] = ((Double) result).intValue();  
  36.                     }  
  37.                 } catch (UnknownHostException e) {  
  38.                     e.printStackTrace();  
  39.                 }  
  40.             }  
  41.         };  
  42.     }  
  43.   
  44.     for (Thread thread : threads) {  
  45.         thread.start();  
  46.     }  
  47.   
  48.     for (Thread thread : threads) {  
  49.         thread.join();  
  50.     }  
  51.   
  52.     for (int num = 1; num <= LOOP_COUNT; num++) {  
  53.         // every number appear 20 times only!  
  54.         int times = 0;  
  55.         for (int j = 0; j < THREAD_COUNT; j++) {  
  56.             for (int k = 0; k < LOOP_COUNT; k++) {  
  57.                 if (results[j][k] == num)  
  58.                     times++;  
  59.             }  
  60.         }  
  61.   
  62.         assertEquals(20, times);  
  63.     }  
  64. }  

 

这个测试跑了几次都是正确的。

 

由于没有可以进行对比其他的实现方式(例如排他锁)所以没有做性能测试。

 

写在最后。 虽然MongoDB支持类似于存储过程的Stored Javascript,但是其实不建议使用这个来解决复杂问题。主要原因是没法调试,维护起来太不方便。而且在2.4之前MongoDB对服务端 Javascript支持并不是很好, 一个mongod进程同时只能执行一段Javascript。如果能在应用层解决掉还是在应用层里实现逻辑比较好。