Ruby Fiber指南(一)基础
Ruby Fiber指南(二)参数传递
Ruby Fiber指南(三)过滤器
Ruby Fiber指南(四)迭代器
Ruby Actor指南(五)实现Actor
在学习了Fiber的基础知识之后,可以尝试用Fiber去做一些比较有趣的事情。这一节将讲述如何使用Fiber来实现类似unix系统中的管道功能。在unix系统中,可以通过管道将多个命令组合起来做一些强大的功能,最常用的例如查找所有的java进程:
ps aux|grep java
通过组合ps和grep命令来实现,ps的输出作为grep的输入,如果有更多的命令就形成了一条过滤链。过滤器本质上还是生产者和消费者模型,前一个过滤器产生结果,后一个过滤器消费这个结果并产生新的结果给下一个过滤器消费。因此我们就从最简单的生产者消费者模型实现说起。
我们要展示的这个例子场景是这样:生产者从标准输入读入用户输入并发送给消费者,消费者打印这个输入,整个程序是由消费者驱动的,消费者唤醒生存者去读用户输入,生产者读到输入后让出执行权给消费者去打印,整个过程通过生产者和消费者的协作完成。
生产者发送是通过yield返回用户输入给消费者(还记的上一节吗?):
def send(x)
Fiber.yield(x)
end
而消费者的接收则是通过唤醒生产者去生产:
def receive(prod)
prod.resume
end
生产者是一个Fiber,它的任务就是等待用户输入并发送结果给消费者:
def producer()
Fiber.new do
while true
x=readline.chomp
send(x)
end
end
end
消费者负责驱动生产者,并且在接收到结果的时候打印,消费者是root fiber:
def consumer(producer)
while true
x=receive(producer)
break if x=='quit'
puts x
end
end
最终的调用如下:
consumer(producer())
完整的程序如下:
#生产者消费者
require 'fiber'
def send(x)
Fiber.yield(x)
end
def receive(prod)
prod.resume
end
def producer()
Fiber.new do
while true
x=readline.chomp
send(x)
end
end
end
def consumer(producer)
while true
x=receive(producer)
break if x=='quit'
puts x
end
end
if $0==__FILE__
consumer(producer())
end
读者可以尝试在ruby1.9下运行这个程序,每次程序都由消费者驱动生产者去等待用户输入,用户输入任何东西之后回车,生产者开始运行并将读到的结果发送给消费者并让出执行权(通过yield),消费者在接收到yield返回的结果后打印这个结果,因此整个交互过程是一个echo的例子。
最终的调用consumer(producer())已经有过滤器的影子在了,如果我们希望在producer和consumer之间插入其他过程对用户的输入做处理,也就是安插过滤器,那么新的过滤器也将作为fiber存在,新的fiber消费producer的输出,并输出新的结果给消费者,例如我们希望将用户的输入结果加上行号再打印,那么就插入一个称为filter的fiber:
def filter(prod)
return Fiber.new do
line=1
while true
value=receive(prod)
value=sprintf("%5d %s",line,value)
send(value)
line=line.succ
end
end
end
最终组合的调用如下:
consumer(filter(producer()))
类似unix系统那样,简单的加入新的fiber组合起来就可以为打印结果添加行号。
类似consumer(filter(producer()))的调用方式尽管已经很直观,但是我们还是希望能像unix系统那样调用,也就是通过竖线作为管道操作符:
producer | filter | consumer
这样的调用方式更将透明直观,清楚地表明整个过滤器链的运行过程。幸运的是在Ruby中支持对|方法符的重载,因此要实现这样的操作符并非难事,只要对Fiber做一层封装即可,下面给出的代码来自《Programming
ruby》的作者Dave Thomas的
blog:
class PipelineElement
attr_accessor :source
def initialize
@fiber_delegate = Fiber.new do
process
end
end
def |(other)
other.source = self
other
end
def resume
@fiber_delegate.resume
end
def process
while value = input
handle_value(value)
end
end
def handle_value(value)
output(value)
end
def input
@source.resume
end
def output(value)
Fiber.yield(value)
end
end
这段代码非常巧妙,将Fiber和Ruby的功能展示的淋漓尽致。大致解说下,PipelineElement作为任何一个过滤器的父类,其中封装了一个fiber,这个fiber默认执行process,在process方法中可以看到上面生产者和消费者例子的影子,input类似receive方法调用前置过滤器(source),output则将本过滤器处理的结果作为参数传递给yield并让出执行权,让这个过滤器的调用者(也就是后续过滤器)得到结果并继续处理。PipelineElement实现了“|”方法,用于组合过滤器,将下一个过滤器的前置过滤器设置为本过滤器,并返回下一个过滤器。整个过滤链的驱动者是最后一个过滤器。
有了这个封装,那么上面生产者消费者的例子可以改写为:
class Producer < PipelineElement
def process
while true
value=readline.chomp
handle_value(value)
end
end
end
class Filter < PipelineElement
def initialize
@line=1
super()
end
def handle_value(value)
value=sprintf("%5d %s",@line,value)
output(value)
@line=@line.succ
end
end
class Consumer < PipelineElement
def handle_value(value)
puts value
end
end
现在的调用方式可以跟unix的管道很像了:
producer=Producer.new
filter=Filter.new
consumer=Consumer.new
pipeline = producer | filter | consumer
pipeline.resume
如果你打印pipeline对象,你将看到一条清晰的过滤链,
#<Consumer:0x8f08bf4 @fiber_delegate=#<Fiber:0x8f08a88>, @source=#<Filter:0x8f08db4 @line=1, @fiber_delegate=#<Fiber:0x8f08d60>, @source=#<Producer:0x8f09054 @fiber_delegate=#<Fiber:0x8f09038>>>>