在用户修改了领域对象的值后,我们有时需要记录下用户的改动。比如对一些关键业务对象的改动有时往往需要发邮件通知客户。有时用户可能想查阅所有历史的改动,甚至有可能会改回原先的值。
领域逻辑关系往往比较复杂,这时我们会使用到ORM Framework。本文以toplink为例,讲述如何利用toplink编写一个完成此功能的简易Framework,我们暂且把它称为ActionMemed。
我们先来看一下大体的流程:
l 我们获得用户修改信息通常有两种方式,一种是被动的监听,另一种主动的通知。被动的监听就是framework订阅所关心领域对象的修改,主动通知是application主动的将修改之前和之后的对象通知framework。
l Framework接着从整个对象的树结构中找出用户所关心的某个特定的字段或者字段的组合,生成actionRecord。ActionRecord是描述用户对领域对象修改的数据结构,会包括用户修改的原因,修改者,修改的时间,修改的字段或者组合,修改前后的值等等信息。
l 在ActionRecord生成好之后,会将它记录到DB,发邮件通知用户或者通过JMS通知其他Application。
有了基本的概念后,看一下整体的结构:
Registry: 在TopLink上注册ActionListener。一旦在TopLink上检测到业务对象的改动就会调用ActionService,生成ActionRecord并调用相关的ActionRecorder。
ActionListener:TopLink的SessionListener,每次会话都会调用。我们在这里实现了preCommit方法,在UnitOfWork提交之前,捕捉用户的所有修改,并从中选取出用户所关心的对象的变动。
ActionService:当ActionListener从TopLink中获得到改动的对象,就会调用ActionService生成ActionRecord,并通知相关的Recorder,可能是Log到DB。如果用户是通过主动的方式传入新老两个对象就不需要Listener,直接调用ActionService,将新老对象或者新对象和ValueDistiller作为参数传入,
ValueDistiler:根据当前的新对象,萃取出老对象。TopLink就可以根据当前UnitOfWork中的新对象获取原始对象。方法是:
public Object getOriginalVersionOfObject(Object workingClone)
|
Expression:ActionMemed相关的配置数据,由ExpressionParser解析出来后就会cache在内存中。这个配置可以是文件,或者DB配置。只要能描述清楚就行。文件配置我们直接利用spring bean。
ActionConstructor:Listener从TopLink ChangeSet中拿到的只是有改动的对象。而我们关心的只是对象上某个Field或者它引用的某个对象的Field,比如说Employee有PhoneNumber List,PhoneNumber有个属性是areaCode,可能我们只关心areaCode值的更改,就只需要记录areaCode的更改,并且通知客户。所以我们需要根据用户配置对新老对象进行对比,比较是否有关注的属性被用户更改了。并且构建ActionRecord。比较的方法我们可以用JXpath, Xpath的表达能力很强,而且还可以自定义函数,在自定义扩展函数里用户可以对字段进行组合处理,从而生成它们自己想要记录的值。
ActionRecorder:当Action构建完成后,ActionRecorder就要将它通知客户,用JMS发给其他项目或者记录到DB。用户可以配置多个ActionRecorder。
MemedEventListener,让用户在ActionRecorder调用之前和之后做一些额外的处理。比如说用户可能在之前对Action的数据结构加入一些定制信息。
上面介绍了ActionMemed的流程和相关模块的功能。其实在使用中,特别是一次修改很多业务对象的时候,处理Action时间会有点长,况且Action的处理也并不需要实时。所以Action还需要提供异步处理的功能。
将异步调用的模块图和先前的结构图进行比较会发现有两处不同:
ServiceTask: 实现Java Runnable接口,基本实现类似于先前图中的ActionService。
ObjectCloner: 如果我们使用TopLink,在异步的情况下,用户当前的UnitOfWork(事务)会先提交,提交之后,从TopLink中萃取的旧对象会被Merge成新对象,这时我们只能提前在UnitOfWork提交之前自己根据Expression的结构深Copy一份出来。
ActionAsyncService: 为异步设计的ActionService,利用ValueDistiller从UnitOfwork获得当前对象的原始clone,构建ServiceTask,将ServiceTask提交到ThreadPool,当task被执行时,就会调用ActionService,这时的ActionService重用了同步流程中的ActonService。
几个注意点:
整个Framework的原理还是相当简单的,稍微值得注意的可能是下面几个方面。
Listener如何获取被改动的对象
TopLink会把所有改动过的对象都会被放在UnitOfWorkChangeSet中,因为在UnitOfWork提交的时候它需要将UnitOfWorkChangeSet中记下的改动提交到数据库。然后merge到session cache。所以所有改动从UnitOfWork中都是可以拿到的。
public class ActionLogPassiveAsyncListener extends AbstractActionLogAsyncListener {
private ValueDistiller distiller;
public void preCommitUnitOfWork(SessionEvent sessionevent) {
log.debug("preCommitUnitOfWork begin.");
UnitOfWork unitOfWork = (UnitOfWork) sessionevent.getSession();
if (null == unitOfWork.getUnitOfWorkChangeSet()) {
unitOfWork.setUnitOfWorkChangeSet((oracle.toplink.internal.sessions.UnitOfWorkChangeSet) unitOfWork
.getCurrentChanges());
}
UnitOfWorkChangeSet ucs = unitOfWork.getUnitOfWorkChangeSet();
if (ucs != null && ucs.getAllChangeSets() != null) {
Set finishedObjects = new HashSet();
Map<Class<?>, List<ChangedPair>> changedPairs = new HashMap<Class<?>, List<ChangedPair>>();
for (Enumeration objectChangeSetEnum = ucs.getAllChangeSets().keys(); objectChangeSetEnum
.hasMoreElements();) {
ObjectChangeSet objectChangeSet = (ObjectChangeSet) objectChangeSetEnum.nextElement();
if (objectChangeSet == null) {
continue;
}
Object clone = objectChangeSet.getUnitOfWorkClone();
if (!finishedObjects.contains(clone)) {
for (Class focusClass : this.focusClasses) {
if ((includeSubclass ? focusClass.isAssignableFrom(clone.getClass())
: clone.getClass() == focusClass)
&& (filter == null || (filter != null && !filter.isFiltered(clone, unitOfWork)))) {
finishedObjects.add(clone);
if (objectChangeSet.hasChanges()) {
List<ChangedPair> changedPairList = changedPairs.get(focusClass);
if (null == changedPairList) {
changedPairList = new ArrayList<ChangedPair>();
changedPairs.put(focusClass, changedPairList);
}
Object originalObject = this.distiller.getOriginObject(clone);
changedPairList.add(new ChangedPair(originalObject, clone));
}
}
}
}
}
if (!changedPairs.isEmpty()) {
try {
ChangedPairMap changedPairMap = this.assembleChangedPairMap(unitOfWork, changedPairs);
this.changedPairCache.set(changedPairMap);
} catch (ActionLogException e) {
if (shouldBreakIfException) {
throw new ActionLogRuntimeException(e);
}
}
}
log.debug("preCommitUnitOfWork end.");
}
}
}
|
异步状态下,ActionRecord要在Application事务提交之后生成
同步状态下,ActionRecord的生成可以Join Application的transaction,这样他们会一起成功或者失败。但是异步情况下,就会是不同的事务,两个事务之间的关系可能是有先后顺序或者互不相干。互不相干是不可能的,从业务意义上讲只有Application的改动确实生效之后ActionRecord才能生成,但是将ActionRecord放在Application事务提交成功之后生成或者提交,也会面临一个问题,就是application成功提交了,但ActionRecord的生成可能会失败。但要知道ActionRecord失败的几率远比Application提交失败的几率要小得多,application常常会因为乐观锁的问题而提交失败,但ActionRecord只可能因为DB Shutdown而丢失数据。失败后会做详细的备份,以便做恢复。那如何感知application事务是提交成功还是失败了呢?TopLink的SessionEventListener有四个有用的回调方法:PreCommit,PostCommit,PostRollback,PostRelease,用户事务提交的时候在提交之前会调用PreCommit方法,这时我们还可以从UnitOfWork中获取新老对象,我们会把老对象深clone一份出来,将他们存放在ThreadLocal中,而在PostCommit回调的实现中,我们会从ThreadLocal中取出新老对象完成ActionRecord的生成,而PostRollback就可以什么都不干了。但不管是提交成功还是提交失败Rlease方法都会被调用,UnitOfWork需要release,这里我们就会去清空ThreadLocal,以便内存即时的垃圾回收。这样说来即使是主动调用ActionAsyncService也会注册一个Listener,不同的是这个Listener不需要从UnitOfWork检测变化。
public class AbstractActionAsyncListener extends AbstractActionListener {
protected ThreadLocal<ChangedPairMap> changedPairCache = new ThreadLocal<ChangedPairMap>();
public void postCommitUnitOfWork(SessionEvent arg0) {
ChangedPairMap changedPairMap = this.changedPairCache.get();
if (changedPairMap != null) {
if (!changedPairMap.isEmpty()) {
//异步生成ActionRecord
}
}
}
public void postReleaseUnitOfWork(SessionEvent arg0) {
if (this.changedPairCache.get() != null) {
this.clearResource();
}
}
private void clearResource() {
this.changedPairCache.set(null);
}
}
|
public class ActionActiveAsyncListener extends AbstractActionAsyncListener {
private Map<Class<?>, List<ChangedPair>> changedPairs;
public void preCommitUnitOfWork(SessionEvent sessionEvent) {
UnitOfWork unitOfWork = (UnitOfWork) sessionEvent.getSession();
try {
ChangedPairMap changedPairMap = this.assembleChangedPairMap(unitOfWork, this.changedPairs);
this.changedPairCache.set(changedPairMap);
} catch (ActionLogException e) {
log.error("Assemble ChangePairMap fails! ChangedPairs: " + changedPairs.toString(), e);
if (shouldBreakIfException) {
throw new ActionRuntimeException(e);
}
}
}
}
|
ObjectCloner:
如果对象树的结构很庞大,深copy的性能代价不得不考虑。BeanUtils进行深copy的性能很差。5000个对象花了我20s。首先要说的是其实不需要所有对象引用都需要深copy,只有那些用户对关注的对象属性才需要深copy,clone的步骤大概如下:
l 对根对象进行浅copy
l 对用户关心的对象属性迭代的进行深copy
l 如果关心的对象属性是Collection,浅copy Collection中的每个对象并深copy对象中用户关注的对象属性
l 其实那些domain class,早在做ORM的时就确定下来了,所以所有domain对象反射metadata都可以事先确定,存在内存中,这样会大大提高性能,其实toplink也会把这些反射结构解析出来后缓存在内存中,直接利用toplink的clone逻辑就可以了。1000个对象深clone一把大约是120ms。