类org.apache.hadoop.hbase.CellScanner源码实例Demo

下面列出了怎么用org.apache.hadoop.hbase.CellScanner的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: hugegraph   文件: HbaseSessions.java
/**
 * Just for debug
 */
@SuppressWarnings("unused")
private void dump(String table, Scan scan) throws IOException {
    System.out.println(String.format(">>>> scan table %s with %s",
                                     table, scan));
    RowIterator iterator = this.scan(table, scan);
    while (iterator.hasNext()) {
        Result row = iterator.next();
        System.out.println(StringEncoding.format(row.getRow()));
        CellScanner cellScanner = row.cellScanner();
        while (cellScanner.advance()) {
            Cell cell = cellScanner.current();
            byte[] key = CellUtil.cloneQualifier(cell);
            byte[] val = CellUtil.cloneValue(cell);
            System.out.println(String.format("  %s=%s",
                               StringEncoding.format(key),
                               StringEncoding.format(val)));
        }
    }
}
 
源代码2 项目: phoenix   文件: BaseIndexIT.java
private void assertNoIndexDeletes(Connection conn, long minTimestamp, String fullIndexName) throws IOException, SQLException {
    if (!this.mutable) {
        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
        PTable index = pconn.getTable(new PTableKey(null, fullIndexName));
        byte[] physicalIndexTable = index.getPhysicalName().getBytes();
        try (Table hIndex = pconn.getQueryServices().getTable(physicalIndexTable)) {
            Scan scan = new Scan();
            scan.setRaw(true);
            if (this.transactional) {
                minTimestamp = TransactionUtil.convertToNanoseconds(minTimestamp);
            }
            scan.setTimeRange(minTimestamp, HConstants.LATEST_TIMESTAMP);
            ResultScanner scanner = hIndex.getScanner(scan);
            Result result;
            while ((result = scanner.next()) != null) {
                CellScanner cellScanner = result.cellScanner();
                while (cellScanner.advance()) {
                    Cell current = cellScanner.current();
                    assertTrue(CellUtil.isPut(current));
                }
            }
        };
    }
}
 
源代码3 项目: hbase   文件: AccessController.java
private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
  // No need to check if we're not going to throw
  if (!authorizationEnabled) {
    m.setAttribute(TAG_CHECK_PASSED, TRUE);
    return;
  }
  // Superusers are allowed to store cells unconditionally.
  if (Superusers.isSuperUser(user)) {
    m.setAttribute(TAG_CHECK_PASSED, TRUE);
    return;
  }
  // We already checked (prePut vs preBatchMutation)
  if (m.getAttribute(TAG_CHECK_PASSED) != null) {
    return;
  }
  for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
    Iterator<Tag> tagsItr = PrivateCellUtil.tagsIterator(cellScanner.current());
    while (tagsItr.hasNext()) {
      if (tagsItr.next().getType() == PermissionStorage.ACL_TAG_TYPE) {
        throw new AccessDeniedException("Mutation contains cell with reserved type tag");
      }
    }
  }
  m.setAttribute(TAG_CHECK_PASSED, TRUE);
}
 
源代码4 项目: hbase   文件: CellBlockBuilder.java
private boolean buildCellBlock(final Codec codec, final CompressionCodec compressor,
    final CellScanner cellScanner, OutputStreamSupplier supplier) throws IOException {
  if (cellScanner == null) {
    return false;
  }
  if (codec == null) {
    throw new CellScannerButNoCodecException();
  }
  int bufferSize = cellBlockBuildingInitialBufferSize;
  encodeCellsTo(supplier.get(bufferSize), cellScanner, codec, compressor);
  if (LOG.isTraceEnabled() && bufferSize < supplier.size()) {
    LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + supplier.size()
        + "; up hbase.ipc.cellblock.building.initial.buffersize?");
  }
  return true;
}
 
源代码5 项目: hbase   文件: TestTags.java
@Override
public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
    InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
  if (checkTagPresence) {
    if (results.size() > 0) {
      // Check tag presence in the 1st cell in 1st Result
      Result result = results.get(0);
      CellScanner cellScanner = result.cellScanner();
      if (cellScanner.advance()) {
        Cell cell = cellScanner.current();
        tags = PrivateCellUtil.getTags(cell);
      }
    }
  }
  return hasMore;
}
 
源代码6 项目: hbase   文件: RSRpcServices.java
private static Get toGet(final Mutation mutation) throws IOException {
  if(!(mutation instanceof Increment) && !(mutation instanceof Append)) {
    throw new AssertionError("mutation must be a instance of Increment or Append");
  }
  Get get = new Get(mutation.getRow());
  CellScanner cellScanner = mutation.cellScanner();
  while (!cellScanner.advance()) {
    Cell cell = cellScanner.current();
    get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
  }
  if (mutation instanceof Increment) {
    // Increment
    Increment increment = (Increment) mutation;
    get.setTimeRange(increment.getTimeRange().getMin(), increment.getTimeRange().getMax());
  } else {
    // Append
    Append append = (Append) mutation;
    get.setTimeRange(append.getTimeRange().getMin(), append.getTimeRange().getMax());
  }
  for (Entry<String, byte[]> entry : mutation.getAttributesMap().entrySet()) {
    get.setAttribute(entry.getKey(), entry.getValue());
  }
  return get;
}
 
源代码7 项目: hbase   文件: QuotaTableUtil.java
/**
 * Returns a multimap for all existing table snapshot entries.
 * @param conn connection to re-use
 */
public static Multimap<TableName, String> getTableSnapshots(Connection conn) throws IOException {
  try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
      ResultScanner rs = quotaTable.getScanner(createScanForSpaceSnapshotSizes())) {
    Multimap<TableName, String> snapshots = HashMultimap.create();
    for (Result r : rs) {
      CellScanner cs = r.cellScanner();
      while (cs.advance()) {
        Cell c = cs.current();

        final String snapshot = extractSnapshotNameFromSizeCell(c);
        snapshots.put(getTableFromRowKey(r.getRow()), snapshot);
      }
    }
    return snapshots;
  }
}
 
源代码8 项目: hbase   文件: RSRpcServices.java
/**
 * Replicate WAL entries on the region server.
 * @param controller the RPC controller
 * @param request the request
 */
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
  try {
    checkOpen();
    if (regionServer.getReplicationSinkService() != null) {
      requestCount.increment();
      List<WALEntry> entries = request.getEntryList();
      checkShouldRejectReplicationRequest(entries);
      CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner();
      regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries();
      regionServer.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
        request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
        request.getSourceHFileArchiveDirPath());
      regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries();
      return ReplicateWALEntryResponse.newBuilder().build();
    } else {
      throw new ServiceException("Replication services are not initialized yet");
    }
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}
 
源代码9 项目: hbase   文件: RSRpcServices.java
private void skipCellsForMutation(Action action, CellScanner cellScanner) {
  if (cellScanner == null) {
    return;
  }
  try {
    if (action.hasMutation()) {
      MutationProto m = action.getMutation();
      if (m.hasAssociatedCellCount()) {
        for (int i = 0; i < m.getAssociatedCellCount(); i++) {
          cellScanner.advance();
        }
      }
    }
  } catch (IOException e) {
    // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
    // marked as failed as we could not see the Region here. At client side the top level
    // RegionAction exception will be considered first.
    LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
  }
}
 
源代码10 项目: hbase   文件: CellBlockBuilder.java
private void encodeCellsTo(OutputStream os, CellScanner cellScanner, Codec codec,
    CompressionCodec compressor) throws IOException {
  Compressor poolCompressor = null;
  try {
    if (compressor != null) {
      if (compressor instanceof Configurable) {
        ((Configurable) compressor).setConf(this.conf);
      }
      poolCompressor = CodecPool.getCompressor(compressor);
      os = compressor.createOutputStream(os, poolCompressor);
    }
    Codec.Encoder encoder = codec.getEncoder(os);
    while (cellScanner.advance()) {
      encoder.write(cellScanner.current());
    }
    encoder.flush();
  } catch (BufferOverflowException | IndexOutOfBoundsException e) {
    throw new DoNotRetryIOException(e);
  } finally {
    os.close();
    if (poolCompressor != null) {
      CodecPool.returnCompressor(poolCompressor);
    }
  }
}
 
源代码11 项目: phoenix   文件: TestUtil.java
public static int getRowCount(Table table, boolean isRaw) throws IOException {
    Scan s = new Scan();
    s.setRaw(isRaw);;
    s.setMaxVersions();
    int rows = 0;
    try (ResultScanner scanner = table.getScanner(s)) {
        Result result = null;
        while ((result = scanner.next()) != null) {
            rows++;
            CellScanner cellScanner = result.cellScanner();
            Cell current = null;
            while (cellScanner.advance()) {
                current = cellScanner.current();
            }
        }
    }
    return rows;
}
 
源代码12 项目: hbase   文件: TestVisibilityLabels.java
@Test
public void testSimpleVisibilityLabels() throws Exception {
  TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
  try (Table table = createTableAndWriteDataWithLabels(tableName, SECRET + "|" + CONFIDENTIAL,
      PRIVATE + "|" + CONFIDENTIAL)) {
    Scan s = new Scan();
    s.setAuthorizations(new Authorizations(SECRET, CONFIDENTIAL, PRIVATE));
    ResultScanner scanner = table.getScanner(s);
    Result[] next = scanner.next(3);

    assertTrue(next.length == 2);
    CellScanner cellScanner = next[0].cellScanner();
    cellScanner.advance();
    Cell current = cellScanner.current();
    assertTrue(Bytes.equals(current.getRowArray(), current.getRowOffset(),
        current.getRowLength(), row1, 0, row1.length));
    cellScanner = next[1].cellScanner();
    cellScanner.advance();
    current = cellScanner.current();
    assertTrue(Bytes.equals(current.getRowArray(), current.getRowOffset(),
        current.getRowLength(), row2, 0, row2.length));
  }
}
 
源代码13 项目: hbase   文件: QuotaTableUtil.java
/**
 * Returns a list of {@code Delete} to remove all entries returned by the passed scanner.
 * @param connection connection to re-use
 * @param scan the scanner to use to generate the list of deletes
 */
static List<Delete> createDeletesForExistingSnapshotsFromScan(Connection connection, Scan scan)
    throws IOException {
  List<Delete> deletes = new ArrayList<>();
  try (Table quotaTable = connection.getTable(QUOTA_TABLE_NAME);
      ResultScanner rs = quotaTable.getScanner(scan)) {
    for (Result r : rs) {
      CellScanner cs = r.cellScanner();
      while (cs.advance()) {
        Cell c = cs.current();
        byte[] family = Bytes.copy(c.getFamilyArray(), c.getFamilyOffset(), c.getFamilyLength());
        byte[] qual =
            Bytes.copy(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength());
        Delete d = new Delete(r.getRow());
        d.addColumns(family, qual);
        deletes.add(d);
      }
    }
    return deletes;
  }
}
 
源代码14 项目: hbase   文件: TableSnapshotInputFormatTestBase.java
protected static void verifyRowFromMap(ImmutableBytesWritable key, Result result)
  throws IOException {
  byte[] row = key.get();
  CellScanner scanner = result.cellScanner();
  while (scanner.advance()) {
    Cell cell = scanner.current();

    //assert that all Cells in the Result have the same key
    Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
      cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
  }

  for (byte[] family : FAMILIES) {
    byte[] actual = result.getValue(family, family);
    Assert.assertArrayEquals(
      "Row in snapshot does not match, expected:" + Bytes.toString(row) + " ,actual:" + Bytes
        .toString(actual), row, actual);
  }
}
 
源代码15 项目: phoenix   文件: TestUtil.java
public static CellCount getCellCount(Table table, boolean isRaw) throws IOException {
    Scan s = new Scan();
    s.setRaw(isRaw);;
    s.setMaxVersions();

    CellCount cellCount = new CellCount();
    try (ResultScanner scanner = table.getScanner(s)) {
        Result result = null;
        while ((result = scanner.next()) != null) {
            CellScanner cellScanner = result.cellScanner();
            Cell current = null;
            while (cellScanner.advance()) {
                current = cellScanner.current();
                cellCount.addCell(Bytes.toString(CellUtil.cloneRow(current)));
            }
        }
    }
    return cellCount;
}
 
源代码16 项目: phoenix   文件: PhoenixTxIndexMutationGenerator.java
private TxTableState(Set<ColumnReference> indexedColumns, long currentTimestamp, Mutation mutation) {
    this.currentTimestamp = currentTimestamp;
    this.indexedColumns = indexedColumns;
    this.mutation = mutation;
    int estimatedSize = indexedColumns.size();
    this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize);
    this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize);
    try {
        CellScanner scanner = mutation.cellScanner();
        while (scanner.advance()) {
            Cell cell = scanner.current();
            pendingUpdates.add(PhoenixKeyValueUtil.maybeCopyCell(cell));
        }
    } catch (IOException e) {
        throw new RuntimeException(e); // Impossible
    }
}
 
源代码17 项目: hbase   文件: AbstractTestIPC.java
/**
 * It is hard to verify the compression is actually happening under the wraps. Hope that if
 * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to
 * confirm that compression is happening down in the client and server).
 */
@Test
public void testCompressCellBlock() throws IOException, ServiceException {
  Configuration conf = new Configuration(HBaseConfiguration.create());
  conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
  List<Cell> cells = new ArrayList<>();
  int count = 3;
  for (int i = 0; i < count; i++) {
    cells.add(CELL);
  }
  RpcServer rpcServer = createRpcServer(null, "testRpcServer",
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
          SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
      new FifoRpcScheduler(CONF, 1));

  try (AbstractRpcClient<?> client = createRpcClient(conf)) {
    rpcServer.start();
    BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
    HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells));
    String message = "hello";
    assertEquals(message,
      stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
    int index = 0;
    CellScanner cellScanner = pcrc.cellScanner();
    assertNotNull(cellScanner);
    while (cellScanner.advance()) {
      assertEquals(CELL, cellScanner.current());
      index++;
    }
    assertEquals(count, index);
  } finally {
    rpcServer.stop();
  }
}
 
源代码18 项目: hbase   文件: VisibilityController.java
@Override
public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
    throws IOException {
  // If authorization is not enabled, we don't care about reserved tags
  if (!authorizationEnabled) {
    return null;
  }
  for (CellScanner cellScanner = append.cellScanner(); cellScanner.advance();) {
    if (!checkForReservedVisibilityTagPresence(cellScanner.current())) {
      throw new FailedSanityCheckException("Append contains cell with reserved type tag");
    }
  }
  return null;
}
 
源代码19 项目: hbase   文件: VisibilityController.java
@Override
public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
    throws IOException {
  // If authorization is not enabled, we don't care about reserved tags
  if (!authorizationEnabled) {
    return null;
  }
  for (CellScanner cellScanner = increment.cellScanner(); cellScanner.advance();) {
    if (!checkForReservedVisibilityTagPresence(cellScanner.current())) {
      throw new FailedSanityCheckException("Increment contains cell with reserved type tag");
    }
  }
  return null;
}
 
源代码20 项目: hbase   文件: ReplicationProtbufUtil.java
/**
 * @param cells
 * @return <code>cells</code> packaged as a CellScanner
 */
static CellScanner getCellScanner(final List<List<? extends Cell>> cells, final int size) {
  return new SizedCellScanner() {
    private final Iterator<List<? extends Cell>> entries = cells.iterator();
    private Iterator<? extends Cell> currentIterator = null;
    private Cell currentCell;

    @Override
    public Cell current() {
      return this.currentCell;
    }

    @Override
    public boolean advance() {
      if (this.currentIterator == null) {
        if (!this.entries.hasNext()) return false;
        this.currentIterator = this.entries.next().iterator();
      }
      if (this.currentIterator.hasNext()) {
        this.currentCell = this.currentIterator.next();
        return true;
      }
      this.currentCell = null;
      this.currentIterator = null;
      return advance();
    }

    @Override
    public long heapSize() {
      return size;
    }
  };
}
 
源代码21 项目: hbase   文件: CellBlockBuilder.java
public ByteBuf buildCellBlock(Codec codec, CompressionCodec compressor, CellScanner cellScanner,
    ByteBufAllocator alloc) throws IOException {
  ByteBufOutputStreamSupplier supplier = new ByteBufOutputStreamSupplier(alloc);
  if (buildCellBlock(codec, compressor, cellScanner, supplier)) {
    return supplier.buf;
  } else {
    return null;
  }
}
 
源代码22 项目: hbase   文件: AsyncRegionServerAdmin.java
public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
    ReplicateWALEntryRequest request, CellScanner cellScanner, int timeout) {
  return call((stub, controller, done) -> {
    controller.setCallTimeout(timeout);
    stub.replicateWALEntry(controller, request, done);
  }, cellScanner);
}
 
源代码23 项目: hbase   文件: RSRpcServices.java
private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
  if (r.maxCellSize > 0) {
    CellScanner cells = m.cellScanner();
    while (cells.advance()) {
      int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
      if (size > r.maxCellSize) {
        String msg = "Cell with size " + size + " exceeds limit of " + r.maxCellSize + " bytes";
        LOG.debug(msg);
        throw new DoNotRetryIOException(msg);
      }
    }
  }
}
 
源代码24 项目: hbase   文件: CellBlockBuilder.java
/**
 * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
 * <code>compressor</code>.
 * @param codec
 * @param compressor
 * @param cellScanner
 * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
 *         passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
 *         been flipped and is ready for reading. Use limit to find total size.
 * @throws IOException
 */
public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
    final CellScanner cellScanner) throws IOException {
  ByteBufferOutputStreamSupplier supplier = new ByteBufferOutputStreamSupplier();
  if (buildCellBlock(codec, compressor, cellScanner, supplier)) {
    ByteBuffer bb = supplier.baos.getByteBuffer();
    // If no cells, don't mess around. Just return null (could be a bunch of existence checking
    // gets or something -- stuff that does not return a cell).
    return bb.hasRemaining() ? bb : null;
  } else {
    return null;
  }
}
 
源代码25 项目: hbase   文件: RSRpcServices.java
private void failRegionAction(MultiResponse.Builder responseBuilder,
    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
    CellScanner cellScanner, Throwable error) {
  rpcServer.getMetrics().exception(error);
  regionActionResultBuilder.setException(ResponseConverter.buildException(error));
  responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
  // All Mutations in this RegionAction not executed as we can not see the Region online here
  // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
  // corresponding to these Mutations.
  if (cellScanner != null) {
    skipCellsForMutations(regionAction.getActionList(), cellScanner);
  }
}
 
源代码26 项目: hbase   文件: RSRpcServices.java
private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
  if (cellScanner == null) {
    return;
  }
  for (Action action : actions) {
    skipCellsForMutation(action, cellScanner);
  }
}
 
源代码27 项目: hbase   文件: ProtobufUtil.java
/**
 * Convert a protocol buffer Mutate to an Increment
 *
 * @param proto the protocol buffer Mutate to convert
 * @return the converted client Increment
 * @throws IOException
 */
public static Increment toIncrement(final MutationProto proto, final CellScanner cellScanner)
        throws IOException {
  MutationType type = proto.getMutateType();
  assert type == MutationType.INCREMENT : type.name();
  Increment increment = toDelta((Bytes row) -> new Increment(row.get(), row.getOffset(), row.getLength()),
          Increment::add, proto, cellScanner);
  if (proto.hasTimeRange()) {
    TimeRange timeRange = toTimeRange(proto.getTimeRange());
    increment.setTimeRange(timeRange.getMin(), timeRange.getMax());
  }
  return increment;
}
 
源代码28 项目: hbase   文件: CellBlockBuilder.java
/**
 * @param codec to use for cellblock
 * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
 *          position()'ed at the start of the cell block and limit()'ed at the end.
 * @return CellScanner to work against the content of <code>cellBlock</code>. All cells created
 *         out of the CellScanner will share the same ByteBuffer being passed.
 * @throws IOException if cell encoding fails
 */
public CellScanner createCellScannerReusingBuffers(final Codec codec,
    final CompressionCodec compressor, ByteBuff cellBlock) throws IOException {
  // Use this method from HRS to create the CellScanner
  // If compressed, decompress it first before passing it on else we will leak compression
  // resources if the stream is not closed properly after we let it out.
  if (compressor != null) {
    cellBlock = decompress(compressor, cellBlock);
  }
  return codec.getDecoder(cellBlock);
}
 
源代码29 项目: hbase   文件: SimpleServerCall.java
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
    justification = "Can't figure why this complaint is happening... see below")
SimpleServerCall(int id, final BlockingService service, final MethodDescriptor md,
    RequestHeader header, Message param, CellScanner cellScanner,
    SimpleServerRpcConnection connection, long size, final InetAddress remoteAddress,
    long receiveTime, int timeout, ByteBuffAllocator bbAllocator,
    CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup,
    SimpleRpcServerResponder responder) {
  super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, receiveTime,
      timeout, bbAllocator, cellBlockBuilder, reqCleanup);
  this.responder = responder;
}
 
源代码30 项目: hbase   文件: SimpleServerRpcConnection.java
@Override
public SimpleServerCall createCall(int id, BlockingService service, MethodDescriptor md,
    RequestHeader header, Message param, CellScanner cellScanner, long size,
    InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
  return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size,
      remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator,
      this.rpcServer.cellBlockBuilder, reqCleanup, this.responder);
}
 
 类所在包
 同包方法