Ruby Fiber指南(一)基础
Ruby Fiber指南(二)参数传递
Ruby Fiber指南(三)过滤器
Ruby Fiber指南(四)迭代器
Ruby Actor指南(五)实现Actor
写这个指南的时候,计划是第五章写一个Fiber的应用例子,但是一时没有想到比较好的例子,模仿《Programming in Lua》中的多任务下载的例子也不合适,因为Ruby中的异步HttpClient跟lua还是很不一样的,体现不了Fiber的优点。因此,这第五节一直拖着没写。
恰巧最近在小组中做了一次Erlang的分享,有人问到Erlang调度器的实现问题,这块我没注意过,那时候就根据我对coroutine实现actor的想法做了下解释,后来思考了下那个解释是错误的,Erlang的调度器是抢占式的,而通过coroutine实现的actor调度却是非抢占的,两者还是截然不同的。我在《
Actor、Coroutine和Continuation的概念澄清》中提到coroutine可以实现actor风格,actor跟coroutine并没有必然的联系,这篇文章的目的就在于证明这一点,使用Ruby Fiber实现一个简单的actor风格的库,整个代码不到100行。后面还会谈到这个实现的缺点,以及我对Erlang调度器实现的理解。
首先是monkey patch,给Thread和Fiber类加上两个方法,分别用于获取当前线程的调度器和Fiber对应的actor:
class Thread
#得到当前线程的调度器
def __scheduler__
@internal_scheduler||=FiberActor::Scheduler.new
end
end
class Fiber
#得到当前Fiber的actor
def __actor__
@internal_actor
end
end
这里实现的actor仍然是Thread内的,一个Thread只跑一个调度器,每个actor关联一个Fiber。
让我们来想想调度器该怎么实现,调度器顾名思义就是协调actor的运行,每次挑选适当的actor并执行,可以想象调度器内部应该维护一个等待调度的actor队列,Scheduler每次从队列里取出一个actor并执行,执行完之后取下一个actor执行,不断循环持续这个过程;在没有actor可以调度的时候,调度器应该让出执行权。因此调度器本身也是一个Fiber,它内部有个queue,用于维护等待调度的actor:
module FiberActor
class Scheduler
def initialize
@queue=[]
@running=false
end
def run
return if @running
@running=true
while true
#取出队列中的actor并执行
while actor=@queue.shift
begin
actor.fiber.resume
rescue => ex
puts "actor resume error,#{ex}"
end
end
#没有任务,让出执行权
Fiber.yield
end
end
def reschedule
if @running
#已经启动,只是被挂起,那么再次执行
@fiber.resume
else
#将当前actor加入队列
self << Actor.current
end
end
def running?
@running
end
def <<(actor)
#将actor加入等待队列
@queue << actor unless @queue.last == actor
#启动调度器
unless @running
@queue << Actor.current
@fiber=Fiber.new { run }
@fiber.resume
end
end
end
end
run方法是核心的调度方法,注释说明了主要的工作流程。因为调度器可能让出执行权,因此提供了reschedule方法重新resume启动调度器。<<方法用于将等待被调度的actor加入等待队列,如果调度器没有启动,那么就启动调度Fiber。
有了调度器,Actor的实现也很简单,Actor跟Fiber是一对一的关系,Actor内部维护一个mailbox,用来存储接收到的消息。最重要的是receive原语的实现,我们这里很简单,不搞模式匹配,只是接收消息。receive的工作流程大概是这样,判断mailbox中有没有消息,有消息的话,取出消息并调用block处理,没有消息的话就yield让出执行权。
module FiberActor
class Actor
attr_accessor :fiber
#定义类方法
class << self
def scheduler
Thread.current.__scheduler__
end
def current
Fiber.current.__actor__
end
#启动一个actor
def spawn(*args,&block)
fiber=Fiber.new do
block.call(args)
end
actor=new(fiber)
fiber.instance_variable_set :@internal_actor,actor
scheduler << actor
actor
end
def receive(&block)
current.receive(&block)
end
end
def initialize(fiber)
@mailbox=[]
@fiber=fiber
end
#给actor发送消息
def << (msg)
@mailbox << msg
#加入调度队列
Actor.scheduler << self
end
def receive(&block)
#没有消息的时候,让出执行权
Fiber.yield while @mailbox.empty?
msg=@mailbox.shift
block.call(msg)
end
def alive?
@fiber.alive?
end
end
end
Actor.spawn用于启动一个actor,内部其实是创建了一个fiber并包装成actor给用户,每个actor一被创建就加入调度器的等待队列。<<方法用于向actor传递消息,传递消息后,该actor也将加入等待队列,等待被调度。
我们的简化版actor库已经写完了,可以尝试写几个例子,最简单的hello world:
include FiberActor
Actor.spawn { puts "hello world!"}
输出:
hello world!
没有问题,那么试试传递消息:
actor=Actor.spawn{
Actor.receive{ |msg| puts "receive #{msg}"}
}
actor << :test_message
输出:
receive test_message
也成了,那么试试两个actor互相传递消息,乒乓一下下:
pong=Actor.spawn do
Actor.receive do |ping|
#收到ping,返回pong
ping << :pong
end
end
ping=Actor.spawn do
#ping一下,将ping作为消息传递
pong << Actor.current
Actor.receive do |msg|
#接收到pong
puts "ping #{msg}"
end
end
#resume调度器
Actor.scheduler.reschedule
输出:
ping pong
都没有问题,这个超级简单actor基本完成了。可以看到,利用coroutine来实现actor是完全可行的,事实上我这里描述的实现基本上是
revactor这个库的实现原理。
revactor是一个ruby的actor库,它的实现就是基于Fiber,并且支持消息的模式匹配和thread之间的actor调度,有兴趣地可以去玩下。更进一步,其实采用轻量级协程来模拟actor风格早就不是新鲜主意,比如在
cn-erlounge的第四次会议上就有两个topic是关于这个,一个是51.com利用基于ucontext的实现的类erlang进程模型,一个是许世伟的CERL。可以想见,他们的基本原理跟本文所描述不会有太大差别,那么面对的问题也是一样。
采用coroutine实现actor的主要缺点如下:
1、因为是非抢占式,这就要求actor不能有阻塞操作,任何阻塞操作都需要异步化。IO可以使用异步IO,没有os原生支持的就需要利用线程池,基本上是一个重复造轮子的过程。
2、异常的隔离,某个actor的异常不能影响到调度器的运转,简单的try...catch是不够的。
3、多核的利用,调度器只能跑在一个线程上,无法充分利用多核优势。
4、效率因素,在actor数量剧增的情况下,简单的FIFO的调度策略效率是个瓶颈,尽管coroutine的切换已经非常高效。
当然,上面提到的这些问题并非无法解决,例如可以使用多线程多个调度器,类似erlang smp那样来解决单个调度器的问题。但是如调度效率这样的问题是很难解决的。相反,erlang的actor实现就不是通过coroutine,而是自己实现一套类似os的调度程序。
首先明确一点,Erlang的process的调度是抢占式的,而非couroutine的协作式的。其次,Erlang早期版本是只有一个调度器,运行在一个线程上,随着erts的发展,现在erlang的调度器已经支持smp,每个cpu关联一个调度器,并且可以明确指定哪个调度器绑定到哪个cpu上。第三,Erlang的调度也是采用优先队列+时间片轮询的方式,每个调度器关联一个
ErtsRunQueue,ErtsRunQueue内部又分为三个ErtsRunPrioQueue队列,分别对应high,max和normal,low的优先级,其中normal和low共用一个队列;在Erlang中时间片是以reduction为单位,你可以将reduction理解成一次函数调用,每个被调度的process能执行的reduction次数是有限的。调度器每次都是从max队列开始寻找等待调度的process并执行,当前调度的队列如果为空或者执行的reductions超过限制,那么就降低优先级,调度下一个队列。
从上面的描述可以看出,Erlang优秀的地方不仅在于actor风格的轻量级process,另一个强悍的地方就是它的类os的调度器,再加上OTP库的完美支持,这不是一般方案能山寨的。