下面列出了 javax.management.modelmbean.XMLParseException 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Registers every {@code <mqSource>} node in the context's MQ source map, records which
 * one (if any) is the default, then creates the {@code MqSourceManager} and installs it
 * into {@code MqContainer}.
 *
 * @param contexts the {@code <mqSource>} nodes; must contain at least one element
 * @throws Throwable if no node is given, an id is duplicated, the type is unsupported,
 *                   more than one node is flagged as default, a {@code <property>} has
 *                   no name, or anything called from here fails
 */
private void buildHostNodes(List<XmlNodeWrapper> contexts) throws Throwable {
    if (contexts.isEmpty()) {
        throw new XmlParseException("Missing <mqSource> node.");
    }
    for (XmlNodeWrapper xNode : contexts) {
        String id = StringUtils.trim(xNode.getStringAttribute("id"));
        if (context.getMqSourceMap().containsKey(id)) {
            throw new XmlParseException("Duplicate <mqSource> id: " + id);
        }

        String _type = StringUtils.trim(xNode.getStringAttribute("type"));
        MqSourceType type = getMqSourceType(_type);
        if (null == type) {
            throw new XmlParseException("Unsupported MQ types in <mqSource>: " + id);
        }

        // Only one node may be flagged as the default source.
        String _isDefault = StringUtils.trim(xNode.getStringAttribute("isDefault"));
        if (null != _isDefault && Boolean.parseBoolean(_isDefault)) {
            if (null != this.defaultMqSource) {
                throw new XMLParseException("The default mqSource can only have one");
            }
            this.defaultMqSource = id;
        }

        Map<String, String> data = new HashMap<String, String>();
        List<XmlNodeWrapper> properties = xNode.evalNodes("property");
        for (XmlNodeWrapper propertyNode : properties) {
            String name = StringUtils.trim(propertyNode.getStringAttribute("name"));
            if (null == name) {
                // Fail with a parse error instead of an unexplained NullPointerException.
                throw new XmlParseException("<property> node is missing the name attribute in <mqSource>: " + id);
            }
            // Locale.ROOT keeps the key stable whatever the JVM default locale is
            // (e.g. a Turkish locale would otherwise turn 'i' into a dotted capital I).
            data.put(name.toUpperCase(java.util.Locale.ROOT),
                    StringUtils.trim(propertyNode.getStringAttribute("value")));
        }

        MqSourceVo hostVo = new MqSourceVo(id, type, data);
        context.getMqSourceMap().put(id, hostVo);
        log.info("add mq source: " + id);
    }

    this.context.setDefaultMqSource(defaultMqSource);
    MqSourceManager manager = new MqManagerCreater().create(defaultMqSource, context.getMqSourceMap());
    MqContainer.getInstance().setMqSourceManager(manager);
}
/**
 * Turns each listener node into a {@code ListenerVo} and appends it to the context's
 * listener list. A node may carry at most one {@code <binding>} child, for example
 * {@code <binding key="{x}" pattern="abc,ef,g" separator=","/>}.
 *
 * @param contexts the listener nodes to process
 * @throws Throwable if the channel is unknown, the binding is malformed, or anything
 *                   called from here fails
 */
private void buildListenerNodes(List<XmlNodeWrapper> contexts) throws Throwable {
    for (XmlNodeWrapper listenerNode : contexts) {
        String service = StringUtils.trim(listenerNode.getStringAttribute("service"));
        String channel = StringUtils.trim(listenerNode.getStringAttribute("channel"));

        // Validate the service id first, then the referenced channel.
        existingListenerService(service);
        if (!context.getChannelVoMap().containsKey(channel)) {
            throw new XmlParseException("Invalid attribute channel: " + channel);
        }

        List<XmlNodeWrapper> bindingNodes = listenerNode.evalNodes("binding");
        if (bindingNodes.size() > 1) {
            throw new XMLParseException("the <binding> node can have at most one.");
        }

        BindingVo binding = null;
        if (1 == bindingNodes.size()) {
            XmlNodeWrapper bindingNode = bindingNodes.get(0);
            String key = StringUtils.trim(bindingNode.getStringAttribute("key"));
            String pattern = StringUtils.trim(bindingNode.getStringAttribute("pattern"));
            String separator = StringUtils.trim(bindingNode.getStringAttribute("separator"));

            // Empty attributes count as absent; an absent separator falls back to a comma.
            key = "".equals(key) ? null : key;
            pattern = "".equals(pattern) ? null : pattern;
            separator = (null == separator || "".equals(separator)) ? "," : separator;

            if (null == key && null == pattern) {
                throw new XmlParseException("<binding> node key and pattern is empty. mq-listener: " + service);
            }

            // When present, the key has to pass checkVar (a variable such as {x}).
            Object parsedKey = null;
            if (null != key) {
                if (!checkVar(key)) {
                    throw new XMLParseException("If key attribute exists, it must be a variable, such as {x}. mq-listener: " + service);
                }
                parsedKey = new NormalParser().parse(getRealVal(key));
            }

            // Each pattern is flagged according to whether it contains a '*'.
            List<BindingPattern> patternList = null;
            String[] pieces = split(pattern, separator);
            if (null != pieces) {
                patternList = new ArrayList<BindingPattern>();
                for (String piece : pieces) {
                    patternList.add(new BindingPattern(piece, piece.contains("*")));
                }
            }

            boolean hasKeyAndPattern = null != parsedKey && null != patternList;

            // "exchange" means a Topic channel whose MQ source is RabbitMQ.
            ChannelVo channelVo = context.getChannelVoMap().get(channel);
            boolean exchange = ChannelType.Topic == channelVo.getType()
                    && MqSourceType.RabbitMQ == context.getMqSourceMap().get(channelVo.getMsKey()).getType();

            if (exchange) {
                // For an exchange the pattern list is mandatory.
                if (null == patternList) {
                    throw new XMLParseException("for exchange, pattern can not be empty. mq-listener: " + service);
                }
            } else if (!hasKeyAndPattern) {
                // Everything else needs both the key and the pattern list.
                throw new XMLParseException("for non-exchange, key and pattern must exist, and key must be a variable. mq-listener: " + service);
            }

            binding = new BindingVo(channel, parsedKey, patternList);
        }

        this.context.getListenerVoList().add(new ListenerVo(service, channel, binding));
    }
}
/**
 * Starts receiving from the channel described by {@code queue}: obtains a connection
 * from the channel's {@code ActiveMqSource}, opens a session, creates a consumer for the
 * queue or topic, and then either registers a message listener or hands the consumer to
 * {@code startSyncReceiveThread}, depending on {@code queue.isAsynReceive()}.
 *
 * @throws Throwable if a durable topic subscription has no clientID, or anything called
 *                   from here fails
 */
@Override
public void start() throws Throwable {
    // Durable subscription is only looked at for topic channels.
    boolean durable = ChannelType.Topic == queue.getType() && queue.isDurableSubscribers();
    String clientID = null;
    if (durable) {
        clientID = queue.getClientID();
        if (null == clientID) {
            throw new XMLParseException("durable subscribers is missing a clientID: " + queue.getName());
        }
    }

    ActiveMqSource source = (ActiveMqSource) MqContainer.getInstance().getMqSourceManager().getMqSource(queue.getMsKey());
    Connection conn;
    if (durable) {
        // The connection obtained with the clientID is kept in durableSubscriberConn.
        conn = source.getConnection(clientID);
        this.durableSubscriberConn = conn;
    } else {
        conn = source.getConnection();
    }

    boolean transacted = queue.isTransacted();
    int ackMode = queue.getAcknowledgeMode();
    session = conn.createSession(transacted, ackMode);

    Destination target = null;
    if (ChannelType.Queue == queue.getType()) {
        target = session.createQueue(queue.getName());
        typeStr = "queue";
    } else if (ChannelType.Topic == queue.getType()) {
        target = session.createTopic(queue.getName());
        typeStr = "topic";
    }

    // The clientID is also passed as the durable subscriber's name argument.
    MessageConsumer consumer = durable
            ? session.createDurableSubscriber((Topic) target, clientID)
            : session.createConsumer(target);

    running = true;

    if (!queue.isAsynReceive()) {
        long timeout = queue.getReceiveTimeout();
        startSyncReceiveThread(consumer, timeout);
        return;
    }

    consumer.setMessageListener(new MessageListener() {
        @Override
        public void onMessage(Message message) {
            try {
                // TODO: if session.commit() is used, is synchronization needed here to
                // avoid committing work that belongs to another thread?
                processMessage(message);
            } catch (Throwable e) {
                log.error("listen to the [" + queue.getName() + "] error.", e);
            }
        }
    });
}