下面列出了javax.jms.ServerSessionPool#org.apache.activemq.command.ActiveMQTextMessage 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Pushes one text message to a queue and checks that a {@code JMSPollingConsumer}
 * built from the same destination properties polls it back.
 * The original author describes this as the JMS spec version 1.1 case; this test
 * does not set a spec version explicitly, so that is presumably the default.
 *
 * @throws Exception if the broker or the polling consumer fails
 */
@Test
public void testPollingMessageFromQueue() throws Exception {
    final String destinationName = "testQueue1";
    final Properties consumerProps =
            JMSTestsUtils.getJMSPropertiesForDestination(destinationName, PROVIDER_URL, true);
    final JMSBrokerController broker = new JMSBrokerController(PROVIDER_URL, consumerProps);
    try {
        // Bring the broker up and seed the queue before the consumer polls it.
        broker.startProcess();
        broker.connect(destinationName, true);
        broker.pushMessage(SEND_MSG);

        Message polled = JMSTestsUtils.pollMessagesFromDestination(
                new JMSPollingConsumer(consumerProps, INTERVAL, INBOUND_EP_NAME));

        Assert.assertNotNull("Received message is null", polled);
        String polledText = ((ActiveMQTextMessage) polled).getText();
        Assert.assertEquals("The send message is not received.", SEND_MSG, polledText);
    } finally {
        // Always release the broker, even when an assertion fails.
        broker.disconnect();
        broker.stopProcess();
    }
}
/**
 * Pushes one text message to a queue and checks that a {@code JMSPollingConsumer}
 * configured with {@code JMSConstants.JMS_SPEC_VERSION_2_0} polls it back.
 *
 * @throws Exception if the broker or the polling consumer fails
 */
@Test
public void testPollingMessageFromQueueSpecV20() throws Exception {
    final String destinationName = "testQueue1v20";
    final Properties consumerProps =
            JMSTestsUtils.getJMSPropertiesForDestination(destinationName, PROVIDER_URL, true);
    // Select the JMS 2.0 spec version for this run.
    consumerProps.put(JMSConstants.PARAM_JMS_SPEC_VER, JMSConstants.JMS_SPEC_VERSION_2_0);
    final JMSBrokerController broker = new JMSBrokerController(PROVIDER_URL, consumerProps);
    try {
        broker.startProcess();
        broker.connect(destinationName, true);
        broker.pushMessage(SEND_MSG);

        Message polled = JMSTestsUtils.pollMessagesFromDestination(
                new JMSPollingConsumer(consumerProps, INTERVAL, INBOUND_EP_NAME));

        Assert.assertNotNull("Received message is null", polled);
        String polledText = ((ActiveMQTextMessage) polled).getText();
        Assert.assertEquals("The send message is not received.", SEND_MSG, polledText);
    } finally {
        // Always release the broker, even when an assertion fails.
        broker.disconnect();
        broker.stopProcess();
    }
}
/**
 * Pushes one text message to a queue and checks that a {@code JMSPollingConsumer}
 * configured with {@code JMSConstants.JMS_SPEC_VERSION_1_0} polls it back.
 *
 * @throws Exception if the broker or the polling consumer fails
 */
@Test
public void testPollingMessageFromQueueSpecV10() throws Exception {
    // Dedicated queue name for the 1.0 case: the previous value ("testQueue1v20") was
    // shared with the 2.0 test, so a message left behind by one test could be
    // consumed by the other.
    String queueName = "testQueue1v10";
    Properties jmsProperties = JMSTestsUtils.getJMSPropertiesForDestination(queueName, PROVIDER_URL, true);
    jmsProperties.put(JMSConstants.PARAM_JMS_SPEC_VER, JMSConstants.JMS_SPEC_VERSION_1_0);
    JMSBrokerController brokerController = new JMSBrokerController(PROVIDER_URL, jmsProperties);
    try {
        brokerController.startProcess();
        brokerController.connect(queueName, true);
        brokerController.pushMessage(SEND_MSG);
        JMSPollingConsumer jmsPollingConsumer = new JMSPollingConsumer(jmsProperties, INTERVAL, INBOUND_EP_NAME);
        Message receivedMsg = JMSTestsUtils.pollMessagesFromDestination(jmsPollingConsumer);
        Assert.assertNotNull("Received message is null", receivedMsg);
        Assert.assertEquals("The send message is not received.", SEND_MSG,
                ((ActiveMQTextMessage) receivedMsg).getText());
    } finally {
        // Always release the broker, even when an assertion fails.
        brokerController.disconnect();
        brokerController.stopProcess();
    }
}
/**
 * Sends the same text message to the queues {@code test-queue} and {@code test-queue1}
 * on the broker at {@code tcp://127.0.0.1:61616}.
 *
 * @param args unused
 * @throws Exception if connecting, sending or closing fails
 */
public static void main(String[] args) throws Exception {
    // Create the connection factory.
    ConnectionFactory connectionFactory =
            new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
    // Obtain a connection.
    Connection connection = connectionFactory.createConnection();
    try {
        // Start the connection.
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destinations from the session; the argument is the queue name.
        javax.jms.Queue queue = session.createQueue("test-queue");
        javax.jms.Queue queue1 = session.createQueue("test-queue1");
        // Create one producer per queue.
        MessageProducer producer = session.createProducer(queue);
        MessageProducer producer1 = session.createProducer(queue1);
        // Create the message.
        TextMessage message = new ActiveMQTextMessage();
        message.setText("这是一个测试消息");
        // Send the message to both queues.
        producer.send(message);
        producer1.send(message);
        // Release resources in reverse order of creation.
        producer.close();
        producer1.close();
        session.close();
    } finally {
        // Closing the connection also closes any session or producer still open,
        // so nothing leaks when one of the calls above throws.
        connection.close();
    }
}
/**
 * Serializes an active-alarms request to JSON, wraps it in a text message and checks
 * that {@code ClientRequestMessageConverter.fromMessage} restores the request type,
 * result type and timeout.
 */
@Test
public void testActiveAlarmsMessageConversion() {
    final int timeout = 10000;
    JsonRequest<AlarmValue> outgoing = new ClientRequestImpl<AlarmValue>(
            ClientRequest.ResultType.TRANSFER_ACTIVE_ALARM_LIST,
            ClientRequest.RequestType.ACTIVE_ALARMS_REQUEST,
            timeout);
    TextMessage jmsMessage = new ActiveMQTextMessage();
    try {
        jmsMessage.setText(outgoing.toJson());
        ClientRequest decoded = ClientRequestMessageConverter.fromMessage(jmsMessage);
        assertTrue(ClientRequest.RequestType.ACTIVE_ALARMS_REQUEST == decoded.getRequestType());
        assertTrue(ClientRequest.ResultType.TRANSFER_ACTIVE_ALARM_LIST == decoded.getResultType());
        assertTrue(timeout == decoded.getTimeout());
    } catch (JMSException jmsFailure) {
        // Any JMS error fails the test, with the exception message as the reason.
        assertTrue(jmsFailure.getMessage(), false);
    }
}
/**
 * Serializes a {@code TagUpdate} request to JSON, wraps it in a text message and checks
 * that {@code ClientRequestMessageConverter.fromMessage} yields a TAG_REQUEST with
 * result type TRANSFER_TAG_LIST.
 */
@Test
public void testTransferTagMessageConversion() {
    JsonRequest<TagUpdate> outgoing = new ClientRequestImpl<TagUpdate>(TagUpdate.class);
    TextMessage jmsMessage = new ActiveMQTextMessage();
    try {
        jmsMessage.setText(outgoing.toJson());
        ClientRequest decoded = ClientRequestMessageConverter.fromMessage(jmsMessage);
        assertTrue(ClientRequest.RequestType.TAG_REQUEST == decoded.getRequestType());
        assertTrue(ClientRequest.ResultType.TRANSFER_TAG_LIST == decoded.getResultType());
    } catch (JMSException jmsFailure) {
        // Any JMS error fails the test, with the exception message as the reason.
        assertTrue(jmsFailure.getMessage(), false);
    }
}
/**
 * Serializes a {@code TagValueUpdate} request to JSON, wraps it in a text message and
 * checks that {@code ClientRequestMessageConverter.fromMessage} yields a TAG_REQUEST
 * with result type TRANSFER_TAG_VALUE_LIST.
 */
@Test
public void testTransferTagValueMessageConversion() {
    JsonRequest<TagValueUpdate> outgoing = new ClientRequestImpl<TagValueUpdate>(TagValueUpdate.class);
    TextMessage jmsMessage = new ActiveMQTextMessage();
    try {
        jmsMessage.setText(outgoing.toJson());
        ClientRequest decoded = ClientRequestMessageConverter.fromMessage(jmsMessage);
        assertTrue(ClientRequest.RequestType.TAG_REQUEST == decoded.getRequestType());
        assertTrue(ClientRequest.ResultType.TRANSFER_TAG_VALUE_LIST == decoded.getResultType());
    } catch (JMSException jmsFailure) {
        // Any JMS error fails the test, with the exception message as the reason.
        assertTrue(jmsFailure.getMessage(), false);
    }
}
/**
 * Serializes an {@code AlarmValue} request to JSON, wraps it in a text message and
 * checks that {@code ClientRequestMessageConverter.fromMessage} yields an ALARM_REQUEST
 * with result type TRANSFER_ALARM_LIST.
 */
@Test
public void testAlarmValueMessageConversion() {
    JsonRequest<AlarmValue> outgoing = new ClientRequestImpl<AlarmValue>(AlarmValue.class);
    TextMessage jmsMessage = new ActiveMQTextMessage();
    try {
        jmsMessage.setText(outgoing.toJson());
        ClientRequest decoded = ClientRequestMessageConverter.fromMessage(jmsMessage);
        assertTrue(ClientRequest.RequestType.ALARM_REQUEST == decoded.getRequestType());
        assertTrue(ClientRequest.ResultType.TRANSFER_ALARM_LIST == decoded.getResultType());
    } catch (JMSException jmsFailure) {
        // Any JMS error fails the test, with the exception message as the reason.
        assertTrue(jmsFailure.getMessage(), false);
    }
}
/**
 * Serializes a {@code TagConfig} request to JSON, wraps it in a text message and checks
 * that {@code ClientRequestMessageConverter.fromMessage} yields a
 * TAG_CONFIGURATION_REQUEST with result type TRANSFER_TAG_CONFIGURATION_LIST.
 */
@Test
public void testTagConfigMessageConversion() {
    JsonRequest<TagConfig> outgoing = new ClientRequestImpl<TagConfig>(TagConfig.class);
    TextMessage jmsMessage = new ActiveMQTextMessage();
    try {
        jmsMessage.setText(outgoing.toJson());
        ClientRequest decoded = ClientRequestMessageConverter.fromMessage(jmsMessage);
        assertTrue(ClientRequest.RequestType.TAG_CONFIGURATION_REQUEST == decoded.getRequestType());
        assertTrue(ClientRequest.ResultType.TRANSFER_TAG_CONFIGURATION_LIST == decoded.getResultType());
    } catch (JMSException jmsFailure) {
        // Any JMS error fails the test, with the exception message as the reason.
        assertTrue(jmsFailure.getMessage(), false);
    }
}
/**
 * Serializes a {@code CommandTagHandle} request to JSON, wraps it in a text message and
 * checks that {@code ClientRequestMessageConverter.fromMessage} yields a
 * COMMAND_HANDLE_REQUEST with result type TRANSFER_COMMAND_HANDLES_LIST that
 * requires an object response.
 */
@Test
public void testCommandTagHandleMessageConversion() {
    JsonRequest<CommandTagHandle> outgoing = new ClientRequestImpl<CommandTagHandle>(CommandTagHandle.class);
    TextMessage jmsMessage = new ActiveMQTextMessage();
    try {
        jmsMessage.setText(outgoing.toJson());
        ClientRequest decoded = ClientRequestMessageConverter.fromMessage(jmsMessage);
        assertTrue(ClientRequest.RequestType.COMMAND_HANDLE_REQUEST == decoded.getRequestType());
        assertTrue(ClientRequest.ResultType.TRANSFER_COMMAND_HANDLES_LIST == decoded.getResultType());
        assertTrue(decoded.requiresObjectResponse());
    } catch (JMSException jmsFailure) {
        // Any JMS error fails the test, with the exception message as the reason.
        assertTrue(jmsFailure.getMessage(), false);
    }
}
/**
 * Serializes a {@code CommandReport} request to JSON, wraps it in a text message and
 * checks that {@code ClientRequestMessageConverter.fromMessage} yields an
 * EXECUTE_COMMAND_REQUEST with result type TRANSFER_COMMAND_REPORT.
 */
@Test
public void testExecuteCommandMessageConversion() {
    JsonRequest<CommandReport> outgoing = new ClientRequestImpl<CommandReport>(CommandReport.class);
    TextMessage jmsMessage = new ActiveMQTextMessage();
    try {
        jmsMessage.setText(outgoing.toJson());
        ClientRequest decoded = ClientRequestMessageConverter.fromMessage(jmsMessage);
        assertTrue(ClientRequest.RequestType.EXECUTE_COMMAND_REQUEST == decoded.getRequestType());
        assertTrue(ClientRequest.ResultType.TRANSFER_COMMAND_REPORT == decoded.getResultType());
    } catch (JMSException jmsFailure) {
        // Any JMS error fails the test, with the exception message as the reason.
        assertTrue(jmsFailure.getMessage(), false);
    }
}
/**
 * Serializes a {@code ProcessNameResponse} request to JSON, wraps it in a text message
 * and checks that {@code ClientRequestMessageConverter.fromMessage} yields a
 * PROCESS_NAMES_REQUEST with result type TRANSFER_PROCESS_NAMES.
 */
@Test
public void testProcessNamesMessageConversion() {
    ClientRequestImpl<ProcessNameResponse> outgoing =
            new ClientRequestImpl<ProcessNameResponse>(ProcessNameResponse.class);
    TextMessage jmsMessage = new ActiveMQTextMessage();
    try {
        jmsMessage.setText(outgoing.toJson());
        ClientRequest decoded = ClientRequestMessageConverter.fromMessage(jmsMessage);
        assertTrue(ClientRequest.RequestType.PROCESS_NAMES_REQUEST == decoded.getRequestType());
        assertTrue(ClientRequest.ResultType.TRANSFER_PROCESS_NAMES == decoded.getResultType());
    } catch (JMSException jmsFailure) {
        // Any JMS error fails the test, with the exception message as the reason.
        assertTrue(jmsFailure.getMessage(), false);
    }
}
/**
 * Serializes a {@code DeviceClassNameResponse} request to JSON, wraps it in a text
 * message and checks that {@code ClientRequestMessageConverter.fromMessage} yields a
 * DEVICE_CLASS_NAMES_REQUEST with result type TRANSFER_DEVICE_CLASS_NAMES.
 */
@Test
public void testDeviceClassNamesMessageConversion() {
    ClientRequestImpl<DeviceClassNameResponse> outgoing =
            new ClientRequestImpl<>(DeviceClassNameResponse.class);
    TextMessage jmsMessage = new ActiveMQTextMessage();
    try {
        jmsMessage.setText(outgoing.toJson());
        ClientRequest decoded = ClientRequestMessageConverter.fromMessage(jmsMessage);
        assertTrue(ClientRequest.RequestType.DEVICE_CLASS_NAMES_REQUEST == decoded.getRequestType());
        assertTrue(ClientRequest.ResultType.TRANSFER_DEVICE_CLASS_NAMES == decoded.getResultType());
    } catch (JMSException jmsFailure) {
        // Any JMS error fails the test, with the exception message as the reason.
        assertTrue(jmsFailure.getMessage(), false);
    }
}
/**
 * Serializes a {@code TransferDevice} request to JSON, wraps it in a text message and
 * checks that {@code ClientRequestMessageConverter.fromMessage} yields a DEVICE_REQUEST
 * with result type TRANSFER_DEVICE_LIST.
 */
@Test
public void testDevicesMessageConversion() {
    ClientRequestImpl<TransferDevice> outgoing = new ClientRequestImpl<>(TransferDevice.class);
    TextMessage jmsMessage = new ActiveMQTextMessage();
    try {
        jmsMessage.setText(outgoing.toJson());
        ClientRequest decoded = ClientRequestMessageConverter.fromMessage(jmsMessage);
        assertTrue(ClientRequest.RequestType.DEVICE_REQUEST == decoded.getRequestType());
        assertTrue(ClientRequest.ResultType.TRANSFER_DEVICE_LIST == decoded.getResultType());
    } catch (JMSException jmsFailure) {
        // Any JMS error fails the test, with the exception message as the reason.
        assertTrue(jmsFailure.getMessage(), false);
    }
}
/**
 * Enables compression on the local connection, sends a large text message from the
 * local session and checks that the copy received on the remote session is flagged
 * as compressed and carries the same text.
 *
 * @throws Exception on any JMS or broker failure
 */
@Test
public void testCompressedOverCompressedNetwork() throws Exception {
    ((ActiveMQConnection) localConnection).setUseCompression(true);

    MessageConsumer remoteConsumer = remoteSession.createConsumer(included);
    MessageProducer localProducer = localSession.createProducer(included);
    localProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    waitForConsumerRegistration(localBroker, 1, included);

    // Build a payload out of 100 random UUIDs.
    StringBuilder text = new StringBuilder("test-");
    int remaining = 100;
    while (remaining > 0) {
        text.append(UUID.randomUUID().toString());
        remaining--;
    }
    final String expectedText = text.toString();

    localProducer.send(localSession.createTextMessage(expectedText));

    Message received = remoteConsumer.receive(RECEIVE_TIMEOUT_MILLS);
    assertNotNull(received);
    ActiveMQTextMessage receivedText = (ActiveMQTextMessage) received;
    assertTrue(receivedText.isCompressed());
    assertEquals(expectedText, receivedText.getText());
}
/**
 * Sends a large text message from the local session and checks that the copy received
 * on the remote session is flagged as compressed and carries the same text.
 * Compression is not enabled in this method, so it is presumably configured by the
 * surrounding fixture.
 *
 * @throws Exception on any JMS or broker failure
 */
@Test
public void testTextMessageCompression() throws Exception {
    MessageConsumer remoteConsumer = remoteSession.createConsumer(included);
    MessageProducer localProducer = localSession.createProducer(included);
    localProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    waitForConsumerRegistration(localBroker, 1, included);

    // Build a payload out of 100 random UUIDs.
    StringBuilder text = new StringBuilder("test-");
    int remaining = 100;
    while (remaining > 0) {
        text.append(UUID.randomUUID().toString());
        remaining--;
    }
    final String expectedText = text.toString();

    localProducer.send(localSession.createTextMessage(expectedText));

    Message received = remoteConsumer.receive(RECEIVE_TIMEOUT_MILLS);
    assertNotNull(received);
    ActiveMQTextMessage receivedText = (ActiveMQTextMessage) received;
    assertTrue(receivedText.isCompressed());
    assertEquals(expectedText, receivedText.getText());
}
/**
 * Sends {@code MESSAGE_COUNT} text messages through every producer, waits for the
 * receive counter to reach {@code expectedReceiveCount()} and then checks that exactly
 * that many messages arrived.
 *
 * <p>The wait is bounded: if the counter does not change for 20 seconds the loop ends
 * and the final assertion reports the shortfall, instead of blocking forever.
 *
 * @throws Exception on any JMS failure or if the thread is interrupted
 */
@Test
public void testSendReceive() throws Exception {
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        TextMessage textMessage = new ActiveMQTextMessage();
        textMessage.setText("MSG-NO:" + i);
        for (int x = 0; x < producers.length; x++) {
            producers[x].send(textMessage);
        }
    }
    final long waitMillis = 20000;
    final long stallLimitNanos = waitMillis * 1000000L;
    synchronized (receivedMessageCount) {
        long lastCount = receivedMessageCount.get();
        long lastProgressNanos = System.nanoTime();
        while (receivedMessageCount.get() < expectedReceiveCount()) {
            receivedMessageCount.wait(waitMillis);
            long currentCount = receivedMessageCount.get();
            if (currentCount != lastCount) {
                // Still making progress: restart the stall timer.
                lastCount = currentCount;
                lastProgressNanos = System.nanoTime();
            } else if (System.nanoTime() - lastProgressNanos >= stallLimitNanos) {
                // Nothing arrived for a full wait period; give up and let the
                // assertion below fail. Measuring elapsed time keeps a spurious
                // wakeup from ending the wait early.
                break;
            }
        }
    }
    // sleep a little - to check we don't get too many messages
    Thread.sleep(2000);
    LOG.info("GOT: " + receivedMessageCount.get() + " Expected: " + expectedReceiveCount());
    Assert.assertEquals("Expected message count not correct", expectedReceiveCount(), receivedMessageCount.get());
}
/**
 * Writes a text message to the shared output stream, then reads the bytes back through
 * an {@code OpenWireFormat} whose max frame size is 10 and expects an
 * {@code IOException}.
 *
 * @throws Exception if writing the message fails
 */
public void testMaxFrameSize() throws Exception {
    OpenWireFormat limitedFormat = new OpenWireFormat();
    limitedFormat.setMaxFrameSize(10);

    ActiveMQTextMessage oversized = new ActiveMQTextMessage();
    oversized.setText("This is a test");
    writeObject(oversized);
    ds.writeInt(endOfStreamMarker);
    // Finish writing, then replay the captured bytes.
    ds.close();

    DataInputStream replay = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
    boolean rejected = false;
    try {
        limitedFormat.unmarshal(replay);
    } catch (IOException expected) {
        // The frame exceeds the configured limit, which is the outcome under test.
        rejected = true;
    }
    if (!rejected) {
        fail("Should fail because of the large frame size");
    }
}
/**
 * Creates {@code MESSAGE_COUNT} text messages and hands each one to the listener.
 * The {@code destination} argument is not used by this implementation.
 *
 * @param destination ignored
 * @param listener receiver of every generated message
 * @throws Exception if setting the message text fails
 */
@Override
public void execute(ActiveMQDestination destination, MessageListener listener) throws Exception {
    LOG.info("Initial query is creating: " + MESSAGE_COUNT + " messages");
    int index = 0;
    while (index < MESSAGE_COUNT) {
        ActiveMQTextMessage generated = new ActiveMQTextMessage();
        generated.setText("Initial message: " + index + " loaded from query");
        listener.onMessage(generated);
        index++;
    }
}
/**
 * Builds a short description of {@code result}: its simple class name, followed by
 * the text in braces when it is an {@code ActiveMQTextMessage} whose text can be read.
 *
 * @param result object to describe; must not be null
 * @return the simple class name, or {@code SimpleName{text}} for a readable text message
 */
private String getMessage(Object result) {
    final String typeName = result.getClass().getSimpleName();
    // should we record other message types as well?
    if (!(result instanceof ActiveMQTextMessage)) {
        return typeName;
    }
    try {
        // could trigger decoding (would it affect the client? if so, we might need to copy first)
        return typeName + '{' + ((ActiveMQTextMessage) result).getText() + '}';
    } catch (JMSException e) {
        // ignore: fall back to the bare class name
        return typeName;
    }
}
/**
 * Sends a text reply through {@code JMSReplySender.sendBack} and checks that the
 * message arriving on the reply queue has the text JMS type and the body
 * {@code "TestSendBack"}.
 *
 * @throws Exception if the broker, connection factory or reply sender fails
 */
@Test
public void testSendBackTextMessages() throws Exception {
    final String replyDestination = "testQueueReplyTxt";
    final Properties connectionProps =
            JMSTestsUtils.getJMSPropertiesForDestination(replyDestination, PROVIDER_URL, true);
    final JMSBrokerController broker = new JMSBrokerController(PROVIDER_URL, connectionProps);
    try {
        broker.startProcess();
        Queue replyQueue = broker.connect(replyDestination, true);
        CachedJMSConnectionFactory connectionFactory = new CachedJMSConnectionFactory(connectionProps);

        // Prepare a message context with a text body and the JMS transport headers.
        MessageContext messageContext = JMSTestsUtils.createMessageContext();
        Axis2MessageContext synapseContext = (Axis2MessageContext) messageContext;
        String correlationID = UUID.randomUUID().toString();
        this.setSOAPEnvelopWithTextBody(messageContext);
        this.setTransportHeaders(synapseContext.getAxis2MessageContext(),
                JMSConstants.JMS_TEXT_MESSAGE, correlationID);
        messageContext.setProperty(JMSConstants.JMS_COORELATION_ID, correlationID);

        JMSReplySender replySender = new JMSReplySender(replyQueue, connectionFactory, null, null);
        String soapAction = "urn:test";
        synapseContext.getAxis2MessageContext().setServerSide(true);
        synapseContext.getAxis2MessageContext().setProperty(BaseConstants.SOAPACTION, soapAction);
        replySender.sendBack(messageContext);

        Message replyMsg = broker.receiveMessage(replyQueue);
        Assert.assertNotNull("The reply message cannot be null", replyMsg);
        Assert.assertEquals("The Message type of received message does not match",
                JMSConstants.JMS_TEXT_MESSAGE, replyMsg.getJMSType());
        String replyText = ((ActiveMQTextMessage) replyMsg).getText();
        Assert.assertEquals("The Content of received message does not match", "TestSendBack", replyText);
    } finally {
        // Always release the broker, even when an assertion fails.
        broker.disconnect();
        broker.stopProcess();
    }
}
/**
 * Sends {@code message} as a text message through the producer registered for
 * {@code userId} in {@code p2pHashMap}. A {@code JMSException} is printed and not
 * rethrown.
 *
 * @param userId key of the producer to use
 * @param message text to send
 */
@Override
public void pushP2P(Integer userId, String message) {
    // NOTE(review): no producer registered for userId means a NullPointerException below.
    final MessageProducer target = p2pHashMap.get(userId);
    try {
        TextMessage outgoing = new ActiveMQTextMessage();
        outgoing.setText(message);
        target.send(outgoing);
    } catch (JMSException sendFailure) {
        sendFailure.printStackTrace();
    }
}
/**
 * Sends {@code message} as a text message through the producer registered for
 * {@code groupNumber} in {@code groupHashMap}. A {@code JMSException} is printed and
 * not rethrown.
 *
 * @param groupNumber key of the producer to use
 * @param message text to send
 */
@Override
public void pushGroup(String groupNumber, String message) {
    // NOTE(review): no producer registered for groupNumber means a NullPointerException below.
    final MessageProducer target = groupHashMap.get(groupNumber);
    try {
        TextMessage outgoing = new ActiveMQTextMessage();
        outgoing.setText(message);
        target.send(outgoing);
    } catch (JMSException sendFailure) {
        sendFailure.printStackTrace();
    }
}
/**
 * Maps a JMS text message to a FlowFile with {@code JmsConsumer.map2FlowFile} and
 * checks that the FlowFile size and content match the message text.
 *
 * @throws Exception if the mapping or reading the FlowFile content fails
 */
@Test
public void testMap2FlowFileTextMessage() throws Exception {
    final TestRunner testRunner = TestRunners.newTestRunner(GetJMSQueue.class);
    final String expectedText = "Hello world!";
    final TextMessage jmsMessage = new ActiveMQTextMessage();
    jmsMessage.setText(expectedText);

    final ProcessContext processContext = testRunner.getProcessContext();
    final ProcessSession processSession = testRunner.getProcessSessionFactory().createSession();
    final ProcessorInitializationContext initContext = new MockProcessorInitializationContext(
            testRunner.getProcessor(), (MockProcessContext) testRunner.getProcessContext());

    final JmsProcessingSummary mapped = JmsConsumer.map2FlowFile(
            processContext, processSession, jmsMessage, true, initContext.getLogger());
    assertEquals("TextMessage content length should equal to FlowFile content size",
            expectedText.length(), mapped.getLastFlowFile().getSize());

    // Read the FlowFile content back into a buffer sized to the expected text.
    final byte[] content = new byte[expectedText.length()];
    testRunner.clearTransferState();
    processSession.read(mapped.getLastFlowFile(), new InputStreamCallback() {
        @Override
        public void process(InputStream flowFileIn) throws IOException {
            StreamUtils.fillBuffer(flowFileIn, content, false);
        }
    });

    assertEquals("", expectedText, new String(content, "UTF-8"));
}
/**
 * Serializes a {@code SupervisionEvent} request to JSON, wraps it in a text message and
 * checks that {@code ClientRequestMessageConverter.fromMessage} yields a
 * SUPERVISION_REQUEST.
 */
@Test
public void testSupervisionMessageConversion() {
    JsonRequest<SupervisionEvent> outgoing = new ClientRequestImpl<SupervisionEvent>(SupervisionEvent.class);
    TextMessage jmsMessage = new ActiveMQTextMessage();
    try {
        jmsMessage.setText(outgoing.toJson());
        ClientRequest decoded = ClientRequestMessageConverter.fromMessage(jmsMessage);
        assertTrue(ClientRequest.RequestType.SUPERVISION_REQUEST == decoded.getRequestType());
    } catch (JMSException jmsFailure) {
        // Any JMS error fails the test, with the exception message as the reason.
        assertTrue(jmsFailure.getMessage(), false);
    }
}
/**
 * Asserts that the message's properties are empty. An {@code IOException} raised while
 * reading them is rethrown as an {@code AssertionError} carrying the cause.
 *
 * @param message message whose properties are checked
 */
void assertNoProperties(final ActiveMQTextMessage message) {
    try {
        assertThat(message.getProperties())
                .isEmpty();
    } catch (IOException readFailure) {
        throw new AssertionError(readFailure);
    }
}
/**
 * Creates a non-persistent text message with a fixed payload, addressed to
 * {@code destination}, whose id is derived from {@code producerInfo} and the next
 * value of {@code msgIdGenerator}.
 *
 * @param producerInfo producer used to build the message id
 * @param destination destination set on the message
 * @return the populated message
 * @throws IllegalStateException if the new message rejects its body
 */
protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
    ActiveMQTextMessage message = new ActiveMQTextMessage();
    message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
    message.setDestination(destination);
    message.setPersistent(false);
    try {
        message.setText("Test Message Payload.");
    } catch (MessageNotWriteableException e) {
        // Previously swallowed, which would have returned a message without a body.
        // Not expected for a message created just above, so fail loudly.
        throw new IllegalStateException("Newly created text message rejected its body", e);
    }
    return message;
}
/**
 * Builds the i-th non-persistent test message, with an id made from
 * {@code mesageIdRoot + i}, and checks that the id's producer sequence equals {@code i}.
 *
 * @param i sequence number appended to the id root and included in the text
 * @return the populated message
 * @throws Exception if the id cannot be built or the text cannot be set
 */
private Message getMessage(int i) throws Exception {
    ActiveMQTextMessage message = new ActiveMQTextMessage();
    message.setMessageId(new MessageId(mesageIdRoot + i));
    message.setDestination(destination);
    message.setPersistent(false);
    message.setResponseRequired(true);
    message.setText("Msg:" + i + " " + text);
    // Expected value first, so a failure reports "expected i but was ...".
    assertEquals(i, message.getMessageId().getProducerSequenceId());
    return message;
}
/**
 * Builds the i-th persistent test message. The id is made from {@code mesageIdRoot + i}
 * and both its broker and producer sequence ids are set to {@code i}.
 *
 * @param i sequence number used for the id and included in the text
 * @return the populated message
 * @throws Exception if the id cannot be built or the text cannot be set
 */
private ActiveMQTextMessage getMessage(int i) throws Exception {
    ActiveMQTextMessage message = new ActiveMQTextMessage();
    MessageId id = new MessageId(mesageIdRoot + i);
    id.setBrokerSequenceId(i);
    id.setProducerSequenceId(i);
    message.setMessageId(id);
    message.setDestination(destination);
    message.setPersistent(true);
    message.setResponseRequired(true);
    message.setText("Msg:" + i + " " + text);
    // Expected value first, so a failure reports "expected i but was ...".
    assertEquals(i, message.getMessageId().getProducerSequenceId());
    return message;
}
/**
 * Builds the i-th persistent test message. The id is made from {@code mesageIdRoot + i}
 * and both its broker and producer sequence ids are set to {@code i}.
 *
 * @param i sequence number used for the id and included in the text
 * @return the populated message
 * @throws Exception if the id cannot be built or the text cannot be set
 */
private ActiveMQTextMessage getMessage(int i) throws Exception {
    ActiveMQTextMessage message = new ActiveMQTextMessage();
    MessageId id = new MessageId(mesageIdRoot + i);
    id.setBrokerSequenceId(i);
    id.setProducerSequenceId(i);
    message.setMessageId(id);
    message.setDestination(destination);
    message.setPersistent(true);
    message.setResponseRequired(true);
    message.setText("Msg:" + i + " " + text);
    // Expected value first, so a failure reports "expected i but was ...".
    assertEquals(i, message.getMessageId().getProducerSequenceId());
    return message;
}