下面列出了 org.apache.commons.lang3.mutable.MutableLong#add() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Adds {@code value} to the running total stored under {@code name}.
 *
 * <p>When the map holds no counter for {@code name} (or holds {@code null}),
 * a new counter initialised to {@code value} is stored instead.
 *
 * @param collectives map from name to its mutable running total; updated in place
 * @param name key whose total is updated
 * @param value amount to add
 */
private void updateCollectives(Map<String, MutableLong> collectives, String name, int value) {
  MutableLong runningTotal = collectives.get(name);
  if (runningTotal != null) {
    // Counter already present: accumulate in place, no re-insertion needed.
    runningTotal.add(value);
    return;
  }
  collectives.put(name, new MutableLong(value));
}
/**
 * Resets the holders and then aggregates statistics over {@code metadataList}.
 *
 * <p>For every metadata entry and every column in its statistics this method:
 * <ul>
 *   <li>adds {@code rowCount - nullsCount} to the column's entry in
 *       {@code columnValueCounts}, or pins that entry to
 *       {@code GroupScan.NO_COLUMN_STATS} once the nulls count is missing or
 *       already marked as unavailable;</li>
 *   <li>asks {@code checkForPartitionColumn} whether the column is still a
 *       partition-column candidate and updates {@code partitionValueMap} /
 *       {@code partitionColTypeMap} accordingly.</li>
 * </ul>
 * The row count of each metadata entry is added to {@code this.rowCount}.
 *
 * @param metadataList metadata entries to aggregate
 */
public void collect(List<T> metadataList) {
  resetHolders();
  boolean first = true;
  for (T metadata : metadataList) {
    long localRowCount = (long) TableStatisticsKind.ROW_COUNT.getValue(metadata);
    for (Map.Entry<SchemaPath, ColumnStatistics> columnsStatistics : metadata.getColumnsStatistics().entrySet()) {
      SchemaPath schemaPath = columnsStatistics.getKey();
      ColumnStatistics statistics = columnsStatistics.getValue();

      // Fetch the running counter for this column, registering a fresh zero counter on first sight.
      MutableLong emptyCount = new MutableLong();
      MutableLong previousCount = columnValueCounts.putIfAbsent(schemaPath, emptyCount);
      if (previousCount == null) {
        previousCount = emptyCount;
      }

      Long nullsNum = (Long) statistics.getStatistic(ColumnStatisticsKind.NULLS_COUNT);
      if (previousCount.longValue() != GroupScan.NO_COLUMN_STATS && nullsNum != null && nullsNum != GroupScan.NO_COLUMN_STATS) {
        previousCount.add(localRowCount - nullsNum);
      } else {
        // Once stats are unavailable for any entry, the column total stays unavailable.
        previousCount.setValue(GroupScan.NO_COLUMN_STATS);
      }

      ColumnMetadata columnMetadata = SchemaPathUtils.getColumnMetadata(schemaPath, metadata.getSchema());
      TypeProtos.MajorType majorType = columnMetadata != null ? columnMetadata.majorType() : null;
      boolean partitionColumn = checkForPartitionColumn(statistics, first, localRowCount, majorType, schemaPath);
      if (partitionColumn) {
        Object value = partitionValueMap.get(metadata.getLocation(), schemaPath);
        Object currentValue = statistics.getStatistic(ColumnStatisticsKind.MAX_VALUE);
        // NULL_VALUE is compared by identity on purpose: it is a sentinel object.
        if (value != null && value != BaseParquetMetadataProvider.NULL_VALUE) {
          // Compare by value, not by reference: equal boxed statistics held in
          // distinct objects must not disqualify the column.
          if (!value.equals(currentValue)) {
            partitionColTypeMap.remove(schemaPath);
          }
        } else {
          // the value of a column with primitive type can not be null,
          // so checks that there are really null value and puts it to the map
          // Reuse the nulls count read above and guard against a missing statistic,
          // which would otherwise fail with a NullPointerException on unboxing.
          if (nullsNum != null && localRowCount == nullsNum.longValue()) {
            partitionValueMap.put(metadata.getLocation(), schemaPath, BaseParquetMetadataProvider.NULL_VALUE);
          } else {
            partitionValueMap.put(metadata.getLocation(), schemaPath, currentValue);
          }
        }
      } else {
        partitionColTypeMap.remove(schemaPath);
      }
    }
    this.rowCount += localRowCount;
    first = false;
  }
}
/**
 * Drives a {@code TestableKinesisDataFetcher} with a manually advanced clock and
 * checks when watermarks are emitted and when the source is marked as
 * temporarily idle.
 */
@Test
public void testPeriodicWatermark() {
  // Hand-driven time source plus capture points for what the fetcher reports.
  final MutableLong fakeClock = new MutableLong();
  final MutableBoolean markedIdle = new MutableBoolean();
  final List<Watermark> emittedWatermarks = new ArrayList<>();

  String streamName = "fakeStream1";
  StreamShardHandle shard =
      new StreamShardHandle(
          streamName,
          new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)));

  // Source context that records watermarks and idle notifications instead of forwarding them.
  TestSourceContext<String> capturingContext =
      new TestSourceContext<String>() {
        @Override
        public void emitWatermark(Watermark mark) {
          emittedWatermarks.add(mark);
        }

        @Override
        public void markAsTemporarilyIdle() {
          markedIdle.setTrue();
        }
      };

  HashMap<String, String> lastSeenShardIds = new HashMap<>();

  // Fetcher whose notion of "now" is read from fakeClock.
  final KinesisDataFetcher<String> fetcherUnderTest =
      new TestableKinesisDataFetcher<String>(
          Collections.singletonList(streamName),
          capturingContext,
          new java.util.Properties(),
          new KinesisDeserializationSchemaWrapper<>(new org.apache.flink.streaming.util.serialization.SimpleStringSchema()),
          1,
          1,
          new AtomicReference<>(),
          new LinkedList<>(),
          lastSeenShardIds,
          FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(new HashMap<>())) {
        @Override
        protected long getCurrentTimeMillis() {
          return fakeClock.longValue();
        }
      };
  Whitebox.setInternalState(fetcherUnderTest, "periodicWatermarkAssigner", watermarkAssigner);

  SequenceNumber fakeSeq = new SequenceNumber("fakeSequenceNumber");
  // A shard has to be registered before records can be emitted for it.
  int registeredShardIndex =
      fetcherUnderTest.registerNewSubscribedShardState(
          new KinesisStreamShardState(
              KinesisDataFetcher.convertToStreamShardMetadata(shard), shard, fakeSeq));

  // Phase 1: a record stamped Long.MIN_VALUE is forwarded, but no watermark follows.
  StreamRecord<String> minTimestampRecord =
      new StreamRecord<>(String.valueOf(Long.MIN_VALUE), Long.MIN_VALUE);
  fetcherUnderTest.emitRecordAndUpdateState(
      minTimestampRecord.getValue(), minTimestampRecord.getTimestamp(), registeredShardIndex, fakeSeq);
  Assert.assertEquals(minTimestampRecord, capturingContext.getCollectedOutputs().poll());
  fetcherUnderTest.emitWatermark();
  Assert.assertTrue("potential watermark equals previous watermark", emittedWatermarks.isEmpty());

  // Phase 2: a record with timestamp 1 yields a watermark at that timestamp.
  StreamRecord<String> advancingRecord = new StreamRecord<>(String.valueOf(1), 1);
  fetcherUnderTest.emitRecordAndUpdateState(
      advancingRecord.getValue(), advancingRecord.getTimestamp(), registeredShardIndex, fakeSeq);
  Assert.assertEquals(advancingRecord, capturingContext.getCollectedOutputs().poll());
  fetcherUnderTest.emitWatermark();
  Assert.assertFalse("watermark advanced", emittedWatermarks.isEmpty());
  Assert.assertEquals(new Watermark(advancingRecord.getTimestamp()), emittedWatermarks.remove(0));
  Assert.assertFalse("not idle", markedIdle.booleanValue());

  // Phase 3: move the clock past the timeout; this test has not set the idle interval yet.
  long idleTimeoutMillis = 10;
  fakeClock.add(idleTimeoutMillis + 1);
  fetcherUnderTest.emitWatermark();
  Assert.assertFalse("not idle", markedIdle.booleanValue());
  Assert.assertTrue("not idle, no new watermark", emittedWatermarks.isEmpty());

  // Phase 4: set the idle interval on the fetcher and expect the idle notification.
  Whitebox.setInternalState(fetcherUnderTest, "shardIdleIntervalMillis", idleTimeoutMillis);
  fetcherUnderTest.emitWatermark();
  Assert.assertTrue("idle", markedIdle.booleanValue());
  Assert.assertTrue("idle, no watermark", emittedWatermarks.isEmpty());
}
/**
 * Drives a {@code TestableKinesisDataFetcher} with a manually advanced clock and
 * checks when watermarks are emitted and when the source is marked as
 * temporarily idle.
 */
@Test
public void testPeriodicWatermark() {
  // Hand-driven time source plus capture points for what the fetcher reports.
  final MutableLong fakeClock = new MutableLong();
  final MutableBoolean markedIdle = new MutableBoolean();
  final List<Watermark> emittedWatermarks = new ArrayList<>();

  String streamName = "fakeStream1";
  StreamShardHandle shard =
      new StreamShardHandle(
          streamName,
          new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)));

  // Source context that records watermarks and idle notifications instead of forwarding them.
  TestSourceContext<String> capturingContext =
      new TestSourceContext<String>() {
        @Override
        public void emitWatermark(Watermark mark) {
          emittedWatermarks.add(mark);
        }

        @Override
        public void markAsTemporarilyIdle() {
          markedIdle.setTrue();
        }
      };

  HashMap<String, String> lastSeenShardIds = new HashMap<>();

  // Fetcher whose notion of "now" is read from fakeClock.
  final KinesisDataFetcher<String> fetcherUnderTest =
      new TestableKinesisDataFetcher<String>(
          Collections.singletonList(streamName),
          capturingContext,
          new java.util.Properties(),
          new KinesisDeserializationSchemaWrapper<>(new org.apache.flink.streaming.util.serialization.SimpleStringSchema()),
          1,
          1,
          new AtomicReference<>(),
          new LinkedList<>(),
          lastSeenShardIds,
          FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(new HashMap<>())) {
        @Override
        protected long getCurrentTimeMillis() {
          return fakeClock.longValue();
        }
      };
  Whitebox.setInternalState(fetcherUnderTest, "periodicWatermarkAssigner", watermarkAssigner);

  SequenceNumber fakeSeq = new SequenceNumber("fakeSequenceNumber");
  // A shard has to be registered before records can be emitted for it.
  int registeredShardIndex =
      fetcherUnderTest.registerNewSubscribedShardState(
          new KinesisStreamShardState(
              KinesisDataFetcher.convertToStreamShardMetadata(shard), shard, fakeSeq));

  // Phase 1: a record stamped Long.MIN_VALUE is forwarded, but no watermark follows.
  StreamRecord<String> minTimestampRecord =
      new StreamRecord<>(String.valueOf(Long.MIN_VALUE), Long.MIN_VALUE);
  fetcherUnderTest.emitRecordAndUpdateState(
      minTimestampRecord.getValue(), minTimestampRecord.getTimestamp(), registeredShardIndex, fakeSeq);
  Assert.assertEquals(minTimestampRecord, capturingContext.getCollectedOutputs().poll());
  fetcherUnderTest.emitWatermark();
  Assert.assertTrue("potential watermark equals previous watermark", emittedWatermarks.isEmpty());

  // Phase 2: a record with timestamp 1 yields a watermark at that timestamp.
  StreamRecord<String> advancingRecord = new StreamRecord<>(String.valueOf(1), 1);
  fetcherUnderTest.emitRecordAndUpdateState(
      advancingRecord.getValue(), advancingRecord.getTimestamp(), registeredShardIndex, fakeSeq);
  Assert.assertEquals(advancingRecord, capturingContext.getCollectedOutputs().poll());
  fetcherUnderTest.emitWatermark();
  Assert.assertFalse("watermark advanced", emittedWatermarks.isEmpty());
  Assert.assertEquals(new Watermark(advancingRecord.getTimestamp()), emittedWatermarks.remove(0));
  Assert.assertFalse("not idle", markedIdle.booleanValue());

  // Phase 3: move the clock past the timeout; this test has not set the idle interval yet.
  long idleTimeoutMillis = 10;
  fakeClock.add(idleTimeoutMillis + 1);
  fetcherUnderTest.emitWatermark();
  Assert.assertFalse("not idle", markedIdle.booleanValue());
  Assert.assertTrue("not idle, no new watermark", emittedWatermarks.isEmpty());

  // Phase 4: set the idle interval on the fetcher and expect the idle notification.
  Whitebox.setInternalState(fetcherUnderTest, "shardIdleIntervalMillis", idleTimeoutMillis);
  fetcherUnderTest.emitWatermark();
  Assert.assertTrue("idle", markedIdle.booleanValue());
  Assert.assertTrue("idle, no watermark", emittedWatermarks.isEmpty());
}
/**
 * Adds {@code input} to {@code accumulatedValue} in place.
 *
 * @param accumulatedValue running total; mutated by this call
 * @param input value to add
 * @return the same {@code accumulatedValue} instance, after the addition
 */
@Override
public MutableLong accumulate(MutableLong accumulatedValue, Long input) {
  accumulatedValue.add(input);
  return accumulatedValue;
}
/**
 * Folds {@code accumulatedValue2} into {@code accumulatedValue1} in place.
 *
 * @param accumulatedValue1 running total that receives the sum; mutated by this call
 * @param accumulatedValue2 running total to add; not modified here
 * @return the same {@code accumulatedValue1} instance, after the addition
 */
@Override
public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2) {
  accumulatedValue1.add(accumulatedValue2);
  return accumulatedValue1;
}
/**
 * Folds {@code accumulatedValue2} into {@code accumulatedValue1} in place.
 *
 * @param accumulatedValue1 running total that receives the sum; mutated by this call
 * @param accumulatedValue2 running total to add; not modified here
 * @return the same {@code accumulatedValue1} instance, after the addition
 */
@Override
public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2) {
  accumulatedValue1.add(accumulatedValue2);
  return accumulatedValue1;
}
/**
 * Adds {@code input} to {@code accumulatedValue} in place.
 *
 * @param accumulatedValue running total; mutated by this call
 * @param input value to add
 * @return the same {@code accumulatedValue} instance, after the addition
 */
@Override
public MutableLong accumulate(MutableLong accumulatedValue, Long input) {
  accumulatedValue.add(input);
  return accumulatedValue;
}
/**
 * Folds {@code accumulatedValue2} into {@code accumulatedValue1} in place.
 *
 * @param accumulatedValue1 running total that receives the sum; mutated by this call
 * @param accumulatedValue2 running total to add; not modified here
 * @return the same {@code accumulatedValue1} instance, after the addition
 */
@Override
public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2) {
  accumulatedValue1.add(accumulatedValue2);
  return accumulatedValue1;
}
/**
 * Drives a {@code TestableKinesisDataFetcher} with a manually advanced clock and
 * checks when watermarks are emitted and when the source is marked as
 * temporarily idle.
 */
@Test
public void testPeriodicWatermark() {
  // Hand-driven time source plus capture points for what the fetcher reports.
  final MutableLong fakeClock = new MutableLong();
  final MutableBoolean markedIdle = new MutableBoolean();
  final List<Watermark> emittedWatermarks = new ArrayList<>();

  String streamName = "fakeStream1";
  StreamShardHandle shard =
      new StreamShardHandle(
          streamName,
          new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)));

  // Source context that records watermarks and idle notifications instead of forwarding them.
  TestSourceContext<String> capturingContext =
      new TestSourceContext<String>() {
        @Override
        public void emitWatermark(Watermark mark) {
          emittedWatermarks.add(mark);
        }

        @Override
        public void markAsTemporarilyIdle() {
          markedIdle.setTrue();
        }
      };

  HashMap<String, String> lastSeenShardIds = new HashMap<>();

  // Fetcher whose notion of "now" is read from fakeClock.
  final KinesisDataFetcher<String> fetcherUnderTest =
      new TestableKinesisDataFetcher<String>(
          Collections.singletonList(streamName),
          capturingContext,
          new java.util.Properties(),
          new KinesisDeserializationSchemaWrapper<>(new org.apache.flink.streaming.util.serialization.SimpleStringSchema()),
          1,
          1,
          new AtomicReference<>(),
          new LinkedList<>(),
          lastSeenShardIds,
          FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(new HashMap<>())) {
        @Override
        protected long getCurrentTimeMillis() {
          return fakeClock.longValue();
        }
      };
  Whitebox.setInternalState(fetcherUnderTest, "periodicWatermarkAssigner", watermarkAssigner);

  SequenceNumber fakeSeq = new SequenceNumber("fakeSequenceNumber");
  // A shard has to be registered before records can be emitted for it.
  int registeredShardIndex =
      fetcherUnderTest.registerNewSubscribedShardState(
          new KinesisStreamShardState(
              KinesisDataFetcher.convertToStreamShardMetadata(shard), shard, fakeSeq));

  // Phase 1: a record stamped Long.MIN_VALUE is forwarded, but no watermark follows.
  StreamRecord<String> minTimestampRecord =
      new StreamRecord<>(String.valueOf(Long.MIN_VALUE), Long.MIN_VALUE);
  fetcherUnderTest.emitRecordAndUpdateState(
      minTimestampRecord.getValue(), minTimestampRecord.getTimestamp(), registeredShardIndex, fakeSeq);
  Assert.assertEquals(minTimestampRecord, capturingContext.getCollectedOutputs().poll());
  fetcherUnderTest.emitWatermark();
  Assert.assertTrue("potential watermark equals previous watermark", emittedWatermarks.isEmpty());

  // Phase 2: a record with timestamp 1 yields a watermark at that timestamp.
  StreamRecord<String> advancingRecord = new StreamRecord<>(String.valueOf(1), 1);
  fetcherUnderTest.emitRecordAndUpdateState(
      advancingRecord.getValue(), advancingRecord.getTimestamp(), registeredShardIndex, fakeSeq);
  Assert.assertEquals(advancingRecord, capturingContext.getCollectedOutputs().poll());
  fetcherUnderTest.emitWatermark();
  Assert.assertFalse("watermark advanced", emittedWatermarks.isEmpty());
  Assert.assertEquals(new Watermark(advancingRecord.getTimestamp()), emittedWatermarks.remove(0));
  Assert.assertFalse("not idle", markedIdle.booleanValue());

  // Phase 3: move the clock past the timeout; this test has not set the idle interval yet.
  long idleTimeoutMillis = 10;
  fakeClock.add(idleTimeoutMillis + 1);
  fetcherUnderTest.emitWatermark();
  Assert.assertFalse("not idle", markedIdle.booleanValue());
  Assert.assertTrue("not idle, no new watermark", emittedWatermarks.isEmpty());

  // Phase 4: set the idle interval on the fetcher and expect the idle notification.
  Whitebox.setInternalState(fetcherUnderTest, "shardIdleIntervalMillis", idleTimeoutMillis);
  fetcherUnderTest.emitWatermark();
  Assert.assertTrue("idle", markedIdle.booleanValue());
  Assert.assertTrue("idle, no watermark", emittedWatermarks.isEmpty());
}