下面列出了javax.jms.BytesMessage#acknowledge ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* This was causing a text message to ber eventually converted into large message when sent over the bridge
*
* @throws Exception
*/
@Test
public void testSendBytesAsLargeOnBridgeOnly() throws Exception {
createQueue(QUEUE);
Queue queue = (Queue) context1.lookup("queue/" + QUEUE);
Connection conn1 = cf1.createConnection();
Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer prod1 = session1.createProducer(queue);
Connection conn2 = cf2.createConnection();
Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons2 = session2.createConsumer(queue);
conn2.start();
byte[] bytes = new byte[10 * 1024];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = getSamplebyte(i);
}
for (int i = 0; i < 10; i++) {
BytesMessage msg = session1.createBytesMessage();
msg.writeBytes(bytes);
prod1.send(msg);
}
session1.commit();
for (int i = 0; i < 5; i++) {
BytesMessage msg2 = (BytesMessage) cons2.receive(5000);
assertNotNull(msg2);
msg2.acknowledge();
for (int j = 0; j < bytes.length; j++) {
assertEquals("Position " + i, msg2.readByte(), bytes[j]);
}
}
conn1.close();
conn2.close();
}
/**
* The message won't be large to the client while it will be considered large through the bridge
*
* @throws Exception
*/
@Test
public void testSendLargeForBridge() throws Exception {
createQueue(QUEUE);
Queue queue = (Queue) context1.lookup("queue/" + QUEUE);
ActiveMQConnectionFactory cf1 = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, generateInVMParams(1)));
cf1.setMinLargeMessageSize(200 * 1024);
Connection conn1 = cf1.createConnection();
Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer prod1 = session1.createProducer(queue);
Connection conn2 = cf2.createConnection();
Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons2 = session2.createConsumer(queue);
conn2.start();
byte[] bytes = new byte[150 * 1024];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = getSamplebyte(i);
}
for (int i = 0; i < 10; i++) {
BytesMessage msg = session1.createBytesMessage();
msg.writeBytes(bytes);
prod1.send(msg);
}
session1.commit();
for (int i = 0; i < 5; i++) {
BytesMessage msg2 = (BytesMessage) cons2.receive(5000);
assertNotNull(msg2);
msg2.acknowledge();
for (int j = 0; j < bytes.length; j++) {
assertEquals("Position " + i, msg2.readByte(), bytes[j]);
}
}
conn1.close();
conn2.close();
}