The following lists example usages of org.apache.hadoop.hbase.HConstants#NO_NONCE, collected from the HBase source.
@Override
public long estimatedSerializedSizeOf() {
long size = encodedRegionName != null ? encodedRegionName.length : 0;
size += tablename != null ? tablename.toBytes().length : 0;
if (clusterIds != null) {
size += 16 * clusterIds.size(); // each cluster id is a UUID: two longs = 16 bytes
}
if (nonceGroup != HConstants.NO_NONCE) {
size += Bytes.SIZEOF_LONG; // nonce group
}
if (nonce != HConstants.NO_NONCE) {
size += Bytes.SIZEOF_LONG; // nonce
}
if (replicationScope != null) {
for (Map.Entry<byte[], Integer> scope: replicationScope.entrySet()) {
size += scope.getKey().length;
size += Bytes.SIZEOF_INT;
}
}
size += Bytes.SIZEOF_LONG; // sequence number
size += Bytes.SIZEOF_LONG; // write time
if (origLogSeqNum > 0) {
size += Bytes.SIZEOF_LONG; // original sequence number
}
return size;
}
/**
 * Pass a processor to the region to process multiple rows atomically.
 *
 * The RowProcessor implementations should be inner classes of your
 * RowProcessorEndpoint. This way the RowProcessor can be class-loaded
 * together with the Coprocessor endpoint.
 *
 * See {@code TestRowProcessorEndpoint} for an example.
 *
 * The request contains the information needed to construct the processor
 * (see {@link #constructRowProcessorFromRequest}). The processor object
 * defines the read-modify-write procedure.
 */
@Override
public void process(RpcController controller, ProcessRequest request,
RpcCallback<ProcessResponse> done) {
ProcessResponse resultProto = null;
try {
RowProcessor<S,T> processor = constructRowProcessorFromRequest(request);
Region region = env.getRegion();
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
region.processRowsWithLocks(processor, nonceGroup, nonce);
T result = processor.getResult();
ProcessResponse.Builder b = ProcessResponse.newBuilder();
b.setRowProcessorResult(result.toByteString());
resultProto = b.build();
} catch (Exception e) {
CoprocessorRpcUtils.setControllerException(controller, new IOException(e));
}
done.run(resultProto);
}
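For context, here is a hedged sketch of how a client might invoke this endpoint, loosely following the pattern in TestRowProcessorEndpoint; the table handle, row, and the MyRowProcessor class are assumptions for illustration.
// Client-side sketch: call the RowProcessor endpoint on the region that
// hosts `row`. MyRowProcessor is a hypothetical RowProcessor implementation.
CoprocessorRpcChannel channel = table.coprocessorService(row);
RowProcessorService.BlockingInterface service =
    RowProcessorService.newBlockingStub(channel);
ProcessRequest request = RowProcessorClient.getRowProcessorPB(new MyRowProcessor(row));
ProcessResponse response = service.process(null, request); // may throw ServiceException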
/**
 * Starts the operation if an operation with the same nonce has not already succeeded. If such
 * an operation is in progress, waits for it to end and checks whether it has succeeded.
* @param group Nonce group.
* @param nonce Nonce.
* @param stoppable Stoppable that terminates waiting (if any) when the server is stopped.
* @return true if the operation has not already succeeded and can proceed; false otherwise.
*/
public boolean startOperation(long group, long nonce, Stoppable stoppable)
throws InterruptedException {
if (nonce == HConstants.NO_NONCE) return true;
NonceKey nk = new NonceKey(group, nonce);
OperationContext ctx = new OperationContext();
while (true) {
OperationContext oldResult = nonces.putIfAbsent(nk, ctx);
if (oldResult == null) return true;
// Collision with some operation - should be extremely rare.
synchronized (oldResult) {
int oldState = oldResult.getState();
LOG.debug("Conflict detected by nonce: " + nk + ", " + oldResult);
if (oldState != OperationContext.WAIT) {
return oldState == OperationContext.PROCEED; // operation ended
}
oldResult.setHasWait();
oldResult.wait(this.conflictWaitIterationMs); // operation is still active... wait and loop
if (stoppable.isStopped()) {
throw new InterruptedException("Server stopped");
}
}
}
}
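To show where startOperation fits, here is a minimal caller-side sketch, assuming a ServerNonceManager field named nonceManager and a placeholder doOperation(); endOperation records the outcome so later retries with the same nonce are handled correctly.
// Server-side sketch: guard a retriable operation with its nonce.
if (nonceManager.startOperation(nonceGroup, nonce, stoppable)) {
  boolean success = false;
  try {
    doOperation(); // hypothetical placeholder for the actual mutation
    success = true;
  } finally {
    // Mark the nonce as succeeded (retries are rejected) or failed
    // (retries may proceed again).
    nonceManager.endOperation(nonceGroup, nonce, success);
  }
}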
/**
* Reports the operation from WAL during replay.
* @param group Nonce group.
* @param nonce Nonce.
* @param writeTime Entry write time, used to ignore entries that are too old.
*/
public void reportOperationFromWal(long group, long nonce, long writeTime) {
if (nonce == HConstants.NO_NONCE) return;
// Give the write time some slack in case the clocks are not synchronized.
long now = EnvironmentEdgeManager.currentTime();
if (now > writeTime + (deleteNonceGracePeriod * 1.5)) return;
OperationContext newResult = new OperationContext();
newResult.setState(OperationContext.DONT_PROCEED);
NonceKey nk = new NonceKey(group, nonce);
OperationContext oldResult = nonces.putIfAbsent(nk, newResult);
if (oldResult != null) {
// Some schemes can have collisions (for example, expiring hashes), so just log it.
// We have no idea about the semantics here, so this is the least of many evils.
LOG.warn("Nonce collision during WAL recovery: " + nk
+ ", " + oldResult + " with " + newResult);
}
}
/**
 * Return the write point of the previously succeeded operation.
 * @param group Nonce group.
 * @param nonce Nonce.
 * @return write point of the previously succeeded operation.
 */
public long getMvccFromOperationContext(long group, long nonce) {
if (nonce == HConstants.NO_NONCE) {
return Long.MAX_VALUE;
}
NonceKey nk = new NonceKey(group, nonce);
OperationContext result = nonces.get(nk);
return result == null ? Long.MAX_VALUE : result.getMvcc();
}
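For orientation, a hedged sketch of how replay code might combine the two methods above, assuming they live on a ServerNonceManager named nonceManager and that walKey is the key of the replayed entry.
// Replay-side sketch: register the nonce seen in the WAL, then check
// whether a previous attempt already succeeded.
nonceManager.reportOperationFromWal(group, nonce, walKey.getWriteTime());
long writePoint = nonceManager.getMvccFromOperationContext(group, nonce);
if (writePoint != Long.MAX_VALUE) {
  // an earlier attempt succeeded at this MVCC write point, so the
  // replayed edit can be treated as a duplicate
}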
protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes, String cf)
throws IOException {
final byte[] row = Bytes.toBytes(cf);
for (int i = 0; i < times; i++) {
long timestamp = System.currentTimeMillis();
WALEdit cols = new WALEdit();
cols.add(new KeyValue(row, row, row, timestamp, row));
WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(),
SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, scopes);
log.appendData(hri, key, cols);
}
log.sync();
}
@Test
public void testWriteEntryCanBeNull() throws IOException {
String testName = currentTest.getMethodName();
AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
CONF, null, true, null, null);
wal.close();
TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (byte[] fam : td.getColumnFamilyNames()) {
scopes.put(fam, 0);
}
long timestamp = System.currentTimeMillis();
byte[] row = Bytes.toBytes("row");
WALEdit cols = new WALEdit();
cols.add(new KeyValue(row, row, row, timestamp, row));
WALKeyImpl key =
new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID,
timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
try {
wal.append(ri, key, cols, true);
fail("Should fail since the wal has already been closed");
} catch (IOException e) {
// expected
assertThat(e.getMessage(), containsString("log is closed"));
// the WriteEntry should be null since we fail before setting it.
assertNull(key.getWriteEntry());
}
}
/**
 * Create a protocol buffer MutateRequest for an append
 *
 * @param regionName the name of the region the append targets
 * @param append the client Append to convert
 * @param nonceGroup the nonce group for this operation
 * @param nonce the nonce for this operation
 * @return a mutate request
 * @throws IOException if the mutation cannot be converted
 */
public static MutateRequest buildMutateRequest(final byte[] regionName,
final Append append, long nonceGroup, long nonce) throws IOException {
MutateRequest.Builder builder = MutateRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
if (nonce != HConstants.NO_NONCE && nonceGroup != HConstants.NO_NONCE) {
builder.setNonceGroup(nonceGroup);
}
builder.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, append,
MutationProto.newBuilder(), nonce));
return builder.build();
}
/**
 * Create a protocol buffer MutateRequest for a client increment
 *
 * @param regionName the name of the region the increment targets
 * @param increment the client Increment to convert
 * @param nonceGroup the nonce group for this operation
 * @param nonce the nonce for this operation
 * @return a mutate request
 * @throws IOException if the mutation cannot be converted
 */
public static MutateRequest buildMutateRequest(final byte[] regionName,
final Increment increment, final long nonceGroup, final long nonce) throws IOException {
MutateRequest.Builder builder = MutateRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
if (nonce != HConstants.NO_NONCE && nonceGroup != HConstants.NO_NONCE) {
builder.setNonceGroup(nonceGroup);
}
builder.setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, increment,
MutationProto.newBuilder(), nonce));
return builder.build();
}
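To connect these builders back to NO_NONCE: a hedged sketch of how a caller might obtain the nonce pair before building the request, assuming a ClusterConnection (an internal client API) named connection and an Increment named increment.
// Sketch: draw a fresh nonce; the group identifies the client, so the
// (group, nonce) pair uniquely identifies this operation attempt.
NonceGenerator ng = connection.getNonceGenerator();
long nonceGroup = ng.getNonceGroup(); // HConstants.NO_NONCE if nonces are disabled
long nonce = ng.newNonce();           // likewise NO_NONCE when disabled
MutateRequest request =
    RequestConverter.buildMutateRequest(regionName, increment, nonceGroup, nonce);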
public static MutationProto toMutation(final MutationType type, final Mutation mutation,
MutationProto.Builder builder, long nonce)
throws IOException {
builder = getMutationBuilderAndSetCommonFields(type, mutation, builder);
if (nonce != HConstants.NO_NONCE) {
builder.setNonce(nonce);
}
if (type == MutationType.INCREMENT) {
builder.setTimeRange(ProtobufUtil.toTimeRange(((Increment) mutation).getTimeRange()));
}
if (type == MutationType.APPEND) {
builder.setTimeRange(ProtobufUtil.toTimeRange(((Append) mutation).getTimeRange()));
}
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
for (Map.Entry<byte[],List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {
columnBuilder.clear();
columnBuilder.setFamily(UnsafeByteOperations.unsafeWrap(family.getKey()));
for (Cell cell: family.getValue()) {
valueBuilder.clear();
valueBuilder.setQualifier(UnsafeByteOperations.unsafeWrap(
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
valueBuilder.setValue(UnsafeByteOperations.unsafeWrap(
cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
valueBuilder.setTimestamp(cell.getTimestamp());
if (type == MutationType.DELETE || (type == MutationType.PUT && CellUtil.isDelete(cell))) {
KeyValue.Type keyValueType = KeyValue.Type.codeToType(cell.getTypeByte());
valueBuilder.setDeleteType(toDeleteType(keyValueType));
}
columnBuilder.addQualifierValue(valueBuilder.build());
}
builder.addColumnValue(columnBuilder.build());
}
return builder.build();
}
public static MutationProto toMutationNoData(final MutationType type, final Mutation mutation,
final MutationProto.Builder builder, long nonce) throws IOException {
getMutationBuilderAndSetCommonFields(type, mutation, builder);
builder.setAssociatedCellCount(mutation.size());
if (mutation instanceof Increment) {
builder.setTimeRange(ProtobufUtil.toTimeRange(((Increment) mutation).getTimeRange()));
}
if (mutation instanceof Append) {
builder.setTimeRange(ProtobufUtil.toTimeRange(((Append) mutation).getTimeRange()));
}
if (nonce != HConstants.NO_NONCE) {
builder.setNonce(nonce);
}
return builder.build();
}
@Override
public long newNonce() {
  long result = HConstants.NO_NONCE;
  do {
    // HConstants.NO_NONCE is reserved to mean "no nonce present", so keep
    // drawing random longs until we get any other value.
    result = rdm.nextLong();
  } while (result == HConstants.NO_NONCE);
  return result;
}
/**
* @return The nonce group
*/
default long getNonceGroup() {
return HConstants.NO_NONCE;
}
/**
* @return The nonce
*/
default long getNonce() {
return HConstants.NO_NONCE;
}
public boolean hasNonce() {
return nonce != HConstants.NO_NONCE;
}
public boolean hasNonceGroup() {
return nonceGroup != HConstants.NO_NONCE;
}
@Override
public long newNonce() {
return HConstants.NO_NONCE;
}
@Override
public long getNonceGroup() {
return HConstants.NO_NONCE;
}
/**
* Create a NonceKey from the specified nonceGroup and nonce.
* @param nonceGroup the group to use for the {@link NonceKey}
* @param nonce the nonce to use in the {@link NonceKey}
 * @return the generated NonceKey, or null if {@code nonce} is {@link HConstants#NO_NONCE}
*/
public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
}
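A short usage sketch, assuming this method lives on a procedure executor named procExec (as in ProcedureExecutor) and that registerNonce is available; since a NO_NONCE nonce yields null, callers must tolerate a null key.
// Sketch: null means "no dedup requested"; otherwise register the key so
// a resubmission with the same nonce maps back to the original procedure.
NonceKey nonceKey = procExec.createNonceKey(nonceGroup, nonce);
if (nonceKey != null) {
  // returns the existing procId if this nonce was seen before, else a negative value
  long prevProcId = procExec.registerNonce(nonceKey);
}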