下面列出了 javax.jms.BytesMessage#getBodyLength() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Read the complete body of a BytesMessage and map it to the requested Java type.
 * <p>The charset name starts out as this converter's {@code encoding} and is
 * replaced by the value of the message property named by
 * {@code encodingPropertyName} when that property exists on the message.
 * With a charset name the bytes are decoded to a String first; without one
 * the raw bytes are handed to the ObjectMapper.
 * @param message the input message
 * @param targetJavaType the target type
 * @return the message converted to an object
 * @throws JMSException if thrown by JMS
 * @throws IOException in case of I/O errors
 */
protected Object convertFromBytesMessage(BytesMessage message, JavaType targetJavaType)
        throws JMSException, IOException {

    String charsetName = this.encoding;
    boolean overriddenByMessage =
            (this.encodingPropertyName != null && message.propertyExists(this.encodingPropertyName));
    if (overriddenByMessage) {
        charsetName = message.getStringProperty(this.encodingPropertyName);
    }

    int bodyLength = (int) message.getBodyLength();
    byte[] payload = new byte[bodyLength];
    message.readBytes(payload);

    if (charsetName == null) {
        // Jackson internally performs encoding detection, falling back to UTF-8.
        return this.objectMapper.readValue(payload, targetJavaType);
    }

    try {
        String json = new String(payload, charsetName);
        return this.objectMapper.readValue(json, targetJavaType);
    }
    catch (UnsupportedEncodingException ex) {
        throw new MessageConversionException("Cannot convert bytes to String", ex);
    }
}
/**
 * Convert a BytesMessage to a Java Object with the specified type.
 * <p>The charset name is this converter's {@code encoding}, unless the message
 * carries the property named by {@code encodingPropertyName}, in which case
 * that property's value is used. If no charset name is available, the raw
 * bytes are passed to the ObjectMapper instead of being decoded here, because
 * {@code new String(bytes, null)} would throw a NullPointerException.
 * @param message the input message
 * @param targetJavaType the target type
 * @return the message converted to an object
 * @throws JMSException if thrown by JMS
 * @throws IOException in case of I/O errors
 */
protected Object convertFromBytesMessage(BytesMessage message, JavaType targetJavaType)
        throws JMSException, IOException {

    String encoding = this.encoding;
    if (this.encodingPropertyName != null && message.propertyExists(this.encodingPropertyName)) {
        encoding = message.getStringProperty(this.encodingPropertyName);
    }
    byte[] bytes = new byte[(int) message.getBodyLength()];
    message.readBytes(bytes);
    if (encoding == null) {
        // No charset to decode with: let the ObjectMapper work on the bytes directly.
        return this.objectMapper.readValue(bytes, targetJavaType);
    }
    try {
        String body = new String(bytes, encoding);
        return this.objectMapper.readValue(body, targetJavaType);
    }
    catch (UnsupportedEncodingException ex) {
        throw new MessageConversionException("Cannot convert bytes to String", ex);
    }
}
/**
 * Extract the payload of a JMS message as a plain Java object.
 * <ul>
 * <li>TextMessage: the text</li>
 * <li>BytesMessage: a byte array sized by {@code getBodyLength()} and filled via {@code readBytes}</li>
 * <li>ObjectMessage: the contained object</li>
 * <li>StreamMessage: the result of a single {@code readObject()} call</li>
 * <li>anything else: an empty byte array</li>
 * </ul>
 * @param message the message to unwrap
 * @return the extracted payload
 * @throws JMSException if thrown by the JMS accessors
 */
public Object fromMessage(Message message) throws JMSException {
    if (message instanceof TextMessage) {
        TextMessage text = (TextMessage) message;
        return text.getText();
    }
    if (message instanceof BytesMessage) {
        BytesMessage binary = (BytesMessage) message;
        int bodyLength = (int) binary.getBodyLength();
        byte[] payload = new byte[bodyLength];
        binary.readBytes(payload);
        return payload;
    }
    if (message instanceof ObjectMessage) {
        ObjectMessage wrapped = (ObjectMessage) message;
        return wrapped.getObject();
    }
    if (message instanceof StreamMessage) {
        return ((StreamMessage) message).readObject();
    }
    // Unrecognised message type: hand back an empty payload.
    return new byte[0];
}
/**
 * Copy the payload of the given message into the builder.
 * A TextMessage contributes its text; a BytesMessage contributes a ByteBuffer
 * holding its body. Any other message type adds no payload and is only
 * reported at debug level.
 *
 * @param message the received message
 * @param builder the builder to populate
 * @return the same builder instance that was passed in
 * @throws JMSException if thrown by the JMS accessors
 * @throws IllegalArgumentException if the reported body length does not fit into an int
 */
private ExternalMessageBuilder extractPayloadFromMessage(final JmsMessage message,
        final ExternalMessageBuilder builder) throws JMSException {
    if (message instanceof TextMessage) {
        final TextMessage textMessage = (TextMessage) message;
        builder.withText(textMessage.getText());
        return builder;
    }

    if (message instanceof BytesMessage) {
        final BytesMessage binary = (BytesMessage) message;
        final long reportedLength = binary.getBodyLength();
        final boolean fitsIntoInt =
                reportedLength >= Integer.MIN_VALUE && reportedLength <= Integer.MAX_VALUE;
        if (!fitsIntoInt) {
            throw new IllegalArgumentException("Message too large...");
        }
        final ByteBuffer buffer = ByteBuffer.allocate((int) reportedLength);
        // Read straight into the buffer's backing array.
        binary.readBytes(buffer.array());
        builder.withBytes(buffer);
        return builder;
    }

    if (log.isDebugEnabled()) {
        final Destination source = message.getJMSDestination();
        final Map<String, String> headers = extractHeadersMapFromJmsMessage(message);
        log.debug("Received message at '{}' of unsupported type ({}) with headers: {}",
                source, message.getClass().getName(), headers);
    }
    return builder;
}
/**
 * Drain the queue named by DESTINATION_NAME and return its contents as strings.
 * Messages are received with a 100 ms timeout until a receive returns null.
 * A BytesMessage body is decoded with the platform default charset and has
 * RECORD_SEPERATOR appended; a TextMessage contributes its text unchanged.
 *
 * @return one entry per received message, in arrival order
 * @throws Exception if a message of any other type is received, or on JMS failure
 */
private List<String> getQueue() throws Exception {
    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    Destination queue = session.createQueue(DESTINATION_NAME);
    MessageConsumer consumer = session.createConsumer(queue);

    List<String> rows = new ArrayList<>();
    for (Message received = consumer.receive(100); received != null; received = consumer.receive(100)) {
        if (received instanceof BytesMessage) {
            BytesMessage binary = (BytesMessage) received;
            int bodyLength = (int) binary.getBodyLength();
            byte[] body = new byte[bodyLength];
            binary.readBytes(body);
            rows.add(new String(body) + RECORD_SEPERATOR);
            continue;
        }
        if (received instanceof TextMessage) {
            TextMessage text = (TextMessage) received;
            rows.add(text.getText());
            continue;
        }
        throw new Exception("Unexpected message type");
    }
    return rows;
}
/**
 * Convert a BytesMessage to a Java Object with the specified type.
 * <p>The charset name is this converter's {@code encoding}, unless the message
 * carries the property named by {@code encodingPropertyName}, in which case
 * that property's value is used. If no charset name is available, the raw
 * bytes are passed to the ObjectMapper instead of being decoded here, because
 * {@code new String(bytes, null)} would throw a NullPointerException.
 * @param message the input message
 * @param targetJavaType the target type
 * @return the message converted to an object
 * @throws JMSException if thrown by JMS
 * @throws IOException in case of I/O errors
 */
protected Object convertFromBytesMessage(BytesMessage message, JavaType targetJavaType)
        throws JMSException, IOException {

    String encoding = this.encoding;
    if (this.encodingPropertyName != null && message.propertyExists(this.encodingPropertyName)) {
        encoding = message.getStringProperty(this.encodingPropertyName);
    }
    byte[] bytes = new byte[(int) message.getBodyLength()];
    message.readBytes(bytes);
    if (encoding == null) {
        // No charset to decode with: let the ObjectMapper work on the bytes directly.
        return this.objectMapper.readValue(bytes, targetJavaType);
    }
    try {
        String body = new String(bytes, encoding);
        return this.objectMapper.readValue(body, targetJavaType);
    }
    catch (UnsupportedEncodingException ex) {
        throw new MessageConversionException("Cannot convert bytes to String", ex);
    }
}
/**
 * Callback invoked with each delivered message.
 * The message is acknowledged first. Its content is then read as a string:
 * directly for a TextMessage, or by decoding the body of a BytesMessage with
 * the platform default charset. The string is logged and passed to the
 * launcher. Messages of any other type are acknowledged and otherwise ignored.
 *
 * @param message the delivered message
 * @throws RuntimeException wrapping any exception raised while handling the message
 */
public void onMessage(Message message) {
    try {
        message.acknowledge();

        final String slug;
        if (message instanceof TextMessage) {
            slug = ((TextMessage) message).getText();
        } else if (message instanceof BytesMessage) {
            BytesMessage binary = (BytesMessage) message;
            int bodyLength = (int) binary.getBodyLength();
            byte[] raw = new byte[bodyLength];
            binary.readBytes(raw);
            slug = new String(raw);
        } else {
            // Nothing to do for other message types.
            return;
        }

        LOGGER.info("A new slug has arrived: " + slug);
        this.launcher.kubernetesProcess(slug);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Poll the consumer while {@code runningChecker} reports running, and emit each
 * deserialized BytesMessage body through the given context.
 * <p>Each iteration first calls {@code exceptionListener.checkErroneous()}, then
 * waits up to one second for a message. A timeout (null) is skipped silently;
 * a non-bytes message is logged at warn level and skipped. Emission happens under
 * the checkpoint lock; when auto-acknowledge is off and {@code addId} accepts
 * the message ID, the message is also stored in {@code unacknowledgedMessages}.
 *
 * @param ctx the context used to emit records and obtain the checkpoint lock
 * @throws Exception if receiving, deserializing or emitting fails
 */
@Override
public void run(SourceContext<OUT> ctx) throws Exception {
    while (runningChecker.isRunning()) {
        exceptionListener.checkErroneous();

        Message message = consumer.receive(1000);
        if (message == null) {
            // receive(timeout) yields null when nothing arrived in time. That is
            // normal idling, not an unexpected message type, so do not warn.
            continue;
        }
        if (! (message instanceof BytesMessage)) {
            LOG.warn("Active MQ source received non bytes message: {}", message);
            continue;
        }
        BytesMessage bytesMessage = (BytesMessage) message;
        byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
        bytesMessage.readBytes(bytes);
        OUT value = deserializationSchema.deserialize(bytes);
        synchronized (ctx.getCheckpointLock()) {
            // Decide on tracking before emitting, as the original condition did.
            boolean trackForAck = !autoAck && addId(bytesMessage.getJMSMessageID());
            ctx.collect(value);
            if (trackForAck) {
                unacknowledgedMessages.put(bytesMessage.getJMSMessageID(), bytesMessage);
            }
        }
    }
}
/**
 * Read the whole body of a BytesMessage into a new byte array.
 *
 * @param message the message to read
 * @return an array of {@code getBodyLength()} bytes filled via {@code readBytes}
 * @throws JMSException if the body length exceeds Integer.MAX_VALUE, or if thrown by JMS
 */
private static byte[] getMessageBytes(BytesMessage message) throws JMSException {
    final long bodyLength = message.getBodyLength();
    if (bodyLength <= Integer.MAX_VALUE) {
        final byte[] content = new byte[(int) bodyLength];
        message.readBytes(content);
        return content;
    }

    // A Java array cannot hold more than Integer.MAX_VALUE elements.
    final String reason = "Incoming message cannot be written to a FlowFile because its size is "
            + bodyLength
            + " bytes, and the maximum size that this processor can handle is "
            + Integer.MAX_VALUE;
    throw new JMSException(reason);
}
/**
 * Check that a received BytesMessage has the expected size and content.
 * In random mode the body must equal {@code sentBytes}. Otherwise the byte at
 * index r must equal {@code (byte) (48 + (r % 10))}.
 *
 * @param sentBytes the bytes that were sent (compared only when {@code random} is true)
 * @param msg the received message
 * @param random whether the payload is compared against {@code sentBytes}
 * @param messageSize the expected body length in bytes
 * @throws JMSException if thrown by the JMS accessors
 * @throws RuntimeException if the length or the content does not match
 */
private void validateReceivedMessageContent(byte[] sentBytes,
        BytesMessage msg, boolean random, int messageSize) throws JMSException
{
    final long bodyLength = msg.getBodyLength();
    if (bodyLength != messageSize)
    {
        throw new RuntimeException("Incorrect number of bytes received");
    }

    final byte[] actual = new byte[(int) bodyLength];
    msg.readBytes(actual);

    if (random)
    {
        if (!Arrays.equals(sentBytes, actual))
        {
            throw new RuntimeException("Incorrect value of bytes received");
        }
        return;
    }

    for (int index = 0; index < messageSize; index++)
    {
        final byte expected = (byte) (48 + (index % 10));
        if (actual[index] != expected)
        {
            throw new RuntimeException("Incorrect value of bytes received");
        }
    }
}
/**
 * Receive one BytesMessage from the reply destination, parse it with
 * {@code readBook}, and assert that the book has the expected id and name.
 * The consumer created here is closed before returning, so repeated calls do
 * not accumulate open consumers on the session.
 *
 * @param session the session used to create the consumer
 * @param replyToDestination the destination the response is expected on
 * @param bookId the expected book id
 * @param bookName the expected book name
 * @throws Exception on JMS or parsing failure
 * @throws RuntimeException if no message arrives within five seconds
 */
private void checkBookInResponse(Session session, Destination replyToDestination,
        long bookId, String bookName) throws Exception {
    MessageConsumer consumer = session.createConsumer(replyToDestination);
    try {
        BytesMessage jmsMessage = (BytesMessage)consumer.receive(5000);
        if (jmsMessage == null) {
            throw new RuntimeException("No response recieved on " + replyToDestination);
        }
        byte[] bytes = new byte[(int)jmsMessage.getBodyLength()];
        jmsMessage.readBytes(bytes);
        InputStream is = new ByteArrayInputStream(bytes);
        Book b = readBook(is);
        assertEquals(bookId, b.getId());
        assertEquals(bookName, b.getName());
    } finally {
        // Release the consumer on success and failure alike.
        consumer.close();
    }
}
/**
 * Decode the body of a BytesMessage as a UTF-8 string.
 *
 * @param message the message to map; must be a BytesMessage
 * @return the message body decoded as UTF-8
 * @throws Exception if thrown while reading the message
 */
@Override
public String mapMessage(Message message) throws Exception {
    BytesMessage bytesMessage = (BytesMessage) message;
    byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
    // Fill the buffer. Without this call the result is a string of NUL characters.
    bytesMessage.readBytes(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
}
/**
 * Run the put command with the bytes type against TEST.QUEUE, then verify that
 * the received BytesMessage body decodes (UTF-8) to "test".
 */
@Test
public void testPutBytesQueue() throws Exception {
    final String cmdLine = getConnectCommand() + "-" + CMD_PUT + " \"test\" -" + CMD_TYPE + " " + TYPE_BYTES + " TEST.QUEUE";
    System.out.println("Testing cmd: " + cmdLine);
    final String[] args = cmdLine.split(" ");
    a.run(args);

    final MessageConsumer queueConsumer = session.createConsumer(testQueue);
    final BytesMessage received = (BytesMessage) queueConsumer.receive(TEST_TIMEOUT);
    final int bodyLength = (int) received.getBodyLength();
    final byte[] body = new byte[bodyLength];
    received.readBytes(body);

    final String decoded = new String(body, StandardCharsets.UTF_8);
    assertEquals("test", decoded);
}
/**
 * Build a MessageDump from a JMS message.
 * The JMS header fields are copied one to one. Each message property is stored
 * in the dump map matching its runtime type; other Serializable values are
 * serialized and Base64-encoded. The body is stored as text for a TextMessage
 * and as Base64 for a BytesMessage or a (serialized) ObjectMessage. For any
 * other message type both body and type are the empty string.
 *
 * @param msg the message to dump
 * @return the populated dump
 * @throws JMSException if thrown by the JMS accessors
 * @throws IllegalArgumentException if a property value is not Serializable
 */
public MessageDump toDumpMessage(Message msg) throws JMSException{
    final MessageDump result = new MessageDump();

    // JMS header fields.
    result.JMSCorrelationID = msg.getJMSCorrelationID();
    result.JMSMessageID = msg.getJMSMessageID();
    result.JMSType = msg.getJMSType();
    result.JMSDeliveryMode = msg.getJMSDeliveryMode();
    result.JMSExpiration = msg.getJMSExpiration();
    result.JMSRedelivered = msg.getJMSRedelivered();
    result.JMSTimestamp = msg.getJMSTimestamp();
    result.JMSPriority = msg.getJMSPriority();

    // Message properties, sorted into one map per value type.
    @SuppressWarnings("rawtypes")
    final Enumeration names = msg.getPropertyNames();
    while (names.hasMoreElements()) {
        final String key = (String) names.nextElement();
        final Object value = msg.getObjectProperty(key);

        if (value instanceof String) {
            result.stringProperties.put(key, (String) value);
        } else if (value instanceof Integer) {
            result.intProperties.put(key, (Integer) value);
        } else if (value instanceof Long) {
            result.longProperties.put(key, (Long) value);
        } else if (value instanceof Double) {
            result.doubleProperties.put(key, (Double) value);
        } else if (value instanceof Short) {
            result.shortProperties.put(key, (Short) value);
        } else if (value instanceof Float) {
            result.floatProperties.put(key, (Float) value);
        } else if (value instanceof Byte) {
            result.byteProperties.put(key, (Byte) value);
        } else if (value instanceof Boolean) {
            result.boolProperties.put(key, (Boolean) value);
        } else if (value instanceof Serializable) {
            // Object property.. if it's on Classpath and Serializable
            final byte[] serialized = SerializationUtils.serialize((Serializable) value);
            result.objectProperties.put(key, Base64.encodeBase64String(serialized));
        } else {
            // Corner case.
            throw new IllegalArgumentException("Property of key '"+ key +"' is not serializable. Type is: " + value.getClass().getCanonicalName());
        }
    }

    // Message body and type marker.
    if (msg instanceof TextMessage) {
        result.body = ((TextMessage) msg).getText();
        result.type = "TextMessage";
    } else if (msg instanceof BytesMessage) {
        final BytesMessage binary = (BytesMessage) msg;
        final byte[] content = new byte[(int) binary.getBodyLength()];
        binary.readBytes(content);
        result.body = Base64.encodeBase64String(content);
        result.type = "BytesMessage";
    } else if (msg instanceof ObjectMessage) {
        final ObjectMessage wrapped = (ObjectMessage) msg;
        final byte[] serializedObject = SerializationUtils.serialize(wrapped.getObject());
        result.body = Base64.encodeBase64String(serializedObject);
        result.type = "ObjectMessage";
    } else {
        result.body = "";
        result.type = "";
    }
    return result;
}
/**
 * Send one message of each JMS type (text, map, object, stream, bytes, plain)
 * over a connection created with {@code jms.useCompression=true}, and check
 * that a consumer on a separate connection receives the same content.
 * Both connections are closed in the finally block, each guarded by its own
 * null check, and closing one still happens if closing the other fails.
 */
@Test
public void testCompression() throws Exception {
    Connection cconnection = null;
    Connection connection = null;
    try {
        ActiveMQConnectionFactory cfactory = new ActiveMQConnectionFactory("tcp://" + OWHOST + ":" + OWPORT + "");
        cconnection = cfactory.createConnection();
        cconnection.start();
        Session csession = cconnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue cQueue = csession.createQueue(queueName);
        MessageConsumer consumer = csession.createConsumer(cQueue);
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://" + OWHOST + ":" + OWPORT + "?jms.useCompression=true");
        connection = factory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        //text
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText(testString);
        TextMessage receivedMessage = sendAndReceive(textMessage, producer, consumer);
        String receivedText = receivedMessage.getText();
        assertEquals(testString, receivedText);
        //MapMessage
        MapMessage mapMessage = session.createMapMessage();
        mapMessage.setString(testProp, propValue);
        MapMessage receivedMapMessage = sendAndReceive(mapMessage, producer, consumer);
        String value = receivedMapMessage.getString(testProp);
        assertEquals(propValue, value);
        //Object
        ObjectMessage objMessage = session.createObjectMessage();
        objMessage.setObject(testString);
        ObjectMessage receivedObjMessage = sendAndReceive(objMessage, producer, consumer);
        String receivedObj = (String) receivedObjMessage.getObject();
        assertEquals(testString, receivedObj);
        //Stream
        StreamMessage streamMessage = session.createStreamMessage();
        streamMessage.writeString(testString);
        StreamMessage receivedStreamMessage = sendAndReceive(streamMessage, producer, consumer);
        String streamValue = receivedStreamMessage.readString();
        assertEquals(testString, streamValue);
        //byte
        BytesMessage byteMessage = session.createBytesMessage();
        byte[] bytes = testString.getBytes();
        byteMessage.writeBytes(bytes);
        BytesMessage receivedByteMessage = sendAndReceive(byteMessage, producer, consumer);
        long receivedBodylength = receivedByteMessage.getBodyLength();
        assertEquals("bodylength Correct", bytes.length, receivedBodylength);
        byte[] receivedBytes = new byte[(int) receivedBodylength];
        receivedByteMessage.readBytes(receivedBytes);
        String receivedString = new String(receivedBytes);
        assertEquals(testString, receivedString);
        //Message
        Message m = session.createMessage();
        sendAndReceive(m, producer, consumer);
    } finally {
        // Each connection is guarded by its OWN null check. The original had the
        // checks swapped, which threw a NullPointerException when only one of the
        // two connections had been created.
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            if (cconnection != null) {
                cconnection.close();
            }
        }
    }
}
/**
 * Send NUM_MESSAGES BytesMessages carrying the bytes 0x00..0x0f over the producer
 * connection, then receive them on the consumer connection and check that every
 * body matches what was sent. The elapsed time in seconds is logged at debug level.
 *
 * @param producerConnection the connection used for sending
 * @param consumerConnection the connection used for receiving
 * @throws Throwable on JMS failure or assertion failure
 */
private void testBytesMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
    final long startMillis = System.currentTimeMillis();

    final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final Queue producerQueue = producerSession.createQueue(getQueueName());

    // Payload: sixteen bytes whose value equals their index.
    final byte[] payload = new byte[16];
    for (int index = 0; index < payload.length; index++) {
        payload[index] = (byte) index;
    }

    final MessageProducer producer = producerSession.createProducer(producerQueue);
    for (int count = 0; count < NUM_MESSAGES; count++) {
        instanceLog.debug("Sending " + count);
        final BytesMessage outgoing = producerSession.createBytesMessage();
        outgoing.writeBytes(payload);
        outgoing.setIntProperty("count", count);
        producer.send(outgoing);
    }

    final Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
    final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);

    for (int count = 0; count < NUM_MESSAGES; count++) {
        final BytesMessage incoming = (BytesMessage) consumer.receive(5000);
        Assert.assertNotNull("Could not receive message count=" + count + " on consumer", incoming);

        incoming.reset();
        final int bodyLength = (int) incoming.getBodyLength();
        final byte[] received = new byte[bodyLength];
        incoming.readBytes(received);

        instanceLog.debug("Received " + ByteUtil.bytesToHex(received, 1) + " count - " + incoming.getIntProperty("count"));
        Assert.assertArrayEquals(payload, received);
    }

    final long taken = (System.currentTimeMillis() - startMillis) / 1000;
    instanceLog.debug("taken = " + taken);
}
/**
 * Peek a BytesMessage from the sender's queue and return its body.
 *
 * @return an array of {@code getBodyLength()} bytes filled via {@code readBytes}
 * @throws Exception if peeking or reading the message fails
 */
private byte[] readMessage() throws Exception {
    final BytesMessage peeked = activemq.peekBytesMessage(sender.lazyInit.queue);
    final int bodyLength = (int) peeked.getBodyLength();
    final byte[] body = new byte[bodyLength];
    peeked.readBytes(body);
    return body;
}
/**
 * Read the body of the given {@link BytesMessage} into memory and unmarshal it.
 * The bytes are exposed to the unmarshaller as a {@link StreamSource} over a
 * {@link ByteArrayInputStream}.
 * @param message the message
 * @param unmarshaller the unmarshaller to use
 * @return the unmarshalled object
 * @throws JMSException if thrown by JMS methods
 * @throws IOException in case of I/O errors
 * @throws XmlMappingException in case of OXM mapping errors
 * @see Unmarshaller#unmarshal(Source)
 */
protected Object unmarshalFromBytesMessage(BytesMessage message, Unmarshaller unmarshaller)
        throws JMSException, IOException, XmlMappingException {

    int bodyLength = (int) message.getBodyLength();
    byte[] payload = new byte[bodyLength];
    message.readBytes(payload);
    StreamSource xml = new StreamSource(new ByteArrayInputStream(payload));
    return unmarshaller.unmarshal(xml);
}
/**
 * Copy the body of the given {@link BytesMessage} into a new byte array.
 * The array is sized from {@code getBodyLength()} and filled via {@code readBytes}.
 * @param message the message to convert
 * @return the resulting byte array
 * @throws JMSException if thrown by JMS methods
 */
protected byte[] extractByteArrayFromMessage(BytesMessage message) throws JMSException {
    int bodyLength = (int) message.getBodyLength();
    byte[] content = new byte[bodyLength];
    message.readBytes(content);
    return content;
}
/**
 * Copy the body of the given {@link BytesMessage} into a new byte array.
 * The array is sized from {@code getBodyLength()} and filled via {@code readBytes}.
 * @param message the message to convert
 * @return the resulting byte array
 * @throws JMSException if thrown by JMS methods
 */
protected byte[] extractByteArrayFromMessage(BytesMessage message) throws JMSException {
    int bodyLength = (int) message.getBodyLength();
    byte[] content = new byte[bodyLength];
    message.readBytes(content);
    return content;
}