下面列出了javax.jms.MapMessage#getString ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* @param message JMSMap message
* @return XML representation of JMS Map message
*/
public static OMElement convertJMSMapToXML(MapMessage message) {
OMFactory fac = OMAbstractFactory.getOMFactory();
OMNamespace jmsMapNS = OMAbstractFactory.getOMFactory().createOMNamespace(JMSConstants.JMS_MAP_NS, "");
OMElement jmsMap = fac.createOMElement(JMSConstants.JMS_MAP_ELEMENT_NAME, jmsMapNS);
try {
Enumeration names = message.getMapNames();
while (names.hasMoreElements()) {
String nextName = names.nextElement().toString();
String nextVal = message.getString(nextName);
OMElement next = fac.createOMElement(nextName.replace(" ", ""), jmsMapNS);
next.setText(nextVal);
jmsMap.addChild(next);
}
} catch (JMSException e) {
log.error("Error while processing the JMS Map Message. " + e.getMessage());
}
return jmsMap;
}
public void onMessage(Message message) {
if ( message instanceof MapMessage ) {
final MapMessage mapMessage = (MapMessage) message;
try {
String event = mapMessage.getString("event");
if ( "ErrorOccurred".equals(event) ) {
errorHandler(mapMessage);
} else if ( "CompileFinished".equals(event) ) {
compileFinishedHandler(mapMessage);
} else if ( "TestPointFinished".equals(event) ) {
testPointFinishedHandler(mapMessage);
} else if ( "AllTestPointsFinished".equals(event) ) {
allTestPointsFinishedHandler(mapMessage);
} else if ( "KeepAlive".equals(event) ) {
receiveFromAliveJudgersHandler(mapMessage);
} else {
LOGGER.warn(String.format("Unknown Event Received. [Event = %s]",
new Object[] { event }));
}
} catch (JMSException ex) {
LOGGER.catching(ex);
}
}
}
/**
* 处理评测机编译完成时的消息.
* @param mapMessage - 消息队列中收到的MapMessage对象
* @throws JMSException
*/
private void compileFinishedHandler(MapMessage mapMessage) throws JMSException {
long submissionId = mapMessage.getLong("submissionId");
boolean isSuccessful = mapMessage.getBoolean("isSuccessful");
String log = mapMessage.getString("log");
if ( isSuccessful ) {
String message = "Compile Successfully.\n\n";
eventPublisher.publishEvent(new SubmissionEvent(this, submissionId, "Running", message, false));
LOGGER.info(String.format("Submission #%d returned [Compile Successfully].", submissionId));
} else {
eventPublisher.publishEvent(new SubmissionEvent(this, submissionId, "Compiler Error", log, true));
LOGGER.info(String.format("Submission #%d returned [Compile Error].\n\tError Message:%s",
new Object[] { submissionId, log }));
}
}
/**
* 处理评测机完成单个测试点的消息.
* @param mapMessage - 消息队列中收到的MapMessage对象
* @throws JMSException
*/
private void testPointFinishedHandler(MapMessage mapMessage) throws JMSException {
long submissionId = mapMessage.getLong("submissionId");
int checkpointId = mapMessage.getInt("checkpointId");
String runtimeResult = mapMessage.getString("runtimeResult");
int usedTime = mapMessage.getInt("usedTime");
int usedMemory = mapMessage.getInt("usedMemory");
int score = mapMessage.getInt("score");
String message = String.format("- Test Point #%d: %s, Time = %d ms, Memory = %d KB, Score = %d\n",
new Object[] { checkpointId, runtimeResult, usedTime, usedMemory, score });
eventPublisher.publishEvent(new SubmissionEvent(this, submissionId, "Running", message, false));
LOGGER.info(String.format("Submission #%d/ CheckPoint#%d returned [%s] (Time = %dms, Memory = %d KB, Score = %d).",
new Object[] { submissionId, checkpointId, runtimeResult, usedTime, usedMemory, score }));
}
public void onMessage(Message message) {
if ( message instanceof MapMessage ) {
final MapMessage mapMessage = (MapMessage) message;
try {
String event = mapMessage.getString("event");
if ( "SubmissionCreated".equals(event) ) {
newSubmissionHandler(mapMessage);
} else {
LOGGER.warn(String.format("Unknown Event Received. [Event = %s]",
new Object[] { event }));
}
} catch (Exception ex) {
LOGGER.catching(ex);
}
}
}
public void unmarshal(MapMessage from) throws JMSException {
super.unmarshal(from);
this.load = from.getDouble(KEY_CPU_LOAD);
this.cores = from.getInt(KEY_CPU_CORES);
this.cpuPercents = from.getInt(KEY_CPU_PERCENTS);
this.memUsed = from.getLong(KEY_MEM_USED);
this.memTotal = from.getLong(KEY_MEM_TOTAL);
this.memPercents = from.getInt(KEY_MEM_PERCENTS);
this.diskUsed = from.getLong(KEY_DISK_USED);
this.diskTotal = from.getLong(KEY_DISK_TOTAL);
this.diskPercents = from.getInt(KEY_DISK_PERCENTS);
this.scheduledJobs = from.getInt(KEY_SCHEDULED_JOBS);
this.runningJobs = from.getInt(KEY_RUNNING_JOBS);
this.host = from.getString(KEY_HOST);
this.hostId = from.getString(KEY_HOST_ID);
this.status = from.getString(KEY_STATUS);
}
public void unmarshal(MapMessage from) throws JMSException {
super.unmarshal(from);
this.operation = from.getString(KEY_OPERATION);
if (from.getString(KEY_STATE) != null) {
this.state = JobState.valueOf(from.getString(KEY_STATE));
}
this.stateDetail = from.getString(KEY_STATE_DETAIL);
this.jobId = from.getString(KEY_JOB_ID);
try {
if (from.getString(KEY_START_TIME) != null) {
this.startTime = Date.from(Instant.parse(from.getString(KEY_START_TIME)));
}
if (from.getString(KEY_END_TIME) != null) {
this.endTime = Date.from(Instant.parse(from.getString(KEY_END_TIME)));
}
} catch (DateTimeParseException e) {
throw new JMSException(e.toString());
}
this.errorMessage = from.getString(KEY_ERROR_MESSAGE);
this.outputText = from.getString(KEY_OUTPUT_TEXT);
this.username = from.getString(KEY_USERNAME);
this.compHost = from.getString(KEY_COMP_HOST);
}
public void unmarshal(MapMessage from) throws JMSException {
super.unmarshal(from);
try {
String messageString = from.getString(KEY_URL_LIST);
String[] urlArray = messageString.split(URL_DELIMITER);
urlList = new LinkedList<URL>();
for (String urlString : urlArray) {
urlList.add(new URL(urlString));
}
} catch (MalformedURLException e) {
handleException(e);
}
}
private void receiveMapMessage(boolean useCore) throws Exception {
MapMessage mapMessage = (MapMessage) receiveMessage(useCore);
boolean booleanVal = mapMessage.getBoolean("boolean-type");
assertTrue(booleanVal);
byte byteVal = mapMessage.getByte("byte-type");
assertEquals((byte) 10, byteVal);
byte[] bytesVal = mapMessage.getBytes("bytes-type");
byte[] originVal = TEXT.getBytes();
assertEquals(originVal.length, bytesVal.length);
for (int i = 0; i < bytesVal.length; i++) {
assertTrue(bytesVal[i] == originVal[i]);
}
char charVal = mapMessage.getChar("char-type");
assertEquals('A', charVal);
double doubleVal = mapMessage.getDouble("double-type");
assertEquals(55.3D, doubleVal, 0.1D);
float floatVal = mapMessage.getFloat("float-type");
assertEquals(79.1F, floatVal, 0.1F);
int intVal = mapMessage.getInt("int-type");
assertEquals(37, intVal);
long longVal = mapMessage.getLong("long-type");
assertEquals(56652L, longVal);
Object objectVal = mapMessage.getObject("object-type");
Object origVal = new String("VVVV");
assertTrue(objectVal.equals(origVal));
short shortVal = mapMessage.getShort("short-type");
assertEquals((short) 333, shortVal);
String strVal = mapMessage.getString("string-type");
assertEquals(TEXT, strVal);
}
/**
* 处理评测机完成全部测试点的消息.
* @param mapMessage - 消息队列中收到的MapMessage对象
* @throws JMSException
*/
private void allTestPointsFinishedHandler(MapMessage mapMessage) throws JMSException {
long submissionId = mapMessage.getLong("submissionId");
String runtimeResult = mapMessage.getString("runtimeResult");
int usedTime = mapMessage.getInt("totalTime");
int usedMemory = mapMessage.getInt("maxMemory");
int score = mapMessage.getInt("totalScore");
String message = String.format("\n%s, Time = %d ms, Memory = %d KB, Score = %d\n",
new Object[] { runtimeResult, usedTime, usedMemory, score });
eventPublisher.publishEvent(new SubmissionEvent(this, submissionId, runtimeResult, message, true));
LOGGER.info(String.format("Submission #%d judge completed and returned [%s] (Time = %d ms, Memory = %d KB, Score = %d).",
new Object[] { submissionId, runtimeResult, usedTime, usedMemory, score }));
}
/**
* 处理来自评测机的Keep-Alive消息.
* 用于在Web端获取后端评测机的信息.
* @param mapMessage - 消息队列中收到的MapMessage对象
* @throws JMSException
*/
private void receiveFromAliveJudgersHandler(MapMessage mapMessage) throws JMSException {
String judgerUsername = mapMessage.getString("username");
String judgerDescription = mapMessage.getString("description");
long heartbeatTimeInMillis = mapMessage.getLong("heartbeatTime");
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(heartbeatTimeInMillis);
Date heartbeatTime = calendar.getTime();
eventPublisher.publishEvent(new KeepAliveEvent(this, judgerUsername, judgerDescription, heartbeatTime));
LOGGER.info(String.format("Received heartbeat from Judger[%s]", judgerUsername));
}
@Override
public void unmarshal(MapMessage from) throws JMSException {
super.unmarshal(from);
// load details
details = from.getString(KEY_DETAILS);
email = from.getString(KEY_EMAIL);
url = from.getString(KEY_SESSION);
logs = Arrays.asList(!from.getString(KEY_LOGS).equals("") ?
from.getString(KEY_LOGS).split(";") : new String[] {});
logger.debug("Unmarshalled " + KEY_DETAILS + " : " + details);
logger.debug("Unmarshalled " + KEY_EMAIL + " : " + email);
logger.debug("Unmarshalled " + KEY_SESSION + " : " + url);
logger.debug("Unmarshalled " + KEY_LOGS + " : " + logs);
}
public void unmarshal(MapMessage from) throws JMSException {
super.unmarshal(from);
String urlString = from.getString(KEY_URL);
if (urlString == null) {
this.url = null;
} else {
try {
this.url = new URL(from.getString(KEY_URL));
} catch (MalformedURLException e) {
handleException(e);
}
}
}
public void unmarshal(MapMessage from) throws JMSException {
super.unmarshal(from);
this.success = from.getBoolean(KEY_SUCCESS);
this.errorMessage = from.getString(KEY_ERROR_MESSAGE);
this.details = from.getString(KEY_DETAILS);
this.exceptionString = from.getString(KEY_EXCEPTION);
}
@Override
public void unmarshal(MapMessage from) throws JMSException {
super.unmarshal(from);
// load ids
this.jobId = from.getString(KEY_JOB_ID);
this.toolId = from.getString(KEY_TOOL_ID);
logger.debug("Unmarshalled " + KEY_JOB_ID + " : " + jobId);
logger.debug("Unmarshalled " + KEY_TOOL_ID + " : " + toolId);
}
@Test
public void testCompression() throws Exception {
Connection cconnection = null;
Connection connection = null;
try {
ActiveMQConnectionFactory cfactory = new ActiveMQConnectionFactory("tcp://" + OWHOST + ":" + OWPORT + "");
cconnection = cfactory.createConnection();
cconnection.start();
Session csession = cconnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue cQueue = csession.createQueue(queueName);
MessageConsumer consumer = csession.createConsumer(cQueue);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://" + OWHOST + ":" + OWPORT + "?jms.useCompression=true");
connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//text
TextMessage textMessage = session.createTextMessage();
textMessage.setText(testString);
TextMessage receivedMessage = sendAndReceive(textMessage, producer, consumer);
String receivedText = receivedMessage.getText();
assertEquals(testString, receivedText);
//MapMessage
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString(testProp, propValue);
MapMessage receivedMapMessage = sendAndReceive(mapMessage, producer, consumer);
String value = receivedMapMessage.getString(testProp);
assertEquals(propValue, value);
//Object
ObjectMessage objMessage = session.createObjectMessage();
objMessage.setObject(testString);
ObjectMessage receivedObjMessage = sendAndReceive(objMessage, producer, consumer);
String receivedObj = (String) receivedObjMessage.getObject();
assertEquals(testString, receivedObj);
//Stream
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString(testString);
StreamMessage receivedStreamMessage = sendAndReceive(streamMessage, producer, consumer);
String streamValue = receivedStreamMessage.readString();
assertEquals(testString, streamValue);
//byte
BytesMessage byteMessage = session.createBytesMessage();
byte[] bytes = testString.getBytes();
byteMessage.writeBytes(bytes);
BytesMessage receivedByteMessage = sendAndReceive(byteMessage, producer, consumer);
long receivedBodylength = receivedByteMessage.getBodyLength();
assertEquals("bodylength Correct", bytes.length, receivedBodylength);
byte[] receivedBytes = new byte[(int) receivedBodylength];
receivedByteMessage.readBytes(receivedBytes);
String receivedString = new String(receivedBytes);
assertEquals(testString, receivedString);
//Message
Message m = session.createMessage();
sendAndReceive(m, producer, consumer);
} finally {
if (cconnection != null) {
connection.close();
}
if (connection != null) {
cconnection.close();
}
}
}
public Object fromMessage(Message message) throws JMSException, MessageConversionException {
MapMessage mapMessage = (MapMessage) message;
return new Employee(mapMessage.getString("name"), mapMessage.getInt("age"));
}
@Override
public void unmarshal(MapMessage from) throws JMSException {
super.unmarshal(from);
this.password = from.getString(KEY_PASSWORD);
}
public void unmarshal(MapMessage from) throws JMSException {
super.unmarshal(from);
this.value = from.getString(KEY_JSON);
}
@Override
public void unmarshal(MapMessage from) throws JMSException {
super.unmarshal(from);
this.command = from.getString(KEY_COMMAND);
}