下面列出了怎么用org.apache.spark.sql.expressions.MutableAggregationBuffer的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Update step: folds one incoming city-info value of the group into the buffer.
 *
 * <p>Slot 0 of {@code buffer} holds the comma-separated city-info string built so far
 * (for example {@code 1:北京,2:上海}). The incoming value is appended only when it is
 * not already one of the comma-separated entries.
 *
 * @param buffer aggregation buffer; slot 0 is the concatenated city-info string
 * @param input  input row; column 0 is a single city-info value
 */
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
    // A null input carries no city info, and String.contains(null) would throw.
    if (input.isNullAt(0)) {
        return;
    }
    // City-info string that has already been concatenated in the buffer
    String bufferCityInfo = buffer.getString(0);
    // The city-info value that was just passed in
    String cityInfo = input.getString(0);
    // An empty value is never appended (the substring check used before also skipped it).
    if (cityInfo.isEmpty()) {
        return;
    }
    // De-duplicate on whole entries: wrapping both sides in the delimiter prevents
    // "1:北京" from being mistaken as present just because "11:北京" is in the buffer.
    boolean alreadyJoined = ("," + bufferCityInfo + ",").contains("," + cityInfo + ",");
    if (!alreadyJoined) {
        if (bufferCityInfo.isEmpty()) {
            bufferCityInfo = cityInfo;
        } else {
            // e.g. "1:北京" becomes "1:北京,2:上海"
            bufferCityInfo = bufferCityInfo + "," + cityInfo;
        }
        buffer.update(0, bufferCityInfo);
    }
}
/**
 * Merge step: combines two partially concatenated city-info strings.
 *
 * <p>{@code update} may have run on only part of a group's rows on one node, while the
 * rest of the group was processed elsewhere. This method appends every entry of
 * {@code buffer2} that is not yet an entry of {@code buffer1} and stores the result
 * back into slot 0 of {@code buffer1}.
 *
 * @param buffer1 buffer that receives the merged string (slot 0)
 * @param buffer2 buffer whose entries are merged in (slot 0)
 */
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    String bufferCityInfo1 = buffer1.getString(0);
    String bufferCityInfo2 = buffer2.getString(0);
    for (String cityInfo : bufferCityInfo2.split(",")) {
        // Splitting an empty buffer yields one empty token; never append it.
        if (cityInfo.isEmpty()) {
            continue;
        }
        // Compare whole entries, not substrings, so "1:北京" is not swallowed by "11:北京".
        if (("," + bufferCityInfo1 + ",").contains("," + cityInfo + ",")) {
            continue;
        }
        bufferCityInfo1 = bufferCityInfo1.isEmpty()
                ? cityInfo
                : bufferCityInfo1 + "," + cityInfo;
    }
    buffer1.update(0, bufferCityInfo1);
}
/**
 * Merges two process-state buffers and stores the result in slot 0 of {@code buffer1}.
 *
 * <p>When both buffers hold a value, the result is COMPLETED if either side is COMPLETED
 * and ACTIVE otherwise. When only one side holds a value, that value is kept. When
 * neither side holds a value, null is written.
 *
 * @param buffer1 buffer that receives the merged state
 * @param buffer2 buffer whose state is merged in
 */
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    //TODO: only implemented for ACTIVE and COMPLETED state so far
    final boolean firstPresent = !buffer1.isNullAt(0);
    final boolean secondPresent = !buffer2.isNullAt(0);
    String value;
    if (firstPresent && secondPresent) {
        String b1 = buffer1.getString(0);
        String b2 = buffer2.getString(0);
        boolean completed = b1.equals(BpmnaiVariables.PROCESS_STATE_COMPLETED)
                || b2.equals(BpmnaiVariables.PROCESS_STATE_COMPLETED);
        value = completed
                ? BpmnaiVariables.PROCESS_STATE_COMPLETED
                : BpmnaiVariables.PROCESS_STATE_ACTIVE;
    } else if (firstPresent) {
        // Only buffer1 has a state: keep it (previously the null side was read here).
        value = buffer1.getString(0);
    } else {
        // buffer1 is null: take buffer2's state if it has one, otherwise stay null.
        value = secondPresent ? buffer2.getString(0) : null;
    }
    buffer1.update(0, value);
}
/**
 * Folds one input row into the aggregation buffer.
 *
 * <p>Slot 0 of the buffer accumulates the long values of input column 0 and slot 1
 * counts the rows that contributed. A row whose column 0 is null leaves the buffer
 * untouched.
 *
 * @param buffer buffer to update.
 * @param input input to update with.
 */
public void update(final MutableAggregationBuffer buffer, final Row input) {
    if (input.isNullAt(0)) {
        return;
    }
    final long sumSoFar = buffer.getLong(0);
    final long sumWithRow = sumSoFar + input.getLong(0);
    final long rowsWithThisOne = buffer.getLong(1) + 1;
    buffer.update(0, sumWithRow);
    buffer.update(1, rowsWithThisOne);
}
/**
 * Combines two aggregation buffers slot by slot and writes the result into `buffer1`.
 *
 * <p>Slot 0 (sum) and slot 1 (count) of both buffers are added as long values.
 *
 * @param buffer1 first buffer; receives the merged values.
 * @param buffer2 second buffer; read only.
 */
public void merge(final MutableAggregationBuffer buffer1, final Row buffer2) {
    final long leftSum = buffer1.getLong(0);
    final long rightSum = buffer2.getLong(0);
    final long leftCount = buffer1.getLong(1);
    final long rightCount = buffer2.getLong(1);
    buffer1.update(0, leftSum + rightSum);
    buffer1.update(1, leftCount + rightCount);
}
/**
 * Folds one input state into the process-state buffer.
 *
 * <p>A null input is ignored. An empty buffer or a null slot 0 is treated as ACTIVE.
 * Slot 0 is switched to COMPLETED when the input state is COMPLETED and the buffer is
 * not already COMPLETED. In every other case the buffer is left unchanged.
 *
 * @param buffer aggregation buffer; slot 0 holds the current process state
 * @param input  input row; column 0 holds the incoming process state
 */
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
    //TODO: only implemented for ACTIVE and COMPLETED state so far
    if (input.isNullAt(0)) {
        return;
    }
    // Read the stored state once; fall back to ACTIVE when nothing is stored yet.
    String storedState = buffer.size() == 0 ? null : buffer.getString(0);
    String currentValue = storedState == null ? BpmnaiVariables.PROCESS_STATE_ACTIVE : storedState;
    boolean alreadyCompleted = currentValue.equals(BpmnaiVariables.PROCESS_STATE_COMPLETED);
    if (!alreadyCompleted && input.getString(0).equals(BpmnaiVariables.PROCESS_STATE_COMPLETED)) {
        buffer.update(0, BpmnaiVariables.PROCESS_STATE_COMPLETED);
    }
}
/**
 * Keeps the first non-empty value seen for the group.
 *
 * <p>A null input is ignored. Otherwise slot 0 is rewritten with its current value when
 * that value is non-empty, or with the input value when the buffer is empty, null, or
 * holds the empty string.
 *
 * @param buffer aggregation buffer; slot 0 holds the retained value
 * @param input  input row; column 0 holds the candidate value
 */
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
    if (input.isNullAt(0)) {
        return;
    }
    String retained = "";
    if (buffer.size() != 0 && buffer.getString(0) != null) {
        retained = buffer.getString(0);
    }
    if (retained.equals("")) {
        buffer.update(0, input.getString(0));
    } else {
        buffer.update(0, retained);
    }
}
/**
 * Merges two "first non-empty value" buffers into slot 0 of {@code buffer1}.
 *
 * <p>When both buffers hold a value, {@code buffer1}'s value wins unless it is the empty
 * string, in which case {@code buffer2}'s value is used. When only one side holds a
 * value, that value is kept. When neither side holds a value, null is written.
 *
 * @param buffer1 buffer that receives the merged value
 * @param buffer2 buffer whose value is merged in
 */
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    final boolean firstPresent = !buffer1.isNullAt(0);
    final boolean secondPresent = !buffer2.isNullAt(0);
    String value;
    if (firstPresent && secondPresent) {
        String b1 = buffer1.getString(0);
        String b2 = buffer2.getString(0);
        value = (b1.equals("") ? b2 : b1);
    } else if (firstPresent) {
        // Only buffer1 has a value: keep it (previously the null side was read here).
        value = buffer1.getString(0);
    } else {
        // buffer1 is null: take buffer2's value if it has one, otherwise stay null.
        value = secondPresent ? buffer2.getString(0) : null;
    }
    buffer1.update(0, value);
}
/**
 * Adds one input row to the running sum and row count kept in `buffer`.
 *
 * <p>Rows whose column 0 is null are skipped. Otherwise slot 0 grows by the long value
 * in input column 0 and slot 1 grows by one.
 *
 * @param buffer buffer to update.
 * @param input input to update with.
 */
public void update(final MutableAggregationBuffer buffer, final Row input) {
    final boolean hasValue = !input.isNullAt(0);
    if (hasValue) {
        final long nextSum = buffer.getLong(0) + input.getLong(0);
        final long nextCount = 1 + buffer.getLong(1);
        buffer.update(0, nextSum);
        buffer.update(1, nextCount);
    }
}
/**
 * Adds the sum (slot 0) and count (slot 1) of `buffer2` onto `buffer1`.
 *
 * @param buffer1 first buffer; updated in place with the combined values.
 * @param buffer2 second buffer; only read.
 */
public void merge(final MutableAggregationBuffer buffer1, final Row buffer2) {
    final long[] combined = {
        buffer1.getLong(0) + buffer2.getLong(0),
        buffer1.getLong(1) + buffer2.getLong(1)
    };
    buffer1.update(0, combined[0]);
    buffer1.update(1, combined[1]);
}
/**
 * Initialization step: seeds slot 0 of the buffer with a caller-chosen starting value,
 * here the empty string.
 *
 * @param buffer aggregation buffer to initialize
 */
@Override
public void initialize(MutableAggregationBuffer buffer) {
    final String emptyStart = "";
    buffer.update(0, emptyStart);
}
/**
 * Seeds slot 0 of the buffer with the ACTIVE process state.
 *
 * @param buffer aggregation buffer to initialize
 */
@Override
public void initialize(MutableAggregationBuffer buffer) {
    final String initialState = BpmnaiVariables.PROCESS_STATE_ACTIVE;
    buffer.update(0, initialState);
}
/**
 * Seeds slot 0 of the buffer with the empty string.
 *
 * @param buffer aggregation buffer to initialize
 */
@Override
public void initialize(MutableAggregationBuffer buffer) {
    final String noValueYet = "";
    buffer.update(0, noValueYet);
}
/**
 * Resets both double slots (0 and 1) of the buffer to 0.0.
 *
 * @param bufferAgg aggregation buffer to initialize
 */
@Override
public void initialize(MutableAggregationBuffer bufferAgg) {
    final double zero = 0.0;
    for (int slot = 0; slot < 2; slot++) {
        bufferAgg.update(slot, zero);
    }
}
/**
 * Folds one input row into the buffer: slot 0 grows by the double in row column 0 and
 * slot 1 grows by 2.0.
 *
 * @param bufferAgg aggregation buffer to update
 * @param row       input row; column 0 is read as a double
 */
@Override
public void update(MutableAggregationBuffer bufferAgg, Row row) {
    final double totalWithRow = bufferAgg.getDouble(0) + row.getDouble(0);
    bufferAgg.update(0, totalWithRow);
    // NOTE(review): slot 1 advances by 2.0 per row; if it is meant to be a row count,
    // 1.0 would be expected. Confirm against the evaluate() implementation.
    final double secondSlot = bufferAgg.getDouble(1) + 2.0;
    bufferAgg.update(1, secondSlot);
}
/**
 * Adds the two double slots of {@code row} onto the matching slots of {@code bufferAgg}.
 *
 * @param bufferAgg buffer that receives the merged values
 * @param row       buffer whose slots 0 and 1 are added in
 */
@Override
public void merge(MutableAggregationBuffer bufferAgg, Row row) {
    for (int slot = 0; slot < 2; slot++) {
        final double combined = bufferAgg.getDouble(slot) + row.getDouble(slot);
        bufferAgg.update(slot, combined);
    }
}
/**
 * Resets the aggregation buffer to its starting state: both long slots (0 and 1) are
 * set to zero.
 * The buffer is a `Row` that, besides the usual read accessors (e.g., get(),
 * getBoolean()), also allows its values to be updated.
 * Arrays and maps stored inside the buffer remain immutable.
 *
 * @param buffer buffer to initialize.
 */
public void initialize(final MutableAggregationBuffer buffer) {
    final long zero = 0L;
    for (int slot = 0; slot < 2; slot++) {
        buffer.update(slot, zero);
    }
}
/**
 * Zeroes the two long slots (0 and 1) of the given aggregation buffer.
 * The buffer behaves like a `Row` with the standard value accessors (e.g., get(),
 * getBoolean()) and additionally supports updating its values.
 * Arrays and maps held inside the buffer are still immutable.
 *
 * @param buffer buffer to initialize.
 */
public void initialize(final MutableAggregationBuffer buffer) {
    final long initialSum = 0L;
    final long initialCount = 0L;
    buffer.update(0, initialSum);
    buffer.update(1, initialCount);
}