下面列出了 javax.jms.MessageProducer#setDisableMessageTimestamp() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Creates a producer for {@code destination} on {@code session} and copies every
 * setting carried by {@code producerOption} onto it.
 *
 * @param session        the session used to create the producer
 * @param destination    the destination the producer is created for
 * @param producerOption source of the delivery delay, delivery mode, timestamp/ID
 *                       flags, priority and time-to-live applied to the producer
 * @return the newly created, fully configured producer
 * @throws JMSException if creating or configuring the producer fails
 */
public static MessageProducer createMessageProducer(
        Session session,
        Destination destination,
        MessageProducerOption producerOption) throws JMSException {
    final MessageProducer configured = session.createProducer(destination);

    // Delivery characteristics.
    configured.setDeliveryDelay(producerOption.getDeliveryDelay());
    configured.setDeliveryMode(producerOption.getDeliveryMode());

    // Header-generation flags.
    configured.setDisableMessageTimestamp(producerOption.isDisableMessageTimestamp());
    configured.setDisableMessageID(producerOption.isDisableMessageId());

    // Priority and expiry.
    configured.setPriority(producerOption.getPriority());
    configured.setTimeToLive(producerOption.getTimeToLive());

    return configured;
}
/**
 * Sends {@code message} to {@code destination} through a producer that is created
 * for this single call and closed again afterwards.
 *
 * @param destination      the destination to send to
 * @param message          the message to send
 * @param disableTimestamp when {@code true}, the producer is told to disable message
 *                         timestamps before sending
 * @throws JMSException if creating the producer, sending, or closing the producer fails
 */
@Override
public void send(Destination destination, Message message, boolean disableTimestamp) throws JMSException {
    MessageProducer producer = session.createProducer(destination);
    try {
        if (disableTimestamp) {
            producer.setDisableMessageTimestamp(true);
        }
        producer.send(message);
    } finally {
        // A fresh producer is created on every call; without this close each send
        // would leave one more open producer attached to the session.
        producer.close();
    }
}
/**
 * Verifies that the disable-timestamp flag of a producer starts out {@code false}
 * and reads back {@code true} once it has been set.
 *
 * @throws Exception if the producer cannot be created or queried
 */
@Test(timeout = 10000)
public void testGetDisableTimeStamp() throws Exception {
    // Producer created with a null destination.
    final MessageProducer unboundProducer = session.createProducer(null);

    // Flag is off before anything touches it.
    assertFalse(unboundProducer.getDisableMessageTimestamp());

    // Flip it and confirm the getter reflects the change.
    unboundProducer.setDisableMessageTimestamp(true);
    assertTrue(unboundProducer.getDisableMessageTimestamp());
}
/**
* Test that after sending a message with the disableMessageTimestamp hint set, the
* message object has a 0 JMSTimestamp value, and no creation-time field value was set.
* <p>
* The test drives a {@code TestAmqpPeer}: expectations are registered on the peer
* before the client call that should satisfy them, so statement order matters here.
*
* @throws Exception if an error occurs during the test.
*/
@Test(timeout = 20000)
public void testSendingMessageWithDisableMessageTimestampHint() throws Exception {
// The peer is closed automatically when the try block exits.
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
// Register the peer-side expectations (presumably session begin and sender link
// attach) before creating the session and producer below.
testPeer.expectBegin();
testPeer.expectSenderAttach();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = "myQueue";
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
String text = "myMessage";
// Build the matcher describing the transfer the peer should receive:
// a header section with durable == true, a message-annotations section,
// a properties section with no creation-time, and the text as the body value.
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
propsMatcher.withCreationTime(nullValue()); // Check there is no creation-time value;
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
// Expect exactly that transfer, followed by the connection close issued below.
testPeer.expectTransfer(messageMatcher);
testPeer.expectClose();
Message message = session.createTextMessage(text);
// Sanity check: a freshly created message carries no timestamp.
assertEquals("JMSTimestamp should not yet be set", 0, message.getJMSTimestamp());
// Set the hint under test, then send.
producer.setDisableMessageTimestamp(true);
producer.send(message);
connection.close();
// Wait (argument is 1000; presumably milliseconds) for the peer to finish
// processing everything that was expected of it.
testPeer.waitForAllHandlersToComplete(1000);
// With the hint set, sending must not have populated JMSTimestamp on the message object.
assertEquals("JMSTimestamp should still not be set", 0, message.getJMSTimestamp());
}
}
/**
 * Sends one message with a producer time-to-live of 100, waits one second, and then
 * asserts that a consumer on the same queue receives nothing.
 *
 * @param disableTimestamp  when {@code true}, message timestamps are disabled on the
 *                          producer before sending
 * @param propJMS_AMQP_TTL  optional value stored on the message under
 *                          {@code AmqpMessageSupport.JMS_AMQP_TTL}; skipped when {@code null}
 * @throws Exception if any JMS operation fails or the sleep is interrupted
 */
private void doProducerWithTTLTestImpl(boolean disableTimestamp, Long propJMS_AMQP_TTL) throws Exception {
    connection = createAmqpConnection();
    assertNotNull(connection);
    connection.start();

    final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    assertNotNull(session);

    final Queue queue = session.createQueue(name.getMethodName());

    final Message outgoing = session.createMessage();
    if (propJMS_AMQP_TTL != null) {
        outgoing.setLongProperty(AmqpMessageSupport.JMS_AMQP_TTL, propJMS_AMQP_TTL);
    }

    final MessageProducer producer = session.createProducer(queue);
    if (disableTimestamp) {
        producer.setDisableMessageTimestamp(true);
    }
    producer.setTimeToLive(100);
    producer.send(outgoing);

    // Sleep well past the time-to-live before attaching a consumer.
    TimeUnit.SECONDS.sleep(1);

    final MessageConsumer consumer = session.createConsumer(queue);
    final Message received = consumer.receive(150);
    if (received != null) {
        // Record the expiry details before the assertion below fails the test.
        LOG.info("Unexpected message received: JMSExpiration = {} JMSTimeStamp = {} TTL = {}",
                 new Object[] { received.getJMSExpiration(), received.getJMSTimestamp(),
                                received.getJMSExpiration() - received.getJMSTimestamp()});
    }
    assertNull("Unexpected message received, see log for details", received);
}
/**
 * Sends the same randomly filled bytes message {@code numberOfMessages} times to the
 * configured destination, optionally committing in batches and throttling the rate.
 *
 * @param numberOfMessages how many messages to send
 * @param txBatchSize      when {@code transacted}, the session is committed after every
 *                         {@code txBatchSize} messages
 * @param durable          {@code true} to send with {@code DeliveryMode.PERSISTENT},
 *                         {@code false} for {@code DeliveryMode.NON_PERSISTENT}
 * @param transacted       whether the session must be committed by this method
 * @param display          whether to log progress every 2000 messages
 * @param throttleRate     rate handed to the token-bucket limiter, or {@code -1} to
 *                         send without throttling
 * @param messageSize      size in bytes of the random payload
 * @throws Exception if any JMS operation fails
 */
private void sendMessages(final int numberOfMessages,
                          final int txBatchSize,
                          final boolean durable,
                          final boolean transacted,
                          final boolean display,
                          final int throttleRate,
                          final int messageSize) throws Exception {
    MessageProducer producer = session.createProducer(destination);

    // Honour the caller-supplied flag; previously this parameter was ignored and the
    // delivery mode was always read from perfParams.
    producer.setDeliveryMode(durable ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

    producer.setDisableMessageID(perfParams.isDisableMessageID());

    producer.setDisableMessageTimestamp(perfParams.isDisableTimestamp());

    BytesMessage message = session.createBytesMessage();

    byte[] payload = PerfBase.randomByteArray(messageSize);

    message.writeBytes(payload);

    // Progress is logged once per this many messages.
    final int modulo = 2000;

    TokenBucketLimiter tbl = throttleRate != -1 ? new TokenBucketLimiterImpl(throttleRate, false) : null;

    // Tracks whether the most recently sent message has already been committed.
    boolean committed = false;
    for (int i = 1; i <= numberOfMessages; i++) {
        producer.send(message);

        if (transacted) {
            if (i % txBatchSize == 0) {
                session.commit();
                committed = true;
            } else {
                committed = false;
            }
        }

        if (display && i % modulo == 0) {
            double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
            PerfBase.log.info(String.format("sent %6d messages in %2.2fs", i, duration));
        }

        if (tbl != null) {
            tbl.limit();
        }
    }

    // Flush a trailing partial batch.
    if (transacted && !committed) {
        session.commit();
    }
}
/**
* Sends one text message to each of three queues ("exclude.test.foo",
* "include.test.foo", "always.include.test.foo") from the hub session, then checks
* which of them can be received through spoke-session consumers: nothing on the
* excluded queue, the hub messages on the included and always-included queues.
* Finally checks that the excluded message is still receivable on the hub.
* <p>
* The short sleeps after creating each spoke consumer and the order of sends and
* receives are part of the test; do not reorder statements.
*
* @throws Exception if any JMS operation fails or a sleep is interrupted
*/
public void testDuplexSendFromHubToSpoke() throws Exception {
//create hub producer
MessageProducer hubProducer = hubSession.createProducer(null);
hubProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
hubProducer.setDisableMessageID(true);
hubProducer.setDisableMessageTimestamp(true);
//create spoke producer
// NOTE(review): this producer is created from hubSession, not spokeSession,
// despite its name — confirm whether that is intended.
MessageProducer spokeProducer = hubSession.createProducer(null);
spokeProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
spokeProducer.setDisableMessageID(true);
spokeProducer.setDisableMessageTimestamp(true);
// Each message's text is the string form of the queue it is sent to.
Queue excludedQueueHub = hubSession.createQueue("exclude.test.foo");
TextMessage excludedMsgHub = hubSession.createTextMessage();
excludedMsgHub.setText(excludedQueueHub.toString());
Queue includedQueueHub = hubSession.createQueue("include.test.foo");
TextMessage includedMsgHub = hubSession.createTextMessage();
includedMsgHub.setText(includedQueueHub.toString());
Queue alwaysIncludedQueueHub = hubSession.createQueue("always.include.test.foo");
TextMessage alwaysIncludedMsgHub = hubSession.createTextMessage();
alwaysIncludedMsgHub.setText(alwaysIncludedQueueHub.toString());
// Sending from Hub queue
hubProducer.send(excludedQueueHub, excludedMsgHub);
hubProducer.send(includedQueueHub, includedMsgHub);
hubProducer.send(alwaysIncludedQueueHub, alwaysIncludedMsgHub);
// Create the spoke-side consumers, pausing 100 ms after each one
// (presumably to let the subscription propagate — confirm).
Queue excludedQueueSpoke = spokeSession.createQueue("exclude.test.foo");
MessageConsumer excludedConsumerSpoke = spokeSession.createConsumer(excludedQueueSpoke);
Thread.sleep(100);
Queue includedQueueSpoke = spokeSession.createQueue("include.test.foo");
MessageConsumer includedConsumerSpoke = spokeSession.createConsumer(includedQueueSpoke);
Thread.sleep(100);
Queue alwaysIncludedQueueSpoke = spokeSession.createQueue("always.include.test.foo");
// NOTE(review): the consumer below is created on alwaysIncludedQueueHub (the Queue
// object obtained from hubSession), not on alwaysIncludedQueueSpoke — confirm.
MessageConsumer alwaysIncludedConsumerSpoke = spokeSession.createConsumer(alwaysIncludedQueueHub);
Thread.sleep(100);
// Send one message on the always-included queue via spokeProducer.
TextMessage alwaysIncludedMsgSpoke = spokeSession.createTextMessage();
alwaysIncludedMsgSpoke.setText(alwaysIncludedQueueSpoke.toString());
spokeProducer.send(alwaysIncludedQueueSpoke, alwaysIncludedMsgSpoke);
// NOTE(review): named "...Hub" but created from spokeSession; it is only checked
// for non-null and never used to receive — confirm intent.
MessageConsumer alwaysIncludedConsumerHub = spokeSession.createConsumer(alwaysIncludedQueueHub);
assertNotNull(alwaysIncludedConsumerHub);
// Receiving from excluded Spoke queue
Message msg = excludedConsumerSpoke.receive(200);
assertNull(msg);
// Receiving from included Spoke queue
msg = includedConsumerSpoke.receive(200);
assertEquals(includedMsgHub, msg);
// Receiving from always-included Spoke queue
msg = alwaysIncludedConsumerSpoke.receive(200);
assertEquals(alwaysIncludedMsgHub, msg);
// we should be able to receive excluded queue message on Hub
MessageConsumer excludedConsumerHub = hubSession.createConsumer(excludedQueueHub);
msg = excludedConsumerHub.receive(200);
assertEquals(excludedMsgHub, msg);
// Only these two resources are closed explicitly here; the remaining producer and
// consumers are left for whatever closes the sessions (not visible in this method).
hubProducer.close();
excludedConsumerSpoke.close();
}
/**
 * Verifies both states of the producer's disable-timestamp flag end to end:
 * with the flag set, a received message reports a JMSTimestamp of 0; with the flag
 * cleared, the received timestamp lies between the wall-clock readings taken just
 * before the send and just after the receive.
 *
 * @throws Exception if any JMS operation fails
 */
@Test
public void testSetTimestampDisabled() throws Exception {
    Connection pconn = createConnection();
    try {
        // Created inside the outer try so that pconn is still closed if this fails.
        Connection cconn = createConnection();
        try {
            Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Session cs = cconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer p = ps.createProducer(queue1);
            MessageConsumer c = cs.createConsumer(queue1);

            cconn.start();

            // Flag set: the consumer must see a zero timestamp.
            p.setDisableMessageTimestamp(true);
            ProxyAssertSupport.assertTrue(p.getDisableMessageTimestamp());

            p.send(ps.createMessage());

            Message m = c.receive(3000);

            ProxyAssertSupport.assertEquals(0L, m.getJMSTimestamp());

            // Flag cleared: the timestamp must fall inside the [t1, t2] window.
            p.setDisableMessageTimestamp(false);
            ProxyAssertSupport.assertFalse(p.getDisableMessageTimestamp());

            long t1 = System.currentTimeMillis();

            p.send(ps.createMessage());

            m = c.receive(3000);

            long t2 = System.currentTimeMillis();
            long timestamp = m.getJMSTimestamp();

            ProxyAssertSupport.assertTrue(timestamp >= t1);
            ProxyAssertSupport.assertTrue(timestamp <= t2);
        } finally {
            cconn.close();
        }
    } finally {
        // Separate finally so this runs even when closing cconn throws.
        pconn.close();
    }
}
/**
 * Build a JMS MessageProducer for the supplied Session and Destination and
 * switch off message ids and/or timestamps on it when this object is
 * configured not to use them.
 * <p>The raw producer is obtained from {@link #doCreateProducer}.
 * @param session the JMS Session the producer is created on
 * @param destination the JMS Destination the producer is created for
 * @return the newly created JMS MessageProducer
 * @throws JMSException if thrown by JMS API methods
 * @see #setMessageIdEnabled
 * @see #setMessageTimestampEnabled
 */
protected MessageProducer createProducer(Session session, @Nullable Destination destination) throws JMSException {
    final MessageProducer rawProducer = doCreateProducer(session, destination);

    // Message ids are left untouched unless they have been turned off.
    if (!isMessageIdEnabled()) {
        rawProducer.setDisableMessageID(true);
    }

    // Same treatment for timestamps.
    if (!isMessageTimestampEnabled()) {
        rawProducer.setDisableMessageTimestamp(true);
    }

    return rawProducer;
}
/**
 * Obtain a JMS MessageProducer for the given Session and Destination,
 * disabling message ids and/or timestamps on it where this object's
 * settings call for that.
 * <p>Creation of the underlying producer is delegated to
 * {@link #doCreateProducer}.
 * @param session the JMS Session that owns the new MessageProducer
 * @param destination the JMS Destination the MessageProducer targets
 * @return the configured JMS MessageProducer
 * @throws JMSException if thrown by JMS API methods
 * @see #setMessageIdEnabled
 * @see #setMessageTimestampEnabled
 */
protected MessageProducer createProducer(Session session, @Nullable Destination destination) throws JMSException {
    final MessageProducer target = doCreateProducer(session, destination);
    if (!isMessageIdEnabled()) {
        // Ids are switched off: pass the hint on to the producer.
        target.setDisableMessageID(true);
    }
    if (!isMessageTimestampEnabled()) {
        // Timestamps are switched off: pass the hint on to the producer.
        target.setDisableMessageTimestamp(true);
    }
    return target;
}
/**
 * Produce a JMS MessageProducer for the given Session and Destination.
 * When message ids or message timestamps are not enabled on this object,
 * the corresponding disable flag is set on the producer before it is returned.
 * <p>The producer itself comes from {@link #doCreateProducer}.
 * @param session the JMS Session to create the MessageProducer on
 * @param destination the JMS Destination to create the MessageProducer for
 * @return the resulting JMS MessageProducer
 * @throws JMSException if thrown by JMS API methods
 * @see #setMessageIdEnabled
 * @see #setMessageTimestampEnabled
 */
protected MessageProducer createProducer(Session session, Destination destination) throws JMSException {
    final MessageProducer created = doCreateProducer(session, destination);

    // Only ever set a flag to true; an enabled feature leaves the producer as created.
    if (!isMessageIdEnabled()) {
        created.setDisableMessageID(true);
    }
    if (!isMessageTimestampEnabled()) {
        created.setDisableMessageTimestamp(true);
    }

    return created;
}