下面列出了javax.jms.Message#getJMSTimestamp ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void setMessageAge(Message message, AbstractSpan span) {
try {
long messageTimestamp = message.getJMSTimestamp();
if (messageTimestamp > 0) {
long now = System.currentTimeMillis();
if (now > messageTimestamp) {
span.getContext().getMessage().withAge(now - messageTimestamp);
} else {
span.getContext().getMessage().withAge(0);
}
}
} catch (JMSException e) {
logger.warn("Failed to get message timestamp", e);
}
}
@Override
public boolean advance() throws IOException {
try {
Message message = this.consumer.receiveNoWait();
if (message == null) {
currentMessage = null;
return false;
}
checkpointMark.add(message);
currentMessage = this.source.spec.getMessageMapper().mapMessage(message);
currentTimestamp = new Instant(message.getJMSTimestamp());
return true;
} catch (Exception e) {
throw new IOException(e);
}
}
/**
* Acknowledge all outstanding message. Since we believe that messages will be delivered in
* timestamp order, and acknowledged messages will not be retried, the newest message in this
* batch is a good bound for future messages.
*/
@Override
public void finalizeCheckpoint() {
lock.writeLock().lock();
try {
for (Message message : messages) {
try {
message.acknowledge();
Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
oldestMessageTimestamp = currentMessageTimestamp;
}
} catch (Exception e) {
LOG.error("Exception while finalizing message: ", e);
}
}
messages.clear();
} finally {
lock.writeLock().unlock();
}
}
/**
* Will receive messages from consumer. If timeout is hit, consumer.receive(timeout)
* will return null, and the observable will be completed.
*/
private void receiveLoop() {
Message message;
try {
while ( !closed.get() && ( message = consumer.receive( receiverTimeout ) ) != null ) {
streamStep.logDebug( message.toString() );
Date date = new Date( message.getJMSTimestamp() );
DateFormat formatter = new SimpleDateFormat( "MM-dd-yyyy HH:mm:ss a" );
formatter.setTimeZone( TimeZone.getTimeZone( "UTC" ) );
String jmsTimestamp = formatter.format( date );
acceptRows( singletonList( Arrays.asList( message.getBody( Object.class ), jmsDelegate.destinationName, message.getJMSMessageID(), jmsTimestamp, message.getJMSRedelivered() ) ) );
}
} catch ( JMSRuntimeException | JMSException jmsException ) {
error( jmsException );
} finally {
super.close();
if ( !closed.get() ) {
close();
streamStep.logBasic( getString( PKG, "JmsStreamSource.HitReceiveTimeout" ) );
}
}
}
private long getMessageTimestamp(final Message message)
{
try
{
return message.getJMSTimestamp();
}
catch (JMSException e)
{
throw new DistributedTestException("Cannot get message timestamp!", e);
}
}
private ClientMessage buildClientMessage(final Message message) throws JMSException
{
String jmsMessageID = message.getJMSMessageID();
String jmsCorrelationID = message.getJMSCorrelationID();
byte[] jmsCorrelationIDAsBytes;
try
{
jmsCorrelationIDAsBytes = message.getJMSCorrelationIDAsBytes();
}
catch (JMSException e)
{
jmsCorrelationIDAsBytes = null;
}
long jmsTimestamp = message.getJMSTimestamp();
int jmsDeliveryMode = message.getJMSDeliveryMode();
boolean jmsRedelivered = message.getJMSRedelivered();
String jmsType = message.getJMSType();
long jmsExpiration = message.getJMSExpiration();
int jmsPriority = message.getJMSPriority();
return new JMSMessageAdaptor(jmsMessageID,
jmsTimestamp,
jmsCorrelationID,
jmsCorrelationIDAsBytes,
jmsDeliveryMode,
jmsRedelivered,
jmsType,
jmsExpiration,
jmsPriority);
}
private static void gatherStatistics(final LongSummaryStatistics stats, final Message message) {
try {
final long duration = System.currentTimeMillis() - message.getJMSTimestamp();
stats.accept(duration);
} catch (final JMSException e) {
LOG.error("Failed to get timestamp from message: {}", e.getMessage());
}
}
void add(Message message) throws Exception {
lock.writeLock().lock();
try {
Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) {
oldestMessageTimestamp = currentMessageTimestamp;
}
messages.add(message);
} finally {
lock.writeLock().unlock();
}
}
/**
* Checks if the message was not delivered fast enough.
*/
public void checkDeliveryTime(Message message) throws JMSException {
long creation = message.getJMSTimestamp();
long min = System.currentTimeMillis() - (offline.max + online.min);
if (min > creation) {
SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
exit("" + this + ".checkDeliveryTime failed. Message time: " + df.format(new Date(creation)) + ", min: " + df.format(new Date(min)) + "\r\n" + message);
}
}
/**
* Checks if the message was not delivered fast enough.
*/
@SuppressWarnings("unused")
public void checkDeliveryTime(Message message) throws JMSException {
long creation = message.getJMSTimestamp();
long min = System.currentTimeMillis() - (offline.max + online.min) * (BROKER_RESTART > 0 ? 4 : 1);
if (false && min > creation) {
SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
exit("" + this + ".checkDeliveryTime failed. Message time: " + df.format(new Date(creation)) + ", min: " + df.format(new Date(min)) + "\r\n" + message);
}
}
/**
* Checks if the message was not delivered fast enough.
*/
public void checkDeliveryTime(Message message) throws JMSException {
long creation = message.getJMSTimestamp();
long min = System.currentTimeMillis() - (offline.max + online.min) * (BROKER_RESTART > 0 ? 4 : 1);
if (false && min > creation) {
SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
exit("" + this + ".checkDeliveryTime failed. Message time: " + df.format(new Date(creation)) + ", min: " + df.format(new Date(min)) + "\r\n" + message);
}
}
protected static long getQueueFirstMessageAge(Session jSession,
String queueName, String messageSelector, long currentTime,
boolean warn) throws JMSException {
QueueBrowser queueBrowser = null;
try {
Queue queue = jSession.createQueue(queueName);
if (messageSelector == null) {
queueBrowser = jSession.createBrowser(queue);
} else {
queueBrowser = jSession.createBrowser(queue, messageSelector);
}
Enumeration enm = queueBrowser.getEnumeration();
if (enm.hasMoreElements()) {
Object o = enm.nextElement();
if (o instanceof Message) {
Message msg = (Message) o;
long jmsTimestamp = msg.getJMSTimestamp();
return currentTime - jmsTimestamp;
} else {
if (warn) {
log.warn("message was not of type Message, but ["
+ o.getClass().getName() + "]");
}
return -2;
}
} else {
return -1;
}
} finally {
if (queueBrowser != null) {
try {
queueBrowser.close();
} catch (JMSException e) {
log.warn("Exception on closing queueBrowser", e);
}
}
}
}
@SuppressWarnings("unused")
@Advice.OnMethodEnter(suppress = Throwable.class)
@Nullable
public static Transaction beforeOnMessage(@Advice.Argument(0) @Nullable final Message message,
@Advice.Origin Class<?> clazz) {
if (message == null || tracer == null || tracer.currentTransaction() != null) {
return null;
}
Destination destination = null;
String destinationName = null;
long timestamp = 0;
try {
destination = message.getJMSDestination();
timestamp = message.getJMSTimestamp();
} catch (JMSException e) {
logger.warn("Failed to retrieve message's destination", e);
}
//noinspection ConstantConditions
JmsInstrumentationHelper<Destination, Message, MessageListener> helper =
jmsInstrHelperManager.getForClassLoaderOfClass(MessageListener.class);
if (helper == null) {
return null;
}
if (destination != null) {
destinationName = helper.extractDestinationName(message, destination);
if (helper.ignoreDestination(destinationName)) {
return null;
}
}
// Create a transaction - even if running on same JVM as the sender
Transaction transaction = helper.startJmsTransaction(message, clazz);
if (transaction != null) {
transaction.withType(MESSAGING_TYPE)
.withName(RECEIVE_NAME_PREFIX);
if (destinationName != null) {
helper.addDestinationDetails(message, destination, destinationName, transaction.appendToName(" from "));
}
helper.addMessageDetails(message, transaction);
helper.setMessageAge(message, transaction);
transaction.activate();
}
return transaction;
}
private static void verifyMessageHeaders(final MessageDescription messageDescription,
final Message message) throws VerificationException
{
try
{
for (Map.Entry<MessageDescription.MessageHeader, Serializable> entry : messageDescription.getHeaders()
.entrySet())
{
Object actualValue;
switch (entry.getKey())
{
case DESTINATION:
actualValue = message.getJMSDestination();
break;
case DELIVERY_MODE:
actualValue = message.getJMSDeliveryMode();
break;
case MESSAGE_ID:
actualValue = message.getJMSMessageID();
break;
case TIMESTAMP:
actualValue = message.getJMSTimestamp();
break;
case CORRELATION_ID:
if (entry.getValue() instanceof byte[])
{
actualValue = message.getJMSCorrelationIDAsBytes();
}
else
{
actualValue = message.getJMSCorrelationID();
}
break;
case REPLY_TO:
actualValue = message.getJMSReplyTo();
break;
case REDELIVERED:
actualValue = message.getJMSRedelivered();
break;
case TYPE:
actualValue = message.getJMSType();
break;
case EXPIRATION:
actualValue = message.getJMSExpiration();
break;
case PRIORITY:
actualValue = message.getJMSPriority();
break;
default:
throw new RuntimeException(String.format("unexpected message header '%s'", entry.getKey()));
}
verifyEquals(String.format("Unexpected message header '%s'", entry.getKey()),
entry.getValue(),
actualValue);
}
}
catch (JMSException e)
{
throw new RuntimeException("Unexpected exception during message header verification", e);
}
}
@Test
public void testSetTimestampDisabled() throws Exception {
Connection pconn = createConnection();
Connection cconn = createConnection();
try {
Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session cs = cconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = ps.createProducer(queue1);
MessageConsumer c = cs.createConsumer(queue1);
cconn.start();
p.setDisableMessageTimestamp(true);
ProxyAssertSupport.assertTrue(p.getDisableMessageTimestamp());
p.send(ps.createMessage());
Message m = c.receive(3000);
ProxyAssertSupport.assertEquals(0L, m.getJMSTimestamp());
p.setDisableMessageTimestamp(false);
ProxyAssertSupport.assertFalse(p.getDisableMessageTimestamp());
long t1 = System.currentTimeMillis();
p.send(ps.createMessage());
m = c.receive(3000);
long t2 = System.currentTimeMillis();
long timestamp = m.getJMSTimestamp();
ProxyAssertSupport.assertTrue(timestamp >= t1);
ProxyAssertSupport.assertTrue(timestamp <= t2);
} finally {
pconn.close();
cconn.close();
}
}
@Override
public void onMessage(final Message message) {
if (log.isDebugEnabled()) {
log.debug("onMessage, message=" + message);
}
try {
final long sentTimestamp = message.getJMSTimestamp();
final long currentTimestamp = System.currentTimeMillis();
// check if received message is not too old because in case of overload we could have old search-messages
if ((currentTimestamp - sentTimestamp) < receiveTimeout) {
final String correlationID = message.getJMSCorrelationID();
final Destination replyTo = message.getJMSReplyTo();
if (message instanceof ObjectMessage) {
final ObjectMessage objectMessage = (ObjectMessage) message;
final SearchRequest searchRequest = (SearchRequest) objectMessage.getObject();
taskExecutorService.runTask(new Runnable() {
@Override
public void run() {
onSearchMessage(searchRequest, correlationID, replyTo);
}
});
} else if (message instanceof TextMessage) {
final TextMessage testMessage = (TextMessage) message;
final String spellText = testMessage.getText();
taskExecutorService.runTask(new Runnable() {
@Override
public void run() {
onSpellMessage(spellText, correlationID, replyTo);
}
});
}
} else {
// JMS message is too old, discard it (do nothing)
log.warn("JMS message was too old, discard message, timeout=" + receiveTimeout + "ms , received time=" + (currentTimestamp - sentTimestamp) + "ms");
}
} catch (final JMSException e) {
log.error("error when receiving jms messages", e);
return; // signal search not available
} catch (final Error err) {
log.warn("Error in onMessage, ", err);
// OLAT-3973: don't throw exceptions here
} catch (final RuntimeException runEx) {
log.warn("RuntimeException in onMessage, ", runEx);
// OLAT-3973: don't throw exceptions here
}
}
@Override
public void onMessage(final Message message) {
if (log.isDebugEnabled()) {
log.debug("onMessage, message=" + message);
}
try {
final long sentTimestamp = message.getJMSTimestamp();
final long currentTimestamp = System.currentTimeMillis();
// check if received message is not too old because in case of overload we could have old search-messages
if ((currentTimestamp - sentTimestamp) < receiveTimeout) {
final String correlationID = message.getJMSCorrelationID();
final Destination replyTo = message.getJMSReplyTo();
if (message instanceof ObjectMessage) {
final ObjectMessage objectMessage = (ObjectMessage) message;
final SearchRequest searchRequest = (SearchRequest) objectMessage.getObject();
taskExecutorService.runTask(new Runnable() {
@Override
public void run() {
onSearchMessage(searchRequest, correlationID, replyTo);
}
});
} else if (message instanceof TextMessage) {
final TextMessage testMessage = (TextMessage) message;
final String spellText = testMessage.getText();
taskExecutorService.runTask(new Runnable() {
@Override
public void run() {
onSpellMessage(spellText, correlationID, replyTo);
}
});
}
} else {
// JMS message is too old, discard it (do nothing)
log.warn("JMS message was too old, discard message, timeout=" + receiveTimeout + "ms , received time=" + (currentTimestamp - sentTimestamp) + "ms");
}
} catch (final JMSException e) {
log.error("error when receiving jms messages", e);
return; // signal search not available
} catch (final Error err) {
log.warn("Error in onMessage, ", err);
// OLAT-3973: don't throw exceptions here
} catch (final RuntimeException runEx) {
log.warn("RuntimeException in onMessage, ", runEx);
// OLAT-3973: don't throw exceptions here
}
}
public MessageDump toDumpMessage(Message msg) throws JMSException{
MessageDump dump = new MessageDump();
dump.JMSCorrelationID = msg.getJMSCorrelationID();
dump.JMSMessageID = msg.getJMSMessageID();
dump.JMSType = msg.getJMSType();
dump.JMSDeliveryMode = msg.getJMSDeliveryMode();
dump.JMSExpiration = msg.getJMSExpiration();
dump.JMSRedelivered = msg.getJMSRedelivered();
dump.JMSTimestamp = msg.getJMSTimestamp();
dump.JMSPriority = msg.getJMSPriority();
@SuppressWarnings("rawtypes")
Enumeration propertyNames = msg.getPropertyNames();
while(propertyNames.hasMoreElements()){
String property = (String) propertyNames.nextElement();
Object propertyValue = msg.getObjectProperty(property);
if( propertyValue instanceof String){
dump.stringProperties.put(property, (String)propertyValue);
} else if ( propertyValue instanceof Integer ){
dump.intProperties.put(property, (Integer)propertyValue);
} else if ( propertyValue instanceof Long) {
dump.longProperties.put(property, (Long)propertyValue);
} else if( propertyValue instanceof Double) {
dump.doubleProperties.put(property, (Double) propertyValue);
} else if (propertyValue instanceof Short) {
dump.shortProperties.put(property, (Short)propertyValue);
} else if (propertyValue instanceof Float) {
dump.floatProperties.put(property, (Float) propertyValue);
} else if (propertyValue instanceof Byte) {
dump.byteProperties.put(property, (Byte)propertyValue);
} else if (propertyValue instanceof Boolean) {
dump.boolProperties.put(property, (Boolean)propertyValue);
} else if (propertyValue instanceof Serializable){
// Object property.. if it's on Classpath and Serializable
byte[] propBytes = SerializationUtils.serialize((Serializable) propertyValue);
dump.objectProperties.put(property, Base64.encodeBase64String(propBytes));
} else {
// Corner case.
throw new IllegalArgumentException("Property of key '"+ property +"' is not serializable. Type is: " + propertyValue.getClass().getCanonicalName());
}
}
dump.body = "";
dump.type = "";
if (msg instanceof TextMessage) {
dump.body = ((TextMessage)msg).getText();
dump.type = "TextMessage";
} else if (msg instanceof BytesMessage) {
BytesMessage bm = (BytesMessage)msg;
byte[] bytes = new byte[(int) bm.getBodyLength()];
bm.readBytes(bytes);
dump.body = Base64.encodeBase64String(bytes);
dump.type = "BytesMessage";
} else if (msg instanceof ObjectMessage) {
ObjectMessage om = (ObjectMessage)msg;
byte[] objectBytes = SerializationUtils.serialize(om.getObject());
dump.body = Base64.encodeBase64String(objectBytes);
dump.type = "ObjectMessage";
}
return dump;
}
@Override
public void onMessage(Message message, Session session) throws JMSException {
TransactionStatus txStatus=null;
long onMessageStart= System.currentTimeMillis();
long jmsTimestamp= message.getJMSTimestamp();
threadsProcessing.increase();
Thread.currentThread().setName(getReceiver().getName()+"["+threadsProcessing.getValue()+"]");
try {
if (TX!=null) {
txStatus = txManager.getTransaction(TX);
}
Map<String,Object> threadContext = new HashMap<>();
try {
IPortConnectedListener<Message> listener = getListener();
threadContext.put(THREAD_CONTEXT_SESSION_KEY,session);
// if (log.isDebugEnabled()) log.debug("transaction status before: "+JtaUtil.displayTransactionStatus());
getReceiver().processRawMessage(listener, message, threadContext);
// if (log.isDebugEnabled()) log.debug("transaction status after: "+JtaUtil.displayTransactionStatus());
} catch (ListenerException e) {
getReceiver().increaseRetryIntervalAndWait(e,getLogPrefix());
if (txStatus!=null) {
txStatus.setRollbackOnly();
}
} finally {
if (txStatus==null && jmsContainer.isSessionTransacted()) {
log.debug(getLogPrefix()+"committing JMS session");
session.commit();
}
}
} finally {
if (txStatus!=null) {
txManager.commit(txStatus);
}
threadsProcessing.decrease();
if (log.isInfoEnabled()) {
long onMessageEnd= System.currentTimeMillis();
log.info(getLogPrefix()+"A) JMSMessageTime ["+DateUtils.format(jmsTimestamp)+"]");
log.info(getLogPrefix()+"B) onMessageStart ["+DateUtils.format(onMessageStart)+"] diff (~'queing' time) ["+(onMessageStart-jmsTimestamp)+"]");
log.info(getLogPrefix()+"C) onMessageEnd ["+DateUtils.format(onMessageEnd)+"] diff (process time) ["+(onMessageEnd-onMessageStart)+"]");
}
// boolean simulateCrashAfterCommit=true;
// if (simulateCrashAfterCommit) {
// toggle=!toggle;
// if (toggle) {
// JtaUtil.setRollbackOnly();
// throw new JMSException("simulate crash just before final commit");
// }
// }
}
}
/**
* Convert a message into a Kafka Connect SourceRecord.
*
* @param context the JMS context to use for building messages
* @param topic the Kafka topic
* @param messageBodyJms whether to interpret MQ messages as JMS messages
* @param message the message
*
* @return the Kafka Connect SourceRecord
*
* @throws JMSException Message could not be converted
*/
@Override public SourceRecord toSourceRecord(JMSContext context, String topic, boolean messageBodyJms, Message message) throws JMSException {
SchemaAndValue key = this.getKey(context, topic, message);
SchemaAndValue value = this.getValue(context, topic, messageBodyJms, message);
if (copyJmsPropertiesFlag && messageBodyJms)
return new SourceRecord(null, null, topic, (Integer) null, key.schema(), key.value(), value.schema(), value.value(), message.getJMSTimestamp(), jmsToKafkaHeaderConverter.convertJmsPropertiesToKafkaHeaders(message));
else
return new SourceRecord(null, null, topic, key.schema(), key.value(), value.schema(), value.value());
}