下面列出了 javax.jms.MapMessage#setObject() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Builds a JMS message of the requested kind directly from the supplied text.
 *
 * <ul>
 *   <li>{@code TYPE_TEXT}: a text message whose body is {@code data}.</li>
 *   <li>{@code TYPE_BYTES}: a bytes message holding {@code data} encoded with {@code encoding}.</li>
 *   <li>{@code TYPE_MAP}: {@code data} is parsed as a JSON object and every top-level field
 *       is copied into a map message via {@code setObject}.</li>
 * </ul>
 *
 * @param data     the text to turn into a message body
 * @param type     the message kind; compared against {@code TYPE_TEXT}, {@code TYPE_BYTES}
 *                 and {@code TYPE_MAP}
 * @param encoding charset name used when encoding {@code data} for a bytes message
 * @return the newly created message
 * @throws IllegalArgumentException if {@code type} matches none of the known kinds
 */
protected Message createMessageFromInput(final String data, String type, String encoding)
        throws JMSException, UnsupportedEncodingException, IOException, JsonParseException, JsonMappingException {
    if (type.equals(TYPE_TEXT)) {
        return sess.createTextMessage(data);
    }

    if (type.equals(TYPE_BYTES)) {
        final BytesMessage bytesMessage = sess.createBytesMessage();
        bytesMessage.writeBytes(data.getBytes(encoding));
        return bytesMessage;
    }

    if (type.equals(TYPE_MAP)) {
        final MapMessage mapMessage = sess.createMapMessage();
        final ObjectMapper jsonMapper = new ObjectMapper();
        final Map<String, Object> parsed =
                jsonMapper.readValue(data, new TypeReference<Map<String, Object>>() { });
        for (final Map.Entry<String, Object> field : parsed.entrySet()) {
            mapMessage.setObject(field.getKey(), field.getValue());
        }
        return mapMessage;
    }

    throw new IllegalArgumentException(CMD_TYPE + ": " + type);
}
/**
 * Sends an UPDATE request for a single entity to the management address.
 *
 * The request is a map message whose string properties carry the entity type, the
 * operation ("UPDATE"), the index ("object-path") and the key (the entity name); the
 * map body carries the attributes to apply. If the session is transacted it is
 * committed after the send.
 *
 * @param name               value placed in the "key" property of the request
 * @param type               value placed in the "type" property of the request
 * @param attributes         entries copied into the message body via {@code setObject}
 * @param replyToDestination destination set as the JMSReplyTo of the request
 * @param session            session used to create the producer and the message
 * @throws JMSException if creating, populating, sending or committing fails
 */
private void updateEntityUsingAmqpManagement(final String name,
                                             final String type,
                                             Map<String, Object> attributes,
                                             Destination replyToDestination,
                                             final Session session)
        throws JMSException
{
    MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
    try
    {
        MapMessage updateMessage = session.createMapMessage();
        updateMessage.setStringProperty("type", type);
        updateMessage.setStringProperty("operation", "UPDATE");
        updateMessage.setStringProperty("index", "object-path");
        updateMessage.setStringProperty("key", name);
        updateMessage.setJMSReplyTo(replyToDestination);
        for (Map.Entry<String, Object> entry : attributes.entrySet())
        {
            updateMessage.setObject(entry.getKey(), entry.getValue());
        }
        producer.send(updateMessage);
        if (session.getTransacted())
        {
            session.commit();
        }
    }
    finally
    {
        // Release the producer even when populating, sending or committing fails.
        producer.close();
    }
}
/**
 * Sends the map produced by {@code createCompressibleMapMessage()} to {@code TEST_QUEUE}
 * as a single JMS map message, using a connection obtained from {@code getConnection(true)}.
 * The connection is closed before returning, whether or not the send succeeded.
 *
 * @return the map whose entries were copied into the sent message
 * @throws Exception if obtaining the connection or sending the message fails
 */
private Map<String, Object> sendCompressibleMapMessage() throws Exception
{
    final Map<String, Object> payload = createCompressibleMapMessage();
    final Connection sender = getConnection(true);
    try
    {
        final Session senderSession = sender.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageProducer queueProducer =
                senderSession.createProducer(senderSession.createQueue(TEST_QUEUE));

        final MapMessage outgoing = senderSession.createMapMessage();
        for (final Map.Entry<String, Object> item : payload.entrySet())
        {
            outgoing.setObject(item.getKey(), item.getValue());
        }
        queueProducer.send(outgoing);
    }
    finally
    {
        sender.close();
    }
    return payload;
}
/**
 * Builds a JMS message of the requested kind from the contents of a file.
 *
 * The file path is {@code data} with its first character removed (presumably a marker
 * prefix supplied by the caller; confirm against the call site). The file is read in
 * full before the message kind is examined.
 *
 * <ul>
 *   <li>{@code TYPE_TEXT}: a text message whose body is the file decoded with {@code encoding}.</li>
 *   <li>{@code TYPE_BYTES}: a bytes message holding the raw file bytes.</li>
 *   <li>{@code TYPE_MAP}: the file is parsed as a JSON object and every top-level field
 *       is copied into a map message via {@code setObject}.</li>
 * </ul>
 *
 * @param data     file reference; everything after the first character is used as the path
 * @param type     the message kind; compared against {@code TYPE_TEXT}, {@code TYPE_BYTES}
 *                 and {@code TYPE_MAP}
 * @param encoding charset name used when decoding the file for a text message
 * @return the newly created message
 * @throws IllegalArgumentException if {@code type} matches none of the known kinds
 */
protected Message createMessageFromFile(final String data, String type, String encoding)
        throws IOException, JMSException, UnsupportedEncodingException, JsonParseException, JsonMappingException {
    // Read the whole file up front; every message kind needs its bytes.
    final File source = new File(data.substring(1));
    final byte[] content = FileUtils.readFileToByteArray(source);

    if (type.equals(TYPE_TEXT)) {
        return sess.createTextMessage(new String(content, encoding));
    }

    if (type.equals(TYPE_BYTES)) {
        final BytesMessage bytesMessage = sess.createBytesMessage();
        bytesMessage.writeBytes(content);
        return bytesMessage;
    }

    if (type.equals(TYPE_MAP)) {
        final MapMessage mapMessage = sess.createMapMessage();
        final ObjectMapper jsonMapper = new ObjectMapper();
        final Map<String, Object> parsed =
                jsonMapper.readValue(content, new TypeReference<Map<String, Object>>() { });
        for (final Map.Entry<String, Object> field : parsed.entrySet()) {
            mapMessage.setObject(field.getKey(), field.getValue());
        }
        return mapMessage;
    }

    throw new IllegalArgumentException(CMD_TYPE + ": " + type);
}
/**
 * Sends one map message containing an entry for each typed setter of {@code MapMessage}
 * (boolean, byte, bytes, char, double, float, int, long, object, short, string) to a
 * queue destination created through {@code createDestination}.
 *
 * @throws Exception if creating the session, the producer or the message fails, or if
 *                   the send fails
 */
private void sendCompressedMapMessageUsingOpenWire() throws Exception {
    final Session openWireSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final ActiveMQDestination target =
            createDestination(openWireSession, ActiveMQDestination.QUEUE_TYPE);
    final ActiveMQMessageProducer openWireProducer =
            (ActiveMQMessageProducer) openWireSession.createProducer(target);

    // One entry per supported value kind.
    final MapMessage payload = openWireSession.createMapMessage();
    payload.setBoolean("boolean-type", true);
    payload.setByte("byte-type", (byte) 10);
    payload.setBytes("bytes-type", TEXT.getBytes());
    payload.setChar("char-type", 'A');
    payload.setDouble("double-type", 55.3D);
    payload.setFloat("float-type", 79.1F);
    payload.setInt("int-type", 37);
    payload.setLong("long-type", 56652L);
    payload.setObject("object-type", new String("VVVV"));
    payload.setShort("short-type", (short) 333);
    payload.setString("string-type", TEXT);

    openWireProducer.send(payload);
}
/**
 * Opens a connection from the given factory, sends one map message containing an entry
 * for each typed setter of {@code MapMessage} to {@code queue}, and closes the connection.
 *
 * @param factory connection factory used to open the connection
 * @param message not used by this method
 * @throws JMSException if creating the session, producer or message fails, or if the
 *                      send or the close fails
 */
private void sendTestMapMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
    ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(queue);
        MapMessage mapMessage = session.createMapMessage();
        mapMessage.setBoolean("boolean-type", true);
        mapMessage.setByte("byte-type", (byte) 10);
        mapMessage.setBytes("bytes-type", TEXT.getBytes());
        mapMessage.setChar("char-type", 'A');
        mapMessage.setDouble("double-type", 55.3D);
        mapMessage.setFloat("float-type", 79.1F);
        mapMessage.setInt("int-type", 37);
        mapMessage.setLong("long-type", 56652L);
        mapMessage.setObject("object-type", new String("VVVV"));
        mapMessage.setShort("short-type", (short) 333);
        mapMessage.setString("string-type", TEXT);
        producer.send(mapMessage);
    } finally {
        // Close the connection on the failure path too, so a failed send does not leak it.
        connection.close();
    }
}
/**
 * Runs the superclass preparation, then fills the message with one map entry for each
 * typed setter of {@code MapMessage}.
 *
 * @param m the message to prepare; it is cast to {@code MapMessage}
 * @throws JMSException if the superclass preparation or any setter fails
 */
@Override
protected void prepareMessage(final Message m) throws JMSException {
    super.prepareMessage(m);

    final MapMessage mapMessage = (MapMessage) m;
    mapMessage.setBoolean("boolean", true);
    mapMessage.setByte("byte", (byte) 3);
    mapMessage.setBytes("bytes", new byte[] {3, 4, 5});
    mapMessage.setChar("char", (char) 6);
    mapMessage.setDouble("double", 7.0d);
    mapMessage.setFloat("float", 8.0f);
    mapMessage.setInt("int", 9);
    mapMessage.setLong("long", 10L);
    mapMessage.setObject("object", new String("this is an object"));
    mapMessage.setShort("short", (short) 11);
    mapMessage.setString("string", "this is a string");
}
/**
 * Creates a map message, copies the given body entries into it and applies the given
 * message properties through {@code setMessageProperties}.
 *
 * @param body       entries to copy into the message body; may be {@code null}, in which
 *                   case the body is left empty
 * @param properties passed unchanged to {@code setMessageProperties}
 * @return the populated map message
 * @throws EmbeddedJMSResourceException if a body entry cannot be set; the underlying
 *                                      {@code JMSException} is attached as the cause
 */
public MapMessage createMessage(Map<String, Object> body, Map<String, Object> properties) {
    MapMessage message = this.createMapMessage();
    if (body != null) {
        for (Map.Entry<String, Object> entry : body.entrySet()) {
            try {
                message.setObject(entry.getKey(), entry.getValue());
            } catch (JMSException jmsEx) {
                // Pass the value itself to %s: it renders null as "null", whereas calling
                // toString() on a null value would throw and hide the real JMS failure.
                throw new EmbeddedJMSResourceException(String.format("Failed to set body entry {%s = %s} on MapMessage", entry.getKey(), entry.getValue()), jmsEx);
            }
        }
    }
    setMessageProperties(message, properties);
    return message;
}
/**
 * Convert the given Map into a JMS MapMessage, entry by entry.
 * <p>Every key must be a {@code String}; each value is handed to
 * {@code MapMessage.setObject} as-is.
 * @param map the Map whose entries become the message body
 * @param session the JMS session used to create the message
 * @return the populated MapMessage
 * @throws JMSException if thrown by JMS methods
 * @throws MessageConversionException if a key is not a String
 * @see javax.jms.Session#createMapMessage
 */
protected MapMessage createMessageForMap(Map<?, ?> map, Session session) throws JMSException {
    MapMessage mapMessage = session.createMapMessage();
    for (Map.Entry<?, ?> mapEntry : map.entrySet()) {
        if (mapEntry.getKey() instanceof String) {
            mapMessage.setObject((String) mapEntry.getKey(), mapEntry.getValue());
        }
        else {
            throw new MessageConversionException("Cannot convert non-String key of type [" +
                    ObjectUtils.nullSafeClassName(mapEntry.getKey()) + "] to JMS MapMessage entry");
        }
    }
    return mapMessage;
}
/**
 * Builds a map message with one entry per typed setter and delegates to
 * {@code testMapMessage(String, MessageCreator, String)} together with the JSON text
 * that message is expected to produce.
 */
@Test
public void testMapMessage() throws Exception {
    final String destinationName = "testObjectMessage";

    final MessageCreator messageCreator = jmsSession -> {
        final MapMessage mapMessage = jmsSession.createMapMessage();
        mapMessage.setBoolean("boolean", true);
        mapMessage.setByte("byte", (byte) 1);
        mapMessage.setBytes("bytes", new byte[] {2, 3, 4});
        mapMessage.setShort("short", (short) 32);
        mapMessage.setInt("int", 64);
        mapMessage.setLong("long", 128L);
        mapMessage.setFloat("float", 1.25F);
        mapMessage.setDouble("double", 100.867);
        mapMessage.setChar("char", 'c');
        mapMessage.setString("string", "someString");
        mapMessage.setObject("object", "stringAsObject");
        return mapMessage;
    };

    // Expected JSON, assembled one field at a time in the order the entries were set.
    final String expectedJson = new StringBuilder()
            .append("{")
            .append("\"boolean\":true,")
            .append("\"byte\":1,")
            .append("\"bytes\":[2, 3, 4],")
            .append("\"short\":32,")
            .append("\"int\":64,")
            .append("\"long\":128,")
            .append("\"float\":1.25,")
            .append("\"double\":100.867,")
            .append("\"char\":\"c\",")
            .append("\"string\":\"someString\",")
            .append("\"object\":\"stringAsObject\"")
            .append("}")
            .toString();

    testMapMessage(destinationName, messageCreator, expectedJson);
}
/**
 * Sends a map message with the given body to the given destination.
 *
 * Each body entry is copied into the message via {@code setObject}. A {@code null} body
 * results in a map message with no entries being sent, as the JMS 2.0
 * {@code JMSProducer.send(Destination, Map)} contract requires.
 *
 * @param destination the destination passed to {@code doSend}
 * @param body        entries for the message body; may be {@code null}
 * @return this producer
 */
@Override
public JMSProducer send(Destination destination, Map<String, Object> body) {
    try {
        MapMessage message = session.createMapMessage();
        // A null body means "send an empty MapMessage", not a NullPointerException.
        if (body != null) {
            for (Map.Entry<String, Object> entry : body.entrySet()) {
                message.setObject(entry.getKey(), entry.getValue());
            }
        }
        doSend(destination, message);
    } catch (JMSException jmse) {
        throw JMSExceptionSupport.createRuntimeException(jmse);
    }
    return this;
}
/**
 * Fills the given map message with a fixed set of entries: one per type-specific setter,
 * then the same kinds of values again through {@code setObject}, and finally a null and
 * an empty string.
 *
 * @param message the map message to populate
 * @throws JMSException if any setter fails
 */
private void setMapValues(MapMessage message) throws JMSException
{
    // Values written through the type-specific setters.
    message.setBoolean("bool", true);
    message.setByte("byte", Byte.MAX_VALUE);
    message.setBytes("bytes", BYTES);
    message.setChar("char", 'c');
    message.setDouble("double", Double.MAX_VALUE);
    message.setFloat("float", Float.MAX_VALUE);
    message.setFloat("smallfloat", SMALL_FLOAT);
    message.setInt("int", Integer.MAX_VALUE);
    message.setLong("long", Long.MAX_VALUE);
    message.setShort("short", Short.MAX_VALUE);
    message.setString("string-ascii", MESSAGE_ASCII);
    message.setString("string-utf8", MESSAGE_NON_ASCII_UTF8);

    // The same kinds of values, boxed and written through setObject in this order.
    final Object[][] objectEntries = {
        {"object-bool", true},
        {"object-byte", Byte.MAX_VALUE},
        {"object-bytes", BYTES},
        {"object-char", 'c'},
        {"object-double", Double.MAX_VALUE},
        {"object-float", Float.MAX_VALUE},
        {"object-int", Integer.MAX_VALUE},
        {"object-long", Long.MAX_VALUE},
        {"object-short", Short.MAX_VALUE},
    };
    for (final Object[] objectEntry : objectEntries)
    {
        message.setObject((String) objectEntry[0], objectEntry[1]);
    }

    // A string entry whose value is null.
    message.setString("nullString", null);
    // An empty string entry; the original author flagged it as highlighting a protocol problem.
    message.setString("emptyString", "");
}
/**
 * Sends an UPDATE management request for type "org.apache.qpid.VirtualHost" with an empty
 * key, carrying the given attributes as the map body, and commits the transacted session.
 *
 * The management queue is addressed as "$management" when the protocol is AMQP 1.0 and
 * as "ADDR:$management" otherwise. The connection is closed before returning.
 *
 * @param attrs entries copied into the request body via {@code setObject}
 * @throws Exception if obtaining the connection, sending or committing fails
 */
private void enableTransactionTimeout(final Map<String, Object> attrs) throws Exception
{
    final Connection managementConnection = getConnection();
    try
    {
        final Session txSession = managementConnection.createSession(true, Session.SESSION_TRANSACTED);
        final Queue managementQueue = getProtocol() == Protocol.AMQP_1_0
                ? txSession.createQueue("$management")
                : txSession.createQueue("ADDR:$management");
        final MessageProducer managementProducer = txSession.createProducer(managementQueue);

        final MapMessage request = txSession.createMapMessage();
        request.setStringProperty("type", "org.apache.qpid.VirtualHost");
        request.setStringProperty("operation", "UPDATE");
        request.setStringProperty("index", "object-path");
        request.setStringProperty("key", "");
        for (final Map.Entry<String, Object> attribute : attrs.entrySet())
        {
            request.setObject(attribute.getKey(), attribute.getValue());
        }

        managementProducer.send(request);
        txSession.commit();
    }
    finally
    {
        managementConnection.close();
    }
}
/**
 * Sends the given map as a JMS map message; does nothing when the map is {@code null}.
 *
 * Each entry is copied via {@code setObject}, the headers are transferred with
 * {@code copyHeadersToProperties}, and the message is sent with the delivery mode,
 * priority and time-to-live obtained from this instance.
 *
 * @param map        entries for the message body; {@code null} means nothing is sent
 * @param properties passed to {@code copyHeadersToProperties} and {@code getTimeToLive}
 * @throws JMSException if creating, populating or sending the message fails
 */
@Override
void sendMapMessage(Map<String, ?> map, Map properties) throws JMSException
{
    if (map != null)
    {
        final MapMessage mapMessage = session.createMapMessage();
        for (final Map.Entry<String, ?> field : map.entrySet())
        {
            mapMessage.setObject(field.getKey(), field.getValue());
        }
        copyHeadersToProperties(properties, mapMessage);
        sender.send(mapMessage, getDeliveryMode(), messagePriority, getTimeToLive(properties));
    }
}
/**
 * Build a JMS MapMessage whose entries mirror those of the given Map.
 * <p>Keys that are not {@code String} instances are rejected; values are passed to
 * {@code MapMessage.setObject} unchanged.
 * @param map the Map to convert
 * @param session current JMS session
 * @return the resulting message
 * @throws JMSException if thrown by JMS methods
 * @throws MessageConversionException if a key is not a String
 * @see javax.jms.Session#createMapMessage
 */
protected MapMessage createMessageForMap(Map<?, ?> map, Session session) throws JMSException {
    MapMessage result = session.createMapMessage();
    for (Map.Entry<?, ?> candidate : map.entrySet()) {
        Object rawKey = candidate.getKey();
        if (rawKey instanceof String) {
            result.setObject((String) rawKey, candidate.getValue());
            continue;
        }
        throw new MessageConversionException("Cannot convert non-String key of type [" +
                ObjectUtils.nullSafeClassName(rawKey) + "] to JMS MapMessage entry");
    }
    return result;
}
/**
 * Create a JMS MapMessage for the given Map.
 *
 * Every key must be a {@code String}; each value is passed to
 * {@code MapMessage.setObject} unchanged.
 *
 * @param map the Map to convert
 * @return the resulting message
 * @throws JMSException if thrown by JMS methods
 * @throws RuntimeException if a key is {@code null} or not a String
 */
private Message createMessageForMap(Map<?,?> map) throws JMSException
{
    MapMessage message = getSession().createMapMessage();
    for (Map.Entry<?,?> entry: map.entrySet()) {
        Object key = entry.getKey();
        if (!(key instanceof String)) {
            // A null key also lands here; describe it without dereferencing it so the
            // caller gets the intended error rather than a NullPointerException.
            String keyType = (key == null) ? "null" : key.getClass().toString();
            throw new RuntimeException("Cannot convert non-String key of type ["
                + keyType + "] to JMS MapMessage entry");
        }
        message.setObject((String) key, entry.getValue());
    }
    return message;
}
/**
 * Builds ten groups of four messages (a bytes message whose body comes from a fake large
 * input stream, a map message with one entry per typed setter, a text message and an
 * object message) on a transacted session and hands them to {@code internalTestResend}.
 */
@Test
public void testResendWithLargeMessage() throws Exception {
    conn = cf.createConnection();
    conn.start();

    final Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
    final ArrayList<Message> pending = new ArrayList<>();

    for (int round = 0; round < 10; round++) {
        // Bytes message streamed from a source twice the minimum large-message size.
        final BytesMessage largeBytes = txSession.createBytesMessage();
        largeBytes.setObjectProperty(
                ActiveMQJMSConstants.JMS_ACTIVEMQ_INPUT_STREAM,
                ActiveMQTestBase.createFakeLargeStream(2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE));
        pending.add(largeBytes);

        final MapMessage mapMessage = txSession.createMapMessage();
        mapMessage.setBoolean("boolean", true);
        mapMessage.setByte("byte", (byte) 3);
        mapMessage.setBytes("bytes", new byte[] {3, 4, 5});
        mapMessage.setChar("char", (char) 6);
        mapMessage.setDouble("double", 7.0d);
        mapMessage.setFloat("float", 8.0f);
        mapMessage.setInt("int", 9);
        mapMessage.setLong("long", 10L);
        mapMessage.setObject("object", new String("this is an object"));
        mapMessage.setShort("short", (short) 11);
        mapMessage.setString("string", "this is a string");
        pending.add(mapMessage);

        pending.add(txSession.createTextMessage("hello" + round));
        pending.add(txSession.createObjectMessage(new SomeSerializable("hello" + round)));
    }

    internalTestResend(pending, txSession);
}
/**
 * Sends a map message with the given body to the given destination.
 *
 * Each entry is written with the type-specific setter that matches the runtime type of
 * its value (String, Long, Double, Integer, Character, Short, Boolean, Float, Byte,
 * byte[]); any other value is passed to {@code setObject}. A {@code null} body sends a
 * map message with no entries.
 *
 * @param destination the destination to send to
 * @param body        entries for the message body; may be {@code null}
 * @return this producer
 * @throws MessageFormatRuntimeException if an entry cannot be set; it carries the
 *                                       message, error code and cause of the underlying
 *                                       {@code JMSException}
 */
@Override
public JMSProducer send(final Destination destination, final Map<String, Object> body) {
    final MapMessage message = wrap(context.createMapMessage());
    if (body != null) {
        try {
            for (final Map.Entry<String, Object> entry : body.entrySet()) {
                final String name = entry.getKey();
                final Object v = entry.getValue();
                if (v instanceof String) {
                    message.setString(name, (String) v);
                } else if (v instanceof Long) {
                    message.setLong(name, (Long) v);
                } else if (v instanceof Double) {
                    message.setDouble(name, (Double) v);
                } else if (v instanceof Integer) {
                    message.setInt(name, (Integer) v);
                } else if (v instanceof Character) {
                    message.setChar(name, (Character) v);
                } else if (v instanceof Short) {
                    message.setShort(name, (Short) v);
                } else if (v instanceof Boolean) {
                    message.setBoolean(name, (Boolean) v);
                } else if (v instanceof Float) {
                    message.setFloat(name, (Float) v);
                } else if (v instanceof Byte) {
                    message.setByte(name, (Byte) v);
                } else if (v instanceof byte[]) {
                    byte[] array = (byte[]) v;
                    message.setBytes(name, array, 0, array.length);
                } else {
                    message.setObject(name, v);
                }
            }
        } catch (final JMSException e) {
            // Keep the error code and the original exception as the cause so the
            // provider's stack trace is not lost.
            throw new MessageFormatRuntimeException(e.getMessage(), e.getErrorCode(), e);
        }
    }
    send(destination, message);
    return this;
}
/**
 * Sends a map message with the given body to the given destination.
 *
 * Each entry is written with the type-specific setter that matches the runtime type of
 * its value (String, Long, Double, Integer, Character, Short, Boolean, Float, Byte,
 * byte[]); any other value is passed to {@code setObject}. A {@code null} body sends a
 * map message with no entries.
 *
 * @param destination the destination to send to
 * @param body        entries for the message body; may be {@code null}
 * @return this producer
 * @throws MessageFormatRuntimeException if an entry cannot be set; it carries the
 *                                       message, error code and cause of the underlying
 *                                       {@code JMSException}
 */
@Override
public JMSProducer send(Destination destination, Map<String, Object> body) {
    MapMessage message = context.createMapMessage();
    if (body != null) {
        try {
            for (Entry<String, Object> entry : body.entrySet()) {
                final String name = entry.getKey();
                final Object v = entry.getValue();
                if (v instanceof String) {
                    message.setString(name, (String) v);
                } else if (v instanceof Long) {
                    message.setLong(name, (Long) v);
                } else if (v instanceof Double) {
                    message.setDouble(name, (Double) v);
                } else if (v instanceof Integer) {
                    message.setInt(name, (Integer) v);
                } else if (v instanceof Character) {
                    message.setChar(name, (Character) v);
                } else if (v instanceof Short) {
                    message.setShort(name, (Short) v);
                } else if (v instanceof Boolean) {
                    message.setBoolean(name, (Boolean) v);
                } else if (v instanceof Float) {
                    message.setFloat(name, (Float) v);
                } else if (v instanceof Byte) {
                    message.setByte(name, (Byte) v);
                } else if (v instanceof byte[]) {
                    byte[] array = (byte[]) v;
                    message.setBytes(name, array, 0, array.length);
                } else {
                    message.setObject(name, v);
                }
            }
        } catch (JMSException e) {
            // Keep the error code and the original exception as the cause so the
            // provider's stack trace is not lost.
            throw new MessageFormatRuntimeException(e.getMessage(), e.getErrorCode(), e);
        }
    }
    send(destination, message);
    return this;
}
/**
 * Sends a CREATE request for an entity to the management address and returns the result
 * of {@code receiveManagementResponse}, which is called with status code 201.
 *
 * For AMQP 1.0 a temporary queue serves as both the reply-to and the reply consumer
 * destination; for other protocols the two pre-defined AMQP 0-x reply destinations are
 * used. The request is a map message with "type" and "operation" ("CREATE") string
 * properties, "name" and "object-path" body entries set to {@code name}, and the given
 * attributes copied into the body. If the session is transacted it is committed after
 * the send.
 *
 * @param name       entity name, written to the "name" and "object-path" body entries
 * @param type       value placed in the "type" property of the request
 * @param attributes entries copied into the message body via {@code setObject}
 * @param session    session used to create destinations, consumer, producer and message
 * @return whatever {@code receiveManagementResponse} returns for the reply
 * @throws JMSException if creating, populating, sending or committing fails
 */
public Map<String, Object> createEntityAndAssertResponse(final String name,
                                                         final String type,
                                                         final Map<String, Object> attributes,
                                                         final Session session)
        throws JMSException
{
    Destination replyToDestination;
    Destination replyConsumerDestination;
    if (_protocol == Protocol.AMQP_1_0)
    {
        replyToDestination = session.createTemporaryQueue();
        replyConsumerDestination = replyToDestination;
    }
    else
    {
        replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
        replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);
    }
    // The consumer is created before the request is sent and handed to
    // receiveManagementResponse afterwards.
    // NOTE(review): this method never closes the consumer itself; presumably
    // receiveManagementResponse owns it - confirm.
    MessageConsumer consumer = session.createConsumer(replyConsumerDestination);
    MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
    try
    {
        MapMessage createMessage = session.createMapMessage();
        createMessage.setStringProperty("type", type);
        createMessage.setStringProperty("operation", "CREATE");
        createMessage.setString("name", name);
        createMessage.setString("object-path", name);
        createMessage.setJMSReplyTo(replyToDestination);
        for (Map.Entry<String, Object> entry : attributes.entrySet())
        {
            createMessage.setObject(entry.getKey(), entry.getValue());
        }
        producer.send(createMessage);
        if (session.getTransacted())
        {
            session.commit();
        }
    }
    finally
    {
        // Release the producer even when populating, sending or committing fails.
        producer.close();
    }
    return receiveManagementResponse(consumer, replyToDestination, 201);
}