下面列出了 org.apache.commons.lang3.StringUtils#joinWith() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Wraps a consumed Kafka record into an {@code EmitWarp} ready to be emitted.
 *
 * <p>For every key other than {@code "ctrl"} the key is split on {@code '.'} and
 * {@code "!" + inner.alias} is appended to its third segment before the segments
 * are joined back together, e.g.
 * <pre>
 * data_increment_heartbeat.oracle.db4_3.AMQUE.T_CONTACT_INFO.3.0.0.1531709399507|1531709398879|ok.wh_placeholder
 * data_increment_data.oracle.db4_3.AMQUE.T_CONTACT_INFO.3.0.0.1531709399889.wh_placeholder
 * </pre>
 *
 * @param record    the consumed record, stored as the wrapper's data
 * @param key       the record key; {@code "ctrl"} is passed through unchanged
 * @param namespace key used to look up the table id in
 *                  {@code readSpoutConfig.getNamespaceTableIdPair()}
 * @param offset    offset stored on the wrapper
 * @return the populated wrapper
 */
private EmitWarp<ConsumerRecord<String, byte[]>> obtainEmitWarp(ConsumerRecord<String, byte[]> record, String key, String namespace, long offset) {
    final String emitKey;
    if (StringUtils.equals("ctrl", key)) {
        // Control messages keep their key as-is.
        emitKey = key;
    } else {
        final String[] segments = StringUtils.split(key, ".");
        segments[2] = StringUtils.joinWith("!", segments[2], inner.alias);
        emitKey = StringUtils.joinWith(".", segments);
    }

    final EmitWarp<ConsumerRecord<String, byte[]>> warp = new EmitWarp<>(emitKey);
    warp.setData(record);
    warp.setTableId(readSpoutConfig.getNamespaceTableIdPair().get(namespace));
    warp.setOffset(offset);
    return warp;
}
/**
 * Sends a control message to the control topic of the given topology.
 *
 * <p>The topic name is {@code topologyCode + "_ctrl"}. Producer settings are read
 * from ZooKeeper, the bootstrap servers are taken from the global configuration,
 * and {@code SASL_PLAINTEXT} is enabled when the security configuration says so.
 * The producer is created for this single send and closed afterwards.
 *
 * @param topologyCode code of the topology whose control topic receives the message
 * @param ctrlMsg      control message text; encoded as UTF-8 before sending
 * @throws RuntimeException wrapping any exception raised while building the
 *                          producer or sending the message
 */
public void rerunTopology(String topologyCode, String ctrlMsg) {
    KafkaProducer<String, byte[]> producer = null;
    try {
        String topic = StringUtils.joinWith("_", topologyCode, "ctrl");
        Properties props = zkService.getProperties(KeeperConstants.KEEPER_CTLMSG_PRODUCER_CONF);
        Properties globalConf = zkService.getProperties(KeeperConstants.GLOBAL_CONF);
        props.setProperty(GLOBAL_CONF_KEY_BOOTSTRAP_SERVERS, globalConf.getProperty(GLOBAL_CONF_KEY_BOOTSTRAP_SERVERS));
        if (StringUtils.equals(SecurityConfProvider.getSecurityConf(zkService), Constants.SECURITY_CONFIG_TRUE_VALUE)) {
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        }
        producer = new KafkaProducer<>(props);
        // Encode with an explicit charset: String.getBytes() without arguments uses the
        // platform default, which would make the payload depend on the host's locale.
        byte[] payload = ctrlMsg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        producer.send(new ProducerRecord<String, byte[]>(topic, payload), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // NOTE(review): the send result is ignored here, so an asynchronous send
                // failure is not reported to the caller - confirm this is intended.
            }
        });
    } catch (Exception e) {
        throw new RuntimeException(e);
    } finally {
        if (producer != null) {
            producer.close();
        }
    }
}
/**
 * Builds the JSON statistics message for one {@code Stat} value object.
 *
 * <p>The message type is {@code "ROUTER_TYPE_" + inner.topologyId}. If the cache
 * holds a non-blank string under the stat's data source name, that cached value is
 * used as the data source name instead.
 *
 * @param statVo statistics to convert
 * @return the {@code StatMessage} serialised with {@code toJSONString()}
 */
private String obtainStatMessage(Stat statVo) {
    String type = StringUtils.joinWith("_", "ROUTER_TYPE", inner.topologyId);

    String dsName = statVo.getDsName();
    // Look the alias up once: reading the cache a second time after the blank check
    // could return a different (even null) value if the entry changed in between.
    String cachedDsName = (String) cache.getObject(dsName);
    if (StringUtils.isNotBlank(cachedDsName)) {
        dsName = cachedDsName;
    }

    StatMessage sm = new StatMessage(dsName, statVo.getSchemaName(), statVo.getTableName(), type);
    long curTime = System.currentTimeMillis();
    sm.setCheckpointMS(statVo.getTime());
    sm.setTxTimeMS(statVo.getTxTime());
    sm.setLocalMS(curTime);
    // Latency is measured from the stat's checkpoint time to the local clock.
    sm.setLatencyMS(curTime - statVo.getTime());
    sm.setCount(statVo.getSuccessCnt());
    sm.setErrorCount(statVo.getErrorCnt());
    return sm.toJSONString();
}
/**
 * Builds the path of the user's scripts directory.
 *
 * <p>The directory lives under the {@code user.home} system property unless the
 * system property named by {@link GhidraScriptConstants#USER_SCRIPTS_DIR_PROPERTY}
 * is set, in which case that value is used as the root instead (some tests use
 * this to override the default).
 *
 * @return the root directory joined with {@code SCRIPTS_SUBDIR_NAME} using
 *         {@link File#separator}
 */
private static String buildUserScriptsDirectory() {
    String root = System.getProperty("user.home");
    String override = System.getProperty(GhidraScriptConstants.USER_SCRIPTS_DIR_PROPERTY);
    if (override != null) {
        // Log the directory that is actually going to be used (the override),
        // not the user.home value it replaces.
        Msg.debug(GhidraScriptUtil.class, "Using Ghidra script source directory: " + override);
        root = override;
    }
    return StringUtils.joinWith(File.separator, root, SCRIPTS_SUBDIR_NAME);
}
/**
 * Loads the abort-table rules of a data source and records the output topic of
 * each table that is not yet present in the supplied map.
 *
 * <p>Map keys have the form {@code dsName|schemaName|tableName|version}. Existing
 * entries are never overwritten.
 *
 * @param dsName               data source whose rules are loaded
 * @param abortTableToTopicMap map to fill; modified in place
 */
public static void loadAbortTable(String dsName, Map<String, String> abortTableToTopicMap) {
    final IProcessorLogDao logDao = new ProcessorLogDao();
    final List<RuleInfo> rules = logDao.loadAbortTableRuleInfo(Constants.DBUS_CONFIG_DB_KEY, dsName);
    for (RuleInfo rule : rules) {
        final String nsKey = StringUtils.joinWith("|",
                rule.getDsName(), rule.getSchemaName(), rule.getTableName(), rule.getVersion());
        if (abortTableToTopicMap.containsKey(nsKey)) {
            // Already known: keep the existing mapping.
            continue;
        }
        abortTableToTopicMap.put(nsKey, rule.getOutputTopic());
        logger.info("abortTableToTopicMap key: {}, value: {}", nsKey, rule.getOutputTopic());
    }
}
/**
 * Creates the {@code AckWindows} instance (constructed with {@code 1000}) and wires
 * its callbacks to the Kafka consumer.
 *
 * <p>Both callbacks resolve the {@code TopicPartition} from
 * {@code readSpoutConfig.getTopicPartitionMap()} using the key
 * {@code topic + "_" + partition}; when no entry exists a warning is logged and
 * nothing else happens. On ack the offset is committed asynchronously, on fail the
 * consumer seeks back to the offset.
 */
private void initAckWindows() {
    this.ackWindows = new AckWindows(1000, new AckCallBack() {
        @Override
        public void ack(Ack ackVo) {
            final String mapKey = StringUtils.joinWith("_", ackVo.getTopic(), String.valueOf(ackVo.getPartition()));
            final TopicPartition partition = readSpoutConfig.getTopicPartitionMap().get(mapKey);
            if (partition == null) {
                logger.warn(String.format("do not find key:%s of TopicPartition", mapKey));
                return;
            }

            final Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            toCommit.put(partition, new OffsetAndMetadata(ackVo.getOffset()));
            logger.info("call consumer commitSync topic:{}, partition:{}, offset:{}",
                    ackVo.getTopic(), ackVo.getPartition(), ackVo.getOffset());
            // The commit itself is asynchronous; failures are only logged.
            consumer.commitAsync(toCommit, new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception) {
                    if (exception == null) {
                        return;
                    }
                    logger.error(String.format("commitAsync fail: %s", JSON.toJSONString(committed)), exception);
                }
            });
        }

        @Override
        public void fail(Ack ackVo) {
            final String mapKey = StringUtils.joinWith("_", ackVo.getTopic(), String.valueOf(ackVo.getPartition()));
            final TopicPartition partition = readSpoutConfig.getTopicPartitionMap().get(mapKey);
            if (partition == null) {
                logger.warn(String.format("do not find key:%s of TopicPartition", mapKey));
                return;
            }

            logger.info("call consumer seek topic:{}, partition:{}, offset:{}",
                    ackVo.getTopic(), ackVo.getPartition(), ackVo.getOffset());
            consumer.seek(partition, ackVo.getOffset());
        }
    });
}
/**
 * Renders a topology configuration from a template and stores it in ZooKeeper.
 *
 * <p>Every occurrence of the literal {@code "placeholder"} in the template is
 * replaced with {@code topologyCode}, each line is terminated with the platform
 * line separator, and the result is written to
 * {@code ROUTER_ROOT/projectCode/topologyCode-ROUTER/config.properties}. The node
 * is created when it does not exist and overwritten otherwise.
 *
 * @param projectCode  project segment of the ZooKeeper path
 * @param topologyCode value substituted for {@code "placeholder"}; also part of the path
 * @param strTopoConf  template text
 * @throws RuntimeException wrapping any exception raised while rendering or writing
 */
private void generateTopologyConfig(String projectCode, String topologyCode, String strTopoConf) {
    // try-with-resources closes the whole stream chain in reverse order, replacing
    // the manual null-initialised locals and closeQuietly calls.
    // NOTE(review): the writer uses the platform default charset, as before - confirm
    // that readers of this znode expect the same encoding.
    try (BufferedReader reader = new BufferedReader(new StringReader(strTopoConf));
         ByteArrayOutputStream buffer = new ByteArrayOutputStream(strTopoConf.length());
         BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(buffer))) {
        String line;
        while ((line = reader.readLine()) != null) {
            // replace() returns the line unchanged when there is nothing to substitute,
            // so no separate contains() check is needed.
            writer.write(StringUtils.replace(line, "placeholder", topologyCode));
            writer.newLine();
        }
        // Flush before reading the buffer so that all buffered characters are included.
        writer.flush();
        byte[] content = buffer.toByteArray();

        String path = StringUtils.joinWith("/", Constants.ROUTER_ROOT, projectCode, topologyCode + "-" + Constants.ROUTER, "config.properties");
        if (!zkService.isExists(path)) {
            zkService.createNode(path, content);
        } else {
            zkService.setData(path, content);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Reads the router topology configuration template from ZooKeeper.
 *
 * <p>The template is stored at
 * {@code DBUS_CONF_TEMPLATE_ROOT/Router/placeholder-router/config.properties}. Its
 * content is copied line by line, so every line of the returned text (including the
 * last one) ends with the platform line separator.
 *
 * @return the template text
 * @throws RuntimeException wrapping any exception raised while reading the node
 *                          or copying its content
 */
public String obtainRouterTopologyConfigTemplate() {
    try {
        String path = StringUtils.joinWith("/", Constants.DBUS_CONF_TEMPLATE_ROOT, "Router/placeholder-router", "config.properties");
        byte[] data = zkService.getData(path);
        // try-with-resources closes the whole stream chain in reverse order, replacing
        // the manual null-initialised locals and closeQuietly calls.
        // NOTE(review): bytes are decoded and re-encoded with the platform default
        // charset, as before - confirm this matches how the template was written.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data)));
             ByteArrayOutputStream buffer = new ByteArrayOutputStream(data.length);
             BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(buffer))) {
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(line);
                writer.newLine();
            }
            // Flush before reading the buffer so that all buffered characters are included.
            writer.flush();
            return buffer.toString();
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Handles a "stop topology table" control message.
 *
 * <p>The payload's {@code dsName}, {@code schemaName} and {@code tableName} form the
 * namespace {@code ds.schema.table}. The first sink with the same namespace is
 * removed from {@code context.getSinks()}. The spout's consumer is re-initialised
 * only when {@code dbHelper.isUsingTopic(projectTopoTableId)} reports that the
 * output topic is no longer in use.
 *
 * @param ctrl control message containing a {@code payload} object
 * @return {@code true} only when a sink was removed and the consumer was re-initialised
 * @throws Exception propagated from the helper calls
 */
private boolean stopTopologyTable(JSONObject ctrl) throws Exception {
    final JSONObject body = ctrl.getJSONObject("payload");
    final String targetDs = body.getString("dsName");
    final String targetSchema = body.getString("schemaName");
    final String targetTable = body.getString("tableName");
    final Integer tableId = body.getInteger("projectTopoTableId");
    final String targetNs = StringUtils.joinWith(".", targetDs, targetSchema, targetTable);

    boolean stopped = false;
    final List<Sink> sinks = context.getSinks();
    if (sinks != null) {
        // Locate the first sink whose namespace matches the requested table.
        Sink match = null;
        for (Sink candidate : sinks) {
            final String candidateNs = StringUtils.joinWith(".",
                    candidate.getDsName(), candidate.getSchemaName(), candidate.getTableName());
            if (StringUtils.equals(candidateNs, targetNs)) {
                match = candidate;
                break;
            }
        }

        if (match == null) {
            logger.error("don't find name space:{} target sink, stop table fail.", targetNs);
        } else {
            sinks.remove(match);
            if (context.getInner().dbHelper.isUsingTopic(tableId)) {
                logger.info("table:{} of out put topic is using, don't need remove.", tableId);
            } else {
                context.getSpout().initConsumer();
                stopped = true;
            }
        }
    }

    logger.info("monitor spout stop topology table:{} completed.", tableId);
    return stopped;
}
/**
 * Processes one tuple whose {@code "data"} field holds an {@code EmitWarp}.
 *
 * <p>Control wrappers are handed to {@code processCtrlMsg}. Stat wrappers are added
 * to {@code statWindows} under the key {@code nameSpace.hbTime}, converted to a
 * stat message and emitted. The tuple is acked afterwards; on any exception it is
 * failed and the error is logged and reported to the collector.
 *
 * @param input the tuple to process
 */
@Override
public void execute(Tuple input) {
    try {
        final EmitWarp<?> warp = (EmitWarp<?>) input.getValueByField("data");
        if (warp.isCtrl()) {
            processCtrlMsg((String) warp.getData());
        } else if (warp.isStat()) {
            final String statKey = StringUtils.joinWith(".", warp.getNameSpace(), warp.getHbTime());
            statWindows.add(statKey, (Stat) warp.getData());
            // Original author's note: tables are distributed by grouping, so all data of a
            // table reaches the same encode bolt and no aggregation is needed here. The
            // statistics are sent as soon as they arrive; aggregating (the former
            // statWindows.tryPoll(ns, encodeBoltTaskIdSum) path) would lose statistics
            // when several encode bolts are running.
            final String statJson = obtainStatMessage((Stat) warp.getData());
            emitStatData(statJson, statKey, input, warp.getOffset());
        }
        logger.info("stat bolt ack {}", warp.getOffset());
        collector.ack(input);
    } catch (Exception failure) {
        collector.fail(input);
        logger.error("stat bolt execute error.", failure);
        collector.reportError(failure);
    }
}
/**
 * Returns the key of this object: {@code host}, {@code table} and {@code timestamp}
 * joined with underscores.
 *
 * @return {@code host_table_timestamp}
 */
public String getKey() {
    final Object[] parts = {host, table, timestamp};
    return StringUtils.joinWith("_", parts);
}
/**
 * Processes one heartbeat record and caches its checkpoint packet for ZooKeeper.
 *
 * <p>The record key must split on {@code '.'} into exactly 10 parts. Part 8 carries
 * {@code cpTime|txTime[|status[|originDsName]]}; part 2 must contain {@code '!'} and
 * the text before it is the data source name (replaced by {@code originDsName} when
 * that is present). Parts 3 and 4 are the schema and table names.
 *
 * <p>On success a {@code Packet} of type {@code "checkpoint"} is put into the cache
 * under {@code HEARTBEAT_PROJECT_MONITOR/projectName/topologyId/ds.schema.table},
 * the cache is flushed when {@code isTimeUp(baseFlushTime)} is true, and the offset
 * is acked. Any exception is logged and the offset is failed. The early returns
 * (record not belonging here, empty or malformed key, aborted table) neither ack
 * nor fail the offset.
 *
 * @param obj       the record; cast to {@code ConsumerRecord<String, byte[]>}
 * @param suppliers not used by this method
 * @return a new, otherwise unused {@code Object} on every path
 */
@Override
public Object process(Object obj, Supplier ... suppliers) {
Object ret = new Object();
ConsumerRecord<String, byte[]> record = (ConsumerRecord) obj;
logger.info("topic:{}, key:{}, offset:{}", record.topic(), record.key(), record.offset());
// Skip records that isBelong() rejects for this processor.
if (!isBelong(record.key()))
return ret;
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
try {
if (StringUtils.isEmpty(record.key())) {
logger.warn("topic:{}, offset:{}, key is empty", record.topic(), record.offset());
return ret;
}
// data_increment_heartbeat.mysql.mydb.cbm.t1#router_test_s_r5.6.0.0.1541041451552|1541041451550|ok.wh_placeholder
String[] vals = StringUtils.split(record.key(), ".");
if (vals == null || vals.length != 10) {
logger.error("receive heartbeat key is error. topic:{}, offset:{}, key:{}", record.topic(), record.offset(), record.key());
return ret;
}
long cpTime = 0L;
long txTime = 0L;
boolean isTableOK = true;
String originDsName = StringUtils.EMPTY;
// vals[8] is expected to look like cpTime|txTime[|status[|originDsName]].
if (StringUtils.contains(vals[8], "|")) {
String times[] = StringUtils.split(vals[8], "|");
cpTime = Long.valueOf(times[0]);
txTime = Long.valueOf(times[1]);
// Original author's note: the table has actually been aborted but heartbeat data
// still arrives; in this case only stat is sent and zk is not updated.
if ((times.length == 3 || times.length == 4) && times[2].equals("abort")) {
isTableOK = false;
logger.warn("data abort. key:{}", record.key());
}
if (times.length == 4)
originDsName = times[3];
} else {
isTableOK = false;
logger.error("it should not be here. key:{}", record.key());
}
if (!isTableOK)
return ret;
// The data source name is the part of vals[2] before the '!' separator.
String dsName = vals[2];
if (StringUtils.contains(vals[2], "!")) {
dsName = StringUtils.split(vals[2], "!")[0];
} else {
isTableOK = false;
logger.error("it should not be here. key:{}", record.key());
}
// An origin data source name carried in the heartbeat takes precedence.
if (StringUtils.isNoneBlank(originDsName))
dsName = originDsName;
String schemaName = vals[3];
String tableName = vals[4];
if (!isTableOK)
return ret;
// String dsPartition = vals[6];
String ns = StringUtils.joinWith(".", dsName, schemaName, tableName);
String path = StringUtils.joinWith("/", Constants.HEARTBEAT_PROJECT_MONITOR,
context.getInner().projectName, context.getInner().topologyId, ns);
// {"node":"/DBus/HeartBeat/ProjectMonitor/db4new/AMQUE/T_USER/0","time":1531180006336,"type":"checkpoint","txTime":1531180004040}
Packet packet = new Packet();
packet.setNode(path);
packet.setType("checkpoint");
packet.setTime(cpTime);
packet.setTxTime(txTime);
cache.put(path, packet);
logger.info("put cache path:{}", path);
// Flush the cached packets once isTimeUp() says the flush interval has elapsed.
if (isTimeUp(baseFlushTime)) {
baseFlushTime = System.currentTimeMillis();
logger.info("router update zk stat :{}", baseFlushTime);
flushCache();
}
ack(topicPartition, record.offset());
} catch (Exception e) {
logger.error("consumer record processor process fail.", e);
fail(topicPartition, record.offset());
}
return ret;
}
/**
 * Returns the name of this topology's control topic.
 *
 * @return {@code context.getInner().topologyId} followed by {@code "_ctrl"}
 */
private String obtainCtrlTopic() {
    final Object[] parts = {context.getInner().topologyId, "ctrl"};
    return StringUtils.joinWith("_", parts);
}
/**
 * First step of the automatic table check: executes the statement returned by
 * {@code getSql(dsType)} against the master database with a checkpoint packet.
 *
 * <p>The packet is a JSON object with the node path
 * {@code /DBus/HeartBeat/Monitor/ds/schema/table/partition}, type
 * {@code "checkpoint"} and {@code time}/{@code txTime} both set to {@code time}.
 * For {@code mysql} and {@code oracle} the data source, schema, table and packet
 * are bound as parameters; {@code mysql} additionally binds two timestamp strings.
 *
 * @param infoMap table and connection details ({@code ds_name}, {@code schema_name},
 *                {@code table_name}, {@code ds_partition}, {@code ds_type},
 *                {@code master_url}, {@code dbus_user}, {@code dbus_pwd})
 * @param time    value stored as both {@code time} and {@code txTime} in the packet
 * @param retMap  receives {@code "status" -> false} when an exception occurs
 * @return {@code true} only when exactly one row was affected and no exception occurred
 * @throws Exception declared for callers; exceptions raised inside are caught and logged
 */
private boolean firstStep(Map<String, Object> infoMap, long time, Map<String, Boolean> retMap) throws Exception {
    boolean isOk = true;
    Connection conn = null;
    PreparedStatement ps = null;
    try {
        String dsName = (String) infoMap.get("ds_name");
        String schemaName = (String) infoMap.get("schema_name");
        String tableName = (String) infoMap.get("table_name");
        String dsPartition = (String) infoMap.get("ds_partition");
        String node = StringUtils.joinWith("/",
                "/DBus/HeartBeat/Monitor",
                dsName, schemaName, tableName, dsPartition);

        Map<String, Object> packet = new HashMap<>();
        packet.put("node", node);
        packet.put("time", time);
        packet.put("type", "checkpoint");
        packet.put("txTime", time);

        String dsType = (String) infoMap.get("ds_type");
        String url = (String) infoMap.get("master_url");
        String schema = (String) infoMap.get("dbus_user");
        String password = (String) infoMap.get("dbus_pwd");
        conn = DBUtils.getConn(url, schema, password);
        ps = conn.prepareStatement(getSql(dsType));

        int idx = 1;
        boolean isMysql = StringUtils.equalsIgnoreCase("mysql", dsType);
        if (isMysql || StringUtils.equalsIgnoreCase("oracle", dsType)) {
            // Parameters shared by both supported database types.
            ps.setString(idx++, dsName);
            ps.setString(idx++, schemaName);
            ps.setString(idx++, tableName);
            ps.setString(idx++, JSON.toJSONString(packet));
        }
        if (isMysql) {
            // Format the clock once so that both timestamp parameters are identical.
            String now = DateUtil.convertLongToStr4Date(System.currentTimeMillis());
            ps.setString(idx++, now);
            ps.setString(idx++, now);
        }

        if (ps.executeUpdate() != 1) {
            isOk = false;
        }
    } catch (Exception e) {
        isOk = false;
        retMap.put("status", false);
        logger.error("auto check table first step error.", e);
    } finally {
        DBUtils.close(ps);
        DBUtils.close(conn);
    }
    return isOk;
}
/**
 * Deletes a project together with its ZooKeeper node and dependent table rows.
 *
 * <p>When the project exists, the znode {@code ROUTER_ROOT/projectName} is removed
 * (if present) and the dependent rows are deleted mapper by mapper, logging the
 * time each step took. Nothing happens when no project with the given id exists.
 *
 * @param id primary key of the project
 * @return always {@code 0}
 * @throws RuntimeException wrapping any exception raised while checking or removing
 *                          the znode
 */
public int delete(int id) {
    Project project = this.select(id);
    if (project != null) {
        logger.info("********* delete project start ,projectId:{},projectName;{} *********",
                project.getId(), project.getProjectName());
        try {
            String path = StringUtils.joinWith("/", Constants.ROUTER_ROOT, project.getProjectName());
            if (zkService.isExists(path)) {
                zkService.rmr(path);
                logger.info("delete project znode:{}", path);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        long start = System.currentTimeMillis();
        encodeColumnsMapper.deleteByProjectId(id);
        long start1 = System.currentTimeMillis();
        logger.info("delete table t_project_topo_table_encode_output_columns success,cost time {}", start1 - start);

        metaVersionMapper.deleteByProjectId(id);
        long start2 = System.currentTimeMillis();
        logger.info("delete table t_project_topo_table_meta_version success,cost time {}", start2 - start1);

        encodeHintMapper.deleteByProjectId(id);
        long start3 = System.currentTimeMillis();
        logger.info("delete table t_project_encode_hint success,cost time {}", start3 - start2);

        resourceMapper.deleteByProjectId(id);
        long start4 = System.currentTimeMillis();
        logger.info("delete table t_project_resource success,cost time {}", start4 - start3);

        projectTopoTableMapper.deleteByProjectId(id);
        long start5 = System.currentTimeMillis();
        logger.info("delete table t_project_topo_table success,cost time {}", start5 - start4);

        mapper.deleteByPrimaryKey(id);
        long start6 = System.currentTimeMillis();
        logger.info("delete table t_project_topo,t_project_sink,t_project_sink,t_project_user success,cost time {}", start6 - start5);

        // The message has three placeholders, so the project id and name must be passed
        // along with the total elapsed time; otherwise they are never filled in.
        logger.info("******* delete project end ,projectId:{},projectName;{} cost time {} ******",
                project.getId(), project.getProjectName(), start6 - start);
    }
    return 0;
}
/**
 * Builds the namespace key of a record from its Kafka key.
 *
 * <p>The key is split on {@code '.'} and the parts at index 2, 3 and 4 are joined
 * with {@code '.'} again.
 *
 * @param record record whose key is parsed
 * @return {@code part2.part3.part4} of the record key
 */
private String bildNSKey(DBusConsumerRecord<String, byte[]> record) {
    final String[] parts = StringUtils.split(record.key(), ".");
    final String first = parts[2];
    final String second = parts[3];
    final String third = parts[4];
    return StringUtils.joinWith(".", first, second, third);
}
/**
 * Builds the lookup key of a record from its Kafka key.
 *
 * <p>The key is split on {@code '.'} and the parts at index 2, 3 and 4 are joined
 * with {@code '.'} again.
 *
 * @param record record whose key is parsed
 * @return {@code part2.part3.part4} of the record key
 */
private String bildKey(DBusConsumerRecord<String, byte[]> record) {
    final String[] parts = StringUtils.split(record.key(), ".");
    final String first = parts[2];
    final String second = parts[3];
    final String third = parts[4];
    return StringUtils.joinWith(".", first, second, third);
}
/**
 * Authenticates the given credentials through the Kerberos authentication provider.
 *
 * <p>The identity passed to the provider is the supplied username, except when the
 * username carries no realm and {@code defaultRealm} is not blank; in that case
 * {@code "@" + defaultRealm} is appended. When neither is available the username
 * is used unchanged, which leaves the realm choice to the krb5 configuration.
 *
 * @param credentials username and password to authenticate
 * @return the response built from the authenticated name, the identity used,
 *         {@code expiration} and {@code issuer}
 * @throws InvalidLoginCredentialsException when the provider rejects the credentials
 * @throws IdentityAccessException          when the provider is not initialized
 */
@Override
public final AuthenticationResponse authenticate(final LoginCredentials credentials) throws InvalidLoginCredentialsException, IdentityAccessException {
    if (provider == null) {
        throw new IdentityAccessException("The Kerberos authentication provider is not initialized.");
    }

    try {
        final String principal = credentials.getUsername();
        final String realmOfPrincipal = KerberosPrincipalParser.getRealm(principal);

        // Decide which identity to authenticate: an explicit realm wins, then the
        // configured default realm, and finally the bare principal.
        final String identity;
        if (StringUtils.isNotBlank(realmOfPrincipal)) {
            identity = principal;
            logger.debug("Realm was specified in principal {}, default realm was not added to the identity being authenticated", principal);
        } else if (StringUtils.isBlank(defaultRealm)) {
            identity = principal;
            logger.debug("Realm was not specified in principal {}, default realm is blank and was not added to the identity being authenticated", principal);
        } else {
            identity = StringUtils.joinWith("@", principal, defaultRealm);
            logger.debug("Realm was not specified in principal {}, default realm {} was added to the identity being authenticated", principal, defaultRealm);
        }

        final UsernamePasswordAuthenticationToken authToken =
                new UsernamePasswordAuthenticationToken(identity, credentials.getPassword());
        if (logger.isDebugEnabled()) {
            logger.debug("Created authentication token for principal {} with name {} and is authenticated {}", authToken.getPrincipal(), authToken.getName(), authToken.isAuthenticated());
        }

        final Authentication result = provider.authenticate(authToken);
        if (logger.isDebugEnabled()) {
            logger.debug("Ran provider.authenticate() and returned authentication for " +
                    "principal {} with name {} and is authenticated {}", result.getPrincipal(), result.getName(), result.isAuthenticated());
        }

        return new AuthenticationResponse(result.getName(), identity, expiration, issuer);
    } catch (final AuthenticationException e) {
        throw new InvalidLoginCredentialsException(e.getMessage(), e);
    }
}
/**
 * Returns the name of this topology's control topic.
 *
 * @return {@code inner.topologyId} followed by {@code "_ctrl"}
 */
private String obtainCtrlTopic() {
    final Object[] parts = {inner.topologyId, "ctrl"};
    return StringUtils.joinWith("_", parts);
}
/**
 * Builds the key that identifies the topic partition of an ack.
 *
 * @param ackVo ack whose topic and partition are used
 * @return the topic and the partition joined with an underscore
 */
public String obtainKey(Ack ackVo) {
    final Object[] parts = {ackVo.getTopic(), ackVo.getPartition()};
    return StringUtils.joinWith("_", parts);
}