下面列出了 javax.annotation.Nonnegative 的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a map entry backed by the given raw key and value bytes.
 *
 * <p>All arguments are stored as-is in the corresponding fields (no copies are made here), and
 * the entry starts out with {@code deleted == false}.
 *
 * @param db the RocksDB instance kept by this entry.
 * @param userKeyOffset non-negative offset kept by this entry; presumably the position of the
 *     user key inside {@code rawKeyBytes} (not verifiable from this constructor).
 * @param rawKeyBytes the raw key bytes.
 * @param rawValueBytes the raw value bytes.
 * @param keySerializer serializer for the user key type.
 * @param valueSerializer serializer for the user value type.
 * @param dataInputView input view kept by this entry.
 */
RocksDBMapEntry(
        @Nonnull final RocksDB db,
        @Nonnegative final int userKeyOffset,
        @Nonnull final byte[] rawKeyBytes,
        @Nonnull final byte[] rawValueBytes,
        @Nonnull final TypeSerializer<UK> keySerializer,
        @Nonnull final TypeSerializer<UV> valueSerializer,
        @Nonnull DataInputDeserializer dataInputView) {

    // Database handle and (de)serialization helpers.
    this.db = db;
    this.dataInputView = dataInputView;
    this.keySerializer = keySerializer;
    this.valueSerializer = valueSerializer;

    // Raw payload of this entry.
    this.userKeyOffset = userKeyOffset;
    this.rawKeyBytes = rawKeyBytes;
    this.rawValueBytes = rawValueBytes;

    // A freshly created entry is not marked as deleted.
    this.deleted = false;
}
/**
 * Creates a priority queue set for a single key group.
 *
 * <p>The constructor stores all collaborators, marks the cache as not containing all elements,
 * computes the key-group prefix bytes via {@code createKeyGroupBytes}, uses that prefix as the
 * initial seek hint, and initializes the internal index to
 * {@code HeapPriorityQueueElement.NOT_CONTAINED}.
 *
 * @param keyGroupId id of the key group, passed to {@code createKeyGroupBytes}.
 * @param keyGroupPrefixBytes number of prefix bytes, passed to {@code createKeyGroupBytes}.
 * @param db the RocksDB instance.
 * @param columnFamilyHandle the column family handle kept by this set.
 * @param byteOrderProducingSerializer serializer for the elements; presumably its byte order
 *     matches the element order, as the name suggests (not verifiable here).
 * @param outputStream output serializer, stored as the output view.
 * @param inputStream input deserializer, stored as the input view.
 * @param batchWrapper write batch wrapper kept by this set.
 * @param orderedByteArraySetCache the ordered cache kept by this set.
 */
RocksDBCachingPriorityQueueSet(
        @Nonnegative int keyGroupId,
        @Nonnegative int keyGroupPrefixBytes,
        @Nonnull RocksDB db,
        @Nonnull ColumnFamilyHandle columnFamilyHandle,
        @Nonnull TypeSerializer<E> byteOrderProducingSerializer,
        @Nonnull DataOutputSerializer outputStream,
        @Nonnull DataInputDeserializer inputStream,
        @Nonnull RocksDBWriteBatchWrapper batchWrapper,
        @Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {

    // RocksDB access.
    this.db = db;
    this.columnFamilyHandle = columnFamilyHandle;
    this.batchWrapper = batchWrapper;

    // Serialization helpers.
    this.byteOrderProducingSerializer = byteOrderProducingSerializer;
    this.inputView = inputStream;
    this.outputView = outputStream;

    // Cache state.
    this.orderedCache = orderedByteArraySetCache;
    this.allElementsInCache = false;
    this.internalIndex = HeapPriorityQueueElement.NOT_CONTAINED;

    // Kept after the plain field assignments above in case createKeyGroupBytes reads any of them.
    this.groupPrefixBytes = createKeyGroupBytes(keyGroupId, keyGroupPrefixBytes);
    // The seek hint initially refers to the very same array as the group prefix.
    this.seekHint = this.groupPrefixBytes;
}
/**
 * Creates the snapshot strategy base, forwarding the description to the super constructor and
 * storing all other arguments in the corresponding fields.
 *
 * @param description description passed to the super constructor.
 * @param db the RocksDB instance.
 * @param rocksDBResourceGuard resource guard kept by this strategy.
 * @param keySerializer serializer for the key type.
 * @param kvStateInformation state information keyed by name.
 * @param keyGroupRange the key-group range kept by this strategy.
 * @param keyGroupPrefixBytes non-negative number of key-group prefix bytes.
 * @param localRecoveryConfig the local recovery configuration.
 * @param cancelStreamRegistry the closeable registry kept by this strategy.
 */
public RocksDBSnapshotStrategyBase(
        @Nonnull String description,
        @Nonnull RocksDB db,
        @Nonnull ResourceGuard rocksDBResourceGuard,
        @Nonnull TypeSerializer<K> keySerializer,
        @Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
        @Nonnull KeyGroupRange keyGroupRange,
        @Nonnegative int keyGroupPrefixBytes,
        @Nonnull LocalRecoveryConfig localRecoveryConfig,
        @Nonnull CloseableRegistry cancelStreamRegistry) {

    super(description);

    // RocksDB instance and its guard.
    this.db = db;
    this.rocksDBResourceGuard = rocksDBResourceGuard;

    // Key-group layout.
    this.keyGroupRange = keyGroupRange;
    this.keyGroupPrefixBytes = keyGroupPrefixBytes;

    // State metadata and serialization.
    this.kvStateInformation = kvStateInformation;
    this.keySerializer = keySerializer;

    // Snapshot environment.
    this.localRecoveryConfig = localRecoveryConfig;
    this.cancelStreamRegistry = cancelStreamRegistry;
}
/**
 * Creates the full snapshot strategy.
 *
 * <p>All arguments except {@code keyGroupCompressionDecorator} are forwarded unchanged to the
 * super constructor, together with the {@code DESCRIPTION} constant.
 *
 * @param db the RocksDB instance.
 * @param rocksDBResourceGuard resource guard forwarded to the super constructor.
 * @param keySerializer serializer for the key type.
 * @param kvStateInformation state information keyed by name.
 * @param keyGroupRange the key-group range.
 * @param keyGroupPrefixBytes non-negative number of key-group prefix bytes.
 * @param localRecoveryConfig the local recovery configuration.
 * @param cancelStreamRegistry the closeable registry.
 * @param keyGroupCompressionDecorator compression decorator kept by this strategy.
 */
public RocksFullSnapshotStrategy(
        @Nonnull RocksDB db,
        @Nonnull ResourceGuard rocksDBResourceGuard,
        @Nonnull TypeSerializer<K> keySerializer,
        @Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
        @Nonnull KeyGroupRange keyGroupRange,
        @Nonnegative int keyGroupPrefixBytes,
        @Nonnull LocalRecoveryConfig localRecoveryConfig,
        @Nonnull CloseableRegistry cancelStreamRegistry,
        @Nonnull StreamCompressionDecorator keyGroupCompressionDecorator) {

    super(
            DESCRIPTION,
            db, rocksDBResourceGuard,
            keySerializer, kvStateInformation,
            keyGroupRange, keyGroupPrefixBytes,
            localRecoveryConfig, cancelStreamRegistry);

    this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
}
/**
 * Requests a number of stack trace samples of a task thread and returns a future for them.
 *
 * <p>The arguments are validated here; the sampling itself is delegated to the overload that
 * additionally takes the list collecting the samples and the result future.
 *
 * @param task The task to be sampled from. Must not be null.
 * @param numSamples The number of samples. Must be positive (zero is rejected).
 * @param delayBetweenSamples The time to wait between taking samples. Must not be null.
 * @param maxStackTraceDepth The maximum depth of the returned stack traces.
 *                           Negative means unlimited.
 * @return A future containing the stack trace samples.
 */
public CompletableFuture<List<StackTraceElement[]>> requestStackTraceSample(
        final StackTraceSampleableTask task,
        @Nonnegative final int numSamples,
        final Time delayBetweenSamples,
        final int maxStackTraceDepth) {

    // Validate in declaration order so the first offending argument is the one reported.
    checkNotNull(task, "task must not be null");
    checkArgument(numSamples > 0, "numSamples must be positive");
    checkNotNull(delayBetweenSamples, "delayBetweenSamples must not be null");

    // Start with an empty sample list (pre-sized for numSamples) and a fresh result future.
    return requestStackTraceSample(
            task, numSamples, delayBetweenSamples, maxStackTraceDepth,
            new ArrayList<>(numSamples), new CompletableFuture<>());
}
/**
 * Creates an empty {@link HeapPriorityQueueSet} with the requested initial capacity.
 *
 * <p>One deduplication map is allocated per key group of {@code keyGroupRange}; each map is
 * created with an initial capacity of {@code 1 + minimumCapacity / numberOfKeyGroups}.
 *
 * @param elementPriorityComparator comparator for the priority of contained elements.
 * @param keyExtractor function to extract a key from the contained elements.
 * @param minimumCapacity the minimum and initial capacity of this priority queue.
 * @param keyGroupRange the key-group range of the elements in this set.
 * @param totalNumberOfKeyGroups the total number of key-groups of the job.
 */
@SuppressWarnings("unchecked")
public HeapPriorityQueueSet(
        @Nonnull PriorityComparator<T> elementPriorityComparator,
        @Nonnull KeyExtractorFunction<T> keyExtractor,
        @Nonnegative int minimumCapacity,
        @Nonnull KeyGroupRange keyGroupRange,
        @Nonnegative int totalNumberOfKeyGroups) {

    super(elementPriorityComparator, minimumCapacity);

    this.keyGroupRange = keyGroupRange;
    this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
    this.keyExtractor = keyExtractor;

    final int localKeyGroupCount = keyGroupRange.getNumberOfKeyGroups();
    // Integer division: a range reporting zero key groups makes this throw ArithmeticException.
    final int initialMapCapacity = minimumCapacity / localKeyGroupCount + 1;

    // Generic array creation is not possible, hence the raw HashMap[] and the unchecked warning.
    this.deduplicationMapsByKeyGroup = new HashMap[localKeyGroupCount];
    int slot = 0;
    while (slot < localKeyGroupCount) {
        deduplicationMapsByKeyGroup[slot] = new HashMap<>(initialMapCapacity);
        slot++;
    }
}
/**
 * Creates a partitioner for state table entries by delegating to the super constructor.
 *
 * <p>A new entry array of length {@code stateTableSize} is passed as the first argument and
 * {@code snapshotData} as the third; the key extractor is always
 * {@code CopyOnWriteStateTable.StateTableEntry::getKey}.
 *
 * @param snapshotData the snapshot entry array, handed to the super constructor.
 * @param stateTableSize non-negative size, used both as the length of the newly allocated array
 *     and as the element count passed to the super constructor.
 * @param keyGroupRange the key-group range.
 * @param totalKeyGroups the total number of key groups.
 * @param elementWriterFunction writer function forwarded to the super constructor.
 */
@SuppressWarnings("unchecked")
StateTableKeyGroupPartitioner(
        @Nonnull CopyOnWriteStateTable.StateTableEntry<K, N, S>[] snapshotData,
        @Nonnegative int stateTableSize,
        @Nonnull KeyGroupRange keyGroupRange,
        @Nonnegative int totalKeyGroups,
        @Nonnull ElementWriterFunction<CopyOnWriteStateTable.StateTableEntry<K, N, S>> elementWriterFunction) {

    super(
            new CopyOnWriteStateTable.StateTableEntry[stateTableSize], stateTableSize,
            // Original author's note: CopyOnWriteStateTable#snapshotTableArrays() makes sure that
            // snapshotData is big enough to hold the flattened entries, so it can safely be
            // reused as the destination array here.
            snapshotData,
            keyGroupRange, totalKeyGroups,
            CopyOnWriteStateTable.StateTableEntry::getKey,
            elementWriterFunction);
}
public LocalRecoveryDirectoryProviderImpl(
@Nonnull File[] allocationBaseDirs,
@Nonnull JobID jobID,
@Nonnull JobVertexID jobVertexID,
@Nonnegative int subtaskIndex) {
Preconditions.checkArgument(allocationBaseDirs.length > 0);
this.allocationBaseDirs = allocationBaseDirs;
this.jobID = jobID;
this.jobVertexID = jobVertexID;
this.subtaskIndex = subtaskIndex;
for (File allocationBaseDir : allocationBaseDirs) {
Preconditions.checkNotNull(allocationBaseDir);
allocationBaseDir.mkdirs();
}
}
/**
 * Creates a task-local state store with an empty {@link TreeMap} as checkpoint storage and a
 * fresh lock object, delegating to the constructor that takes both explicitly.
 *
 * @param jobID the job id.
 * @param allocationID the allocation id.
 * @param jobVertexID the job vertex id.
 * @param subtaskIndex the non-negative subtask index.
 * @param localRecoveryConfig the local recovery configuration.
 * @param discardExecutor the executor forwarded to the delegate constructor.
 */
public TaskLocalStateStoreImpl(
        @Nonnull JobID jobID,
        @Nonnull AllocationID allocationID,
        @Nonnull JobVertexID jobVertexID,
        @Nonnegative int subtaskIndex,
        @Nonnull LocalRecoveryConfig localRecoveryConfig,
        @Nonnull Executor discardExecutor) {

    this(
            jobID, allocationID, jobVertexID, subtaskIndex,
            localRecoveryConfig, discardExecutor,
            new TreeMap<>(), new Object());
}
/**
 * Creates a task-local state store with an explicitly provided checkpoint map and lock.
 *
 * <p>All arguments are stored in the corresponding fields and the store starts out as not
 * disposed.
 *
 * @param jobID the job id.
 * @param allocationID the allocation id.
 * @param jobVertexID the job vertex id.
 * @param subtaskIndex the non-negative subtask index.
 * @param localRecoveryConfig the local recovery configuration.
 * @param discardExecutor the executor kept by this store.
 * @param storedTaskStateByCheckpointID sorted map from checkpoint id to task state snapshot.
 * @param lock the lock object kept by this store.
 */
@VisibleForTesting
TaskLocalStateStoreImpl(
        @Nonnull JobID jobID,
        @Nonnull AllocationID allocationID,
        @Nonnull JobVertexID jobVertexID,
        @Nonnegative int subtaskIndex,
        @Nonnull LocalRecoveryConfig localRecoveryConfig,
        @Nonnull Executor discardExecutor,
        @Nonnull SortedMap<Long, TaskStateSnapshot> storedTaskStateByCheckpointID,
        @Nonnull Object lock) {

    // Identity of the owning subtask.
    this.jobID = jobID;
    this.jobVertexID = jobVertexID;
    this.subtaskIndex = subtaskIndex;
    this.allocationID = allocationID;

    // Configuration and collaborators.
    this.localRecoveryConfig = localRecoveryConfig;
    this.discardExecutor = discardExecutor;

    // Mutable state and the lock object.
    this.lock = lock;
    this.storedTaskStateByCheckpointID = storedTaskStateByCheckpointID;
    this.disposed = false;
}
/**
 * Creates a new {@link KeyGroupPartitioner}.
 *
 * <p>Source and destination must be different arrays and both must have a length of at least
 * {@code numberOfElements}; otherwise the state checks below fail.
 *
 * @param partitioningSource the input for the partitioning. All elements must be densely packed in the index
 *                              interval [0, numberOfElements), without null values.
 * @param numberOfElements the number of elements to consider from the input, starting at input index 0.
 * @param partitioningDestination the output of the partitioning. Must have capacity of at least numberOfElements.
 * @param keyGroupRange the key-group range of the data that will be partitioned by this instance.
 * @param totalKeyGroups the total number of key groups in the job.
 * @param keyExtractorFunction this function extracts the partition key from an element.
 * @param elementWriterFunction the element writer function kept by this partitioner.
 */
public KeyGroupPartitioner(
        @Nonnull T[] partitioningSource,
        @Nonnegative int numberOfElements,
        @Nonnull T[] partitioningDestination,
        @Nonnull KeyGroupRange keyGroupRange,
        @Nonnegative int totalKeyGroups,
        @Nonnull KeyExtractorFunction<T> keyExtractorFunction,
        @Nonnull ElementWriterFunction<T> elementWriterFunction) {

    // Sanity checks on the two arrays, in the same order as they are declared.
    Preconditions.checkState(partitioningSource != partitioningDestination);
    Preconditions.checkState(partitioningSource.length >= numberOfElements);
    Preconditions.checkState(partitioningDestination.length >= numberOfElements);

    // Input and output of the partitioning.
    this.partitioningSource = partitioningSource;
    this.partitioningDestination = partitioningDestination;
    this.numberOfElements = numberOfElements;

    // Key-group layout and the functions applied to elements.
    this.keyGroupRange = keyGroupRange;
    this.totalKeyGroups = totalKeyGroups;
    this.keyExtractorFunction = keyExtractorFunction;
    this.elementWriterFunction = elementWriterFunction;

    // Derived working state: one key-group slot per element, one counter per key group.
    this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
    this.elementKeyGroups = new int[numberOfElements];
    this.counterHistogram = new int[keyGroupRange.getNumberOfKeyGroups()];

    // No result has been computed yet.
    this.computedResult = null;
}
/**
 * Creates a new {@link KeyGroupPartitioner}.
 *
 * <p>Source and destination must be different arrays and both must have a length of at least
 * {@code numberOfElements}; otherwise the state checks below fail.
 *
 * @param partitioningSource the input for the partitioning. All elements must be densely packed in the index
 *                              interval [0, numberOfElements), without null values.
 * @param numberOfElements the number of elements to consider from the input, starting at input index 0.
 * @param partitioningDestination the output of the partitioning. Must have capacity of at least numberOfElements.
 * @param keyGroupRange the key-group range of the data that will be partitioned by this instance.
 * @param totalKeyGroups the total number of key groups in the job.
 * @param keyExtractorFunction this function extracts the partition key from an element.
 * @param elementWriterFunction the element writer function kept by this partitioner.
 */
public KeyGroupPartitioner(
        @Nonnull T[] partitioningSource,
        @Nonnegative int numberOfElements,
        @Nonnull T[] partitioningDestination,
        @Nonnull KeyGroupRange keyGroupRange,
        @Nonnegative int totalKeyGroups,
        @Nonnull KeyExtractorFunction<T> keyExtractorFunction,
        @Nonnull ElementWriterFunction<T> elementWriterFunction) {

    // Sanity checks on the two arrays, in the same order as they are declared.
    Preconditions.checkState(partitioningSource != partitioningDestination);
    Preconditions.checkState(partitioningSource.length >= numberOfElements);
    Preconditions.checkState(partitioningDestination.length >= numberOfElements);

    // Input and output of the partitioning.
    this.partitioningSource = partitioningSource;
    this.partitioningDestination = partitioningDestination;
    this.numberOfElements = numberOfElements;

    // Key-group layout and the functions applied to elements.
    this.keyGroupRange = keyGroupRange;
    this.totalKeyGroups = totalKeyGroups;
    this.keyExtractorFunction = keyExtractorFunction;
    this.elementWriterFunction = elementWriterFunction;

    // Derived working state: one key-group slot per element, one counter per key group.
    this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
    this.elementKeyGroups = new int[numberOfElements];
    this.counterHistogram = new int[keyGroupRange.getNumberOfKeyGroups()];

    // No result has been computed yet.
    this.computedResult = null;
}
/**
 * Creates a map entry backed by the given raw key and value bytes.
 *
 * <p>All arguments are stored as-is in the corresponding fields (no copies are made here), and
 * the entry starts out with {@code deleted == false}.
 *
 * @param db the RocksDB instance kept by this entry.
 * @param userKeyOffset non-negative offset kept by this entry; presumably the position of the
 *     user key inside {@code rawKeyBytes} (not verifiable from this constructor).
 * @param rawKeyBytes the raw key bytes.
 * @param rawValueBytes the raw value bytes.
 * @param keySerializer serializer for the user key type.
 * @param valueSerializer serializer for the user value type.
 * @param dataInputView input view kept by this entry.
 */
RocksDBMapEntry(
        @Nonnull final RocksDB db,
        @Nonnegative final int userKeyOffset,
        @Nonnull final byte[] rawKeyBytes,
        @Nonnull final byte[] rawValueBytes,
        @Nonnull final TypeSerializer<UK> keySerializer,
        @Nonnull final TypeSerializer<UV> valueSerializer,
        @Nonnull DataInputDeserializer dataInputView) {

    // Database handle and (de)serialization helpers.
    this.db = db;
    this.dataInputView = dataInputView;
    this.keySerializer = keySerializer;
    this.valueSerializer = valueSerializer;

    // Raw payload of this entry.
    this.userKeyOffset = userKeyOffset;
    this.rawKeyBytes = rawKeyBytes;
    this.rawValueBytes = rawValueBytes;

    // A freshly created entry is not marked as deleted.
    this.deleted = false;
}
/**
 * Creates a priority queue set for a single key group.
 *
 * <p>The constructor stores all collaborators, marks the cache as not containing all elements,
 * computes the key-group prefix bytes via {@code createKeyGroupBytes}, uses that prefix as the
 * initial seek hint, and initializes the internal index to
 * {@code HeapPriorityQueueElement.NOT_CONTAINED}.
 *
 * @param keyGroupId id of the key group, passed to {@code createKeyGroupBytes}.
 * @param keyGroupPrefixBytes number of prefix bytes, passed to {@code createKeyGroupBytes}.
 * @param db the RocksDB instance.
 * @param columnFamilyHandle the column family handle kept by this set.
 * @param byteOrderProducingSerializer serializer for the elements; presumably its byte order
 *     matches the element order, as the name suggests (not verifiable here).
 * @param outputStream output serializer, stored as the output view.
 * @param inputStream input deserializer, stored as the input view.
 * @param batchWrapper write batch wrapper kept by this set.
 * @param orderedByteArraySetCache the ordered cache kept by this set.
 */
RocksDBCachingPriorityQueueSet(
        @Nonnegative int keyGroupId,
        @Nonnegative int keyGroupPrefixBytes,
        @Nonnull RocksDB db,
        @Nonnull ColumnFamilyHandle columnFamilyHandle,
        @Nonnull TypeSerializer<E> byteOrderProducingSerializer,
        @Nonnull DataOutputSerializer outputStream,
        @Nonnull DataInputDeserializer inputStream,
        @Nonnull RocksDBWriteBatchWrapper batchWrapper,
        @Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {

    // RocksDB access.
    this.db = db;
    this.columnFamilyHandle = columnFamilyHandle;
    this.batchWrapper = batchWrapper;

    // Serialization helpers.
    this.byteOrderProducingSerializer = byteOrderProducingSerializer;
    this.inputView = inputStream;
    this.outputView = outputStream;

    // Cache state.
    this.orderedCache = orderedByteArraySetCache;
    this.allElementsInCache = false;
    this.internalIndex = HeapPriorityQueueElement.NOT_CONTAINED;

    // Kept after the plain field assignments above in case createKeyGroupBytes reads any of them.
    this.groupPrefixBytes = createKeyGroupBytes(keyGroupId, keyGroupPrefixBytes);
    // The seek hint initially refers to the very same array as the group prefix.
    this.seekHint = this.groupPrefixBytes;
}
/**
 * Creates the full snapshot strategy.
 *
 * <p>All arguments except {@code keyGroupCompressionDecorator} are forwarded unchanged to the
 * super constructor, together with the {@code DESCRIPTION} constant.
 *
 * @param db the RocksDB instance.
 * @param rocksDBResourceGuard resource guard forwarded to the super constructor.
 * @param keySerializer serializer for the key type.
 * @param kvStateInformation state information keyed by name.
 * @param keyGroupRange the key-group range.
 * @param keyGroupPrefixBytes non-negative number of key-group prefix bytes.
 * @param localRecoveryConfig the local recovery configuration.
 * @param cancelStreamRegistry the closeable registry.
 * @param keyGroupCompressionDecorator compression decorator kept by this strategy.
 */
public RocksFullSnapshotStrategy(
        @Nonnull RocksDB db,
        @Nonnull ResourceGuard rocksDBResourceGuard,
        @Nonnull TypeSerializer<K> keySerializer,
        @Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
        @Nonnull KeyGroupRange keyGroupRange,
        @Nonnegative int keyGroupPrefixBytes,
        @Nonnull LocalRecoveryConfig localRecoveryConfig,
        @Nonnull CloseableRegistry cancelStreamRegistry,
        @Nonnull StreamCompressionDecorator keyGroupCompressionDecorator) {

    super(
            DESCRIPTION,
            db, rocksDBResourceGuard,
            keySerializer, kvStateInformation,
            keyGroupRange, keyGroupPrefixBytes,
            localRecoveryConfig, cancelStreamRegistry);

    this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
}
/**
 * Requests a number of stack trace samples of a task thread and returns a future for them.
 *
 * <p>The arguments are validated here; the sampling itself is delegated to the overload that
 * additionally takes the list collecting the samples and the result future.
 *
 * @param task The task to be sampled from. Must not be null.
 * @param numSamples The number of samples. Must be positive (zero is rejected).
 * @param delayBetweenSamples The time to wait between taking samples. Must not be null.
 * @param maxStackTraceDepth The maximum depth of the returned stack traces.
 *                           Negative means unlimited.
 * @return A future containing the stack trace samples.
 */
public CompletableFuture<List<StackTraceElement[]>> requestStackTraceSample(
        final StackTraceSampleableTask task,
        @Nonnegative final int numSamples,
        final Time delayBetweenSamples,
        final int maxStackTraceDepth) {

    // Validate in declaration order so the first offending argument is the one reported.
    checkNotNull(task, "task must not be null");
    checkArgument(numSamples > 0, "numSamples must be positive");
    checkNotNull(delayBetweenSamples, "delayBetweenSamples must not be null");

    // Start with an empty sample list (pre-sized for numSamples) and a fresh result future.
    return requestStackTraceSample(
            task, numSamples, delayBetweenSamples, maxStackTraceDepth,
            new ArrayList<>(numSamples), new CompletableFuture<>());
}
/**
 * Reads all state mappings of one key group from the given input view and puts them into the
 * state table.
 *
 * <p>The first byte acts as a marker: if it is zero, nothing else is read.
 *
 * @param inView the input view to read from.
 * @param keyGroupId the key group under which every read entry is put into the state table.
 * @throws IOException declared for failures while reading or deserializing.
 */
@Override
public void readMappingsInKeyGroup(@Nonnull DataInputView inView, @Nonnegative int keyGroupId) throws IOException {

    final boolean hasMappings = inView.readByte() != 0;
    if (!hasMappings) {
        return;
    }

    final TypeSerializer<N> namespaceSerializer = stateTable.getNamespaceSerializer();
    final TypeSerializer<S> stateSerializer = stateTable.getStateSerializer();

    // V1 layout groups entries by namespace: a namespace count, then for each namespace the
    // namespace itself, an entry count, and that many (key, state) pairs.
    final int namespaceCount = inView.readInt();
    for (int namespaceIdx = 0; namespaceIdx < namespaceCount; namespaceIdx++) {
        final N namespace = namespaceSerializer.deserialize(inView);
        final int entryCount = inView.readInt();

        for (int entryIdx = 0; entryIdx < entryCount; entryIdx++) {
            final K key = keySerializer.deserialize(inView);
            final S state = stateSerializer.deserialize(inView);
            stateTable.put(key, keyGroupId, namespace, state);
        }
    }
}
/**
 * Creates an empty {@link HeapPriorityQueueSet} with the requested initial capacity.
 *
 * <p>One deduplication map is allocated per key group of {@code keyGroupRange}; each map is
 * created with an initial capacity of {@code 1 + minimumCapacity / numberOfKeyGroups}.
 *
 * @param elementPriorityComparator comparator for the priority of contained elements.
 * @param keyExtractor function to extract a key from the contained elements.
 * @param minimumCapacity the minimum and initial capacity of this priority queue.
 * @param keyGroupRange the key-group range of the elements in this set.
 * @param totalNumberOfKeyGroups the total number of key-groups of the job.
 */
@SuppressWarnings("unchecked")
public HeapPriorityQueueSet(
        @Nonnull PriorityComparator<T> elementPriorityComparator,
        @Nonnull KeyExtractorFunction<T> keyExtractor,
        @Nonnegative int minimumCapacity,
        @Nonnull KeyGroupRange keyGroupRange,
        @Nonnegative int totalNumberOfKeyGroups) {

    super(elementPriorityComparator, minimumCapacity);

    this.keyGroupRange = keyGroupRange;
    this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
    this.keyExtractor = keyExtractor;

    final int localKeyGroupCount = keyGroupRange.getNumberOfKeyGroups();
    // Integer division: a range reporting zero key groups makes this throw ArithmeticException.
    final int initialMapCapacity = minimumCapacity / localKeyGroupCount + 1;

    // Generic array creation is not possible, hence the raw HashMap[] and the unchecked warning.
    this.deduplicationMapsByKeyGroup = new HashMap[localKeyGroupCount];
    int slot = 0;
    while (slot < localKeyGroupCount) {
        deduplicationMapsByKeyGroup[slot] = new HashMap<>(initialMapCapacity);
        slot++;
    }
}
/**
 * Creates a priority queue that is internally partitioned into one sub-queue per key group.
 *
 * <p>For every key group of {@code keyGroupRange} a sub-queue is created through
 * {@code orderedCacheFactory}, stored in the per-key-group array, and added to the heap of
 * sub-queues.
 *
 * @param keyExtractor function to extract a key from the contained elements.
 * @param elementPriorityComparator comparator for the priority of contained elements.
 * @param orderedCacheFactory factory that creates the sub-queue for a key group.
 * @param keyGroupRange the key-group range covered by this queue.
 * @param totalKeyGroups the total number of key groups.
 */
@SuppressWarnings("unchecked")
public KeyGroupPartitionedPriorityQueue(
        @Nonnull KeyExtractorFunction<T> keyExtractor,
        @Nonnull PriorityComparator<T> elementPriorityComparator,
        @Nonnull PartitionQueueSetFactory<T, PQ> orderedCacheFactory,
        @Nonnull KeyGroupRange keyGroupRange,
        @Nonnegative int totalKeyGroups) {

    this.totalKeyGroups = totalKeyGroups;
    this.keyExtractor = keyExtractor;
    this.firstKeyGroup = keyGroupRange.getStartKeyGroup();

    // Generic array creation is not possible; the array is created with the raw element type
    // and cast, which is what the unchecked suppression is for.
    this.keyGroupedHeaps = (PQ[]) new InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()];
    this.heapOfKeyGroupedHeaps = new HeapPriorityQueue<>(
            new InternalPriorityQueueComparator<>(elementPriorityComparator),
            keyGroupRange.getNumberOfKeyGroups());

    // Array index `offset` corresponds to key group `firstKeyGroup + offset`.
    for (int offset = 0; offset < keyGroupedHeaps.length; offset++) {
        keyGroupedHeaps[offset] = orderedCacheFactory.create(
                firstKeyGroup + offset, totalKeyGroups, keyExtractor, elementPriorityComparator);
        heapOfKeyGroupedHeaps.add(keyGroupedHeaps[offset]);
    }
}
/**
 * Creates a task-local state store with an explicitly provided checkpoint map and lock.
 *
 * <p>All arguments are stored in the corresponding fields and the store starts out as not
 * disposed.
 *
 * @param jobID the job id.
 * @param allocationID the allocation id.
 * @param jobVertexID the job vertex id.
 * @param subtaskIndex the non-negative subtask index.
 * @param localRecoveryConfig the local recovery configuration.
 * @param discardExecutor the executor kept by this store.
 * @param storedTaskStateByCheckpointID sorted map from checkpoint id to task state snapshot.
 * @param lock the lock object kept by this store.
 */
@VisibleForTesting
TaskLocalStateStoreImpl(
        @Nonnull JobID jobID,
        @Nonnull AllocationID allocationID,
        @Nonnull JobVertexID jobVertexID,
        @Nonnegative int subtaskIndex,
        @Nonnull LocalRecoveryConfig localRecoveryConfig,
        @Nonnull Executor discardExecutor,
        @Nonnull SortedMap<Long, TaskStateSnapshot> storedTaskStateByCheckpointID,
        @Nonnull Object lock) {

    // Identity of the owning subtask.
    this.jobID = jobID;
    this.jobVertexID = jobVertexID;
    this.subtaskIndex = subtaskIndex;
    this.allocationID = allocationID;

    // Configuration and collaborators.
    this.localRecoveryConfig = localRecoveryConfig;
    this.discardExecutor = discardExecutor;

    // Mutable state and the lock object.
    this.lock = lock;
    this.storedTaskStateByCheckpointID = storedTaskStateByCheckpointID;
    this.disposed = false;
}
public LocalRecoveryDirectoryProviderImpl(
@Nonnull File[] allocationBaseDirs,
@Nonnull JobID jobID,
@Nonnull JobVertexID jobVertexID,
@Nonnegative int subtaskIndex) {
Preconditions.checkArgument(allocationBaseDirs.length > 0);
this.allocationBaseDirs = allocationBaseDirs;
this.jobID = jobID;
this.jobVertexID = jobVertexID;
this.subtaskIndex = subtaskIndex;
for (File allocationBaseDir : allocationBaseDirs) {
Preconditions.checkNotNull(allocationBaseDir);
allocationBaseDir.mkdirs();
}
}
/**
 * Creates a task-local state store with an empty {@link TreeMap} as checkpoint storage and a
 * fresh lock object, delegating to the constructor that takes both explicitly.
 *
 * @param jobID the job id.
 * @param allocationID the allocation id.
 * @param jobVertexID the job vertex id.
 * @param subtaskIndex the non-negative subtask index.
 * @param localRecoveryConfig the local recovery configuration.
 * @param discardExecutor the executor forwarded to the delegate constructor.
 */
public TaskLocalStateStoreImpl(
        @Nonnull JobID jobID,
        @Nonnull AllocationID allocationID,
        @Nonnull JobVertexID jobVertexID,
        @Nonnegative int subtaskIndex,
        @Nonnull LocalRecoveryConfig localRecoveryConfig,
        @Nonnull Executor discardExecutor) {

    this(
            jobID, allocationID, jobVertexID, subtaskIndex,
            localRecoveryConfig, discardExecutor,
            new TreeMap<>(), new Object());
}
/**
 * Creates a source that is initially running and has emitted nothing yet.
 *
 * <p>The failure threshold is set to half of {@code numberOfGeneratorInvocations} (integer
 * division) and the checkpoint status starts at {@code INITIAL}.
 *
 * @param eventEmittingGenerator the generator kept by this source.
 * @param numberOfGeneratorInvocations the expected number of emit calls.
 * @param timeCharacteristic the time characteristic; only whether it equals
 *     {@code TimeCharacteristic.ProcessingTime} is remembered.
 */
public FailingSource(
        @Nonnull EventEmittingGenerator eventEmittingGenerator,
        @Nonnegative int numberOfGeneratorInvocations,
        @Nonnull TimeCharacteristic timeCharacteristic) {

    this.eventEmittingGenerator = eventEmittingGenerator;
    this.usingProcessingTime = (TimeCharacteristic.ProcessingTime == timeCharacteristic);

    // Emission bookkeeping.
    this.expectedEmitCalls = numberOfGeneratorInvocations;
    this.failureAfterNumElements = numberOfGeneratorInvocations / 2;
    this.emitCallCount = 0;

    // Lifecycle state.
    this.running = true;
    this.checkpointStatus = new AtomicLong(INITIAL);
}
/**
 * Clips the db instance to the target key group range by calling {@code deleteRange} for the
 * key groups of the current range that lie outside the target range.
 *
 * <p>Two ranges are considered: from the current start key group to the target start key group
 * (only if the current range starts earlier), and from the target end key group + 1 to the
 * current end key group + 1 (only if the current range ends later). Both boundaries are
 * serialized into {@code keyGroupPrefixBytes}-sized byte arrays before being passed on.
 *
 * @param db the RocksDB instance to be clipped.
 * @param columnFamilyHandles the column families in the db instance.
 * @param targetKeyGroupRange the target key group range.
 * @param currentKeyGroupRange the key group range of the db instance.
 * @param keyGroupPrefixBytes Number of bytes required to prefix the key groups.
 * @throws RocksDBException declared for failures while deleting from the db instance.
 */
public static void clipDBWithKeyGroupRange(
        @Nonnull RocksDB db,
        @Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
        @Nonnull KeyGroupRange targetKeyGroupRange,
        @Nonnull KeyGroupRange currentKeyGroupRange,
        @Nonnegative int keyGroupPrefixBytes) throws RocksDBException {

    // Both buffers are reused for the lower and the upper clip.
    final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
    final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];

    // Lower clip: key groups before the target start.
    final int currentStart = currentKeyGroupRange.getStartKeyGroup();
    final int targetStart = targetKeyGroupRange.getStartKeyGroup();
    if (currentStart < targetStart) {
        RocksDBKeySerializationUtils.serializeKeyGroup(currentStart, beginKeyGroupBytes);
        RocksDBKeySerializationUtils.serializeKeyGroup(targetStart, endKeyGroupBytes);
        deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
    }

    // Upper clip: key groups after the target end.
    final int currentEnd = currentKeyGroupRange.getEndKeyGroup();
    final int targetEnd = targetKeyGroupRange.getEndKeyGroup();
    if (currentEnd > targetEnd) {
        RocksDBKeySerializationUtils.serializeKeyGroup(targetEnd + 1, beginKeyGroupBytes);
        RocksDBKeySerializationUtils.serializeKeyGroup(currentEnd + 1, endKeyGroupBytes);
        deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
    }
}
/**
 * Creates a key builder with a new {@code DataOutputSerializer} of the given initial size.
 *
 * <p>Delegates to the constructor taking all fields explicitly; whether the key serializer is
 * variable sized is determined via
 * {@code RocksDBKeySerializationUtils.isSerializerTypeVariableSized}, and the after-key mark
 * starts at 0.
 *
 * @param keySerializer serializer for the key type.
 * @param keyGroupPrefixBytes non-negative number of key-group prefix bytes.
 * @param initialSize non-negative initial size passed to the new output serializer.
 */
public RocksDBSerializedCompositeKeyBuilder(
        @Nonnull TypeSerializer<K> keySerializer,
        @Nonnegative int keyGroupPrefixBytes,
        @Nonnegative int initialSize) {

    this(
            keySerializer, new DataOutputSerializer(initialSize), keyGroupPrefixBytes,
            RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer), 0);
}
/**
 * Creates a key builder from explicitly provided values; every argument is stored in the field
 * of the same name.
 *
 * @param keySerializer serializer for the key type.
 * @param keyOutView the output serializer kept by this builder.
 * @param keyGroupPrefixBytes non-negative number of key-group prefix bytes.
 * @param keySerializerTypeVariableSized flag stored as-is; by its name, whether the key
 *     serializer produces variable-sized output.
 * @param afterKeyMark non-negative initial value of the after-key mark.
 */
@VisibleForTesting
RocksDBSerializedCompositeKeyBuilder(
        @Nonnull TypeSerializer<K> keySerializer,
        @Nonnull DataOutputSerializer keyOutView,
        @Nonnegative int keyGroupPrefixBytes,
        boolean keySerializerTypeVariableSized,
        @Nonnegative int afterKeyMark) {

    // Output buffer and its current mark.
    this.keyOutView = keyOutView;
    this.afterKeyMark = afterKeyMark;

    // Key layout information.
    this.keyGroupPrefixBytes = keyGroupPrefixBytes;
    this.keySerializer = keySerializer;
    this.keySerializerTypeVariableSized = keySerializerTypeVariableSized;
}
public RocksIncrementalSnapshotStrategy(
@Nonnull RocksDB db,
@Nonnull ResourceGuard rocksDBResourceGuard,
@Nonnull TypeSerializer<K> keySerializer,
@Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
@Nonnull KeyGroupRange keyGroupRange,
@Nonnegative int keyGroupPrefixBytes,
@Nonnull LocalRecoveryConfig localRecoveryConfig,
@Nonnull CloseableRegistry cancelStreamRegistry,
@Nonnull File instanceBasePath,
@Nonnull UUID backendUID,
@Nonnull SortedMap<Long, Set<StateHandleID>> materializedSstFiles,
long lastCompletedCheckpointId,
int numberOfTransferingThreads) {
super(
DESCRIPTION,
db,
rocksDBResourceGuard,
keySerializer,
kvStateInformation,
keyGroupRange,
keyGroupPrefixBytes,
localRecoveryConfig,
cancelStreamRegistry);
this.instanceBasePath = instanceBasePath;
this.backendUID = backendUID;
this.materializedSstFiles = materializedSstFiles;
this.lastCompletedCheckpointId = lastCompletedCheckpointId;
this.stateUploader = new RocksDBStateUploader(numberOfTransferingThreads);
this.localDirectoryName = backendUID.toString().replaceAll("[\\-]", "");
}
/**
 * Creates a factory that remembers the given key-group layout and minimum capacity.
 *
 * @param keyGroupRange the key-group range kept by this factory.
 * @param totalKeyGroups the non-negative total number of key groups.
 * @param minimumCapacity the non-negative minimum capacity kept by this factory.
 */
public HeapPriorityQueueSetFactory(
        @Nonnull KeyGroupRange keyGroupRange,
        @Nonnegative int totalKeyGroups,
        @Nonnegative int minimumCapacity) {

    this.minimumCapacity = minimumCapacity;
    this.totalKeyGroups = totalKeyGroups;
    this.keyGroupRange = keyGroupRange;
}
/**
 * Stores the local state for the given checkpoint id.
 *
 * <p>A {@code null} state is replaced by {@code NULL_DUMMY} before it is stored. If this store
 * is already disposed, the state is not kept and is handed to
 * {@code asyncDiscardLocalStateForCollection} instead. If a state was already registered under
 * the same checkpoint id, it is replaced and the previous state is handed to
 * {@code asyncDiscardLocalStateForCollection}.
 *
 * @param checkpointId id of the checkpoint the state belongs to.
 * @param localState the state to store, or {@code null}.
 */
@Override
public void storeLocalState(
        @Nonnegative long checkpointId,
        @Nullable TaskStateSnapshot localState) {

    if (localState == null) {
        localState = NULL_DUMMY;
    }

    if (LOG.isTraceEnabled()) {
        // The verbose variant (which includes the state itself) is guarded by the TRACE check,
        // so it is emitted at TRACE level to keep guard and log level consistent.
        LOG.trace(
            "Stored local state for checkpoint {} in subtask ({} - {} - {}) : {}.",
            checkpointId, jobID, jobVertexID, subtaskIndex, localState);
    } else if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Stored local state for checkpoint {} in subtask ({} - {} - {})",
            checkpointId, jobID, jobVertexID, subtaskIndex);
    }

    Map.Entry<Long, TaskStateSnapshot> toDiscard = null;

    synchronized (lock) {
        if (disposed) {
            // we ignore late stores and simply discard the state.
            toDiscard = new AbstractMap.SimpleEntry<>(checkpointId, localState);
        } else {
            TaskStateSnapshot previous =
                storedTaskStateByCheckpointID.put(checkpointId, localState);

            if (previous != null) {
                toDiscard = new AbstractMap.SimpleEntry<>(checkpointId, previous);
            }
        }
    }

    // The discard is triggered only after the lock has been released.
    if (toDiscard != null) {
        asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
    }
}
/**
 * Creates a snapshot object holding the given array and metadata; every argument is stored in
 * the field of the same name.
 *
 * @param heapArrayCopy the element array kept by this snapshot; by its name a copy of the heap
 *     array made by the caller (no copy is made here).
 * @param keyExtractor function to extract a key from the elements.
 * @param metaInfo the meta information of the priority queue state.
 * @param keyGroupRange the key-group range kept by this snapshot.
 * @param totalKeyGroups the non-negative total number of key groups.
 */
HeapPriorityQueueStateSnapshot(
        @Nonnull T[] heapArrayCopy,
        @Nonnull KeyExtractorFunction<T> keyExtractor,
        @Nonnull RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo,
        @Nonnull KeyGroupRange keyGroupRange,
        @Nonnegative int totalKeyGroups) {

    // Snapshot data and its metadata.
    this.heapArrayCopy = heapArrayCopy;
    this.metaInfo = metaInfo;

    // Key extraction and key-group layout.
    this.keyExtractor = keyExtractor;
    this.keyGroupRange = keyGroupRange;
    this.totalKeyGroups = totalKeyGroups;
}