下面列出了 javax.jms.BytesMessage API 类的示例代码及用法,也可以点击链接到 GitHub 查看源代码。
/**
 * Wraps the given bytes in a {@link BytesMessage}, copies the optional
 * properties onto it and sends it to the destination.
 * {@code releaseSession(false)} is always invoked afterwards, even when
 * building or sending the message fails.
 *
 * @param session     session used to create the message and the producer
 * @param destination destination the message is sent to
 * @param bytes       payload written into the message body
 * @param properties  properties to set on the message; may be {@code null}
 * @throws JMSException if creating, populating or sending the message fails
 */
private void doSendBinaryMessage( final Session session, final Destination destination,
        final byte[] bytes,
        final Map<String, ?> properties ) throws JMSException {
    try {
        final BytesMessage binaryMessage = session.createBytesMessage();
        binaryMessage.writeBytes(bytes);
        if (properties != null) {
            // Note: Setting any properties (including JMS fields) using
            // setObjectProperty might not be supported by all providers
            // Tested with: ActiveMQ
            for (final Entry<String, ?> entry : properties.entrySet()) {
                binaryMessage.setObjectProperty(entry.getKey(), entry.getValue());
            }
        }
        session.createProducer(destination).send(binaryMessage);
    } finally {
        releaseSession(false);
    }
}
/**
 * Infers the JMS message type of the given message.
 * <p>
 * The checks run in a fixed order (text, bytes, object, stream, map) and
 * the first one that matches decides the result.
 *
 * @param msg the message to be inferred
 * @return the fully qualified name of the matching JMS message interface,
 *         or {@code null} if none of the checks matches
 */
public static String inferJMSMessageType(Message msg) {
    if (isTextMessage(msg)) {
        return TextMessage.class.getName();
    }
    if (isBytesMessage(msg)) {
        return BytesMessage.class.getName();
    }
    if (isObjectMessage(msg)) {
        return ObjectMessage.class.getName();
    }
    if (isStreamMessage(msg)) {
        return StreamMessage.class.getName();
    }
    if (isMapMessage(msg)) {
        return MapMessage.class.getName();
    }
    return null;
}
/**
 * Drains the queue named {@code DESTINATION_NAME} and returns its contents
 * as strings.
 * <p>
 * Messages are polled with a 100 ms timeout until a poll returns nothing.
 * Bytes messages are decoded with the platform default charset and get
 * {@code RECORD_SEPERATOR} appended; text messages are added unchanged.
 *
 * @return one entry per received message, in receive order
 * @throws Exception if a message of any other type is received or a JMS call fails
 */
private List<String> getQueue() throws Exception {
    final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    final MessageConsumer consumer = session.createConsumer(session.createQueue(DESTINATION_NAME));
    final List<String> rows = new ArrayList<>();
    for (Message received = consumer.receive(100); received != null; received = consumer.receive(100)) {
        if (received instanceof BytesMessage) {
            final BytesMessage binary = (BytesMessage) received;
            final byte[] body = new byte[(int) binary.getBodyLength()];
            binary.readBytes(body);
            rows.add(new String(body) + RECORD_SEPERATOR);
            continue;
        }
        if (received instanceof TextMessage) {
            rows.add(((TextMessage) received).getText());
            continue;
        }
        throw new Exception("Unexpected message type");
    }
    return rows;
}
/**
 * Sends {@code count} persistent bytes messages, all belonging to the
 * message group {@code JMSX_GROUP_ID}, to the test destination.
 * <p>
 * Every message carries the same {@code MESSAGE_SIZE}-byte body made of the
 * repeating ASCII digits {@code 0123456789}. The group sequence number of
 * each message is taken from {@code sequence.incrementAndGet()}.
 * <p>
 * The previous version reused the {@code count} parameter as the index of
 * the buffer-filling loop and then sent {@code MESSAGE_COUNT} messages, so
 * the caller's requested count was silently ignored. The parameter is now
 * honoured, and the producer is closed even when a send fails.
 *
 * @param count    number of messages to send
 * @param sequence counter supplying the {@code JMSXGroupSeq} values
 * @throws Exception if any JMS operation fails
 */
protected void sendMessagesToBroker(int count, AtomicInteger sequence) throws Exception {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue(getDestinationName());
    MessageProducer producer = session.createProducer(queue);
    try {
        // Fill the payload with the ASCII digits '0'..'9' repeating.
        byte[] buffer = new byte[MESSAGE_SIZE];
        for (int i = 0; i < MESSAGE_SIZE; i++) {
            buffer[i] = (byte) ('0' + (i % 10));
        }
        LOG.info("Sending {} messages to destination: {}", count, queue);
        for (int i = 1; i <= count; i++) {
            BytesMessage message = session.createBytesMessage();
            message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
            message.setStringProperty("JMSXGroupID", JMSX_GROUP_ID);
            message.setIntProperty("JMSXGroupSeq", sequence.incrementAndGet());
            message.writeBytes(buffer);
            producer.send(message);
        }
    } finally {
        producer.close();
    }
}
/**
 * Blocks until the next message arrives on the JMS consumer and turns it
 * into a monitoring record, choosing the deserializer by message type
 * (binary or text).
 *
 * @return One new IMonitoringRecord
 *
 * @throws ConnectorDataTransmissionException
 *             if the message type is neither binary nor text, or if a
 *             JMSException occurs
 * @throws ConnectorEndOfDataException
 *             if the received message is null indicating that the consumer is
 *             closed
 */
@Override
public IMonitoringRecord deserializeNextRecord()
        throws ConnectorDataTransmissionException, ConnectorEndOfDataException {
    try {
        final Message received = this.consumer.receive();
        if (received == null) {
            throw new ConnectorEndOfDataException("No more records in the queue");
        }
        if (received instanceof BytesMessage) {
            return this.deserialize((BytesMessage) received);
        }
        if (received instanceof TextMessage) {
            return this.deserialize(((TextMessage) received).getText());
        }
        throw new ConnectorDataTransmissionException(
                "Unsupported message type " + received.getClass().getCanonicalName());
    } catch (final JMSException e) {
        throw new ConnectorDataTransmissionException(e.getMessage(), e);
    }
}
/**
 * Converts a JMS message back into a plain Java object: a TextMessage
 * becomes a String, a BytesMessage a byte array, a MapMessage a Map and
 * an ObjectMessage a Serializable object. A message of any other type
 * is returned unchanged.
 * @see #extractStringFromMessage
 * @see #extractByteArrayFromMessage
 * @see #extractMapFromMessage
 * @see #extractSerializableFromMessage
 */
@Override
public Object fromMessage(Message message) throws JMSException, MessageConversionException {
    if (message instanceof TextMessage) {
        return extractStringFromMessage((TextMessage) message);
    }
    if (message instanceof BytesMessage) {
        return extractByteArrayFromMessage((BytesMessage) message);
    }
    if (message instanceof MapMessage) {
        return extractMapFromMessage((MapMessage) message);
    }
    if (message instanceof ObjectMessage) {
        return extractSerializableFromMessage((ObjectMessage) message);
    }
    // Unknown message type: hand the raw message back to the caller.
    return message;
}
/**
 * Creates a consumer on a temporary queue that never receives, floods the
 * queue with {@code MESSAGE_COUNT} 1 KiB bytes messages and expects a
 * message on the slow-consumer advisory topic within one second.
 *
 * @throws Exception if any JMS operation fails
 */
public void testSlowConsumerAdvisory() throws Exception {
    Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    TemporaryQueue queue = consumerSession.createTemporaryQueue();
    MessageConsumer idleConsumer = consumerSession.createConsumer(queue);
    assertNotNull(idleConsumer);

    Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
    // A second session hosts both the advisory consumer and the producer.
    Session advisorySession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer advisoryConsumer = advisorySession.createConsumer(advisoryTopic);

    // start throwing messages at the consumer
    MessageProducer producer = advisorySession.createProducer(queue);
    int sent = 0;
    while (sent < MESSAGE_COUNT) {
        BytesMessage filler = advisorySession.createBytesMessage();
        filler.writeBytes(new byte[1024]);
        producer.send(filler);
        sent++;
    }

    Message advisory = advisoryConsumer.receive(1000);
    assertNotNull(advisory);
}
/**
 * Verifies that a {@link MessageListenerAdapter} unwraps the body of a
 * {@link BytesMessage} and passes the raw byte array to the delegate's
 * {@code handleMessage} method.
 * <p>
 * The mocked message reports the length of {@code TEXT}'s bytes as its body
 * length and fills any buffer passed to {@code readBytes} with those bytes.
 */
@Test
public void testWithMessageContentsDelegateForBytesMessage() throws Exception {
    BytesMessage bytesMessage = mock(BytesMessage.class);
    // BytesMessage contents must be unwrapped...
    // Long.valueOf replaces the deprecated Long(long) constructor.
    given(bytesMessage.getBodyLength()).willReturn(Long.valueOf(TEXT.getBytes().length));
    given(bytesMessage.readBytes(any(byte[].class))).willAnswer(new Answer<Integer>() {
        @Override
        public Integer answer(InvocationOnMock invocation) throws Throwable {
            // Copy the text bytes into the buffer supplied by the caller.
            byte[] bytes = (byte[]) invocation.getArguments()[0];
            ByteArrayInputStream inputStream = new ByteArrayInputStream(TEXT.getBytes());
            return inputStream.read(bytes);
        }
    });
    MessageContentsDelegate delegate = mock(MessageContentsDelegate.class);
    MessageListenerAdapter adapter = new MessageListenerAdapter(delegate);
    adapter.onMessage(bytesMessage);
    verify(delegate).handleMessage(TEXT.getBytes());
}
/**
 * Sends the same bytes message {@code nMsgs} times to the queue named
 * {@code testQueueName}.
 * <p>
 * The body is {@code PAYLOAD} letters 'A' encoded as UTF-8. Before each send
 * the int property {@code "i"} is set to the zero-based send index. The
 * connection is closed when the method returns or throws.
 *
 * @param nMsgs   number of sends to perform
 * @param factory factory used to open the connection
 * @throws Exception if any JMS operation fails
 */
private void sendBytesMessages(int nMsgs, ConnectionFactory factory) throws Exception {
    try (Connection connection = factory.createConnection()) {
        Session session = connection.createSession();
        MessageProducer producer = session.createProducer(session.createQueue(testQueueName));
        BytesMessage msg = session.createBytesMessage();

        StringBuilder body = new StringBuilder();
        int appended = 0;
        while (appended < PAYLOAD) {
            body.append("A");
            appended++;
        }
        msg.writeBytes(body.toString().getBytes(StandardCharsets.UTF_8));

        int index = 0;
        while (index < nMsgs) {
            msg.setIntProperty("i", index);
            producer.send(msg);
            index++;
        }
    }
}
/**
 * Checks that {@link MessageListenerAdapter} extracts the byte content of a
 * {@link BytesMessage} and invokes the delegate's {@code handleMessage}
 * with that byte array.
 * <p>
 * The message is a mock: its body length is the length of {@code TEXT}'s
 * bytes, and {@code readBytes} copies those bytes into the given buffer.
 */
@Test
public void testWithMessageContentsDelegateForBytesMessage() throws Exception {
    BytesMessage bytesMessage = mock(BytesMessage.class);
    // BytesMessage contents must be unwrapped...
    // Long.valueOf is used because the Long(long) constructor is deprecated.
    given(bytesMessage.getBodyLength()).willReturn(Long.valueOf(TEXT.getBytes().length));
    given(bytesMessage.readBytes(any(byte[].class))).willAnswer(new Answer<Integer>() {
        @Override
        public Integer answer(InvocationOnMock invocation) throws Throwable {
            // Fill the caller-provided buffer with the text bytes.
            byte[] bytes = (byte[]) invocation.getArguments()[0];
            ByteArrayInputStream inputStream = new ByteArrayInputStream(TEXT.getBytes());
            return inputStream.read(bytes);
        }
    });
    MessageContentsDelegate delegate = mock(MessageContentsDelegate.class);
    MessageListenerAdapter adapter = new MessageListenerAdapter(delegate);
    adapter.onMessage(bytesMessage);
    verify(delegate).handleMessage(TEXT.getBytes());
}
/**
 * MessageListener callback invoked for each delivered message.
 * <p>
 * The message is acknowledged first. The slug text is then taken from the
 * text body, or decoded from the bytes body with the platform default
 * charset, logged and handed to the launcher. Messages of any other type
 * are acknowledged and otherwise ignored.
 *
 * @param message ActiveMQ message object containing a string message.
 * @throws RuntimeException wrapping any exception raised while handling the message
 */
public void onMessage(Message message) {
    try {
        message.acknowledge();

        final String slug;
        if (message instanceof TextMessage) {
            slug = ((TextMessage) message).getText();
        } else if (message instanceof BytesMessage) {
            BytesMessage binary = (BytesMessage) message;
            byte[] raw = new byte[(int) binary.getBodyLength()];
            binary.readBytes(raw);
            slug = new String(raw);
        } else {
            // Neither text nor bytes: nothing to process.
            return;
        }

        LOGGER.info("A new slug has arrived: " + slug);
        this.launcher.kubernetesProcess(slug);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Sends a large message via {@code sendLargeMessageViaAMQP()} and checks
 * that a JMS consumer on the test queue receives it as a
 * {@link BytesMessage} within one second. Currently ignored.
 */
@Ignore("Broker is not respecting max binary message size")
@Test(timeout = 30000)
public void testSendLargeMessageToClientFromAMQP() throws Exception {
    JmsConnectionFactory connectionFactory = new JmsConnectionFactory(getConnectionURI());
    JmsConnection jmsConnection = (JmsConnection) connectionFactory.createConnection();

    sendLargeMessageViaAMQP();

    try {
        Session session = jmsConnection.createSession();
        Queue target = session.createQueue(getQueueName());
        jmsConnection.start();

        Message received = session.createConsumer(target).receive(1000);
        assertNotNull(received);
        assertTrue(received instanceof BytesMessage);
    } finally {
        jmsConnection.close();
    }
}
/**
 * Sends a bytes message with no body and checks that it is received as a
 * {@link BytesMessage} with a body length of zero.
 */
@Test
public void sendAndReceiveEmpty() throws Exception
{
    final Queue queue = createQueue(getTestName());
    final Connection connection = getConnection();
    try
    {
        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Publish an empty message before the consumer exists.
        final MessageProducer producer = session.createProducer(queue);
        producer.send(session.createBytesMessage());

        final MessageConsumer consumer = session.createConsumer(queue);
        connection.start();
        final Message received = consumer.receive(getReceiveTimeout());

        assertTrue("BytesMessage should be received", received instanceof BytesMessage);
        final BytesMessage receivedBytes = (BytesMessage) received;
        assertEquals("Unexpected body length", 0, receivedBytes.getBodyLength());
    }
    finally
    {
        connection.close();
    }
}
/**
 * Publishes twice the default topic prefetch of empty bytes messages to a
 * topic with a consumer that never receives, then expects a message on the
 * message-discarded advisory topic within one second.
 *
 * @throws Exception if any JMS operation fails
 */
public void xtestMessageDiscardedAdvisory() throws Exception {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic(getClass().getName());
    MessageConsumer idleConsumer = session.createConsumer(topic);
    assertNotNull(idleConsumer);

    Topic advisoryTopic = AdvisorySupport.getMessageDiscardedAdvisoryTopic((ActiveMQDestination) topic);
    MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic);

    //start throwing messages at the consumer
    MessageProducer producer = session.createProducer(topic);
    int remaining = (new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2);
    while (remaining > 0) {
        producer.send(session.createBytesMessage());
        remaining--;
    }

    Message advisory = advisoryConsumer.receive(1000);
    assertNotNull(advisory);
}
/**
 * Round-trips a byte array through {@link SimpleMessageConverter}: the
 * array is converted to a message on a mocked session, converted back, and
 * the resulting array must have the original length. Also verifies that the
 * original bytes were written to the mocked {@link BytesMessage}.
 */
@Test
public void testByteArrayConversion() throws JMSException {
    final byte[] content = "test".getBytes();
    final ByteArrayInputStream source = new ByteArrayInputStream(content);

    BytesMessage bytesMessage = mock(BytesMessage.class);
    Session session = mock(Session.class);
    given(session.createBytesMessage()).willReturn(bytesMessage);
    given(bytesMessage.getBodyLength()).willReturn((long) content.length);
    given(bytesMessage.readBytes(any(byte[].class))).willAnswer(new Answer<Integer>() {
        @Override
        public Integer answer(InvocationOnMock invocation) throws Throwable {
            // Serve the content through the buffer passed to readBytes.
            byte[] target = (byte[]) invocation.getArguments()[0];
            return source.read(target);
        }
    });

    SimpleMessageConverter converter = new SimpleMessageConverter();
    Message converted = converter.toMessage(content, session);
    byte[] roundTripped = (byte[]) converter.fromMessage(converted);

    assertEquals(content.length, roundTripped.length);
    verify(bytesMessage).writeBytes(content);
}
/**
 * Publishes non-persistent bytes messages carrying {@code payload} to the
 * test destination until {@code count} has been decremented to zero.
 * <p>
 * Progress is logged whenever the remaining count is a multiple of 10000.
 * The connection is now closed in a {@code finally} block; previously any
 * exception raised while sending left the connection open.
 *
 * @param count remaining number of messages to send; decremented once per
 *              sent message (and once more on the final, failing check)
 * @throws Exception if any JMS operation fails
 */
private void publishMessages(AtomicLong count) throws Exception {
    JmsConnection connection = (JmsConnection) factory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(getDestinationName());
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        while (count.getAndDecrement() > 0) {
            BytesMessage message = session.createBytesMessage();
            message.writeBytes(payload);
            producer.send(message);
            if ((count.get() % 10000) == 0) {
                LOG.info("Sent message: {}", NUM_SENDS - count.get());
            }
        }
        producer.close();
    } finally {
        // Closing the connection also releases its sessions and producers.
        connection.close();
    }
}
/**
 * Sends a single persistent bytes message with a 1 MiB zero-filled body to
 * the queue named by {@code lmAddress}. The test passes if no exception is
 * thrown; the connection is closed automatically.
 */
@Test
public void testSendLargeMessage() throws Exception {
    try (Connection connection = factory.createConnection()) {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(session.createQueue(lmAddress.toString()));
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        // Create 1MB Message
        BytesMessage largeMessage = session.createBytesMessage();
        largeMessage.writeBytes(new byte[1024 * 1024]);

        producer.send(largeMessage);
    }
}
/**
 * Verifies that the converter turns a {@link BytesMessage} containing the
 * JSON document {@code {"foo":"bar"}} into the equivalent map.
 * <p>
 * The mocked message reports {@code Object} as its {@code __typeid__}, has
 * no {@code __encoding__} property, and serves the JSON bytes through
 * {@code readBytes}.
 */
@Test
public void fromBytesMessage() throws Exception {
    BytesMessage bytesMessageMock = mock(BytesMessage.class);
    Map<String, String> unmarshalled = Collections.singletonMap("foo", "bar");
    byte[] bytes = "{\"foo\":\"bar\"}".getBytes();
    final ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
    given(bytesMessageMock.getStringProperty("__typeid__")).willReturn(Object.class.getName());
    given(bytesMessageMock.propertyExists("__encoding__")).willReturn(false);
    // Long.valueOf replaces the deprecated Long(long) constructor.
    given(bytesMessageMock.getBodyLength()).willReturn(Long.valueOf(bytes.length));
    given(bytesMessageMock.readBytes(any(byte[].class))).willAnswer(
            new Answer<Integer>() {
                @Override
                public Integer answer(InvocationOnMock invocation) throws Throwable {
                    return byteStream.read((byte[]) invocation.getArguments()[0]);
                }
            });
    Object result = converter.fromMessage(bytesMessageMock);
    // JUnit expects (message, expected, actual); the arguments were swapped,
    // which produced a misleading failure message.
    assertEquals("Invalid result", unmarshalled, result);
}
/**
 * Sends the body of the given message as a JMS {@link BytesMessage} to the
 * destination derived from {@code message.getDestination()}.
 * <p>
 * If the message has properties, {@code JMSXGroupID} is copied as a string
 * property, and {@code JMSXGroupSeq} is copied as an int property when its
 * string form is numeric according to {@code StringUtil.isNumeric}.
 *
 * @param message the message whose body and group properties are sent
 */
@Override
public void sendMessage(final Message message) {
    this.jmsTemplate.send(createDestination(message.getDestination()), new MessageCreator() {
        @Override
        public javax.jms.Message createMessage(Session session) throws JMSException {
            BytesMessage jmsMessage = session.createBytesMessage();
            jmsMessage.writeBytes(message.getBody());
            if (CollectionUtils.isEmpty(message.getProperties())) {
                return jmsMessage;
            }

            Object groupId = message.getProperties().get("JMSXGroupID");
            if (groupId != null) {
                jmsMessage.setStringProperty("JMSXGroupID", groupId.toString());
            }

            Object groupSeq = message.getProperties().get("JMSXGroupSeq");
            if (groupSeq != null) {
                String groupSeqText = groupSeq.toString();
                // A non-numeric sequence is dropped rather than rejected.
                if (StringUtil.isNumeric(groupSeqText)) {
                    jmsMessage.setIntProperty("JMSXGroupSeq", Integer.parseInt(groupSeqText));
                }
            }
            return jmsMessage;
        }
    });
}
/**
 * Builds the mocked JNDI/JMS environment used by the tests.
 * <p>
 * A mocked naming context resolves {@code jms/queueConnectionFactory} and
 * {@code jms/mailQueue}; the mocked factory, connection and session are
 * chained so that a sender for the queue and a mocked {@link BytesMessage}
 * are returned. Finally the transport under test is created and a mocked
 * transport listener is registered on it.
 */
@Before
public void setUp() throws Exception {
    System.setProperty(Context.INITIAL_CONTEXT_FACTORY, TestContextFactory.class.getName());

    // JNDI lookups
    QueueConnectionFactory connectionFactory = Mockito.mock(QueueConnectionFactory.class);
    Queue mailQueue = Mockito.mock(Queue.class);
    Context namingContext = Mockito.mock(Context.class);
    TestContextFactory.context = namingContext;
    when(namingContext.lookup(eq("jms/queueConnectionFactory"))).thenReturn(connectionFactory);
    when(namingContext.lookup(eq("jms/mailQueue"))).thenReturn(mailQueue);

    // Connection factory -> connection, with and without credentials
    queueSender = Mockito.mock(QueueSender.class);
    QueueConnection connection = Mockito.mock(QueueConnection.class);
    when(connectionFactory.createQueueConnection()).thenReturn(connection);
    when(connectionFactory.createQueueConnection(anyString(), anyString())).thenReturn(connection);

    // Connection -> session -> message and sender
    QueueSession jmsSession = Mockito.mock(QueueSession.class);
    bytesMessage = Mockito.mock(BytesMessage.class);
    when(jmsSession.createBytesMessage()).thenReturn(bytesMessage);
    when(connection.createQueueSession(anyBoolean(), anyInt())).thenReturn(jmsSession);
    when(jmsSession.createSender(eq(mailQueue))).thenReturn(queueSender);

    // Transport under test
    transport = new SmtpJmsTransport(Session.getDefaultInstance(new Properties()), new URLName("jms"));
    transportListener = Mockito.mock(TransportListener.class);
    transport.addTransportListener(transportListener);
}
/**
 * Publishes the bytes of {@code "hellomq"} through {@code JMSPublisher} and
 * checks that the message received from the same destination is a
 * {@link BytesMessage} whose first seven bytes decode to that text. The
 * caching connection factory is destroyed at the end.
 */
@Test
public void validateBytesConvertedToBytesMessageOnSend() throws Exception {
    final String destinationName = "testQueue";
    JmsTemplate template = CommonTest.buildJmsTemplateForDestination(false);

    JMSPublisher publisher = new JMSPublisher(template, mock(ComponentLog.class));
    publisher.publish(destinationName, "hellomq".getBytes());

    Message received = template.receive(destinationName);
    assertTrue(received instanceof BytesMessage);

    BytesMessage receivedBytes = (BytesMessage) received;
    byte[] body = new byte[7];
    receivedBytes.readBytes(body);
    assertEquals("hellomq", new String(body));

    CachingConnectionFactory cachingFactory = (CachingConnectionFactory) template.getConnectionFactory();
    cachingFactory.destroy();
}
/**
 * Listener callback for the large-message test.
 * <p>
 * Each message is cast to {@link BytesMessage}, compared via
 * {@code isSame} (the result is AND-ed into {@code validMessageConsumption})
 * and asserted to have a body of {@code LARGE_MESSAGE_SIZE} bytes. Once the
 * counter reaches {@code MESSAGE_COUNT}, a thread waiting on
 * {@code messageCount} is notified. Exceptions are printed, not rethrown.
 *
 * @param msg the delivered message, expected to be a BytesMessage
 */
@Override
public void onMessage(Message msg) {
    try {
        BytesMessage largeMessage = (BytesMessage) msg;
        validMessageConsumption &= isSame(largeMessage);
        assertTrue(largeMessage.getBodyLength() == LARGE_MESSAGE_SIZE);

        boolean allReceived = messageCount.incrementAndGet() >= MESSAGE_COUNT;
        if (allReceived) {
            // Wake up a thread waiting on the counter.
            synchronized (messageCount) {
                messageCount.notify();
            }
        }

        LOG.info("got message = " + messageCount);
        boolean atMilestone = messageCount.get() % 50 == 0;
        if (atMilestone) {
            LOG.info("count = " + messageCount);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Sends a bytes message asynchronously and checks, via the completion
 * listener, that the message handed back still contains the written byte
 * and int. The listener must be called within five seconds. The JMS
 * context is closed afterwards.
 */
@Test
public void bytesMessage() throws Exception {
    context = cf.createContext();
    try {
        BytesMessage outgoing = context.createBytesMessage();
        outgoing.setStringProperty("COM_SUN_JMS_TESTNAME", "sendAndRecvMsgOfEachTypeCLTest");
        outgoing.writeByte((byte) 1);
        outgoing.writeInt(22);

        CountDownLatch completed = new CountDownLatch(1);
        SimpleCompletionListener completionListener = new SimpleCompletionListener(completed);

        JMSProducer asyncProducer = context.createProducer();
        asyncProducer.setAsync(completionListener);
        asyncProducer.send(queue1, outgoing);

        assertTrue(completed.await(5, TimeUnit.SECONDS));
        assertEquals(completionListener.message.readByte(), (byte) 1);
        assertEquals(completionListener.message.readInt(), 22);
    } finally {
        context.close();
    }
}
/**
 * Sends the given payload as a {@link BytesMessage} through this object's
 * producer, after {@code checkIfConnected()} has been called.
 *
 * @param payload content of the BytesMessage to be sent
 * @throws JMSException if an error occurs sending the BytesMessage
 */
public void sendBytesMessage(byte[] payload) throws JMSException {
    checkIfConnected();
    final BytesMessage outgoing = session.createBytesMessage();
    outgoing.writeBytes(payload);
    producer.send(outgoing);
}
/**
 * Runs {@code PublishJMS} against an in-VM ActiveMQ broker and checks that
 * the flow file is routed to success and that the published message
 * carries the flow file content, the reply-to queue and the {@code foo}
 * attribute.
 */
@Test
public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
    final String destinationName = "fooQueue";
    ActiveMQConnectionFactory connectionFactory =
            new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");

    // Processor under test, wired to a mocked connection factory provider.
    PublishJMS processor = new PublishJMS();
    TestRunner runner = TestRunners.newTestRunner(processor);
    JMSConnectionFactoryProviderDefinition provider = mock(JMSConnectionFactoryProviderDefinition.class);
    when(provider.getIdentifier()).thenReturn("cfProvider");
    when(provider.getConnectionFactory()).thenReturn(connectionFactory);
    runner.addControllerService("cfProvider", provider);
    runner.enableControllerService(provider);
    runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
    runner.setProperty(PublishJMS.DESTINATION, destinationName);

    // One flow file with a custom attribute and a reply-to header.
    Map<String, String> attributes = new HashMap<>();
    attributes.put("foo", "foo");
    attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
    runner.enqueue("Hey dude!".getBytes(), attributes);
    runner.run(1, false);

    final MockFlowFile successFlowFile = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
    assertNotNull(successFlowFile);

    // Read the message back from the broker and inspect it.
    JmsTemplate template = new JmsTemplate(connectionFactory);
    BytesMessage published = (BytesMessage) template.receive(destinationName);
    byte[] body = MessageBodyToBytesConverter.toBytes(published);
    assertEquals("Hey dude!", new String(body));
    assertEquals("cooQueue", ((Queue) published.getJMSReplyTo()).getQueueName());
    assertEquals("foo", published.getStringProperty("foo"));
}
/**
 * Reads an unsigned byte by delegating to the wrapped
 * {@link BytesMessage}; the call is trace-logged when tracing is enabled.
 *
 * @return The value
 * @throws JMSException Thrown if an error occurs
 */
@Override
public int readUnsignedByte() throws JMSException {
    if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
        ActiveMQRALogger.LOGGER.trace("readUnsignedByte()");
    }

    final BytesMessage delegate = (BytesMessage) message;
    return delegate.readUnsignedByte();
}
/**
 * Publishes messages on every session of every connection in the given map.
 * <p>
 * For each session one bytes message is created (with a generated body of
 * {@code messageSize} bytes when that size is positive) and
 * {@code numberOfProducers} producers are created in turn; each producer
 * sends the message {@code numberOfMessages} times. Transacted sessions are
 * committed after every send.
 *
 * @param numberOfMessages       sends per producer
 * @param messageSize            body size in bytes; no body is written when not positive
 * @param numberOfProducers      producers created per session
 * @param deliveryMode           JMS delivery mode used for every send
 * @param destination            destination the producers send to
 * @param connectionsAndSessions sessions to publish on, grouped by connection
 * @throws JMSException if any JMS operation fails
 */
private void publish(int numberOfMessages, int messageSize, int numberOfProducers, int deliveryMode,
        Destination destination, Map<Connection, List<Session>> connectionsAndSessions) throws JMSException
{
    final byte[] body = generateMessage(messageSize);
    for (List<Session> sessionsOfConnection : connectionsAndSessions.values())
    {
        for (Session session : sessionsOfConnection)
        {
            final BytesMessage message = session.createBytesMessage();
            if (messageSize > 0)
            {
                message.writeBytes(body);
            }

            int producersCreated = 0;
            while (producersCreated < numberOfProducers)
            {
                final MessageProducer producer = session.createProducer(destination);
                int sent = 0;
                while (sent < numberOfMessages)
                {
                    producer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
                    if (session.getTransacted())
                    {
                        session.commit();
                    }
                    sent++;
                }
                producersCreated++;
            }
        }
    }
}
/**
 * Reads up to {@code length} bytes into the given array by delegating to
 * the wrapped {@link BytesMessage}; the call and its arguments are
 * trace-logged when tracing is enabled.
 *
 * @param value  The value
 * @param length The length
 * @return The result
 * @throws JMSException Thrown if an error occurs
 */
@Override
public int readBytes(final byte[] value, final int length) throws JMSException {
    if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
        ActiveMQRALogger.LOGGER.trace("readBytes(" + Arrays.toString(value) + ", " + length + ")");
    }

    final BytesMessage delegate = (BytesMessage) message;
    return delegate.readBytes(value, length);
}
/**
 * Consumes {@code numMessages} messages from the destination and asserts
 * that they arrive in order, numbered from {@code start}.
 * <p>
 * Each message must arrive within three seconds. For large messages the
 * {@code msg} string property must equal {@code "message" + index} and the
 * body must yield 1024 full reads of 1024 bytes; otherwise the text body
 * must equal {@code "message" + index}. The connection is always closed.
 *
 * @param cf           factory used to open the connection
 * @param dest         destination to consume from
 * @param start        number expected on the first message
 * @param numMessages  number of messages to consume
 * @param largeMessage whether bytes messages (true) or text messages (false) are expected
 * @throws Exception if any JMS operation fails
 */
protected void checkAllMessageReceivedInOrder(final ConnectionFactory cf,
        final Destination dest,
        final int start,
        final int numMessages,
        final boolean largeMessage) throws Exception {
    final Connection conn = cf.createConnection();
    try {
        conn.start();
        final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageConsumer consumer = session.createConsumer(dest);

        // Consume the messages
        for (int offset = 0; offset < numMessages; offset++) {
            final Message received = consumer.receive(3000);
            Assert.assertNotNull(received);

            final String expected = "message" + (offset + start);
            if (!largeMessage) {
                Assert.assertEquals(expected, ((TextMessage) received).getText());
                continue;
            }

            final BytesMessage large = (BytesMessage) received;
            Assert.assertEquals(expected, received.getStringProperty("msg"));
            final byte[] chunk = new byte[1024];
            for (int reads = 0; reads < 1024; reads++) {
                Assert.assertEquals(1024, large.readBytes(chunk));
            }
        }
    } finally {
        conn.close();
    }
}
/**
 * Sends the given byte array as the body of a new {@link BytesMessage} to
 * the destination.
 *
 * @param destination destination to send to
 * @param body        bytes written into the message body
 * @return this producer, to allow call chaining
 */
@Override
public JMSProducer send(Destination destination, byte[] body) {
    try {
        final BytesMessage bytesMessage = session.createBytesMessage();
        bytesMessage.writeBytes(body);
        doSend(destination, bytesMessage);
        return this;
    } catch (JMSException jmse) {
        // Translate the checked JMS exception into its runtime counterpart.
        throw JmsExceptionSupport.createRuntimeException(jmse);
    }
}