下面列出了 org.apache.hadoop.hbase.codec.Codec 类的 API 用法示例代码，也可以点击链接到 GitHub 查看源代码。
/**
 * Runs <code>cycles</code> encode passes followed by <code>cycles</code> decode passes with the
 * given codec, then checks the cells from the last decode pass against the input cells.
 * @param codec codec supplying the encoder and decoder
 * @param cells cells handed to every encode pass
 * @param cycles number of encode passes and number of decode passes
 * @param count cell count forwarded to the decoder test
 * @param initialBufferSize initial capacity of the per-pass output buffer
 * @throws IOException if an encode or decode pass fails
 */
static void doCodec(final Codec codec, final Cell [] cells, final int cycles, final int count,
    final int initialBufferSize)
    throws IOException {
  // Encode phase: only the output of the last pass is kept for decoding.
  byte[] encoded = null;
  int pass = 0;
  while (pass < cycles) {
    ByteArrayOutputStream sink = new ByteArrayOutputStream(initialBufferSize);
    Codec.Encoder enc = codec.getEncoder(sink);
    encoded = runEncoderTest(pass, initialBufferSize, sink, enc, cells);
    pass++;
  }

  // Decode phase: every pass re-reads the same encoded bytes from the start.
  Cell[] decoded = null;
  pass = 0;
  while (pass < cycles) {
    ByteArrayInputStream source = new ByteArrayInputStream(encoded);
    Codec.Decoder dec = codec.getDecoder(source);
    decoded = CodecPerformance.runDecoderTest(pass, count, dec);
    pass++;
  }

  verifyCells(cells, decoded);
}
/**
 * Encodes the cells of <code>cellScanner</code> into the stream obtained from
 * <code>supplier</code>.
 * @param codec codec used for encoding; must not be null when a scanner is given
 * @param compressor optional compression codec, passed through to the encoding step
 * @param cellScanner source of cells; when null nothing is encoded
 * @param supplier provides the output stream and reports its size afterwards
 * @return false when <code>cellScanner</code> is null, true once the cells have been encoded
 * @throws IOException if encoding fails, or {@link CellScannerButNoCodecException} when a
 *           scanner is given without a codec
 */
private boolean buildCellBlock(final Codec codec, final CompressionCodec compressor,
    final CellScanner cellScanner, OutputStreamSupplier supplier) throws IOException {
  if (cellScanner == null) {
    return false;
  }
  if (codec == null) {
    throw new CellScannerButNoCodecException();
  }
  final int initialSize = cellBlockBuildingInitialBufferSize;
  encodeCellsTo(supplier.get(initialSize), cellScanner, codec, compressor);
  if (LOG.isTraceEnabled()) {
    // Only worth mentioning when the stream outgrew the configured initial size.
    if (initialSize < supplier.size()) {
      LOG.trace("Buffer grew from initial bufferSize=" + initialSize + " to " + supplier.size()
        + "; up hbase.ipc.cellblock.building.initial.buffersize?");
    }
  }
  return true;
}
/**
 * Writes every cell of <code>cellScanner</code> to <code>os</code> using <code>codec</code>,
 * wrapping the stream with <code>compressor</code> first when one is given. The stream (or its
 * compressing wrapper) is always closed before this method returns.
 * @param os destination stream; closed by this method
 * @param cellScanner source of the cells to encode
 * @param codec codec providing the cell encoder
 * @param compressor optional compression codec; null means no compression
 * @throws IOException if writing fails; buffer overflow and index errors are rethrown wrapped
 *           in a {@link DoNotRetryIOException}
 */
private void encodeCellsTo(OutputStream os, CellScanner cellScanner, Codec codec,
    CompressionCodec compressor) throws IOException {
  Compressor poolCompressor = null;
  try {
    if (compressor != null) {
      if (compressor instanceof Configurable) {
        ((Configurable) compressor).setConf(this.conf);
      }
      poolCompressor = CodecPool.getCompressor(compressor);
      os = compressor.createOutputStream(os, poolCompressor);
    }
    Codec.Encoder encoder = codec.getEncoder(os);
    while (cellScanner.advance()) {
      encoder.write(cellScanner.current());
    }
    encoder.flush();
  } catch (BufferOverflowException | IndexOutOfBoundsException e) {
    throw new DoNotRetryIOException(e);
  } finally {
    try {
      os.close();
    } finally {
      // Return the pooled compressor even when close() throws; otherwise it would be lost to
      // the pool on every failed close.
      if (poolCompressor != null) {
        CodecPool.returnCompressor(poolCompressor);
      }
    }
  }
}
/**
 * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
 * <code>compressor</code>, writing into a {@link ByteBufferListOutputStream} backed by
 * <code>allocator</code>.
 * @param codec to use for encoding
 * @param compressor to use for encoding
 * @param cellScanner to encode
 * @param allocator to allocate the {@link ByteBuff}.
 * @return Null if <code>cellScanner</code> is null or nothing was written; otherwise the stream
 *         holding the cellblock filled with passed-in Cells encoded using passed in
 *         <code>codec</code> and/or <code>compressor</code>. NOTE(review): the caller presumably
 *         has to call <code>releaseResources()</code> on the returned stream when done --
 *         confirm against callers.
 * @throws IOException if encoding the cells fail
 */
public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
    CellScanner cellScanner, ByteBuffAllocator allocator) throws IOException {
  if (cellScanner == null) {
    return null;
  }
  if (codec == null) {
    throw new CellScannerButNoCodecException();
  }
  ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(allocator);
  try {
    encodeCellsTo(bbos, cellScanner, codec, compressor);
  } catch (IOException | RuntimeException e) {
    // The stream never reaches the caller on failure, so give its buffers back here instead of
    // leaking them.
    bbos.releaseResources();
    throw e;
  }
  if (bbos.size() == 0) {
    bbos.releaseResources();
    return null;
  }
  return bbos;
}
/**
 * Times 1000 runs of {@link #timerTest} twice, first with <code>sized</code> false and then
 * with it true, logging the elapsed time after each round.
 * @param builder cell block builder under test
 * @param count cell count forwarded to each run
 * @param size size value forwarded to each run
 * @param codec codec forwarded to each run
 * @param compressor compression codec forwarded to each run
 * @throws IOException if any run fails
 */
private static void timerTests(final CellBlockBuilder builder, final int count, final int size,
    final Codec codec, final CompressionCodec compressor) throws IOException {
  final int cycles = 1000;
  final StopWatch timer = new StopWatch();
  for (boolean sized : new boolean[] { false, true }) {
    if (sized) {
      // Second round: drop the time accumulated by the first round.
      timer.reset();
    }
    timer.start();
    int remaining = cycles;
    while (remaining > 0) {
      timerTest(builder, timer, count, size, codec, compressor, sized);
      remaining--;
    }
    timer.stop();
    LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + sized + ", count="
      + count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
  }
}
/**
 * Writes the number of cells as an int, followed by each cell encoded with the codec's encoder.
 * @param codec WAL cell codec supplying the encoder
 * @param kvs cells to write, in list order
 * @param out stream that receives the count and the encoded cells
 * @throws IOException if writing to <code>out</code> fails
 */
private void writeWALEdit(WALCellCodec codec, List<Cell> kvs, FSDataOutputStream out) throws IOException {
  // The count goes first, ahead of the encoded cells.
  final int cellCount = kvs.size();
  out.writeInt(cellCount);
  final Codec.Encoder encoder = codec.getEncoder(out);
  for (Cell cell : kvs) {
    encoder.write(cell);
  }
}
/**
 * Replaces the current cells with cells read from the decoder, stopping once
 * <code>expectedCount</code> cells are held or the decoder has nothing more to give.
 * @param cellDecoder Cell decoder.
 * @param expectedCount Expected cell count.
 * @return Number of cells held after reading.
 * @throws IOException if the decoder fails to advance
 */
public int readFromCells(Codec.Decoder cellDecoder, int expectedCount) throws IOException {
  cells.clear();
  cells.ensureCapacity(expectedCount);
  for (;;) {
    // Check the size first so the decoder is not advanced past the last wanted cell.
    if (cells.size() >= expectedCount || !cellDecoder.advance()) {
      break;
    }
    add(cellDecoder.current());
  }
  return cells.size();
}
/**
 * Stores <code>conf</code> via <code>setConf</code> and builds a {@link NettyRpcClient} whose
 * <code>getCodec()</code> always returns null.
 */
@Override
protected NettyRpcClient createRpcClientNoCodec(Configuration conf) {
  setConf(conf);
  final NettyRpcClient codecless = new NettyRpcClient(conf) {
    @Override
    Codec getCodec() {
      // No codec for this client.
      return null;
    }
  };
  return codecless;
}
/**
 * Builds a {@link BlockingRpcClient} whose <code>getCodec()</code> always returns null.
 */
@Override
protected BlockingRpcClient createRpcClientNoCodec(Configuration conf) {
  final BlockingRpcClient codecless = new BlockingRpcClient(conf) {
    @Override
    Codec getCodec() {
      // No codec for this client.
      return null;
    }
  };
  return codecless;
}
/**
 * Creates the RPC client to use.
 * @param conf configuration handed to the client
 * @param isSyncClient true for a plain {@link BlockingRpcClient}; false for a
 *          {@link NettyRpcClient} whose <code>getCodec()</code> returns null
 * @return the newly created client
 */
protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) {
  if (isSyncClient) {
    // NOTE(review): only the Netty client below overrides getCodec(); the blocking client keeps
    // its normal codec. Confirm the asymmetry is intended.
    return new BlockingRpcClient(conf);
  }
  return new NettyRpcClient(conf) {
    @Override
    Codec getCodec() {
      return null;
    }
  };
}
/**
 * Creates a handler that simply stores the given collaborators.
 * @param conn connection this handler belongs to
 * @param cellBlockBuilder builder kept for cell block handling
 * @param codec cell codec kept by the handler
 * @param compressor compression codec kept by the handler
 */
public NettyRpcDuplexHandler(NettyRpcConnection conn, CellBlockBuilder cellBlockBuilder,
    Codec codec, CompressionCodec compressor) {
  // Encoding-related collaborators.
  this.codec = codec;
  this.compressor = compressor;
  this.cellBlockBuilder = cellBlockBuilder;
  // Owning connection.
  this.conn = conn;
}
/**
 * Resolves the codec class named in the configuration and instantiates it reflectively, keeping
 * the casting and exception conversion in one place.
 * @return Codec to use on this client, or null when the configured class name is null or empty.
 */
Codec getCodec() {
  // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
  // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
  final String codecClassName =
    conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
  final boolean noCodecConfigured = codecClassName == null || codecClassName.isEmpty();
  if (noCodecConfigured) {
    return null;
  }
  try {
    Class<?> codecClass = Class.forName(codecClassName);
    Object instance = codecClass.getDeclaredConstructor().newInstance();
    return (Codec) instance;
  } catch (Exception e) {
    throw new RuntimeException("Failed getting codec " + codecClassName, e);
  }
}
/**
 * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
 * <code>compressor</code>, backed by a {@link ByteBuffer}.
 * @param codec codec used to encode the cells
 * @param compressor optional compression codec
 * @param cellScanner source of the cells to encode
 * @return Null when nothing was built or the resulting buffer has no remaining bytes; otherwise
 *         the byte buffer holding the encoded cellblock.
 * @throws IOException if encoding fails
 */
public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
    final CellScanner cellScanner) throws IOException {
  ByteBufferOutputStreamSupplier supplier = new ByteBufferOutputStreamSupplier();
  boolean built = buildCellBlock(codec, compressor, cellScanner, supplier);
  if (!built) {
    return null;
  }
  ByteBuffer encoded = supplier.baos.getByteBuffer();
  // If no cells, don't mess around. Just return null (could be a bunch of existence checking
  // gets or something -- stuff that does not return a cell).
  if (encoded.hasRemaining()) {
    return encoded;
  }
  return null;
}
/**
 * Builds a cell block into a buffer obtained through <code>alloc</code>.
 * @param codec codec used to encode the cells
 * @param compressor optional compression codec
 * @param cellScanner source of the cells to encode
 * @param alloc allocator handed to the output stream supplier
 * @return the supplier's buffer when a cell block was built, otherwise null
 * @throws IOException if encoding fails
 */
public ByteBuf buildCellBlock(Codec codec, CompressionCodec compressor, CellScanner cellScanner,
    ByteBufAllocator alloc) throws IOException {
  ByteBufOutputStreamSupplier supplier = new ByteBufOutputStreamSupplier(alloc);
  boolean built = buildCellBlock(codec, compressor, cellScanner, supplier);
  return built ? supplier.buf : null;
}
/**
 * Creates a CellScanner over a cell block held in a byte array. Use this method from Client
 * side to create the CellScanner.
 * @param codec to use for cellblock
 * @param compressor compression codec the block was written with, or null if uncompressed
 * @param cellBlock encoded (and possibly compressed) cells
 * @return CellScanner to work against the content of <code>cellBlock</code>
 * @throws IOException if decompressing or setting up the decoder fails
 */
public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
    final byte[] cellBlock) throws IOException {
  if (compressor == null) {
    // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
    // make Cells directly over the passed BB. This method is called at client side and we don't
    // want the Cells to share the same byte[] where the RPC response is being read. Caching of
    // any of the Cells at user's app level will make it not possible to GC the response byte[]
    return codec.getDecoder(new ByteArrayInputStream(cellBlock));
  }
  // Compressed block: inflate first, then decode from the decompressed buffer.
  ByteBuffer decompressed = decompress(compressor, cellBlock);
  return codec.getDecoder(new ByteBufferInputStream(decompressed));
}
/**
 * Creates a CellScanner directly over the given buffer. Use this method from HRS to create the
 * CellScanner.
 * @param codec to use for cellblock
 * @param compressor compression codec the block was written with, or null if uncompressed
 * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
 *          position()'ed at the start of the cell block and limit()'ed at the end.
 * @return CellScanner to work against the content of <code>cellBlock</code>. All cells created
 *         out of the CellScanner will share the same ByteBuffer being passed.
 * @throws IOException if decompressing or setting up the decoder fails
 */
public CellScanner createCellScannerReusingBuffers(final Codec codec,
    final CompressionCodec compressor, ByteBuff cellBlock) throws IOException {
  // If compressed, decompress it first before passing it on else we will leak compression
  // resources if the stream is not closed properly after we let it out.
  final ByteBuff decodable =
    compressor != null ? decompress(compressor, cellBlock) : cellBlock;
  return codec.getDecoder(decodable);
}
/**
 * Builds a cell block from freshly generated cells, scans it back and asserts that the number
 * of cells scanned equals <code>count</code>.
 * @param builder cell block builder under test
 * @param codec codec used in both directions
 * @param compressor compression codec used in both directions
 * @param count number of cells to generate and expect back
 * @param size size value passed to the cell generator
 * @param sized true to feed the builder from the sized cell scanner, false to use a plain
 *          scanner over the cell list
 * @throws IOException if building or scanning the cell block fails
 */
static void doBuildCellBlockUndoCellBlock(final CellBlockBuilder builder, final Codec codec,
    final CompressionCodec compressor, final int count, final int size, final boolean sized)
    throws IOException {
  Cell[] cells = getCells(count, size);
  CellScanner source;
  if (sized) {
    source = getSizedCellScanner(cells);
  } else {
    source = CellUtil.createCellScanner(Arrays.asList(cells).iterator());
  }
  ByteBuffer block = builder.buildCellBlock(codec, compressor, source);
  CellScanner roundTripped =
    builder.createCellScannerReusingBuffers(codec, compressor, new SingleByteBuff(block));
  // Count how many cells come back out of the block.
  int seen = 0;
  for (; roundTripped.advance(); seen++) {
    // counting only
  }
  assertEquals(count, seen);
}
/**
 * Writes the number of cells as an int, followed by each cell encoded with the codec's encoder.
 * @param codec WAL cell codec supplying the encoder
 * @param kvs cells to write, in list order
 * @param out stream that receives the count and the encoded cells
 * @throws IOException if writing to <code>out</code> fails
 */
private void writeWALEdit(WALCellCodec codec, List<Cell> kvs, FSDataOutputStream out) throws IOException {
  // The count goes first, ahead of the encoded cells.
  final int cellCount = kvs.size();
  out.writeInt(cellCount);
  final Codec.Encoder encoder = codec.getEncoder(out);
  for (Cell cell : kvs) {
    encoder.write(cell);
  }
}
/**
 * Sets up the connection state and picks the authentication provider and token to use.
 * @param conf configuration, also read for <code>hbase.security.relogin.maxbackoff</code>
 * @param timeoutTimer timer kept for this connection
 * @param remoteId identifies the remote address, service and user ticket
 * @param clusterId cluster id handed to the provider selection
 * @param isSecurityEnabled whether SASL is used
 * @param codec cell codec kept for this connection
 * @param compressor compression codec kept for this connection
 * @throws IOException an {@link UnknownHostException} when the remote address is unresolved
 */
protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId,
    String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor)
    throws IOException {
  if (remoteId.getAddress().isUnresolved()) {
    throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
  }
  this.serverAddress = remoteId.getAddress().getAddress();
  this.timeoutTimer = timeoutTimer;
  this.codec = codec;
  this.compressor = compressor;
  this.conf = conf;

  User ticket = remoteId.getTicket();
  this.securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
  this.useSasl = isSecurityEnabled;

  // Choose the correct Token and AuthenticationProvider for this client to use
  SaslClientAuthenticationProviders providers =
    SaslClientAuthenticationProviders.getInstance(conf);
  Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> selected;
  if (!useSasl) {
    // Hack, while SIMPLE doesn't go via SASL.
    selected = providers.getSimpleProvider();
  } else if (securityInfo == null) {
    // SASL requested but no security info is known for the service.
    throw new RuntimeException("Could not compute valid client authentication provider");
  } else {
    selected = providers.selectProvider(clusterId, ticket);
    if (selected == null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Found no valid authentication method from providers={} with tokens={}",
          providers.toString(), ticket.getTokens());
      }
      throw new RuntimeException("Found no valid authentication method from options");
    }
  }

  this.provider = selected.getFirst();
  this.token = selected.getSecond();
  LOG.debug("Using {} authentication for service={}, sasl={}",
    provider.getSaslAuthMethod().getName(), remoteId.serviceName, useSasl);
  reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
  this.remoteId = remoteId;
}
/**
 * Convenience overload: runs the build-and-undo round trip with 10 cells, a size of 1 and the
 * non-sized scanner.
 * @param builder cell block builder under test
 * @param codec codec used in both directions
 * @param compressor compression codec used in both directions
 * @throws IOException if the round trip fails
 */
static void doBuildCellBlockUndoCellBlock(final CellBlockBuilder builder, final Codec codec,
    final CompressionCodec compressor) throws IOException {
  final int defaultCount = 10;
  final int defaultSize = 1;
  final boolean sized = false;
  doBuildCellBlockUndoCellBlock(builder, codec, compressor, defaultCount, defaultSize, sized);
}
/**
 * Runs one build-and-undo round trip of a cell block.
 * @param builder cell block builder under test
 * @param timer not used by this method; the caller does the timing around it
 * @param count number of cells per round trip
 * @param size size value passed along to the round trip
 * @param codec codec used in both directions
 * @param compressor compression codec used in both directions
 * @param sized whether the sized cell scanner is used
 * @throws IOException if the round trip fails
 */
private static void timerTest(final CellBlockBuilder builder, final StopWatch timer,
    final int count, final int size, final Codec codec, final CompressionCodec compressor,
    final boolean sized) throws IOException {
  // Plain delegation.
  doBuildCellBlockUndoCellBlock(
    builder, codec, compressor,
    count, size, sized);
}