下面列出了 org.apache.hadoop.hbase.protobuf.generated.AdminProtos#ReplicateWALEntryResponse() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Rejects every call: this implementation deliberately provides no WAL entry replication.
 *
 * @param controller the RPC controller; not read
 * @param request the replication request; not read
 * @return never returns normally
 * @throws UnsupportedOperationException always
 * @throws ServiceException declared by the signature, never raised by this body
 */
@Override
public AdminProtos.ReplicateWALEntryResponse replicateWALEntry(
        RpcController controller,
        AdminProtos.ReplicateWALEntryRequest request) throws ServiceException {
    final String reason = "No need to support.";
    throw new UnsupportedOperationException(reason);
}
/**
 * Rejects every call: this implementation deliberately provides no replay operation.
 *
 * @param controller the RPC controller; not read
 * @param request the replay request; not read
 * @return never returns normally
 * @throws UnsupportedOperationException always
 * @throws ServiceException declared by the signature, never raised by this body
 */
@Override
public AdminProtos.ReplicateWALEntryResponse replay(
        RpcController controller,
        AdminProtos.ReplicateWALEntryRequest request) throws ServiceException {
    final String reason = "No need to support.";
    throw new UnsupportedOperationException(reason);
}
/**
 * Placeholder for WAL entry replication; the operation is not implemented here.
 *
 * @param rpcController the RPC controller; not read
 * @param replicateWALEntryRequest the replication request; not read
 * @return never returns normally
 * @throws UnsupportedOperationException always
 * @throws ServiceException declared by the signature, never raised by this body
 */
@Override
public AdminProtos.ReplicateWALEntryResponse replicateWALEntry(
        RpcController rpcController,
        AdminProtos.ReplicateWALEntryRequest replicateWALEntryRequest) throws ServiceException {
    final UnsupportedOperationException notImplemented =
            new UnsupportedOperationException("Not implemented");
    throw notImplemented;
}
/**
 * Placeholder for the replay operation; it is not implemented here.
 *
 * @param controller the RPC controller; not read
 * @param request the replay request; not read
 * @return never returns normally
 * @throws UnsupportedOperationException always
 * @throws ServiceException declared by the signature, never raised by this body
 */
@Override
public AdminProtos.ReplicateWALEntryResponse replay(
        final RpcController controller,
        final AdminProtos.ReplicateWALEntryRequest request) throws ServiceException {
    final UnsupportedOperationException notImplemented =
            new UnsupportedOperationException("Not implemented");
    throw notImplemented;
}
/**
 * Processes a batch of replicated WAL entries: groups each entry's cells by row key, schedules
 * one {@code SepEvent} per row, waits for all scheduled events to finish, and then reports the
 * largest write time that produced an event.
 *
 * <p>Entries whose write time is earlier than {@code subscriptionTimestamp} produce no events,
 * but their cells are still read from the scanner so that it stays aligned with the cells of
 * the following entries.
 *
 * @param controller the RPC controller; cast to {@code PayloadCarryingRpcController} to obtain
 *        the cell scanner that carries the cells of all entries
 * @param request the request holding the WAL entries to process
 * @return an empty {@code ReplicateWALEntryResponse}
 * @throws ServiceException wrapping any {@link IOException} raised while processing
 * @throws ArrayIndexOutOfBoundsException if the scanner runs out of cells before an entry's
 *         associated cell count has been consumed
 */
@Override
public AdminProtos.ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
        final AdminProtos.ReplicateWALEntryRequest request) throws ServiceException {
    try {
        // TODO Recording of last processed timestamp won't work if two batches of log entries are sent out of order
        long newestWriteTime = -1;
        // NOTE(review): the literal 100 is presumably the executor's batch size - confirm against SepEventExecutor.
        final SepEventExecutor executor = new SepEventExecutor(listener, executors, 100, sepMetrics);
        final List<AdminProtos.WALEntry> walEntries = request.getEntryList();
        final CellScanner scanner = ((PayloadCarryingRpcController) controller).cellScanner();

        for (final AdminProtos.WALEntry walEntry : walEntries) {
            final long writeTime = walEntry.getKey().getWriteTime();

            // A null table marks an entry written before the subscription started.
            final TableName table;
            if (writeTime < subscriptionTimestamp) {
                table = null;
            } else {
                table = TableName.valueOf(walEntry.getKey().getTableName().toByteArray());
            }

            final ArrayListMultimap<ByteBuffer, Cell> cellsByRow = ArrayListMultimap.create();
            final Map<ByteBuffer, byte[]> payloadByRow = Maps.newHashMap();

            final int expectedCells = walEntry.getAssociatedCellCount();
            for (int cellIndex = 0; cellIndex < expectedCells; cellIndex++) {
                if (!scanner.advance()) {
                    throw new ArrayIndexOutOfBoundsException("Expected=" + expectedCells + ", index=" + cellIndex);
                }
                if (table == null) {
                    // Nothing to collect; the advance() above already consumed the cell.
                    continue;
                }

                final Cell current = scanner.current();
                final ByteBuffer row =
                        ByteBuffer.wrap(current.getRowArray(), current.getRowOffset(), current.getRowLength());
                final KeyValue keyValue = KeyValueUtil.ensureKeyValue(current);

                byte[] extracted = null;
                if (payloadExtractor != null) {
                    extracted = payloadExtractor.extractPayload(table.toBytes(), keyValue);
                }
                if (extracted != null) {
                    // Only the first payload seen for a row is kept.
                    if (payloadByRow.containsKey(row)) {
                        log.error("Multiple payloads encountered for row " + Bytes.toStringBinary(row)
                                + ", choosing " + Bytes.toStringBinary(payloadByRow.get(row)));
                    } else {
                        payloadByRow.put(row, extracted);
                    }
                }
                cellsByRow.put(row, keyValue);
            }

            for (final ByteBuffer row : cellsByRow.keySet()) {
                final List<Cell> rowCells = cellsByRow.get(row);
                final byte[] rowBytes = CellUtil.cloneRow(rowCells.get(0));
                final SepEvent event = new SepEvent(table.toBytes(), rowBytes, rowCells, payloadByRow.get(row));
                executor.scheduleSepEvent(event);
                newestWriteTime = Math.max(newestWriteTime, writeTime);
            }
        }

        final List<Future<?>> pending = executor.flush();
        waitOnSepEventCompletion(pending);

        if (newestWriteTime > 0) {
            sepMetrics.reportSepTimestamp(newestWriteTime);
        }
        return AdminProtos.ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
        throw new ServiceException(ie);
    }
}
/**
 * Placeholder for WAL entry replication; the operation is not implemented here.
 *
 * @param rpcController the RPC controller; not read
 * @param replicateWALEntryRequest the replication request; not read
 * @return never returns normally
 * @throws UnsupportedOperationException always
 * @throws ServiceException declared by the signature, never raised by this body
 */
public AdminProtos.ReplicateWALEntryResponse replicateWALEntry(
        RpcController rpcController,
        AdminProtos.ReplicateWALEntryRequest replicateWALEntryRequest) throws ServiceException {
    final String reason = "Not implemented";
    throw new UnsupportedOperationException(reason);
}
/**
 * Placeholder for the replay operation; it is not implemented here.
 *
 * @param controller the RPC controller; not read
 * @param request the replay request; not read
 * @return never returns normally
 * @throws UnsupportedOperationException always
 * @throws ServiceException declared by the signature, never raised by this body
 */
public AdminProtos.ReplicateWALEntryResponse replay(
        RpcController controller,
        AdminProtos.ReplicateWALEntryRequest request) throws ServiceException {
    final String reason = "Not implemented";
    throw new UnsupportedOperationException(reason);
}