使用 Java API 处理 WebSphere MQ 大消息
2009-12-31 00:00:00 来源:WEB开发网对于每一个消息片段,我们还应标识这是一个消息片段(MQMF_SEGMENT):
myMsg.messageFlags = MQC.MQMF_SEGMENT;
对于最后一个消息片段,也需要设置特殊标识(MQMF_LAST_SEGMENT):
myMsg.messageFlags = MQC.MQMF_LAST_SEGMENT;
同样的,在接收方程序中,我们也是把所有的消息片段放在一个同步点中接收,所以需要设置 MQGetMessageOptions 为 MQGMO_SYNCPOINT;同时,我们也设置 MQGMO_LOGICAL_ORDER 来保证所有的消息片段是按逻辑顺序被取出;另外,我们还需设置所有的消息片段都到达后才处理的选项(MQGMO_ALL_SEGMENTS_AVAILABLE),这是为了防止万一由于异常导致消息片段丢失而引起程序无限等待的情形:
MQGetMessageOptions gmo = new MQGetMessageOptions ();
gmo.options = MQC.MQGMO_LOGICAL_ORDER + MQC.MQGMO_SYNCPOINT + MQC.MQGMO_ALL_SEGMENTS_AVAILABLE;
由于我们是按逻辑顺序来取消息片段的,所以设置循环取消息的时候,只要遇到某一个消息片段是最后一个的标识,我们就认为已经取到了完整的消息。如果没有设置按照逻辑顺序来取消息片段,则需要应用程序根据消息序列号、偏移量、是否是最后一个消息片段等标识来判断是否已经取到完整的消息。
应用程序实现消息分片的部分代码如清单 2,您可以下载详细的示例代码:
清单 2 应用程序实现消息分片AppSegSender.java
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
myQMgr = new MQQueueManager ("QM1");
myQueue = myQMgr.accessQueue("TESTQ", openOptions);
for(int i=0;i<3;i++)
{
MQMessage myMsg = new MQMessage ();
MQPutMessageOptions pmo = new MQPutMessageOptions ();
pmo.options = MQC.MQPMO_LOGICAL_ORDER + MQC.MQPMO_SYNCPOINT;
if (i<2)
myMsg.messageFlags = MQC.MQMF_SEGMENT;
else
myMsg.messageFlags = MQC.MQMF_LAST_SEGMENT;
String strMsg = "Hello" + i;
myMsg.write(strMsg.getBytes());
myQueue.put(myMsg,pmo);
System.out.println("Put message '" + strMsg + "'! ");
}
myQMgr.commit();
myQueue.close();
myQMgr.disconnect();
AppSegReceiver.java
int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING;
myQMgr = new MQQueueManager ("QM1");
myQueue = myQMgr.accessQueue("TESTQ", openOptions);
MQMessage myMsg;
MQGetMessageOptions gmo = new MQGetMessageOptions ();
gmo.options =
MQC.MQGMO_LOGICAL_ORDER + MQC.MQGMO_SYNCPOINT + MQC.MQGMO_ALL_SEGMENTS_AVAILABLE;
String strMsg = "";
boolean isLastSegment = false;
while(!isLastSegment)
{
myMsg = new MQMessage ();
myQueue.get(myMsg, gmo);
if (myMsg.messageFlags == MQC.MQMF_SEGMENT + MQC.MQMF_LAST_SEGMENT)
isLastSegment = true;
byte[] b = new byte[myMsg.getMessageLength()];
myMsg.readFully(b);
strMsg += new String(b);
}
System.out.println("Got message:\n" + strMsg);
myQMgr.commit();
myQueue.close();
myQMgr.disconnect();
更多精彩
赞助商链接