移植jQuery deferred到java,基于java的promise编程模型
很多语言都支持promise编程模型,像是scala中promise类和jquery(javascript)中的deferred对象等,在java中好像缺少相关实现。笔者不得以,只能自己动手弄了一个。最后选择将jquery中的deferred对象移植到java中来的方案。目前已经应用在企业级项目的高性能服务器和android客户端等项目中。
Promise编程模型的概念这里也不再赘述,大家自己上网查找即可。这种编程模型主要解决的问题就是“同步调用变异步的问题”,通常解决异步调用的方式是使用“回调”。但普通回调的使用在代码书写,返回值传递和“异步方法编排上”非常的不方便。所以才会有Promise模型的诞生。
这次会介绍java版的deferred对象的使用方法,以及用jquery版之间的变化和改进。目前开放的版本是基于线程池的版本,正在开发基于akka的版本。在jquery的实现中,因为javascript是单线程的,所以不用考虑线程同步的问题。在java线程池的版的deferred里,基于多线程环境做了很多测试,保证了线程安全及可靠性。
一. 基本调用形式
final Deferred def = new Deferred (App. executor);
执行某个异步调用,比如某个基于网络的异步服务
callService(new Response(){
public void onMessage(Object message){
def.resolve(message);
}
Public void onFail(Exception e){
def.reject(e);
}
});
你可以在构造Deferred 对象后的任意时候,使用def的then方法。比如
def.then(new Reply(){
public Object done(Object d) {
System.out.println("response:"+d);
return d;
}
public void fail(Object f) {
System.out.println("error:"+f);
}
});
一个经常遇到的场景是callService后将def作为参数传递到其他方法,在其他方法内部再决定def要绑定什么样的后续动作,也就是绑定什么样的then。
注意then方法的定义public Object done(Object d),在实际使用中done通常是以“处理链”的方式来使用的,即你会看到def.then().then().then()…这样的方式,每一个then的done方法接收的参数都是其上一个then的done方法的返回值。通常作为参数传递给某个方法的Deferred上面已经绑定了一些默认的then对象,来处理一些必要的步骤。比如对接收报文的初步解码。
注意同在Reply接口中fail方法是没有返回值的,一旦异步处理链上的某个Deferred被reject,其本身及后面所有的Deferred绑定的then都会被触发fail方法。这保证了整个业务编排上或是你精心设计的算法编排上任意一个环节,无论如何都会得到响应,这也是Promise模型关于异常的最重要的处理方式。
Promise编程模型本身是强健的,但异步服务却不是总能得到响应。在实际应用中,每一个作为计算或业务环节的Deferred都应该被定时轮询,以保证在异步服务彻底得不到响应的时候(比如你执行了一个数据库查询,但过了很长很长时间仍没有得到回应),可以给Deferred对象reject一个超时错误。
响应处理对象then中方法done和fail都是不允许抛出任何异常的,特别是done方法,如果你的算法依赖异常,请在done中加上try…catch,并将异常传换成下一个then可以理解的信息,以便这个Deferred处理链中可以正常执行下去。
二. pipe到另外一个异步处理流程上去
假如你有如下的业务场景,你需要顺序调用三个异步的webservice服务来得到最终的返回结果,其中没个webservice的入参都和上一个的异步返回结果相关。(注意,异步的webservice是调用之后,服务端立刻返回,服务端处理完成后再主动访问刚才的请求方返回结果的方式)如果将这种webservice调用封装成同步方法无疑在编程上是非常方便的,可以使用我们平常写程序时顺序的书写方式,比如
reval1 = callwebservice1(param0)
reval2 = callwebservice2(reval1)
reval3 = callwebservice3(reval2)
方便的同时却牺牲了性能。调用线程要在callwebservice方法内阻塞,以等待异步返回。这样的编程方法无法满足高性能及高并发的需要。那么有没有既能类似于平常写程序时顺序的书写方式又能满足异步无阻塞的需要呢,这就是Promise编程模型本身要解决的最大问题。
通常解决这种问题的方式是使用pipe,pipe这个方法名称的由来应该是来自于linux shell的管道符,即“|”
使用Deferred对象的解决方案类似于如下:
Deferred.resolvedDeferred(App.executor,param0).pipe(new AsyncRequest2(){
public void apply(Object param0,final Deferred newDefered) throws Exception{
asyncCallwebservice1(param0).onResponse(new Response(){
public void onMessage(String message){
newDefered.resolve(message);
}
});
}
}).pipe(new AsyncRequest2(){
public void apply(Object reval1,final Deferred newDefered) throws Exception{
asyncCallwebservice2(reval1).onResponse(new Response(){
public void onMessage(String message){
newDefered.resolve(message);
}
});
}
}).pipe(new AsyncRequest2(){
public void apply(Object reval2,final Deferred newDefered) throws Exception{
asyncCallwebservice3(reval3).onResponse(new Response(){
public void onMessage(String message){
newDefered.resolve(message);
}
});
}
}).then(new new Reply(){
public Object done(Object d) {
//在这里消费最终结果
return d;
}
public void fail(Object f) {
}
});
使用Deferred对象提供的方案好处就是,所有的调用都是异步的,上面这一连串代码立刻就会返回。所有的业务编排会按照书写顺序在线程池中的线程里被调用,你也不必担心返回值结果和参数传递过程中的线程安全问题,框架在关键位置都做了同步,也做了相当多的测试用于验证。
可以看出,对于异步方法调用而言,比较难以解决的问题是异步算法的编排问题。Deferred对象为异步算法提供了很好的解决方案。
相较于AsyncRequest2类还有一个AsyncRequest1类,接口如下:
public interface AsyncRequest1<R> {
public Deferred apply(R result) throws Exception;
}
这个类要求在在apply方法中要自己创建Deferred对象。
三. 一些小改进
相较于传统promise编程模型,在java多线程环境下做了一些小升级。这里主要介绍synchronize方法
Synchronize方法签名如下:
Deferred synchronize(ExecutorService executor,Deferred... deferreds)
实际上,synchronize方法将众多的Deferred对象的完成状态同归集到一个唯一的Deferred对象上去,即如果所有的Deferred对象参数都resolved了,作为最终结果的Deferred也resolve,如果众多的Deferred对象参数有一个reject了,最终的那个Deferred也会立即reject(其他参数的状态都舍弃)。
这个方法一般用于多个并行流程最终状态的“归并”中。
除了synchronize,框架还提供一些传统promise编程模型没有的改进,比如pipe4fail和source等。
四.在android项目中的应用
(略)
https://github.com/jonenine/javaDeferred
https://github.com/jonenine/HST
虽然大数据的发展已经将近10个年头了,hadoop技术仍然没有过时,特别是一些低成本,入门级的小项目,使用hadoop还是蛮不错的。而且,也不是每一个公司都有能力招聘和培养自己的spark人才。
我本人对于hadoop mapreduce是有一些意见的。hadoop mapreduce技术对于开发人员的友好度不高,程序难写,调试困难,对于复杂的业务逻辑远没有spark得心应手。
2016年的春节前接到一个任务,要在一个没有spark的平台实现电力系统的一些统计分析算法,可选的技术只有hadoop mapreduce。受了这个刺激之后产生了一些奇思妙想,然后做了一些试验,并最终形成HST---hadoop simplize toolkit,还真是无心载柳柳成荫啊。
HST基本优点如下:
屏蔽了hadoop数据类型,取消了driver,将mapper和reducer转化为transformer和joiner,业务逻辑更接近sql。相当程度的减少了代码量,极大的降低了大数据编程的门槛,让基层程序员通过简单的学习即可掌握大数据的开发。
克服了hadoop mapreduce数据源单一的情况,比如在一个job内,input可以同时读文件和来自不同集群的hbase。
远程日志系统,让mapper和reducer的日志集中到driver的控制台,极大减轻了并行多进程程序的调试难度。
克服了hadoop mapreduce编写业务逻辑时,不容易区分数据来自哪个数据源的困难。接近了spark(或者sql)的水平。
天生的多线程执行,即在mapper和reducer端都默认使用多线程来执行业务逻辑。
对于多次迭代的任务,相连的两个任务可以建立关联,下一个任务直接引用上一个任务的结果,使多次迭代任务的代码结构变得清晰优美。
以下会逐条说明
基本概念的小变化:
Source类代替了hadoop Input体系(format,split和reader)
Transformer代替了mapper
Joiner代替了Reducer
去掉了饱受诟病的Driver,改为内置的实现,现在完全不用操心了。
1. 基本上,屏蔽了hadoop的数据类型,使用纯java类型
在原生的hadoop mapreduce开发中,使用org.apache.hadoop.io包下的各种hadoop数据类型,比如hadoop的Text类型,算法的编写中一些转换非常不方便。而在HST中一律使用java基本类型,完全屏蔽了hadoop类型体系。
比如在hbase作为source(Input)的时候,再也不用直接使用ImmutableBytesWritable和Result了,HST为你做了自动的转换。
现在的mapper(改名叫Transformer了)风格是这样的
public static class TransformerForHBase0 extends HBaseTransformer<Long>
…
现在map方法叫flatmap,看到没,已经帮你自动转成了string和map
public void flatMap(String key, Map<String, String> row,
Collector<Long> collector)
可阅读xs.hadoop.iterated.IteratedUtil类中关于类型自动转换的部分
2. 克服了hadoop mapreduce数据源单一的情况。比如在一个job内,数据源同时读文件和hbase,这在原生的hadoop mapreduce是不可能做到的
以前访问hbase,需要使用org.apache.hadoop.hbase.client.Scan和TableMapReduceUtil,现在完全改为与spark相似的方式。
现在的风格是这样的:
Configuration conf0 = HBaseConfiguration.create();
conf0.set("hbase.zookeeper.property.clientPort", "2181");
conf0.set("hbase.zookeeper.quorum", "172.16.144.132,172.16.144.134,172.16.144.136");
conf0.set(TableInputFormat.INPUT_TABLE,"APPLICATION_JOBS");
conf0.set(TableInputFormat.SCAN_COLUMN_FAMILY,"cf");
conf0.set(TableInputFormat.SCAN_CACHEBLOCKS,"false");
conf0.set(TableInputFormat.SCAN_BATCHSIZE,"20000");
...其他hbase的Configuration,可以来自不同集群。
IteratedJob<Long> iJob = scheduler.createJob("testJob")
.from(Source.hBase(conf0), TransformerForHBase0.class)
.from(Source.hBase(conf1), TransformerForHBase1.class)
.from(Source.textFile("file:///home/cdh/0.txt"),Transformer0.class)
.join(JoinerHBase.class)
Hadoop中的input,现在完全由source类来代替。通过内置的机制转化为inputformat,inputsplit和reader。在HST的框架下,其实可以很容易的写出诸如Source.dbms(),Source.kafka()以及Source.redis()方法。想想吧,在一个hadoop job中,你终于可以将任意数据源,例如来自不同集群的HBASE和来自数据库的source进行join了,这是多么happy的事情啊!
3. 远程日志系统。让mapper和reducer的日志集中在driver进行显示,极大减轻了了并行多进程程序的调试难度
各位都体验过,job fail后到控制台页面,甚至ssh到计算节点去查看日志的痛苦了吧。对,hadoop原生的开发,调试很痛苦的呢!
现在好了,有远程日志系统,可以在调试时将mapper和reducer的日志集中在driver上,错误和各种counter也会自动发送到driver上,并实时显示在你的控制台上。如果在eclipse中调试程序,就可以实现点击console中的错误,直接跳到错误代码行的功能喽!
Ps:有人可能会问,如何在集群外使用eclipse调试一个job,却可以以集群方式运行呢?这里不再赘述了,网上有很多答案的哦
4. 克服了hadoop mapreduce在join上,区分数据来自哪个数据源的困难,接近spark(或者sql)的水平
在上面给出示例中,大家都看到了,现在的mapper可以绑定input喽!,也就是每个input都有自己独立的mapper。正因为此,现在的input和mapper改名叫Source和Transformer。
那么,大家又要问了,在mapper中,我已经可以轻松根据不同的数据输入写出不同的mapper了,那reducer中怎么办,spark和sql都是很容易实现的哦?比如看人家sql
Select a.id,b.name from A a,B b where a.id = b.id
多么轻松愉悦啊!
在原生hadoop mapreduce中,在reducer中找出哪个数据对应来自哪个input可是一个令人抓狂的问题呢!
现在这个问题已经被轻松解决喽!看下面这个joiner,对应原生的reducer
public static class Joiner0 extends Joiner<Long, String, String>
…
Reduce方法改名叫join方法,是不是更贴近sql的概念呢?
public void join(Long key,RowHandler handler,Collector collector) throws Exception{
List<Object> row = handler.getSingleFieldRows(0);//对应索引为0的source
List<Object> row2 = handler.getSingleFieldRows(1);//对应第二个定义的source
注意上面两句,可以按照数据源定义的索引来取出来自不同数据源join后的数据了,以后有时间可能会改成按照别名来取出,大家看源码的时候,会发现别名这个部分的接口都写好了,要不你来帮助实现了吧。
5. 天生的多线程执行,即在mapper和reducer端都默认使用多线程来执行业务逻辑。
看看源码吧,HST框架是并发调用flatMap和join方法的,同时又不能改变系统调用reduce方法的顺序(否则hadoop的辛苦排序可就白瞎了),这可不是一件容易的事呢!
看到这里,有的同学说了。你这个HST好是好,但你搞的自动转换类型这个机制可能会把性能拉下来的。这个吗,不得不承认,可能是会有一点影响。但在生产环境做的比对可以证明,影响太小了,基本忽略不计。
笔者在生产环境做了做了多次试验,mapper改成多线程后性能并未有提高,特别是对一些业务简单的job,增加Transformer中的并发级别效率可能还会下降。
很多同学喜欢在mapper中做所谓“mapper端的join”。这种方式,相信在HST中通过提高mapper的并发级别后会有更好的表现。
Reducer中的性能相对原生提升的空间还是蛮大的。大部分的mapreduce项目,都是mapper简单而reducer复杂,HST采用并发执行join的方式对提升reducer性能是超好的。
6. 对于多次迭代的任务,相连的两个任务可以建立关联,在流程上的下一个job直接引用上一个job的结果,使多次迭代任务的代码结构变得清晰优美
虽然在最后才提到这一点,但这却是我一开始想要写HST原因。多次迭代的任务太麻烦了,上一个任务要写在hdfs做存储,下一个任务再取出使用,麻烦不麻烦。如果都由程序自动完成,岂不美哉!
在上一个任务里format一下
IteratedJob<Long> iJob = scheduler.createJob("testJob")
...//各种source定义
.format("f1","f2")
在第二个任务中,直接引用
IteratedJob<Long> stage2Job = scheduler.createJob("stage2Job")
.fromPrevious(iJob, Transformer2_0.class);
//Transformer2_0.class
public static class Transformer2_0 extends PreviousResultTransformer<Long>
...
public void flatMap(Long inputKey, String[] inputValues,Collector<Long> collector) {
String f1 = getFiledValue(inputValues, "f1");
String f2 = getFiledValue(inputValues, "f2");
看到没,就是这么简单。
在最开始的计划中,我还设计了使用redis队列来缓冲前面job的结果,供后面的job作为输入。这样本来必须严格串行的job可以在一定程度上并发。另外还设计了子任务的并发调度,这都留给以后去实现吧。
7. 便捷的自定义参数传递。
有时候,在业务中需要作一些“开关变量”,在运行时动态传入不同的值以实现不同的业务逻辑。这个问题HST框架其实也为你考虑到了。
Driver中的自定义参数,source中的自定义参数都会以内置的方式传到transformer或joiner中去,方便程序员书写业务。
查看transformer或joiner的源码就会发现:
getSourceParam(name)和getDriverParam(pIndex)方法,在计算节点轻松的得到在driver和source中设置的各层次级别的自定义参数,爽吧!
8. 其他工具
HST提供的方便还不止以上这些,比如在工具类中还提供了两行数据(map类型)直接join的方法。这些都留给你自己去发现并实践吧!
https://github.com/jonenine/HST
利用动态类加载技术调式ECLIPSE插件
eclipse平台提供runtime方式调试插件和RCP项目,但随着插件项目越写约复杂,启动时间也越来越长,特别是集成了诸如 Hibernate和Spring之类的容器级框架的时候。仅仅为了调试代码中一些琐碎的片段而频繁的重启项目实在是一件异常烦人的工作。
即使重启了项目也许还没完。为了使项目处于某个特定的状态下以方便测试,每次都要重新操作一遍前面业务流程,这同样是十分令人厌倦的。
eclispe使用了OGSI作为微内核,引入了一些动态特性。但是OSGI的动态特性是在保持平台运行的情况下动态更新Bundle,也就是说需要重启插件才能完成动态加载的过程。有没有一些更加细粒度的动态载入方案呢?
在Tomcat下开发过web项目的人都知道,使用调试模式来部署项目即”热部署”可以实现动态载入class文件,让程序员得以动态调试项目。今天向大家提供的这个jar包使得这种效果可以在eclipse runtime上实现。
这是我在自己的插件平台项目——SCOOP框架中使用的几个包。
它可以非常好的解决动态类载入的问题,包括内部类的动态载入都可以很好解决。
其他的几个包还进行了以下尝试
1. 使用元数据标注的办法解决SWT UI 线程的种种问题
2. 还提供了eclipse流程框架的简单实现,以规范插件开发。特别是提出了一个面向业务而不是面向技术的工作流概念,使得编码粒度变大,并得以提高效率。另外这个简单的流程框架还将前面的两种机制很好的结合起来,并且可以和eclipse平台的一些复杂机制解耦,为复杂流程的开发测试提供了方便。为将来实现自定义脚本语言(比如某个类似于BPEL的工作流语言)开发eclise插件项目甚至使用图形化的开发奠定了一定的基础。
我给出了一个完整的示例——JAXB插件。很多框架同jaxb一样提供了code generation工具,可以在这个例子的基础上经过简单修改为这些框架提供插件,比如CXF插件、AXIS2插件等等。
动态类载入编码原则
使用动态类载入机制来进行调试在编码上有一定限制。
首先是要进行动态载入的实例不要在非动态域进行引用。只有这样,当一个流程结束时此实例才会在jvm中得以释放。当然,非要在其他地方进行引用从而长久的在运行时保持这个实例也是有解决方案的(可以使用代理类技术来实现,具体解决办法不在本文之内)。
其次是进行接口同实现类的分离或父类同子类的分离,以隔离不同的class load scope。接口由父类载入器载入,不同的实现(比如修改后的实现)由不同的子类载入器载入。使得最终同一个类型由同一个类载入器载入,这样才能符合jvm的类载入规范。在父类载入域的父类型的方法的参数类型及返回值类型也不能在动态域中。
最后,那些注册在扩展点上的类如ActionDelegate和WorkbenchPart等是不能够动态载入的,他们必须由eclipse提供的类载入器载入(平台会自动载入并管理其生命周期)。如果需要让这些类也动态载入,就需要在平台提供的动态注册机制基础上使用代理或像EJB2.0一样使用侵入式编译来实现代理机制,这个话题同样不在本文之内。
下面就举例说明使用方法
因为时间有限我无法详尽的完成此文,请感兴趣的朋友自己阅读示例源码。
1. 几种动态类载入的办法
(1) 使用手动编码进行类载入
因为会使二次开发人员产生迷惑,故未举例
(2) 在调试时使用spring文件配置动态类载入域
Spring配置文件同样也是动态的,而且会使调试开发工作变得更加清晰,推荐使用
(3) 使用Flow框架来进行动态类载入
2. 使用元数据标注解决UI线程访问的问题
使用三个元数据及动态代理类解决SWT及Eclipse的线程问题。
3. 同eclipse内部机制解耦以方便开发和测试
开发插件项目很多时候需要实现Eclipse内部的一些回调接口来实现功能,这对程序员的水平是一种考验,也使得插件开发工作更加复杂化。比如在实现progressMonitor的时候,就需要实现它的回调接口,将业务逻辑放置在其指定的回调接口——runable接口来实现,这是非常不方便的。我们需要是一种可以提供功能的工具类,像调用一个普通javaBean一样来调用它,而不是将我们的业务代码变形撕碎去融入到Eclipse的种种机制中去。
这样做的另外一个坏处就是很难进行单元测试,比如脱离eclipse平台,使用一些mock类来进行简单有效的单元测试。
我在这方面也进行了一些尝试——”反客为主”,将必须遵守eclipse的回调要求变为必须遵守业务开发简单快捷的要求。同样是在progressMonitor上面,使用工厂类来创建delegate,然后可以在回调机制的外部向调用javaBean一样来使用平台给我们提供的这个功能。
这种尝试是有一定难度的,要使用到不同的设计模式,处理各种线程问题。更重要的一点是eclipse平台本身有这样的潜力,它也在等待着我们这样做。
4. 使用Flow规范插件项目开发
我的scoop项目最终搁浅,到最后我发现实现它已经超出了我当时的能力。我原本是想开发一个统一的插件开发及部署平台。这样很多中小软件企业,特别是像我原来工作的那家公司就可以拥有自己的eclipse插件集,以适应自己特殊的要求。我还想提供一套面向插件开发业务的接口,而不再面向技术也屏蔽各种技术细节,使得可以非常方便的扩展、修改甚至移植插件。我只是心里有想念就去实现而已,当最终认识到它有多么困难的时候不得不放弃了。最后虽然失败了,但并不觉得气馁。因为知道了要努力的方向,同时也积累了丰富的经验。最后就将这个jar包命名为SCOOP已示纪念。
演示文档及框架JAR包如下
/Files/jonenine/Eclipse_Dynamic_Classload.rar