下面列出了javax.jms.Message#setLongProperty ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Selector scenarios contributed by Mats Henricson.
 * <p>
 * Checks a numeric equality selector against a long property that is first
 * absent, then matching, then different, and checks a {@code NOT IN} selector
 * against a string property that is absent, outside the list, and inside the list.
 */
public void testMatsHenricsonUseCases() throws Exception {
    final String sessionSelector = "SessionserverId=1870414179";
    final String commandSelector = "Command NOT IN ('MirrorLobbyRequest', 'MirrorLobbyReply')";
    final Message msg = createMessage();

    // Property not set yet: the equality selector is expected not to match.
    assertSelector(msg, sessionSelector, false);

    msg.setLongProperty("SessionserverId", 1870414179L);
    assertSelector(msg, sessionSelector, true);

    msg.setLongProperty("SessionserverId", 1234);
    assertSelector(msg, sessionSelector, false);

    // "Command" not set yet: NOT IN is expected not to match.
    assertSelector(msg, commandSelector, false);

    msg.setStringProperty("Command", "Cheese");
    assertSelector(msg, commandSelector, true);

    // Both listed values must be rejected by NOT IN.
    msg.setStringProperty("Command", "MirrorLobbyRequest");
    assertSelector(msg, commandSelector, false);

    msg.setStringProperty("Command", "MirrorLobbyReply");
    assertSelector(msg, commandSelector, false);
}
/**
 * Exercises the function-style selector expressions REGEX, INLIST, SPLIT,
 * MAKELIST and REPLACE against the message built by {@code createMessage()}.
 */
public void testFunctionSelector() throws Exception {
    final String exactRegex = "REGEX('1870414179', SessionserverId)";
    final String digitsOneToEight = "REGEX('^[1-8]*$', SessionserverId)";
    final Message msg = createMessage();

    // Before the property is set the REGEX selector is expected not to match.
    assertSelector(msg, exactRegex, false);
    msg.setLongProperty("SessionserverId", 1870414179L);
    assertSelector(msg, exactRegex, true);
    assertSelector(msg, "REGEX('[0-9]*', SessionserverId)", true);
    // The original test performs this check twice; both calls are kept.
    assertSelector(msg, digitsOneToEight, false);
    assertSelector(msg, digitsOneToEight, false);

    // List membership on the "name" property, with the list built by SPLIT and by MAKELIST.
    assertSelector(msg, "INLIST(SPLIT('Tom,Dick,George',','), name)", false);
    assertSelector(msg, "INLIST(SPLIT('Tom,James,George',','), name)", true);
    assertSelector(msg, "INLIST(MAKELIST('Tom','Dick','George'), name)", false);
    assertSelector(msg, "INLIST(MAKELIST('Tom','James','George'), name)", true);

    // REPLACE strips the colons from the message id before REGEX is applied.
    assertSelector(msg, "REGEX('connection1111', REPLACE(JMSMessageID,':',''))", true);
}
/**
 * Builds the fixture message used by the selector tests.
 * <p>
 * The message is created for destination name {@code "FOO.BAR"} and is given a
 * JMS type, a message id and one property of each primitive property kind, plus
 * several string properties (quoted text, a leading underscore, punctuation).
 *
 * @return the populated message
 * @throws JMSException if any header or property cannot be set
 */
protected Message createMessage() throws JMSException {
    Message message = createMessage("FOO.BAR");
    message.setJMSType("selector-test");
    message.setJMSMessageID("connection:1:1:1:1");
    message.setObjectProperty("name", "James");
    message.setObjectProperty("location", "London");
    message.setByteProperty("byteProp", (byte) 123);
    message.setByteProperty("byteProp2", (byte) 33);
    message.setShortProperty("shortProp", (short) 123);
    message.setIntProperty("intProp", 123);
    message.setLongProperty("longProp", 123);
    message.setFloatProperty("floatProp", 123);
    message.setDoubleProperty("doubleProp", 123);
    message.setIntProperty("rank", 123);
    message.setIntProperty("version", 2);
    message.setStringProperty("quote", "'In God We Trust'");
    message.setStringProperty("foo", "_foo");
    // The "?@" pair had been replaced by an e-mail obfuscation placeholder
    // ("[email protected]"); the punctuation sequence is restored here.
    message.setStringProperty("punctuation", "!#$&()*+,-./:;<=>?@[\\]^`{|}~");
    message.setBooleanProperty("trueProp", true);
    message.setBooleanProperty("falseProp", false);
    return message;
}
/**
 * Starts a daemon producer thread that sends 200 messages, each carrying the
 * {@code ScheduledMessage.AMQ_SCHEDULED_DELAY} property set to 1000, and then
 * sleeps for five seconds. The method itself makes no assertions.
 */
public void testSchedule() throws Exception {
    final long delay = 1000;

    Connection conn = createConnection();
    Session producerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    conn.start();

    ProducerThread scheduledProducer = new ProducerThread(producerSession, destination) {
        @Override
        protected Message createMessage(int i) throws Exception {
            // Tag every produced message with the scheduling delay.
            Message scheduled = super.createMessage(i);
            scheduled.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
            return scheduled;
        }
    };
    scheduledProducer.setMessageCount(200);
    scheduledProducer.setDaemon(true);
    scheduledProducer.start();

    Thread.sleep(5000);
}
/**
 * Stores {@code propertyValue} on {@code message} under {@code propertyName},
 * choosing the typed JMS setter that matches the value's runtime wrapper type.
 * Values of any other type, including {@code null}, are passed to
 * {@code setObjectProperty}.
 *
 * @param message       the message to modify
 * @param propertyName  the property name
 * @param propertyValue the value to store
 * @throws JMSException if the underlying setter fails
 */
protected void setCustomProperty(Message message, String propertyName, Object propertyValue) throws JMSException
{
    if (propertyValue instanceof Integer)
    {
        message.setIntProperty(propertyName, (Integer) propertyValue);
        return;
    }
    if (propertyValue instanceof Long)
    {
        message.setLongProperty(propertyName, (Long) propertyValue);
        return;
    }
    if (propertyValue instanceof Boolean)
    {
        message.setBooleanProperty(propertyName, (Boolean) propertyValue);
        return;
    }
    if (propertyValue instanceof Byte)
    {
        message.setByteProperty(propertyName, (Byte) propertyValue);
        return;
    }
    if (propertyValue instanceof Double)
    {
        message.setDoubleProperty(propertyName, (Double) propertyValue);
        return;
    }
    if (propertyValue instanceof Float)
    {
        message.setFloatProperty(propertyName, (Float) propertyValue);
        return;
    }
    if (propertyValue instanceof Short)
    {
        message.setShortProperty(propertyName, (Short) propertyValue);
        return;
    }
    if (propertyValue instanceof String)
    {
        message.setStringProperty(propertyName, (String) propertyValue);
        return;
    }
    // No typed setter applies (this is also the path taken for null).
    message.setObjectProperty(propertyName, propertyValue);
}
/**
 * Applies every header property listed in the publisher configuration to the
 * given message, using the typed setter selected by the property's declared type.
 * A type with no matching case is skipped.
 *
 * @param message message to set properties on
 * @throws JMSException if a property cannot be set
 */
private void setMessageProperties(Message message) throws JMSException {
    final List<JMSHeaderProperty> configured = publisherConfig.getJMSHeaderProperties();
    for (final JMSHeaderProperty header : configured) {
        final String name = header.getKey();
        final Object raw = header.getValue();
        switch (header.getType()) {
            case STRING:
                message.setStringProperty(name, (String) raw);
                break;
            case BOOLEAN:
                message.setBooleanProperty(name, (Boolean) raw);
                break;
            case BYTE:
                message.setByteProperty(name, (Byte) raw);
                break;
            case SHORT:
                message.setShortProperty(name, (Short) raw);
                break;
            case INTEGER:
                message.setIntProperty(name, (Integer) raw);
                break;
            case LONG:
                message.setLongProperty(name, (Long) raw);
                break;
            case FLOAT:
                message.setFloatProperty(name, (Float) raw);
                break;
            case DOUBLE:
                message.setDoubleProperty(name, (Double) raw);
                break;
            case OBJECT:
                message.setObjectProperty(name, raw);
                break;
            default:
                // No setter for this type; nothing is written.
                break;
        }
    }
}
/**
 * Creates an empty message carrying a single long property and sends it to
 * {@code destination} through a producer that lives only for this call.
 * <p>
 * The producer is closed before returning, whether or not the send succeeds,
 * so repeated calls do not accumulate open producers on the session.
 *
 * @param session     the session used to create the producer and the message
 * @param destination where the message is sent
 * @param key         name of the long property
 * @param value       value of the long property
 * @return the message that was sent
 * @throws JMSException if creating, sending or closing fails
 */
public static Message sendMessageWithProperty(final Session session,
                                              final Destination destination,
                                              final String key,
                                              final long value) throws JMSException {
    final MessageProducer producer = session.createProducer(destination);
    try {
        final Message message = session.createMessage();
        message.setLongProperty(key, value);
        producer.send(message);
        return message;
    } finally {
        // Release the producer; previously it was left open.
        producer.close();
    }
}
/**
 * A property written with {@code setLongProperty} must be readable through
 * {@code getStringProperty} as its decimal text.
 */
@Test
public void testLong2String() {
    try {
        final Message msg = senderSession.createMessage();
        msg.setLongProperty("prop", 127L);
        final String readBack = msg.getStringProperty("prop");
        Assert.assertEquals("127", readBack);
    } catch (JMSException e) {
        fail(e);
    }
}
/**
 * A property written with {@code setLongProperty} must be readable through
 * {@code getLongProperty} with the same value.
 */
@Test
public void testLong2Long() {
    try {
        final Message msg = senderSession.createMessage();
        msg.setLongProperty("prop", 127L);
        final long readBack = msg.getLongProperty("prop");
        Assert.assertEquals(127L, readBack);
    } catch (JMSException e) {
        fail(e);
    }
}
/**
 * Populates {@code m} with fixed test data: the correlation id, reply-to and
 * type headers, and one application property of each primitive kind plus a string.
 *
 * @param m the message to populate
 * @throws JMSException if a header or property cannot be set
 */
protected void prepareMessage(final Message m) throws JMSException {
    // JMS headers
    m.setJMSCorrelationID("this is the correlation ID");
    m.setJMSReplyTo(ActiveMQServerTestCase.topic1);
    m.setJMSType("someArbitraryType");

    // Application properties, one per supported kind
    m.setBooleanProperty("booleanProperty", true);
    m.setByteProperty("byteProperty", (byte) 3);
    m.setShortProperty("shortProperty", (short) 8);
    m.setIntProperty("intProperty", 6);
    m.setLongProperty("longProperty", 7);
    m.setFloatProperty("floatProperty", 5.0f);
    m.setDoubleProperty("doubleProperty", 4.0);
    m.setStringProperty("stringProperty", "this is a String property");
}
/**
 * Sends one message with a producer time-to-live of 100, waits one second, and
 * asserts that a consumer on the same queue receives nothing within 150.
 *
 * @param disableTimestamp  when true, message timestamps are disabled on the producer
 * @param propJMS_AMQP_TTL  when non-null, set on the message as the
 *                          {@code AmqpMessageSupport.JMS_AMQP_TTL} long property
 */
private void doProducerWithTTLTestImpl(boolean disableTimestamp, Long propJMS_AMQP_TTL) throws Exception {
    connection = createAmqpConnection();
    assertNotNull(connection);
    connection.start();

    final Session amqpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    assertNotNull(amqpSession);
    final Queue target = amqpSession.createQueue(name.getMethodName());

    final Message outgoing = amqpSession.createMessage();
    if (propJMS_AMQP_TTL != null) {
        outgoing.setLongProperty(AmqpMessageSupport.JMS_AMQP_TTL, propJMS_AMQP_TTL);
    }

    final MessageProducer sender = amqpSession.createProducer(target);
    if (disableTimestamp) {
        sender.setDisableMessageTimestamp(true);
    }
    sender.setTimeToLive(100);
    sender.send(outgoing);

    // Wait well beyond the producer TTL before trying to consume.
    TimeUnit.SECONDS.sleep(1);

    final MessageConsumer receiver = amqpSession.createConsumer(target);
    final Message received = receiver.receive(150);
    if (received != null) {
        // Log the timing headers to help diagnose the failure asserted below.
        LOG.info("Unexpected message received: JMSExpiration = {} JMSTimeStamp = {} TTL = {}",
            new Object[] { received.getJMSExpiration(), received.getJMSTimestamp(),
                received.getJMSExpiration() - received.getJMSTimestamp()});
    }
    assertNull("Unexpected message received, see log for details", received);
}
/**
 * Writes one value to the JMS message as a typed property.
 * <p>
 * Every {@code "."} in {@code origName} is replaced by {@code "__"} to form the
 * property name. A {@code null} value is written as a null string property.
 * String and boxed primitive values use the matching typed setter; anything
 * else is passed to {@code setObjectProperty}.
 * <p>
 * A {@code Byte} value is written with {@code setByteProperty}. It was
 * previously written with {@code setShortProperty}, which stored it as a short.
 *
 * @param jmsMessage the message to modify
 * @param origName   the property name before dot replacement; must not be null
 * @param value      the value to write, may be null
 * @throws JMSException if the underlying setter fails
 */
public void writeProp(Message jmsMessage, String origName, Object value) throws JMSException {
    String name = origName.replace(".", "__");
    if (value == null) {
        jmsMessage.setStringProperty(name, null);
        return;
    }
    // value.getClass() never yields a primitive class, so only the wrapper
    // types need to be tested.
    if (value instanceof String) {
        jmsMessage.setStringProperty(name, (String)value);
    } else if (value instanceof Integer) {
        jmsMessage.setIntProperty(name, (Integer)value);
    } else if (value instanceof Double) {
        jmsMessage.setDoubleProperty(name, (Double)value);
    } else if (value instanceof Float) {
        jmsMessage.setFloatProperty(name, (Float)value);
    } else if (value instanceof Long) {
        jmsMessage.setLongProperty(name, (Long)value);
    } else if (value instanceof Boolean) {
        jmsMessage.setBooleanProperty(name, (Boolean)value);
    } else if (value instanceof Short) {
        jmsMessage.setShortProperty(name, (Short)value);
    } else if (value instanceof Byte) {
        jmsMessage.setByteProperty(name, (Byte)value);
    } else {
        jmsMessage.setObjectProperty(name, value);
    }
}
/**
 * Iterates through all of the flow file's attributes and, for every attribute key that starts with
 * {@code ATTRIBUTE_PREFIX} and does not end with {@code ATTRIBUTE_TYPE_SUFFIX} (both compared
 * case-insensitively), writes the attribute value to the JMS message as a property. The property name
 * is the attribute key with the prefix removed. For example, if the flow file has the attribute
 * <br /><br />
 * <code>jms.count</code> = <code>8</code>
 * <br /><br />
 * then the JMS message will have a String property added with the name <code>count</code> and value <code>8</code>.
 * <p>
 * If the flow file also has an attribute named after the key plus {@code ATTRIBUTE_TYPE_SUFFIX}, the value of
 * that attribute selects the JMS property type. For example, with
 * <br /><br />
 * <code>jms.count</code> = <code>8</code><br />
 * <code>jms.count.type</code> = <code>integer</code>
 * <br /><br />
 * <code>message</code> will have an INTEGER property added with the value 8.
 * <p>
 * If the value cannot be parsed as the requested numeric type (e.g. <code>jms.count.type</code> = <code>integer</code>
 * and <code>jms.count</code> = <code>hello</code>), or the requested type is not recognised, a warning is logged
 * and the property is not added to <code>message</code>.
 *
 * @param flowFile The flow file whose attributes should be examined for JMS properties.
 * @param message The JMS message to which we want to add properties.
 * @throws JMSException if setting a property on the message fails
 */
private void copyAttributesToJmsProps(final FlowFile flowFile, final Message message) throws JMSException {
    final ComponentLog logger = getLogger();
    final Map<String, String> attributes = flowFile.getAttributes();

    // Loop-invariant: lower-case the prefix and suffix once rather than per entry.
    final String lowerPrefix = ATTRIBUTE_PREFIX.toLowerCase();
    final String lowerTypeSuffix = ATTRIBUTE_TYPE_SUFFIX.toLowerCase();

    for (final Entry<String, String> entry : attributes.entrySet()) {
        final String key = entry.getKey();
        final String value = entry.getValue();
        final String lowerKey = key.toLowerCase();

        if (lowerKey.startsWith(lowerPrefix) && !lowerKey.endsWith(lowerTypeSuffix)) {
            final String jmsPropName = key.substring(ATTRIBUTE_PREFIX.length());
            final String typeKey = key + ATTRIBUTE_TYPE_SUFFIX;
            final String type = attributes.get(typeKey);

            try {
                if (type == null || type.equalsIgnoreCase(PROP_TYPE_STRING)) {
                    message.setStringProperty(jmsPropName, value);
                } else if (type.equalsIgnoreCase(PROP_TYPE_INTEGER)) {
                    message.setIntProperty(jmsPropName, Integer.parseInt(value));
                } else if (type.equalsIgnoreCase(PROP_TYPE_BOOLEAN)) {
                    message.setBooleanProperty(jmsPropName, Boolean.parseBoolean(value));
                } else if (type.equalsIgnoreCase(PROP_TYPE_SHORT)) {
                    message.setShortProperty(jmsPropName, Short.parseShort(value));
                } else if (type.equalsIgnoreCase(PROP_TYPE_LONG)) {
                    message.setLongProperty(jmsPropName, Long.parseLong(value));
                } else if (type.equalsIgnoreCase(PROP_TYPE_BYTE)) {
                    message.setByteProperty(jmsPropName, Byte.parseByte(value));
                } else if (type.equalsIgnoreCase(PROP_TYPE_DOUBLE)) {
                    message.setDoubleProperty(jmsPropName, Double.parseDouble(value));
                } else if (type.equalsIgnoreCase(PROP_TYPE_FLOAT)) {
                    message.setFloatProperty(jmsPropName, Float.parseFloat(value));
                } else if (type.equalsIgnoreCase(PROP_TYPE_OBJECT)) {
                    message.setObjectProperty(jmsPropName, value);
                } else {
                    // The offending attribute is the type attribute, so report its key and value.
                    logger.warn("Attribute key '{}' for {} has value '{}', but expected one of: integer, string, object, byte, double, float, long, short, boolean; not adding this property",
                        new Object[]{typeKey, flowFile, type});
                }
            } catch (NumberFormatException e) {
                // Report the type that was actually requested, not a fixed "integer".
                logger.warn("Attribute key '{}' for {} has value '{}', but attribute key '{}' has value '{}'. Not adding this JMS property",
                    new Object[]{key, flowFile, value, typeKey, type});
            }
        }
    }
}
/**
 * Sets the {@code ScheduledMessage.AMQ_SCHEDULED_DELAY} long property on the
 * message to {@code delaySeconds} multiplied by 1000.
 *
 * @param sender       not used by this implementation
 * @param message      the message to tag with the delay
 * @param delaySeconds the delay in seconds
 * @throws JMSException if the property cannot be set
 */
public void setMessageDelay(QueueSender sender, Message message, long delaySeconds) throws JMSException {
    final long scheduledDelay = delaySeconds * 1000;
    message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, scheduledDelay);
}
/**
 * Checks that the job scheduler store limit can be raised through the broker's
 * admin view while a producer of scheduled messages is running.
 * <p>
 * The test asserts an initial limit of 7 * 1024, expects usage to exceed 100
 * percent after producing for a while, raises the limit to 1024 * 1024 * 33,
 * and then expects the producer to finish all 100 messages with usage below
 * 100 percent.
 */
public void testJmx() throws Exception {
LOG.info("Initial scheduler usage: {}", broker.getAdminView().getJobSchedulerStorePercentUsage());
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection conn = factory.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = sess.createQueue(this.getClass().getName());
// Producer whose messages all carry a scheduled delay of half the wait time.
final ProducerThread producer = new ProducerThread(sess, dest) {
@Override
protected Message createMessage(int i) throws Exception {
Message message = super.createMessage(i);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, WAIT_TIME_MILLS / 2);
return message;
}
};
producer.setMessageCount(100);
producer.start();
// Precondition: the store limit is the small value this test relies on.
assertEquals(7 * 1024, broker.getAdminView().getJobSchedulerStoreLimit());
// wait for the producer to block
Thread.sleep(WAIT_TIME_MILLS / 2);
assertTrue(broker.getAdminView().getJobSchedulerStorePercentUsage() > 100);
// Raise the limit at runtime; the producer is then expected to make progress again.
broker.getAdminView().setJobSchedulerStoreLimit(1024 * 1024 * 33);
Thread.sleep(WAIT_TIME_MILLS);
// Poll until every message has been sent, for at most twice the wait time.
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return producer.getSentCount() == producer.getMessageCount();
}
}, WAIT_TIME_MILLS * 2);
assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
LOG.info("Final scheduler usage: {}", broker.getAdminView().getJobSchedulerStorePercentUsage());
assertTrue(broker.getAdminView().getJobSchedulerStorePercentUsage() < 100);
}
/**
 * Sends a text message that carries the {@code AmqpMessageSupport.JMS_AMQP_TTL}
 * property and verifies, against a scripted test peer, what ends up in the
 * transferred message.
 * <p>
 * The header ttl is expected to be absent when {@code amqpTtl} is 0 and equal
 * to {@code amqpTtl} otherwise. The absolute expiry time is expected to lie
 * between now + {@code jmsTtl} and now + {@code jmsTtl} + 3000.
 *
 * @param jmsTtl  time-to-live passed to {@code producer.send}
 * @param amqpTtl value set as the JMS_AMQP_TTL long property on the message
 */
public void sendingMessageWithJMS_AMQP_TTLSetTestImpl(long jmsTtl, long amqpTtl) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
testPeer.expectBegin();
testPeer.expectSenderAttach();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = "myQueue";
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
long currentTime = System.currentTimeMillis();
Date expirationLower = new Date(currentTime + jmsTtl);
Date expirationUpper = new Date(currentTime + jmsTtl + 3000);
// Create matcher to expect the absolute-expiry-time field of the properties section to
// be set to a value greater than 'now'+ttl, within a delta.
Matcher<Date> inRange = both(greaterThanOrEqualTo(expirationLower)).and(lessThanOrEqualTo(expirationUpper));
String text = "myMessage";
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
headersMatcher.withDurable(equalTo(true));
// verify the ttl field matches the JMS_AMQP_TTL value, rather than the standard JMS send TTL value.
if (amqpTtl == 0) {
headersMatcher.withTtl(nullValue());
} else {
headersMatcher.withTtl(equalTo(UnsignedInteger.valueOf(amqpTtl)));
}
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withAbsoluteExpiryTime(inRange);
// Combine the section matchers into the expectation for the whole transfer.
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
testPeer.expectTransfer(messageMatcher);
testPeer.expectClose();
// Send with both TTL sources set: the property on the message and the send argument.
Message message = session.createTextMessage(text);
message.setLongProperty(AmqpMessageSupport.JMS_AMQP_TTL, amqpTtl);
producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, jmsTtl);
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
/**
 * Sends a message with one property of each supported kind (including a null
 * string) and verifies, against a scripted test peer, that each one appears as
 * an entry of the application-properties section with the expected value.
 */
@Test(timeout = 20000)
public void testSendMessageWithApplicationProperties() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
testPeer.expectBegin();
testPeer.expectSenderAttach();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = "myQueue";
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
// Expected application-properties entries, one per property set on the message below.
ApplicationPropertiesSectionMatcher appPropsMatcher = new ApplicationPropertiesSectionMatcher(true);
appPropsMatcher.withEntry(NULL_STRING_PROP, nullValue());
appPropsMatcher.withEntry(STRING_PROP, equalTo(STRING_PROP_VALUE));
appPropsMatcher.withEntry(BOOLEAN_PROP, equalTo(BOOLEAN_PROP_VALUE));
appPropsMatcher.withEntry(BYTE_PROP, equalTo(BYTE_PROP_VALUE));
appPropsMatcher.withEntry(SHORT_PROP, equalTo(SHORT_PROP_VALUE));
appPropsMatcher.withEntry(INT_PROP, equalTo(INT_PROP_VALUE));
appPropsMatcher.withEntry(LONG_PROP, equalTo(LONG_PROP_VALUE));
appPropsMatcher.withEntry(FLOAT_PROP, equalTo(FLOAT_PROP_VALUE));
appPropsMatcher.withEntry(DOUBLE_PROP, equalTo(DOUBLE_PROP_VALUE));
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(queueName));
// Combine the section matchers into the expectation for the whole transfer.
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setApplicationPropertiesMatcher(appPropsMatcher);
//TODO: currently we aren't sending any body section, decide if this is allowed
//messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
testPeer.expectTransfer(messageMatcher);
// Build the message with the same properties the matcher expects, then send it.
Message message = session.createMessage();
message.setStringProperty(NULL_STRING_PROP, NULL_STRING_PROP_VALUE);
message.setStringProperty(STRING_PROP, STRING_PROP_VALUE);
message.setBooleanProperty(BOOLEAN_PROP, BOOLEAN_PROP_VALUE);
message.setByteProperty(BYTE_PROP, BYTE_PROP_VALUE);
message.setShortProperty(SHORT_PROP, SHORT_PROP_VALUE);
message.setIntProperty(INT_PROP, INT_PROP_VALUE);
message.setLongProperty(LONG_PROP, LONG_PROP_VALUE);
message.setFloatProperty(FLOAT_PROP, FLOAT_PROP_VALUE);
message.setDoubleProperty(DOUBLE_PROP, DOUBLE_PROP_VALUE);
producer.send(message);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
/**
 * Test setting SQSMessage property.
 * <p>
 * Sets one property of each kind on a fresh {@code SQSMessage}, then checks
 * that each exists and reads back through both the object getter and the typed
 * getter, that a numeric string property can be read through the numeric
 * getters, that {@code getPropertyNames} returns exactly the names that were
 * set, and that {@code clearProperties} removes them.
 */
@Test
public void testProperty() throws JMSException {
when(mockSQSSession.createMessage()).thenReturn(new SQSMessage());
Message message = mockSQSSession.createMessage();
// Populate one property per supported type, plus a string holding a number.
message.setBooleanProperty("myTrueBoolean", true);
message.setBooleanProperty("myFalseBoolean", false);
message.setIntProperty("myInteger", 100);
message.setDoubleProperty("myDouble", 2.1768);
message.setFloatProperty("myFloat", 3.1457f);
message.setLongProperty("myLong", 1290772974281L);
message.setShortProperty("myShort", (short) 123);
message.setByteProperty("myByteProperty", (byte) 'a');
message.setStringProperty("myString", "StringValue");
message.setStringProperty("myNumber", "500");
// NOTE(review): the assertEquals calls below pass the actual value first and the
// expected value second; confirm against the Assert class in use before reordering.
Assert.assertTrue(message.propertyExists("myTrueBoolean"));
Assert.assertEquals(message.getObjectProperty("myTrueBoolean"), true);
Assert.assertEquals(message.getBooleanProperty("myTrueBoolean"), true);
Assert.assertTrue(message.propertyExists("myFalseBoolean"));
Assert.assertEquals(message.getObjectProperty("myFalseBoolean"), false);
Assert.assertEquals(message.getBooleanProperty("myFalseBoolean"), false);
Assert.assertTrue(message.propertyExists("myInteger"));
Assert.assertEquals(message.getObjectProperty("myInteger"), 100);
Assert.assertEquals(message.getIntProperty("myInteger"), 100);
Assert.assertTrue(message.propertyExists("myDouble"));
Assert.assertEquals(message.getObjectProperty("myDouble"), 2.1768);
Assert.assertEquals(message.getDoubleProperty("myDouble"), 2.1768);
Assert.assertTrue(message.propertyExists("myFloat"));
Assert.assertEquals(message.getObjectProperty("myFloat"), 3.1457f);
Assert.assertEquals(message.getFloatProperty("myFloat"), 3.1457f);
Assert.assertTrue(message.propertyExists("myLong"));
Assert.assertEquals(message.getObjectProperty("myLong"), 1290772974281L);
Assert.assertEquals(message.getLongProperty("myLong"), 1290772974281L);
Assert.assertTrue(message.propertyExists("myShort"));
Assert.assertEquals(message.getObjectProperty("myShort"), (short) 123);
Assert.assertEquals(message.getShortProperty("myShort"), (short) 123);
Assert.assertTrue(message.propertyExists("myByteProperty"));
Assert.assertEquals(message.getObjectProperty("myByteProperty"), (byte) 'a');
Assert.assertEquals(message.getByteProperty("myByteProperty"), (byte) 'a');
Assert.assertTrue(message.propertyExists("myString"));
Assert.assertEquals(message.getObjectProperty("myString"), "StringValue");
Assert.assertEquals(message.getStringProperty("myString"), "StringValue");
// A string property holding "500" is also read through the numeric getters.
Assert.assertTrue(message.propertyExists("myNumber"));
Assert.assertEquals(message.getObjectProperty("myNumber"), "500");
Assert.assertEquals(message.getStringProperty("myNumber"), "500");
Assert.assertEquals(message.getLongProperty("myNumber"), 500L);
Assert.assertEquals(message.getFloatProperty("myNumber"), 500f);
Assert.assertEquals(message.getShortProperty("myNumber"), (short) 500);
Assert.assertEquals(message.getDoubleProperty("myNumber"), 500d);
Assert.assertEquals(message.getIntProperty("myNumber"), 500);
// Validate property names
Set<String> propertyNamesSet = new HashSet<String>(Arrays.asList(
"myTrueBoolean",
"myFalseBoolean",
"myInteger",
"myDouble",
"myFloat",
"myLong",
"myShort",
"myByteProperty",
"myNumber",
"myString"));
Enumeration<String > propertyNames = message.getPropertyNames();
// Every enumerated name must be expected, and the count must match the expected set size.
int counter = 0;
while (propertyNames.hasMoreElements()) {
assertTrue(propertyNamesSet.contains(propertyNames.nextElement()));
counter++;
}
assertEquals(propertyNamesSet.size(), counter);
// After clearing, none of the checked properties may remain.
// NOTE(review): "myFalseBoolean" is not checked individually here; the empty
// enumeration check at the end covers it.
message.clearProperties();
Assert.assertFalse(message.propertyExists("myTrueBoolean"));
Assert.assertFalse(message.propertyExists("myInteger"));
Assert.assertFalse(message.propertyExists("myDouble"));
Assert.assertFalse(message.propertyExists("myFloat"));
Assert.assertFalse(message.propertyExists("myLong"));
Assert.assertFalse(message.propertyExists("myShort"));
Assert.assertFalse(message.propertyExists("myByteProperty"));
Assert.assertFalse(message.propertyExists("myString"));
Assert.assertFalse(message.propertyExists("myNumber"));
propertyNames = message.getPropertyNames();
assertFalse(propertyNames.hasMoreElements());
}
/**
 * Iterates through all of the flow file's attributes and, for every attribute key that starts with
 * {@code ATTRIBUTE_PREFIX} and does not end with {@code ATTRIBUTE_TYPE_SUFFIX} (both compared
 * case-insensitively), writes the attribute value to the JMS message as a property. The property name
 * is the attribute key with the prefix removed. For example, if the flow file has the attribute
 * <br /><br />
 * <code>jms.count</code> = <code>8</code>
 * <br /><br />
 * then the JMS message will have a String property added with the name <code>count</code> and value <code>8</code>.
 * <p>
 * If the flow file also has an attribute named after the key plus {@code ATTRIBUTE_TYPE_SUFFIX}, the value of
 * that attribute selects the JMS property type. For example, with
 * <br /><br />
 * <code>jms.count</code> = <code>8</code><br />
 * <code>jms.count.type</code> = <code>integer</code>
 * <br /><br />
 * <code>message</code> will have an INTEGER property added with the value 8.
 * <p>
 * If the value cannot be parsed as the requested numeric type (e.g. <code>jms.count.type</code> = <code>integer</code>
 * and <code>jms.count</code> = <code>hello</code>), or the requested type is not recognised, a warning is logged
 * and the property is not added to <code>message</code>.
 *
 * @param flowFile The flow file whose attributes should be examined for JMS properties.
 * @param message The JMS message to which we want to add properties.
 * @throws JMSException if setting a property on the message fails
 */
private void copyAttributesToJmsProps(final FlowFile flowFile, final Message message) throws JMSException {
    final ComponentLog logger = getLogger();
    final Map<String, String> attributes = flowFile.getAttributes();

    // Loop-invariant: lower-case the prefix and suffix once rather than per entry.
    final String lowerPrefix = ATTRIBUTE_PREFIX.toLowerCase();
    final String lowerTypeSuffix = ATTRIBUTE_TYPE_SUFFIX.toLowerCase();

    for (final Entry<String, String> entry : attributes.entrySet()) {
        final String key = entry.getKey();
        final String value = entry.getValue();
        final String lowerKey = key.toLowerCase();

        if (lowerKey.startsWith(lowerPrefix) && !lowerKey.endsWith(lowerTypeSuffix)) {
            final String jmsPropName = key.substring(ATTRIBUTE_PREFIX.length());
            final String typeKey = key + ATTRIBUTE_TYPE_SUFFIX;
            final String type = attributes.get(typeKey);

            try {
                if (type == null || type.equalsIgnoreCase(PROP_TYPE_STRING)) {
                    message.setStringProperty(jmsPropName, value);
                } else if (type.equalsIgnoreCase(PROP_TYPE_INTEGER)) {
                    message.setIntProperty(jmsPropName, Integer.parseInt(value));
                } else if (type.equalsIgnoreCase(PROP_TYPE_BOOLEAN)) {
                    message.setBooleanProperty(jmsPropName, Boolean.parseBoolean(value));
                } else if (type.equalsIgnoreCase(PROP_TYPE_SHORT)) {
                    message.setShortProperty(jmsPropName, Short.parseShort(value));
                } else if (type.equalsIgnoreCase(PROP_TYPE_LONG)) {
                    message.setLongProperty(jmsPropName, Long.parseLong(value));
                } else if (type.equalsIgnoreCase(PROP_TYPE_BYTE)) {
                    message.setByteProperty(jmsPropName, Byte.parseByte(value));
                } else if (type.equalsIgnoreCase(PROP_TYPE_DOUBLE)) {
                    message.setDoubleProperty(jmsPropName, Double.parseDouble(value));
                } else if (type.equalsIgnoreCase(PROP_TYPE_FLOAT)) {
                    message.setFloatProperty(jmsPropName, Float.parseFloat(value));
                } else if (type.equalsIgnoreCase(PROP_TYPE_OBJECT)) {
                    message.setObjectProperty(jmsPropName, value);
                } else {
                    // The offending attribute is the type attribute, so report its key and value.
                    logger.warn("Attribute key '{}' for {} has value '{}', but expected one of: integer, string, object, byte, double, float, long, short, boolean; not adding this property",
                        new Object[]{typeKey, flowFile, type});
                }
            } catch (NumberFormatException e) {
                // Report the type that was actually requested, not a fixed "integer".
                logger.warn("Attribute key '{}' for {} has value '{}', but attribute key '{}' has value '{}'. Not adding this JMS property",
                    new Object[]{key, flowFile, value, typeKey, type});
            }
        }
    }
}