类org.apache.spark.sql.expressions.MutableAggregationBuffer源码实例Demo

下面列出了怎么用org.apache.spark.sql.expressions.MutableAggregationBuffer的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: BigDataPlatform   文件: GroupConcatDistinctUDAF.java
/**
 * 更新
 * 可以认为是,一个一个地将组内的字段值传递进来
 * 实现拼接的逻辑
 */
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
	// 缓冲中的已经拼接过的城市信息串
	String bufferCityInfo = buffer.getString(0);
	// 刚刚传递进来的某个城市信息
	String cityInfo = input.getString(0);
	
	// 在这里要实现去重的逻辑
	// 判断:之前没有拼接过某个城市信息,那么这里才可以接下去拼接新的城市信息
	if(!bufferCityInfo.contains(cityInfo)) {
		if("".equals(bufferCityInfo)) {
			bufferCityInfo += cityInfo;
		} else {
			// 比如1:北京
			// 1:北京,2:上海
			bufferCityInfo += "," + cityInfo;
		}
		
		buffer.update(0, bufferCityInfo);  
	}
}
 
源代码2 项目: BigDataPlatform   文件: GroupConcatDistinctUDAF.java
/**
 * 合并
 * update操作,可能是针对一个分组内的部分数据,在某个节点上发生的
 * 但是可能一个分组内的数据,会分布在多个节点上处理
 * 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来
 */
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
	String bufferCityInfo1 = buffer1.getString(0);
	String bufferCityInfo2 = buffer2.getString(0);
	
	for(String cityInfo : bufferCityInfo2.split(",")) {
		if(!bufferCityInfo1.contains(cityInfo)) {
			if("".equals(bufferCityInfo1)) {
				bufferCityInfo1 += cityInfo;
			} else {
				bufferCityInfo1 += "," + cityInfo;
			}
			}
	}
	
	buffer1.update(0, bufferCityInfo1);  
}
 
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    //TODO: only implemented for ACTIVE and COMPLETED state so far
    String value = BpmnaiVariables.PROCESS_STATE_ACTIVE;
    if (!buffer1.isNullAt(0) && !buffer2.isNullAt(0)) {
        String b1 = buffer1.getString(0);
        String b2 = buffer2.getString(0);

        if(b1.equals(BpmnaiVariables.PROCESS_STATE_COMPLETED)){
            value = b1;
        } else {
            if(b2.equals(BpmnaiVariables.PROCESS_STATE_COMPLETED)) {
                value = BpmnaiVariables.PROCESS_STATE_COMPLETED;
            }
        }
    } else if(!buffer1.isNullAt(0)){
        value = buffer2.getString(0);
    } else {
        value = buffer1.getString(0);
    }
    buffer1.update(0, value);
}
 
/**
 * Updates the given aggregation buffer `buffer` with new input data from `input`.
 *
 * @param buffer buffer to update.
 * @param input  input to update with.
 */
public void update(final MutableAggregationBuffer buffer, final Row input) {
  if (!input.isNullAt(0)) {
    long updatedSum = buffer.getLong(0) + input.getLong(0);
    long updatedCount = buffer.getLong(1) + 1;
    buffer.update(0, updatedSum);
    buffer.update(1, updatedCount);
  }
}
 
/**
 * Merges two aggregation buffers and stores the updated buffer values back to `buffer1`.
 *
 * @param buffer1 first buffer.
 * @param buffer2 second buffer.
 */
public void merge(final MutableAggregationBuffer buffer1, final Row buffer2) {
  long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
  long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
  buffer1.update(0, mergedSum);
  buffer1.update(1, mergedCount);
}
 
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
    //TODO: only implemented for ACTIVE and COMPLETED state so far
    if (!input.isNullAt(0)) {
        String currentValue = (buffer.size() == 0 || buffer.getString(0) == null ? BpmnaiVariables.PROCESS_STATE_ACTIVE : buffer.getString(0));

        String value = currentValue;
        if(!currentValue.equals(BpmnaiVariables.PROCESS_STATE_COMPLETED)){
            if(input.getString(0).equals(BpmnaiVariables.PROCESS_STATE_COMPLETED)) {
                buffer.update(0, BpmnaiVariables.PROCESS_STATE_COMPLETED);
            }
        }
    }
}
 
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
    if (!input.isNullAt(0)) {
        String currentValue = (buffer.size() == 0 || buffer.getString(0) == null ? "" : buffer.getString(0));
        String value = (currentValue.equals("") ? input.getString(0) : currentValue);
        buffer.update(0, value);
    }
}
 
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    String value = "";
    if (!buffer1.isNullAt(0) && !buffer2.isNullAt(0)) {
        String b1 = buffer1.getString(0);
        String b2 = buffer2.getString(0);
        value = (b1.equals("") ? b2 : b1);
    } else if(!buffer1.isNullAt(0)){
        value = buffer2.getString(0);
    } else {
        value = buffer1.getString(0);
    }
    buffer1.update(0, value);
}
 
源代码9 项目: nemo   文件: JavaUserDefinedUntypedAggregation.java
/**
 * Updates the given aggregation buffer `buffer` with new input data from `input`.
 *
 * @param buffer buffer to update.
 * @param input input to update with.
 */
public void update(final MutableAggregationBuffer buffer, final Row input) {
  if (!input.isNullAt(0)) {
    long updatedSum = buffer.getLong(0) + input.getLong(0);
    long updatedCount = buffer.getLong(1) + 1;
    buffer.update(0, updatedSum);
    buffer.update(1, updatedCount);
  }
}
 
源代码10 项目: nemo   文件: JavaUserDefinedUntypedAggregation.java
/**
 * Merges two aggregation buffers and stores the updated buffer values back to `buffer1`.
 *
 * @param buffer1 first buffer.
 * @param buffer2 second buffer.
 */
public void merge(final MutableAggregationBuffer buffer1, final Row buffer2) {
  long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
  long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
  buffer1.update(0, mergedSum);
  buffer1.update(1, mergedCount);
}
 
源代码11 项目: BigDataPlatform   文件: GroupConcatDistinctUDAF.java
/**
 * 初始化
 * 可以认为是,你自己在内部指定一个初始的值
 */
@Override
public void initialize(MutableAggregationBuffer buffer) {
	buffer.update(0, "");
}
 
源代码12 项目: bpmn.ai   文件: ProcessStatesAggregationFunction.java
@Override
public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, BpmnaiVariables.PROCESS_STATE_ACTIVE);
}
 
@Override
public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, "");
}
 
@Override
public void initialize(MutableAggregationBuffer bufferAgg) {
	bufferAgg.update(0, 0.0);
	bufferAgg.update(1, 0.0);		
}
 
@Override
public void update(MutableAggregationBuffer bufferAgg, Row row) {
	bufferAgg.update(0, bufferAgg.getDouble(0)+row.getDouble(0));
	bufferAgg.update(1, bufferAgg.getDouble(1)+2.0);
}
 
@Override
public void merge(MutableAggregationBuffer bufferAgg, Row row) {
	bufferAgg.update(0, bufferAgg.getDouble(0)+row.getDouble(0));
	bufferAgg.update(1, bufferAgg.getDouble(1)+row.getDouble(1));		
}
 
/**
 * Initializes the given aggregation buffer.
 * The buffer itself is a `Row` that in addition to standard methods like retrieving a value at an
 * index (e.g., get(), getBoolean()), provides the opportunity to update its values.
 * Note that arrays and maps inside the buffer are still immutable.
 *
 * @param buffer buffer to initialize.
 */
public void initialize(final MutableAggregationBuffer buffer) {
  buffer.update(0, 0L);
  buffer.update(1, 0L);
}
 
源代码18 项目: nemo   文件: JavaUserDefinedUntypedAggregation.java
/**
 * Initializes the given aggregation buffer.
 * The buffer itself is a `Row` that in addition to standard methods like retrieving a value at an
 * index (e.g., get(), getBoolean()), provides the opportunity to update its values.
 * Note that arrays and maps inside the buffer are still immutable.
 *
 * @param buffer buffer to initialize.
 */
public void initialize(final MutableAggregationBuffer buffer) {
  buffer.update(0, 0L);
  buffer.update(1, 0L);
}
 
 类所在包
 类方法
 同包方法