下面列出了怎么用javax.jms.Connection的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Execute the given remote invocation, sending an invoker request message
* to this accessor's target queue and waiting for a corresponding response.
* @param invocation the RemoteInvocation to execute
* @return the RemoteInvocationResult object
* @throws JMSException in case of JMS failure
* @see #doExecuteRequest
*/
protected RemoteInvocationResult executeRequest(RemoteInvocation invocation) throws JMSException {
Connection con = createConnection();
Session session = null;
try {
session = createSession(con);
Queue queueToUse = resolveQueue(session);
Message requestMessage = createRequestMessage(session, invocation);
con.start();
Message responseMessage = doExecuteRequest(session, queueToUse, requestMessage);
if (responseMessage != null) {
return extractInvocationResult(responseMessage);
}
else {
return onReceiveTimeout(invocation);
}
}
finally {
JmsUtils.closeSession(session);
ConnectionFactoryUtils.releaseConnection(con, getConnectionFactory(), true);
}
}
/**
* test Object messages can be changed after sending with no side-affects
*
* @throws Exception
*/
@SuppressWarnings("rawtypes")
public void testDoChangeSentMessage() throws Exception {
Destination destination = createDestination("test-" + ChangeSentMessageTest.class.getName());
Connection connection = createConnection();
connection.start();
Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(destination);
Session publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = publisherSession.createProducer(destination);
HashMap<String, Integer> map = new HashMap<>();
ObjectMessage message = publisherSession.createObjectMessage();
for (int i = 0; i < COUNT; i++) {
map.put(VALUE_NAME, Integer.valueOf(i));
message.setObject(map);
producer.send(message);
assertTrue(message.getObject() == map);
}
for (int i = 0; i < COUNT; i++) {
ObjectMessage msg = (ObjectMessage) consumer.receive();
HashMap receivedMap = (HashMap) msg.getObject();
Integer intValue = (Integer) receivedMap.get(VALUE_NAME);
assertTrue(intValue.intValue() == i);
}
}
@Ignore("Used only when a new version of the store needs to archive it's test data.")
@Test
public void testCreateStore() throws Exception {
JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl();
File dir = new File("src/test/resources/org/apache/activemq/store/schedulerDB/legacy");
IOHelper.deleteFile(dir);
scheduler.setDirectory(dir);
scheduler.setJournalMaxFileLength(1024 * 1024);
broker = createBroker(scheduler);
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = cf.createConnection();
connection.start();
scheduleRepeating(connection);
connection.close();
broker.stop();
}
@Test
public void testSharedDurableConsumer() throws Exception {
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("myTopic")).addRoutingType(RoutingType.MULTICAST));
ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
Connection connection = cf.createConnection();
Session session = connection.createSession();
Connection connecton2 = cf.createConnection();
Session session2 = connecton2.createSession();
try {
Topic topic = session.createTopic("myTopic");
MessageConsumer messageConsumer = session.createSharedDurableConsumer(topic, "consumer1");
MessageConsumer messageConsumer2 = session2.createSharedDurableConsumer(topic, "consumer1");
connection.close();
} finally {
connection.close();
connecton2.close();
}
}
public void createSession(final CreateSessionCommand command)
{
try
{
final Connection connection = _testConnections.get(command.getConnectionName());
if (connection == null)
{
throw new DistributedTestException("No test connection found called: " + command.getConnectionName(),
command);
}
final boolean transacted = command.getAcknowledgeMode() == Session.SESSION_TRANSACTED;
final Session newSession = connection.createSession(transacted, command.getAcknowledgeMode());
LOGGER.debug("Created session {} with transacted = {} and acknowledgeMode = {}",
command.getSessionName(),
newSession.getTransacted(),
newSession.getAcknowledgeMode());
addSession(command.getSessionName(), newSession);
_testSessionToConnections.put(newSession, connection);
}
catch (final JMSException jmse)
{
throw new DistributedTestException("Unable to create new session: " + command, jmse);
}
}
@GET
@Produces({ MediaType.TEXT_PLAIN })
@Path("/send2imq")
public Response testSend1(@QueryParam("msg") String msg) throws IOException, JMSException {
Connection connection = imq_connectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer(imq_barQueue);
System.out.println("*************producer.getClass() = "+ producer.getClass());
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Create a message
TextMessage message = session.createTextMessage((msg!=null && !msg.isEmpty()) ? msg : "Hello World!");
// Tell the producer to send the message
producer.send(message);
return Response.ok().build();
}
@Test
public void testJMSTransaction() throws Exception {
CamelContext camelctx = new DefaultCamelContext(new JndiBeanRepository());
camelctx.addComponent("jms", jmsComponent);
camelctx.addRoutes(configureJmsRoutes());
camelctx.start();
PollingConsumer consumer = camelctx.getEndpoint("seda:success").createPollingConsumer();
consumer.start();
// Send a message to queue camel-jms-queue-one
Connection connection = connectionFactory.createConnection();
sendMessage(connection, JmsQueue.QUEUE_ONE.getJndiName(), "Hello Kermit");
// The JMS transaction should have been committed and the message payload sent to the direct:success endpoint
String result = consumer.receive(3000).getIn().getBody(String.class);
Assert.assertNotNull(result);
Assert.assertEquals("Hello Kermit", result);
connection.close();
camelctx.close();
}
@Test(timeout = 30000)
public void testCreateTemporaryQueueNotAuthorized() throws JMSException {
Connection connection = createConnection(guestUser, guestPass);
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
session.createTemporaryQueue();
} catch (JMSSecurityException jmsse) {
instanceLog.debug("Client should have thrown a JMSSecurityException but only threw JMSException");
}
// Should not be fatal
assertNotNull(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
} finally {
connection.close();
}
}
@Test
public void testRemoveNotScheduled() throws Exception {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the Browse Destination and the Reply To location
Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
MessageProducer producer = session.createProducer(management);
try {
// Send the remove request
Message remove = session.createMessage();
remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, new IdGenerator().generateId());
producer.send(remove);
} catch (Exception e) {
fail("Caught unexpected exception during remove of unscheduled message.");
}
}
@Test
public void testSimpleSendNoXAJMS1() throws Exception {
Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE);
try (ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession();
ClientConsumer consVerify = session.createConsumer(MDBQUEUE);
Connection conn = qraConnectionFactory.createConnection();
) {
Session jmsSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.start();
MessageProducer producer = jmsSess.createProducer(q);
// These next 4 lines could be written in a single line however it makes difficult for debugging
TextMessage msgsend = jmsSess.createTextMessage("hello");
msgsend.setStringProperty("strvalue", "hello");
producer.send(msgsend);
ClientMessage msg = consVerify.receive(1000);
assertNotNull(msg);
assertEquals("hello", msg.getStringProperty("strvalue"));
}
}
@Test
public void testSetClientID() throws Exception {
Connection connection = createConnection();
final String clientID = "my-test-client-id";
connection.setClientID(clientID);
ProxyAssertSupport.assertEquals(clientID, connection.getClientID());
Connection connection2 = createConnection();
try {
connection2.setClientID(clientID);
Assert.fail("setClientID was expected to throw an exception");
} catch (JMSException e) {
// expected
}
connection.close();
connection2.setClientID(clientID);
}
private void jmsSendMessage(Serializable value, String destName, boolean isQueue) throws JMSException {
ConnectionFactory factory = new ActiveMQJMSConnectionFactory("tcp://localhost:61616");
String jmsDest;
if (isQueue) {
jmsDest = QUEUE_QUALIFIED_PREFIX + destName;
} else {
jmsDest = TOPIC_QUALIFIED_PREFIX + destName;
}
Destination destination = ActiveMQDestination.fromPrefixedName(jmsDest);
Connection conn = factory.createConnection();
try {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
ObjectMessage message = session.createObjectMessage();
message.setStringProperty(HttpHeaderProperty.CONTENT_TYPE, "application/xml");
message.setObject(value);
producer.send(message);
} finally {
conn.close();
}
}
@Test
public void testScheduledStats() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(defaultQueueName));
producer.setDeliveryDelay(2000);
producer.send(session.createTextMessage("test"));
verifyPendingStats(defaultQueueName, 1, publishedMessageSize.get());
verifyPendingDurableStats(defaultQueueName, 1, publishedMessageSize.get());
verifyScheduledStats(defaultQueueName, 1, publishedMessageSize.get());
consumeTestQueueMessages(1);
verifyPendingStats(defaultQueueName, 0, 0);
verifyPendingDurableStats(defaultQueueName, 0, 0);
verifyScheduledStats(defaultQueueName, 0, 0);
connection.close();
}
private void sendBytesMessageUsingCoreJms(String queueName, byte[] data) throws Exception {
Connection jmsConn = null;
try {
jmsConn = coreCf.createConnection();
Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(data);
bytesMessage.writeBoolean(true);
bytesMessage.writeLong(99999L);
bytesMessage.writeChar('h');
bytesMessage.writeInt(987);
bytesMessage.writeShort((short) 1099);
bytesMessage.writeUTF("hellobytes");
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
producer.send(bytesMessage);
} finally {
if (jmsConn != null) {
jmsConn.close();
}
}
}
protected String consume() throws Exception {
Connection con = null;
MessageConsumer c = consumer;
if (connectionPerMessage) {
con = factory.createConnection();
con.start();
Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
c = s.createConsumer(getConsumeDestination());
}
TextMessage result = (TextMessage) c.receive(timeout);
if (result != null) {
if (audit.isDuplicate(result.getJMSMessageID())) {
throw new JMSException("Received duplicate " + result.getText());
}
if (!audit.isInOrder(result.getJMSMessageID())) {
throw new JMSException("Out of order " + result.getText());
}
if (connectionPerMessage) {
Thread.sleep(SLEEP_TIME);//give the broker a chance
con.close();
}
}
return result != null ? result.getText() : null;
}
@Override
public void onMessage(Message message) {
final TextMessage incomingMessage = (TextMessage) message;
Connection connection = null;
Session session = null;
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = TraceeMessageWriter.wrap(session.createProducer(responses));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
final TextMessage responseMessage = session.createTextMessage(incomingMessage.getText());
producer.send(responseMessage);
} catch (JMSException e) {
throw new IllegalStateException(e);
} finally {
try {
if (session != null) session.close();
if (connection != null) connection.close();
} catch (JMSException ignored) {
}
}
}
@Test(timeout = 20000)
public void testSaslXOauth2Connection() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// Expect a XOAUTH2 connection
String user = "user";
String pass = "eyB1c2VyPSJ1c2VyIiB9";
testPeer.expectSaslXOauth2(user, pass);
testPeer.expectOpen();
// Each connection creates a session for managing temporary destinations etc
testPeer.expectBegin();
ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort());
Connection connection = factory.createConnection(user, pass);
// Set a clientID to provoke the actual AMQP connection process to occur.
connection.setClientID("clientName");
testPeer.waitForAllHandlersToComplete(1000);
assertNull(testPeer.getThrowable());
testPeer.expectClose();
connection.close();
}
}
@Test(timeout = 30000)
public void testDurableConsumerUnsubscribeWhileNoSubscription() throws Exception {
Connection connection = createConnection();
try {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return server.getTotalConsumerCount() == 0;
}
}, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(250)));
try {
session.unsubscribe("DurbaleTopic");
fail("Should have thrown as subscription is in use.");
} catch (JMSException ex) {
}
} finally {
connection.close();
}
}
private void testStreamMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < NUM_MESSAGES; i++) {
StreamMessage message = session.createStreamMessage();
message.writeInt(i);
message.writeBoolean(true);
message.writeString("test");
producer.send(message);
}
Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
for (int i = 0; i < NUM_MESSAGES; i++) {
StreamMessage m = (StreamMessage) consumer.receive(5000);
Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
Assert.assertEquals(i, m.readInt());
Assert.assertEquals(true, m.readBoolean());
Assert.assertEquals("test", m.readString());
}
}
@Test
public void produceMessageAndAbortTransactionByClosingSession() throws Exception
{
final Queue queue = createQueue(getTestName());
Connection connection = getConnection();
try
{
Session transactedSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer transactedProducer = transactedSession.createProducer(queue);
transactedProducer.send(transactedSession.createTextMessage("A"));
transactedSession.close();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(queue);
messageProducer.send(session.createTextMessage("B"));
connection.start();
MessageConsumer messageConsumer = session.createConsumer(queue);
Message message = messageConsumer.receive(getReceiveTimeout());
assertTrue("Text message should be received", message instanceof TextMessage);
assertEquals("Unexpected message received", "B", ((TextMessage) message).getText());
}
finally
{
connection.close();
}
}
private void testJmsConnection(final Connection connection) throws JMSException {
final Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
final Topic topic = session.createTopic("test");
final MessageProducer producer = session.createProducer(topic);
producer.send(session.createMessage());
producer.close();
session.close();
connection.close();
}
public void produceMsg(String text) throws Exception
{
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.FOO");
// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Create a messages
TextMessage message = session.createTextMessage(text);
producer.send(message);
// Clean up
session.close();
connection.close();
}
public PollingMessageListenerContainer(Connection connection, Destination destination,
MessageListener listenerHandler, ExceptionListener exceptionListener) {
this.connection = connection;
this.destination = destination;
this.listenerHandler = listenerHandler;
this.exceptionListener = exceptionListener;
}
/**
* Create the mock objects for testing.
*/
@Before
public void setupMocks() throws Exception {
this.jndiContext = mock(Context.class);
this.connectionFactory = mock(ConnectionFactory.class);
this.connection = mock(Connection.class);
this.session = mock(Session.class);
this.queue = mock(Queue.class);
given(this.connectionFactory.createConnection()).willReturn(this.connection);
given(this.connection.createSession(useTransactedTemplate(), Session.AUTO_ACKNOWLEDGE)).willReturn(this.session);
given(this.session.getTransacted()).willReturn(useTransactedSession());
given(this.jndiContext.lookup("testDestination")).willReturn(this.queue);
}
protected void assertConsumersConnect(String brokerName,
Destination destination,
final int count,
long timeout) throws Exception {
BrokerItem brokerItem = brokers.get(brokerName);
Connection conn = brokerItem.createConnection();
conn.start();
ConsumerEventSource ces = new ConsumerEventSource(conn, destination);
try {
final AtomicInteger actualConnected = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
ces.setConsumerListener(new ConsumerListener() {
@Override
public void onConsumerEvent(ConsumerEvent event) {
if (actualConnected.get() < count) {
actualConnected.set(event.getConsumerCount());
}
if (event.getConsumerCount() >= count) {
latch.countDown();
}
}
});
ces.start();
latch.await(timeout, TimeUnit.MILLISECONDS);
assertTrue("Expected at least " + count + " consumers to connect, but only " + actualConnected.get() + " connectect within " + timeout + " ms", actualConnected.get() >= count);
} finally {
ces.stop();
conn.close();
brokerItem.connections.remove(conn);
}
}
/**
* Create a connection
*
* @return The connection
* @throws JMSException Thrown if the operation fails
*/
@Override
public Connection createConnection() throws JMSException {
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("createConnection()");
}
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.CONNECTION);
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("Created connection: " + s);
}
return s;
}
/**
* @throws JMSException
*/
public Message doSend(boolean fail) throws JMSException {
Connection adminConnection = factory.createConnection("system", "manager");
connections.add(adminConnection);
adminConnection.start();
Session adminSession = adminConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = adminSession.createConsumer(destination);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
sendMessages(session, destination, 1);
} catch (JMSException e) {
// If test is expected to fail, the cause must only be a
// SecurityException
// otherwise rethrow the exception
if (!fail || !(e.getCause() instanceof SecurityException)) {
throw e;
}
}
Message m = consumer.receive(1000);
if (fail) {
assertNull(m);
} else {
assertNotNull(m);
assertEquals("0", ((TextMessage) m).getText());
assertNull(consumer.receiveNoWait());
}
return m;
}
public void internalTestDurableSubscriber(final boolean largeMessage, final int batchSize) throws Exception {
JMSBridgeImpl bridge = null;
try {
final int NUM_MESSAGES = 10;
bridge = new JMSBridgeImpl(cff0, cff1, sourceTopicFactory, targetQueueFactory, null, null, null, null, null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE, batchSize, -1, "subTest", "clientid123", false).setBridgeName("test-bridge");
bridge.start();
sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, true, largeMessage);
checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, largeMessage);
Assert.assertEquals(0L, bridge.getAbortedMessageCount());
Assert.assertEquals("We didn't get the correct number processed messages", NUM_MESSAGES, bridge.getMessageCount());
} finally {
if (bridge != null) {
bridge.stop();
}
// Now unsubscribe
Connection conn = cf0.createConnection();
conn.setClientID("clientid123");
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
sess.unsubscribe("subTest");
conn.close();
}
}
public static void main(final String[] args) throws Exception {
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?incomingInterceptorList=" + SimpleInterceptor.class.getName());
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("exampleQueue");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("This is a text message");
System.out.println("Sending message [" + message.getText() +
"] with String property: " +
message.getStringProperty("newproperty"));
producer.send(message);
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
System.out.println("Received message [" + messageReceived.getText() +
"] with String property: " +
messageReceived.getStringProperty("newproperty"));
if (messageReceived.getStringProperty("newproperty") == null) {
throw new IllegalStateException("Check your configuration as the example interceptor wasn't actually called!");
}
}
}
public void doTestSendLargeMessage(int expectedSize) throws Exception{
LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
byte[] payload = createLargePayload(expectedSize);
assertEquals(expectedSize, payload.length);
Connection connection = createAmqpConnection();
long startTime = System.currentTimeMillis();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.getMethodName());
MessageProducer producer = session.createProducer(queue);
BytesMessage message = session.createBytesMessage();
message.writeBytes(payload);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Set this to non-default to get a Header in the encoded message.
producer.setPriority(4);
producer.send(message);
long endTime = System.currentTimeMillis();
LOG.info("Returned from send after {} ms", endTime - startTime);
startTime = System.currentTimeMillis();
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
LOG.info("Calling receive");
Message received = consumer.receive();
assertNotNull(received);
assertTrue(received instanceof BytesMessage);
BytesMessage bytesMessage = (BytesMessage) received;
assertNotNull(bytesMessage);
endTime = System.currentTimeMillis();
LOG.info("Returned from receive after {} ms", endTime - startTime);
byte[] bytesReceived = new byte[expectedSize];
assertEquals(expectedSize, bytesMessage.readBytes(bytesReceived, expectedSize));
assertTrue(Arrays.equals(payload, bytesReceived));
connection.close();
}