下面列出了 javax.jms.Topic#javax.jms.MessageProducer 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Sends two text messages to the queue "MyQueue" and asserts that exactly one is delivered to
 * the registered listener, and that the delivered one carries the body "validBody".
 *
 * <p>NOTE(review): whatever drops the "invalidBody" message is not visible in this method —
 * presumably the surrounding fixture configures it; confirm against the test setup.
 *
 * @throws Exception if a JMS call or one of the helper assertions fails
 */
public void testSendInvalidMessage() throws Exception {
    // Lazily create the shared connection if the fixture has not done so yet.
    if (connection == null) {
        connection = createConnection();
    }
    connection.start();

    ConsumerBean listener = new ConsumerBean();
    listener.setVerbose(true);

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination target = new ActiveMQQueue("MyQueue");

    MessageConsumer consumer = session.createConsumer(target);
    consumer.setMessageListener(listener);

    MessageProducer producer = session.createProducer(target);
    assertNotNull(producer);

    producer.send(createMessage(session, "invalidBody", "myHeader", "xyz"));
    producer.send(createMessage(session, "validBody", "myHeader", "abc"));

    // Only one of the two messages is expected to arrive.
    listener.assertMessagesArrived(1);
    Object delivered = listener.flushMessages().get(0);
    assertEquals("validBody", ((TextMessage) delivered).getText());
}
/**
 * Verifies that a consumer receives nothing while its connection is stopped and receives the
 * previously sent message once the connection has been started.
 *
 * @throws Exception if a JMS call fails
 */
@Test
public void testAutoAck() throws Exception {
    Connection connection = factory.createConnection();
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(queue);
        MessageConsumer consumer = session.createConsumer(queue);

        TextMessage msg = session.createTextMessage("test");
        msg.setStringProperty("abc", "testAutoACK");
        producer.send(msg);

        // The connection has not been started yet, so nothing may be delivered.
        Assert.assertNull(consumer.receive(100));

        connection.start();
        TextMessage message = (TextMessage) consumer.receive(5000);
        Assert.assertNotNull(message);
    } finally {
        // Close even when an assertion fails so the connection is not leaked.
        connection.close();
    }
    System.err.println("Done!!!");
}
/**
 * Sends a single {@link StreamMessage} containing one value of each stream-writable kind
 * (boolean, byte, byte array, char, double, float, int, long, object, short, string) to
 * {@code queue}.
 *
 * @param factory the connection factory used to open a dedicated connection for the send
 * @param message not used by this method; the payload is built from fixed values and {@code TEXT}
 * @throws JMSException if thrown by any JMS API call
 */
private void sendTestStreamMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
    ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(queue);

        StreamMessage streamMessage = session.createStreamMessage();
        streamMessage.writeBoolean(true);
        streamMessage.writeByte((byte) 10);
        // NOTE(review): getBytes() uses the platform default charset; left as is because the
        // reading side is not visible here and must decode with the same charset.
        streamMessage.writeBytes(TEXT.getBytes());
        streamMessage.writeChar('A');
        streamMessage.writeDouble(55.3D);
        streamMessage.writeFloat(79.1F);
        streamMessage.writeInt(37);
        streamMessage.writeLong(56652L);
        streamMessage.writeObject("VVVV");
        streamMessage.writeShort((short) 333);
        streamMessage.writeString(TEXT);

        producer.send(streamMessage);
    } finally {
        // Always release the connection, including when building or sending the message fails.
        connection.close();
    }
}
/**
 * Puts two messages with different {@code identity} properties on the source queue, runs the
 * move command with the selector {@code identity='theOne'}, and checks that only the matching
 * message ends up on the target queue while the other one stays on the source queue.
 *
 * @throws Exception if a JMS call or the command run fails
 */
@Test
public void testMoveSelector() throws Exception{
    final String cmdLine = getConnectCommand() + "-" + CMD_MOVE_QUEUE + " SOURCE.QUEUE -s identity='theOne' TARGET.QUEUE";

    MessageProducer producer = session.createProducer(sourceQueue);

    Message matching = session.createTextMessage("theOne");
    matching.setStringProperty("identity","theOne");

    Message nonMatching = session.createTextMessage("theOther");
    nonMatching.setStringProperty("identity","theOther");

    producer.send(matching);
    producer.send(nonMatching);

    String[] arguments = cmdLine.split(" ");
    a.run(arguments);

    // The non-matching message must still be on the source queue.
    List<TextMessage> leftOnSource = getAllMessages(session.createConsumer(sourceQueue));
    assertEquals(1,leftOnSource.size());
    assertEquals("theOther",leftOnSource.get(0).getText());

    // The matching message must have been moved to the target queue.
    List<TextMessage> movedToTarget = getAllMessages(session.createConsumer(targetQueue));
    assertEquals(1,movedToTarget.size());
    assertEquals("theOne",movedToTarget.get(0).getText());
}
/**
 * Sends a message inside an XA transaction branch and then calls a two-phase
 * {@code commit(xid, false)} without a preceding prepare. The annotation expects this to raise
 * an {@link XAException} whose message starts with "Error while committing dtx session".
 *
 * @throws Exception the expected {@link XAException}, or any setup failure
 */
@Test(expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = "Error while committing dtx session.*")
public void testTwoPhaseCommitWithoutPrepare() throws Exception {
    String destinationName = "testSubscriberWithCommit";
    String payload = "testSubscriberWithCommit-Message";

    InitialContext context = initialContextBuilder.withXaConnectionFactory().withTopic(destinationName).build();
    XATopicConnectionFactory connectionFactory =
            (XATopicConnectionFactory) context.lookup(ClientHelper.XA_CONNECTION_FACTORY);

    xaConnection = connectionFactory.createXATopicConnection();
    xaSession = xaConnection.createXATopicSession();
    xaResource = xaSession.getXAResource();

    Topic destination = (Topic) context.lookup(destinationName);
    MessageProducer producer = xaSession.createProducer(destination);
    xaConnection.start();

    // Do the work inside the transaction branch identified by xid.
    xaResource.start(xid, XAResource.TMNOFLAGS);
    producer.send(xaSession.createTextMessage(payload));
    xaResource.end(xid, XAResource.TMSUCCESS);

    // onePhase == false, but prepare() was never called.
    xaResource.commit(xid, false);
}
/**
 * Configures ACL rules that allow {@code USER1} to access the virtual host and to publish
 * (to "amq.topic" for legacy clients, to temporary exchanges otherwise), then publishes one
 * message to a temporary topic in a transacted session and commits.
 *
 * @throws Exception if ACL configuration or any JMS call fails
 */
@Test
public void testPublishToTempTopicSuccess() throws Exception
{
    final String accessRule = String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1);
    final String publishRule;
    if (isLegacyClient())
    {
        publishRule = String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"amq.topic\"", USER1);
    }
    else
    {
        publishRule = String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE temporary=\"true\"", USER1);
    }
    configureACL(accessRule, publishRule);

    Connection userConnection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
    try
    {
        Session txSession = userConnection.createSession(true, Session.SESSION_TRANSACTED);
        userConnection.start();

        TemporaryTopic tempTopic = txSession.createTemporaryTopic();
        MessageProducer tempTopicProducer = txSession.createProducer(tempTopic);
        tempTopicProducer.send(txSession.createMessage());

        // The test passes if the commit completes without an exception.
        txSession.commit();
    }
    finally
    {
        userConnection.close();
    }
}
/**
 * Sends an empty request to the broker statistics destination with a temporary reply queue
 * and checks that a non-empty {@link MapMessage} with a positive timestamp and the default
 * priority comes back within ten seconds.
 *
 * @throws Exception if a JMS call fails
 */
public void testBrokerStats() throws Exception {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    Queue replyQueue = session.createTemporaryQueue();
    MessageConsumer replyConsumer = session.createConsumer(replyQueue);

    Queue statsQueue = session.createQueue(StatisticsBroker.STATS_BROKER_PREFIX);
    MessageProducer requestProducer = session.createProducer(statsQueue);

    Message request = session.createMessage();
    request.setJMSReplyTo(replyQueue);
    requestProducer.send(request);

    MapMessage reply = (MapMessage) replyConsumer.receive(10000);
    assertNotNull(reply);
    assertTrue(reply.getMapNames().hasMoreElements());
    assertTrue(reply.getJMSTimestamp() > 0);
    assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority());

    // Debugging aid (disabled): dump every entry of the reply.
    // for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
    //     String name = e.nextElement().toString();
    //     System.err.println(name+"="+reply.getObject(name));
    // }
}
/**
 * Send the given response message to the given destination, using a producer that is
 * created for this call and closed afterwards. When response QoS settings are available,
 * their delivery mode, priority and time-to-live are applied to the send.
 * @param session the JMS session to operate on
 * @param destination the JMS destination to send to
 * @param response the JMS message to send
 * @throws JMSException if thrown by JMS API methods
 * @see #postProcessProducer
 * @see javax.jms.Session#createProducer
 * @see javax.jms.MessageProducer#send
 */
protected void sendResponse(Session session, Destination destination, Message response) throws JMSException {
    MessageProducer responseProducer = session.createProducer(destination);
    try {
        postProcessProducer(responseProducer, response);
        QosSettings qos = getResponseQosSettings();
        if (qos == null) {
            // No explicit QoS configured: rely on the producer's own defaults.
            responseProducer.send(response);
        }
        else {
            responseProducer.send(response, qos.getDeliveryMode(), qos.getPriority(),
                    qos.getTimeToLive());
        }
    }
    finally {
        JmsUtils.closeMessageProducer(responseProducer);
    }
}
/**
 * Sends {@code NUM_MESSAGES} text messages with the body "hello" to {@code QUEUE}, opening
 * and closing a fresh connection for every message.
 *
 * @param connectionFactory the factory each per-message connection is obtained from
 * @throws Exception if any JMS call fails
 */
public void sendMessages(ConnectionFactory connectionFactory) throws Exception {
    for (int i = 0; i < NUM_MESSAGES; i++) {
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE);
            MessageProducer producer = session.createProducer(destination);
            String msgTo = "hello";
            TextMessage message = session.createTextMessage(msgTo);
            producer.send(message);
        } finally {
            // Release the connection even when the send fails, so nothing is leaked.
            connection.close();
        }
        // i is zero-based; i + 1 messages have been sent at this point.
        LOG.debug("sent " + (i + 1) + " messages using " + connectionFactory.getClass());
    }
}
/**
 * Creates an empty reply message, copies the given correlation ID onto it (as bytes when it
 * is a {@code byte[]}, otherwise as a string), prints it to standard output and sends it to
 * the reply destination with a producer that is closed afterwards.
 *
 * @param session the session used to create the message and the producer
 * @param jmsReplyTo the destination the reply is sent to
 * @param correlationId a {@code byte[]} or {@code String} correlation ID, or {@code null} for none
 * @throws JMSException if thrown by any JMS API call
 */
private void sendReply(final Session session, final Destination jmsReplyTo, final Serializable correlationId)
        throws JMSException
{
    final Message reply = session.createMessage();

    // A null correlationId matches neither branch, so no correlation ID is set in that case.
    if (correlationId instanceof byte[])
    {
        reply.setJMSCorrelationIDAsBytes((byte[]) correlationId);
    }
    else if (correlationId != null)
    {
        reply.setJMSCorrelationID((String) correlationId);
    }

    System.out.println(String.format("Sending reply message: %s", reply));

    final MessageProducer replyProducer = session.createProducer(jmsReplyTo);
    try
    {
        replyProducer.send(reply);
    }
    finally
    {
        replyProducer.close();
    }
}
/**
 * Checks that {@code convertAndSend} with a {@link SimpleMessageConverter} turns a string
 * into the text message created by the session, sends it through the producer, and then
 * closes producer, session and connection (committing first for a transacted template).
 *
 * @throws Exception if the template call fails
 */
@Test
public void testConverter() throws Exception {
    JmsTemplate jmsTemplate = createTemplate();
    jmsTemplate.setConnectionFactory(connectionFactory);
    jmsTemplate.setMessageConverter(new SimpleMessageConverter());

    String payload = "Hello world";

    // Stub the session so the template gets our mocked producer and message.
    MessageProducer producerMock = mock(MessageProducer.class);
    TextMessage convertedMessage = mock(TextMessage.class);
    given(session.createProducer(queue)).willReturn(producerMock);
    given(session.createTextMessage("Hello world")).willReturn(convertedMessage);

    jmsTemplate.convertAndSend(queue, payload);

    verify(producerMock).send(convertedMessage);
    verify(producerMock).close();
    if (useTransactedTemplate()) {
        verify(session).commit();
    }
    verify(session).close();
    verify(connection).close();
}
/**
 * Checks that a listener method returning the payload "Response" causes a text message to
 * be created and sent through a producer for {@code sharedReplyDestination}, without the
 * session being asked to create any queue.
 *
 * @throws JMSException if thrown by the listener invocation
 */
@Test
public void replyPayloadToDestination() throws JMSException {
    Session sessionMock = mock(Session.class);
    MessageProducer producerMock = mock(MessageProducer.class);
    TextMessage reply = mock(TextMessage.class);

    given(sessionMock.createTextMessage("Response")).willReturn(reply);
    given(sessionMock.createProducer(sharedReplyDestination)).willReturn(producerMock);

    MessagingMessageListenerAdapter adapter =
            getPayloadInstance("Response", "replyPayloadToDestination", Message.class);
    javax.jms.Message incoming = mock(javax.jms.Message.class);
    adapter.onMessage(incoming, sessionMock);

    // The destination must not be resolved by name through the session.
    verify(sessionMock, times(0)).createQueue(anyString());
    verify(sessionMock).createTextMessage("Response");
    verify(producerMock).send(reply);
    verify(producerMock).close();
}
/**
 * Invokes the named listener method on an adapter configured with a text-typed
 * {@link MappingJackson2MessageConverter} and a default response destination, then verifies
 * that the expected reply content was turned into a text message and sent.
 *
 * @param methodName name of the listener method to invoke
 * @param replyContent the text the session is expected to be asked to create a message for
 * @return the mocked text message that was sent as the reply
 * @throws JMSException if thrown by the listener invocation
 */
public TextMessage testReplyWithJackson(String methodName, String replyContent) throws JMSException {
    Queue defaultReplyQueue = mock(Queue.class);
    Session sessionMock = mock(Session.class);
    MessageProducer producerMock = mock(MessageProducer.class);
    TextMessage reply = mock(TextMessage.class);

    given(sessionMock.createTextMessage(replyContent)).willReturn(reply);
    given(sessionMock.createProducer(defaultReplyQueue)).willReturn(producerMock);

    MessagingMessageListenerAdapter adapter = getPayloadInstance("Response", methodName, Message.class);

    MappingJackson2MessageConverter jacksonConverter = new MappingJackson2MessageConverter();
    jacksonConverter.setTargetType(MessageType.TEXT);
    adapter.setMessageConverter(jacksonConverter);
    adapter.setDefaultResponseDestination(defaultReplyQueue);

    javax.jms.Message incoming = mock(javax.jms.Message.class);
    adapter.onMessage(incoming, sessionMock);

    // The destination must not be resolved by name through the session.
    verify(sessionMock, times(0)).createQueue(anyString());
    verify(sessionMock).createTextMessage(replyContent);
    verify(producerMock).send(reply);
    verify(producerMock).close();
    return reply;
}
/**
 * Sends {@code num} text messages to the named queue over a connection from {@code coreCf}.
 * The body of message {@code i} (zero-based) is {@code text + i}. The connection is closed
 * afterwards, whether or not sending succeeded.
 *
 * @param queueName name of the queue to send to
 * @param text prefix of every message body
 * @param num number of messages to send
 * @throws Exception if any JMS call fails
 */
private void sendMultipleTextMessagesUsingCoreJms(String queueName, String text, int num) throws Exception {
    Connection coreConnection = null;
    try {
        coreConnection = coreCf.createConnection();
        Session coreSession = coreConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue target = coreSession.createQueue(queueName);
        MessageProducer sender = coreSession.createProducer(target);

        int index = 0;
        while (index < num) {
            String body = text + index;
            sender.send(coreSession.createTextMessage(body));
            index++;
        }
    } finally {
        // coreConnection stays null when createConnection() itself failed.
        if (coreConnection != null) {
            coreConnection.close();
        }
    }
}
public JmsPoolMessageProducer(JmsPoolSession session, MessageProducer messageProducer, Destination destination, AtomicInteger refCount) throws JMSException {
this.session = session;
this.messageProducer = messageProducer;
this.destination = destination;
this.refCount = refCount;
this.anonymousProducer = destination == null;
this.deliveryMode = messageProducer.getDeliveryMode();
this.disableMessageID = messageProducer.getDisableMessageID();
this.disableMessageTimestamp = messageProducer.getDisableMessageTimestamp();
this.priority = messageProducer.getPriority();
this.timeToLive = messageProducer.getTimeToLive();
if (session.isJMSVersionSupported(2, 0)) {
this.deliveryDelay = messageProducer.getDeliveryDelay();
}
}
/**
 * Publishes a throttle notification to the topic "throttleData". The map message carries the
 * given key under "throttleKey", an "expiryTimeStamp" one second after the current time, and
 * "isThrottled" set to {@code true}.
 *
 * @param msg the throttle key to publish
 * @throws NamingException if the JNDI context cannot be built or a lookup fails
 * @throws JMSException if thrown by any JMS API call
 */
public void publishMessage(String msg) throws NamingException, JMSException {
    String topicName = "throttleData";
    InitialContext initialContext = ClientHelper.getInitialContextBuilder("admin", "admin",
            "localhost", "5672")
            .withTopic(topicName)
            .build();
    ConnectionFactory connectionFactory
            = (ConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    Connection connection = connectionFactory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = (Topic) initialContext.lookup(topicName);
        MessageProducer producer = session.createProducer(topic);

        MapMessage mapMessage = session.createMapMessage();
        mapMessage.setString("throttleKey", msg);
        // Expires one second from now.
        long time = System.currentTimeMillis() + 1000;
        mapMessage.setLong("expiryTimeStamp", time);
        mapMessage.setBoolean("isThrottled", true);
        producer.send(mapMessage);
    } finally {
        // Close the connection even when the lookup or the send fails.
        connection.close();
    }
}
/**
 * Sends a scheduler-management request with a freshly generated (and therefore never
 * scheduled) ID and checks that sending it does not raise an exception.
 *
 * @throws Exception if connection or session setup fails
 */
@Test
public void testRemoveNotScheduled() throws Exception {
    Connection connection = createConnection();
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the scheduler management destination
        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
        MessageProducer producer = session.createProducer(management);
        try {
            // Send the remove request
            Message remove = session.createMessage();
            remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
            remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, new IdGenerator().generateId());
            producer.send(remove);
        } catch (Exception e) {
            // Include the exception so the failure report shows what actually went wrong.
            fail("Caught unexpected exception during remove of unscheduled message: " + e);
        }
    } finally {
        // The connection is local to this test, so it must be released here.
        connection.close();
    }
}
/**
 * Sends one persistent message, stops the primary broker, and checks that closing the
 * connection afterwards neither throws nor hangs (the test is bounded by a 30 second timeout).
 *
 * @throws Exception if setup, the send, or stopping the broker fails
 */
@Test(timeout=30000)
public void testBrokerStopWontHangConnectionClose() throws Exception {
    connection = createAmqpConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue(getDestinationName());
    connection.start();

    MessageProducer producer = session.createProducer(queue);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    Message m = session.createTextMessage("Sample text");
    producer.send(m);

    stopPrimaryBroker();

    try {
        connection.close();
    } catch (Exception ex) {
        // The message fills the placeholder; the trailing throwable supplies the stack trace.
        LOG.error("Should not have thrown on disconnected connection close(): {}", ex.getMessage(), ex);
        fail("Should not have thrown an exception.");
    }
}
/**
 * Verifies that sending a message with the default JMS priority (4) produces an AMQP
 * transfer whose header section has no priority value, because 4 is already the default for
 * that field.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test(timeout = 20000)
public void testDefaultPriorityProducesMessagesWithoutPriorityField() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection connection = testFixture.establishConnecton(peer);

        peer.expectBegin();
        peer.expectSenderAttach();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue destination = session.createQueue("myQueue");
        MessageProducer sender = session.createProducer(destination);

        // Expect a transfer whose header priority is absent.
        MessageHeaderSectionMatcher headerSection = new MessageHeaderSectionMatcher(true)
                .withPriority(equalTo(null));
        MessageAnnotationsSectionMatcher annotationsSection = new MessageAnnotationsSectionMatcher(true);
        TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setHeadersMatcher(headerSection);
        payloadMatcher.setMessageAnnotationsMatcher(annotationsSection);
        peer.expectTransfer(payloadMatcher);
        peer.expectClose();

        Message outgoing = session.createTextMessage();

        // The JMS-level priority must read as the default both before and after the send.
        assertEquals(Message.DEFAULT_PRIORITY, outgoing.getJMSPriority());
        sender.send(outgoing);
        assertEquals(Message.DEFAULT_PRIORITY, outgoing.getJMSPriority());

        connection.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Sends a text message to a topic addressed as {@code EXCHANGE_NAME/ROUTING_KEY} and checks
 * that it is delivered to {@code QUEUE_NAME} with its body intact. Skipped unless the
 * protocol is AMQP 1.0.
 *
 * @throws Exception if preparation or any JMS call fails
 */
@Test
public void testRoutingWithExchangeAndRoutingKeyDestination() throws Exception
{
    assumeThat("AMQP 1.0 test", getProtocol(), is(equalTo(Protocol.AMQP_1_0)));
    prepare();
    Connection connection = getConnection();
    try
    {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination sendingDestination = session.createTopic(EXCHANGE_NAME + "/" + ROUTING_KEY);
        Destination receivingDestination = session.createQueue(QUEUE_NAME);

        Message message = session.createTextMessage("test");
        MessageProducer messageProducer = session.createProducer(sendingDestination);
        messageProducer.send(message);

        MessageConsumer messageConsumer = session.createConsumer(receivingDestination);
        Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
        assertNotNull("Message not received", receivedMessage);
        // Check the body of what was actually received, not of the message that was sent.
        assertEquals("test", ((TextMessage) receivedMessage).getText());
    }
    finally
    {
        connection.close();
    }
}
/**
 * Sends {@code nbMessages} text messages to the destination. Message {@code i} (1-based) has
 * the body {@code <hello id='i'/>}. The producer and session created for the purpose are
 * closed afterwards, including when a send fails; the connection is left open.
 *
 * @param nbMessages number of messages to send
 * @param destination the destination to populate
 * @param connection the connection to create the session on
 * @throws JMSException if thrown by any JMS API call
 */
private void populateDestination(final int nbMessages,
                                 final Destination destination,
                                 javax.jms.Connection connection) throws JMSException {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    try {
        MessageProducer producer = session.createProducer(destination);
        try {
            for (int i = 1; i <= nbMessages; i++) {
                producer.send(session.createTextMessage("<hello id='" + i + "'/>"));
            }
        } finally {
            producer.close();
        }
    } finally {
        session.close();
    }
}
/**
 * Creates a temporary queue on one connection, publishes to it from a second connection,
 * and checks that a consumer on the first connection receives the message.
 *
 * @throws Exception if any JMS call fails
 */
@Test
public void testPublishFromAnotherConnectionAllowed() throws Exception
{
    final Connection ownerConnection = getConnection();
    try
    {
        final Connection publisherConnection = getConnection();
        try
        {
            final Session ownerSession = ownerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Session publisherSession = publisherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            final TemporaryQueue tempQueue = ownerSession.createTemporaryQueue();
            assertNotNull("Temporary queue cannot be null", tempQueue);

            // Publish through the connection that did not create the queue.
            MessageProducer foreignProducer = publisherSession.createProducer(tempQueue);
            foreignProducer.send(publisherSession.createMessage());

            ownerConnection.start();
            MessageConsumer ownerConsumer = ownerSession.createConsumer(tempQueue);
            Message delivered = ownerConsumer.receive(getReceiveTimeout());
            assertNotNull("Message not received", delivered);
        }
        finally
        {
            publisherConnection.close();
        }
    }
    finally
    {
        ownerConnection.close();
    }
}
/**
 * Sends "Msg1" to {@code brokerZeroQueue} and checks that a consumer created afterwards
 * receives it within five seconds.
 *
 * @throws Exception if any JMS call fails
 */
public void testBrokerZeroPrefetchConfig() throws Exception {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageProducer producer = session.createProducer(brokerZeroQueue);
    producer.send(session.createTextMessage("Msg1"));

    // now lets receive it
    MessageConsumer consumer = session.createConsumer(brokerZeroQueue);
    TextMessage answer = (TextMessage) consumer.receive(5000);

    // Fail with the intended message, not a NullPointerException, when nothing arrives.
    assertNotNull("Should have received a message!", answer);
    // Expected value first, so a failure report labels the two values correctly.
    assertEquals("Msg1", answer.getText());
}
/**
 * Sends the text message "test" to {@code MDBQUEUE} over a connection from
 * {@code qraConnectionFactory} and checks that a consumer on the same session receives it.
 *
 * @throws Exception if any JMS call fails
 */
@Test
public void testSimpleMessageSendAndReceive() throws Exception {
    QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
    try {
        Session s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE);
        MessageProducer mp = s.createProducer(q);
        MessageConsumer consumer = s.createConsumer(q);

        Message message = s.createTextMessage("test");
        mp.send(message);

        queueConnection.start();
        TextMessage textMessage = (TextMessage) consumer.receive(1000);
        assertNotNull(textMessage);
        // Expected value first, so a failure report labels the two values correctly.
        assertEquals("test", textMessage.getText());
    } finally {
        // The connection is local to this test, so release it here.
        queueConnection.close();
    }
}
/**
 * Validates that a {@link ConnectionFactory} can be set up by pointing the provider at a
 * directory of client libraries at runtime. ActiveMQ is not a POM dependency; its libraries
 * are obtained through {@link TestUtils#setupActiveMqLibForTesting(boolean)}, which implies
 * that the computer must be connected to the Internet for this test to run. Any
 * {@link Exception} raised along the way is only logged, so in that situation the test
 * ends quietly instead of failing.
 */
@Test
public void validateFactoryCreationWithActiveMQLibraries() throws Exception {
    try {
        String clientLibPath = TestUtils.setupActiveMqLibForTesting(true);

        TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
        JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();
        testRunner.addControllerService("cfProvider", provider);

        // Configure the provider with an in-VM, non-persistent broker and the downloaded libs.
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI,
                "vm://localhost?broker.persistent=false");
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLibPath);
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
                "org.apache.activemq.ActiveMQConnectionFactory");
        testRunner.enableControllerService(provider);
        testRunner.assertValid(provider);

        // Round-trip one message to prove the factory produces a working connection.
        Connection jmsConnection = provider.getConnectionFactory().createConnection();
        jmsConnection.start();
        Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination target = jmsSession.createQueue("myqueue");
        MessageProducer sender = jmsSession.createProducer(target);
        MessageConsumer receiver = jmsSession.createConsumer(target);

        TextMessage outgoing = jmsSession.createTextMessage("Hello");
        sender.send(outgoing);

        TextMessage incoming = (TextMessage) receiver.receive();
        assertEquals("Hello", incoming.getText());

        jmsConnection.stop();
        jmsConnection.close();
    } catch (Exception e) {
        logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", e);
    }
}
/**
 * Sends a text message with producer priority 2 over the first connection and checks that a
 * consumer on the second connection receives it with that priority. Both connections are
 * closed when the method returns.
 *
 * @param connection1 the connection used for producing; started and closed by this method
 * @param connection2 the connection used for consuming; closed by this method
 * @throws JMSException if thrown by any JMS API call
 */
private void testPriority(Connection connection1, Connection connection2) throws JMSException {
    try {
        Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        javax.jms.Queue queue1 = session1.createQueue(getQueueName());
        javax.jms.Queue queue2 = session2.createQueue(getQueueName());

        final MessageConsumer consumer2 = session2.createConsumer(queue2);
        MessageProducer producer = session1.createProducer(queue1);
        producer.setPriority(2);
        connection1.start();

        TextMessage message = session1.createTextMessage();
        message.setText("hello");
        producer.send(message);

        Message received = consumer2.receive(100);
        assertNotNull("Should have received a message by now.", received);
        assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
        assertEquals(2, received.getJMSPriority());
    } finally {
        // Nested so that connection2 is still closed when closing connection1 throws.
        try {
            connection1.close();
        } finally {
            connection2.close();
        }
    }
}
/**
 * Attempts to send one persistent text message ("payload") to the queue named by
 * {@code address} over {@code conn}, ignoring any {@link JMSException}, and closes
 * {@code conn} afterwards.
 *
 * @throws Exception if closing the connection fails or a non-JMS exception is raised
 */
private void sendConsumeDurableMessage() throws Exception {
    try {
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue target = session.createQueue(address.toString());
        MessageProducer persistentProducer = session.createProducer(target);
        persistentProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        conn.start();
        TextMessage payload = session.createTextMessage("payload");
        persistentProducer.send(payload);
    } catch (JMSException expected) {
        // Deliberately ignored: the caller treats a JMS failure here as an expected outcome.
    } finally {
        if (conn != null) {
            conn.close();
        }
    }
}
/**
 * Checks that a connection using the failover transport keeps working across a server
 * restart: a message is sent and received, the server is stopped and started again, and a
 * second message is sent and received on the same session.
 *
 * @throws Exception if any JMS call or the server restart fails
 */
@Test
public void testFailoverTransportReconnect() throws Exception {
    Connection failoverConnection = null;
    try {
        String brokerUrl = "failover:(tcp://" + OWHOST + ":" + OWPORT + ")";
        ActiveMQConnectionFactory failoverFactory = new ActiveMQConnectionFactory(brokerUrl);
        Queue durableQueue = new ActiveMQQueue(durableQueueName);

        failoverConnection = failoverFactory.createConnection();
        failoverConnection.start();
        Session session = failoverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Round trip before the restart.
        MessageProducer sender = session.createProducer(durableQueue);
        sender.send(session.createTextMessage("Test"));
        MessageConsumer receiver = session.createConsumer(durableQueue);
        assertNotNull(receiver.receive(5000));

        // Restart the server underneath the open connection.
        server.stop();
        Thread.sleep(3000);
        server.start();
        server.waitForActivation(10, TimeUnit.SECONDS);

        // Round trip after the restart, reusing the same producer and consumer.
        sender.send(session.createTextMessage("Test2"));
        assertNotNull(receiver.receive(5000));
    } finally {
        if (failoverConnection != null) {
            failoverConnection.close();
        }
    }
}
/**
 * Creates a new auto-acknowledge session on {@code connection}, creates a producer for
 * {@code destination} on it, and returns the session.
 *
 * @return the session the producer was created on
 * @throws JMSException if thrown by any JMS API call
 */
protected Session createProducer() throws JMSException {
    // NOTE(review): the label number still comes from consumerCounter, the only counter
    // visible here, so producer and consumer labels share one sequence.
    final String producerText = "Producer: " + (++consumerCounter);
    LOG.info("Creating producer: " + producerText + " on destination: " + destination);

    Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageProducer producer = answer.createProducer(destination);
    assertNotNull(producer);
    return answer;
}
/**
 * Checks that a client ID already in use by another connection is rejected, and that a
 * shared durable consumer created on a connection with a client ID receives a message
 * published to its topic.
 *
 * @throws Exception if any JMS call fails
 */
@Test
public void testSharedDurableConsumerWithClientID() throws Exception {
    conn = cf.createConnection();
    conn.setClientID("C1");
    conn.start();
    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

    Connection conn2 = cf.createConnection();
    try {
        conn2.setClientID("C2");
        // The session itself is not used; the call is kept so the connection setup is unchanged.
        conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // A third connection must not be able to claim the client ID held by conn2.
        Connection conn3 = cf.createConnection();
        try {
            boolean exception = false;
            try {
                conn3.setClientID("C2");
            } catch (Exception e) {
                exception = true;
            }
            Assert.assertTrue(exception);
        } finally {
            // Close even when the assertion fails so the connection is not leaked.
            conn3.close();
        }

        topic = ActiveMQJMSClient.createTopic(T_NAME);
        MessageConsumer cons = session.createSharedDurableConsumer(topic, "test1");
        MessageProducer producer = session.createProducer(topic);
        producer.send(session.createTextMessage("test"));
        TextMessage txt = (TextMessage) cons.receive(5000);
        Assert.assertNotNull(txt);
    } finally {
        // conn2 is local to this test, so release it here.
        conn2.close();
    }
}