使用 WebSphere Extended Deployment Compute Grid 进行批处理编程之初探
2009-09-30 00:00:00 来源:WEB开发网引言
Compute Grid 是 IBM WebSphere Extended Deployment V6.1 中推出的一项功能,提供了最完整的企业 Java 批处理编程解决方案。通过 Compute Grid,您可以得到:
简洁但同时也非常强大的基于传统 Java 对象(plain old Java object,POJO)的编程模型。
简单打包。
简单的部署模型。
全功能作业控制语言(Job Control Language,JCL)。
成熟的作业调度程序。
可靠的执行环境。
全面的工作负载管理和管理工具。
虽然 Compute Grid 设计为与其他 WebSphere Extended Deployment 功能进行协作,但也可以独立对其进行购买和部署。在生产环境中,Compute Grid 使用 IBM WebSphere Application Server Network Deployment(此环境采用分布式多计算机配置)进行操作,但 Compute Grid 也提供了单元测试环境,可以在其中运行独立的 WebSphere Application Server。Compute Grid 还提供了基于 Eclipse 的开发体验,而且支持将 IBM Rational® Application Developer 作为全功能开发环境使用。
本文将说明如何使用 Compute Grid 进行 Java 批处理编程。不过,您首先务必充分了解批处理作业及 Compute Grid 所提供的用于构建批处理应用程序的编程模型。讨论了这两个主题后,本文将指导您使用 Compute Grid 和批处理模拟器测试实用工具开发简单的批处理应用程序。
批处理作业剖析
从较为抽象的角度而言,批处理作业是声明性构造,可指示执行一个或多个批处理应用程序组成的序列并指定其输入和输出。批处理作业按顺序执行这组任务,以完成特定业务功能。批处理应用程序是设计为在后台以非迭代方式执行的程序。输入和输出通常作为逻辑构造供批处理应用程序访问,而且由批处理作业定义映射到具体的数据资源。
批处理作业通常要处理大量输入/输出数据(通常是面向记录的),这些数据通常是关键业务数据,如客户账户、销售数据等等。批处理作业执行的业务处理任务的范围很广,包括发票生成、账户优化、商机分析等。批处理任务已经在 System z(大型机)环境中使用了几十年,直到今天还继续在很多大中型企业中作为中枢系统使用。
批处理作业的基本内容包括图 1 所示的组成部分。
图 1. 批处理作业剖析
作业定义描述了要执行的批处理步骤及其运行顺序。每个步骤都定义了要调用的特定批处理应用程序及其输入与输出数据。数据的常见源和目的地包括文件、数据库、事务系统、消息队列等。
批处理编程模型
Compute Grid 批处理应用程序由一组 POJO 组成,在 Compute Grid 批处理容器的控制下运行,而该容器本身作为标准 WebSphere Application Server 的扩展运行。图 2 显示了应用程序组件及其与批处理容器的关系。
图 2. 批处理应用程序剖析
批处理容器在异步 Bean 的控制下运行批处理作业,而您可以将异步 Bean 视为容器管理的线程。批处理容器处理作业定义并执行其生命周期(使用异步 Bean 作为执行单位)。
批处理应用程序由用户提供的以下组件组成:
批处理作业步骤:此 POJO 提供了在批处理作业中作为步骤执行的业务逻辑。批处理容器在处理作业定义的过程中调用批处理作业步骤。
批处理数据流:此 POJO 为批处理作业步骤提供对数据的访问权限。可以将批处理应用程序编写为访问一个或多个批处理数据流。可以将批处理数据流编写为提供对任何类型的数据的访问权限,这包括来自 RDB、文件系统、消息队列以及通过 J2C 连接器等的数据。
批处理容器负责在作业步骤的生命周期中对批处理数据流进行打开、关闭及检查点相关的回调操作。批处理作业步骤本身将对批处理数据流调用各个方法,以获取和放置数据。
批处理应用程序可以选择包括用户提供的以下组件:
检查点算法:批处理容器提供检查点/重新启动机制,以支持从已知的一致性点重新启动作业。可能需要中断作业然后在计划或非计划停机之后重新启动。批处理容器定期调用检查点,以确定是否应该设置检查点。
Compute Grid 提供了两个预置检查点算法,一个支持基于时间的检查点间隔,而另一个支持基于记录计数的检查点间隔。
结果算法:每个批处理作业步骤在完成时都会提供返回代码。结果算法可以获得批处理作业中的所有步骤的返回代码,并返回整个作业的最终总体返回代码。
Compute Grid 提供了预置的结构算法,该算法将数字最大的步骤的返回码作为总体作业返回代码。
通过对批处理容器(属于 Compute Grid 运行时)的简单介绍,您要记住关于批处理容器的以下重要事项:
会根据作业定义编排批处理作业的生命周期。
会生成作业日志来捕获作业的执行,包括批处理作业步骤的标准流输出。
会收集性能和使用指标,以帮助进行工作负载管理和执行会计功能。
提供了强大的作业故障转移模型(基于检查点/重新启动语义)。
批处理编程接口
Compute Grid 批处理编程模型包括四个主要接口,其中两个是构建批处理应用程序的基本接口,而另外两个是可选接口,用于高级场景中:
基本接口
BatchJobStepInterface 定义批处理容器和批处理应用程序间的交互。
表 1. BatchJobStepInterface 方法
数据 | 方法摘要 |
void | createJobStep() createJobStep 由批处理容器在调用 processJobStep 之前调用。 |
int | destroyJobStep() 批处理容器完成作业步骤处理后调用 destroyJobStep,可以在这里添加任何清理代码。 |
java.util.Properties | getProperties() 返回在 xJCL 中为批处理作业步骤指定的属性。 |
int | processJobStep() processJobStep 应该包括批处理作业步骤的所有业务逻辑。 |
void | setProperties(java.util.Properties properties) 由批处理容器调用,用于向批处理作业步骤提供在 xJCL 中指定的属性。 |
BatchDataStream 对批处理应用程序的特定输入源或输出目的地进行抽象,并定义 Compute Grid 和具体的 BatchDataStream 实现之间的交互。
表 2. BatchDataStream 方法
数据 | 方法摘要 |
void | close() 批处理容器将调用 close 方法来告知 BDS,BDS 的用户已经完成了 BDS 的使用。 |
java.lang.String | externalizeCheckpointInformation() 批处理容器将在处理工作的检查点完成阶段调用 externalizeCheckpointInformation 方法。 |
java.lang.String | getName() 返回此批处理数据流的逻辑名称。 |
java.util.Properties | getProperties() 返回在 xJCL 中为此 BDS 指定的属性。 |
void | initialize(java.lang.String logicalname, java.lang.String jobstepid) 批处理容器将在作业步骤的初始化期间调用 initialize 方法。这就允许 BDS 对流进行初始化,以供批处理步骤使用。 |
void | intermediateCheckpoint() 批处理容器调用 intermediateCheckpoint 方法告知 BDS 刚刚完成了一个检查点。 |
void | internalizeCheckpointInformation(java.lang.String chkptinfo) 批处理容器在重新启动批处理步骤期间调用 internalizeCheckpointInformation 方法,这样就可以让 BDS 将其内部状态重置为上次成功处理检查点时的状态。 |
void | open() 批处理容器调用 open 方法来指示将要使用 BDS 和为进行操作而对 BDS 做准备。 |
void | positionAtCurrentCheckpoint() 批处理容器调用 positionAtCurrentCheckpoint 方法来向 BDS 提供信号,指示应该在 internalizeCheckpointInformation 方法集中定义的点开始处理流。 |
void | positionAtInitialCheckpoint() 批处理容器调用 positionAtInitialCheckpoint 向 BDS 提供信号,指示应该在 xJCL 输入所定义的初始点开始处理流。 |
void | setProperties(java.util.Properties properties) 批处理容器调用 setProperties 来将 xJCL 中指定的 BDS 属性作为 java.util.Properties 传递到 BDS。 |
可选接口
CheckpointPolicyAlgorithm 定义 Compute Grid 和自定义检查点策略实现间的交互。检查点策略用于确定 Compute Grid 将何时对正在运行的批处理作业进行检查点操作,以便在计划或非计划中断之后重新启动。Compute Grid 包括两个现成的检查点策略,如表 3 中所示。
ResultsAlgorithm 定义 Compute Grid 和自定义结果算法间的交互。结果算法用于提供作业的总体返回代码。该算法能够获取每个作业步骤的返回代码。Compute Grid 包括一个现成的结果算法,如表 3 中所示。
表 3. 现成算法
算法 | 类名称 | 描述 |
检查点策略 | com.ibm.wsspi.batch.checkpointalgorithms.recordbased | 根据所处理的输入记录数量对批处理作业进行检查点操作。 |
检查点策略 | com.ibm.wsspi.batch.resultsalgorithms.jobsum | 返回编号最高的步骤的返回代码。 |
结果 | com.ibm.wsspi.batch.checkpointalgorithms.recordbased | 根据所处理的输入记录的数量对批处理作业步骤进行检查点操作。 |
开发批处理应用程序
我们已经了解了批处理作业的基本概念和 Compute Grid 批处理编程模型,接下来就要通过简单练习来应用这些概念,我们在其中将重点关注基本批处理接口,即 BatchJobStepInterface 和 BatchDataStream。本文剩下的部分将详细介绍如何使用 Eclipse 开发环境实现示例批处理作业步骤和批处理数据流并使用本文中所提供的实用工具(名为 Batch Simulator)来对其进行测试。
设置环境
安装 Eclipse V3.2 或更高版本(如果尚未安装 Batch Simulator 实用工具,请下载并安装。Batch Simulator 实用工具在本文中仅为演示目的而提供。)可以在下载文件中包括的 README PDF 文件中找到该实用工具的下载和安装说明。
创建批处理数据流
由于 Compute Grid 批处理框架仅仅提供 BatchDataStream 接口,因此使用提供常见模式的基本实现的抽象类对此框架进行扩展的做法将非常有用。对于本文,您将构建基于文件的通用批处理数据流,该数据流具有以下特征:
具有采用面向行的文本的基础文件。
支持基本检查点/重新启动模型。
支持读取或写入模式,而不支持读/写。
写入操作均严格限制为追加到文件结尾。
调用此批处理数据流 TextFileBatchDataStream。类声明与以下所示类似:
清单 1
package com.ibm.websphere.samples;
import com.ibm.websphere.batch.BatchContainerDataStreamException;
import com.ibm.websphere.batch.BatchDataStream;
public abstract class TextFileBatchDataStream implements BatchDataStream {
...
}
声明这个类抽象是为了强调它是框架类,旨在供子类实现进行扩展。由于仅仅支持读取或写入模式,因此将需要子类来进行此决策。因此,请将缺省构造函数设置为私有的,并要求子类实现使用此构造函数。
清单 2
protected enum ACCESS_MODE {R,W};
protected ACCESS_MODE _access_mode;
protected TextFileBatchDataStream(ACCESS_MODE access_mode) {
_access_mode= access_mode;
}
由于此批处理数据流将基于基础文本文件,因此需要知道文件名。文件名将作为批处理数据流属性传递到批处理数据流实现中。此属性在作业定义中设置,然后由批处理容器在批处理数据流初始化过程中将其传递到批处理数据流对象内。为了接收这些属性,BatchDataStream 接口声明了 setProperties 方法(将由批处理容器进行调用):
清单 3
public void setProperties(Properties properties) {
props = properties;
}
BatchDataStream 接口进一步声明了 initialize 和 open 方法。在 setProperties 之后,按照此顺序进行调用。调用 initialize 方法的目的是为了让批处理数据流准备好,以便打开。容器将在执行 initialize 方法时向批处理数据流传入两个唯一的标识符:
Logical name 是批处理作业步骤用于引用特定批处理数据流的名称。此名称通过作业定义映射到特定的批处理数据流实现。然后可以使用后期绑定机制根据作业定义使用相同的批处理作业步骤处理不用的批处理数据流实现。
Job step id 唯一地标识作业实例和当前访问批处理数据流实现的实例特定步骤。
在批处理数据流实现中没有定义这两个值的用途,但至少在包含跟踪消息来为调试工作提供辅助方面很有用处。
在您的 initialize 方法实现中,获取并存储此批处理数据流的文件名。请记住,您将需要通过作业定义(指定此批处理数据流作为作业步骤的输入)传递此属性:
清单 4
public void initialize(String logicalname, String jobstepid) {
Properties prop = getProperties();
fn = prop.getProperty("FILENAME");
}
由于此批处理数据流基于文本文件,因此请使用 Java RandomAccessFile 与其进行交互。在 open 方法中,建立此结构并打开文件:
清单 5
public void open() throws BatchContainerDataStreamException {
try {
if ( _access_mode == ACCESS_MODE.R) {
_file = new RandomAccessFile(_fn,"r");
}
else {
_file = new RandomAccessFile(_fn,"rw");
}
} catch (IOException e) {
throw new BatchContainerDataStreamException(e);
}
}
您需要声明用于获取和放置数据的方法。BatchDataStream 接口并不为此声明签名,因为不同的批处理数据流类型之间的此签名可能存在极大的差异。因此,请在您的实现中包含以下两个签名:
清单 6a
public String getNextRecord() throws
BatchContainerDataStreamException {
String input = null;
try {
input = _file.readLine();
} catch (IOException e) {
throw new BatchContainerDataStreamException(e);
}
return input;
}
清单 6b
public void putNextRecord(String r) throws
BatchContainerDataStreamException {
try {
_file.writeBytes(r);
_file.write('\n');
this._position+=r.length()+1;
} catch (IOException e) {
throw new BatchContainerDataStreamException(e);
}
为了简单起见,这里省略了 close 和 checkpoint-related 方法,但您可以在随本文提供的 Eclipse 项目中的示例代码中看到这两个方法。
实现基类的两个子类,以创建输入和输出批处理数据流:
清单 7a
package com.ibm.websphere.samples;
public class TestInputBatchDataStream extends TextFileBatchDataStream {
public TestInputBatchDataStream() {
super(ACCESS_MODE.R);
}
}
清单 7b
package com.ibm.websphere.samples;
public class TestOutputBatchDataStream extends TextFileBatchDataStream {
public TestInputBatchDataStream() {
super(ACCESS_MODE.W);
}
}
创建批处理作业步骤
由于已经实现了批处理数据流,编写批处理作业步骤就相对比较简单。所有批处理作业步骤必须实现 BatchJobStepInterface,因此按以下所示类似的方法声明您的代码:
清单 8
package com.ibm.websphere.samples;
import com.ibm.websphere.batch.BatchConstants;
import com.ibm.websphere.batch.BatchJobStepInterface;
import com.ibm.websphere.batch.BatchDataStreamMgr;
import com.ibm.websphere.batch.BatchContainerDataStreamException;
import com.ibm.websphere.batch.JobStepID;
import com.ibm.websphere.batch.context.JobStepContextMgr;
import com.ibm.websphere.batch.context.JobStepContext;
public class TestBatchJobStep implements BatchJobStepInterface {
...
}
在此示例中,有五个方法要实现;这里我们所感兴趣的有 createJobStep 和 processJobStep 方法。(您可以在示例代码中看到 getProperties、setProperties 和 destroyJobStep 方法。)
createJobStep 方法将用于设置输入和输出数据流,以便准备好供 processJobStep 方法使用:
清单 9
public void createJobStep() {
try {
JobStepID id = getJobStepID();
_testInputBatchDataStream= (TestInputBatchDataStream)
BatchDataStreamMgr.getBatchDataStream( "input", id.getJobstepid() );
_testOutputBatchDataStream= (TestOutputBatchDataStream)
BatchDataStreamMgr.getBatchDataStream( "output", id.getJobstepid() );
}
catch (BatchContainerDataStreamException e) {
throw new RuntimeException (e);
}
}
JobStepID 和 BatchDataStreamMgr 是 Compute Grid 批处理编程模型的增强组成部分:
BatchDataStreamMgr 是服务类,提供对批处理作业步骤的批处理数据流的访问。
JobStepID 是 Helper 类,用于封装作业步骤的标识(批处理容器所知的标识)。
请注意,createJobStep 方法使用了一个私有方法,名为 getJobStepID():
清单 10
private JobStepID getJobStepID() {
JobStepContext ctx= JobStepContextMgr.getContext();
return ctx.getJobStepID();
}
此方法揭示了批处理编程模型的最后一个部分:作业步骤上下文。JobStepContextMgr 服务类允许批处理作业步骤获取对其 JobStepContext 对象的引用。JobStepContext 提供两个重要的功能:
访问唯一标识当前批处理作业步骤的执行上下文的信息(如 jobid)。
一个工作区,在批处理作业步骤执行期间可以通过其中在批处理编程框架方法间传递应用程序特定的信息。
JobStepContext 对象公开以下接口:
表 4. JobStepContext 方法
数据 | 方法摘要 |
java.lang.String | getJobID() 返回当前作业的作业名称。 |
JobStepID | getJobStepID() 返回当前步骤的 JobStepID 对象。 |
java.lang.String | getStepID() 返回当前步骤的步骤名称。 |
java.lang.Object | getUserData() 返回存储在此上下文中的用户数据。 |
void | setUserData(java.lang.Object o) 在此上下文中设置用户数据对象。 |
processJobStep 方法包含处理批处理数据流的业务逻辑。对于此示例,您的业务逻辑直接将输入批处理数据流复制到输出批处理数据流:
清单 11
public int processJobStep() {
try {
String l= _testInputBatchDataStream.getNextRecord();
if ( l == null ) {
return BatchConstants.STEP_COMPLETE;
}
else {
_testOutputBatchDataStream.putNextRecord(l);
return BatchConstants.STEP_CONTINUE;
}
}
catch (Exception e) {
throw new RuntimeException("TestBatchJobStep: error in
processJobStep: ",e);
}
}
随本文提供的 Eclipse 项目中包括全部示例代码。
测试代码
正如前面提到的,Compute Grid 提供了在独立 WebSphere Application Server 中执行的测试环境,但为了进一步简化此示例(并避免其他应用程序打包和部署任务),本文提供了名为 Batch Simulator 的测试辅助工具。Batch Simulator 允许您在 Eclipse 环境内对批处理 POJO 进行基本测试。请记住,尽管 Batch Simulator 在 J2SE 环境中运行,但实际的 Compute Grid 执行环境是 Java EE,因为它在 WebSphere Application Server 内运行。但是 Batch Simulator 在测试 POJO 间的基本流和测试批处理作业步骤中的基本业务逻辑时非常有用。
运行 Batch Simulator:
您需要提供作业定义,以描述希望执行的批处理作业步骤及其批处理数据流。可以在图 3 中所示的 Eclipse 工作区找到可供此示例批处理作业步骤使用的示例作业定义。
图 3. Eclipse jobdefs 文件夹
检查该文件中的块注释,以了解关于用于描述作业定义的属性。了解 Compute Grid 作业定义通常是符合专用作业控制语言格式 xJCL 的 XML 文件。到目前为止,为了简单起见,模拟器使用的都是基于属性的方法。(Batch Simulator 还特别提供了用于基于 Batch Simulator 属性文件生成恰当 Compute Grid xJCL 文件的选项。)
您需要指定以下信息的 Eclipse 运行配置:
表 5. 现成算法
属性 | 值 |
主类 | com.ibm.websphere.batch.BatchSimulator |
程序参数 | "\${workspace_loc:${project_name}}/jobdefs/testbatchjobstep.properties" |
运行此配置时,看到 Batch Simulator 和示例应用程序的输出应该与以下所示类似:
清单 12
BatchSimulator: start job JOB1
...
BatchSimulator: end job JOB1 - RC=0
Batch Simulator 支持一个额外的运行选项 writexJCL,此选项基于 Batch Simulator 输入属性写入 xJCL 文件。在指定的情况下,必须将其作为第二个参数,跟在输入文件规范之后。此选项接受输入 Batch Simulator 属性,并输出一个相应的 Compute Grid xJCL 文件。以后在 Compute Grid 服务器环境中测试批处理作业步骤时,可以使用此文件。您可以设置系统属性 com.ibm.websphere.batch.simulator.xjcldir,以指定 xJCL 写操作的输出目录。会有一条控制台消息告知您 xJCL 名称和位置。
为了提高灵活性,Batch Simulator 工作区还提供了用于启动 Batch Simulator 的 Ant 任务:
com.ibm.websphere.batch.BatchSimulatorTask
该工作区提供了两个示例 Ant 脚本,可以将其用于启动此任务,如图 4 中所示。
图 4. Eclipse 脚本文件夹
结束语
WebSphere Extended Deployment Compute Grid 提供了批处理作业步骤及其输入输出的简单抽象。编程模型非常简洁,而且使用简单。通过内置的检查点/回滚机制,可以方便地构建可靠、可重新启动的 Java 批处理应用程序。
随本文提供的 Batch Simulator 实用工具提供了替代测试环境,可在您的 Eclipse(或 Rational Application Developer)开发环境中运行。它的 xJCL 生成功能可帮助您直接进入下一阶段在 Compute Grid 单元测试服务器中的测试。
本文源代码下载地址: http://flashview.ddvip.com/2009_09/ComputeGridBatchSimulatorPkg.zip
更多精彩
赞助商链接