下面列出了 javax.jms.DeliveryMode 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Starts an in-process ActiveMQ broker on {@code uriPrefix + hostName + ":" + port},
 * then opens a client connection to it and initialises the session, queue,
 * consumer and (non-persistent) producer fields.
 *
 * @throws Exception if the broker or any of the JMS objects cannot be created
 */
@Override
public void start() throws Exception {
    final String brokerUri = uriPrefix + hostName + ":" + port;
    LOG.info("ACTIVEMQ: Starting ActiveMQ on {}", brokerUri);
    configure();

    // The broker has to be listening before the client factory connects to it.
    broker = new BrokerService();
    broker.addConnector(brokerUri);
    broker.start();

    final ActiveMQConnectionFactory clientFactory = new ActiveMQConnectionFactory(brokerUri + uriPostfix);
    final Connection clientConnection = clientFactory.createConnection();
    clientConnection.start();

    session = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    dest = session.createQueue(queueName);
    consumer = session.createConsumer(dest);
    producer = session.createProducer(dest);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
/**
 * Checks that the response QoS settings set on the listener (delivery mode,
 * priority 6, time-to-live 6000) are passed to {@code MessageProducer.send}
 * when the reply is published, and that the producer is closed afterwards.
 */
@Test
public void replyWithFullQoS() throws JMSException {
    // Mocked collaborators.
    Session jmsSession = mock(Session.class);
    Queue replyQueue = mock(Queue.class);
    MessageProducer replyProducer = mock(MessageProducer.class);
    TextMessage reply = mock(TextMessage.class);

    // Stub the session so the listener can resolve the queue, build the reply and get a producer.
    given(jmsSession.createQueue("queueOut")).willReturn(replyQueue);
    given(jmsSession.createTextMessage("Response")).willReturn(reply);
    given(jmsSession.createProducer(replyQueue)).willReturn(replyProducer);

    MessagingMessageListenerAdapter listener = getPayloadInstance("Response", "replyPayloadToQueue", Message.class);
    listener.setResponseQosSettings(new QosSettings(DeliveryMode.NON_PERSISTENT, 6, 6000));

    listener.onMessage(mock(javax.jms.Message.class), jmsSession);

    verify(jmsSession).createQueue("queueOut");
    verify(jmsSession).createTextMessage("Response");
    verify(replyProducer).send(reply, DeliveryMode.NON_PERSISTENT, 6, 6000);
    verify(replyProducer).close();
}
/**
 * Populates every read-only JMS header on the source message and checks that,
 * after {@code mapper.fromHeaders}, the target JMS message still reports the
 * values asserted below (no destination, PERSISTENT, no expiration, no id,
 * default priority, not redelivered, zero timestamp).
 */
@Test
public void jmsReadOnlyPropertiesNotMapped() throws JMSException {
    Message<String> source = initBuilder()
            .setHeader(JmsHeaders.DESTINATION, new Destination() {})
            .setHeader(JmsHeaders.DELIVERY_MODE, DeliveryMode.NON_PERSISTENT)
            .setHeader(JmsHeaders.EXPIRATION, 1000L)
            .setHeader(JmsHeaders.MESSAGE_ID, "abc-123")
            .setHeader(JmsHeaders.PRIORITY, 9)
            .setHeader(JmsHeaders.REDELIVERED, true)
            .setHeader(JmsHeaders.TIMESTAMP, System.currentTimeMillis())
            .build();

    javax.jms.Message target = new StubTextMessage();
    mapper.fromHeaders(source.getHeaders(), target);

    // None of the headers set above should have reached the JMS message.
    assertNull(target.getJMSDestination());
    assertEquals(DeliveryMode.PERSISTENT, target.getJMSDeliveryMode());
    assertEquals(0, target.getJMSExpiration());
    assertNull(target.getJMSMessageID());
    assertEquals(javax.jms.Message.DEFAULT_PRIORITY, target.getJMSPriority());
    assertFalse(target.getJMSRedelivered());
    assertEquals(0, target.getJMSTimestamp());
}
/**
 * Opens a fresh connection and session, sends {@code numMessages} persistent
 * text messages to the test destination, then closes the producer and session.
 *
 * @param numMessages number of messages to send; each body carries the running {@code messageCount}
 * @throws Exception if any JMS operation fails
 */
protected void publish(int numMessages) throws Exception {
    connection = createConnection();
    connection.start();
    session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    destination = createDestination();

    producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

    int remaining = numMessages;
    while (remaining-- > 0) {
        // messageCount is incremented after being embedded in the body.
        String body = "This is a test: " + messageCount++;
        producer.send(session.createTextMessage(body));
    }

    producer.close();
    producer = null;
    closeSession();
}
/**
 * Sends a persistent {@code ObjectMessage} containing {@code "Hello world!"} to
 * {@code myReplyQueue} using a non-transacted, auto-acknowledge session.
 *
 * <p>Producer, session and connection are released in nested {@code finally}
 * blocks so that a failure while closing one resource does not prevent the
 * remaining ones from being closed.
 *
 * @throws JMSException if creating, sending or closing any JMS resource fails
 */
@TransactionAttribute(value = TransactionAttributeType.NOT_SUPPORTED)
@Override
public void sendMessage() throws JMSException {
    System.out.println("Sending reply message");
    Connection conn = myCF.createConnection();
    try {
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            MessageProducer prod = session.createProducer(myReplyQueue);
            try {
                ObjectMessage msg = session.createObjectMessage();
                msg.setObject("Hello world!");
                // priority 0, time-to-live 0
                prod.send(msg, DeliveryMode.PERSISTENT, 0, 0);
            } finally {
                prod.close();
            }
        } finally {
            session.close();
        }
    } finally {
        // Always reached, even when closing the producer or session threw.
        conn.close();
    }
}
/**
 * Sends persistent messages with a 1000 ms time-to-live to a durable
 * subscription and checks that a message left unconsumed past that period is
 * no longer delivered.
 *
 * @throws Exception if any JMS operation fails
 */
public void testMessageExpire() throws Exception {
    final String subscriptionName = "sub1";

    session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
    final Topic expiringTopic = session.createTopic("TestTopic");
    consumer = session.createDurableSubscriber(expiringTopic, subscriptionName);

    producer = session.createProducer(expiringTopic);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    producer.setTimeToLive(1000);
    connection.start();

    // Baseline: delivery works while the durable subscriber is active.
    producer.send(session.createTextMessage("Msg:1"));
    assertTextMessageEquals("Msg:1", consumer.receive(1000));

    // Publish two more messages while the subscriber is offline.
    consumer.close();
    producer.send(session.createTextMessage("Msg:2"));
    producer.send(session.createTextMessage("Msg:3"));

    // Re-attach: the first backlog message is still expected to arrive.
    consumer = session.createDurableSubscriber(expiringTopic, subscriptionName);
    assertTextMessageEquals("Msg:2", consumer.receive(1000));

    // After waiting out the time-to-live nothing further should be delivered.
    Thread.sleep(1000);
    assertNull(consumer.receive(1000));
}
/**
 * Sends a persistent {@code ObjectMessage} containing {@code "Hello world!"} to
 * {@code myReplyQueue} using a non-transacted, auto-acknowledge session.
 *
 * <p>Each resource is closed in its own {@code finally} block, so an exception
 * from closing the producer or session can no longer leave the connection open.
 *
 * @throws JMSException if creating, sending or closing any JMS resource fails
 */
@Override
public void sendMessage() throws JMSException {
    Connection conn = myCF.createConnection();
    try {
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            MessageProducer prod = session.createProducer(myReplyQueue);
            try {
                ObjectMessage msg = session.createObjectMessage();
                msg.setObject("Hello world!");
                // priority 0, time-to-live 0
                prod.send(msg, DeliveryMode.PERSISTENT, 0, 0);
            } finally {
                prod.close();
            }
        } finally {
            session.close();
        }
    } finally {
        conn.close();
    }
}
/**
 * Sends a persistent {@code ObjectMessage} containing {@code "Hello world!"} to
 * {@code myReplyQueue}; the method is annotated to run with transaction
 * attribute {@code REQUIRED}.
 *
 * <p>Each resource is closed in its own {@code finally} block, so an exception
 * from closing the producer or session can no longer leave the connection open.
 *
 * @throws JMSException if creating, sending or closing any JMS resource fails
 */
@TransactionAttribute(value = TransactionAttributeType.REQUIRED)
@Override
public void sendMessage() throws JMSException {
    Connection conn = myCF.createConnection();
    try {
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            MessageProducer prod = session.createProducer(myReplyQueue);
            try {
                ObjectMessage msg = session.createObjectMessage();
                msg.setObject("Hello world!");
                // priority 0, time-to-live 0
                prod.send(msg, DeliveryMode.PERSISTENT, 0, 0);
            } finally {
                prod.close();
            }
        } finally {
            session.close();
        }
    } finally {
        conn.close();
    }
}
/**
 * Asynchronously sends a non-persistent text message to the named queue after
 * an artificial delay. The work runs on a newly started thread; this method
 * returns immediately.
 *
 * <p>The session is closed in a {@code finally} block so it is released even
 * when creating the producer or sending fails. The connection obtained from
 * {@code getConnection()} is left open, as in the original code
 * (NOTE(review): presumably shared — confirm).
 *
 * @param queueName   name of the target queue
 * @param text        body of the text message
 * @param delayMillis time to sleep before sending, in milliseconds
 */
public static void send(String queueName, String text, int delayMillis) {
    new Thread(() -> {
        try {
            logger.info("*** artificial delay {}: {}", queueName, delayMillis);
            Thread.sleep(delayMillis);
            Connection connection = getConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            try {
                Destination destination = session.createQueue(queueName);
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                TextMessage message = session.createTextMessage(text);
                producer.send(message);
                logger.info("*** sent message {}: {}", queueName, text);
            } finally {
                session.close();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status before surfacing the failure.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }).start();
}
/**
 * Sends a large text message from the local session and checks that the copy
 * received on the remote session is flagged as compressed and carries the
 * same text.
 */
@Test
public void testTextMessageCompression() throws Exception {
    MessageConsumer remoteConsumer = remoteSession.createConsumer(included);
    MessageProducer localProducer = localSession.createProducer(included);
    localProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    waitForConsumerRegistration(localBroker, 1, included);

    // Build the body: "test-" followed by 100 random UUIDs.
    StringBuilder buffer = new StringBuilder("test-");
    for (int remaining = 100; remaining > 0; remaining--) {
        buffer.append(UUID.randomUUID().toString());
    }
    String expectedText = buffer.toString();

    localProducer.send(localSession.createTextMessage(expectedText));

    Message received = remoteConsumer.receive(RECEIVE_TIMEOUT_MILLS);
    assertNotNull(received);
    ActiveMQTextMessage textMessage = (ActiveMQTextMessage) received;
    assertTrue(textMessage.isCompressed());
    assertEquals(expectedText, textMessage.getText());
}
/**
 * Sets each producer-specific property on a {@code ProducerParticipantResult}
 * and checks that the same values are exposed through its attribute map.
 */
@Test
public void testProducerParticipantResultAttributes() throws Exception
{
    // Expected values.
    final int deliveryMode = DeliveryMode.PERSISTENT;
    final int messageSize = 60;
    final long producerInterval = 50;
    final long timeToLive = 30;
    final int priority = 2;

    final ProducerParticipantResult result = new ProducerParticipantResult();
    result.setPriority(priority);
    result.setTimeToLive(timeToLive);
    result.setInterval(producerInterval);
    result.setPayloadSize(messageSize);
    result.setDeliveryMode(deliveryMode);

    assertEquals(priority, result.getAttributes().get(PRIORITY));
    assertEquals(timeToLive, result.getAttributes().get(TIME_TO_LIVE));
    assertEquals(producerInterval, result.getAttributes().get(PRODUCER_INTERVAL));
    assertEquals(messageSize, result.getAttributes().get(PAYLOAD_SIZE));
    assertEquals(deliveryMode, result.getAttributes().get(DELIVERY_MODE));
}
/**
 * Sends one non-persistent text message, with message IDs disabled and default
 * priority and time-to-live, to the given queue. The producer is always closed.
 *
 * @param queue   destination queue; its name is embedded in the message body
 * @param session session used to create the producer and the message
 * @throws Exception if any JMS operation fails
 */
private void sendMessage(Queue queue, Session session) throws Exception {
    final MessageProducer sender = session.createProducer(queue);
    try {
        sender.setDisableMessageID(true);
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        sender.setPriority(Message.DEFAULT_PRIORITY);
        sender.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);

        final String body = "This is message for " + queue.getQueueName();
        sender.send(session.createTextMessage(body));
    } finally {
        sender.close();
    }
}
/**
 * Sends a large object message (a long string) from the local session and
 * checks that the copy received on the remote session is flagged as
 * compressed and carries an equal payload.
 */
@Test
public void testObjectMessageCompression() throws Exception {
    MessageConsumer remoteConsumer = remoteSession.createConsumer(included);
    MessageProducer localProducer = localSession.createProducer(included);
    localProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    waitForConsumerRegistration(localBroker, 1, included);

    // Build the payload: "test-" followed by 100 random UUIDs.
    StringBuilder buffer = new StringBuilder("test-");
    for (int remaining = 100; remaining > 0; remaining--) {
        buffer.append(UUID.randomUUID().toString());
    }
    String expectedPayload = buffer.toString();

    localProducer.send(localSession.createObjectMessage(expectedPayload));

    Message received = remoteConsumer.receive(RECEIVE_TIMEOUT_MILLS);
    assertNotNull(received);
    ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) received;
    assertTrue(objectMessage.isCompressed());
    assertEquals(expectedPayload, objectMessage.getObject());
}
/**
 * Sets all read-only JMS headers on a messaging {@code Message}, maps its
 * headers onto a stub JMS message, and asserts the JMS message keeps the
 * values checked below rather than the ones supplied in the headers.
 */
@Test
public void jmsReadOnlyPropertiesNotMapped() throws JMSException {
    Message<String> withReadOnlyHeaders = initBuilder()
            .setHeader(JmsHeaders.DESTINATION, new Destination() {})
            .setHeader(JmsHeaders.DELIVERY_MODE, DeliveryMode.NON_PERSISTENT)
            .setHeader(JmsHeaders.EXPIRATION, 1000L)
            .setHeader(JmsHeaders.MESSAGE_ID, "abc-123")
            .setHeader(JmsHeaders.PRIORITY, 9)
            .setHeader(JmsHeaders.REDELIVERED, true)
            .setHeader(JmsHeaders.TIMESTAMP, System.currentTimeMillis())
            .build();
    javax.jms.Message stub = new StubTextMessage();

    mapper.fromHeaders(withReadOnlyHeaders.getHeaders(), stub);

    // Reference-valued properties stay unset.
    assertNull(stub.getJMSDestination());
    assertNull(stub.getJMSMessageID());
    // Primitive-valued properties keep the values asserted here, not the header values.
    assertEquals(DeliveryMode.PERSISTENT, stub.getJMSDeliveryMode());
    assertEquals(0, stub.getJMSExpiration());
    assertEquals(javax.jms.Message.DEFAULT_PRIORITY, stub.getJMSPriority());
    assertFalse(stub.getJMSRedelivered());
    assertEquals(0, stub.getJMSTimestamp());
}
/**
 * Publishes 200 non-persistent messages to the default topic with a
 * subscriber attached, verifies the pending count/size statistics, consumes
 * all messages and verifies the statistics drop to zero.
 *
 * <p>The connection is closed in a {@code finally} block so that a failed
 * verification does not leave the connection (and its client ID
 * {@code "clientId"}) open.
 */
@Test
public void testTopicNonPersistentMessageSize() throws Exception {
    AtomicLong publishedMessageSize = new AtomicLong();
    Connection connection = cf.createConnection();
    try {
        connection.setClientID("clientId");
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(session.createTopic(defaultTopicName));

        publishTestTopicMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
        verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());

        // consume all messages
        consumeTestMessages(consumer, 200);

        // All messages should now be gone
        verifyPendingStats(defaultTopicName, 0, 0);
    } finally {
        connection.close();
    }
}
/**
 * Derives the Quality-of-Service settings for sending a message from a single
 * {@link SourceDataTagValue}: priority and time-to-live are copied over, and
 * the delivery mode is PERSISTENT when guaranteed delivery is requested,
 * NON_PERSISTENT otherwise.
 *
 * @param sourceDataTagValue the first tag extracted from {@link DataTagValueUpdate}
 * @return the Quality-of-Service settings to apply to the {@link JmsTemplate}
 */
static QosSettings extractQosSettings(SourceDataTagValue sourceDataTagValue) {
    final QosSettings qos = new QosSettings();
    qos.setPriority(sourceDataTagValue.getPriority());
    qos.setTimeToLive(sourceDataTagValue.getTimeToLive());

    final boolean guaranteed = sourceDataTagValue.isGuaranteedDelivery();
    log.debug(guaranteed ? "\t sending PERSISTENT message" : "\t sending NON-PERSISTENT message");
    qos.setDeliveryMode(guaranteed ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
    return qos;
}
/**
 * Verifies that a temporary queue retains a message that was sent before any
 * consumer existed: the message is sent first, and only afterwards is the
 * connection started and a consumer created.
 *
 * @throws JMSException if any JMS operation fails
 */
@Test
public void testTempQueueHoldsMessagesWithoutConsumers() throws JMSException {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue tempQueue = session.createTemporaryQueue();

    // Send while nobody is listening.
    MessageProducer sender = session.createProducer(tempQueue);
    sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    TextMessage sent = session.createTextMessage("Hello");
    sender.send(sent);

    // Only now attach a consumer.
    connection.start();
    MessageConsumer lateConsumer = session.createConsumer(tempQueue);
    Message received = lateConsumer.receive(3000);

    Assert.assertNotNull(received);
    Assert.assertTrue("Expected message to be a TextMessage", received instanceof TextMessage);
    String expectedText = sent.getText();
    String actualText = ((TextMessage) received).getText();
    Assert.assertTrue("Expected message to be a '" + expectedText + "'", actualText.equals(expectedText));
}
/**
 * Starts the embedded ActiveMQ broker and sets up the client side used as the
 * test producer: connection, client-acknowledge session, and a non-persistent
 * producer bound to the queue named by {@code qNameToUse}.
 *
 * @param brokerURL URL the client connection factory connects to
 * @throws Exception if the broker or any JMS object cannot be created
 */
private void createAMQClient(String brokerURL) throws Exception
{
    startEmbeddedActiveMQBroker();

    // Factory -> started connection.
    ActiveMQConnectionFactory clientFactory = new ActiveMQConnectionFactory(brokerURL);
    connection = clientFactory.createConnection();
    connection.start();

    // Non-transacted session with client acknowledgement.
    session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

    // Producer targeting the test queue.
    Destination targetQueue = session.createQueue(qNameToUse);
    producer = session.createProducer(targetQueue);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
/**
 * Test that the JMS property {@code JMSDeliveryMode} is treated as having the
 * values {@code 'PERSISTENT'} or {@code 'NON_PERSISTENT'} when used in a
 * message selector (chapter 3.8.1.3).
 *
 * <p>One non-persistent and one persistent message are sent; the receiver,
 * created with selector {@code JMSDeliveryMode = 'PERSISTENT'}, must get only
 * the second one.
 */
@Test
public void testJMSDeliveryModeInSelector() throws Exception {
    // Replace any existing receiver with one that filters on delivery mode.
    if (receiver != null) {
        receiver.close();
    }
    receiver = receiverSession.createReceiver(receiverQueue, "JMSDeliveryMode = 'PERSISTENT'");

    // Decoy: sent non-persistently, so the selector must reject it.
    TextMessage nonPersistent = senderSession.createTextMessage();
    nonPersistent.setText("testJMSDeliveryModeInSelector:1");
    sender.send(nonPersistent, DeliveryMode.NON_PERSISTENT, sender.getPriority(), sender.getTimeToLive());

    // Target: sent persistently, so the selector must accept it.
    TextMessage persistent = senderSession.createTextMessage();
    persistent.setText("testJMSDeliveryModeInSelector:2");
    sender.send(persistent, DeliveryMode.PERSISTENT, sender.getPriority(), sender.getTimeToLive());

    TextMessage selected = (TextMessage) receiver.receive(TestConfig.TIMEOUT);
    Assert.assertTrue("No message was received", selected != null);
    Assert.assertEquals(DeliveryMode.PERSISTENT, selected.getJMSDeliveryMode());
    Assert.assertEquals("testJMSDeliveryModeInSelector:2", selected.getText());
}
/**
 * Connects to the in-VM ActiveMQ broker at {@code vm://localhost} and sends the
 * same non-persistent text message {@code numMessages} times to queue
 * {@code TEST.FOO}.
 *
 * <p>Session and connection are closed in {@code finally} blocks so they are
 * released even when sending fails.
 *
 * @param numMessages how many times the message is sent
 * @throws Exception if any JMS operation fails
 */
private void produceMsg(int numMessages) throws Exception
{
    // Create a ConnectionFactory
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

    // Create a Connection
    Connection connection = connectionFactory.createConnection();
    try {
        connection.start();

        // Create a Session
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        try {
            // Create the destination (Topic or Queue)
            Destination destination = session.createQueue("TEST.FOO");

            // Create a MessageProducer from the Session to the Topic or Queue
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            // Create the message once and send it repeatedly
            String text = "Hello world! From tester producer";
            TextMessage message = session.createTextMessage(text);
            for (int i = 0; i < numMessages; i++) {
                producer.send(message);
            }
        } finally {
            session.close();
        }
    } finally {
        // Clean up
        connection.close();
    }
}
/**
 * Sends one persistent queue message, rolls its delivery back one more time
 * than the connection's maximum redeliveries, browses the dead-letter queue,
 * and finally checks that nothing arrives on the queue whose name repeats the
 * DLQ prefix.
 *
 * @throws Exception if any JMS operation fails
 */
public void testDLQBrowsing() throws Exception {
    super.topic = false;
    deliveryMode = DeliveryMode.PERSISTENT;
    durableSubscriber = false;
    messageCount = 1;
    connection.start();

    ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
    rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
    LOG.info("Will redeliver messages: " + rollbackCount + " times");

    sendMessages();

    // now lets receive and rollback N times
    for (int i = 0; i < rollbackCount; i++) {
        makeConsumer();
        Message message = consumer.receive(5000);
        assertNotNull("No message received: ", message);

        session.rollback();
        // Report the number of rollbacks performed so far, not the fixed target.
        LOG.info("Rolled back: " + (i + 1) + " times");
        consumer.close();
    }

    makeDlqBrowser();
    browseDlq();
    dlqBrowser.close();
    session.close();
    Thread.sleep(1000);

    session = connection.createSession(transactedMode, acknowledgeMode);
    // NOTE(review): the doubled "ActiveMQ.DLQ.Queue." prefix looks like the DLQ of the DLQ — confirm.
    Queue testQueue = new ActiveMQQueue("ActiveMQ.DLQ.Queue.ActiveMQ.DLQ.Queue." + getClass().getName() + "." + getName());
    MessageConsumer testConsumer = session.createConsumer(testQueue);
    assertNull("The message shouldn't be sent to another DLQ", testConsumer.receive(1000));
}
/**
 * Configures the fixture for queue-based, persistent delivery and then
 * delegates to the superclass set-up.
 *
 * @see junit.framework.TestCase#setUp()
 */
@Override
protected void setUp() throws Exception {
    // Both flags must be in place before the parent set-up runs.
    deliveryMode = DeliveryMode.PERSISTENT;
    topic = false;
    super.setUp();
}
/**
 * Checks that {@code JMSProducer.getDeliveryMode()} returns whatever was last
 * set, first for PERSISTENT and then for NON_PERSISTENT.
 */
@Test
public void testDeliveryMode() {
    JMSProducer jmsProducer = context.createProducer();
    for (int mode : new int[] {DeliveryMode.PERSISTENT, DeliveryMode.NON_PERSISTENT}) {
        jmsProducer.setDeliveryMode(mode);
        assertEquals(mode, jmsProducer.getDeliveryMode());
    }
}
/**
 * Sends a non-persistent message with a 1000 ms time-to-live, waits 2000 ms,
 * and checks that {@code receiveNoWait()} returns nothing.
 */
@Test
public void testExpirationOnReceiveNoWait() throws Exception {
    final Message shortLived = queueProducerSession.createMessage();
    // priority 4, time-to-live 1000 ms
    queueProducer.send(shortLived, DeliveryMode.NON_PERSISTENT, 4, 1000);

    // Delivery is asynchronous, so allow enough time for it to reach the consumer.
    Thread.sleep(2000);

    ProxyAssertSupport.assertNull(queueConsumer.receiveNoWait());
}
/**
 * Records the delivery mode as a boolean {@code persistent} flag.
 *
 * @param deliveryMode {@link DeliveryMode#PERSISTENT} or {@link DeliveryMode#NON_PERSISTENT}
 * @throws JMSException if {@code deliveryMode} is neither of the two constants
 */
@Override
public void setJMSDeliveryMode(int deliveryMode) throws JMSException {
    if (deliveryMode == DeliveryMode.PERSISTENT) {
        persistent = true;
    } else if (deliveryMode == DeliveryMode.NON_PERSISTENT) {
        persistent = false;
    } else {
        throw new JMSException(String.format("Invalid DeliveryMode specific: %d", deliveryMode));
    }
}
/**
 * Publishes 200 persistent messages to a shared durable subscription
 * {@code "sub1"}, verifies the pending and pending-durable statistics, consumes
 * the messages, and verifies the pending counts drop to zero.
 *
 * <p>The connection is closed in a {@code finally} block so that a failed
 * verification does not leave the connection (and its client ID
 * {@code "clientId"}) open.
 */
@Test
public void testMessageSizeSharedDurable() throws Exception {
    AtomicLong publishedMessageSize = new AtomicLong();
    Connection connection = cf.createConnection();
    try {
        connection.setClientID("clientId");
        connection.start();

        // The publish method will create a second shared consumer
        Session s = connection.createSession();
        MessageConsumer c = s.createSharedDurableConsumer(s.createTopic(defaultTopicName), "sub1");
        publishTestMessagesDurable(connection, new String[] {"sub1",}, 200, publishedMessageSize,
                DeliveryMode.PERSISTENT, true);

        // verify the count and size
        // NOTE(review): an earlier comment here said the values are doubled for two
        // durables, but the asserted count is 200 for a single subscription — confirm.
        verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
        verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
        c.close();

        // consume messages for sub1
        consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
        verifyPendingStats(defaultTopicName, 0, publishedMessageSize.get());
        verifyPendingDurableStats(defaultTopicName, 0, publishedMessageSize.get());
    } finally {
        connection.close();
    }
}
/**
 * Builds the JMS fixture: connection (with a client ID when {@code durable}),
 * auto-acknowledge session, an unbound producer using the configured delivery
 * mode, topic or queue destinations depending on {@code topic}, and a consumer
 * that delivers to this object as its message listener. The connection is
 * started last.
 *
 * @throws Exception if any JMS object cannot be created
 */
@Override
protected void setUp() throws Exception {
    super.setUp();

    connectionFactory = createConnectionFactory();
    connection = createConnection();
    if (durable) {
        connection.setClientID(getClass().getName());
    }
    LOG.info("Created connection: " + connection);

    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    LOG.info("Created session: " + session);

    // No default destination: the producer is created against null.
    producer = session.createProducer(null);
    producer.setDeliveryMode(deliveryMode);
    String modeName;
    if (deliveryMode == DeliveryMode.PERSISTENT) {
        modeName = "PERSISTENT";
    } else {
        modeName = "NON_PERSISTENT";
    }
    LOG.info("Created producer: " + producer + " delivery mode = " + modeName);

    if (!topic) {
        consumerDestination = session.createQueue(getConsumerSubject());
        producerDestination = session.createQueue(getProducerSubject());
    } else {
        consumerDestination = session.createTopic(getConsumerSubject());
        producerDestination = session.createTopic(getProducerSubject());
    }
    LOG.info("Created consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass());
    LOG.info("Created producer destination: " + producerDestination + " of type: " + producerDestination.getClass());

    consumer = createConsumer();
    consumer.setMessageListener(this);
    connection.start();
}
/**
 * Sends a persistent {@code ObjectMessage} containing {@code "Hello world!"} to
 * {@code myReplyQueue} inside a {@code UserTransaction} obtained from the
 * session context. On any failure the stack trace is printed and the
 * transaction is rolled back; the original exception is not rethrown.
 *
 * <p>Producer, session and connection are closed in nested {@code finally}
 * blocks so that a failure while closing one does not skip the others.
 *
 * @throws JMSException if closing a JMS resource fails
 * @throws EJBException if the rollback itself fails
 */
@Override
public void sendMessage() throws JMSException {
    System.out.println("Sending reply message");
    Connection conn = null;
    Session session = null;
    MessageProducer prod = null;
    UserTransaction ux = sessionContext.getUserTransaction();
    try {
        ux.begin();
        conn = myCF.createConnection();
        session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
        prod = session.createProducer(myReplyQueue);
        ObjectMessage msg = session.createObjectMessage();
        msg.setObject("Hello world!");
        // priority 0, time-to-live 0
        prod.send(msg, DeliveryMode.PERSISTENT, 0, 0);
        ux.commit();
    } catch (Exception e) {
        // NOTE(review): the send failure is only printed, not propagated — confirm this is intended.
        e.printStackTrace();
        try {
            ux.rollback();
        } catch (Exception ex) {
            throw new EJBException(
                    "rollback failed: " + ex.getMessage(), ex);
        }
    } finally {
        try {
            if (prod != null) {
                prod.close();
            }
        } finally {
            try {
                if (session != null) {
                    session.close();
                }
            } finally {
                if (conn != null) {
                    conn.close();
                }
            }
        }
    }
}
/**
 * Configures the fixture for a durable topic with persistent delivery and
 * then delegates to the superclass set-up.
 *
 * @throws Exception if the superclass set-up fails
 */
@Override
public void setUp() throws Exception {
    // All three settings must be in place before the parent set-up runs.
    deliveryMode = DeliveryMode.PERSISTENT;
    durable = true;
    topic = true;
    super.setUp();
}
/**
 * Creates a {@code PerfProducer} for the given destination and payload and
 * switches it to persistent delivery.
 *
 * @param fac     connection factory handed to the producer
 * @param dest    destination handed to the producer
 * @param number  not used by this implementation
 * @param payload message payload handed to the producer
 * @return the configured producer
 * @throws JMSException if the producer cannot be created or configured
 */
@Override
protected PerfProducer createProducer(ConnectionFactory fac,
                                      Destination dest,
                                      int number,
                                      byte payload[]) throws JMSException {
    final PerfProducer persistentProducer = new PerfProducer(fac, dest, payload);
    persistentProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
    return persistentProducer;
}