下面列出了javax.jms.MapMessage#setLong ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public void publishMessage(String msg) throws NamingException, JMSException {
String topicName = "throttleData";
InitialContext initialContext = ClientHelper.getInitialContextBuilder("admin", "admin",
"localhost", "5672")
.withTopic(topicName)
.build();
ConnectionFactory connectionFactory
= (ConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = (Topic) initialContext.lookup(topicName);
MessageProducer producer = session.createProducer(topic);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("throttleKey", msg);
Date date = new Date();
long time = date.getTime() + 1000;
mapMessage.setLong("expiryTimeStamp", time);
mapMessage.setBoolean("isThrottled", true);
producer.send(mapMessage);
connection.close();
}
private void sendCompressedMapMessageUsingOpenWire() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setBoolean("boolean-type", true);
mapMessage.setByte("byte-type", (byte) 10);
mapMessage.setBytes("bytes-type", TEXT.getBytes());
mapMessage.setChar("char-type", 'A');
mapMessage.setDouble("double-type", 55.3D);
mapMessage.setFloat("float-type", 79.1F);
mapMessage.setInt("int-type", 37);
mapMessage.setLong("long-type", 56652L);
mapMessage.setObject("object-type", new String("VVVV"));
mapMessage.setShort("short-type", (short) 333);
mapMessage.setString("string-type", TEXT);
producer.send(mapMessage);
}
private void sendMapMessageUsingOpenWire() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setBoolean("aboolean", true);
mapMessage.setByte("abyte", (byte) 4);
mapMessage.setBytes("abytes", new byte[]{4, 5});
mapMessage.setChar("achar", 'a');
mapMessage.setDouble("adouble", 4.4);
mapMessage.setFloat("afloat", 4.5f);
mapMessage.setInt("aint", 40);
mapMessage.setLong("along", 80L);
mapMessage.setShort("ashort", (short) 65);
mapMessage.setString("astring", "hello");
producer.send(mapMessage);
}
private void sendTestMapMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setBoolean("boolean-type", true);
mapMessage.setByte("byte-type", (byte) 10);
mapMessage.setBytes("bytes-type", TEXT.getBytes());
mapMessage.setChar("char-type", 'A');
mapMessage.setDouble("double-type", 55.3D);
mapMessage.setFloat("float-type", 79.1F);
mapMessage.setInt("int-type", 37);
mapMessage.setLong("long-type", 56652L);
mapMessage.setObject("object-type", new String("VVVV"));
mapMessage.setShort("short-type", (short) 333);
mapMessage.setString("string-type", TEXT);
producer.send(mapMessage);
connection.close();
}
@Override
protected void prepareMessage(final Message m) throws JMSException {
super.prepareMessage(m);
MapMessage mm = (MapMessage) m;
mm.setBoolean("boolean", true);
mm.setByte("byte", (byte) 3);
mm.setBytes("bytes", new byte[]{(byte) 3, (byte) 4, (byte) 5});
mm.setChar("char", (char) 6);
mm.setDouble("double", 7.0);
mm.setFloat("float", 8.0f);
mm.setInt("int", 9);
mm.setLong("long", 10L);
mm.setObject("object", new String("this is an object"));
mm.setShort("short", (short) 11);
mm.setString("string", "this is a string");
}
@Test
public void testMapMessage() throws Exception {
MessageCreator messageCreator = session -> {
MapMessage message = session.createMapMessage();
message.setBoolean("boolean", true);
message.setByte("byte", Integer.valueOf(1).byteValue());
message.setBytes("bytes", new byte[] {2, 3, 4});
message.setShort("short", (short)32);
message.setInt("int", 64);
message.setLong("long", 128L);
message.setFloat("float", 1.25F);
message.setDouble("double", 100.867);
message.setChar("char", 'c');
message.setString("string", "someString");
message.setObject("object", "stringAsObject");
return message;
};
send(messageCreator);
}
public void marshal(MapMessage mapMessage) throws JMSException {
super.marshal(mapMessage);
mapMessage.setDouble(KEY_CPU_LOAD, this.load);
mapMessage.setInt(KEY_CPU_CORES, this.cores);
mapMessage.setInt(KEY_CPU_PERCENTS, this.cpuPercents);
mapMessage.setLong(KEY_MEM_USED, this.memUsed);
mapMessage.setLong(KEY_MEM_TOTAL, this.memTotal);
mapMessage.setInt(KEY_MEM_PERCENTS, this.memPercents);
mapMessage.setLong(KEY_DISK_USED, this.diskUsed);
mapMessage.setLong(KEY_DISK_TOTAL, this.diskTotal);
mapMessage.setInt(KEY_DISK_PERCENTS, this.diskPercents);
mapMessage.setInt(KEY_SCHEDULED_JOBS, this.scheduledJobs);
mapMessage.setInt(KEY_RUNNING_JOBS, this.runningJobs);
mapMessage.setString(KEY_HOST, this.host);
mapMessage.setString(KEY_HOST_ID, this.hostId);
mapMessage.setString(KEY_STATUS, this.status);
}
private void setMapValues(MapMessage message) throws JMSException
{
message.setBoolean("bool", true);
message.setByte("byte",Byte.MAX_VALUE);
message.setBytes("bytes", BYTES);
message.setChar("char",'c');
message.setDouble("double", Double.MAX_VALUE);
message.setFloat("float", Float.MAX_VALUE);
message.setFloat("smallfloat", SMALL_FLOAT);
message.setInt("int", Integer.MAX_VALUE);
message.setLong("long", Long.MAX_VALUE);
message.setShort("short", Short.MAX_VALUE);
message.setString("string-ascii", MESSAGE_ASCII);
message.setString("string-utf8", MESSAGE_NON_ASCII_UTF8);
// Test Setting Object Values
message.setObject("object-bool", true);
message.setObject("object-byte", Byte.MAX_VALUE);
message.setObject("object-bytes", BYTES);
message.setObject("object-char", 'c');
message.setObject("object-double", Double.MAX_VALUE);
message.setObject("object-float", Float.MAX_VALUE);
message.setObject("object-int", Integer.MAX_VALUE);
message.setObject("object-long", Long.MAX_VALUE);
message.setObject("object-short", Short.MAX_VALUE);
// Set a null String value
message.setString("nullString", null);
// Highlight protocol problem
message.setString("emptyString", "");
}
@Test
public void testResendWithLargeMessage() throws Exception {
conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
ArrayList<Message> msgs = new ArrayList<>();
for (int i = 0; i < 10; i++) {
BytesMessage bm = sess.createBytesMessage();
bm.setObjectProperty(ActiveMQJMSConstants.JMS_ACTIVEMQ_INPUT_STREAM, ActiveMQTestBase.createFakeLargeStream(2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE));
msgs.add(bm);
MapMessage mm = sess.createMapMessage();
mm.setBoolean("boolean", true);
mm.setByte("byte", (byte) 3);
mm.setBytes("bytes", new byte[]{(byte) 3, (byte) 4, (byte) 5});
mm.setChar("char", (char) 6);
mm.setDouble("double", 7.0);
mm.setFloat("float", 8.0f);
mm.setInt("int", 9);
mm.setLong("long", 10L);
mm.setObject("object", new String("this is an object"));
mm.setShort("short", (short) 11);
mm.setString("string", "this is a string");
msgs.add(mm);
msgs.add(sess.createTextMessage("hello" + i));
msgs.add(sess.createObjectMessage(new SomeSerializable("hello" + i)));
}
internalTestResend(msgs, sess);
}
@Test
public void testResendWithMapMessagesOnly() throws Exception {
conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
ArrayList<Message> msgs = new ArrayList<>();
for (int i = 0; i < 1; i++) {
MapMessage mm = sess.createMapMessage();
mm.setBoolean("boolean", true);
mm.setByte("byte", (byte) 3);
mm.setBytes("bytes", new byte[]{(byte) 3, (byte) 4, (byte) 5});
mm.setChar("char", (char) 6);
mm.setDouble("double", 7.0);
mm.setFloat("float", 8.0f);
mm.setInt("int", 9);
mm.setLong("long", 10L);
mm.setObject("object", new String("this is an object"));
mm.setShort("short", (short) 11);
mm.setString("string", "this is a string");
msgs.add(mm);
MapMessage emptyMap = sess.createMapMessage();
msgs.add(emptyMap);
}
internalTestResend(msgs, sess);
}
private void sendMapMessageUsingCoreJms(String queueName) throws Exception {
Connection jmsConn = null;
try {
jmsConn = coreCf.createConnection();
Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setBoolean("aboolean", true);
mapMessage.setByte("abyte", (byte) 4);
mapMessage.setBytes("abytes", new byte[]{4, 5});
mapMessage.setChar("achar", 'a');
mapMessage.setDouble("adouble", 4.4);
mapMessage.setFloat("afloat", 4.5f);
mapMessage.setInt("aint", 40);
mapMessage.setLong("along", 80L);
mapMessage.setShort("ashort", (short) 65);
mapMessage.setString("astring", "hello");
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
producer.send(mapMessage);
} finally {
if (jmsConn != null) {
jmsConn.close();
}
}
}
@Override
public JMSProducer send(Destination destination, Map<String, Object> body) {
MapMessage message = context.createMapMessage();
if (body != null) {
try {
for (Entry<String, Object> entry : body.entrySet()) {
final String name = entry.getKey();
final Object v = entry.getValue();
if (v instanceof String) {
message.setString(name, (String) v);
} else if (v instanceof Long) {
message.setLong(name, (Long) v);
} else if (v instanceof Double) {
message.setDouble(name, (Double) v);
} else if (v instanceof Integer) {
message.setInt(name, (Integer) v);
} else if (v instanceof Character) {
message.setChar(name, (Character) v);
} else if (v instanceof Short) {
message.setShort(name, (Short) v);
} else if (v instanceof Boolean) {
message.setBoolean(name, (Boolean) v);
} else if (v instanceof Float) {
message.setFloat(name, (Float) v);
} else if (v instanceof Byte) {
message.setByte(name, (Byte) v);
} else if (v instanceof byte[]) {
byte[] array = (byte[]) v;
message.setBytes(name, array, 0, array.length);
} else {
message.setObject(name, v);
}
}
} catch (JMSException e) {
throw new MessageFormatRuntimeException(e.getMessage());
}
}
send(destination, message);
return this;
}
@Test
public void testMapMessage() throws Exception {
final String destinationName = "testObjectMessage";
MessageCreator messageCreator = session -> {
MapMessage message = session.createMapMessage();
message.setBoolean("boolean", true);
message.setByte("byte", Integer.valueOf(1).byteValue());
message.setBytes("bytes", new byte[] {2, 3, 4});
message.setShort("short", (short)32);
message.setInt("int", 64);
message.setLong("long", 128L);
message.setFloat("float", 1.25F);
message.setDouble("double", 100.867);
message.setChar("char", 'c');
message.setString("string", "someString");
message.setObject("object", "stringAsObject");
return message;
};
String expectedJson = "{" +
"\"boolean\":true," +
"\"byte\":1," +
"\"bytes\":[2, 3, 4]," +
"\"short\":32," +
"\"int\":64," +
"\"long\":128," +
"\"float\":1.25," +
"\"double\":100.867," +
"\"char\":\"c\"," +
"\"string\":\"someString\"," +
"\"object\":\"stringAsObject\"" +
"}";
testMapMessage(destinationName, messageCreator, expectedJson);
}
@Override
public JMSProducer send(final Destination destination, final Map<String, Object> body) {
final MapMessage message = wrap(context.createMapMessage());
if (body != null) {
try {
for (final Map.Entry<String, Object> entry : body.entrySet()) {
final String name = entry.getKey();
final Object v = entry.getValue();
if (v instanceof String) {
message.setString(name, (String) v);
} else if (v instanceof Long) {
message.setLong(name, (Long) v);
} else if (v instanceof Double) {
message.setDouble(name, (Double) v);
} else if (v instanceof Integer) {
message.setInt(name, (Integer) v);
} else if (v instanceof Character) {
message.setChar(name, (Character) v);
} else if (v instanceof Short) {
message.setShort(name, (Short) v);
} else if (v instanceof Boolean) {
message.setBoolean(name, (Boolean) v);
} else if (v instanceof Float) {
message.setFloat(name, (Float) v);
} else if (v instanceof Byte) {
message.setByte(name, (Byte) v);
} else if (v instanceof byte[]) {
byte[] array = (byte[]) v;
message.setBytes(name, array, 0, array.length);
} else {
message.setObject(name, v);
}
}
} catch (final JMSException e) {
throw new MessageFormatRuntimeException(e.getMessage());
}
}
send(destination, message);
return this;
}
@Test
public void testCreateQueueOnBrokerManagement() throws Exception
{
assumeThat(isSupportedClient(), is(true));
Connection connection = getBrokerManagementConnection();
try
{
setUp(connection);
MapMessage message = _session.createMapMessage();
message.setStringProperty("type", "org.apache.qpid.Queue");
message.setStringProperty("operation", "CREATE");
message.setString("name", getTestName());
message.setLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 100L);
String path = getVirtualHostName() + "/" + getVirtualHostName() + "/" + getTestName();
message.setString("object-path", path);
message.setJMSReplyTo(_replyAddress);
_producer.send(message);
Message responseMessage = _consumer.receive(getReceiveTimeout());
assertResponseCode(responseMessage, 201);
checkResponseIsMapType(responseMessage);
assertEquals("The created queue was not a standard queue",
"org.apache.qpid.StandardQueue",
getValueFromMapResponse(responseMessage, "type"));
assertEquals("The created queue was not a standard queue",
"standard",
getValueFromMapResponse(responseMessage, "qpid-type"));
assertEquals("the created queue did not have the correct alerting threshold",
100L,
getValueFromMapResponse(responseMessage, ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));
Object identity = getValueFromMapResponse(responseMessage, "identity");
message = _session.createMapMessage();
message.setStringProperty("type", "org.apache.qpid.Queue");
message.setStringProperty("operation", "UPDATE");
message.setObjectProperty("identity", identity);
message.setLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 250L);
message.setJMSReplyTo(_replyAddress);
_producer.send(message);
responseMessage = _consumer.receive(getReceiveTimeout());
assertResponseCode(responseMessage, 200);
checkResponseIsMapType(responseMessage);
assertEquals("the created queue did not have the correct alerting threshold",
250L,
getValueFromMapResponse(responseMessage, ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));
message = _session.createMapMessage();
message.setStringProperty("type", "org.apache.qpid.Queue");
message.setStringProperty("operation", "DELETE");
message.setObjectProperty("index", "object-path");
message.setObjectProperty("key", path);
message.setJMSReplyTo(_replyAddress);
_producer.send(message);
responseMessage = _consumer.receive(getReceiveTimeout());
assertResponseCode(responseMessage, 204);
message = _session.createMapMessage();
message.setStringProperty("type", "org.apache.qpid.Queue");
message.setStringProperty("operation", "READ");
message.setObjectProperty("identity", identity);
message.setJMSReplyTo(_replyAddress);
_producer.send(message);
responseMessage = _consumer.receive(getReceiveTimeout());
assertResponseCode(responseMessage, 404);
}
finally
{
connection.close();
}
}
/**
* @param type
* @throws JMSException
*/
private Object createBodySendAndReceive(JmsMessageType type) throws JMSException {
Object res = null;
Message msg = null;
switch (type) {
case BYTE:
BytesMessage mByte = queueProducerSession.createBytesMessage();
final int size = 20;
byte[] resByte = new byte[size];
for (int i = 0; i < size; i++) {
resByte[i] = (byte) i;
mByte.writeByte((byte) i);
}
msg = mByte;
res = resByte;
break;
case TEXT:
res = "JMS2";
msg = queueProducerSession.createTextMessage("JMS2");
break;
case STREAM:
msg = queueProducerSession.createStreamMessage();
break;
case OBJECT:
res = new Double(37.6);
msg = queueProducerSession.createObjectMessage(new Double(37.6));
break;
case MAP:
MapMessage msg1 = queueProducerSession.createMapMessage();
msg1.setInt("int", 13);
msg1.setLong("long", 37L);
msg1.setString("string", "crocodile");
msg = msg1;
Map<String, Object> map = new HashMap<>();
map.put("int", Integer.valueOf(13));
map.put("long", Long.valueOf(37L));
map.put("string", "crocodile");
res = map;
break;
default:
Assert.fail("no default...");
}
Assert.assertNotNull(msg);
msg.setStringProperty("type", type.toString());
queueProducer.send(msg);
return res;
}
@Test(timeout = 20000)
public void testSendBasicMapMessage() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
testPeer.expectBegin();
testPeer.expectSenderAttach();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
MessageProducer producer = session.createProducer(queue);
String myBoolKey = "myBool";
boolean myBool = true;
String myByteKey = "myByte";
byte myByte = 4;
String myBytesKey = "myBytes";
byte[] myBytes = myBytesKey.getBytes();
String myCharKey = "myChar";
char myChar = 'd';
String myDoubleKey = "myDouble";
double myDouble = 1234567890123456789.1234;
String myFloatKey = "myFloat";
float myFloat = 1.1F;
String myIntKey = "myInt";
int myInt = Integer.MAX_VALUE;
String myLongKey = "myLong";
long myLong = Long.MAX_VALUE;
String myShortKey = "myShort";
short myShort = 25;
String myStringKey = "myString";
String myString = myStringKey;
// Prepare a MapMessage to send to the test peer to send
MapMessage mapMessage = session.createMapMessage();
mapMessage.setBoolean(myBoolKey, myBool);
mapMessage.setByte(myByteKey, myByte);
mapMessage.setBytes(myBytesKey, myBytes);
mapMessage.setChar(myCharKey, myChar);
mapMessage.setDouble(myDoubleKey, myDouble);
mapMessage.setFloat(myFloatKey, myFloat);
mapMessage.setInt(myIntKey, myInt);
mapMessage.setLong(myLongKey, myLong);
mapMessage.setShort(myShortKey, myShort);
mapMessage.setString(myStringKey, myString);
// prepare a matcher for the test peer to use to receive and verify the message
Map<String, Object> map = new LinkedHashMap<String, Object>();
map.put(myBoolKey, myBool);
map.put(myByteKey, myByte);
map.put(myBytesKey, new Binary(myBytes));// the underlying AMQP message uses Binary rather than byte[] directly.
// TODO: see note above to explain the ugly cast
map.put(myCharKey, (int) myChar);
map.put(myDoubleKey, myDouble);
map.put(myFloatKey, myFloat);
map.put(myIntKey, myInt);
map.put(myLongKey, myLong);
map.put(myShortKey, myShort);
map.put(myStringKey, myString);
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
msgAnnotationsMatcher.withEntry(AmqpMessageSupport.JMS_MSG_TYPE, equalTo(AmqpMessageSupport.JMS_MAP_MESSAGE));
MessagePropertiesSectionMatcher propertiesMatcher = new MessagePropertiesSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
messageMatcher.setPropertiesMatcher(propertiesMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(map));
testPeer.expectTransfer(messageMatcher);
testPeer.expectClose();
// send the message
producer.send(mapMessage);
assertTrue(mapMessage.isBodyAssignableTo(Map.class));
assertTrue(mapMessage.isBodyAssignableTo(Object.class));
assertFalse(mapMessage.isBodyAssignableTo(Boolean.class));
assertFalse(mapMessage.isBodyAssignableTo(byte[].class));
assertNotNull(mapMessage.getBody(Object.class));
assertNotNull(mapMessage.getBody(Map.class));
try {
mapMessage.getBody(byte[].class);
fail("Cannot read TextMessage with this type.");
} catch (MessageFormatException mfe) {
}
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}