下面列出了 javax.jms.TemporaryTopic API 类的示例代码及用法,也可以点击链接到 GitHub 查看源代码。
/**
 * Checks that creating a consumer on a temporary topic from a connection other than
 * the one that created the topic is rejected with an {@code InvalidDestinationException}.
 */
@Test(timeout = 60000)
public void testCantConsumeFromTemporaryTopicCreatedOnAnotherConnection() throws Exception {
    connection = createAmqpConnection();
    connection.start();

    // The owning connection creates the temporary topic and attaches a consumer to it.
    Session ownerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    TemporaryTopic ownedTopic = ownerSession.createTemporaryTopic();
    ownerSession.createConsumer(ownedTopic);

    // A second, independent connection then tries to consume from the same topic.
    Connection foreignConnection = createAmqpConnection();
    try {
        Session foreignSession = foreignConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            foreignSession.createConsumer(ownedTopic);
            fail("should not be able to consumer from temporary topic from another connection");
        } catch (InvalidDestinationException expected) {
            // expected: the foreign connection must not be allowed to consume
        }
    } finally {
        foreignConnection.close();
    }
}
/**
 * Checks that USER1 can create a temporary topic and a consumer on it once the ACL
 * grants virtual host access, temporary queue create/consume and the exchange bind
 * rule appropriate for the client type.
 */
@Test
public void testConsumeFromTempTopicSuccess() throws Exception
{
    // Legacy clients bind against "amq.topic"; other clients bind a temporary exchange.
    final String bindRuleTemplate = isLegacyClient()
            ? "ACL ALLOW-LOG %s BIND EXCHANGE name=\"amq.topic\""
            : "ACL ALLOW-LOG %s BIND EXCHANGE temporary=\"true\"";

    configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1),
                 String.format("ACL ALLOW-LOG %s CREATE QUEUE temporary=\"true\"", USER1),
                 String.format("ACL ALLOW-LOG %s CONSUME QUEUE temporary=\"true\"", USER1),
                 String.format(bindRuleTemplate, USER1));

    Connection userConnection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
    try
    {
        Session userSession = userConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        userConnection.start();
        TemporaryTopic tempTopic = userSession.createTemporaryTopic();
        // The test passes as long as creating the consumer does not throw.
        userSession.createConsumer(tempTopic);
    }
    finally
    {
        userConnection.close();
    }
}
/**
 * Checks that USER1 can create a temporary topic when the ACL only grants
 * access to the virtual host.
 */
@Test
public void testCreateTempTopicSuccess() throws Exception
{
    configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1));

    Connection userConnection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
    try
    {
        Session userSession = userConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TemporaryTopic createdTopic = userSession.createTemporaryTopic();
        assertNotNull(createdTopic);
    }
    finally
    {
        userConnection.close();
    }
}
/**
 * Checks that USER1 can publish a message to a temporary topic inside a transacted
 * session once the ACL grants virtual host access and the publish rule appropriate
 * for the client type.
 */
@Test
public void testPublishToTempTopicSuccess() throws Exception
{
    // Legacy clients publish through "amq.topic"; other clients through a temporary exchange.
    final String publishRule = isLegacyClient()
            ? String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"amq.topic\"", USER1)
            : String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE temporary=\"true\"", USER1);

    configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1), publishRule);

    Connection userConnection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
    try
    {
        Session txSession = userConnection.createSession(true, Session.SESSION_TRANSACTED);
        userConnection.start();
        TemporaryTopic tempTopic = txSession.createTemporaryTopic();
        MessageProducer tempTopicProducer = txSession.createProducer(tempTopic);
        tempTopicProducer.send(txSession.createMessage());
        // The send only takes effect once the transaction commits.
        txSession.commit();
    }
    finally
    {
        userConnection.close();
    }
}
/**
 * Checks that deleting a temporary topic after its connection has been closed
 * fails with an {@code IllegalStateException}.
 */
@Test(timeout=30000)
public void testDeleteOfTempTopicOnClosedConnection() throws JMSException, IOException {
    connection = new JmsConnection(connectionInfo, provider);
    connection.start();

    Session topicSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    TemporaryTopic orphanedTopic = topicSession.createTemporaryTopic();
    assertNotNull(orphanedTopic);

    // Close the connection first so the subsequent delete operates on a closed connection.
    connection.close();

    try {
        orphanedTopic.delete();
        fail("Should have thrown an IllegalStateException");
    } catch (IllegalStateException expected) {
        // expected: the connection is already closed
    }
}
/**
 * Checks that a temporary topic cannot be deleted while a consumer is still open
 * on it, and that the delete succeeds once that consumer has been closed.
 */
@Test(timeout = 60000)
public void testCantDeleteTemporaryTopicWithConsumers() throws Exception {
    connection = createAmqpConnection();
    connection.start();

    Session topicSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    TemporaryTopic topicUnderTest = topicSession.createTemporaryTopic();
    MessageConsumer activeConsumer = topicSession.createConsumer(topicUnderTest);

    try {
        topicUnderTest.delete();
        fail("should not be able to delete temporary topic with active consumers");
    } catch (IllegalStateException expected) {
        // expected: a consumer is still attached
    }

    activeConsumer.close();

    // With the consumer gone the delete is expected to go through.
    topicUnderTest.delete();
}
/**
 * Creates and deletes MESSAGE_COUNT temporary topics, each with a short-lived consumer
 * and producer, then checks that the advisory broker holds no leftover advisory
 * destinations, consumers or producers.
 */
public void testLoadTempAdvisoryTopics() throws Exception {
    for (int round = 0; round < MESSAGE_COUNT; round++) {
        TemporaryTopic cycledTopic = session.createTemporaryTopic();
        MessageConsumer cycledConsumer = session.createConsumer(cycledTopic);
        MessageProducer cycledProducer = session.createProducer(cycledTopic);
        cycledConsumer.close();
        cycledProducer.close();
        cycledTopic.delete();
    }

    // Nothing created in the loop should still be tracked by the advisory broker.
    AdvisoryBroker advisoryBroker = (AdvisoryBroker) broker.getBroker().getAdaptor(AdvisoryBroker.class);
    assertTrue(advisoryBroker.getAdvisoryDestinations().size() == 0);
    assertTrue(advisoryBroker.getAdvisoryConsumers().size() == 0);
    assertTrue(advisoryBroker.getAdvisoryProducers().size() == 0);

    RegionBroker regionBroker = (RegionBroker) broker.getBroker().getAdaptor(RegionBroker.class);
    for (Destination remaining : regionBroker.getDestinationMap().values()) {
        LOG.debug("Destination: {}", remaining);
    }

    // Advisory destinations are expected to remain: 1 for the connection + generic ones.
    // NOTE(review): the message says "at least 2" but the check requires strictly more
    // than 2 - confirm which of the two is intended.
    assertTrue("Should be at least 2 destinations", regionBroker.getDestinationMap().size() > 2);
}
/**
 * Checks that a temporary topic is not bound in JNDI: looking it up under
 * {@code /topic/<name>} must fail with a {@code NamingException}.
 *
 * @throws Exception if the connection, session or temporary topic cannot be created
 */
@Test
public void testTemporaryTopicShouldNotBeInJNDI() throws Exception {
    Connection producerConnection = createConnection();
    try {
        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TemporaryTopic tempTopic = producerSession.createTemporaryTopic();
        String topicName = tempTopic.getTopicName();
        try {
            ic.lookup("/topic/" + topicName);
            // This test covers a topic; the message previously said "queue".
            ProxyAssertSupport.fail("The temporary topic should not be bound to JNDI");
        } catch (NamingException e) {
            // Expected
        }
    } finally {
        // Release the connection explicitly so the test does not leave it open.
        producerConnection.close();
    }
}
/**
 * Checks that a connection other than the one that created a temporary topic
 * cannot create a consumer on it.
 * <p>
 * https://jira.jboss.org/jira/browse/JBMESSAGING-1566
 *
 * @throws Exception if a connection, session or the temporary topic cannot be created
 */
@Test
public void testCanNotCreateConsumerFromAnotherCnnectionForTemporaryTopic() throws Exception {
    Connection conn = createConnection();
    try {
        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TemporaryTopic tempTopic = sess.createTemporaryTopic();

        Connection anotherConn = createConnection();
        try {
            Session sessFromAnotherConn = anotherConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            try {
                sessFromAnotherConn.createConsumer(tempTopic);
                ProxyAssertSupport.fail("Only temporary destination's own connection is allowed to create MessageConsumers for them.");
            } catch (JMSException expected) {
                // Expected: only the creating connection may consume from the temporary topic.
            }
        } finally {
            // Close both connections explicitly so the test does not leave them open.
            anotherConn.close();
        }
    } finally {
        conn.close();
    }
}
/**
 * Maps a destination to its type code.
 * <p>
 * Queues are checked before topics, and the temporary variant of each is checked
 * first. Anything that is neither a queue nor a topic (including {@code null})
 * is reported as {@code QUEUE_TYPE}.
 *
 * @param destination the destination to classify
 * @return one of {@code TEMP_QUEUE_TYPE}, {@code QUEUE_TYPE}, {@code TEMP_TOPIC_TYPE}
 *         or {@code TOPIC_TYPE}
 */
public static byte destinationType(Destination destination) {
    if (destination instanceof Queue) {
        return destination instanceof TemporaryQueue ? TEMP_QUEUE_TYPE : QUEUE_TYPE;
    }
    if (destination instanceof Topic) {
        return destination instanceof TemporaryTopic ? TEMP_TOPIC_TYPE : TOPIC_TYPE;
    }
    // Fallback for anything that is neither a queue nor a topic.
    return QUEUE_TYPE;
}
/**
 * Tests that a connection with a destination prefix configured does not alter the
 * address of a temporary topic in the to/reply-to fields of incoming messages.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test(timeout = 20000)
public void testReceivedMessageWithTemporaryTopicDestinationsOnConnectionWithPrefixes() throws Exception {
    final Class<? extends Destination> destinationClass = TemporaryTopic.class;
    final String configuredPrefix = "q12321-";

    final String toName = "temp-topic://myTempTopic";
    final String replyToName = "temp-topic://myReplyTempTopic";

    // Temporary addresses generated by the broker are not manipulated,
    // so the expected addresses are the names themselves.
    final String toAddress = toName;
    final String replyToAddress = replyToName;

    // Both the destination and the reply-to carry the temporary topic type annotation.
    final String toAnnotationKey = AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL.toString();
    final Byte toAnnotationType = AmqpDestinationHelper.TEMP_TOPIC_TYPE;
    final String replyToAnnotationKey = AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL.toString();
    final Byte replyToAnnotationType = AmqpDestinationHelper.TEMP_TOPIC_TYPE;

    doReceivedMessageOnConnectionWithPrefixTestImpl(
        destinationClass, configuredPrefix, toName, replyToName,
        toAddress, replyToAddress,
        toAnnotationKey, toAnnotationType,
        replyToAnnotationKey, replyToAnnotationType);
}
/**
 * Checks that creating a temporary topic against the test peer yields a topic whose
 * name equals the dynamic address the peer was told to expect.
 */
@Test(timeout = 20000)
public void testCreateTemporaryTopic() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection peerConnection = testFixture.establishConnecton(peer);
        peerConnection.start();

        peer.expectBegin();
        Session peerSession = peerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Script the peer for the temp topic creation attach with this address.
        final String expectedAddress = "myTempTopicAddress";
        peer.expectTempTopicCreationAttach(expectedAddress);

        TemporaryTopic createdTopic = peerSession.createTemporaryTopic();
        assertNotNull("TemporaryTopic object was null", createdTopic);
        assertNotNull("TemporaryTopic name was null", createdTopic.getTopicName());
        assertEquals("TemporaryTopic name not as expected", expectedAddress, createdTopic.getTopicName());

        peer.expectClose();
        peerConnection.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Checks that a temporary topic created against the test peer can be deleted,
 * with the peer expecting a detach in response to the delete.
 */
@Test(timeout = 20000)
public void testCreateAndDeleteTemporaryTopic() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection peerConnection = testFixture.establishConnecton(peer);
        peerConnection.start();

        peer.expectBegin();
        Session peerSession = peerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final String expectedAddress = "myTempTopicAddress";
        peer.expectTempTopicCreationAttach(expectedAddress);
        TemporaryTopic createdTopic = peerSession.createTemporaryTopic();

        // Deleting the TemporaryTopic will be achieved by closing its creating link.
        peer.expectDetach(true, true, true);
        createdTopic.delete();

        peer.expectClose();
        peerConnection.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Creates a temporary topic through the underlying session, rethrowing any
 * {@code JMSException} as the runtime exception built by {@code JMSExceptionSupport}.
 *
 * @return the temporary topic created by the session
 */
@Override
public TemporaryTopic createTemporaryTopic() {
    final TemporaryTopic created;
    try {
        created = getSession().createTemporaryTopic();
    } catch (JMSException checked) {
        throw JMSExceptionSupport.createRuntimeException(checked);
    }
    return created;
}
/**
 * Delegates temporary topic creation to the underlying session and converts a
 * checked {@code JMSException} into its runtime form via {@code JMSExceptionSupport}.
 *
 * @return the newly created temporary topic
 */
@Override
public TemporaryTopic createTemporaryTopic() {
    final TemporaryTopic result;
    try {
        result = getSession().createTemporaryTopic();
    } catch (JMSException failure) {
        throw JMSExceptionSupport.createRuntimeException(failure);
    }
    return result;
}
/**
 * Checks that a session obtained from a pooled connection creates a temporary
 * topic, and that the topic is a {@code MockJMSTemporaryTopic}.
 */
@Test(timeout = 60000)
public void testCreateTemporaryTopic() throws Exception {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createConnection();
    Session pooledSession = pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    TemporaryTopic createdTopic = pooledSession.createTemporaryTopic();

    assertNotNull(createdTopic);
    assertTrue(createdTopic instanceof MockJMSTemporaryTopic);
}
/**
 * Checks that a text message published to a temporary topic is delivered to
 * both consumers subscribed to that topic.
 */
@Test
public void testMessageDeliveryUsingTemporaryTopic() throws Exception
{
    final Connection testConnection = getConnection();
    try
    {
        final Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TemporaryTopic tempTopic = testSession.createTemporaryTopic();
        assertNotNull("Temporary topic is null", tempTopic);

        // Both subscribers are created before the message is sent.
        final MessageProducer publisher = testSession.createProducer(tempTopic);
        final MessageConsumer firstSubscriber = testSession.createConsumer(tempTopic);
        final MessageConsumer secondSubscriber = testSession.createConsumer(tempTopic);
        testConnection.start();

        publisher.send(testSession.createTextMessage("hello"));

        final TextMessage firstReceived = (TextMessage) firstSubscriber.receive(getReceiveTimeout());
        final TextMessage secondReceived = (TextMessage) secondSubscriber.receive(getReceiveTimeout());

        assertNotNull("Message not received by subscriber1", firstReceived);
        assertEquals("hello", firstReceived.getText());
        assertNotNull("Message not received by subscriber2", secondReceived);
        assertEquals("hello", secondReceived.getText());
    }
    finally
    {
        testConnection.close();
    }
}
/**
 * Checks that subscribing to a temporary topic from a connection other than
 * the one that created it fails with a {@code JMSException}.
 */
@Test
public void testUseFromAnotherConnectionProhibited() throws Exception
{
    final Connection owningConnection = getConnection();
    try
    {
        final Connection otherConnection = getConnection();
        try
        {
            final Session owningSession = owningConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Session otherSession = otherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final TemporaryTopic ownedTopic = owningSession.createTemporaryTopic();
            try
            {
                otherSession.createConsumer(ownedTopic);
                fail("Expected a JMSException when subscribing to a temporary topic created on a different connection");
            }
            catch (JMSException expected)
            {
                // pass
            }
        }
        finally
        {
            otherConnection.close();
        }
    }
    finally
    {
        owningConnection.close();
    }
}
/**
 * Exercises a temporary topic from a second connection: publishing to it is allowed,
 * subscribing to it must fail, and publishing after the owning connection has been
 * closed may either succeed or raise a {@code JMSException}.
 */
@Test
public void testTempTopicDelete() throws Exception {
    connection.start();
    TopicSession ownerSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TemporaryTopic ownedTopic = ownerSession.createTemporaryTopic();

    ActiveMQConnection secondConnection = (ActiveMQConnection) factory.createConnection();
    try {
        TopicSession secondSession = secondConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicPublisher foreignPublisher = secondSession.createPublisher(ownedTopic);

        // need to wait here because the ActiveMQ client's temp destination map is updated asynchronously, not waiting can introduce a race
        assertTrue(Wait.waitFor(() -> secondConnection.activeTempDestinations.size() == 1, 2000, 100));

        TextMessage firstMessage = secondSession.createTextMessage("Test Message");
        foreignPublisher.publish(firstMessage);

        try {
            TopicSubscriber foreignSubscriber = secondSession.createSubscriber(ownedTopic);
            fail("should have gotten exception but got consumer: " + foreignSubscriber);
        } catch (JMSException expected) {
            // correct: only the owning connection may subscribe
        }

        // Close the owning connection, then try one more publish from the second connection.
        connection.close();

        try {
            Message messageAfterClose = secondSession.createMessage();
            foreignPublisher.publish(messageAfterClose);
        } catch (JMSException tolerated) {
            // ok: a failure here is accepted, but not required
        }
    } finally {
        secondConnection.close();
    }
}
/**
 * Creates and immediately deletes a temporary topic, then inspects every queue control
 * whose address is "ActiveMQ.Advisory.TempTopic": its message count must drain to 0
 * and exactly 2 messages must have been added.
 */
@Test
public void testTempTopicLeak() throws Exception {
    Connection testConnection = null;
    try {
        testConnection = factory.createConnection();
        testConnection.start();

        Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TemporaryTopic shortLivedTopic = testSession.createTemporaryTopic();
        shortLivedTopic.delete();

        Object[] managedQueues = server.getManagementService().getResources(QueueControl.class);
        for (Object managedQueue : managedQueues) {
            QueueControl queueControl = (QueueControl) managedQueue;
            if (!queueControl.getAddress().equals("ActiveMQ.Advisory.TempTopic")) {
                // Only the temp topic advisory queue is of interest.
                continue;
            }

            // Give the advisory messages a chance to drain before asserting.
            Wait.waitFor(() -> queueControl.getMessageCount() == 0);
            assertNotNull("addressControl for temp advisory", queueControl);

            Wait.assertEquals(0, queueControl::getMessageCount);
            Wait.assertEquals(2, queueControl::getMessagesAdded);
        }
    } finally {
        if (testConnection != null) {
            testConnection.close();
        }
    }
}
/**
 * Deletes the given temporary destination, treating it as a temporary topic when
 * the {@code topic} flag is set and as a temporary queue otherwise.
 *
 * @param dest the temporary destination to delete
 * @throws JMSException if the delete fails
 */
protected void deleteTemporaryDestination(Destination dest) throws JMSException {
    if (!topic) {
        ((TemporaryQueue) dest).delete();
        return;
    }
    ((TemporaryTopic) dest).delete();
}
/**
 * Checks that a queue proxy exists under the temporary topic's name after creation,
 * and that it disappears once the temporary topic has been deleted.
 */
@Test(timeout = 30000)
public void testDeleteTemporaryTopic() throws Exception {
    Connection testConnection = createConnection();
    try {
        Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final javax.jms.Topic createdTopic = testSession.createTemporaryTopic();
        assertNotNull(createdTopic);
        assertTrue(createdTopic instanceof TemporaryTopic);

        // A proxy looked up by the topic name must exist before the delete.
        Queue proxyBeforeDelete = getProxyToQueue(createdTopic.getTopicName());
        assertNotNull(proxyBeforeDelete);

        TemporaryTopic deletableTopic = (TemporaryTopic) createdTopic;
        deletableTopic.delete();

        // Poll every 50 ms for up to 30 s until the proxy is gone.
        final long maxWaitMillis = TimeUnit.SECONDS.toMillis(30);
        final long pollMillis = TimeUnit.MILLISECONDS.toMillis(50);
        assertTrue("Temp Queue should be deleted.", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisfied() throws Exception {
                return getProxyToQueue(createdTopic.getTopicName()) == null;
            }
        }, maxWaitMillis, pollMillis));
    } finally {
        testConnection.close();
    }
}
/**
 * Deletes a temporary destination. The {@code topic} flag decides whether the
 * destination is cast to a temporary topic or to a temporary queue before deletion.
 *
 * @param dest the temporary destination to delete
 * @throws JMSException if the delete fails
 */
protected void deleteTemporaryDestination(Destination dest) throws JMSException {
    if (!topic) {
        ((TemporaryQueue) dest).delete();
        return;
    }
    ((TemporaryTopic) dest).delete();
}
/**
 * Checks that deleting a temporary topic fails with a {@code JMSException} while a
 * consumer created from another session of the same connection is still open on it.
 */
@Test
public void testTemporaryTopicDeleteWithConsumer() throws Exception {
    Connection sharedConnection = null;
    try {
        sharedConnection = createConnection();

        // The topic and its consumer come from two sessions of the same connection.
        Session creatingSession = sharedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session consumingSession = sharedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TemporaryTopic topicUnderTest = creatingSession.createTemporaryTopic();
        MessageConsumer activeConsumer = consumingSession.createConsumer(topicUnderTest);

        try {
            topicUnderTest.delete();
            ProxyAssertSupport.fail("Should throw JMSException");
        } catch (JMSException expected) {
            // Should fail - you can't delete a temp topic if it has active consumers
        }

        activeConsumer.close();
    } finally {
        if (sharedConnection != null) {
            sharedConnection.close();
        }
    }
}
/**
 * Registers a temporary topic in {@code tempTopics}, logging the call at trace
 * level when tracing is enabled.
 *
 * @param temp The temporary topic to register
 */
@Override
public void addTemporaryTopic(final TemporaryTopic temp) {
    final boolean tracing = ActiveMQRALogger.LOGGER.isTraceEnabled();
    if (tracing) {
        ActiveMQRALogger.LOGGER.trace("addTemporaryTopic(" + temp + ")");
    }

    // The add is performed while holding the collection's own monitor.
    synchronized (tempTopics) {
        tempTopics.add(temp);
    }
}
/**
 * Creates a temporary topic, backs it with a non-durable temporary queue that uses
 * the rejecting filter, and registers its address with the connection.
 *
 * @return the newly created temporary topic
 * @throws JMSException if this is a queue session, or if the underlying core
 *         operation fails (the {@code ActiveMQException} is converted)
 */
@Override
public TemporaryTopic createTemporaryTopic() throws JMSException {
    // As per spec. section 4.11
    if (sessionType == ActiveMQSession.TYPE_QUEUE_SESSION) {
        throw new IllegalStateException("Cannot create a temporary topic on a QueueSession");
    }

    try {
        // With 1.x prefixes enabled, the old temp topic prefix is passed to the factory.
        final ActiveMQTemporaryTopic tempTopic = enable1xPrefixes
            ? ActiveMQDestination.createTemporaryTopic(this, PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString())
            : ActiveMQDestination.createTemporaryTopic(this);

        final SimpleString topicAddress = tempTopic.getSimpleAddress();

        // We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS
        // checks when routing messages to a topic that does not exist - otherwise we would not be able to
        // distinguish from a non existent topic and one with no subscriptions - core has no notion of a topic
        session.createQueue(
            new QueueConfiguration(topicAddress)
                .setAddress(topicAddress)
                .setFilterString(ActiveMQSession.REJECTING_FILTER)
                .setDurable(false)
                .setTemporary(true));

        connection.addTemporaryQueue(topicAddress);

        return tempTopic;
    } catch (ActiveMQException coreFailure) {
        throw JMSExceptionHelper.convertFromActiveMQException(coreFailure);
    }
}
/**
 * Runs {@code checkSession()} and then creates a temporary topic on the session,
 * converting a {@code JMSException} into a runtime exception.
 *
 * @return the temporary topic created by the session
 */
@Override
public TemporaryTopic createTemporaryTopic() {
    checkSession();

    final TemporaryTopic created;
    try {
        created = session.createTemporaryTopic();
    } catch (JMSException checked) {
        throw JmsExceptionUtils.convertToRuntimeException(checked);
    }
    return created;
}
/**
 * Creates a temporary topic on the session returned by {@code session()},
 * rethrowing a {@code JMSException} through {@code toRuntimeException}.
 *
 * @return the temporary topic created by the session
 */
@Override
public TemporaryTopic createTemporaryTopic() {
    final TemporaryTopic created;
    try {
        created = session().createTemporaryTopic();
    } catch (final JMSException checked) {
        throw toRuntimeException(checked);
    }
    return created;
}
/**
 * Builds a temporary topic named {@code <connection id>:<next generated id>}, creates
 * it as a resource, records it in {@code tempDestinations} and binds it to this connection.
 *
 * @return a newly initialized TemporaryTopic instance.
 * @throws JMSException declared by the signature; presumably raised when creating
 *         the resource fails - confirm against {@code createResource}
 */
protected TemporaryTopic createTemporaryTopic() throws JMSException {
    final String generatedName = connectionInfo.getId() + ":" + tempDestIdGenerator.incrementAndGet();
    final JmsTemporaryTopic tempTopic = new JmsTemporaryTopic(generatedName);

    // The resource is created first; the topic is tracked and bound only afterwards.
    createResource(tempTopic);
    tempDestinations.put(tempTopic, tempTopic);
    tempTopic.setConnection(this);

    return tempTopic;
}
/**
 * Creates a temporary topic through the underlying session, rethrowing any
 * {@code JMSException} as the runtime exception built by {@code JmsExceptionSupport}.
 *
 * @return the temporary topic created by the session
 */
@Override
public TemporaryTopic createTemporaryTopic() {
    final TemporaryTopic created;
    try {
        created = getSession().createTemporaryTopic();
    } catch (JMSException checked) {
        throw JmsExceptionSupport.createRuntimeException(checked);
    }
    return created;
}