下面列出了org.apache.commons.lang3.tuple.MutablePair#setRight ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private Pair<String, String> prefixSplitter(String input) {
MutablePair<String, String> result = new MutablePair<>("", input);
if (input.startsWith("Device")) {
result.setLeft("Device");
result.setRight(input.replaceFirst("Device", ""));
} else if (input.startsWith("OperatingSystem")) {
result.setLeft("Operating System");
result.setRight(input.replaceFirst("OperatingSystem", ""));
} else if (input.startsWith("LayoutEngine")) {
result.setLeft("Layout Engine");
result.setRight(input.replaceFirst("LayoutEngine", ""));
} else if (input.startsWith("Agent")) {
result.setLeft("Agent");
result.setRight(input.replaceFirst("Agent", ""));
}
return result;
}
/**
* Adds or updates the query param.
* Format: key=value
*
* @param param
* @return
*/
public QueryParamsBuilder add( String param )
{
String[] splited = param.split( "=" );
MutablePair pair = getByKey( splited[0] );
if ( pair != null )
{
pair.setRight( splited[1] );
return this;
}
queryParams.add( MutablePair.of( splited[0], splited[1] ) );
return this;
}
@Override
public CommentsNodeImpl getNode(String path)
{
MutablePair<String, CommentsNodeImpl> nodePair = this.dataMap.get(path);
CommentsNodeImpl node = (nodePair == null) ? null : nodePair.getRight();
if (node == null)
{
MutablePair<String, CommentsNodeImpl> anyNodePair = this.dataMap.get(ANY);
node = (anyNodePair == null) ? null : anyNodePair.getRight();
if (node == null)
{
CommentsNodeImpl commentsNode = new CommentsNodeImpl(this);
if (nodePair != null)
{
nodePair.setRight(commentsNode);
}
else
{
this.dataMap.put(path, new MutablePair<>(null, commentsNode));
}
return commentsNode;
}
return node;
}
return node;
}
/**
* Implement InputOperator Interface.
*/
@Override
public void emitTuples()
{
if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
return;
}
int count = consumer.getQueueSize();
if (maxTuplesPerWindow > 0) {
count = Math.min(count, maxTuplesPerWindow - emitCount);
}
for (int i = 0; i < count; i++) {
Pair<String, Record> data = consumer.pollRecord();
String shardId = data.getFirst();
String recordId = data.getSecond().getSequenceNumber();
emitTuple(data);
MutablePair<String, Integer> shardOffsetAndCount = currentWindowRecoveryState.get(shardId);
if (shardOffsetAndCount == null) {
currentWindowRecoveryState.put(shardId, new MutablePair<String, Integer>(recordId, 1));
} else {
shardOffsetAndCount.setRight(shardOffsetAndCount.right + 1);
}
shardPosition.put(shardId, recordId);
}
emitCount += count;
}
public static MutablePair<Text, Boolean> transportOneRecord(
Record record, char fieldDelimiter, List<Configuration> columnsConfiguration, TaskPluginCollector taskPluginCollector) {
MutablePair<List<Object>, Boolean> transportResultList = transportOneRecord(record,columnsConfiguration,taskPluginCollector);
//保存<转换后的数据,是否是脏数据>
MutablePair<Text, Boolean> transportResult = new MutablePair<Text, Boolean>();
transportResult.setRight(false);
if(null != transportResultList){
Text recordResult = new Text(StringUtils.join(transportResultList.getLeft(), fieldDelimiter));
transportResult.setRight(transportResultList.getRight());
transportResult.setLeft(recordResult);
}
return transportResult;
}
public static VoxelShape getShape(Cuboid6 cuboid) {
VoxelShape shape = cuboidToShapeCache.getIfPresent(cuboid);
if (shape == null) {
shape = VoxelShapes.create(cuboid.min.x, cuboid.min.y, cuboid.min.z, cuboid.max.x, cuboid.max.y, cuboid.max.z);
cuboidToShapeCache.put(cuboid, shape);
MutablePair<AxisAlignedBB, Cuboid6> entry = getReverse(shape);
if (entry.getRight() == null) {
entry.setRight(cuboid);
}
}
return shape;
}
public static Cuboid6 getCuboid(VoxelShape shape) {
MutablePair<AxisAlignedBB, Cuboid6> entry = getReverse(shape);
if (entry.getRight() == null) {
entry.setRight(new Cuboid6(// I hope this is okay, don't want to rely on AABB cache.
shape.getStart(Direction.Axis.X), shape.getStart(Direction.Axis.Y), shape.getStart(Direction.Axis.Z),//
shape.getEnd(Direction.Axis.X), shape.getEnd(Direction.Axis.Y), shape.getEnd(Direction.Axis.Z)//
));
}
return entry.getRight();
}
@Override
public void emitTuples()
{
if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
return;
}
int count = consumer.messageSize() + ((pendingMessage != null) ? 1 : 0);
if (maxTuplesPerWindow > 0) {
count = Math.min(count, maxTuplesPerWindow - emitCount);
}
KafkaConsumer.KafkaMessage message = null;
for (int i = 0; i < count; i++) {
if (pendingMessage != null) {
message = pendingMessage;
pendingMessage = null;
} else {
message = consumer.pollMessage();
}
// If the total size transmitted in the window will be exceeded don't transmit anymore messages in this window
// Make an exception for the case when no message has been transmitted in the window and transmit at least one
// message even if the condition is violated so that the processing doesn't get stuck
if ((emitCount > 0) && ((maxTotalMsgSizePerWindow - emitTotalMsgSize) < message.msg.size())) {
pendingMessage = message;
break;
}
emitTuple(message);
emitCount++;
emitTotalMsgSize += message.msg.size();
offsetStats.put(message.kafkaPart, message.offSet);
MutablePair<Long, Integer> offsetAndCount = currentWindowRecoveryState.get(message.kafkaPart);
if (offsetAndCount == null) {
currentWindowRecoveryState.put(message.kafkaPart, new MutablePair<Long, Integer>(message.offSet, 1));
} else {
offsetAndCount.setRight(offsetAndCount.right + 1);
}
}
}
@Override
public MutablePair<Double, Long> accumulate(MutablePair<Double, Long> accu, Double input)
{
accu.setLeft(accu.getLeft() * ((double)accu.getRight() / (accu.getRight() + 1)) + input / (accu.getRight() + 1));
accu.setRight(accu.getRight() + 1);
return accu;
}
@Override
public MutablePair<Double, Long> merge(MutablePair<Double, Long> accu1, MutablePair<Double, Long> accu2)
{
accu1.setLeft(accu1.getLeft() * ((double)accu1.getRight() / accu1.getRight() + accu2.getRight()) +
accu2.getLeft() * ((double)accu2.getRight() / accu1.getRight() + accu2.getRight()));
accu1.setRight(accu1.getRight() + accu2.getRight());
return accu1;
}
protected void reportHandled(final String anyType, final String key) {
MutablePair<Integer, String> pair = handled.get(anyType);
if (pair == null) {
pair = MutablePair.of(0, null);
handled.put(anyType, pair);
}
pair.setLeft(pair.getLeft() + 1);
pair.setRight(key);
}
@Override
public void reportHandled(final ObjectClass objectClass, final Name name) {
MutablePair<Integer, String> pair = handled.get(objectClass);
if (pair == null) {
pair = MutablePair.of(0, null);
handled.put(objectClass, pair);
}
pair.setLeft(pair.getLeft() + 1);
pair.setRight(name.getNameValue());
}
/**
* @return MutablePair<String, Boolean> left: formated data line; right: is
* dirty data or not, true means meeting dirty data
* */
public static MutablePair<String, Boolean> transportOneRecord(
Record record, String nullFormat, String dateFormat,
char fieldDelimiter, String fileFormat,
TaskPluginCollector taskPluginCollector) {
// warn: default is null
if (null == nullFormat) {
nullFormat = "null";
}
MutablePair<String, Boolean> transportResult = new MutablePair<String, Boolean>();
transportResult.setRight(false);
List<String> splitedRows = new ArrayList<String>();
int recordLength = record.getColumnNumber();
if (0 != recordLength) {
Column column;
for (int i = 0; i < recordLength; i++) {
column = record.getColumn(i);
if (null != column.getRawData()) {
boolean isDateColumn = column instanceof DateColumn;
if (!isDateColumn) {
splitedRows.add(column.asString());
} else {
// if (null != dateFormat) {
if (StringUtils.isNotBlank(dateFormat)) {
try {
SimpleDateFormat dateParse = new SimpleDateFormat(
dateFormat);
splitedRows.add(dateParse.format(column
.asDate()));
} catch (Exception e) {
// warn: 此处认为似乎脏数据
String message = String.format(
"使用您配置的格式 [%s] 转换 [%s] 错误.",
dateFormat, column.asString());
taskPluginCollector.collectDirtyRecord(record,
message);
transportResult.setRight(true);
break;
}
} else {
splitedRows.add(column.asString());
}
}
} else {
// warn: it's all ok if nullFormat is null
splitedRows.add(nullFormat);
}
}
}
transportResult.setLeft(UnstructuredStorageWriterUtil
.doTransportOneRecord(splitedRows, fieldDelimiter, fileFormat));
return transportResult;
}
public static MutablePair<List<Object>, Boolean> transportOneRecord(
Record record,List<Configuration> columnsConfiguration,
TaskPluginCollector taskPluginCollector){
MutablePair<List<Object>, Boolean> transportResult = new MutablePair<List<Object>, Boolean>();
transportResult.setRight(false);
List<Object> recordList = Lists.newArrayList();
int recordLength = record.getColumnNumber();
if (0 != recordLength) {
Column column;
for (int i = 0; i < recordLength; i++) {
column = record.getColumn(i);
//todo as method
String rowData = column.getRawData() == null ? null : column.getRawData().toString();
if (null != column.getRawData() && StringUtils.isNotBlank(rowData)) {
SupportHiveDataType columnType = SupportHiveDataType.valueOf(
columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase());
//根据writer端类型配置做类型转换
try {
switch (columnType) {
case TINYINT:
recordList.add(Byte.valueOf(rowData));
break;
case SMALLINT:
recordList.add(Short.valueOf(rowData));
break;
case INT:
recordList.add(Integer.valueOf(rowData));
break;
case BIGINT:
recordList.add(column.asLong());
break;
case FLOAT:
recordList.add(Float.valueOf(rowData));
break;
case DOUBLE:
recordList.add(column.asDouble());
break;
case DECIMAL://decimal,added by lubiao
recordList.add(HiveDecimal.create(column.asBigDecimal()));
break;
case BINARY://binary,added by lubiao
recordList.add(new BytesWritable(column.asBytes()));
break;
case STRING:
case VARCHAR:
case CHAR:
recordList.add(column.asString());
break;
case BOOLEAN:
recordList.add(column.asBoolean());
break;
case DATE:
recordList.add(new java.sql.Date(column.asDate().getTime()));
break;
case TIMESTAMP:
recordList.add(new java.sql.Timestamp(column.asDate().getTime()));
break;
default:
ErrorRecord.addError(String.format(
"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d]. 请修改表中该字段的类型或者不同步该字段.",
columnsConfiguration.get(i).getString(Key.NAME),
columnsConfiguration.get(i).getString(Key.TYPE)));
throw DataXException
.asDataXException(
HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format(
"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d]. 请修改表中该字段的类型或者不同步该字段.",
columnsConfiguration.get(i).getString(Key.NAME),
columnsConfiguration.get(i).getString(Key.TYPE)));
}
} catch (Exception e) {
// warn: 此处认为脏数据
String message = String.format(
"字段类型转换错误:你目标字段为[%s]类型,实际字段值为[%s].",
columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData().toString());
taskPluginCollector.collectDirtyRecord(record, message);
transportResult.setRight(true);
break;
}
}else {
// warn: it's all ok if nullFormat is null
recordList.add(null);
}
}
}
transportResult.setLeft(recordList);
return transportResult;
}