下面列出了 javax.jms.MessageProducer#setPriority() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Creates a producer on {@code session} for {@code destination} and applies every setting
 * carried by {@code producerOption} to it.
 *
 * <p>If applying one of the settings fails, the half-configured producer is closed before the
 * failure is propagated; the caller never receives a reference to it and so could not close it.
 *
 * @param session the session used to create the producer
 * @param destination the destination handed to {@code session.createProducer}
 * @param producerOption supplies delivery delay, delivery mode, the timestamp and message-id
 *     switches, priority and time-to-live
 * @return the fully configured producer
 * @throws JMSException if the producer cannot be created or one of the settings is rejected
 */
public static MessageProducer createMessageProducer(
        Session session,
        Destination destination,
        MessageProducerOption producerOption) throws JMSException {
    MessageProducer producer = session.createProducer(destination);
    try {
        producer.setDeliveryDelay(producerOption.getDeliveryDelay());
        producer.setDeliveryMode(producerOption.getDeliveryMode());
        producer.setDisableMessageTimestamp(producerOption.isDisableMessageTimestamp());
        producer.setDisableMessageID(producerOption.isDisableMessageId());
        producer.setPriority(producerOption.getPriority());
        producer.setTimeToLive(producerOption.getTimeToLive());
        return producer;
    } catch (JMSException | RuntimeException e) {
        // Release the producer; keep the original failure as the primary exception.
        try {
            producer.close();
        } catch (JMSException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
/**
 * Sends a single non-persistent text message to {@code queue} using default priority and
 * default time-to-live, with message IDs disabled. The producer is closed whether or not the
 * send succeeds.
 *
 * @param queue the target queue; its name is embedded in the message text
 * @param session the session used to create the producer and the message
 */
private void sendMessage(Queue queue, Session session) throws Exception {
    final MessageProducer producer = session.createProducer(queue);
    try {
        producer.setDisableMessageID(true);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        producer.setPriority(Message.DEFAULT_PRIORITY);
        producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);

        final String body = "This is message for " + queue.getQueueName();
        producer.send(session.createTextMessage(body));
    } finally {
        producer.close();
    }
}
/**
 * Verifies that a producer reports back the priority it was most recently given, first for
 * priority 9 and then for priority 0.
 */
@Test
public void testSetPriority() throws Exception {
    final Connection connection = createConnection();
    try {
        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageProducer producer = session.createProducer(ActiveMQServerTestCase.topic1);

        // Same two values, in the same order, as a set-then-get round trip.
        for (int priority : new int[] {9, 0}) {
            producer.setPriority(priority);
            ProxyAssertSupport.assertEquals(priority, producer.getPriority());
        }
    } finally {
        connection.close();
    }
}
/**
 * Sends one initial message with priority 0 and then waits for a
 * {@link ReflectingMessageListener} to count a latch of 5 down to zero, using transacted
 * sessions on a connection built with prefetch 1. The intent is that the message is repeatedly
 * reflected back to the queue with default priority and consumed again.
 *
 * Highlighted defect with PriorityQueues resolved in QPID-3927.
 */
@Test
public void testMessageReflectionWithPriorityIncreaseOnTransactedSessionsWithPrefetch1() throws Exception
{
    final Queue queue = createPriorityQueue(getTestName(), 10);
    final Connection connection = getConnectionBuilder().setPrefetch(1).build();
    try
    {
        connection.start();
        final Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);

        // The listener receives this latch; the test blocks on it below.
        final CountDownLatch reflectionsRemaining = new CountDownLatch(5);

        // NOTE(review): the consumer is created from producerSession even though a separate
        // consumerSession is also handed to the listener - confirm this is intended.
        final MessageConsumer consumer = producerSession.createConsumer(queue);
        final MessageProducer producer = producerSession.createProducer(queue);
        final ReflectingMessageListener listener = new ReflectingMessageListener(
                producerSession, producer, consumerSession, reflectionsRemaining);
        consumer.setMessageListener(listener);

        // A single priority-0 message kick-starts the asynchronous reflection process.
        producer.setPriority(0);
        producer.send(nextMessage(producerSession, 1));
        producerSession.commit();

        // Wait for the reflection process to finish, then check that the listener saw no failure.
        final boolean completedInTime = reflectionsRemaining.await(10, TimeUnit.SECONDS);
        assertTrue("Test process failed to complete in allowed time", completedInTime);
        assertNull("Unexpected throwable encountered", listener.getThrown());
    }
    finally
    {
        connection.close();
    }
}
/**
 * Sends a text message through {@code connection1} from a producer whose priority is 2 and
 * checks that a consumer on {@code connection2} receives a {@code TextMessage} carrying
 * JMSPriority 2. Both connections are closed before the method returns.
 *
 * @param connection1 connection used for the producing session
 * @param connection2 connection used for the consuming session
 * @throws JMSException if any JMS operation fails
 */
private void testPriority(Connection connection1, Connection connection2) throws JMSException {
    try {
        Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        javax.jms.Queue queue1 = session1.createQueue(getQueueName());
        javax.jms.Queue queue2 = session2.createQueue(getQueueName());
        final MessageConsumer consumer2 = session2.createConsumer(queue2);
        MessageProducer producer = session1.createProducer(queue1);
        producer.setPriority(2);
        // NOTE(review): only connection1 is started here; presumably connection2 arrives
        // already started - confirm against the callers.
        connection1.start();
        TextMessage message = session1.createTextMessage();
        message.setText("hello");
        producer.send(message);
        Message received = consumer2.receive(100);
        assertNotNull("Should have received a message by now.", received);
        assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
        assertEquals(2, received.getJMSPriority());
    } finally {
        // Nested finally so that connection2 is still closed when closing connection1 throws.
        try {
            connection1.close();
        } finally {
            connection2.close();
        }
    }
}
/**
 * Sends two messages from a priority-0 producer followed by three from a priority-9 producer,
 * consumes five messages inside one transacted session, commits, and finally checks that a
 * fresh consumer finds nothing left on the destination.
 *
 * @param clientPrioritySupport value passed to {@code connection.setMessagePrioritySupported}
 */
public void doTestBatchWithLowPriorityFirst(boolean clientPrioritySupport) throws Exception {
    connection.start();
    connection.setMessagePrioritySupported(clientPrioritySupport);

    final Session sendingSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final ActiveMQDestination destination =
            createDestination(sendingSession, ActiveMQDestination.QUEUE_TYPE);

    // Low-priority batch goes first.
    final MessageProducer lowPriorityProducer = sendingSession.createProducer(destination);
    lowPriorityProducer.setPriority(0);
    sendMessages(sendingSession, lowPriorityProducer, 2);
    lowPriorityProducer.close();

    // High-priority batch follows.
    final MessageProducer highPriorityProducer = sendingSession.createProducer(destination);
    highPriorityProducer.setPriority(9);
    sendMessages(sendingSession, highPriorityProducer, 3);
    highPriorityProducer.close();
    sendingSession.close();

    final Session transactedSession = connection.createSession(true, Session.SESSION_TRANSACTED);
    final MessageConsumer transactedConsumer = transactedSession.createConsumer(destination);
    for (int remaining = 5; remaining > 0; remaining--) {
        final Message received = transactedConsumer.receive(4000);
        // NOTE(review): if receive times out and yields null, this line fails with a
        // NullPointerException rather than an assertion message - confirm that is acceptable.
        LOG.info("MessageID: " + received.getJMSMessageID());
    }
    transactedSession.commit();
    transactedSession.close();

    // should be nothing left
    final Session verifyingSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final MessageConsumer verifyingConsumer = verifyingSession.createConsumer(destination);
    assertNull("No message left", verifyingConsumer.receive(1000));
    verifyingSession.close();
}
/**
 * Checks that a producer created with a {@code null} destination starts out with
 * {@code Message.DEFAULT_PRIORITY} and reports 9 after its priority is set to 9.
 */
@Test(timeout = 10000)
public void testPriorityConfiguration() throws Exception {
    final MessageProducer unboundProducer = session.createProducer(null);

    final int initialPriority = unboundProducer.getPriority();
    assertEquals(Message.DEFAULT_PRIORITY, initialPriority);

    unboundProducer.setPriority(9);
    final int updatedPriority = unboundProducer.getPriority();
    assertEquals(9, updatedPriority);
}
/**
 * Sends a non-persistent {@code BytesMessage} of {@code expectedSize} bytes with priority 4 to
 * a queue named after the current test method, receives it back on the same session and checks
 * that the received bytes equal the payload that was sent.
 *
 * <p>The connection is closed in a {@code finally} block so that a failed assertion or a JMS
 * error does not leave it open.
 *
 * @param expectedSize number of payload bytes to send and expect back
 */
public void doTestSendLargeMessage(int expectedSize) throws Exception{
    LOG.info("doTestSendLargeMessage called with expectedSize {}", expectedSize);
    byte[] payload = createLargePayload(expectedSize);
    assertEquals(expectedSize, payload.length);
    Connection connection = createAmqpConnection();
    try {
        long startTime = System.currentTimeMillis();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(name.getMethodName());
        MessageProducer producer = session.createProducer(queue);
        BytesMessage message = session.createBytesMessage();
        message.writeBytes(payload);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // Set this to non-default to get a Header in the encoded message.
        producer.setPriority(4);
        producer.send(message);
        long endTime = System.currentTimeMillis();
        LOG.info("Returned from send after {} ms", endTime - startTime);

        startTime = System.currentTimeMillis();
        MessageConsumer consumer = session.createConsumer(queue);
        connection.start();
        LOG.info("Calling receive");
        Message received = consumer.receive();
        assertNotNull(received);
        assertTrue(received instanceof BytesMessage);
        BytesMessage bytesMessage = (BytesMessage) received;
        endTime = System.currentTimeMillis();
        LOG.info("Returned from receive after {} ms", endTime - startTime);

        byte[] bytesReceived = new byte[expectedSize];
        assertEquals(expectedSize, bytesMessage.readBytes(bytesReceived, expectedSize));
        assertTrue(Arrays.equals(payload, bytesReceived));
    } finally {
        connection.close();
    }
}
/**
 * Creates a JMS producer as described by {@code command} and registers it through
 * {@code addProducer} under the command's participant name.
 *
 * <p>The session is looked up by name in {@code _testSessions}; the destination is a topic or a
 * queue depending on {@code command.isTopic()}. Priority is applied unless it is -1,
 * time-to-live only when positive, and delivery mode only when it is one of the two
 * {@code DeliveryMode} constants. All work on the session happens while holding its monitor.
 *
 * @param command describes the producer to create
 * @throws DistributedTestException if the named session does not exist or a JMS call fails
 */
public void createProducer(final CreateProducerCommand command)
{
    try
    {
        final Session testSession = _testSessions.get(command.getSessionName());
        if (testSession == null)
        {
            throw new DistributedTestException("No test session found called: " + command.getSessionName(), command);
        }

        synchronized(testSession)
        {
            final Destination target = command.isTopic()
                    ? testSession.createTopic(command.getDestinationName())
                    : testSession.createQueue(command.getDestinationName());
            final MessageProducer newProducer = testSession.createProducer(target);

            // -1 means "leave the producer's priority alone".
            if (command.getPriority() != -1)
            {
                newProducer.setPriority(command.getPriority());
            }

            // Only a positive time-to-live is applied.
            if (command.getTimeToLive() > 0)
            {
                newProducer.setTimeToLive(command.getTimeToLive());
            }

            // Any value other than the two known delivery modes is ignored.
            if (command.getDeliveryMode() == DeliveryMode.NON_PERSISTENT
                || command.getDeliveryMode() == DeliveryMode.PERSISTENT)
            {
                newProducer.setDeliveryMode(command.getDeliveryMode());
            }

            addProducer(command.getParticipantName(), newProducer);
        }
    }
    catch (final JMSException jmse)
    {
        throw new DistributedTestException("Unable to create new producer: " + command, jmse);
    }
}
/**
 * Publishes {@code MSG_COUNT} messages whose priority cycles through 0..9 in one transaction,
 * then consumes them and checks two things for every message: its priority matches the
 * currently expected priority (starting at 9 and stepping down after each block of
 * {@code MSG_COUNT / 10} messages), and it is ordered after the previous message, meaning a
 * strictly lower priority or the same priority with a larger "msg" property.
 */
@Test
public void testPriority() throws Exception
{
    final int priorities = 10;
    final Queue queue = createPriorityQueue(getTestName(), priorities);

    final Connection producerConnection = getConnection();
    try
    {
        final Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
        final MessageProducer producer = producerSession.createProducer(queue);
        int msg = 0;
        while (msg < MSG_COUNT)
        {
            producer.setPriority(msg % priorities);
            producer.send(nextMessage(producerSession, msg));
            msg++;
        }
        producerSession.commit();
    }
    finally
    {
        producerConnection.close();
    }

    final Connection consumerConnection = getConnection();
    try
    {
        final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageConsumer consumer = consumerSession.createConsumer(queue);
        consumerConnection.start();

        final int messagesPerPriority = MSG_COUNT / priorities;
        int expectedPriority = priorities - 1;
        Message previous = null;
        for (int messageCount = 0; messageCount < MSG_COUNT; messageCount++)
        {
            final Message received = consumer.receive(getReceiveTimeout());
            assertNotNull(String.format("Message '%d' is not received", messageCount), received);

            final int receivedPriority = received.getJMSPriority();
            assertEquals(String.format("Unexpected message '%d' priority", messageCount),
                         expectedPriority,
                         receivedPriority);

            if (previous != null)
            {
                final int previousIndex = previous.getIntProperty("msg");
                final int previousPriority = previous.getJMSPriority();
                final int receivedIndex = received.getIntProperty("msg");

                // In order means: lower priority than before, or same priority but sent later.
                final boolean inOrder = previousPriority > receivedPriority
                                        || (previousPriority == receivedPriority
                                            && previousIndex < receivedIndex);
                assertTrue(String.format(
                        "Messages '%d' arrived in unexpected order : previous message '%d' priority is '%d', received message '%d' priority is '%d'",
                        messageCount,
                        previousIndex,
                        previousPriority,
                        receivedIndex,
                        receivedPriority),
                           inOrder);
            }
            previous = received;

            // Step down to the next priority once a full block of messages has been seen.
            if (messageCount > 0 && (messageCount + 1) % messagesPerPriority == 0)
            {
                expectedPriority--;
            }
        }
    }
    finally
    {
        consumerConnection.close();
    }
}
/**
 * Sends twelve messages (numbered 1..12 via {@code nextMessage}) with priorities 9, 4 and 1 in
 * four different orderings inside one transaction, then checks the order in which they are
 * consumed: all priority-9 messages first (1, 5, 9, 12), then priority 4 (2, 4, 7, 11), then
 * priority 1 (3, 6, 8, 10), each group in send order.
 */
@Test
public void testOddOrdering() throws Exception
{
    final Queue queue = createPriorityQueue(getTestName(), 3);

    // Priority used for message i + 1, grouped by the ordering being exercised.
    final int[] sendPriorities = {
            9, 4, 1,   // In order ABC
            4, 9, 1,   // Out of order BAC
            4, 1, 9,   // Out of order BCA
            1, 4, 9    // Reverse order CBA
    };
    // "msg" property values in the order the consumer must see them.
    final int[] expectedArrivalOrder = {1, 5, 9, 12, 2, 4, 7, 11, 3, 6, 8, 10};

    final Connection producerConnection = getConnection();
    try
    {
        final Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
        final MessageProducer producer = producerSession.createProducer(queue);
        for (int i = 0; i < sendPriorities.length; i++)
        {
            producer.setPriority(sendPriorities[i]);
            producer.send(nextMessage(producerSession, i + 1));
        }
        producerSession.commit();
    }
    finally
    {
        producerConnection.close();
    }

    final Connection consumerConnection = getConnection();
    try
    {
        final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageConsumer consumer = consumerSession.createConsumer(queue);
        consumerConnection.start();
        for (int expectedIndex : expectedArrivalOrder)
        {
            final Message msg = consumer.receive(getReceiveTimeout());
            assertEquals(expectedIndex, msg.getIntProperty("msg"));
        }
    }
    finally
    {
        consumerConnection.close();
    }
}
/**
 * Exercises individual acknowledgement with a message listener.
 *
 * <p>One hundred text messages "m0".."m99" are sent with priority 2. A listener on a session
 * in {@code INDIVIDUAL_ACKNOWLEDGE} mode checks each text against its running counter and
 * acknowledges only the messages with an even counter. After that session is closed, a second
 * session must receive exactly the odd-numbered messages in order; each one is acknowledged.
 * Finally the queue's delivering count and message count are expected to reach zero.
 *
 * <p>The failures counted by the listener (unexpected text or an exception) are asserted to be
 * zero, so a misbehaving delivery fails the test instead of passing unnoticed.
 */
@Test
public void testIndividualACKMessageConsumer() throws Exception {
    Connection conn = cf.createConnection();
    Session session = conn.createSession(false, ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
    jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
    MessageProducer producer = session.createProducer(jBossQueue);
    MessageConsumer consumer = session.createConsumer(jBossQueue);
    int noOfMessages = 100;
    for (int i = 0; i < noOfMessages; i++) {
        producer.setPriority(2);
        producer.send(session.createTextMessage("m" + i));
    }
    conn.start();

    final AtomicInteger errors = new AtomicInteger(0);
    final ReusableLatch latch = new ReusableLatch();
    latch.setCount(noOfMessages);

    // Acknowledges every message whose running counter is even and records any mismatch.
    class MessageAckEven implements MessageListener {
        int count = 0;

        @Override
        public void onMessage(Message msg) {
            try {
                TextMessage txtmsg = (TextMessage) msg;
                if (!txtmsg.getText().equals("m" + count)) {
                    errors.incrementAndGet();
                }
                if (count % 2 == 0) {
                    msg.acknowledge();
                }
                count++;
            } catch (Exception e) {
                errors.incrementAndGet();
            } finally {
                // Always count down so the test thread is not left waiting on a failure.
                latch.countDown();
            }
        }
    }

    consumer.setMessageListener(new MessageAckEven());
    Assert.assertTrue(latch.await(5000));
    // Previously the error counter was collected but never checked.
    Assert.assertEquals("Listener reported unexpected messages or exceptions", 0, errors.get());
    session.close();

    session = conn.createSession(false, ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
    consumer = session.createConsumer(jBossQueue);
    // Only the odd-numbered messages were left unacknowledged by the listener.
    for (int i = 1; i < noOfMessages; i += 2) {
        TextMessage m = (TextMessage) consumer.receive(1000);
        Assert.assertNotNull(m);
        m.acknowledge();
        Assert.assertEquals("m" + i, m.getText());
    }

    SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
    conn.close();
    Queue queue = server.locateQueue(queueName);
    Wait.assertEquals(0, queue::getDeliveringCount);
    Wait.assertEquals(0, queue::getMessageCount);
}
/**
 * Sends a non-persistent {@code BytesMessage} of {@code expectedSize} bytes with priority 4 to
 * a queue named after the current test method over an AMQP connection to
 * {@code amqp://localhost:61616}, receives it back on the same session and checks that the
 * received bytes equal the payload that was sent.
 *
 * <p>The connection is managed solely by try-with-resources; no separate {@code close()} call
 * is needed.
 *
 * @param expectedSize number of payload bytes to send and expect back
 */
public void doTestSendLargeMessage(int expectedSize) throws Exception {
    LOG.debug("doTestSendLargeMessage called with expectedSize {}", expectedSize);
    byte[] payload = createLargePayload(expectedSize);
    assertEquals(expectedSize, payload.length);
    ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
    try (Connection connection = factory.createConnection()) {
        long startTime = System.currentTimeMillis();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(name.getMethodName());
        MessageProducer producer = session.createProducer(queue);
        BytesMessage message = session.createBytesMessage();
        message.writeBytes(payload);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // Set this to non-default to get a Header in the encoded message.
        producer.setPriority(4);
        producer.send(message);
        long endTime = System.currentTimeMillis();
        LOG.debug("Returned from send after {} ms", endTime - startTime);

        startTime = System.currentTimeMillis();
        MessageConsumer consumer = session.createConsumer(queue);
        connection.start();
        LOG.debug("Calling receive");
        Message received = consumer.receive();
        assertNotNull(received);
        assertTrue(received instanceof BytesMessage);
        BytesMessage bytesMessage = (BytesMessage) received;
        endTime = System.currentTimeMillis();
        LOG.debug("Returned from receive after {} ms", endTime - startTime);

        byte[] bytesReceived = new byte[expectedSize];
        assertEquals(expectedSize, bytesMessage.readBytes(bytesReceived, expectedSize));
        assertTrue(Arrays.equals(payload, bytesReceived));
    }
}
/**
 * Shared body for the non-persistent send tests: sends one text message against a
 * {@code TestAmqpPeer} and lets the peer verify the transferred frame.
 *
 * @param anonymousProducer when true the producer is created with a {@code null} destination
 *     and the queue is supplied on each send; the peer then expects a sender attach with a
 *     null target address
 * @param setPriority when true priority 5 is used and the peer additionally expects a header
 *     section with durable=false and that priority
 * @param setOnProducer when true delivery mode (and priority) are configured on the producer;
 *     otherwise they are passed as arguments to {@code send}
 */
private void doSendingMessageNonPersistentTestImpl(boolean anonymousProducer, boolean setPriority, boolean setOnProducer) throws Exception {
    final String queueName = "myQueue";
    final String text = "myMessage";
    final byte priority = 5;

    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        //Add capability to indicate support for ANONYMOUS-RELAY
        final Symbol[] serverCapabilities = new Symbol[]{ANONYMOUS_RELAY};
        final Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);

        testPeer.expectBegin();

        final TargetMatcher targetMatcher = new TargetMatcher();
        if (anonymousProducer) {
            targetMatcher.withAddress(nullValue());
        } else {
            targetMatcher.withAddress(equalTo(queueName));
        }
        testPeer.expectSenderAttach(targetMatcher, false, false);

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue queue = session.createQueue(queueName);

        final MessageProducer producer;
        if (anonymousProducer) {
            producer = session.createProducer(null);
        } else {
            producer = session.createProducer(queue);
        }

        // Describe the transfer frame the peer should see.
        final MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        final MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
        final TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
        if (setPriority) {
            // A header matcher is only installed when a priority is being set.
            final MessageHeaderSectionMatcher headerMatcher = new MessageHeaderSectionMatcher(true);
            headerMatcher.withDurable(equalTo(false));
            headerMatcher.withPriority(equalTo(UnsignedByte.valueOf(priority)));
            messageMatcher.setHeadersMatcher(headerMatcher);
        }
        messageMatcher.setPropertiesMatcher(propsMatcher);
        messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
        testPeer.expectTransfer(messageMatcher);
        testPeer.expectClose();

        final Message message = session.createTextMessage(text);
        assertNull("Should not yet have a JMSDestination", message.getJMSDestination());

        if (setOnProducer) {
            // Configure the producer, then use the short send overloads.
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            if (setPriority) {
                producer.setPriority(priority);
            }
            if (anonymousProducer) {
                producer.send(queue, message);
            } else {
                producer.send(message);
            }
        } else {
            // Pass delivery mode, priority and time-to-live explicitly on the send call.
            final int sendPriority = setPriority ? priority : Message.DEFAULT_PRIORITY;
            if (anonymousProducer) {
                producer.send(queue, message, DeliveryMode.NON_PERSISTENT, sendPriority, Message.DEFAULT_TIME_TO_LIVE);
            } else {
                producer.send(message, DeliveryMode.NON_PERSISTENT, sendPriority, Message.DEFAULT_TIME_TO_LIVE);
            }
        }

        assertEquals("Should have NON_PERSISTENT delivery mode set", DeliveryMode.NON_PERSISTENT, message.getJMSDeliveryMode());

        connection.close();
        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Creates a message from {@code correlationId} and {@code message}, sends it to {@code dest}
 * through a freshly created producer and closes that producer afterwards.
 *
 * <p>The producer is closed in a {@code finally} block, so it is released even when
 * configuring the message or sending it throws. If the close itself throws, that exception is
 * the one the caller sees.
 *
 * @param session session used to create the message and the producer
 * @param dest destination to send to
 * @param correlationId passed to {@code createMessage}
 * @param message passed to {@code createMessage}
 * @param messageType JMSType to set on the message; skipped when {@code null}
 * @param timeToLive producer time-to-live; applied only when greater than 0
 * @param deliveryMode delivery mode for message and producer; applied only when greater than 0
 * @param priority priority for message and producer; applied only when 0 or greater
 * @param ignoreInvalidDestinationException when true, an {@code InvalidDestinationException}
 *     raised while creating the producer is logged as a warning and {@code null} is returned;
 *     the flag is also forwarded to the three-argument {@code send}
 * @param properties object properties to set on the message; skipped when {@code null}
 * @return the result of the three-argument {@code send}, or {@code null} when the destination
 *     was invalid and that was ignored
 * @throws SenderException in JMS 1.0.2 mode, when session and destination are not both
 *     topic-typed or both queue-typed
 * @throws JMSException if a JMS operation fails
 * @throws NamingException declared for callers; not thrown directly by the code in this method
 */
public String send(Session session, Destination dest, String correlationId, String message, String messageType, long timeToLive, int deliveryMode, int priority, boolean ignoreInvalidDestinationException, Map<String, Object> properties) throws NamingException, JMSException, SenderException {
    javax.jms.Message msg = createMessage(session, correlationId, message);
    MessageProducer mp;
    try {
        if (useJms102()) {
            if ((session instanceof TopicSession) && (dest instanceof Topic)) {
                mp = getTopicPublisher((TopicSession)session, (Topic)dest);
            } else if ((session instanceof QueueSession) && (dest instanceof Queue)) {
                mp = getQueueSender((QueueSession)session, (Queue)dest);
            } else {
                throw new SenderException("classes of Session ["+session.getClass().getName()+"] and Destination ["+dest.getClass().getName()+"] do not match (Queue vs Topic)");
            }
        } else {
            mp = session.createProducer(dest);
        }
    } catch (InvalidDestinationException e) {
        if (ignoreInvalidDestinationException) {
            log.warn("queue ["+dest+"] doesn't exist");
            return null;
        } else {
            throw e;
        }
    }
    try {
        if (messageType!=null) {
            msg.setJMSType(messageType);
        }
        if (deliveryMode>0) {
            msg.setJMSDeliveryMode(deliveryMode);
            mp.setDeliveryMode(deliveryMode);
        }
        if (priority>=0) {
            msg.setJMSPriority(priority);
            mp.setPriority(priority);
        }
        if (timeToLive>0) {
            mp.setTimeToLive(timeToLive);
        }
        if (properties!=null) {
            for (Map.Entry<String, Object> property : properties.entrySet()) {
                String key = property.getKey();
                Object value = property.getValue();
                // Log the property's own key (the old message printed an unrelated 'name').
                log.debug("setting property ["+key+"] to value ["+value+"]");
                msg.setObjectProperty(key, value);
            }
        }
        return send(mp, msg, ignoreInvalidDestinationException);
    } finally {
        mp.close();
    }
}