javax.jms.BytesMessage#acknowledge ( )源码实例Demo

下面列出了javax.jms.BytesMessage#acknowledge ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

/**
 * This was causing a text message to ber eventually converted into large message when sent over the bridge
 *
 * @throws Exception
 */
@Test
public void testSendBytesAsLargeOnBridgeOnly() throws Exception {
   createQueue(QUEUE);

   Queue queue = (Queue) context1.lookup("queue/" + QUEUE);
   Connection conn1 = cf1.createConnection();
   Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
   MessageProducer prod1 = session1.createProducer(queue);

   Connection conn2 = cf2.createConnection();
   Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
   MessageConsumer cons2 = session2.createConsumer(queue);
   conn2.start();

   byte[] bytes = new byte[10 * 1024];

   for (int i = 0; i < bytes.length; i++) {
      bytes[i] = getSamplebyte(i);
   }

   for (int i = 0; i < 10; i++) {
      BytesMessage msg = session1.createBytesMessage();
      msg.writeBytes(bytes);
      prod1.send(msg);
   }

   session1.commit();

   for (int i = 0; i < 5; i++) {
      BytesMessage msg2 = (BytesMessage) cons2.receive(5000);
      assertNotNull(msg2);
      msg2.acknowledge();

      for (int j = 0; j < bytes.length; j++) {
         assertEquals("Position " + i, msg2.readByte(), bytes[j]);
      }
   }

   conn1.close();
   conn2.close();
}
 
/**
 * The message won't be large to the client while it will be considered large through the bridge
 *
 * @throws Exception
 */
@Test
public void testSendLargeForBridge() throws Exception {
   createQueue(QUEUE);

   Queue queue = (Queue) context1.lookup("queue/" + QUEUE);

   ActiveMQConnectionFactory cf1 = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, generateInVMParams(1)));
   cf1.setMinLargeMessageSize(200 * 1024);

   Connection conn1 = cf1.createConnection();
   Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
   MessageProducer prod1 = session1.createProducer(queue);

   Connection conn2 = cf2.createConnection();
   Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
   MessageConsumer cons2 = session2.createConsumer(queue);
   conn2.start();

   byte[] bytes = new byte[150 * 1024];

   for (int i = 0; i < bytes.length; i++) {
      bytes[i] = getSamplebyte(i);
   }

   for (int i = 0; i < 10; i++) {
      BytesMessage msg = session1.createBytesMessage();
      msg.writeBytes(bytes);
      prod1.send(msg);
   }

   session1.commit();

   for (int i = 0; i < 5; i++) {
      BytesMessage msg2 = (BytesMessage) cons2.receive(5000);
      assertNotNull(msg2);
      msg2.acknowledge();

      for (int j = 0; j < bytes.length; j++) {
         assertEquals("Position " + i, msg2.readByte(), bytes[j]);
      }
   }

   conn1.close();
   conn2.close();
}