下面列出了javax.jms.MessageProducer#close ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public void createEntityUsingAmqpManagement(final String name,
final Session session,
final String type,
Map<String, Object> attributes)
throws JMSException
{
MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
MapMessage createMessage = session.createMapMessage();
createMessage.setStringProperty("type", type);
createMessage.setStringProperty("operation", "CREATE");
createMessage.setString("name", name);
createMessage.setString("object-path", name);
for (Map.Entry<String, Object> entry : attributes.entrySet())
{
createMessage.setObject(entry.getKey(), entry.getValue());
}
producer.send(createMessage);
if (session.getTransacted())
{
session.commit();
}
producer.close();
}
@Test
public void testCronScheduleWithTtlSet() throws Exception {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
connection.start();
MessageProducer producer = session.createProducer(destination);
producer.setTimeToLive(TimeUnit.MINUTES.toMillis(1));
TextMessage message = session.createTextMessage("test msg ");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
producer.send(message);
producer.close();
Thread.sleep(TimeUnit.MINUTES.toMillis(2));
assertNotNull(consumer.receiveNoWait());
assertNull(consumer.receiveNoWait());
}
public void sendOrder(int customerId, Date date, String... itemIds) throws Exception {
// format the JMS message from the input parameters
String d = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss").format(date);
String body = customerId + "," + d;
for (String id : itemIds) {
body += "," + id;
}
// use JMS code to send the message (a bit ugly code but it works)
Connection con = fac.createConnection();
con.start();
Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = ses.createQueue("order");
MessageProducer prod = ses.createProducer(dest);
prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Message msg = ses.createTextMessage(body);
prod.send(msg);
prod.close();
ses.close();
con.close();
}
@Test(timeout=20000)
public void testProducerCloseDoesNotBlock() throws Exception {
connection = (JmsConnection) factory.createConnection();
connection.addConnectionListener(new ConnectionInterruptionListener());
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(_testName.getMethodName());
MessageProducer producer = session.createProducer(queue);
mockPeer.shutdown();
connectionInterrupted.await(9, TimeUnit.SECONDS);
producer.close();
connection.close();
}
public void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory, String queueName) throws JMSException {
Connection connection = connectionFactory.createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
final javax.jms.Message inMessage = consumer.receive();
String requestMessageId = inMessage.getJMSMessageID();
LOG.debug("Received message " + requestMessageId);
final TextMessage replyMessage = session.createTextMessage("Result");
replyMessage.setJMSCorrelationID(inMessage.getJMSMessageID());
final MessageProducer producer = session.createProducer(inMessage.getJMSReplyTo());
LOG.debug("Sending reply to " + inMessage.getJMSReplyTo());
producer.send(replyMessage);
producer.close();
consumer.close();
session.close();
} finally {
connection.close();
}
}
@TransactionAttribute(value = TransactionAttributeType.NOT_SUPPORTED)
@Override
public void sendMessage() throws JMSException {
System.out.println("Sending reply message");
Connection conn = null;
Session session = null;
MessageProducer prod = null;
try {
conn = myCF.createConnection();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
prod = session.createProducer(myReplyQueue);
ObjectMessage msg = session.createObjectMessage();
msg.setObject("Hello world!");
prod.send(msg, DeliveryMode.PERSISTENT, 0, 0);
} finally {
if (prod != null)
prod.close();
if (session != null)
session.close();
if (conn != null)
conn.close();
}
}
private void testJmsConnection(final javax.jms.Connection connection) throws JMSException {
final Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
final Topic topic = session.createTopic("test");
final MessageProducer producer = session.createProducer(topic);
producer.send(session.createMessage());
producer.close();
session.close();
connection.close();
}
@Test
public void testReadBytesMessages() throws Exception {
// produce message
Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE));
BytesMessage message = session.createBytesMessage();
message.writeBytes("This Is A Test".getBytes(StandardCharsets.UTF_8));
producer.send(message);
producer.close();
session.close();
connection.close();
// read from the queue
PCollection<String> output =
pipeline.apply(
JmsIO.<String>readMessage()
.withConnectionFactory(connectionFactory)
.withQueue(QUEUE)
.withUsername(USERNAME)
.withPassword(PASSWORD)
.withMaxNumRecords(1)
.withCoder(SerializableCoder.of(String.class))
.withMessageMapper(new BytesMessageToStringMessageMapper()));
PAssert.thatSingleton(output.apply("Count", Count.<String>globally())).isEqualTo(1L);
pipeline.run();
connection = connectionFactory.createConnection(USERNAME, PASSWORD);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE));
Message msg = consumer.receiveNoWait();
assertNull(msg);
}
private void testJmsConnection(final Connection connection) throws JMSException {
final Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
final Topic topic = session.createTopic("test");
final MessageProducer producer = session.createProducer(topic);
producer.send(session.createMessage());
producer.close();
session.close();
connection.close();
}
@Test
public void testSendTextToQueue() throws Exception {
PutJMS putJms = new PutJMS();
TestRunner putRunner = TestRunners.newTestRunner(putJms);
putRunner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
putRunner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
putRunner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
putRunner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
putRunner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(putRunner.getProcessContext(), true);
final Session jmsSession = wrappedProducer.getSession();
final MessageProducer producer = wrappedProducer.getProducer();
final Message message = jmsSession.createTextMessage("Hello World");
producer.send(message);
jmsSession.commit();
GetJMSQueue getJmsQueue = new GetJMSQueue();
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
runner.run();
List<MockFlowFile> flowFiles = runner
.getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
assertTrue(flowFiles.size() == 1);
MockFlowFile successFlowFile = flowFiles.get(0);
successFlowFile.assertContentEquals("Hello World");
successFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing");
producer.close();
jmsSession.close();
}
private void scheduleRepeating(Connection connection) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("test.queue");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("test msg");
long time = 1000;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1);
producer.send(message);
producer.close();
}
private void produceMessages() throws Exception {
Session session = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < MESSAGE_COUNT; ++i) {
producer.send(session.createTextMessage("Test Message: " + i));
}
producer.close();
}
@Test
public void singleSendProfile() throws Exception {
connection = createAmqpConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getDestinationName());
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage message = session.createTextMessage();
message.setText("hello");
producer.send(message);
producer.close();
}
private void testJmsConnection(final javax.jms.Connection connection) throws JMSException {
final Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
final Topic topic = session.createTopic("test");
final MessageProducer producer = session.createProducer(topic);
producer.send(session.createMessage());
producer.close();
session.close();
connection.close();
}
@Test(timeout = 20000)
public void testCreditDrainedAfterSend() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setSendTimeout(500);
testPeer.expectBegin();
testPeer.expectSenderAttach();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
MessageProducer producer = session.createProducer(destination);
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
// After the first send lets drain off the remaining credit from the sender
testPeer.expectTransferRespondWithDrain(messageMatcher, 1);
testPeer.expectLinkFlow(true, false, Matchers.equalTo(UnsignedInteger.ZERO));
testPeer.expectDetach(true, true, true);
testPeer.expectClose();
producer.send(session.createMessage());
// We don't have any credit now since we were drained, so the send should
// block until more credit is issued.
try {
producer.send(session.createMessage());
fail("Should have timed out waiting for credit to send.");
} catch (JmsSendTimedOutException jmsEx) {
LOG.info("Caught expected send timeout.");
}
producer.close();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
private Message doSearchRequest(final Session session, final Message message) throws JMSException {
final Destination replyQueue = acquireTempQueue(session);
if (log.isDebugEnabled()) {
log.debug("doSearchRequest replyQueue=" + replyQueue);
}
try {
final MessageConsumer responseConsumer = session.createConsumer(replyQueue);
message.setJMSReplyTo(replyQueue);
final String correlationId = createRandomString();
message.setJMSCorrelationID(correlationId);
final MessageProducer producer = session.createProducer(searchQueue_);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.setTimeToLive(timeToLive_);
if (log.isDebugEnabled()) {
log.debug("Sending search request message with correlationId=" + correlationId);
}
producer.send(message);
producer.close();
Message returnedMessage = null;
final long start = System.currentTimeMillis();
while (true) {
final long diff = (start + receiveTimeout_) - System.currentTimeMillis();
if (diff <= 0) {
// timeout
log.info("Timeout in search. Remaining time zero or negative.");
break;
}
if (log.isDebugEnabled()) {
log.debug("doSearchRequest: call receive with timeout=" + diff);
}
returnedMessage = responseConsumer.receive(diff);
if (returnedMessage == null) {
// timeout case, we're stopping now with a reply...
log.info("Timeout in search. Repy was null.");
break;
} else if (!correlationId.equals(returnedMessage.getJMSCorrelationID())) {
// we got an old reply from a previous search request
log.info("Got a response with a wrong correlationId. Ignoring and waiting for the next");
continue;
} else {
// we got a valid reply
break;
}
}
responseConsumer.close();
if (log.isDebugEnabled()) {
log.debug("doSearchRequest: returnedMessage=" + returnedMessage);
}
return returnedMessage;
} finally {
releaseTempQueue(replyQueue);
}
}
public void run(String... args) throws Exception {
String[] split = args[1].split("@");
String host = args[0];
String vpnName = split[1];
String username = split[0];
String password = args[2];
System.out.printf("QueueProducer is connecting to Solace messaging at %s...%n", host);
// Programmatically create the connection factory using default settings
SolConnectionFactory connectionFactory = SolJmsUtility.createConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setVPN(vpnName);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
// Enables persistent queues or topic endpoints to be created dynamically
// on the router, used when Session.createQueue() is called below
connectionFactory.setDynamicDurables(true);
// Create connection to the Solace router
Connection connection = connectionFactory.createConnection();
// Create a non-transacted, auto ACK session.
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
System.out.printf("Connected to the Solace Message VPN '%s' with client username '%s'.%n", vpnName,
username);
// Create the queue programmatically and the corresponding router resource
// will also be created dynamically because DynamicDurables is enabled.
Queue queue = session.createQueue(QUEUE_NAME);
// Create the message producer for the created queue
MessageProducer messageProducer = session.createProducer(queue);
// Create a text message.
TextMessage message = session.createTextMessage("Hello world Queues!");
System.out.printf("Sending message '%s' to queue '%s'...%n", message.getText(), queue.toString());
// Send the message
// NOTE: JMS Message Priority is not supported by the Solace Message Bus
messageProducer.send(queue, message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY,
Message.DEFAULT_TIME_TO_LIVE);
System.out.println("Sent successfully. Exiting...");
// Close everything in the order reversed from the opening order
// NOTE: as the interfaces below extend AutoCloseable,
// with them it's possible to use the "try-with-resources" Java statement
// see details at https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
messageProducer.close();
session.close();
connection.close();
}
public void test() throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(true);
broker.setPlugins(new BrokerPlugin[]{new ConnectionDotFilePlugin(), new UDPTraceBrokerPlugin()});
TransportConnector tcpConnector = broker.addConnector("tcp://localhost:0");
broker.addConnector("stomp://localhost:0");
broker.start();
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcpConnector.getConnectUri());
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination Queue
Destination destination = session.createQueue("TEST.FOO");
// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Create a messages
Message sentMessage = session.createMessage();
// Tell the producer to send the message
long beforeSend = System.currentTimeMillis();
producer.send(sentMessage);
long afterSend = System.currentTimeMillis();
// assert message timestamp is in window
assertTrue(beforeSend <= sentMessage.getJMSTimestamp() && sentMessage.getJMSTimestamp() <= afterSend);
// Create a MessageConsumer from the Session to the Topic or Queue
MessageConsumer consumer = session.createConsumer(destination);
// Wait for a message
Message receivedMessage = consumer.receive(1000);
// assert we got the same message ID we sent
assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID());
// assert message timestamp is in window
assertTrue("JMS Message Timestamp should be set during the send method: \n" + " beforeSend = " + beforeSend + "\n" + " getJMSTimestamp = " + receivedMessage.getJMSTimestamp() + "\n" + " afterSend = " + afterSend + "\n", beforeSend <= receivedMessage.getJMSTimestamp() && receivedMessage.getJMSTimestamp() <= afterSend);
// assert message timestamp is unchanged
assertEquals("JMS Message Timestamp of received message should be the same as the sent message\n ", sentMessage.getJMSTimestamp(), receivedMessage.getJMSTimestamp());
// Clean up
producer.close();
consumer.close();
session.close();
connection.close();
}
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test
public void testProducerCloseBeforeRollbackTransaction(String port,
String adminUsername,
String adminPassword,
String brokerHostname) throws NamingException, JMSException {
String queueName = "testPublisherCloseBeforeRollbackTransaction";
InitialContext initialContextForQueue = ClientHelper
.getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
.withQueue(queueName)
.build();
ConnectionFactory connectionFactory
= (ConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
Connection connection = connectionFactory.createConnection();
connection.start();
// send 100 messages
Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = producerSession.createQueue(queueName);
MessageProducer producer = producerSession.createProducer(queue);
int numberOfMessages = 100;
for (int i = 0; i < numberOfMessages; i++) {
producer.send(producerSession.createTextMessage("Test message " + i));
}
// close publisher before rollback
producer.close();
// rollback all sent messages
producerSession.rollback();
// consume messages
Session subscriberSession = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination subscriberDestination = (Destination) initialContextForQueue.lookup(queueName);
MessageConsumer consumer = subscriberSession.createConsumer(subscriberDestination);
// none of messages should receive after publisher rollback
Message message = consumer.receive(1000);
Assert.assertNull(message, "Messages should not receive upon publisher rollback");
producerSession.close();
subscriberSession.close();
connection.close();
}
private Message doSearchRequest(final Session session, final Message message) throws JMSException {
final Destination replyQueue = acquireTempQueue(session);
if (log.isDebugEnabled()) {
log.debug("doSearchRequest replyQueue=" + replyQueue);
}
try {
final MessageConsumer responseConsumer = session.createConsumer(replyQueue);
message.setJMSReplyTo(replyQueue);
final String correlationId = createRandomString();
message.setJMSCorrelationID(correlationId);
final MessageProducer producer = session.createProducer(searchQueue_);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.setTimeToLive(timeToLive_);
if (log.isDebugEnabled()) {
log.debug("Sending search request message with correlationId=" + correlationId);
}
producer.send(message);
producer.close();
Message returnedMessage = null;
final long start = System.currentTimeMillis();
while (true) {
final long diff = (start + receiveTimeout_) - System.currentTimeMillis();
if (diff <= 0) {
// timeout
log.info("Timeout in search. Remaining time zero or negative.");
break;
}
if (log.isDebugEnabled()) {
log.debug("doSearchRequest: call receive with timeout=" + diff);
}
returnedMessage = responseConsumer.receive(diff);
if (returnedMessage == null) {
// timeout case, we're stopping now with a reply...
log.info("Timeout in search. Reply was null.");
break;
} else if (!correlationId.equals(returnedMessage.getJMSCorrelationID())) {
// we got an old reply from a previous search request
log.info("Got a response with a wrong correlationId. Ignoring and waiting for the next");
continue;
} else {
// we got a valid reply
break;
}
}
responseConsumer.close();
if (log.isDebugEnabled()) {
log.debug("doSearchRequest: returnedMessage=" + returnedMessage);
}
return returnedMessage;
} finally {
releaseTempQueue(replyQueue);
}
}