庄周梦蝶

生活、程序、未来
   :: 首页 ::  ::  :: 聚合  :: 管理

Clojure 的并发(一) Ref和STM
Clojure 的并发(二)Write Skew分析
Clojure 的并发(三)Atom、缓存和性能
Clojure 的并发(四)Agent深入分析和Actor
Clojure 的并发(五)binding和let
Clojure的并发(六)Agent可以改进的地方
Clojure的并发(七)pmap、pvalues和pcalls
Clojure的并发(八)future、promise和线程

四、 Agent和Actor

   除了用于协调同步的Ref,独立同步的Ref,还有一类非常常见的需求:你可能希望状态的更新是异步,你通常不关心更新的结果,这时候你可以考虑下使用Agent。

1、创建agent:

user=> (def counter (agent 0))
#'user/counter

user
=> counter
#<Agent@9444d1: 0>


通过agent函数你就可以创建一个agent,指向一个不可变的初始状态。

2、取agent的值,这跟Ref和Atom没啥两样,都是通过deref或者@宏:
user=> @counter
0
user
=> (deref counter)
0

3、更新agent,通过send或者send-off函数给agent发送任务去更新agent:
user=> (send counter inc)
#<Agent@9444d1: 0>

  send返回agent对象,内部的值仍然是0,而非inc递增之后的1,这是因为send是异步发送,更新是在另一个线程执行,两个线程(REPL主线程和更新任务的线程)的执行顺序没有同步,显示什么取决于两者谁更快。更新肯定是发生了,查看counter的值:
user=> @counter
1

   果然更新到了1了。send的方法签名:
(send a f & args)

   其中f是更新的函数,它的定义如下:
(f state-of-agent & args)
   也就是它会在第一个参数接收当前agent的状态,而args是send附带的参数。

   还有个方法,send-off,它的作用于send类似:
user=> (send-off counter inc)
#<Agent@9444d1: 1>
user=> @counter
2

   send和send-off的区别在于,send是将任务交给一个固定大小的线程池执行
final public static ExecutorService pooledExecutor =
        Executors
.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
   默认线程池大小是CPU核数加上2。因此send执行的任务最好不要有阻塞的操作。而send-off则使用没有大小限制(取决于内存)的线程池:

final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();
  
   因此,send-off比较适合任务有阻塞的操作,如IO读写之类。请注意,所有的agent是共用这些线程池,这从这些线程池的定义看出来,都是静态变量。

4、异步转同步
,刚才提到send和send-off都是异步将任务提交给线程池去处理,如果你希望同步等待结果返回,那么可以使用await函数:
 (do (send counter inc) (await counter) (println @counter))

send一个任务之后,调用await等待agent所有派发的更新任务结束,然后打印agent的值。await是阻塞当前线程,直到至今为止所有任务派发执行完毕才返回。await没有超时,会一直等待直到条件满足,await-for则可以接受等待的超时时间,如果超过指定时间没有返回,则返回nil,否则返回结果。
 (do (send counter inc) (await-for 100 counter) (println @counter))

await-for接受的单位是毫秒。

5、错误处理


   agent也可以跟Ref和Atom一样设置validator,用于约束验证。由于agent的更新是异步的,你不知道更新的时候agent是否发生异常,只有等到你去取值或者更新的时候才能发现:
user=> (def counter (agent 0 :validator number?))
#
'user/counter

user
=> (send counter (fn[_] "foo"))
#
<clojure.lang.Agent@4de8ce62: 0>

   强制要求counter的值是数值类型,第二个表达式我们给counter发送了一个更新任务,想将状态更新为字符串"foo",由于是异步更新,返回的结果可能没有显示异常,当你取值的时候,问题出现了:
user=> @counter
java.lang.Exception: Agent has errors (NO_SOURCE_FILE:
0)

  告诉你agent处于不正常的状态,如果你想获取详细信息,可以通过agent-errors函数:
user=> (.printStackTrace (agent-errors counter))
java.lang.IllegalArgumentException: No matching field found: printStackTrace 
for class clojure.lang.PersistentList (NO_SOURCE_FILE:0)

   你可以恢复agent到前一个正常的状态,通过clear-agent-errors函数:
 
user=> (clear-agent-errors counter)
nil
user
=> @counter
0

6、加入事务

agent跟atom不一样,agent可以加入事务,在事务里调用send发送一个任务,当事务成功的时候该任务将只会被发送一次,最多最少都一次。利用这个特性,我们可以实现在事务操作的时候写文件,达到ACID中的D——持久性的目的:
(def backup-agent (agent "output/messages-backup.clj" ))
(def messages (ref []))
(use 
'[clojure.contrib.duck-streams :only (spit)])
(defn add-message-with-backup [msg]
       (dosync
           (let [snapshot (commute messages conj msg)]
                (send
-off backup-agent (fn [filename]
                                        (spit filename snapshot)
                                        filename))
           snapshot)))

定义了一个backup-agent用于保存消息,add-message-with-backup函数首先将状态保存到messages,这是个普通的Ref,然后调用send-off给backup-agent一个任务:
 (fn [filename]
          (spit filename snapshot)
         filename)
这个任务是一个匿名函数,它利用spit打开文件,写入当前的快照,并且关闭文件,文件名来自backup-agent的状态值。注意到,我们是用send-off,send-off利用cache线程池,哪怕阻塞也没关系。

利用事务加上一个backup-agent可以实现类似数据库的ACID,但是还是不同的,主要区别在于backup-agent的更新是异步,并不保证一定写入文件,因此持久性也没办法得到保证。

7、关闭线程池:


前面提到agent的更新都是交给线程池去处理,在系统关闭的时候你需要关闭这两个线程吃,通过shutdown-agents方法,你再添加任务将被拒绝:
user=> (shutdown-agents)
nil
user
=> (send counter inc)
java.util.concurrent.RejectedExecutionException (NO_SOURCE_FILE:
0)
user
=> (def counter (agent 0))
#
'user/counter
user=> (send counter inc)    
java.util.concurrent.RejectedExecutionException (NO_SOURCE_FILE:
0)

哪怕我重新创建了counter,提交任务仍然被拒绝,进一步证明这些线程池是全局共享的。

8、原理浅析

前文其实已经将agent的实现原理大体都说了,agent本身只是个普通的java对象,它的内部维持一个状态和一个队列:
    volatile Object state;
    AtomicReference
<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY);


任务提交的时候,是封装成Action对象,添加到此队列

    
public Object dispatch(IFn fn, ISeq args, boolean solo) {
        
if (errors != null) {
            
throw new RuntimeException("Agent has errors", (Exception) RT.first(errors));
        }
        
//封装成action对象
        Action action = new Action(this, fn, args, solo);
        dispatchAction(action);

        
return this;
    }


    
static void dispatchAction(Action action) {
        LockingTransaction trans 
= LockingTransaction.getRunning();
        
// 有事务,加入事务
        if (trans != null)
            trans.enqueue(action);
        
else if (nested.get() != null) {
            nested.set(nested.get().cons(action));
        }
        
else {
            
// 入队
            action.agent.enqueue(action);
        }
    }

send和send-off都是调用Agent的dispatch方法,只是两者的参数不一样,dispatch的第二个参数 solo决定了是使用哪个线程池处理action:
(defn send
  [#
^clojure.lang.Agent a f & args]
    (. a (dispatch f args 
false)))

(defn send
-off
  [#
^clojure.lang.Agent a f & args]
    (. a (dispatch f args 
true)))

send-off将solo设置为true,当为true的时候使用cache线程池:

   
final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();

    
final static ThreadLocal<IPersistentVector> nested = new ThreadLocal<IPersistentVector>();

        
void execute() {
            
if (solo)
                soloExecutor.execute(
this);
            
else
                pooledExecutor.execute(
this);
        }

执行的时候调用更新函数并设置新的状态:

try {
                    Object oldval 
= action.agent.state;
                    Object newval 
= action.fn.applyTo(RT.cons(action.agent.state, action.args));
                    action.agent.setState(newval);
                    action.agent.notifyWatches(oldval, newval);
                }
                
catch (Throwable e) {
                    
// todo report/callback
                    action.agent.errors = RT.cons(e, action.agent.errors);
                    hadError 
= true;
                }

9、跟actor的比较:

Agent跟Actor有一个显著的不同,agent的action来自于别人发送的任务附带的更新函数,而actor的action则是自身逻辑的一部分。因此,如果想用agent实现actor模型还是相当困难的,下面是我的一个尝试:

(ns actor)

(defn receive [
& args]
   (apply hash
-map args))
(defn self [] 
*agent*)

(defn spawn [recv
-map]
    (agent recv
-map))

(defn 
! [actor msg]
    (send actor #(apply (get 
%1 %2)  (vector %2)) msg))
;;启动一个actor
(def actor (spawn 
             (receive :hello #(println 
"receive "%))))
;;发送消息 hello
(
! actor :hello)

   利用spawn启动一个actor,其实本质上是一个agent,而发送通过感叹号!,给agent发送一个更新任务,它从recv-map中查找消息对应的处理函数并将消息作为参数来执行。难点在于消息匹配,匹配这种简单类型的消息没有问题,但是如果匹配用到变量,暂时没有想到好的思路实现,例如实现两个actor的ping/pong。

 

posted @ 2010-07-19 18:48 dennis 阅读(4118) | 评论 (0)编辑 收藏

Clojure 的并发(一) Ref和STM
Clojure 的并发(二)Write Skew分析
Clojure 的并发(三)Atom、缓存和性能
Clojure 的并发(四)Agent深入分析和Actor
Clojure 的并发(五)binding和let
Clojure的并发(六)Agent可以改进的地方
Clojure的并发(七)pmap、pvalues和pcalls
Clojure的并发(八)future、promise和线程

三、Atom和缓存

    Ref适用的场景是系统中存在多个相互关联的状态,他们需要一起更新,因此需要通过dosync做事务包装。但是如果你有一个状态变量,不需要跟其他状态变量协作,这时候应该使用Atom了。可以将一个Atom和一个Ref一起在一个事务里更新吗?这没办法做到,如果你需要相互协作,你只能使用Ref。Atom适用的场景是状态是独立,没有依赖,它避免了与其他Ref交互的开销,因此性能会更好,特别是对于读来说。

 1、定义Atom,采用atom函数,赋予一个初始状态:
(def mem (atom {}))

这里将mem的初始状态定义为一个map。

2、deref和@:可以用deref函数,也可以简单地用宏@,这跟Ref一样,取atom的值:
@mem         => {}
(deref mem)  => {}

3、reset!:重新设置atom的值,不关心当前值是什么:
 (reset! mem {:1})

查看mem:
user=> @mem
{
:1}
已经更新到新的map了。

4、swap!:如果你的更新需要依赖当前的状态值,或者只想更新状态的某个部分,那么就需要使用swap!(类似alter):
(swap! an-atom f & args)

swap! 将函数f作用于当前状态值和额外的参数args之上,形成新的状态值,例如我们给mem加上一个keyword:
user=> (swap! mem assoc :2)
{
:2, :1}

看到,:b 2被加入了当前的map。

5、compare and set:
类似原子变量AtomicInteger之类,atom也可以做compare and set的操作:
(compare-and-set! atom oldValue newValue)

当且仅当atom的当前状态值等于oldValue的时候,将状态值更新为newValue,并返回一个布尔值表示成功或者失败:
user=> (def c (atom 1))
#'user/c
user=> (compare-and-set! c 2 3)
false
user
=> (compare-and-set! c 1 3)
true
user
=> @c
3


6、缓存和atom:
(1)atom非常适合实现缓存,缓存通常不会跟其他系统状态形成依赖,并且缓存对读的速度要求更高。上面例子中用到的mem其实就是个简单的缓存例子,我们来实现一个putm和getm函数:
;;创建缓存
(defn make
-cache [] (atom {}))

;;放入缓存
(defn putm [cache key value] (swap
! cache assoc key value))

;;取出
(defn getm [cache key] (key 
@cache))


   这里key要求是keyword,keyword是类似:a这样的字符序列,你熟悉ruby的话,可以暂时理解成symbol。使用这些API:
user=> (def cache (make-cache))
#'user/cache
user=> (putm cache :1)
{
:1}
user
=> (getm cache :a)
1
user
=> (putm cache :2)
{
:2, :1}
user
=> (getm cache :b)
2

(2)memoize函数作用于函数f,产生一个新函数,新函数内部保存了一个缓存,缓存从参数到结果的映射。第一次调用的时候,发现缓存没有,就会调用f去计算实际的结果,并放入内部的缓存;下次调用同样的参数的时候,就直接从缓存中取,而不用再次调用f,从而达到提升计算效率的目的。
memoize的实现就是基于atom,查看源码:
(defn memoize
  [f]
  (let [mem (atom {})]
    (fn [
& args]
      (
if-let [e (find @mem args)]
        (val e)
        (let [ret (apply f args)]
          (swap
! mem assoc args ret)
          ret)))))

内部的缓存名为mem,memoize返回的是一个匿名函数,它接收原有的f函数的参数,if-let判断绑定的变量e是否存在,变量e是通过find从缓存中查询args得到的项,如果存在的话,调用val得到真正的结果并返回;如果不存在,那么使用apply函数将f作用于参数列表之上,计算出结果,并利用swap!将结果加入mem缓存,返回计算结果。

7、性能测试
使用atom实现一个计数器,和使用java.util.concurrent.AtomicInteger做计数器,做一个性能比较,各启动100个线程,每个线程执行100万次原子递增,计算各自的耗时,测试程序如下,代码有注释,不再罗嗦:

(ns atom-perf)
(
import 'java.util.concurrent.atomic.AtomicInteger)
(import 'java.util.concurrent.CountDownLatch)

(
def a (AtomicInteger. 0))
(
def b (atom 0))

;;为了性能,给java加入type hint
(defn java
-inc [#^AtomicInteger counter] (.incrementAndGet counter))
(defn countdown-latch [#^CountDownLatch latch] (.countDown latch))

;;单线程执行缓存次数
(
def max_count 1000000)
;;线程数 
(
def thread_count 100)

(defn benchmark [fun]
  (let [ latch (CountDownLatch. thread_count)  ;;关卡锁
         start (System
/currentTimeMillis) ]     ;;启动时间
       (dotimes [_ thread_count] (.start (Thread. 
#(do (dotimes [_ max_count] (fun)) (countdown-latch latch))))) 
       (.await latch)
       (
- (System/currentTimeMillis) start)))
         

(println 
"atom:" (benchmark #(swap! b inc)))
(println "AtomicInteger:" (benchmark #(java-inc a)))

(println (.get a))
(println @b)

    默认clojure调用java都是通过反射,加入type hint之后编译的字节码就跟java编译器的一致,为了比较公平,定义了java-inc用于调用AtomicInteger.incrementAndGet方法,定义countdown-latch用于调用CountDownLatch.countDown方法,两者都为参数添加了type hint。如果不采用type hint,AtomicInteger反射调用的效率是非常低的。

测试下来,在我的ubuntu上,AtomicInteger还是占优,基本上比atom的实现快上一倍:

atom: 9002
AtomicInteger: 
4185
100000000
100000000

照我的理解,这是由于AtomicInteger调用的是native的方法,基于硬件原语做cas,而atom则是用户空间内的clojure自己做的CAS,两者的性能有差距不出意料之外。

看了源码,Atom是基于java.util.concurrent.atomic.AtomicReference实现的,调用的方法是
 public final boolean compareAndSet(V expect, V update) {
        
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
    }

而AtomicInteger调用的方法是:

    public final boolean compareAndSet(int expect, int update) {
    
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

两者的效率差距有这么大吗?暂时存疑。
   

posted @ 2010-07-17 13:58 dennis 阅读(6122) | 评论 (3)编辑 收藏

Clojure 的并发(一) Ref和STM
Clojure 的并发(二)Write Skew分析
Clojure 的并发(三)Atom、缓存和性能
Clojure 的并发(四)Agent深入分析和Actor
Clojure 的并发(五)binding和let
Clojure的并发(六)Agent可以改进的地方
Clojure的并发(七)pmap、pvalues和pcalls
Clojure的并发(八)future、promise和线程

     在介绍Ref的上一篇blog提到,基于snapshot做隔离的MVCC实现来说,有个现象,叫写偏序——Write Skew。根本的原因是由于每个事务在更新过程中无法看到其他事务的更改的结果,导致各个事务提交之后的最终结果违反了一致性。为了理解这个现象,最好的办法是在代码中复现这个现象。考虑下列这个场景:
   屁民Peter有两个账户account1和account2,简称为A1和A2,这两个账户各有100块钱,一个显然的约束就是这两个账户的余额之和必须大于或者等于零,银行肯定不能让你赚了去,你也怕成为下个许霆。现在,假设有两个事务T1和T2,T1从A1提取200块钱,T2则从A2提取200块钱。如果这两个事务按照先后顺序进行,后面执行的事务判断A1+A2-200>=0约束的时候发现失败,那么就不会执行,保证了一致性和隔离性。但是基于多版本并发控制的Clojure,这两个事务完全可能并发地执行,因为他们都是基于一个当前账户的快照做更新的, 并且在更新过程中无法看到对方的修改结果,T1执行的时候判断A1+A2-200>=0约束成立,从A1扣除了200块;同样,T2查看当前快照也满足约束A1+A2-200>=0,从A2扣除了200块,问题来了,最终的结果是A1和A2都成-100块了,身为屁民的你竟然从银行多拿了200块,你等着无期吧。

   现在,我们就来模拟这个现象,定义两个账户:

;;两个账户,约束是两个账户的余额之和必须>=0
(def account1 (
ref 100))
(def account2 (
ref 100))

   定义一个取钱方法:
;;定义扣除函数
(defn deduct [account n other]
      (dosync 
          (
if (>= (+ (- @account n) @other0)
              (alter account 
- n))))

   其中account是将要扣钱的帐号,other是peter的另一个帐号,在执行扣除前要满足约束@account-n+@other>=0

   接下来就是搞测试了,各启动N个线程尝试从A1和A2扣钱,为了尽快模拟出问题,使得并发程度高一些,我们将线程设置大一些,并且使用java.util.concurrent.CyclicBarrier做关卡,测试代码如下:

;;设定关卡
(def barrier (java
.util.concurrent.CyclicBarrier. 6001))
;;各启动3000个线程尝试去从账户1和账户2扣除200
(dotimes [_ 
3000] (.start (Thread. #(do (.await  barrier) (deduct account1 200 account2) (.await  barrier)))))
(dotimes [_ 3000] (.start (Thread. #(do (.await  barrier) (deduct account2 200 account1) (.await  barrier)))))

(
.await barrier)

(
.await barrier)
;;打印最终结果
(println 
@account1)
(println 
@account2)

     线程里干了三件事情:首先调用barrier.await尝试突破关卡,所有线程启动后冲破关卡,进入扣钱环节deduct,最后再调用barrier.await用于等待所有线程结束。在所有线程结束后,打印当前账户的余额。

     这段代码在我的机器上每执行10次左右都至少有一次打印:
-100
-100
   
    这表示A1和A2的账户都欠下了100块钱,完全违反了约束条件,法庭的传票在召唤peter。

    那么怎么防止write skew现象呢?如果我们能在事务过程中保护某些Ref不被其他事务修改,那么就可以保证当前的snapshot的一致性,最终保证结果的一致性。通过ensure函数即可保护Ref,稍微修改下deduct函数:
(defn deduct [account n other]
      (dosync (ensure account) (ensure other)
          (
if (>= (+ (- @account n) @other0)
              (alter account 
- n))))

   在执行事务更新前,先通过ensure保护下account和other账户不被其他事务修改。你可以再多次运行看看,会不会再次打印非法结果。

   上篇blog最后也提到了一个士兵巡逻的例子来介绍write skew,我也写了段代码来模拟那个例子,有兴趣可以跑跑,非法结果是三个军营的士兵之和小于100(两个军营最后只剩下25个人)。

;1号军营
(def g1 (
ref 45))
;2号军营
(def g2 (
ref 45))
;3号军营
(def g3 (
ref 45))
;从1号军营抽调士兵
(defn dispatch
-patrol-g1 [n]
    (dosync 
      (
if (> (+ (- @g1 n) @g2 @g3100)
          (alter g1 
- 20)
        ))
      )
;从2号军营抽调士兵
(defn dispatch
-patrol-g2 [n]
    (dosync 
      (
if (> (+ @g1 (- @g2 n) @g3100)
          (alter g2 
- 20)
        ))
      )
;;设定关卡
(def barrier (java
.util.concurrent.CyclicBarrier. 4001))
;;各启动2000个线程尝试去从1号和2号军营抽调20个士兵
(dotimes [_ 
2000] (.start (Thread. #(do (.await  barrier) (dispatch-patrol-g1 20) (.await  barrier)))))
(dotimes [_ 2000] (.start (Thread. #(do (.await  barrier) (dispatch-patrol-g2 20) (.await  barrier)))))
;(dotimes [_ 10] (.start (Thread. #(do (.await  barrier) (dispatch-patrol-g3 20) (.await  barrier)))))

(
.await barrier)

(
.await barrier)
;;打印最终结果
(println 
@g1)
(println 
@g2)
(println 
@g3)




posted @ 2010-07-17 05:44 dennis 阅读(4935) | 评论 (1)编辑 收藏

    我估计我不写这样的标题,吸引不了人气。问题的起因是Javaeye的一个帖子《淘宝面试题:如何充分利用多核CPU,计算很大的 List中所有整数的和》,看见为了这么个问题写长长的Java代码,让我十分蛋疼。为了表示蛋定,我想介绍下用Clojure解决这个问题的方法。

    题目很明确了,要你充分多核,这年头不“多核”不好意思出去跟人打招呼,为了多核,你要将list拆分下,每个子list并发去计算和,然后综合这些结果求出最终的和,您没搞错,这不是传说中人见人爱的MapReduce吗?你没听过?不好意思,你out了。

    咱们先别着急并发,先搞个不并发的版本试试看,第一步,切分list,实在不好意思,clojure有现成的clojure.core/partition函数:
user=> (partition 3 3 [0'(1 2 3 4 5 6 7 8 9 10))
((1 2 3) (4 5 6) (7 8 9) (10 0))

   牛刀小试,将(1 2 3 4 5 6 7 8 9 10)切分成了3个子串,还有个可怜的犀利哥——10号同学没办法归入任意一个组,只好让他跟虚无的0为伴了。partition的第三个参数指定一个凑伙的集合,当无法完全切分的时候,拿这个集合里的元素凑合。但是我们不能随便凑合啊,随便凑合加起来结果就不对了,用0就没问题了。

   切分完了,计算子集的和,那不要太简单,该reduce同学上场,请大家欢呼、扔鸡蛋,千万别客气:
user=> (reduce + 0 '(1 2 3))
6
  
    自然要reduce,总要指定规则怎么reduce,我们这里很简单,就是个加法运算,再给初始值0就好咯,reduce万岁。

    慢着,有个问题,partition返回的是集合的集合((1 2 3) (4 5 6) (7 8 9) (10 0)),而上面的reduce却要作用在子集里,怎么办?无敌的map大神出场了,map的作用是将某个函数作用于集合上,并且返回作用后的集合结果,这里要干的事情就是将上面的reduce作用在partition返回的集合的集合上面:

user=> (map #(reduce + 0 % )(partition 3 3 [0'(1 2 3 4 5 6 7 8 9 10)))            
(6 15 24 10)

    返回的是4个子集各自的和,答案肯定没错,最后一个结果不正是唯一的元素10吗?这里可能比较费解的是#(reduce + 0 %),这其实定义了一个匿名函数,它接收一个参数,这个参数用百分号%默认指代,因为是将map作用于集合的集合,因此这里的%其实就是各个子集。

   map返回的是个集合,又要求集合的总和,是不是又该reduce上场了?不好意思,map同学,这次演出你就一跑龙套的:
user=> (reduce + 0 (map #(reduce + 0 % )(partition 3 3 [0'(1 2 3 4 5 6 7 8 9 10))))
55

    伟大的55出来了,它不是一个人在战斗,这一刻LISP、Scheme、Erlang、Scala、Clojure、JavaScript灵魂附体,它继承了FP的光荣传统,干净漂亮地解决了这个问题。

    综合上面所述,我们给出一个非多核版本的解答:
(defn mysum [coll n]
     (let [sub
-colls   (partition n n [0] coll)
           result
-coll (map #(reduce + 0 %) sub-colls) ]
         (reduce 
+ 0 result-coll)))
   
    我们是使用了let语句绑定变量,纯粹是为了更好看懂一些。sub-colls绑定到partition返回的集合的集合,result-coll就是各个子集的结果组成的集合,#(reduce + 0 %)是个匿名函数,其中%指向匿名函数的第一个参数,也就是每个子集。最终,利用reduce将result-coll的结果综合在一起。

    “我们要多核,我们要多核,我们不要西太平洋大学的野鸡MapReduce"。

     台下别激动,神奇的“多核”马上出场,我要改动的代码只是那么一点点,用pmap替代map
(defn psum [coll n]
     (let [sub
-colls   (partition n n [0] coll)
           result
-coll (pmap #(reduce + 0 %) sub-colls) ]
         (reduce 
+ 0 result-coll)))

   完了吗?真完了,你要改动的只有这么一点点,就可以让切分出来的子集并发地计算了。(感谢网友@clojans的提醒)。

以下是原文:
    首先是匿名函数改造一点点:
    我干嘛了,我就加了个future,给你个未来。严肃点地说,future将启动一个单独的线程去reduce子集。现在result-coll里不再是直接的结果,而是各个子集的Future对象,为了得到真正的和,你需要等待线程结束并取得结果,因此最后的reduce也要小小地改动下:
(reduce #(+ %1 @%20 result-coll))

    reduce不再是简单地用加号了,替代的则是一个两个参数的匿名函数,第二个参数%2是Future对象,我们通过@操作符等待Future返回结果,并跟第一个参数%1(初始为0)作加法运算。

    最终的多核版本:
(defn mysum2 [coll  n]
    (let [sub
-colls   (partition n n [0] coll)
          result
-coll (map #(future (reduce + 0 %)) sub-colls)] 
         (reduce #(
+ %1 @%20 result-coll)))

   这个多核版本跟非多核版本区别大吗?不大吗?大吗?不大吗?……
   可以看到,Clojure可以多么容易地在并发与非并发之间徘徊,习惯脚踏N只船了。


   

posted @ 2010-07-15 11:19 dennis 阅读(8086) | 评论 (10)编辑 收藏

    详解clojure递归(上)
    详解clojure递归(下)

    递归可以说是LISP的灵魂之一,通过递归可以简洁地描述数学公式、函数调用,Clojure是LISP的方言,同样需要递归来扮演重要作用。递归的价值在于可以让你的思维以what的形式思考,而无需考虑how,你写出来的代码就是数学公式,就是函数的描述,一切显得直观和透明。如果你不习惯递归,那只是因为命令式语言的思维根深蒂固,如x=x+1这样的表达式,从数学的角度来看完全不合法,但是在命令式语言里却是合法的赋值语句。

   递归可以分为直接递归和间接递归,取决于函数是否直接或者间接地调用自身。如果函数的最后一个调用是递归调用,那么这样的递归调用称为尾递归,针对此类递归调用,编译器可以作所谓的尾递归优化(TCO),因为递归调用是最后一个,因此函数的局部变量等没有必要再保存,本次调用的结果可以完全作为参数传递给下一个递归调用,清空当前的栈并复用,那么就不需要为递归的函数调用保存一长串的栈,因此不会有栈溢出的问题。在Erlang、LISP这样的FP语言里,都支持TCO,无论是直接递归或者间接递归。

   但是由于JVM自身的限制,Clojure和Scala一样,仅支持直接的尾递归优化,将尾递归调用优化成循环语句。例如一个求阶乘的例子:
   
;;第一个版本的阶乘函数
(defn fac [n]
          (
if (= 1 n)
              
1
             (
* n (fac (dec n)))))

   第一个版本的阶乘并非尾递归,这是因为最后一个表达式的调用是一个乘法运算,而非(fac (dec n)),因此这个版本的阶乘在计算大数的时候会导致栈溢出:
user=> (fac 10000)
java.lang.StackOverflowError (NO_SOURCE_FILE:
0)

   将第一个版本改进一下,为了让最后一个调用是递归调用,那么我们需要将结果作为参数来传递,而不是倚靠栈来保存,并且为了维持接口一样,我们引入了一个内部函数fac0:
  
  ;;第二个版本,不是尾递归的“尾递归”
  (defn fac [n]
           (defn fac0 [c r]
              (
if (= 0 c)
                  r
                  (fac0 (dec c) (
* c r))))
           (fac0 n 
1))

   这个是第二个版本的阶乘,通过将结果提取成参数来传递,就将fac0函数的递归调用修改为尾递归的形式,这是个尾递归吗?这在Scala里,在LISP里,这都是尾递归,但是Clojure的TCO优化却是要求使用recur这个特殊形式,而不能直接用函数名作递归调用,因此我们这个第二版本在计算大数的时候仍然将栈溢出:
user=> (fac 10000)
java.lang.StackOverflowError (NO_SOURCE_FILE:
0)

   在Clojure里正确地TCO应该是什么样子的呢?其实只要用recur在最后调用那一下替代fac0即可,这就形成我们第三个版本的阶乘:
  ;;第三个版本,TCO起作用了
  (defn fac [n]
           (defn fac0 [c r]
              (
if (= 0 c)
                  r
                  (recur (dec c) (
* c r))))
           (fac0 n 
1))

    此时你再计算大数就没有问题了,计算(fac 10000)可以正常运行(结果太长,我就不贴出来了)。recur只能跟函数或者loop结合在一起使用,只有函数和loop会形成递归点。我们第三个版本就是利用函数fac0做了尾递归调用的优化。
   
    loop跟let相似,只不过loop会在顶层形成一个递归点,以便recur重新绑定参数,使用loop改写阶乘函数,这时候就不需要定义内部函数了:
;;利用loop改写的第四个版本的阶乘函数
(defn fac [n]
           (loop [n n r 
1]
                (
if (= n 0)
                    r
                    (recur (dec n) (
* n r)))))

   loop初始的时候将n绑定为传入的参数n(由于作用域不同,同名没有问题),将r绑定为1,最后recur就可以将新的参数值绑定到loop的参数列表并递归调用。

   Clojure的TCO是怎么做到的,具体可以看看我前两天写的这篇博客,本质上是在编译的时候将最后的递归调用转化成一条goto语句跳转到开始的Label,也就是转变成了循环调用。

   这个阶乘函数仍然有优化的空间,可以看到,每次计算其实都有部分是重复计算的,如计算(fac 5)也就是1*2*3*4*5,计算(fac 6)的1*2*3*4*5*6,如果能将前面的计算结果缓存下来,那么计算(fac 6)的时候将更快一些,这可以通过memoize函数来包装阶乘函数:
;;第五个版本的阶乘,缓存中间结果
(
def fac (memoize fac))

第一次计算(fac 10000)花费的时间长一些,因为还没有缓存:
user=> (time (fac 10000)) 
"Elapsed time: 170.489622 msecs"


第二次计算快了非常多(其实没有计算,只是返回缓存结果):
user=> (time (fac 10000))
"Elapsed time: 0.058737 msecs"

    可以看到,如果没有预先缓存,利用memoize包装的阶乘函数也是快不了。memoize的问题在于,计算(fac n)路径上的没有用到的值都不会缓存,它只缓存最终的结果,因此如果计算n前面的其他没有计算过的数字,仍然需要重新计算。那么怎么保存路径上的值呢?这可以将求阶乘转化成另一个等价问题来解决。
    我们可以将所有的阶乘结果组织成一个无穷集合,求阶乘变成从这个集合里取第n个元素,这是利用Clojure里集合是lazy的特性,集合里的元素如果没有使用到,那么就不会预先计算,而是等待要用到的时候才计算出来,定义一个阶乘结果的无穷集合,可以利用map将fac作用在整数集合上,map、reduce这样的高阶函数返回的是LazySeq:
 (def fac-seq (map fac (iterate inc 0)))

   (iterate inc 0)定义了正整数集合包括0,0的阶乘没有意义。这个集合的第0项其实是多余的。
   查看fac-seq的类型,这是一个LazySeq:
user=> (class fac-seq)
clojure.lang.LazySeq

  求n的阶乘,等价于从这个集合里取第n个元素:
user=> (nth fac-seq 10)
3628800

  这个集合会比较耗内存,因为会缓存所有计算路径上的独立的值,哪怕他们暂时不会被用到。但是这种采用LazySeq的方式来定义阶乘函数的方式有个优点,那就是在定义fac-seq使用的fac函数无需一定是符合TCO的函数,我们的第一个版本的阶乘函数稍微修改下也可以使用,并且不会栈溢出:
(defn fac [n]
          (
if (<= n 1)
              
1
              (
* n (fac (dec n)))))

(
def fac (memoize fac))
(
def fac-seq (map fac (iterate inc 0)))
(nth fac
-seq 10000)


  因为集合从0开始,因此只是修改了fac的if条件为n<=1的时候返回1。至于为什么这样就不会栈溢出,有兴趣的朋友可以自己思考下。

    从这个例子也可以看出,一些无法TCO的递归调用可以转化为LazySeq来处理,这算是弥补JVM缺陷的一个办法。
   


posted @ 2010-07-14 22:03 dennis 阅读(5236) | 评论 (4)编辑 收藏


Clojure 的并发(一) Ref和STM
Clojure 的并发(二)Write Skew分析
Clojure 的并发(三)Atom、缓存和性能
Clojure 的并发(四)Agent深入分析和Actor
Clojure 的并发(五)binding和let
Clojure的并发(六)Agent可以改进的地方
Clojure的并发(七)pmap、pvalues和pcalls
Clojure的并发(八)future、promise和线程

    Clojure处理并发的思路与众不同,采用的是所谓STM的模型——软事务内存。你可以将STM想象成数据库,只不过是内存型的,它只支持事务的ACI,也就是原子性、一致性、隔离性,但是不包括持久性,因为状态的保存都在内存里。

    Clojure的并发API分为四种模型:
1、管理协作式、同步修改可变状态的Ref
2、管理非协作式、同步修改可变状态的Atom
3、管理异步修改可变状态的Agent
4、管理Thread local变量的Var。

    下面将对这四部分作更详细的介绍。

一、Ref和STM

 1、ref:

通过ref函数创建一个可变的引用(reference),指向一个不可变的对象:
(ref x)

例子:创建一个歌曲集合:
(def song (ref #{}))

2、deref和@:
 取引用的内容,解引用使用deref函数
(deref song)

也可以用reader宏@:
@song

3、ref-set和dosync:


改变引用指向的内容,使用ref-set函数
(ref-set ref new-value)

如,我们设置新的歌曲集合,加入一首歌:
(ref-set song #{"Dangerous"})
但是这样会报错:
java.lang.IllegalStateException: No transaction running (NO_SOURCE_FILE:0)

这是因为引用是可变的,对状态的更新需要进行保护,传统语言的话可能采用锁,Clojure是采用事务,将更新包装到事务里,这是通过dosync实现的:
(dosync (ref-set song #{"Dangerous"}))

dosync的参数接受多个表达式,这些表达式将被包装在一个事务里,事务支持ACI:
(1)Atomic,如果你在事务里更新多个Ref,那么这些更新对事务外部来说是一个独立的操作。
(2)Consistent,Ref的更新可以设置 validator,如果某个验证失败,整个事务将回滚。
(3)Isolated,运行中的事务无法看到其他事务部分完成的结果。

dosync更新多个Ref,假设我们还有个演唱者Ref,同时更新歌曲集合和演唱者集合:
(def singer (ref #{}))
(dosync (ref
-set song #{"Dangerous"})
               (ref
-set singer #{"MJ"}) )

@song      
=>  #{"Dangerous"}
@singer    
=>  #{"MJ"}

4、alter:
完全更新整个引用的值还是比较少见,更常见的更新是根据当前状态更新,例如我们向歌曲集合添加一个歌曲,步骤大概是先查询集合内容,然后往集合里添加歌曲,然后更新整个集合:
(dosync (ref-set song (conj @song "heal the world")))

查询并更新的操作可以合成一步,这是通过alter函数:
(alter ref update-fn & args)

alter接收一个更新的函数,函数将在更新的时候调用,传入当前状态值并返回新的状态值,因此上面的例子可以改写为:
 (dosync (alter song conj "heal the world"))

这里使用conj而非cons是因为conj接收的第一个参数是集合,也就是当前状态值,而cons要求第一个参数是将要加入的元素。

5、commute:
  commute函数是alter的变形,commute顾名思义就是要求update-function是可交换的,它的顺序是可以任意排序。commute的允许的并发程度比alter更高一些,因此性能会更好。但是由于commute要求update-function是可交换的,并且会自动重排序,因此如果你的更新要求顺序性,那么commute是不能接受的,commute仅可用在对顺序性没有要求或者要求很低的场景:例如更新聊天窗口的聊天信息,由于网络延迟的因素和个人介入的因素,聊天信息可以认为是天然排序,因此使用commute还可以接受,更新乱序的可能性很低。
  另一个例子就不能使用commute了,如实现一个计数器:
(def counter (ref 0))

  实现一个next-counter函数获取计数器的下一个值,我们先使用commute实现:
(defn next-counter [] (dosync (commute counter inc)))

   这个函数很简单,每次调用inc递增counter的值,接下来写个测试用例:启动50个线程并发去获取next counter:
(dotimes [_ 50] (.start (Thread. #(println (next-counter)))))
  
   这段代码稍微解释下,dotimes是重复执行50次,每次启动new并启动一个Thread,这个Thread里干了两件事情:调用next-counter,打印调用结果,第一个版本的next-counter执行下,这是其中一次输出的截取:
23
23
23

23
23
23
23
23
23
23
23
23
28
23
21
23
23
23
23
25
28

可以看到有很多的重复数值,这是由于重排序导致事务结束后的值不同,但是你查看counter,确实是50:
@counter  => 50

证明更新是没有问题的,问题出在commute的返回值上。

如果将next-counter修改为alter实现:
(defn next-counter [] (dosync (alter counter inc)))

此时再执行测试用例,可以发现打印结果完全正确了:
……
39
41
42
45
27
46
47
44
48
43
49
40
50

查看counter,也是正确更新到50了:
@counter => 50

最佳实践:通常情况下,你应该优先使用alter,除非在遇到明显的性能瓶颈并且对顺序不是那么关心的时候,可以考虑用commute替换。

6、validator:
   类似数据库,你也可以为Ref添加“约束”,在数据更新的时候需要通过validator函数的验证,如果验证不通过,整个事务将回滚。添加validator是通过ref函数传入metadata的map实现的,例如我们要求歌曲集合添加的歌曲名称不能为空:
(def validate-song
     (partial every? #(not (nil?
%))))

(def song (ref #{} :validator validate
-song))

validate-song是一个验证函数,partial返回某个函数的半函数(固定了部分参数,部分参数没固定),你可以将partial理解成currying,虽然还是不同的。validate-song调用every?来验证集合内的所有元素都不是nil,其中#(not (nil? %))是一个匿名函数,%指向匿名函数的第一个参数,也就是集合的每个元素。ref指定了validator为validate-song,那么在每次更新song集合的时候都会将新的状态传入validator函数里验证一下,如果返回false,整个事务将回滚:

(dosync (alter song conj nil))
java.lang.IllegalStateException: Invalid reference state (NO_SOURCE_FILE:
0)

更新失败,非法的reference状态,查看song果然还是空的:
@song => #{}

更新正常的值就没有问题:
 (dosync (alter song conj "dangerous"))   => #{"dangerous"}

   
7、ensure:

  ensure函数是为了保护Ref不会被其他事务所修改,它的主要目的是为了防止所谓的“写偏序”(write skew)问题。写偏序问题的产生跟STM的实现有关,clojure的STM实现是基于MVCC(Multiversion Concurrency Control)——多版本并发控制,对一个Ref保存多个版本的状态值,在更新的时候取得当前状态值的一个隔离的snapshot,更新是基于snapshot进行的。那么我们来看下写偏序是怎么产生,以一个比喻来描述:
  想象有一个系统用于管理美国最神秘的军事禁区——51区的安全巡逻,你有3个营的士兵,每个营45个士兵,并且你需要保证总体巡逻的士兵人数不能少于100个人。假设有一天,有两个指挥官都登录了这个管理系统,他们都想从某个军营里抽走20个士兵,假设指挥官A想从1号军营抽走,指挥官B想要从2号军营抽走士兵,他们同时执行下列操作:
Admin 1if ((G1 - 20+ G2 + G3) > 100 then dispatchPatrol

Admin 
2if (G1 + (G2 - 20+ G3) > 100 then dispatchPatrol

我们刚才提到,Clojure的更新是基于隔离的snapshot,一个事务的更改无法看到另一个事务更改了部分的结果,因此这两个操作都因为满足(45-20)+45+45=115的约束而得到执行,导致实际抽调走了40个士兵,只剩下95个士兵,低于设定的安全标准100人,这就是写偏序现象。
  写偏序的解决就很简单,在执行抽调前加入ensure即可保护ref不被其他事务所修改。ensure比(ref-set ref @ref)允许的并发程度更高一些。


Ref和STM的介绍暂时到这里,原理和源码的解析要留待下一篇文章了。



posted @ 2010-07-14 02:34 dennis 阅读(7844) | 评论 (8)编辑 收藏


    解释器求值的顺序可以分为应用序和正则序,应用序是先求值参数,再执行表达式;正则序则是先将表达式按照实际参数展开,然后再执行。具体可以看看过去写的这篇文章

   Clojure的求值可以肯定是应用序的,如执行
(defn mytest [a b] 
      (
if (= a 0)
          a
          b))
(mytest 
0 1/0)
        

尽管在(mytest 0 1/0)中a绑定为0,如果求值器是完全展开再求值,那应该正常执行并返回a,也就是1;但是因为clojure是应用序,因此参数b的1/0会先计算,这显然会报错。

   clojure的dosync用于将一些表达式包装成事务,Ref的更新操作没有包装在事务里,会抛出异常
;;定义mutable的Ref
 (def song (ref #{}))

;;添加一首歌
(alter song conj 
"dangerous")

   alter用于向Ref查询并添加元素,用conj将"dangerous"这首歌加入集合,但是alter要求执行在一个事务里,因此上面的代码会报错
java.lang.IllegalStateException: No transaction running (NO_SOURCE_FILE:0)

   如果你用dosync包装就没有问题
user=> (dosync (alter song conj "dangerous"))
#{
"dangerous"}

   返回更新后的结果集合。这个跟我们要谈的正则序和应用序有什么关系呢?可能你看出来了,如果说clojure是应用序,那么在表达式 (dosync (alter song conj "dangerous"))中,alter也应该先执行,应当照样报" No transaction running"的错误才对,为何却没有呢?难道dosync是按照正则序执行?

   查看dosync的文档
user=> (doc dosync)
-------------------------
clojure.core
/dosync
([
& exprs])
Macro
  Runs the exprs (in an implicit 
do) in a transaction that encompasses
  exprs and any nested calls.  Starts a transaction 
if none is already
  running on 
this thread. Any uncaught exception will abort the
  transaction and flow out of dosync. The exprs may be run more than
  once, but any effects on Refs will be atomic.

   这是一个宏,他的作用是将表达式包装在一个事务里,如果当前线程没有事务,那么就启动一个。
查看源码:

(defmacro dosync
  
"Runs the exprs (in an implicit do) in a transaction that encompasses
  exprs and any nested calls.  Starts a transaction if none is already
  running on 
this thread. Any uncaught exception will abort the
  transaction and flow out of dosync. The exprs may be run more than
  once, but any effects on Refs will be atomic.
"
  [& exprs]
  `(sync nil 
~@exprs))

   本质上dosync是调用了sync这个宏,sync干了些什么?
(defmacro sync
  
"transaction-flags => TBD, pass nil for now

  Runs the exprs (in an implicit 
do) in a transaction that encompasses
  exprs and any nested calls.  Starts a transaction 
if none is already
  running on 
this thread. Any uncaught exception will abort the
  transaction and flow out of sync. The exprs may be run more than
  once, but any effects on Refs will be atomic.
"
  [flags-ignored-for-now & body]
  `(. clojure.lang.LockingTransaction
      (runInTransaction (fn [] 
~@body))))

   找到了,原来是调用了clojure.lang.LockingTransaction.runInTransaction这个静态方法,并且将exps包装成一个匿名函数

fn [] ~@body

     因此,dosync并非正则序,dosync是个宏,(dosync (alter song conj "dangerous"))展开之后,其实是
(sync nil (fun [] (alter song conj "dangerous")))
   
     这就解答了为什么(dosync (alter song conj "dangerous"))可以正常运行的疑问。宏的使用,首先是展开,然后才是按照应用序的顺序求值。


  




posted @ 2010-07-13 12:02 dennis 阅读(2505) | 评论 (2)编辑 收藏

    这题目起的哗众取宠,其实只是想介绍下怎么查看Clojure动态生成的字节码,这对分析Clojure的内部实现很重要。

    第一步,下载最新的Clojure 1.1.0源码并解压,并导入到你喜欢的IDE。

    其次,下载asm 3.0的源码并解压。

    第三,删除Clojure 1.1.0源码中的clojure.asm包。clojure并不是引用asm的jar包,而是将asm的源码合并到clojure中,并且删除一些只会在调试阶段用到的package和class,保留使用asm的最小源码集合,这可能是处于防止asm不同版本的jar包冲突以及缩小clojure大小的考虑。

    第四,将asm 3.0源码拷入clojure的源码中,并将包org.objectweb.asm包括子包整体重名名为clojure.asm。

    第五步,修改Clojure源码,加入TraceClassVisitor的适配器,用于跟踪字节码生成,这需要修改clojure.lang.Compiler类中的两个compile方法,找到类似
ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_MAXS);
// ClassWriter cw = new ClassWriter(0);
ClassVisitor cv = cw;

这样的代码,将cv修改为TraceClassVisitor:
 ClassVisitor cv = new TraceClassVisitor(new CheckClassAdapter(cw), new PrintWriter(System.out));

    TraceClassVisitor的第二个参数指定将跟踪到的字节码输出到哪里,这里简单地输出到标准输出方便查看。

    第六步,接下来可以尝试下我们修改过的clojure怎么动态生成字节码,启动REPL,
java clojure.main

启动阶段就会输出一些字节码信息,主要预先加载的一些标准库函数,如clojure.core中的函数等,REPL启动完毕,随便输入一个表达式都将看到生成的字节码
user=> (+ 1 2)

输出类似

compile 1
// class version 49.0 (49)
// access flags 33
public class user$eval__4346 extends clojure/lang/AFunction  {

  
// compiled from: NO_SOURCE_FILE
  
// debug info: SMAP
eval__4346.java
Clojure
*S Clojure
*F
+ 1 NO_SOURCE_FILE
NO_SOURCE_PATH
*L
0#1,1:0
*E

  
// access flags 25
  public final static Lclojure/lang/Var; const__0

  
// access flags 25
  public final static Ljava/lang/Object; const__1

  
// access flags 25
  public final static Ljava/lang/Object; const__2

  
// access flags 9
  public static <clinit>()V
   L0
    LINENUMBER 
2 L0
    LDC 
"clojure.core"
    LDC 
"+"
    INVOKESTATIC clojure
/lang/RT.var (Ljava/lang/String;Ljava/lang/String;)Lclojure/lang/Var;
    CHECKCAST clojure
/lang/Var
    PUTSTATIC user$eval__4346.const__0 : Lclojure
/lang/Var;
    ICONST_1
    INVOKESTATIC java
/lang/Integer.valueOf (I)Ljava/lang/Integer;
    PUTSTATIC user$eval__4346.const__1 : Ljava
/lang/Object;
    ICONST_2
    INVOKESTATIC java
/lang/Integer.valueOf (I)Ljava/lang/Integer;
    PUTSTATIC user$eval__4346.const__2 : Ljava
/lang/Object;
    RETURN
    MAXSTACK 
= 0
    MAXLOCALS 
= 0

  
// access flags 1
  public <init>()V
   L0
    LINENUMBER 
2 L0
   L1
    ALOAD 
0
    INVOKESPECIAL clojure
/lang/AFunction.<init> ()V
   L2
    RETURN
    MAXSTACK 
= 0
    MAXLOCALS 
= 0

  
// access flags 1
  public invoke()Ljava/lang/Object; throws java/lang/Exception 
   L0
    LINENUMBER 
2 L0
   L1
    LINENUMBER 
2 L1
    GETSTATIC user$eval__4346.const__1 : Ljava/lang/Object;
    GETSTATIC user$eval__4346.const__2 : Ljava
/lang/Object;
    INVOKESTATIC clojure
/lang/Numbers.add (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Number;

   L2
    LOCALVARIABLE 
this Ljava/lang/Object; L0 L2 0
    ARETURN
    MAXSTACK 
= 0
    MAXLOCALS 
= 0
}
3

3就是表达式的结果。可以看到,一个表达式生成了一个class。其中<clinit>是静态初始化块,主要是初始化表达式中的字面常量;<init>不用说,默认的构造函数;invoke是核心方法,表达式生成的class,new一个实例后调用的就是invoke方法,执行实际的代码,高亮部分加载了两个常量,并执行Number.add方法。

最后,请Happy hacking!。



posted @ 2010-07-11 12:07 dennis 阅读(3597) | 评论 (1)编辑 收藏


    Clojure由于是基于JVM,同样无法支持完全的尾递归优化(TCO),这主要是Java的安全模型决定的,可以看看这个久远的bug描述。但是Clojure和Scala一样支持同一个函数的直接调用的尾递归优化,也就是同一个函数在函数体的最后调用自身,会优化成循环语句。让我们看看这是怎么实现的。
    Clojure的recur的特殊形式(special form)就是用于支持这个优化,让我们看一个例子,经典的求斐波那契数:
(defn recur-fibo [n]
     (letfn [(fib 
                [current next n]
                (
if (zero? n)
                    current
                    ;recur将递归调用fib函数
                    (recur next (
+ current next) (dec n))))]
    (fib 
0 1 n)))

    recur-fibo这个函数的内部定义了一个fib函数,fib函数的实现就是斐波那契数的定义,fib函数的三个参数分别是当前的斐波那契数(current)、下一个斐波那契数(next)、计数器(n),当计数器为0的时候返回当前的斐波那契数字,否则就将当前的斐波那契数设置为下一个,下一个斐波那契数字等于两者之和,计数递减并递归调用fib函数。注意,你这里不能直接调用(fib next (+ current next) (dec n)),否则仍将栈溢出。这跟Scala不同,Clojure是用recur关键字而非原函数名作TOC优化。

    Clojure是利用asm 3.0作字节码生成,观察下recur-fibo生成的字节码会发现它其实生成了两个类,类似user$recur_fibo__4346$fib__4348和user$recur_fibo__4346,user是namespace,前一个是recur-fibo中的fib函数的实现,后一个则是recur-fibo自身,这两个类都继承自 clojure.lang.AFunction类,值得一提的是前一个类是后一个类的内部类,这跟函数定义相吻合。所有的用户定义的函数都将继承 clojure.lang.AFunction。
   
    在这两个类中都有一个invoke方法,用于实际的方法执行,让我们看看内部类fib的invoke方法(忽略了一些旁枝末节)
 1 // access flags 1
 2   public invoke(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object; throws java/lang/Exception 
 3    L0
 4     LINENUMBER 2 L0
 5    L1
 6     LINENUMBER 4 L1
 7    L2
 8     LINENUMBER 4 L2
 9     ALOAD 3
10     INVOKESTATIC clojure/lang/Numbers.isZero (Ljava/lang/Object;)Z

11     IFEQ L3
12     ALOAD 1
13     GOTO L4

14    L5
15     POP
16    L3
17     ALOAD 2
18    L6
19     LINENUMBER 6 L6
20     ALOAD 1
21     ALOAD 2
22     INVOKESTATIC clojure/lang/Numbers.add (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Number;

23    L7
24     LINENUMBER 6 L7
25     ALOAD 3
26     INVOKESTATIC clojure/lang/Numbers.dec (Ljava/lang/Object;)Ljava/lang/Number;

27     ASTORE 3
28     ASTORE 2
29     ASTORE 1
30     GOTO L0

31    L4
32    L8
33     LOCALVARIABLE this Ljava/lang/Object; L0 L8 0
34     LOCALVARIABLE current Ljava/lang/Object; L0 L8 1
35     LOCALVARIABLE next Ljava/lang/Object; L0 L8 2
36     LOCALVARIABLE n Ljava/lang/Object; L0 L8 3
37     ARETURN
38     MAXSTACK = 0
39     MAXLOCALS = 0

    首先看方法签名,invoke接收三个参数,都是Object类型,对应fib函数里的current、next和n。
    关键指令都已经加亮,9——11行,加载n这个参数,利用Number.isZero判断n是否为0,如果为0,将1弹入堆,否则弹入0。IFEQ比较栈顶是否为0,为0(也就是n不为0)就跳转到L3,否则继续执行(n为0,加载参数1,也就是current,然后跳转到L4,最后通过ARETURN返回值current作结果。
   
    指令20——22行,加载current和next,执行相加操作,生成下一个斐波那契数。
    指令25-——26行,加载n并递减。
    指令27——29行,将本次计算的结果存储到local变量区,覆盖了原有的值。
    指令30行,跳转到L0,重新开始执行fib函数,此时local变量区的参数值已经是上一次执行的结果。
   
    有的朋友可能要问,为什么加载current是用aload 1,而不是aload 0,处在0位置上的是什么?0位置上存储的就是著名的this指针,invoke是实例方法,第一个参数一定是this。

   从上面的分析可以看到,recur干的事情就两件:覆盖原有的local变量,以及跳转到函数开头执行循环操作,这就是所谓的软尾递归优化。这从RecurExp的实现也可以看出来:

   //覆盖变量
  for (int i = loopLocals.count() - 1; i >= 0; i--) {
                LocalBinding lb 
= (LocalBinding) loopLocals.nth(i);
                Class primc 
= lb.getPrimitiveType();
                
if (primc != null) {
                    gen.visitVarInsn(Type.getType(primc).getOpcode(Opcodes.ISTORE), lb.idx);
                }
                
else {
                    gen.visitVarInsn(OBJECT_TYPE.getOpcode(Opcodes.ISTORE), lb.idx);
                }
            }
   
//执行跳转
   gen.goTo(loopLabel);
 
      recur分析完了,最后有兴趣可以看下recur-fibo的invoke字节码
 1  L0
 2     LINENUMBER 1 L0
 3     ACONST_NULL
 4     ASTORE 2
 5     NEW user$recur_fibo__4346$fib__4348
 6     DUP
 7     INVOKESPECIAL user$recur_fibo__4346$fib__4348.<init> ()V

 8     ASTORE 2
 9     ALOAD 2
10     CHECKCAST user$recur_fibo__4346$fib__4348
11     POP
12    L1
13    L2
14     LINENUMBER 7 L2
15     ALOAD 2
16     CHECKCAST clojure/lang/IFn
17     GETSTATIC user$recur_fibo__4346.const__2 : Ljava/lang/Object;
18     GETSTATIC user$recur_fibo__4346.const__3 : Ljava/lang/Object;
19     ALOAD 1
20     ACONST_NULL
21     ASTORE 1
22     ACONST_NULL
23     ASTORE 2
24     INVOKEINTERFACE clojure/lang/IFn.invoke (Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
25    L3
26     LOCALVARIABLE fib Ljava/lang/Object; L1 L3 2
27    L4
28     LOCALVARIABLE this Ljava/lang/Object; L0 L4 0
29     LOCALVARIABLE n Ljava/lang/Object; L0 L4 1
30     ARETURN

     5——7行,实例化一个内部的fib函数。
     24行,调用fib对象的invoke方法,传入3个初始参数。

     简单来说,recur-fibo生成的对象里只是new了一个fib生成的对象,然后调用它的invoke方法,这也揭示了Clojure的内部函数的实现机制。
      

posted @ 2010-07-11 04:20 dennis 阅读(4253) | 评论 (0)编辑 收藏

   托尔多正式宣布退役了,下个赛季他将会留在国际米兰做青训方面的工作,他在2000年欧洲杯上的经典表现将永载史册。他为国际米兰效力了十年,这也是我看国米比赛的十年。



posted @ 2010-07-10 00:10 dennis 阅读(721) | 评论 (0)编辑 收藏

仅列出标题
共56页: First 上一页 7 8 9 10 11 12 13 14 15 下一页 Last