消息中介的实用介绍——第 3 部分:使用中介修改消息
2009-10-21 00:00:00 来源:WEB开发网引言
如果您读过本系列的前两篇文章,消息中介基础(第 1 部分)和使用中介路由消息(第 2 部分),那么消息中介的基础对您来说已不再陌生。消息中介是 IBM® WebSphere® Application Server 的消息传递功能的新的可编程扩展,可以用于简化使用消息传递的连接系统、服务、应用程序或组件。这两篇文章,连同 IBM 同事最近发表的其他文章,主要介绍了路由消息和简单的日志记录。在第 3 部分中,我们将展示如何使用中介访问和操纵 JMS 消息的实际内容。
中介编程模型允许使用服务数据对象 (SDO) 接口访问消息正文,它简化了多个层和不同数据源之间的数据交换。SDO 规范已通过 JSR 235 提交到 Java Community Process。(要了解 SDO 为何存在以及如何开发使用 SDO 的简单应用程序,请参阅 Introduction to Service Data Objects。)在本文中,我们将采用实际方法在中介中通过 SDO 阅读和撰写消息内容。
在继续阅读本文前,必须确保您已经了解以前的文章内容。本文假定您已经掌握开发、部署和测试简单中介处理程序的方式。如果您尚未读过上述文章,则现在应该读一读。
准备
要开发和测试本文包含的中介,必须安装以下软件组合之一:
IBM WebSphere Application Server Toolkit Version 6.0 和 WebSphere Application Server V6。
IBM Rational® Application Developer V6,包括集成的 WebSphere Application Server V6 测试环境。
IBM Rational Application Developer V6 和 WebSphere Application Server V6。
IBM Rational Software Architect V6.0,包括集成的 WebSphere Application Server V6 测试环境。
还应该下载和应用所安装产品的最新服务级别。在撰写这篇文章时,WebSphere Application Server V6 Refresh Pack 1 (V6.0.1) 是将要发布的最新修订包。请参阅参考资料,以获取关于 WebSphere Application Server 修订包和如何得到它们的详细信息。可以使用 Rational Product Updater(安装任何 Rational 产品时它都会自动安装)获取 Rational 产品的更新程序。或者,从 IBM Rational Support 网站手动下载修订包。
本文中所显示的屏幕图像来自于 Rational Application Developer V6。如果您使用的是 WebSphere Application Server Toolkit,则仍旧可以使用这里给出的步骤,因为这些步骤对于 Rational Application Developer 和工具包来说都是相同的。您可以从 WebSphere Application Server 安装媒体或映像安装 WebSphere Application Server Toolkit。
使用中介访问消息内容
本系列的第 2 部分阐述了如何和为何使用中介路由消息。中介的其他主要功能是可以访问和修改传递中的消息的内容。例如:
中介可用于在两种不同(甚至不兼容)格式之间转换消息内容。
中介可以利用从数据库获取的数据扩充消息内容。
记录消息内容,供调试或审核时使用。
所有这些用途都依赖于对消息中所包含数据的获取和可能的更改。中介最明显的用途是记录消息内容,以供审核或调试使用;例如,在传输的消息图像的前后均做日志记录。稍后,我们将利用这一功能来开发中介,而现在,我们首先看看如何访问消息内容。
中介处理程序框架允许使用消息的 SDO 表示访问消息。要访问消息 SDO,首先需要通过调用 getSIMessage() 方法从 SIMessageContext 获取 SIMessage。获取对 SIMessage 的引用后,即可使用 SIMessage.getDataGraph() 方法获取 SDO DataGraph。消息 SDO DataObject 包含在 DataGraph 根对象中。
这可能听起来有些复杂,但是如清单 1 所示,只要给定了 SIMessageContext,获取消息的 DataObject SDO 表示就是很容易的事了。
清单 1. 获取消息 SDO DataObject private DataObject getMessageDataObject(SIMessageContext ctx){
DataObject result = null;
SIMessage msg = ctx.getSIMessage();
try {
result = msg.getDataGraph().getRootObject();
} catch (SIDataGraphSchemaNotFoundException e) {
e.printStackTrace();
} catch (SIMessageException e) {
e.printStackTrace();
}
return result;
}
通过 DataObject,可以使用 XPATH 语法导航结构。JMS 消息的结构非常简单;消息的有效负载包含在 SDO 消息 DataObject 的 data/value 元素中。其属性的类型和内容取决于发送到中介的 JMS 消息的类型。使用 SIMessage.getFormat() 可以获取 JMS 消息的类型,此操作将返回一个字符串,如下表所示:
JMS 消息类型 | SIMessage.getFormat() 值 |
TextMessage | JMS:文本 |
BytesMessage | JMS:字节 |
ObjectMessage | JMS:对象 |
StreamMessage | JMS:流 |
您将注意到上表中没有 JMS MapMessage。当前版本的中介框架不支持 MapMessage 类型,尝试使用这种类型将导致发生异常。JMS 消息类型格式定义在用于引用的 com.ibm.websphere.sib.SIApiConstants 类中。
获取 DataObject 后,只需调用 getString (data/value) 或 getBytes (data/value) 即可检索文本和字节消息的消息内容。检索对象和流消息稍微复杂一些。
检索对象消息
可以通过从有效负载构造 ByteArrayInputStream 来反序列化对象消息,然后再使用它创建 ObjectInputStream 和调用 readObject() 方法。要成功地反序列化消息,需要确保中介处理程序具有访问适当类的权限。清单 2 说明如何反序列化 JMS ObjectMessages。
清单 2. 反序列化 JMS ObjectMessage 的内容 private Object getObject(SIMessageContext ctx) {
Object result = null;
byte[] msgBodyBytes = null;
DataObject msgDataObj = getMessageDataObject(ctx);
String format = ctx.getSIMessage().getFormat();
if (msgDataObj !=null && msgDataObj.isSet("data/value") {
msgBodyBytes = msgDataObj.getBytes("data/value");
}
if ((format.equals(SIApiConstants.JMS_FORMAT_OBJECT)) &&
(msgBodyBytes != null)) {
try {
ObjectInputStream in = new ObjectInputStream(
new ByteArrayInputStream(msgBodyBytes));
result = in.readObject();
} catch (Throwable e) {
// Error de-serialising object
e.printStackTrace();
}
}
return result;
}
检索流消息
流消息包含一个 Java 基元或字节数组的序列。这表示 data/value 元素将包含一个列表。您可以:
使用 getList("data/value"),它返回一个包含流元素的 java.util.List,或者
使用 get("data/value[n]") 访问列表(在该列表中,n 表示第 n 个元素)中的特定项;SDO 索引从 1 开始。
清单 3 是一个显示如何打印 JMS StreamMessage 的内容的示例。
清单 3. 打印 JMS StreamMessage 的内容 private void printStreamMessage(SIMessageContext ctx) {
DataObject msgDataObj = getMessageDataObject(ctx);
String msgFormat = ctx.getSIMessage().getFormat();
List msgBodyStream = null;
Object streamItem;
if (msgDataObj != null && (msgFormat.equals(SIApiConstants.JMS_FORMAT_STREAM)){
if msgDataObj.isSet("data/value"){
msgBodyStream = msgDataObj.getList("data/value");
}
}
if (msgBodyStream != null) {
ListIterator iterator = msgBodyStream.listIterator();
for (int i = 1; iterator.hasNext(); i++) {
streamItem = iterator.next();
System.out.print("Item index: " + i);
System.out.print(" type: ");
if (streamItem instanceof byte[]) {
System.out.print("byte[] data: ");
System.out.println(new String(
(byte[]) streamItem));
} else {
System.out.print(streamItem.getClass().getName());
System.out.print(" value: ");
System.out.println(streamItem.toString());
}
}
}
}
从上例中可以看出,对于 JMS 消息来说,阅读消息正文相当简单。Web 服务消息,因起源于 XML 而具有更多复杂结构。这使得编写访问 Web 服务消息的中介也更加复杂。如上所述,SIMessage 包含一个 SDO 数据图形。使用 XPATH,可以从数据图形的根(通过调用 getRootObject() 获取)导航结构。SDO 编程模型也可以提供对元数据的访问;例如,可以使用 getType() 和 getProperties() 来发现任何给定 DataObject 的运行时类型和属性。
LoggingMediation:DebugMediation 回顾
在第 1 部分中,我们介绍了中介的概念,同时提到一个简单的调试中介。想一下该中介如何将关于消息的会话和上下文数据简单地写入日志。根据上一节的内容,我们现在来看看如何重写 DebugMediation 以使其更加可重用,以及如何扩展其功能以包括对消息内容的记录。
我们将把日志编写功能分离到中介处理程序的不同类中,而不再编写实现所需 MediationHandler 接口的 Java 类。这有助于重用,允许其他处理程序在不强制继承 DebugMediation 类的情况下进行记录。
对于这一示例,我们将创建一个 MediationLogWriter 类,来使用特定于应用服务器构造的日志名称和级别为其编写条目。MediationLogWriter 使用 MessageBodyFormatter 类,我们还需要为其编码。最后,我们编码用于测试日志写入器的简单中介。下面记录的步骤假定您已经了解了前面文章的示例。
利用前面文章中使用的工作区启动部署环境。
将 mediation.handlers.MediationLogWriter 类的源代码从下载文件导入 MessageMediation 项目。源代码如清单 4 所示,以供参考。此代码的大部分基于第一篇文章的 DebugMediation。除了某些格式改进和少量重构外,MediationLogWriter 现在调用 MessageBodyFormatter,以便在将 JMS 消息正文写入日志前为其编排格式。
清单 4. MediationLogWriter
package mediation.handlers;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.ibm.websphere.sib.SIMessage;
import com.ibm.websphere.sib.mediation.messagecontext.SIMessageContext;
import com.ibm.websphere.sib.mediation.session.SIMediationSession;
public class MediationLogWriter {
private Logger logger;
private Level level;
private MessageBodyFormatter msgBodyFmt = new MessageBodyFormatter();
private MediationLogWriter() {
super();
}
public MediationLogWriter(Logger loggerToUse, Level levelToLogAt) {
this();
logger = loggerToUse;
level = levelToLogAt;
}
public void writeMediationInvoked(SIMessageContext ctx) {
StringBuffer buf = new StringBuffer("Mediation invoked (");
appendCommonHeading(buf, ctx);
buf.append(')');
logger.log(level, buf.toString());
}
public void writeAll(SIMessageContext ctx) {
StringBuffer buf = new StringBuffer();
String indent = " ";
appendCommonHeading(buf, ctx);
appendContext(buf, ctx, indent);
appendSession(buf, ctx, indent);
appendMessageHeaders(buf, ctx, indent);
appendMessageBody(buf, ctx, indent);
logger.log(level, buf.toString());
}
public void writeSession(SIMessageContext ctx) {
StringBuffer buf = new StringBuffer();
appendCommonHeading(buf, ctx);
appendSession(buf, ctx, "");
logger.log(level, buf.toString());
}
private void appendSession(StringBuffer buf, SIMessageContext ctx,
String indent) {
String newLine = '\n' + indent;
SIMediationSession session = ctx.getSession();
buf.append("\nMediation Session:" + newLine);
buf.append("Mediation = " + session.getMediationName() + newLine);
buf.append("Destination = " + session.getDestinationName() + newLine);
buf.append("Service Bus = " + session.getBusName() + newLine);
buf.append("Messaging Engine = " + session.getMessagingEngineName());
if (session.getDiscriminator() != null
&& !session.getDiscriminator().equals("")) {
buf.append(newLine + "Discriminator = ");
buf.append(session.getDiscriminator());
}
if (session.getMessageSelector() != null
&& !session.getMessageSelector().equals("")) {
buf.append(newLine + "Message selector = ");
buf.append(session.getDiscriminator());
}
}
public void writeContext(SIMessageContext ctx) {
StringBuffer buf = new StringBuffer();
appendCommonHeading(buf, ctx);
appendContext(buf, ctx, "");
logger.log(level, buf.toString());
}
private void appendContext(StringBuffer buf, SIMessageContext ctx,
String indent) {
Iterator i = ctx.getPropertyNames();
String newLine = '\n' + indent;
buf.append("\nMessage Context:");
if (i != null && !i.hasNext()) {
String propName = null;
Object propValue;
for (; i.hasNext();) {
propName = (String) i.next();
if (propName != null && !propName.equals("")) {
buf.append(newLine + indent);
buf.append(propName);
buf.append(" = ");
try {
propValue = ctx.getProperty(propName);
buf.append('(' + propValue.getClass().getName() + ") ");
buf.append(propValue.toString());
} catch (Exception e) {
buf.append("exception ");
buf.append(e.toString());
buf.append("getting value");
}
}
}
if (propName != null && !propName.equals("")) {
buf.append("[]");
buf.append(newLine);
}
} else {
buf.append("[]");
buf.append(newLine);
}
}
public void writeMessageHeaders(SIMessageContext ctx) {
StringBuffer buf = new StringBuffer();
appendCommonHeading(buf, ctx);
appendMessageHeaders(buf, ctx, "");
logger.log(level, buf.toString());
}
private void appendMessageHeaders(StringBuffer buf, SIMessageContext ctx,
String indent) {
String newLine = '\n' + indent;
SIMessage msg = ctx.getSIMessage();
buf.append("Message Headers:" + newLine);
buf.append("API message id = " + msg.getApiMessageId() + newLine);
buf.append("System message id = " + msg.getSystemMessageId() + newLine);
buf.append("Correlation id = " + msg.getCorrelationId() + newLine);
buf.append("Message format = \"" + msg.getFormat() + '\"' + newLine);
buf.append("Message descriminator = " + msg.getDiscriminator()
+ newLine);
List list = msg.getUserPropertyNames();
buf.append("User properties: ");
Iterator i;
if (list != null && !list.isEmpty()) {
i = list.iterator();
String propName;
Object propValue;
for (; i.hasNext();) {
buf.append(newLine + indent);
propName = (String) i.next();
buf.append(propName);
buf.append(" = ");
try {
propValue = msg.getUserProperty(propName);
buf.append('(' + propValue.getClass().getName() + ") ");
buf.append(propValue.toString());
} catch (Exception e) {
buf.append("exception ");
buf.append(e.toString());
buf.append("getting value");
}
}
buf.append(newLine);
} else {
buf.append("[]");
buf.append(newLine);
}
buf.append("Forward routing path = " + msg.getForwardRoutingPath()
+ newLine);
buf.append("Reverse routing path = " + msg.getReverseRoutingPath()
+ newLine);
buf.append("Reliability = " + msg.getReliability() + newLine);
buf.append("Priority = " + msg.getPriority() + newLine);
buf
.append("Redelivered Count = " + msg.getRedeliveredCount()
+ newLine);
buf.append("User id = " + msg.getUserId());
}
public void writeMessageBody(SIMessageContext ctx) {
StringBuffer buf = new StringBuffer();
appendCommonHeading(buf, ctx);
appendMessageBody(buf, ctx, "");
logger.log(level, buf.toString());
}
public void appendCommonHeading(StringBuffer buf, SIMessageContext ctx) {
buf.append("Msg ");
buf.append(ctx.getSIMessage().getApiMessageId());
buf.append(" Mediation: ");
buf.append(ctx.getSession().getMediationName());
buf.append(" Destination: ");
buf.append(ctx.getSession().getDestinationName());
}
private void appendMessageBody(StringBuffer buf, SIMessageContext ctx,
String indent) {
msgBodyFmt.formatJMSMessage(buf, ctx.getSIMessage(), indent);
}
}
在 MediationHandlers 项目中创建一个新类 mediation.handlers.MessageBodyFormatter,并添加所需的导入语句(清单 5)。
清单 5. 导入语句
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.List;
import com.ibm.websphere.sib.SIApiConstants;
import com.ibm.websphere.sib.SIMessage;
import commonj.sdo.DataObject;
添加方法 formatJMSMessage(StringBuffer buf, SIMessage msg, String indent),theMediationLogWriter 将调用它(清单 6)。
清单 6. formatJMSMessage
public void formatJMSMessage(StringBuffer buf, SIMessage msg, String indent) {
String msgfmt = msg.getFormat();
DataObject msgRoot;
String newLine = '\n' + indent;
try {
msgRoot = msg.getDataGraph().getRootObject();
if (msgfmt.equals(SIApiConstants.JMS_FORMAT_TEXT)) {
buf.append("\nJMS TextMessage :" + newLine);
String msgText = msgRoot.getString("data/value");
buf.append(msgText.replaceAll("\n", newLine));
buf.append("\nJMS TextMessage, as bytes :");
byte[] msgBytes = msgRoot.getString("data/value").getBytes();
appendBytes(buf, msgBytes, indent);
} else if (msgfmt.equals(SIApiConstants.JMS_FORMAT_BYTES)) {
buf.append("\nJMS BytesMessage : ");
appendBytes(buf, msgRoot.getBytes("data/value"), indent);
} else if (msgfmt.equals(SIApiConstants.JMS_FORMAT_OBJECT)) {
buf.append("\nJMS ObjectMessage : (");
appendObjectMessage(buf, indent, msgRoot, newLine);
} else if (msgfmt.equals(SIApiConstants.JMS_FORMAT_STREAM)) {
buf.append("\nJMS StreamMessage :");
appendStreamMessage(buf, indent, msgRoot, newLine);
}
} catch (Exception e) {
buf.append("Cannot format message (" + msg.getApiMessageId());
buf.append(") body using format " + msgfmt + " due to " + e);
}
}
添加方法 appendStreamMessage(StringBuffer buf, String indent, DataObject msgRoot, String newLine) 为流消息编排格式(清单 7)。
清单 7. appendStreamMessage
private void appendStreamMessage(StringBuffer buf, String indent,
DataObject msgRoot, String newLine) {
List streamList = msgRoot.getList("data/value");
Object streamItem;
if (streamList != null && !streamList.isEmpty()) {
for (int i = 1; i <= streamList.size(); i++) {
streamItem = msgRoot.get("data/value[" + i + "]");
buf.append(newLine);
buf.append("Stream item " + i + " : (");
if (streamItem instanceof byte[]) {
buf.append("byte[])");
appendBytes(buf, (byte[]) streamItem, " " + indent);
} else {
buf.append(streamItem.getClass().getName());
buf.append(") " + streamItem.toString());
}
}
} else {
buf.append("[]");
}
}
添加两个方法 appendObjectMessage(StringBuffer buf, String indent, DataObject msgRoot, String newLine) 和 getObject(DataObject msgRoot, String string) 为 ObjectMessages 编排格式(清单 8)。
清单 8. appendObjectMessage 和 getObject
private void appendObjectMessage(StringBuffer buf, String indent,
DataObject msgRoot, String newLine) {
Object obj = null;
try {
obj = getObject(msgRoot, "data/value");
buf.append(obj.getClass().getName());
String objString = obj.toString();
if (objString.indexOf('\n')>0){
buf.append(")\n");
objString = indent + objString.replaceAll("\n", newLine);
buf.append(objString);
} else {
buf.append(") " + objString);
}
} catch (RuntimeException e) {
buf.append(e.getMessage() + " due to " + e.getCause());
}
}
private Object getObject(DataObject msgRoot, String string) {
Object result = null;
byte[] msgBodyBytes = msgRoot.getBytes("data/value");
try {
ObjectInputStream in = new ObjectInputStream(
new ByteArrayInputStream(msgBodyBytes));
result = in.readObject();
} catch (Throwable e) {
// Error de-serialising object
throw new RuntimeException("Error deserialising object", e);
}
return result;
}
最后,添加方法 appendBytes(StringBuffer buf, byte[] bytes, String indent),将字节数组的格式编排为熟悉的十六进制数列和相关的 ASCII 字符(清单 9)。
清单 9. appendBytes
private void appendBytes(StringBuffer buf, byte[] bytes,
String indent) {
int length = bytes.length;
String newLine = '\n' + indent;
for (int lineStart = 0; lineStart<length; lineStart += 16) {
int lineEnd = Math.min(lineStart+16, length);
StringBuffer hex=new StringBuffer();
StringBuffer ascii=new StringBuffer();
for (int i=lineStart; i<lineEnd; i++) {
int b = bytes[i];
b=(b+256)%256;
int c1=b/16;
int c2=b%16;
hex.append((char)(c1<10 ? '0'+c1 : 'a'+c1-10));
hex.append((char)(c2<10 ? '0'+c2 : 'a'+c2-10));
if (i%2 == 1) hex.append(' ');
if ((b>=0x20 && b<=0x7e)) ascii.append((char)b);
else ascii.append('.');
}
int pad=16-(lineEnd-lineStart);
int spaces=(pad*5 + pad%2)/2;
spaces+=3;
for (int i=0; i<spaces; i++) hex.append(' ');
String offset="0000"+Integer.toHexString(lineStart);
offset=offset.substring(offset.length()-4);
buf.append(newLine);
buf.append(offset);
buf.append(" ");
buf.append(hex.toString());
buf.append(ascii.toString());
}
}
进行下一步之前,保存和编译两个类。
现在编码将使用 MediationLogWriter 的 LoggingMediation 处理程序类。将以下中介添加到 MediationHandlers 项目中(清单 10)。
清单 10. 新中介
package mediation.handlers;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.xml.rpc.handler.MessageContext;
import com.ibm.websphere.sib.mediation.handler.MediationHandler;
import com.ibm.websphere.sib.mediation.handler.MessageContextException;
import com.ibm.websphere.sib.mediation.messagecontext.SIMessageContext;
public class LoggingMediation implements MediationHandler {
private static final String className = LoggingMediation.class.getName();
private final Logger logger = Logger.getLogger(className);
private MediationLogWriter log;
public boolean handle(MessageContext ctx) throws MessageContextException {
getLog().writeSession((SIMessageContext) ctx);
getLog().writeContext((SIMessageContext) ctx);
getLog().writeMessageHeaders((SIMessageContext) ctx);
getLog().writeMessageBody((SIMessageContext) ctx);
// Alternatively you could log everything in one go using :
// getLog().writeAll((SIMessageContext) ctx);
// Or just log a "Mediation invoked" message using
// getLog().writeMediationInvoked((SIMessageContext)ctx);
return true;
}
private MediationLogWriter getLog(){
if (log == null){
log = new MediationLogWriter(logger,Level.INFO);
}
return log;
}
}
使用 EJB 部署描述符编辑器的 Mediation Handlers 选项卡将此代码组装到 DeployableMediation 项目中,如图 1 所示。
图 1. 组装的中介处理程序类
将企业应用程序发布到应用服务器,然后运行所提供的 LoggingMediationAdmin.jacl 脚本来配置中介和其他消息传递资源。您应该看到如清单 11 所示的消息。
清单 11. LoggingMediationAdmin 消息
Create SIB Queue
$AdminTask createSIBDestination -bus SimpleBus -name loggingQueue
-type queue -node neodogNode -server server1
$AdminTask createSIBJMSConnectionFactory server1(cells/....) -name CF4
-jndiName jms/CF4 -busName SimpleBus -type queue
$AdminTask createSIBJMSQueue server1(cells/....) -name messageQueue
-jndiName jms/loggingQueue -queueName loggingQueue
$AdminTask createSIBMediation -bus SimpleBus -mediationName
LoggingMediation -handlerListName LoggingMediation
$AdminTask mediateSIBDestination -bus SimpleBus -destinationName
loggingQueue -mediationName LoggingMediation -node neodogNode
-server server1
Configuration saved
重新启动应用服务器以更新 JNDI。
使用 Universal Test Client 将消息发送到使用连接 factoryjms/CF4 的目的地 jms/loggingQueue。此操作将生成一组日志条目,如清单 12 所示。
清单 12. 日志条目
[26/05/05 12:26:32:804 BST] 0000007a LoggingMediat I
Msg ID:ab6a8a35d4c176a305d19602110a134f0000000000000001
Mediation: LoggingMediation Destination: loggingQueue
Mediation Session:
Mediation = LoggingMediation
Destination = loggingQueue
Service Bus = SimpleBus
Messaging Engine = neodogNode.server1-SimpleBus
[26/05/05 12:26:32:805 BST] 0000007a LoggingMediat I
Msg ID:ab6a8a35d4c176a305d19602110a134f0000000000000001
Mediation: LoggingMediation Destination: loggingQueue
Message Context:
[26/05/05 12:26:32:811 BST] 0000007a LoggingMediat I
Msg ID:ab6a8a35d4c176a305d19602110a134f0000000000000001
Mediation: LoggingMediation Destination: loggingQueueMessage Headers:
API message id = ID:ab6a8a35d4c176a305d19602110a134f0000000000000001
System message id = BAC54242D3B04F6F_9000001
Correlation id = null
Message format = "JMS:text"
Message descriminator = null
User properties: []
Forward routing path = []
Reverse routing path = []
Reliability = ReliablePersistent
Priority = 4
Redelivered Count = 0
User id =
[26/05/05 12:26:32:815 BST] 0000007a LoggingMediat I
Msg ID:ab6a8a35d4c176a305d19602110a134f0000000000000001
Mediation: LoggingMediation Destination: loggingQueue
JMS TextMessage :
Sample text message contains a
new line character
JMS TextMessage, as bytes :
0000: 5361 6d70 6c65 2074 6578 7420 6d65 7373 Sample text mess
0010: 6167 6520 636f 6e74 6169 6e73 2061 0d0a age contains a..
0020: 6e65 7720 6c69 6e65 2063 6861 7261 6374 new line charact
0030: 6572 er
更改消息内容
正如在上面的示例中所看到的,阅读消息正文一点也不复杂,尤其是在仅使用 JMS 字节或文本消息时。您将会高兴地发现,更改消息正文也不再是复杂的事情了。如果回顾一下获取消息正文时,在从 SIMessage 得到的 DataObject 引用上调用 get("data/value"),那么,当您了解到 SIMessage 具有一个对应的 setDataGraph(DataGraph dg, String format) 方法时,就不会感到惊奇了。同样,DataGraph 和 DataObject 类也都具有 setter 方法。下面的代码示例说明了如何设置 JMS 文本消息的内容。
清单 13. 设置 JMS 文本消息的内容 try {
DataGraph dg = siMsg.getDataGraph();
dg.getRootObject().setString("data/value", msgText);
siMsg.setDataGraph(dg, siMsg.getFormat();
} catch (Exception e){
// The actual exceptions that can be throw by the setDataGraph method are :
// SIMessageDomainNotSupportedException, SIDataGraphSchemaNotFoundException,
// SIDataGraphFormatMismatchException, SIMessageException
}
现在,我们通过编写一个用于修改消息的简单中介处理程序将这一概念付诸实践;我们使用该中介来执行一个简单的文本消息的搜索和替换操作。要替换的文本和实际替换文本均在部署描述符中配置。(如果我们在生产环境中使用类似的中介,则使用上下文属性来配置中介效果会更好一些。)我们的示例还使用了 MediationLogWriter,以在消息的图像前后进行记录。
将中介处理程序类(如清单 14 所示)添加到 MediationHandlers 项目。
清单 14. 中介处理程序类 package mediation.handlers;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.xml.rpc.handler.MessageContext;
import com.ibm.websphere.sib.SIApiConstants;
import com.ibm.websphere.sib.SIMessage;
import com.ibm.websphere.sib.mediation.handler.MediationHandler;
import com.ibm.websphere.sib.mediation.handler.MessageContextException;
import com.ibm.websphere.sib.mediation.messagecontext.SIMessageContext;
import commonj.sdo.DataGraph;
public class ReplaceTextMediation implements MediationHandler {
private final static String className = ReplaceTextMediation.class.getName();
private final Logger logger = Logger.getLogger(className);
private final MediationLogWriter log = new MediationLogWriter(logger, Level.INFO);
private String textToReplace = "Message";
private String replacementText = "MediatedMessage";
public boolean handle(MessageContext ctx) throws MessageContextException {
log.writeMediationInvoked((SIMessageContext)ctx);
SIMessage siMsg = ((SIMessageContext)ctx).getSIMessage();
if (siMsg.getFormat().equals(SIApiConstants.JMS_FORMAT_TEXT)) {
try {
DataGraph dg = siMsg.getDataGraph();
if (dg.getRootObject().isSet("data/value")){
String msgText = dg.getRootObject().getString("data/value");
if (msgText.indexOf(textToReplace) >= 0) {
log.writeMessageBody((SIMessageContext)ctx);
msgText = msgText.replaceAll(textToReplace, replacementText);
dg.getRootObject().setString("data/value", msgText);
siMsg.setDataGraph(dg, siMsg.getFormat());
log.writeMessageBody((SIMessageContext)ctx);
}
}
} catch (Exception e) {
logger.log(Level.SEVERE, "Could not process message due to exception", e);
}
}else{
logger.log(Level.INFO, "Message {0} not changed becuase it is not a text message", siMsg.getApiMessageId());
}
return true;
}
public String getReplacementText() {
return replacementText;
}
public void setReplacementText(String replacementText) {
this.replacementText = replacementText;
}
public String getTextToReplace() {
return textToReplace;
}
public void setTextToReplace(String textToReplace) {
this.textToReplace = textToReplace;
}
}
将处理程序类作为名为 ReplaceTextMediation 的新中介处理程序编译和组装到 DeployableMediation 项目中。
公布项目并运行所提供的 jacl 脚本 ReplaceTextMediationAdmin.jacl 来创建中介和所需的消息传递资源。
重新启动服务器。
使用 jms/CF5 将消息发送到 jms/replaceTextQueue,并且在消息正文中包括文本 Message。应用服务器日志将包括与以下内容类似的三个日志条目:
清单 15. 应用服务器日志条目 [27/05/05 12:14:43:165 BST] 00000064 ReplaceTextMe I Mediation invoked
(Msg ID:82f4131b80304e398e1dfbbb110a134f0000000000000001
Mediation: ReplaceTextMediation Destination: sampleMediationQueue)
[27/05/05 12:14:49:346 BST] 00000064 ReplaceTextMe I
Msg ID:82f4131b80304e398e1dfbbb110a134f0000000000000001
Mediation: ReplaceTextMediation Destination:sampleMediationQueue
JMS TextMessage : Message text
JMS TextMessage, as bytes :
0000: 4d65 7373 6167 6520 7465 7874 Message text
[27/05/05 12:14:49:388 BST] 00000064 ReplaceTextMe I
Msg ID:82f4131b80304e398e1dfbbb110a134f0000000000000001
Mediation: ReplaceTextMediation Destination:sampleMediationQueue
JMS TextMessage : MediatedMessage text
JMS TextMessage, as bytes :
0000: 4d65 6469 6174 6564 4d65 7373 6167 6520 MediatedMessage
0010: 7465 7874 text
结束语
本系列文章的第 3 部分阐述了如何使用中介阅读和更改消息有效负载。转换正在应用程序间路由的消息是一种强大功能,它允许将两个格式要求不同(甚至 JMS 消息类型要求可能也不同的)的消息的应用程序连接在一起。
简单的 ReplaceTextMediation 示例说明了更改文本消息的唯一方式。如何更改消息正文以及将其更改为什么,具体情况取决于所使用的 JMS 消息的类型和内容。
本文没有谈及如何传递 Web 服务消息。这需要配置 Web Services Gateway 和其他资源。WebSphere Application Server 信息中心包含关于将 Web 服务消息映射到 SDO 消息的信息。请拭目以待,关于 Web 服务消息中介的文章即将发布。
下一期我们将介绍如何使用基于 XSLT 的中介来执行更复杂的消息转换。
本文示例源代码或素材下载
- ››消息称中国移动即将获得iPhone 4销售权
- ››实用模式:内部域特定语言
- ››实用才好 浏览PDF文档用美图看看
- ››消息称联通高层赴美谈引入iPhone4 将带WiFi
- ››消息称微软将在近期发布IE9 beta
- ››消息称台湾最快7到8月引入新版iPhone
- ››消息称微软中国Windows 7正版验证计划或将推迟
- ››消息称富士康今年交付2400万部iPhone 4G
- ››消息称谷歌与运营商分享 Android 广告收入
- ››消息称联通约见开发者商谈 iPhone 应用软件
- ››消息称 WiFi 版 iPhone 已经到位 可能在3月份推出...
- ››实用小技巧:看看你的系统几岁了?
更多精彩
赞助商链接