下面列出了javax.jms.ServerSessionPool#org.apache.activemq.command.ActiveMQMessage 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testSendReceive() throws Exception {
// Send a message to the broker.
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, destinationType);
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(this.deliveryMode);
MessageConsumer consumer = session.createConsumer(destination);
ActiveMQMessage message = new ActiveMQMessage();
producer.send(message);
// Make sure only 1 message was delivered.
assertNotNull(consumer.receive(1000));
assertNull(consumer.receiveNoWait());
}
@Test(timeout = 60 * 1000)
public void testMessageCompression() throws Exception {
ActiveMQConnection localAmqConnection = (ActiveMQConnection) localConnection;
localAmqConnection.setUseCompression(true);
MessageConsumer consumer1 = remoteSession.createConsumer(included);
MessageProducer producer = localSession.createProducer(included);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
waitForConsumerRegistration(localBroker, 1, included);
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
Message msg = consumer1.receive(3000);
assertNotNull(msg);
ActiveMQMessage amqMessage = (ActiveMQMessage) msg;
assertTrue(amqMessage.isCompressed());
}
// ensure no more messages received
assertNull(consumer1.receive(1000));
}
public void testTempMessageConsumedAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = s.createTemporaryQueue();
MessageConsumer consumer = s.createConsumer(queue);
Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue);
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
String id = m.getJMSMessageID();
Message msg = consumer.receive(1000);
assertNotNull(msg);
msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
String originalId = payload.getJMSMessageID();
assertEquals(originalId, id);
}
public void testMessageConsumedAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue);
Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue);
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
String id = m.getJMSMessageID();
Message msg = consumer.receive(1000);
assertNotNull(msg);
msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
String originalId = payload.getJMSMessageID();
assertEquals(originalId, id);
}
@Override
protected void assertMessageValid(int index, Message message) throws JMSException {
// check if broker path has been set
assertEquals("localhost", message.getStringProperty("BrokerPath"));
ActiveMQMessage amqMsg = (ActiveMQMessage) message;
if (index == 7) {
// check custom expiration
assertTrue("expiration is in range, depends on two distinct calls to System.currentTimeMillis", 1500 < amqMsg.getExpiration() - amqMsg.getTimestamp());
} else if (index == 9) {
// check ceiling
assertTrue("expiration ceeling is in range, depends on two distinct calls to System.currentTimeMillis", 59500 < amqMsg.getExpiration() - amqMsg.getTimestamp());
} else {
// check default expiration
assertEquals(1000, amqMsg.getExpiration() - amqMsg.getTimestamp());
}
super.assertMessageValid(index, message);
}
public static MessageDispatch createMessageDispatch(MessageReference reference,
ICoreMessage message,
WireFormat marshaller,
AMQConsumer consumer) throws IOException {
ActiveMQMessage amqMessage = toAMQMessage(reference, message, marshaller, consumer);
//we can use core message id for sequenceId
amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
MessageDispatch md = new MessageDispatch();
md.setConsumerId(consumer.getId());
md.setRedeliveryCounter(reference.getDeliveryCount() - 1);
md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId());
md.setMessage(amqMessage);
ActiveMQDestination destination = amqMessage.getDestination();
md.setDestination(destination);
return md;
}
private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg,
final ICoreMessage coreMessage,
final Set<SimpleString> props,
final AMQConsumer consumer) throws IOException {
for (SimpleString s : props) {
final String keyStr = s.toString();
if (!coreMessage.containsProperty(ManagementHelper.HDR_NOTIFICATION_TYPE) && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) {
continue;
}
final Object prop = coreMessage.getObjectProperty(s);
try {
if (prop instanceof SimpleString) {
amqMsg.setObjectProperty(keyStr, prop.toString());
} else {
if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) {
Long l = (Long) prop;
amqMsg.setObjectProperty(keyStr, l.intValue());
} else {
amqMsg.setObjectProperty(keyStr, prop);
}
}
} catch (JMSException e) {
throw new IOException("exception setting property " + s + " : " + prop, e);
}
}
}
private Trace createTrace(Object target, Object[] args) {
if (!validate(target, args)) {
return null;
}
MessageDispatch md = (MessageDispatch) args[0];
ActiveMQMessage message = (ActiveMQMessage) md.getMessage();
if (filterDestination(message.getDestination())) {
return null;
}
// These might trigger unmarshalling.
if (!ActiveMQClientHeader.getSampled(message, true)) {
return traceContext.disableSampling();
}
final TraceId traceId = populateTraceIdFromRequest(message);
final Trace trace = traceId == null ? traceContext.newTraceObject() : traceContext.continueTraceObject(traceId);
if (trace.canSampled()) {
SpanRecorder recorder = trace.getSpanRecorder();
recordRootSpan(recorder, target, args);
}
return trace;
}
private boolean validate(Object target, Object[] args) {
if (!(target instanceof ActiveMQMessageConsumer)) {
return false;
}
if (!(target instanceof ActiveMQSessionGetter)) {
if (isDebug) {
logger.debug("Invalid target object. Need field accessor({}).", ActiveMQSessionGetter.class.getName());
}
return false;
}
if (!validateTransport(((ActiveMQSessionGetter) target)._$PINPOINT$_getActiveMQSession())) {
return false;
}
if (args == null || args.length < 1) {
return false;
}
if (!(args[0] instanceof MessageDispatch)) {
return false;
}
MessageDispatch md = (MessageDispatch) args[0];
Message message = md.getMessage();
if (!(message instanceof ActiveMQMessage)) {
return false;
}
return true;
}
/** Test the checkpoint mark default coder, which is actually AvroCoder. */
@Test
public void testCheckpointMarkDefaultCoder() throws Exception {
JmsCheckpointMark jmsCheckpointMark = new JmsCheckpointMark();
jmsCheckpointMark.add(new ActiveMQMessage());
Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder();
CoderProperties.coderSerializable(coder);
CoderProperties.coderDecodeEncodeEqual(coder, jmsCheckpointMark);
}
private ConnectionFactory withSlowAcks(ConnectionFactory factory, long delay) {
return proxyMethod(
factory,
ConnectionFactory.class,
"createConnection",
(Connection connection) ->
proxyMethod(
connection,
Connection.class,
"createSession",
(Session session) ->
proxyMethod(
session,
Session.class,
"createConsumer",
(MessageConsumer consumer) ->
proxyMethod(
consumer,
MessageConsumer.class,
"receiveNoWait",
(ActiveMQMessage message) -> {
final Callback originalCallback =
message.getAcknowledgeCallback();
message.setAcknowledgeCallback(
() -> {
Thread.sleep(delay);
originalCallback.execute();
});
return message;
}))));
}
@Before
public void setUp() throws Exception {
message = new ActiveMQMessage();
message.setJMSDestination(new ActiveMQTopic("FOO.BAR"));
message.setJMSType("selector-test");
message.setJMSMessageID("connection:1:1:1:1");
message.setBooleanProperty("trueProp", true);
message.setBooleanProperty("falseProp", false);
message.setObjectProperty("nullProp", null);
}
public void testSendReceive() throws Exception {
// Send a message to the broker.
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination(session, destinationType);
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(this.deliveryMode);
MessageConsumer consumer = session.createConsumer(destination);
ActiveMQMessage message = new ActiveMQMessage();
producer.send(message);
// Make sure only 1 message was delivered.
assertNotNull(consumer.receive(1000));
assertNull(consumer.receiveNoWait());
}
protected boolean isSame(BytesMessage msg1) throws Exception {
boolean result = false;
((ActiveMQMessage) msg1).setReadOnlyBody(true);
for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) {
result = msg1.readByte() == largeMessageData[i];
if (!result) {
break;
}
}
return result;
}
private void createAdvisorySubscription() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(topic));
advisoryConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (((ActiveMQMessage) message).getDataStructure() instanceof RemoveSubscriptionInfo) {
advisories.incrementAndGet();
}
}
});
}
@Override
public void onSlowConsumer(ServerConsumer consumer) {
if (consumer.getProtocolData() != null && consumer.getProtocolData() instanceof AMQConsumer) {
AMQConsumer amqConsumer = (AMQConsumer) consumer.getProtocolData();
ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(amqConsumer.getOpenwireDestination());
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
try {
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, amqConsumer.getId().toString());
protocolManager.fireAdvisory(context, topic, advisoryMessage, amqConsumer.getId(), null);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn("Error during method invocation", e);
}
}
}
private static void setAMQMsgBrokerPath(final ActiveMQMessage amqMsg, final String brokerPath) {
String[] brokers = brokerPath.split(",");
BrokerId[] bids = new BrokerId[brokers.length];
for (int i = 0; i < bids.length; i++) {
bids[i] = new BrokerId(brokers[i]);
}
amqMsg.setBrokerPath(bids);
}
private static void setAMQMsgClusterPath(final ActiveMQMessage amqMsg, final String clusterPath) {
String[] cluster = clusterPath.split(",");
BrokerId[] bids = new BrokerId[cluster.length];
for (int i = 0; i < bids.length; i++) {
bids[i] = new BrokerId(cluster[i]);
}
amqMsg.setCluster(bids);
}
private static void setAMQMsgDataStructure(final ActiveMQMessage amqMsg,
final WireFormat marshaller,
final byte[] dsBytes) throws IOException {
ByteSequence seq = new ByteSequence(dsBytes);
DataStructure ds = (DataStructure) marshaller.unmarshal(seq);
amqMsg.setDataStructure(ds);
}
private static void setAMQMsgDlqDeliveryFailureCause(final ActiveMQMessage amqMsg,
final SimpleString dlqCause) throws IOException {
try {
amqMsg.setStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, dlqCause.toString());
} catch (JMSException e) {
throw new IOException("failure to set dlq property " + dlqCause, e);
}
}
private static void setAMQMsgHdrLastValueName(final ActiveMQMessage amqMsg,
final SimpleString lastValueProperty) throws IOException {
try {
amqMsg.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(), lastValueProperty.toString());
} catch (JMSException e) {
throw new IOException("failure to set lvq property " + lastValueProperty, e);
}
}
protected void displayAdvisoryMessage(ActiveMQMessage cmdMsg) throws IOException, JMSException {
final String topic = cmdMsg.getJMSDestination().toString();
final String advisoryMsg = advisoryDataStructureToString(cmdMsg.getDataStructure());
final String advisoryType = cmdMsg.getDataStructure() != null ? "Type: " + dataStructureTypeToString(cmdMsg.getDataStructure().getDataStructureType()) : "";
output("Advisory on " + topic + advisoryType + (advisoryMsg != null ? " Info " + advisoryMsg : ""));
}
private TraceId populateTraceIdFromRequest(ActiveMQMessage message) {
String transactionId = ActiveMQClientHeader.getTraceId(message, null);
if (transactionId == null) {
return null;
}
long parentSpanId = ActiveMQClientHeader.getParentSpanId(message, SpanId.NULL);
long spanId = ActiveMQClientHeader.getSpanId(message, SpanId.NULL);
short flags = ActiveMQClientHeader.getFlags(message, (short) 0);
return traceContext.createTraceId(transactionId, parentSpanId, spanId, flags);
}
private void recordRootSpan(SpanRecorder recorder, Object target, Object[] args) {
recorder.recordServiceType(ActiveMQClientConstants.ACTIVEMQ_CLIENT);
recorder.recordApi(CONSUMER_ENTRY_METHOD_DESCRIPTOR);
ActiveMQSession session = ((ActiveMQSessionGetter) target)._$PINPOINT$_getActiveMQSession();
ActiveMQConnection connection = session.getConnection();
Transport transport = getRootTransport(((TransportGetter) connection)._$PINPOINT$_getTransport());
final String endPoint = getEndPoint(transport);
// Endpoint should be the local socket address of the consumer.
recorder.recordEndPoint(endPoint);
final String remoteAddress = transport.getRemoteAddress();
// Remote address is the socket address of where the consumer is connected to.
recorder.recordRemoteAddress(remoteAddress);
MessageDispatch md = (MessageDispatch) args[0];
ActiveMQMessage message = (ActiveMQMessage) md.getMessage();
ActiveMQDestination destination = message.getDestination();
// Rpc name is the URI of the queue/topic we're consuming from.
recorder.recordRpcName(destination.getQualifiedName());
// Record acceptor host as the queue/topic name in order to generate virtual queue node.
recorder.recordAcceptorHost(destination.getPhysicalName());
String parentApplicationName = ActiveMQClientHeader.getParentApplicationName(message, null);
if (!recorder.isRoot() && parentApplicationName != null) {
short parentApplicationType = ActiveMQClientHeader.getParentApplicationType(message, ServiceType.UNDEFINED.getCode());
recorder.recordParentApplication(parentApplicationName, parentApplicationType);
}
}
@Override
public final void setMessage(Message message, ActiveMQClientHeader key, T value) throws JMSException {
String id = key.id;
if (message instanceof ActiveMQMessage) {
ActiveMQMessage activeMQMessage = (ActiveMQMessage) message;
if (activeMQMessage.isReadOnlyProperties()) {
activeMQMessage.setReadOnlyProperties(false);
setMessage0(message, id, value);
activeMQMessage.setReadOnlyProperties(true);
return;
}
}
setMessage0(message, id, value);
}
public void testUserSendGoodPassword() throws JMSException {
Message m = doSend(false);
assertEquals("system", ((ActiveMQMessage) m).getUserID());
assertEquals("system", m.getStringProperty("JMSXUserID"));
}
public void testUserSendNoCredentials() throws JMSException {
Message m = doSend(false);
// note brokerService.useAuthenticatedPrincipalForJMXUserID=true for this
assertEquals("guest", ((ActiveMQMessage) m).getUserID());
assertEquals("guest", m.getStringProperty("JMSXUserID"));
}
public void testPredefinedDestinations() throws JMSException {
Message sent = doSend(false);
assertEquals("guest", ((ActiveMQMessage) sent).getUserID());
assertEquals("guest", sent.getStringProperty("JMSXUserID"));
}
public void testUserSendGoodPassword() throws JMSException {
Message m = doSend(false);
assertEquals("system", ((ActiveMQMessage) m).getUserID());
assertEquals("system", m.getStringProperty("JMSXUserID"));
}
public void testUserSendWrongPassword() throws JMSException {
Message m = doSend(false);
// note brokerService.useAuthenticatedPrincipalForJMXUserID=true for this
assertEquals("guest", ((ActiveMQMessage) m).getUserID());
assertEquals("guest", m.getStringProperty("JMSXUserID"));
}