使用 Java API 处理 WebSphere MQ 大消息
2009-12-31 00:00:00 来源:WEB开发网对于每一个消息,我们还应标识这是一个组内的消息(MQMF_MSG_IN_GROUP):
myMsg.messageFlags = MQC.MQMF_MSG_IN_GROUP;
对于组内的最后一个消息,也需要设置特殊标识(MQMF_LAST_MSG_IN_GROUP):
myMsg.messageFlags = MQC.MQMF_LAST_MSG_IN_GROUP;
同样的,在接收方程序中,我们也是把同一组的所有消息放在一个同步点中接收,所以需要设置 MQGetMessageOptions 为 MQGMO_SYNCPOINT;同时,我们也设置 MQGMO_LOGICAL_ORDER 来保证同一个组里的所有消息是按逻辑顺序被取出;另外,我们还需设置同一组所有的消息都到达后才处理的选项(MQGMO_ALL_MSGS_AVAILABLE),这是为了防止万一由于异常导致某一成员消息丢失而引起程序无限等待的情形:
MQGetMessageOptions gmo = new MQGetMessageOptions ();
gmo.options = MQC.MQGMO_LOGICAL_ORDER + MQC.MQGMO_SYNCPOINT + MQC.MQGMO_ALL_MSGS_AVAILABLE;
由于我们是按逻辑顺序来取组内成员消息的,所以设置循环取消息的时候,只要遇到某一个消息是组内最后一个的标识,我们就认为已经取到了该组所有的消息。如果没有设置按照逻辑顺序来取消息片段,则需要应用程序根据消息序列号、取到的消息个数、是否是组内最后一个消息等标识来判断是否已经取到该组所有的消息。
部分代码如清单 3,您可以下载详细的示例代码。
清单 3 消息分组AppGrpSender.java
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
myQMgr = new MQQueueManager ("QM1");
myQueue = myQMgr.accessQueue("TESTQ", openOptions);
for(int i=0;i<3;i++)
{
MQMessage myMsg = new MQMessage ();
MQPutMessageOptions pmo = new MQPutMessageOptions ();
pmo.options = MQC.MQPMO_LOGICAL_ORDER + MQC.MQPMO_SYNCPOINT;
if (i<2)
myMsg.messageFlags = MQC.MQMF_MSG_IN_GROUP;
else
myMsg.messageFlags = MQC.MQMF_LAST_MSG_IN_GROUP;
String strMsg = "Hello" + i;
myMsg.write(strMsg.getBytes());
myQueue.put(myMsg,pmo);
System.out.println("Put message" + (i+1) + " '" + strMsg + "'! ");
}
myQMgr.commit();
myQueue.close();
myQMgr.disconnect();
AppGrpReceiver.java
int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING;
myQMgr = new MQQueueManager ("QM1");
myQueue = myQMgr.accessQueue("TESTQ", openOptions);
MQMessage myMsg;
MQGetMessageOptions gmo = new MQGetMessageOptions ();
gmo.options =
MQC.MQGMO_LOGICAL_ORDER + MQC.MQGMO_SYNCPOINT + MQC.MQGMO_ALL_MSGS_AVAILABLE;
String strMsg = "";
boolean isLastMsg = false;
int seq = 0;
while(!isLastMsg)
{
seq++;
myMsg = new MQMessage ();
myQueue.get(myMsg, gmo);
if (myMsg.messageFlags == MQC.MQMF_MSG_IN_GROUP + MQC.MQMF_LAST_MSG_IN_GROUP)
isLastMsg = true;
byte[] b = new byte[myMsg.getMessageLength()];
myMsg.readFully(b);
strMsg = new String(b);
System.out.println("Got message" + seq + ":\n" + strMsg);
}
myQMgr.commit();
myQueue.close();
myQMgr.disconnect();
程序功能介绍:
AppGrpSender 程序是使用一个 for 循环,构造一个组的三个消息,分别写入队列 TESTQ 中。
AppGrpReceiver 程序是从队列 TESTQ 中循环读取消息,根据其逻辑顺序以及是否是组内最后一个消息来判断是否已取完同一组内的所有消息。
相对于消息分片,消息分组不仅仅是处理大消息的一种方法,更为重要的是,消息分组还能维护一组业务数据中的逻辑关系。
结束语
消息分片和消息分组是在 WebSphere MQ 的编程中处理大消息的常用手段,到底采用哪种方式比较合适,需要根据实际的需求而定。如果大消息需要分割成有实际业务意义的一批小消息,那么采用消息分组比较合适;反之,如果大消息无法分割成有实际业务意义的小消息,那么就采用消息分片。甚至在某些复杂的场合下,消息分片和消息分组可以结合起来使用,比如,某批消息传输时由于有先后顺序的要求,被归并到一个组内,同时由于部分消息比较大,又需要分片传输,有兴趣的读者可以自己来实现一下这个复杂的场景。
本文示例源代码或素材下载
更多精彩
赞助商链接