下面列出了javax.jms.Message#setIntProperty ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void run() throws JMSException {
Message message = remoteBroker.session.createMessage();
if (requestId != null) {
message.setJMSReplyTo(remoteBroker.replyQueue);
message.setJMSCorrelationID(requestId);
}
message.setStringProperty("source", coordinatorAddress().toString());
message.setStringProperty("target", target.toString());
message.setStringProperty("payload", OperationCodec.toJson(op));
message.setIntProperty("operationType", getOperationType(op).toInt());
switch (target.getAddressLevel()) {
case AGENT:
remoteBroker.agentProducer.send(message);
break;
case WORKER:
remoteBroker.workerProducer.send(message);
break;
default:
throw new RuntimeException("unhandled target:" + target);
}
}
public void send() throws JMSException {
LOG.info("Sending ... ");
Connection con = cf.createConnection();
Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sess.createProducer(null);
Message message = sess.createMessage();
message.setIntProperty("ID", ++messageRover);
message.setBooleanProperty("COMMIT", true);
prod.send(topic, message);
msgCount++;
LOG.info("Message Sent.");
sess.close();
con.close();
}
public void sendCoordinator(SimulatorOperation op) {
try {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("sending [" + op + "]");
}
Destination topic = session.createTopic("coordinator");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(NON_PERSISTENT);
Message message = session.createMessage();
message.setStringProperty("source", selfAddressString);
message.setStringProperty("payload", OperationCodec.toJson(op));
message.setIntProperty("operationType", getOperationType(op).toInt());
producer.send(message);
} catch (JMSException e) {
LOGGER.error(e);
}
}
@Test
public void testBrowseWithSelector() throws Exception {
try {
conn = getConnectionFactory().createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue1);
final int numMessages = 100;
for (int i = 0; i < numMessages; i++) {
Message m = session.createMessage();
m.setIntProperty("test_counter", i + 1);
producer.send(m);
}
} finally {
removeAllMessages(queue1.getQueueName(), true);
}
}
/**
* Sends and consumes the messages.
*
* @throws Exception
*/
@Test
public void testSendReceive() throws Exception {
messages.clear();
for (int i = 0; i < data.length; i++) {
Message message = session.createTextMessage(data[i]);
message.setStringProperty("stringProperty", data[i]);
message.setIntProperty("intProperty", i);
producer.send(producerDestination, message);
messageSent();
}
assertMessagesAreReceived();
}
protected void setCustomProperty(Message message, String propertyName, Object propertyValue) throws JMSException
{
if (propertyValue instanceof Integer)
{
message.setIntProperty(propertyName, ((Integer) propertyValue).intValue());
}
else if (propertyValue instanceof Long)
{
message.setLongProperty(propertyName, ((Long) propertyValue).longValue());
}
else if (propertyValue instanceof Boolean)
{
message.setBooleanProperty(propertyName, ((Boolean) propertyValue).booleanValue());
}
else if (propertyValue instanceof Byte)
{
message.setByteProperty(propertyName, ((Byte) propertyValue).byteValue());
}
else if (propertyValue instanceof Double)
{
message.setDoubleProperty(propertyName, ((Double) propertyValue).doubleValue());
}
else if (propertyValue instanceof Float)
{
message.setFloatProperty(propertyName, ((Float) propertyValue).floatValue());
}
else if (propertyValue instanceof Short)
{
message.setShortProperty(propertyName, ((Short) propertyValue).shortValue());
}
else if (propertyValue instanceof String)
{
message.setStringProperty(propertyName, (String) propertyValue);
}
else
{
message.setObjectProperty(propertyName, propertyValue);
}
}
/**
* Test consuming an expired queue.
*
* @throws Exception
*/
public void testConsumeExpiredQueue() throws Exception {
MessageProducer producer = createProducer(timeToLive);
consumerDestination = session.createQueue(getConsumerSubject());
producerDestination = session.createQueue(getProducerSubject());
MessageConsumer consumer = createConsumer();
connection.start();
for (int i = 0; i < data.length; i++) {
Message message = session.createTextMessage(data[i]);
message.setStringProperty("stringProperty", data[i]);
message.setIntProperty("intProperty", i);
if (verbose) {
if (LOG.isDebugEnabled()) {
LOG.debug("About to send a queue message: " + message + " with text: " + data[i]);
}
}
producer.send(producerDestination, message);
}
// sleeps a second longer than the expiration time.
// Basically waits till queue expires.
Thread.sleep(timeToLive + 1000);
// message should have expired.
assertNull(consumer.receive(1000));
}
@Test
public void runtimeSelectorError() throws Exception
{
Connection connection = getConnection();
Queue queue = createQueue(getTestName());
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue , "testproperty % 5 = 1");
MessageProducer producer = session.createProducer(queue);
Message message = session.createMessage();
message.setIntProperty("testproperty", 1); // 1 % 5
producer.send(message);
connection.start();
Message receivedMessage = consumer.receive(getReceiveTimeout());
assertNotNull("Message matching selector should be received", receivedMessage);
message.setStringProperty("testproperty", "hello"); // "hello" % 5 would cause a runtime error
producer.send(message);
receivedMessage = consumer.receive(getReceiveTimeout());
assertNull("Message causing runtime selector error should not be received", receivedMessage);
MessageConsumer consumerWithoutSelector = session.createConsumer(queue);
receivedMessage = consumerWithoutSelector.receive(getReceiveTimeout());
assertNotNull("Message that previously caused a runtime error should be consumable by another consumer", receivedMessage);
}
finally
{
connection.close();
}
}
/**
* Test consuming an expired topic.
*
* @throws Exception
*/
public void testConsumeExpiredTopic() throws Exception {
MessageProducer producer = createProducer(timeToLive);
consumerDestination = session.createTopic(getConsumerSubject());
producerDestination = session.createTopic(getProducerSubject());
MessageConsumer consumer = createConsumer();
connection.start();
for (int i = 0; i < data.length; i++) {
Message message = session.createTextMessage(data[i]);
message.setStringProperty("stringProperty", data[i]);
message.setIntProperty("intProperty", i);
if (verbose) {
if (LOG.isDebugEnabled()) {
LOG.debug("About to send a topic message: " + message + " with text: " + data[i]);
}
}
producer.send(producerDestination, message);
}
// sleeps a second longer than the expiration time.
// Basically waits till topic expires.
Thread.sleep(timeToLive + 1000);
// message should have expired.
assertNull(consumer.receive(1000));
}
@Test
public void defaultFilterIsOverridden() throws Exception
{
String queueName = getTestName();
Connection connection = getConnection();
try
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
createQueueWithDefaultFilter(queueName, "foo = 1");
Queue queue = createQueue(queueName);
final MessageProducer prod = session.createProducer(queue);
Message message = session.createMessage();
message.setIntProperty("foo", 0);
prod.send(message);
MessageConsumer cons = session.createConsumer(queue, "foo = 0");
Message receivedMsg = cons.receive(getReceiveTimeout());
assertNotNull("Message with foo=0 should be received", receivedMsg);
assertEquals("Property foo not as expected", 0, receivedMsg.getIntProperty("foo"));
message = session.createMessage();
message.setIntProperty("foo", 1);
prod.send( message);
assertNull("Message with foo=1 should not be received", cons.receive(getReceiveTimeout()));
}
finally
{
connection.close();
}
}
public static Message createNextMessage(Session session, int msgCount) throws JMSException
{
Message message = createMessage(session, DEFAULT_MESSAGE_SIZE);
message.setIntProperty(INDEX, msgCount);
return message;
}
/**
* Sends and consumes the messages to a topic destination.
*
* @throws Exception
*/
public void testConsumeTopic() throws Exception {
MessageProducer producer = createProducer(0);
consumerDestination = session.createTopic(getConsumerSubject());
producerDestination = session.createTopic(getProducerSubject());
MessageConsumer consumer = createConsumer();
connection.start();
for (int i = 0; i < data.length; i++) {
Message message = session.createTextMessage(data[i]);
message.setStringProperty("stringProperty", data[i]);
message.setIntProperty("intProperty", i);
if (verbose) {
if (LOG.isDebugEnabled()) {
LOG.debug("About to send a topic message: " + message + " with text: " + data[i]);
}
}
producer.send(producerDestination, message);
}
// should receive a topic since there is no expiration.
assertNotNull(consumer.receive(1000));
}
public void testRemoveMessages() throws Exception {
final int QUEUE_SIZE = 30000;
final long TEST_TIMEOUT = 20000;
// Populate a test queue with uniquely-identifiable messages.
Connection conn = createConnection();
try {
conn.start();
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < QUEUE_SIZE; i++) {
Message message = session.createMessage();
message.setIntProperty("id", i);
producer.send(message);
}
session.commit();
} finally {
conn.close();
}
// Access the implementation of the test queue and move the last message
// to another queue. Verify that the move occurred within the limits of
// the test.
Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get(destination);
ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
context.setBroker(broker.getBroker());
context.getMessageEvaluationContext().setDestination(destination);
long startTimeMillis = System.currentTimeMillis();
Assert.assertEquals(1, queue.removeMatchingMessages("id=" + (QUEUE_SIZE - 1)));
long durationMillis = System.currentTimeMillis() - startTimeMillis;
LOG.info("It took " + durationMillis + "ms to remove the last message from a queue a " + QUEUE_SIZE + " messages.");
Assert.assertTrue("Removing the message took too long: " + durationMillis + "ms", durationMillis < TEST_TIMEOUT);
}
/**
* Test that the DurableSubscription without selector was successfully
* transfered to the new store, and functions as expected with continued use.
*/
@Test
public void testDurableSubscriptionWithoutSelector() throws Exception
{
TopicConnection connection = getTopicConnection();
try
{
connection.start();
TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(TOPIC_NAME);
TopicPublisher publisher = session.createPublisher(topic);
int index = ThreadLocalRandom.current().nextInt();
Message messageA = session.createTextMessage("A");
messageA.setIntProperty("ID", index);
messageA.setStringProperty("testprop", "false");
publisher.publish(messageA);
Message messageB = session.createTextMessage("B");
messageB.setIntProperty("ID", index);
messageB.setStringProperty("testprop", "true");
publisher.publish(messageB);
session.commit();
TopicSubscriber subscriber = session.createDurableSubscriber(topic, SUB_NAME);
Message migrated = subscriber.receive(getReceiveTimeout());
assertThat("Failed to receive migrated message", migrated, is(notNullValue()));
Message receivedA = subscriber.receive(getReceiveTimeout());
session.commit();
assertThat("Failed to receive published message A", receivedA, is(notNullValue()));
assertThat("Message A is not Text message", receivedA, is(instanceOf(TextMessage.class)));
assertThat("Unexpected text for A", ((TextMessage) receivedA).getText(), is(equalTo("A")));
assertThat("Unexpected index", receivedA.getIntProperty("ID"), is(equalTo(index)));
Message receivedB = subscriber.receive(getReceiveTimeout());
session.commit();
assertThat("Failed to receive published message B", receivedB, is(notNullValue()));
assertThat("Message B is not Text message", receivedB, is(instanceOf(TextMessage.class)));
assertThat("Unexpected text for B", ((TextMessage) receivedB).getText(), is(equalTo("B")));
assertThat("Unexpected index for B", receivedB.getIntProperty("ID"), is(equalTo(index)));
session.commit();
session.close();
}
finally
{
connection.close();
}
}
@Test(timeout = 60 * 1000)
// https://issues.apache.org/jira/browse/AMQ-3206
public void testCleanupDeletedSubAfterRestart() throws Exception {
Connection con = createConnection("cli1");
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", null, true);
session.close();
con.close();
con = createConnection("cli2");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", null, true);
session.close();
con.close();
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
final int toSend = 500;
final String payload = Arrays.toString(new byte[40 * 1024]);
int sent = 0;
for (int i = sent; i < toSend; i++) {
Message message = session.createTextMessage(payload);
message.setStringProperty("filter", "false");
message.setIntProperty("ID", i);
producer.send(topic, message);
sent++;
}
con.close();
LOG.info("sent: " + sent);
// kill off cli1
con = createConnection("cli1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.unsubscribe("SubsId");
destroyBroker();
createBroker(false);
con = createConnection("cli2");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
final DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
consumer.setMessageListener(listener);
assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("Want: " + toSend + ", current: " + listener.count);
return listener.count == toSend;
}
}));
session.close();
con.close();
destroyBroker();
createBroker(false);
final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
assertTrue("Should have less than three journal files left but was: " + pa.getStore().getJournal().getFileMap().size(), Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return pa.getStore().getJournal().getFileMap().size() <= 3;
}
}));
}
@Test(timeout = 20000)
public void testRemotelyCloseOneProducerDoesNotCompleteAsyncSendFromAnotherProducer() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
testPeer.expectBegin();
testPeer.expectSenderAttach();
testPeer.expectSenderAttach();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
MessageProducer producer = session.createProducer(queue);
// Create a second producer which allows for a safe wait for credit for the
// first producer without the need for a sleep. Otherwise the first producer
// might not do an actual async send due to not having received credit yet.
session.createProducer(queue);
Message message = session.createTextMessage("content");
message.setIntProperty("test", 1);
testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
// This closes link for the second producer we created, not the one that we
// will use to send a message.
testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true);
assertNull("Should not yet have a JMSDestination", message.getJMSDestination());
TestJmsCompletionListener listener = new TestJmsCompletionListener();
try {
producer.send(message, listener);
} catch (JMSException e) {
LOG.warn("Caught unexpected error: {}", e.getMessage());
fail("No expected exception for this send.");
}
testPeer.waitForAllHandlersToComplete(2000);
assertFalse("Should not get async callback", listener.awaitCompletion(10, TimeUnit.MILLISECONDS));
// Closing the session should complete the send with an exception
testPeer.expectEnd();
session.close();
assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
assertNotNull(listener.exception);
assertNotNull(listener.message);
assertTrue(listener.message instanceof TextMessage);
// Message should be readable
assertNotNull("Should have a readable JMSDestination", message.getJMSDestination());
assertEquals("Message body not as expected", "content", ((TextMessage) message).getText());
assertEquals("Message property not as expected", 1, message.getIntProperty("test"));
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test
public void testConflation() throws Exception
{
final String queueName = getTestName();
final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, false);
final Connection producerConnection = getConnection();
try
{
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
Message message = producerSession.createMessage();
message.setStringProperty(KEY_PROPERTY, "A");
message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 1);
producer.send(message);
message.setStringProperty(KEY_PROPERTY, "B");
message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 2);
producer.send(message);
message.setStringProperty(KEY_PROPERTY, "A");
message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 3);
producer.send(message);
message.setStringProperty(KEY_PROPERTY, "B");
message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 4);
producer.send(message);
}
finally
{
producerConnection.close();
}
Connection consumerConnection = getConnection();
try
{
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
Message received1 = consumer.receive(getReceiveTimeout());
assertNotNull("First message is not received", received1);
assertEquals("Unexpected key property value", "A", received1.getStringProperty(KEY_PROPERTY));
assertEquals("Unexpected sequence property value",
3,
received1.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
Message received2 = consumer.receive(getReceiveTimeout());
assertNotNull("Second message is not received", received2);
assertEquals("Unexpected key property value", "B", received2.getStringProperty(KEY_PROPERTY));
assertEquals("Unexpected sequence property value",
4,
received2.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
assertNull("Unexpected message is received", consumer.receive(getReceiveTimeout() / 4));
}
finally
{
consumerConnection.close();
}
}
@Test(timeout=20000)
public void testProducedMessagesOnTransactedSessionCanBeReused() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
testPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a Declared disposition state containing the txnId.
Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
testPeer.expectDeclare(txnId);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
// Create a producer to use in provoking creation of the AMQP transaction
testPeer.expectSenderAttach();
MessageProducer producer = session.createProducer(queue);
// Expect the message which was sent under the current transaction. Check it carries
// TransactionalState with the above txnId but has no outcome. Respond with a
// TransactionalState with Accepted outcome.
Message message = session.createMessage();
for(int i = 0; i < 3; ++i) {
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
stateMatcher.withTxnId(equalTo(txnId));
stateMatcher.withOutcome(nullValue());
TransactionalState txState = new TransactionalState();
txState.setTxnId(txnId);
txState.setOutcome(new Accepted());
testPeer.expectTransfer(messageMatcher, stateMatcher, txState, true);
message.setIntProperty("sequence", i);
producer.send(message);
}
// Expect rollback on close without a commit call.
testPeer.expectDischarge(txnId, true);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test
@Ignore
public void _testSendCommitQueueCommitsInOrder() throws Exception {
Connection conn = null;
try {
conn = createConnection();
Session producerSess = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSess.createProducer(queue1);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Session consumerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(queue1);
CountDownLatch latch = new CountDownLatch(1);
conn.start();
myReceiver myReceiver = new myReceiver(latch, conn);
consumer.setMessageListener(myReceiver);
long lastBatchTime = System.currentTimeMillis();
int sentId = 0;
boolean started = false;
// Send some messages
while (true) {
try {
Message m = producerSess.createMessage();
m.setIntProperty("foo", sentId);
sentId++;
producer.send(m);
if (sentId == 1 || System.currentTimeMillis() - lastBatchTime > 50) {
lastBatchTime = System.currentTimeMillis();
producerSess.commit();
}
} catch (JMSException e) {
//ignore connection closed by consumer
}
// wait for the first message to be received before we continue sending
if (!started) {
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
started = true;
} else {
if (myReceiver.failed) {
throw myReceiver.e;
}
}
}
} finally {
if (conn != null) {
conn.close();
}
removeAllMessages(queue1.getQueueName(), true);
}
}
@Test
public void testBrowse() throws Exception {
conn = getConnectionFactory().createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue1);
QueueBrowser browser = session.createBrowser(queue1);
ProxyAssertSupport.assertEquals(browser.getQueue(), queue1);
ProxyAssertSupport.assertNull(browser.getMessageSelector());
Enumeration<Message> en = browser.getEnumeration();
conn.start();
Message m = session.createMessage();
m.setIntProperty("cnt", 0);
producer.send(m);
Message m2 = en.nextElement();
Assert.assertNotNull(m2);
drainDestination(getConnectionFactory(), queue1);
}