庄周梦蝶

生活、程序、未来
   :: 首页 ::  ::  :: 聚合  :: 管理
   
    Ruby Fiber指南(一)基础
    Ruby Fiber指南(二)参数传递
    Ruby Fiber指南(三)过滤器
    Ruby Fiber指南(四)迭代器
    Ruby Actor指南(五)实现Actor

    写这个指南的时候,计划是第五章写一个Fiber的应用例子,但是一时没有想到比较好的例子,模仿《Programming in Lua》中的多任务下载的例子也不合适,因为Ruby中的异步HttpClient跟lua还是很不一样的,体现不了Fiber的优点。因此,这第五节一直拖着没写。
    恰巧最近在小组中做了一次Erlang的分享,有人问到Erlang调度器的实现问题,这块我没注意过,那时候就根据我对coroutine实现actor的想法做了下解释,后来思考了下那个解释是错误的,Erlang的调度器是抢占式的,而通过coroutine实现的actor调度却是非抢占的,两者还是截然不同的。我在《Actor、Coroutine和Continuation的概念澄清》中提到coroutine可以实现actor风格,actor跟coroutine并没有必然的联系,这篇文章的目的就在于证明这一点,使用Ruby Fiber实现一个简单的actor风格的库,整个代码不到100行。后面还会谈到这个实现的缺点,以及我对Erlang调度器实现的理解。

    首先是monkey patch,给Thread和Fiber类加上两个方法,分别用于获取当前线程的调度器和Fiber对应的actor:
class Thread
  
#得到当前线程的调度器
  def __scheduler__
    @internal_scheduler
||=FiberActor::Scheduler.new
  end
end

class Fiber
  
#得到当前Fiber的actor
  def __actor__
    @internal_actor
  end
end

     这里实现的actor仍然是Thread内的,一个Thread只跑一个调度器,每个actor关联一个Fiber。
     让我们来想想调度器该怎么实现,调度器顾名思义就是协调actor的运行,每次挑选适当的actor并执行,可以想象调度器内部应该维护一个等待调度的actor队列,Scheduler每次从队列里取出一个actor并执行,执行完之后取下一个actor执行,不断循环持续这个过程;在没有actor可以调度的时候,调度器应该让出执行权。因此调度器本身也是一个Fiber,它内部有个queue,用于维护等待调度的actor:
module FiberActor
  
class Scheduler
    
def initialize
      @queue
=[]
      @running
=false
    end

    
def run
      
return if @running
      @running
=true
      
while true
        
#取出队列中的actor并执行
        while actor=@queue.shift
          begin
            actor.fiber.resume
          rescue 
=> ex
            puts 
"actor resume error,#{ex}"
          end
        end
        
#没有任务,让出执行权
        Fiber.yield
      end
    end

    
def reschedule
      
if @running
        
#已经启动,只是被挂起,那么再次执行
        @fiber.resume
      
else
        
#将当前actor加入队列
        self << Actor.current
      end
    end

    
def running?
      @running
    end

    
def <<(actor)
      
#将actor加入等待队列
      @queue << actor unless @queue.last == actor
      
#启动调度器
      unless @running
         @queue 
<< Actor.current
         @fiber
=Fiber.new { run }
         @fiber.resume
      end
    end
  end
end

    run方法是核心的调度方法,注释说明了主要的工作流程。因为调度器可能让出执行权,因此提供了reschedule方法重新resume启动调度器。<<方法用于将等待被调度的actor加入等待队列,如果调度器没有启动,那么就启动调度Fiber。

    有了调度器,Actor的实现也很简单,Actor跟Fiber是一对一的关系,Actor内部维护一个mailbox,用来存储接收到的消息。最重要的是receive原语的实现,我们这里很简单,不搞模式匹配,只是接收消息。receive的工作流程大概是这样,判断mailbox中有没有消息,有消息的话,取出消息并调用block处理,没有消息的话就yield让出执行权。

module FiberActor  
  
class Actor
    attr_accessor :fiber
    
#定义类方法
    class << self
      
def scheduler
        Thread.current.
__scheduler__
      end

      
def current
        Fiber.current.
__actor__
      end

      
#启动一个actor
      def spawn(*args,&block)
        fiber
=Fiber.new do
           block.call(args)
        end
        actor
=new(fiber)
        fiber.instance_variable_set :@internal_actor,actor
        scheduler 
<< actor
        actor
      end

      
def receive(&block)
        current.receive(
&block)
      end
    end

    
def initialize(fiber)
       @mailbox
=[]
       @fiber
=fiber
    end

    
#给actor发送消息
    def << (msg)
      @mailbox 
<< msg
      
#加入调度队列
      Actor.scheduler << self
    end

    
def receive(&block)
      
#没有消息的时候,让出执行权
      Fiber.yield while @mailbox.empty?
      msg
=@mailbox.shift
      block.call(msg)
    end

    
def alive?
      @fiber.alive?
    end
  end

end

    Actor.spawn用于启动一个actor,内部其实是创建了一个fiber并包装成actor给用户,每个actor一被创建就加入调度器的等待队列。<<方法用于向actor传递消息,传递消息后,该actor也将加入等待队列,等待被调度。

    我们的简化版actor库已经写完了,可以尝试写几个例子,最简单的hello world:
include FiberActor

Actor.spawn { puts 
"hello world!"}
     输出:
hello world!

    没有问题,那么试试传递消息:
actor=Actor.spawn{
   Actor.receive{ 
|msg|  puts "receive #{msg}"}
}
actor 
<< :test_message
    输出:
receive test_message
    
    也成了,那么试试两个actor互相传递消息,乒乓一下下:
pong=Actor.spawn do
      Actor.receive do 
|ping|
        
#收到ping,返回pong
        ping << :pong
      end
    end
ping
=Actor.spawn do
      
#ping一下,将ping作为消息传递
      pong << Actor.current
      Actor.receive do 
|msg|
        
#接收到pong
        puts "ping #{msg}"
      end
    end
#resume调度器
Actor.scheduler.reschedule

     输出:
ping pong
    
     都没有问题,这个超级简单actor基本完成了。可以看到,利用coroutine来实现actor是完全可行的,事实上我这里描述的实现基本上是revactor这个库的实现原理。revactor是一个ruby的actor库,它的实现就是基于Fiber,并且支持消息的模式匹配和thread之间的actor调度,有兴趣地可以去玩下。更进一步,其实采用轻量级协程来模拟actor风格早就不是新鲜主意,比如在cn-erlounge的第四次会议上就有两个topic是关于这个,一个是51.com利用基于ucontext的实现的类erlang进程模型,一个是许世伟的CERL。可以想见,他们的基本原理跟本文所描述不会有太大差别,那么面对的问题也是一样。

     采用coroutine实现actor的主要缺点如下:
1、因为是非抢占式,这就要求actor不能有阻塞操作,任何阻塞操作都需要异步化。IO可以使用异步IO,没有os原生支持的就需要利用线程池,基本上是一个重复造轮子的过程。
2、异常的隔离,某个actor的异常不能影响到调度器的运转,简单的try...catch是不够的。
3、多核的利用,调度器只能跑在一个线程上,无法充分利用多核优势。
4、效率因素,在actor数量剧增的情况下,简单的FIFO的调度策略效率是个瓶颈,尽管coroutine的切换已经非常高效。

    当然,上面提到的这些问题并非无法解决,例如可以使用多线程多个调度器,类似erlang smp那样来解决单个调度器的问题。但是如调度效率这样的问题是很难解决的。相反,erlang的actor实现就不是通过coroutine,而是自己实现一套类似os的调度程序。
    首先明确一点,Erlang的process的调度是抢占式的,而非couroutine的协作式的。其次,Erlang早期版本是只有一个调度器,运行在一个线程上,随着erts的发展,现在erlang的调度器已经支持smp,每个cpu关联一个调度器,并且可以明确指定哪个调度器绑定到哪个cpu上。第三,Erlang的调度也是采用优先队列+时间片轮询的方式,每个调度器关联一个ErtsRunQueueErtsRunQueue内部又分为三个ErtsRunPrioQueue队列,分别对应high,max和normal,low的优先级,其中normal和low共用一个队列;在Erlang中时间片是以reduction为单位,你可以将reduction理解成一次函数调用,每个被调度的process能执行的reduction次数是有限的。调度器每次都是从max队列开始寻找等待调度的process并执行,当前调度的队列如果为空或者执行的reductions超过限制,那么就降低优先级,调度下一个队列。

   从上面的描述可以看出,Erlang优秀的地方不仅在于actor风格的轻量级process,另一个强悍的地方就是它的类os的调度器,再加上OTP库的完美支持,这不是一般方案能山寨的。
    
    
  

评论

# re: Ruby Fiber指南(五): 实现Actor,兼谈Erlang的process调度  回复  更多评论   

2013-11-04 02:39 by 我傻逼我自豪
协程=>流程控制
actor=>对象抽象

只有注册用户登录后才能发表评论。


网站导航: