下面列出了 org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
public InterRegionServerRpcController(PayloadCarryingRpcController delegate, Configuration conf) {
super(delegate);
// Set priority higher that normal, but lower than high
this.priority = (HConstants.HIGH_QOS + HConstants.NORMAL_QOS) / 2;
}
/**
 * Obtains a controller from the superclass and passes it to {@code getController}.
 *
 * @return the controller returned by {@code getController}
 */
@Override
public PayloadCarryingRpcController newController() {
    return getController(super.newController());
}
/**
 * Obtains a controller for the given cell scanner from the superclass and passes it to
 * {@code getController}.
 *
 * @param cellScanner scanner forwarded to the superclass factory method
 * @return the controller returned by {@code getController}
 */
@Override
public PayloadCarryingRpcController newController(CellScanner cellScanner) {
    return getController(super.newController(cellScanner));
}
/**
 * Obtains a controller for the given cell iterables from the superclass and passes it to
 * {@code getController}.
 *
 * @param cellIterables list forwarded to the superclass factory method
 * @return the controller returned by {@code getController}
 */
@Override
public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
    return getController(super.newController(cellIterables));
}
/**
 * Wraps the given controller in a new {@code InterRegionServerRpcController}.
 *
 * @param delegate controller to wrap
 * @return the wrapping controller, built with this object's {@code conf}
 */
private PayloadCarryingRpcController getController(PayloadCarryingRpcController delegate) {
    // Chain the controllers: the new instance sits in front of the delegate.
    final PayloadCarryingRpcController chained = new InterRegionServerRpcController(delegate, conf);
    return chained;
}
/**
 * Obtains a controller from the superclass and wraps it in an {@code IndexQosRpcController}.
 *
 * @return the wrapping controller
 */
@Override
public PayloadCarryingRpcController newController() {
    return new IndexQosRpcController(super.newController(), conf);
}
/**
 * Obtains a controller for the given cell scanner from the superclass and wraps it in an
 * {@code IndexQosRpcController}.
 *
 * @param cellScanner scanner forwarded to the superclass factory method
 * @return the wrapping controller
 */
@Override
public PayloadCarryingRpcController newController(CellScanner cellScanner) {
    return new IndexQosRpcController(super.newController(cellScanner), conf);
}
/**
 * Obtains a controller for the given cell iterables from the superclass and wraps it in an
 * {@code IndexQosRpcController}.
 *
 * @param cellIterables list forwarded to the superclass factory method
 * @return the wrapping controller
 */
@Override
public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
    return new IndexQosRpcController(super.newController(cellIterables), conf);
}
public IndexQosRpcController(PayloadCarryingRpcController delegate, Configuration conf) {
super(delegate);
this.conf = conf;
this.priority = PhoenixIndexRpcSchedulerFactory.getMinPriority(conf);
}
/**
 * Asks the delegate for a new controller and wraps it in a
 * {@code CountingIndexClientRpcController}.
 *
 * @return the wrapping controller
 */
@Override
public PayloadCarryingRpcController newController() {
    return new CountingIndexClientRpcController(delegate.newController());
}
/**
 * Asks the delegate for a new controller bound to the given cell scanner and wraps it in a
 * {@code CountingIndexClientRpcController}.
 *
 * @param cellScanner scanner forwarded to the delegate
 * @return the wrapping controller
 */
@Override
public PayloadCarryingRpcController newController(CellScanner cellScanner) {
    return new CountingIndexClientRpcController(delegate.newController(cellScanner));
}
/**
 * Asks the delegate for a new controller bound to the given cell iterables and wraps it in a
 * {@code CountingIndexClientRpcController}.
 *
 * @param cellIterables list forwarded to the delegate
 * @return the wrapping controller
 */
@Override
public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
    return new CountingIndexClientRpcController(delegate.newController(cellIterables));
}
/**
 * Creates a controller that wraps the given delegate; all setup is left to the superclass
 * constructor.
 *
 * @param delegate controller passed to the superclass constructor
 */
public CountingIndexClientRpcController(PayloadCarryingRpcController delegate) {
super(delegate);
}
/**
 * Processes a batch of replicated WAL entries.
 *
 * <p>For every entry, the associated cells are read from the controller's cell scanner and
 * grouped by row key. One {@code SepEvent} per row key is scheduled on a fresh
 * {@code SepEventExecutor}. After all entries are handled, the executor is flushed and the
 * resulting futures are passed to {@code waitOnSepEventCompletion}. If any event was scheduled
 * with a positive write time, the largest such write time is reported to {@code sepMetrics}.
 *
 * @param controller RPC controller; cast to {@code PayloadCarryingRpcController} to obtain the
 *        cell scanner
 * @param request request holding the list of WAL entries
 * @return an empty {@code ReplicateWALEntryResponse}
 * @throws ServiceException wrapping any {@code IOException} raised during processing
 */
@Override
public AdminProtos.ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
        final AdminProtos.ReplicateWALEntryRequest request) throws ServiceException {
    try {
        // TODO Recording of last processed timestamp won't work if two batches of log entries are sent out of order
        long newestWriteTime = -1;
        SepEventExecutor executor = new SepEventExecutor(listener, executors, 100, sepMetrics);
        List<AdminProtos.WALEntry> walEntries = request.getEntryList();
        CellScanner scanner = ((PayloadCarryingRpcController) controller).cellScanner();

        for (final AdminProtos.WALEntry walEntry : walEntries) {
            final long writeTime = walEntry.getKey().getWriteTime();

            // An entry whose write time precedes subscriptionTimestamp gets no table name;
            // its cells are still consumed from the scanner but are otherwise ignored.
            TableName table = null;
            if (writeTime >= subscriptionTimestamp) {
                table = TableName.valueOf(walEntry.getKey().getTableName().toByteArray());
            }

            Multimap<ByteBuffer, Cell> cellsByRow = ArrayListMultimap.create();
            final Map<ByteBuffer, byte[]> payloadByRow = Maps.newHashMap();
            final int expectedCells = walEntry.getAssociatedCellCount();

            for (int idx = 0; idx < expectedCells; idx++) {
                // The scanner must be advanced for every associated cell, even when skipping.
                if (!scanner.advance()) {
                    throw new ArrayIndexOutOfBoundsException("Expected=" + expectedCells + ", index=" + idx);
                }
                if (table != null) {
                    Cell current = scanner.current();
                    ByteBuffer cellRowKey = ByteBuffer.wrap(current.getRowArray(), current.getRowOffset(),
                            current.getRowLength());
                    KeyValue kv = KeyValueUtil.ensureKeyValue(current);
                    if (payloadExtractor != null) {
                        byte[] payload = payloadExtractor.extractPayload(table.toBytes(), kv);
                        if (payload != null) {
                            if (!payloadByRow.containsKey(cellRowKey)) {
                                payloadByRow.put(cellRowKey, payload);
                            } else {
                                // Keep the payload recorded first for this row.
                                log.error("Multiple payloads encountered for row " + Bytes.toStringBinary(cellRowKey)
                                        + ", choosing " + Bytes.toStringBinary(payloadByRow.get(cellRowKey)));
                            }
                        }
                    }
                    cellsByRow.put(cellRowKey, kv);
                }
            }

            // Schedule one event per distinct row key seen in this entry.
            for (final ByteBuffer groupedRowKey : cellsByRow.keySet()) {
                final List<Cell> rowCells = (List<Cell>) cellsByRow.get(groupedRowKey);
                final byte[] tableBytes = table.toBytes();
                final byte[] rowBytes = CellUtil.cloneRow(rowCells.get(0));
                final SepEvent event = new SepEvent(tableBytes, rowBytes, rowCells,
                        payloadByRow.get(groupedRowKey));
                executor.scheduleSepEvent(event);
                newestWriteTime = Math.max(newestWriteTime, writeTime);
            }
        }

        List<Future<?>> pending = executor.flush();
        waitOnSepEventCompletion(pending);

        if (newestWriteTime > 0) {
            sepMetrics.reportSepTimestamp(newestWriteTime);
        }
        return AdminProtos.ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ioe) {
        throw new ServiceException(ioe);
    }
}