javax.annotation.Nonnegative Source Code Examples

The examples below show how the javax.annotation.Nonnegative API is used in real projects; you can also follow the project links to GitHub to view the full source files.
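
Before the project examples, here is a minimal self-contained sketch of the annotation itself (the RingBuffer class is hypothetical, not taken from any project below). javax.annotation.Nonnegative is a JSR-305 annotation that marks a numeric value as never negative; it is consumed by static-analysis tools and human readers and is not enforced at runtime, which is why code such as Example 5 below still pairs it with an explicit runtime check.

import javax.annotation.Nonnegative;

// Hypothetical sketch: @Nonnegative documents the contract; the explicit
// check enforces it, since JSR-305 annotations have no runtime effect.
public final class RingBuffer {

	private final byte[] data;

	public RingBuffer(@Nonnegative int capacity) {
		if (capacity < 0) {
			throw new IllegalArgumentException("capacity must be non-negative, was: " + capacity);
		}
		this.data = new byte[capacity];
	}

	@Nonnegative
	public int capacity() {
		return data.length;
	}
}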

Example 1  Project: Flink-CEPplus   File: RocksDBMapState.java
RocksDBMapEntry(
		@Nonnull final RocksDB db,
		@Nonnegative final int userKeyOffset,
		@Nonnull final byte[] rawKeyBytes,
		@Nonnull final byte[] rawValueBytes,
		@Nonnull final TypeSerializer<UK> keySerializer,
		@Nonnull final TypeSerializer<UV> valueSerializer,
		@Nonnull DataInputDeserializer dataInputView) {
	this.db = db;

	this.userKeyOffset = userKeyOffset;
	this.keySerializer = keySerializer;
	this.valueSerializer = valueSerializer;

	this.rawKeyBytes = rawKeyBytes;
	this.rawValueBytes = rawValueBytes;
	this.deleted = false;
	this.dataInputView = dataInputView;
}
 
RocksDBCachingPriorityQueueSet(
	@Nonnegative int keyGroupId,
	@Nonnegative int keyGroupPrefixBytes,
	@Nonnull RocksDB db,
	@Nonnull ColumnFamilyHandle columnFamilyHandle,
	@Nonnull TypeSerializer<E> byteOrderProducingSerializer,
	@Nonnull DataOutputSerializer outputStream,
	@Nonnull DataInputDeserializer inputStream,
	@Nonnull RocksDBWriteBatchWrapper batchWrapper,
	@Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
	this.db = db;
	this.columnFamilyHandle = columnFamilyHandle;
	this.byteOrderProducingSerializer = byteOrderProducingSerializer;
	this.batchWrapper = batchWrapper;
	this.outputView = outputStream;
	this.inputView = inputStream;
	this.orderedCache = orderedByteArraySetCache;
	this.allElementsInCache = false;
	this.groupPrefixBytes = createKeyGroupBytes(keyGroupId, keyGroupPrefixBytes);
	this.seekHint = groupPrefixBytes;
	this.internalIndex = HeapPriorityQueueElement.NOT_CONTAINED;
}
 
public RocksDBSnapshotStrategyBase(
	@Nonnull String description,
	@Nonnull RocksDB db,
	@Nonnull ResourceGuard rocksDBResourceGuard,
	@Nonnull TypeSerializer<K> keySerializer,
	@Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
	@Nonnull KeyGroupRange keyGroupRange,
	@Nonnegative int keyGroupPrefixBytes,
	@Nonnull LocalRecoveryConfig localRecoveryConfig,
	@Nonnull CloseableRegistry cancelStreamRegistry) {

	super(description);
	this.db = db;
	this.rocksDBResourceGuard = rocksDBResourceGuard;
	this.keySerializer = keySerializer;
	this.kvStateInformation = kvStateInformation;
	this.keyGroupRange = keyGroupRange;
	this.keyGroupPrefixBytes = keyGroupPrefixBytes;
	this.localRecoveryConfig = localRecoveryConfig;
	this.cancelStreamRegistry = cancelStreamRegistry;
}
 
Example 4  Project: Flink-CEPplus   File: RocksFullSnapshotStrategy.java
public RocksFullSnapshotStrategy(
	@Nonnull RocksDB db,
	@Nonnull ResourceGuard rocksDBResourceGuard,
	@Nonnull TypeSerializer<K> keySerializer,
	@Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
	@Nonnull KeyGroupRange keyGroupRange,
	@Nonnegative int keyGroupPrefixBytes,
	@Nonnull LocalRecoveryConfig localRecoveryConfig,
	@Nonnull CloseableRegistry cancelStreamRegistry,
	@Nonnull StreamCompressionDecorator keyGroupCompressionDecorator) {
	super(
		DESCRIPTION,
		db,
		rocksDBResourceGuard,
		keySerializer,
		kvStateInformation,
		keyGroupRange,
		keyGroupPrefixBytes,
		localRecoveryConfig,
		cancelStreamRegistry);

	this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
}
 
Example 5  Project: Flink-CEPplus   File: StackTraceSampleService.java
/**
 * Returns a future that completes with a given number of stack trace samples of a task thread.
 *
 * @param task                The task to be sampled from.
 * @param numSamples          The number of samples.
 * @param delayBetweenSamples The time to wait between taking samples.
 * @param maxStackTraceDepth  The maximum depth of the returned stack traces.
 *                            Negative means unlimited.
 * @return A future containing the stack trace samples.
 */
public CompletableFuture<List<StackTraceElement[]>> requestStackTraceSample(
		final StackTraceSampleableTask task,
		@Nonnegative final int numSamples,
		final Time delayBetweenSamples,
		final int maxStackTraceDepth) {

	checkNotNull(task, "task must not be null");
	checkArgument(numSamples > 0, "numSamples must be positive");
	checkNotNull(delayBetweenSamples, "delayBetweenSamples must not be null");

	return requestStackTraceSample(
		task,
		numSamples,
		delayBetweenSamples,
		maxStackTraceDepth,
		new ArrayList<>(numSamples),
		new CompletableFuture<>());
}
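
Note that the @Nonnegative annotation on numSamples is paired with a runtime checkArgument: JSR-305 annotations enforce nothing by themselves. A hypothetical call site (the sampleService and task variables are assumptions, not part of the snippet above) could look like this:

// Hypothetical usage: take 10 samples, 100 ms apart; the negative
// maxStackTraceDepth requests untruncated stack traces.
CompletableFuture<List<StackTraceElement[]>> samples =
	sampleService.requestStackTraceSample(task, 10, Time.milliseconds(100), -1);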
 
Example 6  Project: Flink-CEPplus   File: HeapPriorityQueueSet.java
/**
 * Creates an empty {@link HeapPriorityQueueSet} with the requested initial capacity.
 *
 * @param elementPriorityComparator comparator for the priority of contained elements.
 * @param keyExtractor function to extract a key from the contained elements.
 * @param minimumCapacity the minimum and initial capacity of this priority queue.
 * @param keyGroupRange the key-group range of the elements in this set.
 * @param totalNumberOfKeyGroups the total number of key-groups of the job.
 */
@SuppressWarnings("unchecked")
public HeapPriorityQueueSet(
	@Nonnull PriorityComparator<T> elementPriorityComparator,
	@Nonnull KeyExtractorFunction<T> keyExtractor,
	@Nonnegative int minimumCapacity,
	@Nonnull KeyGroupRange keyGroupRange,
	@Nonnegative int totalNumberOfKeyGroups) {

	super(elementPriorityComparator, minimumCapacity);

	this.keyExtractor = keyExtractor;

	this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
	this.keyGroupRange = keyGroupRange;

	final int keyGroupsInLocalRange = keyGroupRange.getNumberOfKeyGroups();
	final int deduplicationSetSize = 1 + minimumCapacity / keyGroupsInLocalRange;
	this.deduplicationMapsByKeyGroup = new HashMap[keyGroupsInLocalRange];
	for (int i = 0; i < keyGroupsInLocalRange; ++i) {
		deduplicationMapsByKeyGroup[i] = new HashMap<>(deduplicationSetSize);
	}
}
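
To make the sizing above concrete: with minimumCapacity = 128 and a local range of 4 key groups, each of the 4 per-key-group deduplication maps is created with an initial capacity of 1 + 128 / 4 = 33.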
 
@SuppressWarnings("unchecked")
StateTableKeyGroupPartitioner(
	@Nonnull CopyOnWriteStateTable.StateTableEntry<K, N, S>[] snapshotData,
	@Nonnegative int stateTableSize,
	@Nonnull KeyGroupRange keyGroupRange,
	@Nonnegative int totalKeyGroups,
	@Nonnull ElementWriterFunction<CopyOnWriteStateTable.StateTableEntry<K, N, S>> elementWriterFunction) {

	super(
		new CopyOnWriteStateTable.StateTableEntry[stateTableSize],
		stateTableSize,
		// We have made sure that snapshotData is big enough to hold the flattened entries in
		// CopyOnWriteStateTable#snapshotTableArrays(), so we can safely reuse it as the destination array here.
		snapshotData,
		keyGroupRange,
		totalKeyGroups,
		CopyOnWriteStateTable.StateTableEntry::getKey,
		elementWriterFunction);
}
 
public LocalRecoveryDirectoryProviderImpl(
	@Nonnull File[] allocationBaseDirs,
	@Nonnull JobID jobID,
	@Nonnull JobVertexID jobVertexID,
	@Nonnegative int subtaskIndex) {

	Preconditions.checkArgument(allocationBaseDirs.length > 0);
	this.allocationBaseDirs = allocationBaseDirs;
	this.jobID = jobID;
	this.jobVertexID = jobVertexID;
	this.subtaskIndex = subtaskIndex;

	for (File allocationBaseDir : allocationBaseDirs) {
		Preconditions.checkNotNull(allocationBaseDir);
		allocationBaseDir.mkdirs();
	}
}
 
Example 9  Project: Flink-CEPplus   File: TaskLocalStateStoreImpl.java
public TaskLocalStateStoreImpl(
	@Nonnull JobID jobID,
	@Nonnull AllocationID allocationID,
	@Nonnull JobVertexID jobVertexID,
	@Nonnegative int subtaskIndex,
	@Nonnull LocalRecoveryConfig localRecoveryConfig,
	@Nonnull Executor discardExecutor) {

	this(
		jobID,
		allocationID,
		jobVertexID,
		subtaskIndex,
		localRecoveryConfig,
		discardExecutor,
		new TreeMap<>(),
		new Object());
}
 
Example 10  Project: Flink-CEPplus   File: TaskLocalStateStoreImpl.java
@VisibleForTesting
TaskLocalStateStoreImpl(
	@Nonnull JobID jobID,
	@Nonnull AllocationID allocationID,
	@Nonnull JobVertexID jobVertexID,
	@Nonnegative int subtaskIndex,
	@Nonnull LocalRecoveryConfig localRecoveryConfig,
	@Nonnull Executor discardExecutor,
	@Nonnull SortedMap<Long, TaskStateSnapshot> storedTaskStateByCheckpointID,
	@Nonnull Object lock) {

	this.jobID = jobID;
	this.allocationID = allocationID;
	this.jobVertexID = jobVertexID;
	this.subtaskIndex = subtaskIndex;
	this.discardExecutor = discardExecutor;
	this.localRecoveryConfig = localRecoveryConfig;
	this.storedTaskStateByCheckpointID = storedTaskStateByCheckpointID;
	this.lock = lock;
	this.disposed = false;
}
 
Example 11  Project: Flink-CEPplus   File: KeyGroupPartitioner.java
/**
 * Creates a new {@link KeyGroupPartitioner}.
 *
 * @param partitioningSource the input for the partitioning. All elements must be densely packed in the index
 *                              interval [0, {@link #numberOfElements}[, without null values.
 * @param numberOfElements the number of elements to consider from the input, starting at input index 0.
 * @param partitioningDestination the output of the partitioning. Must have capacity of at least numberOfElements.
 * @param keyGroupRange the key-group range of the data that will be partitioned by this instance.
 * @param totalKeyGroups the total number of key groups in the job.
 * @param keyExtractorFunction this function extracts the partition key from an element.
 */
public KeyGroupPartitioner(
	@Nonnull T[] partitioningSource,
	@Nonnegative int numberOfElements,
	@Nonnull T[] partitioningDestination,
	@Nonnull KeyGroupRange keyGroupRange,
	@Nonnegative int totalKeyGroups,
	@Nonnull KeyExtractorFunction<T> keyExtractorFunction,
	@Nonnull ElementWriterFunction<T> elementWriterFunction) {

	Preconditions.checkState(partitioningSource != partitioningDestination);
	Preconditions.checkState(partitioningSource.length >= numberOfElements);
	Preconditions.checkState(partitioningDestination.length >= numberOfElements);

	this.partitioningSource = partitioningSource;
	this.partitioningDestination = partitioningDestination;
	this.numberOfElements = numberOfElements;
	this.keyGroupRange = keyGroupRange;
	this.totalKeyGroups = totalKeyGroups;
	this.keyExtractorFunction = keyExtractorFunction;
	this.elementWriterFunction = elementWriterFunction;
	this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
	this.elementKeyGroups = new int[numberOfElements];
	this.counterHistogram = new int[keyGroupRange.getNumberOfKeyGroups()];
	this.computedResult = null;
}
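
For context on the two @Nonnegative parameters: totalKeyGroups is the job's maximum parallelism, and each element's target key group is derived from its extracted key. A simplified sketch of that mapping, modeled on Flink's KeyGroupRangeAssignment (not part of the snippet above):

import javax.annotation.Nonnegative;

import org.apache.flink.util.MathUtils;

// Simplified sketch, modeled on org.apache.flink.runtime.state.KeyGroupRangeAssignment:
// an element's key is mapped into one of [0, totalKeyGroups) groups by
// murmur-hashing its hashCode. MathUtils.murmurHash returns a non-negative
// int, so the modulo result stays in range.
static int assignToKeyGroup(Object key, @Nonnegative int totalKeyGroups) {
	return MathUtils.murmurHash(key.hashCode()) % totalKeyGroups;
}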
 
Example 12  Project: flink   File: KeyGroupPartitioner.java
/**
 * Creates a new {@link KeyGroupPartitioner}.
 *
 * @param partitioningSource the input for the partitioning. All elements must be densely packed in the index
 *                              interval [0, {@link #numberOfElements}[, without null values.
 * @param numberOfElements the number of elements to consider from the input, starting at input index 0.
 * @param partitioningDestination the output of the partitioning. Must have capacity of at least numberOfElements.
 * @param keyGroupRange the key-group range of the data that will be partitioned by this instance.
 * @param totalKeyGroups the total number of key groups in the job.
 * @param keyExtractorFunction this function extracts the partition key from an element.
 */
public KeyGroupPartitioner(
	@Nonnull T[] partitioningSource,
	@Nonnegative int numberOfElements,
	@Nonnull T[] partitioningDestination,
	@Nonnull KeyGroupRange keyGroupRange,
	@Nonnegative int totalKeyGroups,
	@Nonnull KeyExtractorFunction<T> keyExtractorFunction,
	@Nonnull ElementWriterFunction<T> elementWriterFunction) {

	Preconditions.checkState(partitioningSource != partitioningDestination);
	Preconditions.checkState(partitioningSource.length >= numberOfElements);
	Preconditions.checkState(partitioningDestination.length >= numberOfElements);

	this.partitioningSource = partitioningSource;
	this.partitioningDestination = partitioningDestination;
	this.numberOfElements = numberOfElements;
	this.keyGroupRange = keyGroupRange;
	this.totalKeyGroups = totalKeyGroups;
	this.keyExtractorFunction = keyExtractorFunction;
	this.elementWriterFunction = elementWriterFunction;
	this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
	this.elementKeyGroups = new int[numberOfElements];
	this.counterHistogram = new int[keyGroupRange.getNumberOfKeyGroups()];
	this.computedResult = null;
}
 
Example 13  Project: flink   File: RocksDBMapState.java
RocksDBMapEntry(
		@Nonnull final RocksDB db,
		@Nonnegative final int userKeyOffset,
		@Nonnull final byte[] rawKeyBytes,
		@Nonnull final byte[] rawValueBytes,
		@Nonnull final TypeSerializer<UK> keySerializer,
		@Nonnull final TypeSerializer<UV> valueSerializer,
		@Nonnull DataInputDeserializer dataInputView) {
	this.db = db;

	this.userKeyOffset = userKeyOffset;
	this.keySerializer = keySerializer;
	this.valueSerializer = valueSerializer;

	this.rawKeyBytes = rawKeyBytes;
	this.rawValueBytes = rawValueBytes;
	this.deleted = false;
	this.dataInputView = dataInputView;
}
 
Example 14  Project: flink   File: RocksDBCachingPriorityQueueSet.java
RocksDBCachingPriorityQueueSet(
	@Nonnegative int keyGroupId,
	@Nonnegative int keyGroupPrefixBytes,
	@Nonnull RocksDB db,
	@Nonnull ColumnFamilyHandle columnFamilyHandle,
	@Nonnull TypeSerializer<E> byteOrderProducingSerializer,
	@Nonnull DataOutputSerializer outputStream,
	@Nonnull DataInputDeserializer inputStream,
	@Nonnull RocksDBWriteBatchWrapper batchWrapper,
	@Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
	this.db = db;
	this.columnFamilyHandle = columnFamilyHandle;
	this.byteOrderProducingSerializer = byteOrderProducingSerializer;
	this.batchWrapper = batchWrapper;
	this.outputView = outputStream;
	this.inputView = inputStream;
	this.orderedCache = orderedByteArraySetCache;
	this.allElementsInCache = false;
	this.groupPrefixBytes = createKeyGroupBytes(keyGroupId, keyGroupPrefixBytes);
	this.seekHint = groupPrefixBytes;
	this.internalIndex = HeapPriorityQueueElement.NOT_CONTAINED;
}
 
Example 15  Project: flink   File: RocksFullSnapshotStrategy.java
public RocksFullSnapshotStrategy(
	@Nonnull RocksDB db,
	@Nonnull ResourceGuard rocksDBResourceGuard,
	@Nonnull TypeSerializer<K> keySerializer,
	@Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
	@Nonnull KeyGroupRange keyGroupRange,
	@Nonnegative int keyGroupPrefixBytes,
	@Nonnull LocalRecoveryConfig localRecoveryConfig,
	@Nonnull CloseableRegistry cancelStreamRegistry,
	@Nonnull StreamCompressionDecorator keyGroupCompressionDecorator) {
	super(
		DESCRIPTION,
		db,
		rocksDBResourceGuard,
		keySerializer,
		kvStateInformation,
		keyGroupRange,
		keyGroupPrefixBytes,
		localRecoveryConfig,
		cancelStreamRegistry);

	this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
}
 
Example 16  Project: flink   File: StackTraceSampleService.java
/**
 * Returns a future that completes with a given number of stack trace samples of a task thread.
 *
 * @param task                The task to be sampled from.
 * @param numSamples          The number of samples.
 * @param delayBetweenSamples The time to wait between taking samples.
 * @param maxStackTraceDepth  The maximum depth of the returned stack traces.
 *                            Negative means unlimited.
 * @return A future containing the stack trace samples.
 */
public CompletableFuture<List<StackTraceElement[]>> requestStackTraceSample(
		final StackTraceSampleableTask task,
		@Nonnegative final int numSamples,
		final Time delayBetweenSamples,
		final int maxStackTraceDepth) {

	checkNotNull(task, "task must not be null");
	checkArgument(numSamples > 0, "numSamples must be positive");
	checkNotNull(delayBetweenSamples, "delayBetweenSamples must not be null");

	return requestStackTraceSample(
		task,
		numSamples,
		delayBetweenSamples,
		maxStackTraceDepth,
		new ArrayList<>(numSamples),
		new CompletableFuture<>());
}
 
Example 17  Project: flink   File: StateTableByKeyGroupReaders.java
@Override
public void readMappingsInKeyGroup(@Nonnull DataInputView inView, @Nonnegative int keyGroupId) throws IOException {

	if (inView.readByte() == 0) {
		return;
	}

	final TypeSerializer<N> namespaceSerializer = stateTable.getNamespaceSerializer();
	final TypeSerializer<S> stateSerializer = stateTable.getStateSerializer();

	// V1 uses a namespace-compressing format: entries are grouped by namespace
	int numNamespaces = inView.readInt();
	for (int k = 0; k < numNamespaces; k++) {
		N namespace = namespaceSerializer.deserialize(inView);
		int numEntries = inView.readInt();
		for (int l = 0; l < numEntries; l++) {
			K key = keySerializer.deserialize(inView);
			S state = stateSerializer.deserialize(inView);
			stateTable.put(key, keyGroupId, namespace, state);
		}
	}
}
 
Example 18  Project: flink   File: HeapPriorityQueueSet.java
/**
 * Creates an empty {@link HeapPriorityQueueSet} with the requested initial capacity.
 *
 * @param elementPriorityComparator comparator for the priority of contained elements.
 * @param keyExtractor function to extract a key from the contained elements.
 * @param minimumCapacity the minimum and initial capacity of this priority queue.
 * @param keyGroupRange the key-group range of the elements in this set.
 * @param totalNumberOfKeyGroups the total number of key-groups of the job.
 */
@SuppressWarnings("unchecked")
public HeapPriorityQueueSet(
	@Nonnull PriorityComparator<T> elementPriorityComparator,
	@Nonnull KeyExtractorFunction<T> keyExtractor,
	@Nonnegative int minimumCapacity,
	@Nonnull KeyGroupRange keyGroupRange,
	@Nonnegative int totalNumberOfKeyGroups) {

	super(elementPriorityComparator, minimumCapacity);

	this.keyExtractor = keyExtractor;

	this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
	this.keyGroupRange = keyGroupRange;

	final int keyGroupsInLocalRange = keyGroupRange.getNumberOfKeyGroups();
	final int deduplicationSetSize = 1 + minimumCapacity / keyGroupsInLocalRange;
	this.deduplicationMapsByKeyGroup = new HashMap[keyGroupsInLocalRange];
	for (int i = 0; i < keyGroupsInLocalRange; ++i) {
		deduplicationMapsByKeyGroup[i] = new HashMap<>(deduplicationSetSize);
	}
}
 
Example 19  Project: flink   File: KeyGroupPartitionedPriorityQueue.java
@SuppressWarnings("unchecked")
public KeyGroupPartitionedPriorityQueue(
	@Nonnull KeyExtractorFunction<T> keyExtractor,
	@Nonnull PriorityComparator<T> elementPriorityComparator,
	@Nonnull PartitionQueueSetFactory<T, PQ> orderedCacheFactory,
	@Nonnull KeyGroupRange keyGroupRange,
	@Nonnegative int totalKeyGroups) {

	this.keyExtractor = keyExtractor;
	this.totalKeyGroups = totalKeyGroups;
	this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
	this.keyGroupedHeaps = (PQ[]) new InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()];
	this.heapOfKeyGroupedHeaps = new HeapPriorityQueue<>(
		new InternalPriorityQueueComparator<>(elementPriorityComparator),
		keyGroupRange.getNumberOfKeyGroups());
	for (int i = 0; i < keyGroupedHeaps.length; i++) {
		final PQ keyGroupSubHeap =
			orderedCacheFactory.create(firstKeyGroup + i, totalKeyGroups, keyExtractor, elementPriorityComparator);
		keyGroupedHeaps[i] = keyGroupSubHeap;
		heapOfKeyGroupedHeaps.add(keyGroupSubHeap);
	}
}
 
Example 20  Project: flink   File: TaskLocalStateStoreImpl.java
@VisibleForTesting
TaskLocalStateStoreImpl(
	@Nonnull JobID jobID,
	@Nonnull AllocationID allocationID,
	@Nonnull JobVertexID jobVertexID,
	@Nonnegative int subtaskIndex,
	@Nonnull LocalRecoveryConfig localRecoveryConfig,
	@Nonnull Executor discardExecutor,
	@Nonnull SortedMap<Long, TaskStateSnapshot> storedTaskStateByCheckpointID,
	@Nonnull Object lock) {

	this.jobID = jobID;
	this.allocationID = allocationID;
	this.jobVertexID = jobVertexID;
	this.subtaskIndex = subtaskIndex;
	this.discardExecutor = discardExecutor;
	this.localRecoveryConfig = localRecoveryConfig;
	this.storedTaskStateByCheckpointID = storedTaskStateByCheckpointID;
	this.lock = lock;
	this.disposed = false;
}
 
Example 21  Project: flink   File: LocalRecoveryDirectoryProviderImpl.java
public LocalRecoveryDirectoryProviderImpl(
	@Nonnull File[] allocationBaseDirs,
	@Nonnull JobID jobID,
	@Nonnull JobVertexID jobVertexID,
	@Nonnegative int subtaskIndex) {

	Preconditions.checkArgument(allocationBaseDirs.length > 0);
	this.allocationBaseDirs = allocationBaseDirs;
	this.jobID = jobID;
	this.jobVertexID = jobVertexID;
	this.subtaskIndex = subtaskIndex;

	for (File allocationBaseDir : allocationBaseDirs) {
		Preconditions.checkNotNull(allocationBaseDir);
		allocationBaseDir.mkdirs();
	}
}
 
Example 22  Project: flink   File: TaskLocalStateStoreImpl.java
public TaskLocalStateStoreImpl(
	@Nonnull JobID jobID,
	@Nonnull AllocationID allocationID,
	@Nonnull JobVertexID jobVertexID,
	@Nonnegative int subtaskIndex,
	@Nonnull LocalRecoveryConfig localRecoveryConfig,
	@Nonnull Executor discardExecutor) {

	this(
		jobID,
		allocationID,
		jobVertexID,
		subtaskIndex,
		localRecoveryConfig,
		discardExecutor,
		new TreeMap<>(),
		new Object());
}
 
Example 23  Project: Flink-CEPplus   File: FailingSource.java
public FailingSource(
	@Nonnull EventEmittingGenerator eventEmittingGenerator,
	@Nonnegative int numberOfGeneratorInvocations,
	@Nonnull TimeCharacteristic timeCharacteristic) {
	this.eventEmittingGenerator = eventEmittingGenerator;
	this.running = true;
	this.emitCallCount = 0;
	this.expectedEmitCalls = numberOfGeneratorInvocations;
	this.failureAfterNumElements = numberOfGeneratorInvocations / 2;
	this.checkpointStatus = new AtomicLong(INITIAL);
	this.usingProcessingTime = timeCharacteristic == TimeCharacteristic.ProcessingTime;
}
 
/**
 * The method to clip the db instance according to the target key group range using
 * the {@link RocksDB#delete(ColumnFamilyHandle, byte[])}.
 *
 * @param db the RocksDB instance to be clipped.
 * @param columnFamilyHandles the column families in the db instance.
 * @param targetKeyGroupRange the target key group range.
 * @param currentKeyGroupRange the key group range of the db instance.
 * @param keyGroupPrefixBytes Number of bytes required to prefix the key groups.
 */
public static void clipDBWithKeyGroupRange(
	@Nonnull RocksDB db,
	@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
	@Nonnull KeyGroupRange targetKeyGroupRange,
	@Nonnull KeyGroupRange currentKeyGroupRange,
	@Nonnegative int keyGroupPrefixBytes) throws RocksDBException {

	final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
	final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];

	if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) {
		RocksDBKeySerializationUtils.serializeKeyGroup(
			currentKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes);
		RocksDBKeySerializationUtils.serializeKeyGroup(
			targetKeyGroupRange.getStartKeyGroup(), endKeyGroupBytes);
		deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
	}

	if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) {
		RocksDBKeySerializationUtils.serializeKeyGroup(
			targetKeyGroupRange.getEndKeyGroup() + 1, beginKeyGroupBytes);
		RocksDBKeySerializationUtils.serializeKeyGroup(
			currentKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes);
		deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
	}
}
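
The deleteRange helper invoked above is not included on this page. As a rough sketch of the idea only (Flink's actual implementation differs, batching deletes through a RocksDBWriteBatchWrapper), such a helper could walk each column family with an iterator and delete every key in the half-open range:

import java.util.List;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

// Hedged sketch only: issues single deletes for clarity instead of batching.
final class RangeDeleteSketch {

	static void deleteRange(
		RocksDB db,
		List<ColumnFamilyHandle> columnFamilyHandles,
		byte[] beginKeyBytes,
		byte[] endKeyBytes) throws RocksDBException {

		for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
			try (RocksIterator iterator = db.newIterator(columnFamilyHandle)) {
				iterator.seek(beginKeyBytes);
				// RocksDB orders keys byte-wise unsigned; stop at the exclusive end key.
				while (iterator.isValid() && compareUnsigned(iterator.key(), endKeyBytes) < 0) {
					db.delete(columnFamilyHandle, iterator.key());
					iterator.next();
				}
			}
		}
	}

	// Unsigned lexicographic comparison, matching RocksDB's default key order.
	private static int compareUnsigned(byte[] a, byte[] b) {
		final int len = Math.min(a.length, b.length);
		for (int i = 0; i < len; i++) {
			final int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
			if (cmp != 0) {
				return cmp;
			}
		}
		return a.length - b.length;
	}
}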
 
public RocksDBSerializedCompositeKeyBuilder(
	@Nonnull TypeSerializer<K> keySerializer,
	@Nonnegative int keyGroupPrefixBytes,
	@Nonnegative int initialSize) {
	this(
		keySerializer,
		new DataOutputSerializer(initialSize),
		keyGroupPrefixBytes,
		RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer),
		0);
}
 
@VisibleForTesting
RocksDBSerializedCompositeKeyBuilder(
	@Nonnull TypeSerializer<K> keySerializer,
	@Nonnull DataOutputSerializer keyOutView,
	@Nonnegative int keyGroupPrefixBytes,
	boolean keySerializerTypeVariableSized,
	@Nonnegative int afterKeyMark) {
	this.keySerializer = keySerializer;
	this.keyOutView = keyOutView;
	this.keyGroupPrefixBytes = keyGroupPrefixBytes;
	this.keySerializerTypeVariableSized = keySerializerTypeVariableSized;
	this.afterKeyMark = afterKeyMark;
}
 
public RocksIncrementalSnapshotStrategy(
	@Nonnull RocksDB db,
	@Nonnull ResourceGuard rocksDBResourceGuard,
	@Nonnull TypeSerializer<K> keySerializer,
	@Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
	@Nonnull KeyGroupRange keyGroupRange,
	@Nonnegative int keyGroupPrefixBytes,
	@Nonnull LocalRecoveryConfig localRecoveryConfig,
	@Nonnull CloseableRegistry cancelStreamRegistry,
	@Nonnull File instanceBasePath,
	@Nonnull UUID backendUID,
	@Nonnull SortedMap<Long, Set<StateHandleID>> materializedSstFiles,
	long lastCompletedCheckpointId,
	int numberOfTransferingThreads) {

	super(
		DESCRIPTION,
		db,
		rocksDBResourceGuard,
		keySerializer,
		kvStateInformation,
		keyGroupRange,
		keyGroupPrefixBytes,
		localRecoveryConfig,
		cancelStreamRegistry);

	this.instanceBasePath = instanceBasePath;
	this.backendUID = backendUID;
	this.materializedSstFiles = materializedSstFiles;
	this.lastCompletedCheckpointId = lastCompletedCheckpointId;
	this.stateUploader = new RocksDBStateUploader(numberOfTransferingThreads);
	this.localDirectoryName = backendUID.toString().replaceAll("[\\-]", "");
}
 
public HeapPriorityQueueSetFactory(
	@Nonnull KeyGroupRange keyGroupRange,
	@Nonnegative int totalKeyGroups,
	@Nonnegative int minimumCapacity) {

	this.keyGroupRange = keyGroupRange;
	this.totalKeyGroups = totalKeyGroups;
	this.minimumCapacity = minimumCapacity;
}
 
Example 29  Project: flink   File: TaskLocalStateStoreImpl.java
@Override
public void storeLocalState(
	@Nonnegative long checkpointId,
	@Nullable TaskStateSnapshot localState) {

	if (localState == null) {
		localState = NULL_DUMMY;
	}

	if (LOG.isTraceEnabled()) {
		LOG.debug(
			"Stored local state for checkpoint {} in subtask ({} - {} - {}) : {}.",
			checkpointId, jobID, jobVertexID, subtaskIndex, localState);
	} else if (LOG.isDebugEnabled()) {
		LOG.debug(
			"Stored local state for checkpoint {} in subtask ({} - {} - {})",
			checkpointId, jobID, jobVertexID, subtaskIndex);
	}

	Map.Entry<Long, TaskStateSnapshot> toDiscard = null;

	synchronized (lock) {
		if (disposed) {
			// we ignore late stores and simply discard the state.
			toDiscard = new AbstractMap.SimpleEntry<>(checkpointId, localState);
		} else {
			TaskStateSnapshot previous =
				storedTaskStateByCheckpointID.put(checkpointId, localState);

			if (previous != null) {
				toDiscard = new AbstractMap.SimpleEntry<>(checkpointId, previous);
			}
		}
	}

	if (toDiscard != null) {
		asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
	}
}
 
HeapPriorityQueueStateSnapshot(
	@Nonnull T[] heapArrayCopy,
	@Nonnull KeyExtractorFunction<T> keyExtractor,
	@Nonnull RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo,
	@Nonnull KeyGroupRange keyGroupRange,
	@Nonnegative int totalKeyGroups) {

	this.keyExtractor = keyExtractor;
	this.heapArrayCopy = heapArrayCopy;
	this.metaInfo = metaInfo;
	this.keyGroupRange = keyGroupRange;
	this.totalKeyGroups = totalKeyGroups;
}
 