同步、异步、周期以及事务地执行工作 
David Currie 将继续他的由三部分组成的系列文章,介绍 Java 2 企业版(J2EE)连接器架构(JCA)最新版本中的增强和变化。在本文中,他将介绍新的 JCA 1.5 工作管理合约,该合约允许资源适配器利用应用服务器的某些功能来调度和处理工作。在 JCA 的另一个增强 —— 事务流入 —— 的支持下,企业信息系统可以在自己的事务中执行这项工作。

JCA 1.5 是 J2EE 连接器体系结构的最新版本,它包含许多重要的改进和一些重要增强。本文是介绍这些变化的由三部分组成的系列文章的第 2 部分,建立在介绍的生命周期管理特性的第 1 部分的基础上,本文介绍的是 JCA 新的工作管理合约。该合约允许资源适配器为延迟执行或定期执行的工作创建计时器,并允许计时器使用应用服务器的线程同步或异步执行处理。本文将描述事务流入支持如何在资源适配器导入到服务器的事务中进行这种处理,以及资源适配器随后如何控制事务的完成。

如果想在现有资源适配器中使用这项功能,或者正在考虑编写新的 JCA 1.5 资源适配器,那么本文是一份必不可少的读物。如果要编写使用资源适配器的应用程序,想了解更多幕后的情况,那么本文也会吸引您。

让工作完成
在本系列的第 1 部分中,我们介绍了 ResourceAdapter 接口,它提供了一种机制,能够在应用服务器内部为资源适配器提供生命周期。您可能还记得 start 方法被用来传递一个叫做 BootstrapContext 的对象。上次我们介绍了这个对象,但是 BootstrapContext 接口的三个方法才是工作管理和事务流入合约的关键,如清单 1 所示:

清单 1. 用来得到三个工具对象的 BootstrapContext 接口
public interface BootstrapContext {

    WorkManager getWorkManager();
    XATerminator getXATerminator();
    Timer createTimer() throws UnavailableException;

}

WorkManager 允许资源适配器对工作进行调度,在应用服务器线程上同步或异步执行调度。这个工作可以在资源导入的事务中执行,在这种情况下,XATerminator 有助于完成工作。Timer 负责延迟工作或定期工作的执行。本文将更深入地研究这三个类,并说明如何使用它们。

WorkManager 接口提供了三套处理工作的方法(doWorkstartWorkscheduleWork),如清单 2 所示:

清单 2. 用来提交工作项目的WorkManager 接口
public interface WorkManager {
    
    void doWork(Work work) throws WorkException;
    void doWork(Work work, long startTimeout, ExecutionContext execContext,
            WorkListener workListener) throws WorkException;

    long startWork(Work work) throws WorkException;
    long startWork(Work work, long startTimeout, ExecutionContext execContext,
            WorkListener workListener) throws WorkException;

    void scheduleWork(Work work) throws WorkException;
    void scheduleWork(Work work, long startTimeout,
            ExecutionContext execContext, WorkListener workListener)
            throws WorkException;
    
}

每个方法接收的第一个参数,都是实现 Work 接口的对象的一个实例,如清单 3 所示:

清单 3. 资源适配器实现的 Work 接口
public interface Work extends Runnable {

    void release();
    
}

Work 接口扩展了 Runnable 接口,您应当像直接进行 Java 线程编程时所做的那样,实现run 方法中执行的工作。您很快就会看到 release 方法发挥其作用的地方。

WorkManager 上的 doWork 方法可以让一些工作同步执行、一直受阻塞或者直到某些工作完成才执行。这看起来可能不是特别有用——这不就是直接调用 run 方法时发生的事情吗?并不完全如此。首先,它让应用程序服务器说明现在不是做这项工作的恰当时候。例如,如果在 ResourceAdapterstart 方法的范围内调用 doWork,那么您可能发现它将抛出 WorkRejectedException 异常。应当尽快从这个方法返回,如果可能的话,应当把工作安排成异步处理。

第二,如果应用服务器特别繁忙,那么它可能会推迟这项工作的启动。可以用第 2 个startTimeout 参数指明资源适配器准备为工作启动等候多长时间。如果应用服务器没能在这个时间内启动工作,那么就会抛出 WorkRejectedException 异常。WorkManager 接口定义了常量 IMMEDIATEINDEFINITE,它们允许资源适配器指明自己根本不准备等候或者准备一直等候下去。

第三,正如下一节解释的,有可能让工作片断在资源适配器导入的事务上下文中执行,而不是在与当前线程关联的上下文中执行。这正是第 3 个参数的用途,该参数是一个可选的 ExecutionContext

最后,使用 doWork 方法让应用服务器对资源适配器执行的工作拥有更多控制。在关闭应用服务器时,如果资源适配器夹在一个漫长的、复杂的操作中间,那么服务器不需要等待资源适配器完成,或者被清理干净。相反,服务器可以通过调用 Work 对象的 release 方法,给资源适配器发信号。然后资源适配器应当尽快完成处理。清单 4 显示了 release 方法使用的一个示例:

清单 4. Work 对象的示例
public class ExampleWork implements Work {

    private volatile boolean _released;

    void run() {
        for (int i = 0; i < 100; i++) {
            if (_released) break;
            // Do something
        }
    }

    void release() {
        _released = true;
    }
    
}

注意清单 4 中使用的关键字 volatile。在 run 方法正进行其处理的同时,会在一个独立的线程上调用 release 方法,volatile 修饰符确保 run 方法能够看到更新的字段。

doWork 方法也可能失败,抛出一个名称古怪的 WorkCompletedException 异常。这个异常是在将 run 方法分配给一个线程时抛出的,但这个异常或者是上下文设置失败,或者是通过抛出运行时异常退出方法。doWork 方法会提供一个错误代码,表示这些故障发生的路径,还把问题原因作为链接的异常提供。

为什么等待?
您已经看到 doWork 方法允许您在调用线程阻塞的同时执行工作。但是如果您不想等待 —— 也就是说,如果您不想同步执行工作,该怎么办呢?在 Java 2 平台标准版本(J2SE)环境中,可以使用多线程实现这一目标。但是,J2EE 规范让应用程序服务器使用 Java 2 安全管理器防止应用程序离开自己的线程。如果应用服务器的这部分功能是通过 EJB 池或 servlet 池来提供并发性,那么这样做是合理的。服务器也可能想把各种形式的上下文都关联到一个线程上,因为产生新线程时,上下文可能会丢失。最后,正如我以前讨论过的,不受服务器控制的线程会造成有序关机很难实现。

虽然可以通过使用安全策略,逐个案例地克服这个限制,但是 WorkManager 上的 startWorkscheduleWork 方法可以让资源适配器异步地处理工作,同时确保应用程序服务器仍然在控制之下。startWork 方法会一直等候,直到工作片断开始执行,但不用等到工作结束。所以,如果调用器需要知道工作是否已经得以执行,但是不需要等到工作完成,那么可以使用这种方法。相比之下,只要该工作调度被接受,scheduleWork 方法就立即返回。在这种情况下,并不能确保工作真的被执行。

清单 2 中的 WorkManager 方法的第 4 个参数(WorkListener)在用于这些异步方法时最有用。清单 5 显示了这个接口:

清单 5. 接收事件通知的 WorkListener 接口
public interface WorkListener {

    void workAccepted(WorkEvent e);
    void workRejected(WorkEvent e);
    void workStarted(WorkEvent e);
    void workCompleted(WorkEvent e);
    
}

资源适配器能够有选择地传递一个监听器,当工作的项目通过接受、启动或完成这几个状态(失败的情况下则是拒绝)传递过来时,监听器会得到通知。在这个监听器已经完成其工作项目,或者出现故障要重新安排工作项目时,可以用它将通知发送给工作的发起者。WorkAdapter 类也包含在内,它提供了所有方法的默认实现,因此,子类只需覆盖自己感兴趣的方法即可。

WorkListener 的每个方法都采用 WorkEvent 对象作为参数,如清单 6 所示:

清单 6. WorkEvent 类上的附加方法
public class WorkEvent extends EventObject {

    ...

    public int getType() { ... }
    public Work getWork() { ... }
    public long getStartDuration() { ... }
    public WorkException getException() { ... }
 
}

除了通常的事件方法之外,WorkEvent 类还为这些类型的事件 (接受、拒绝、启动或完成)以及有问题的工作项目提供了存取器。这使您能够用一个(多线程的)监听器负责多个工作提交。还有一些方法可以返回启动工作所花费的时间,而且,在出现 workRejectedworkCompleted 时,返回可能发生的对应的 WorkRejectedExceptionWorkCompletedException 异常。

图 1 显示了通过 Work 对象传递的状态。

图 1. Work 对象的状态
Work 对象的状态

沿着图 1 的底部,有三种提交方法,从上到下的点线表示具体的方法返回的生命周期中的时间点。

可以推迟到明天的事为什么要现在做?
doWorkstartWorkscheduleWork 方法可以都立即向 WorkManager 提交任务。在 WorkManager 执行提交之前,可能会发生延迟,但是调用者只能控制最大延迟(用启动超时)。那么如果想让任务晚些而不是立即处理,该怎么办呢?scheduleWork 方法只允许将工作安排到另一个线程,而不是安排到时间轴上的以后某个时间点上。

这正是 BootstrapContext 接口的第 3 个方法发挥其作用的地方。createTimer 方法使资源适配器能够得到 java.util.Timer 类的实例。这个类从 1.3 版开始就已经成为标准 Java 库的一部分,如清单 7 所示:

清单 7. Timer 类的方法
public class Timer  {
 
    public void schedule(TimerTask task, long delay) { ... }
    public void schedule(TimerTask task, Date time) { ... }

    public void schedule(TimerTask task, Date firstTime, long period) { ... }
    public void schedule(TimerTask task, long delay, long period) { ... }
    public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) { ... }
    public void scheduleAtFixedRate(TimerTask task, long delay, long period) { ... }

    public void cancel() { ... }
 
}

可以用 java.util.Timer 类的前两个方法把任务安排在指定延迟之后或指定日期和时间发生。余下的 4 个调度方法还有额外的 period 参数。可以用它们对那些初次运行之后需要按照常规间隔发生的事件进行调度,使用周期参数指定时间间隔。schedulescheduleAtFixedRate 方法是不同的方法,因为这些操作的计时无法保证;像垃圾搜集这样的操作可能会造成任务推迟执行。如果任务被延迟,那么 schedule 方法在运行下一个任务之前仍然会等候一个完整的周期。但是,scheduleAtFixedRate 方法是在前一个任务 应当 运行的固定周期之后运行下一个任务。所以,如果任务之间的时间对您非常很重要,那么请使用 schedule。如果绝对时间或累积时间很重要,那么请使用 scheduleAtFixedRate

每个 schedule 方法都采用一个扩展了 TimerTask 类的对象作为自己的第一个参数。同使用 Work 接口时一样,这个类扩展了 Runnable ,而且 Timer 会在适当的时间调用 run 方法。TimerTask 类有一个 cancel 方法,可以用它取消对任务的后续调用。或者,也可以调用 Timer 上的 cancel 方法来取消目前安排的所有任务。scheduledExecutionTime 方法允许 run 方法将当前实际调用时间和它应当被调用的时间进行比较。

虽然 TimerTaskrun 方法是在新线程中调用 ,但是这个线程是 JVM 已经分配的线程,处于应用服务器的控制之外。如果您想让资源适配器进行严肃的处理,那么在这个时候,应当用 WorkManager 切换到应用服务器的线程。清单 8 显示的示例采用了这一良好实践来调度工作,从当前时间开始每分钟执行一次:

清单 8. 组合使用 TimerWorkManager
final WorkManager workManager = context.getWorkManager();
final Timer timer = context.createTimer();

timer.scheduleAtFixedRate(new TimerTask() {
    public void run() {
        workManager.scheduleWork(new ExampleWork());
    }
}, 0, 60 * 1000);

JCA WorkManager 和 Timer 的替代方案
正如前一小节提到过的,在 JCA 中,使用 Timer 有一个问题:执行 TimerTask 的线程不在应用服务器的控制之下。因为 Timer 是类而不是接口,而且线程实际是在类的构造函数中产生的, 所以应用服务器不能修改这种行为。允许应用服务器实现这一目的一个替代方案就是使用由 IBM 和 BEA 联合开发的 Timer and Work Manager for Application Servers 规范的接口(请参阅 参考资料) 。

在使用这个规范的接口时,会从 Java 名称与目录服务接口(JNDI)得到一个 TimerManager 实现。可以通过给管理器安排了一个 TimerListener 实现来返回一个 Timer (是一个commonj.timers.Timer 而不是 java.util.Timer)。在安排好的时间上会调用 TimerListener,可以用 Timer 执行诸如取消预定操作之类的操作。

顾名思义,这个规范还提供了 JCA 工作管理的一个替代。这里的接口与 JCA 的那些接口很相似,除了初始的 commmonj.work.WorkManager 是从 JNDI 得到的。这个合约提供的附加行为还包括:阻塞到一个或全部预定工作完成的能力、在远程 JVM 上执行可以序列化的工作的可能性。与 commonj Timer 一样,commonj WorkManager 的应用不限于资源适配器。任何服务器端 J2EE 组件都可以使用这项功能。

清单 9 显示了为了使用 commonj 的类而被重写的 清单 8 的示例:

清单 9. 使用 commonj 接口
final InitialContext context = new InitialContext();
final WorkManager workManager = (WorkManager) 
        context.lookup("java:comp/env/wm/MyWorkManager");
final TimerManager timerManager = (TimerManager)
	context.lookup("java:comp/env/timer/MyTimer");

timerManager.scheduleAtFixedRate(new TimerListener() {
    public void timerExpired(final Timer timer) {
        try {
            workManager.schedule(new ExampleCommonjWork());
        } catch (final WorkException exception) {
        }
    }
}, 0, 60 * 1000);

ExampleCommonjWork 类与 清单 4ExampleWork 相同,此外,它还实现了 commonj Work 接口要求的额外的 isDaemon 方法。如果工作长期存在,那么该方法应当返回 true

导入事务并完成事务
在 JCA 1.5 之前,应用服务器总是充当事务的协调者。应用程序负责器负责启动事务,而且在每个资源管理器通过 JCA 连接管理器登记了自己的 XAResource 之后,还通过一起提交或回滚所有资源来协调事务的完成。JCA 1.5 的事务流入合约允许企业信息系统(EIS)启动和完成事务,充当事务的协调者。这样 EIS 就能通过资源适配器把事务导入应用服务器,并在事务范围内在服务器上执行工作。例如,它可能使用 Supports 的容器管理器事务属性来调用消息驱动 bean(MDB)。这样,MDB 方法执行的工作,包括对其它 EJB 的调用,就会成为事务的一部分。

资源适配器通过第 3 个 ExecutionContext 参数导入事务,在 清单 2 中可以看到它被传递给 WorkManager。正如在清单 10 中可以看到的,这个类拥有设置事务 ID (XID) 和事务超时的方法:

清单 10. 传递给 WorkManagerExecutionContext
public class ExecutionContext {

    public ExecutionContext() { ... }
    public void setXid(Xid xid) { ... }
    public Xid getXid() { ... }
    public void setTransactionTimeout(long timeout)
        throws NotSupportedException { ... }
    public long getTransactionTimeout() { ... }    
    
}

XID 惟一地标识事务,它由三部分构成:格式标识符、事务界定符和分支界定符。如果 EIS 还没有事务的 XID,那么就必须根据 XA 规范(请参阅 参考资料)构建一个 XID。然后应用服务器会在调用工作对象的 run 方法之前把这个事务与执行线程关联起来。

把事务导入应用服务器之后,接下来就由资源适配器负责把属于这个事务的事件通知给服务器。具体地说,它必须把事务完成通知给应用服务器。它是通过清单 11 中的 XATerminator 接口做到这一点的,可以从 BootstrapContext 中得到它的实现:

清单 11. 用于事务完成的 XATerminator
public interface XATerminator {

    void commit(Xid xid, boolean onePhase) throws XAException;
    void forget(Xid xid) throws XAException;
    int prepare(Xid xid) throws XAException;
    Xid[] recover(int flag) throws XAException;
    void rollback(Xid xid) throws XAException;
    
}

XATerminator 接口的方法与 XAResource 上的方法对应,只是在这个例子中,资源适配器需要调用应用程序服务器。典型情况下,如果事务中包含不止一个资源,那么资源适配器会调用 XATerminatorprepare 方法,传递与 ExecutionContext 中传递的 Xid 相同的 Xid。如果所有的资源都返回 XA_OK (或者 XA_RDONLY),那么资源适配器会接着调用 commit;否则就会调用 rollback

结束语
本文介绍了如何用 WorkManager 接口对工作进行调度,以便在应用程序的控制下处理这些工作。您已经看到如何用 Timer 的实例在以后某个时间执行工作或定期执行工作,还了解了使用 JCA 提供的对象的 commonj 替代方案。您已经看到资源适配器如何在自己导入到应用服务器的事务中执行工作,并用 XATerminator 接口控制事务的完成。在本系列的第 3 篇也是最后一篇文章中,我将介绍 JCA 1.5 消息流入合约,它比较出名的地方是对消息驱动 bean 的支持。