下面列出了怎么用org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException的API类实例代码及写法,或者点击链接到github查看源代码。
public static void handleHBaseException(
RetriesExhaustedWithDetailsException rex,
Record record,
Map<String, Record> rowKeyToRecord,
ErrorRecordHandler errorRecordHandler
) throws StageException {
for (int i = 0; i < rex.getNumExceptions(); i++) {
if (rex.getCause(i) instanceof NoSuchColumnFamilyException) {
Row r = rex.getRow(i);
Record errorRecord = record != null ? record : rowKeyToRecord.get(Bytes.toString(r.getRow()));
OnRecordErrorException exception = new OnRecordErrorException(errorRecord,
Errors.HBASE_10,
getErrorDescription(rex.getCause(i), r, rex.getHostnamePort(i))
);
errorRecordHandler.onError(exception);
} else {
// If at least 1 non NoSuchColumnFamilyException exception,
// consider as stage exception
throw new StageException(Errors.HBASE_02, rex);
}
}
}
private void recordFailure(final Mutation m, final long keyBase,
final long start, IOException e) {
failedKeySet.add(keyBase);
String exceptionInfo;
if (e instanceof RetriesExhaustedWithDetailsException) {
RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
exceptionInfo = aggEx.getExhaustiveDescription();
} else {
StringWriter stackWriter = new StringWriter();
PrintWriter pw = new PrintWriter(stackWriter);
e.printStackTrace(pw);
pw.flush();
exceptionInfo = StringUtils.stringifyException(e);
}
LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start)
+ "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
+ exceptionInfo);
}
private void recordFailure(final Table table, final Put put, final long keyBase,
final long start, IOException e) {
failedKeySet.add(keyBase);
String exceptionInfo;
if (e instanceof RetriesExhaustedWithDetailsException) {
RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
exceptionInfo = aggEx.getExhaustiveDescription();
} else {
StringWriter stackWriter = new StringWriter();
PrintWriter pw = new PrintWriter(stackWriter);
e.printStackTrace(pw);
pw.flush();
exceptionInfo = StringUtils.stringifyException(e);
}
LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start)
+ "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: "
+ exceptionInfo);
}
public void flushTuples()
{
try {
store.flushTables();
} catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
throw Throwables.propagate(e);
}
}
public void flushTables() throws InterruptedIOException, RetriesExhaustedWithDetailsException
{
if (table != null) {
flushTable(table);
}
for (Map.Entry<String, HTable> entry : tableCache.asMap().entrySet()) {
flushTable(entry.getValue());
}
}
@Override
public void commitTransaction()
{
try {
flushTables();
} catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
throw Throwables.propagate(e);
}
}
private void performPut(Table table, Record record, Put p) throws StageException, IOException {
try {
// HTable internally keeps a buffer, a put() will keep on buffering till the buffer
// limit is reached
// Once it hits the buffer limit or autoflush is set to true, commit will happen
table.put(p);
} catch (RetriesExhaustedWithDetailsException rex) {
// There may be more than one row which failed to persist
AbstractHBaseConnectionHelper.handleHBaseException(rex, record, null, errorRecordHandler);
}
}
private void performPut(HTable hTable, Record record, Put p) throws StageException, IOException {
try {
// HTable internally keeps a buffer, a put() will keep on buffering till the buffer
// limit is reached
// Once it hits the buffer limit or autoflush is set to true, commit will happen
hTable.put(p);
} catch (RetriesExhaustedWithDetailsException rex) {
// There may be more than one row which failed to persist
AbstractHBaseConnectionHelper.handleHBaseException(rex, record, null, errorRecordHandler);
}
}
public void mutate(Table table, Mutation m,
long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
long start = System.currentTimeMillis();
try {
m = dataGenerator.beforeMutate(keyBase, m);
if (m instanceof Increment) {
table.increment((Increment)m);
} else if (m instanceof Append) {
table.append((Append)m);
} else if (m instanceof Put) {
table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put)m);
} else if (m instanceof Delete) {
table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete)m);
} else {
throw new IllegalArgumentException(
"unsupported mutation " + m.getClass().getSimpleName());
}
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {
if (ignoreNonceConflicts) {
LOG.info("Detected nonce conflict, ignoring: " + e.getMessage());
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
return;
}
failedKeySet.add(keyBase);
String exceptionInfo;
if (e instanceof RetriesExhaustedWithDetailsException) {
RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
exceptionInfo = aggEx.getExhaustiveDescription();
} else {
exceptionInfo = StringUtils.stringifyException(e);
}
LOG.error("Failed to mutate: " + keyBase + " after " +
(System.currentTimeMillis() - start) +
"ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
+ exceptionInfo);
}
}
public void mutate(Table table, Mutation m,
long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
long start = System.currentTimeMillis();
try {
m = dataGenerator.beforeMutate(keyBase, m);
if (m instanceof Increment) {
table.increment((Increment)m);
} else if (m instanceof Append) {
table.append((Append)m);
} else if (m instanceof Put) {
table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put)m);
} else if (m instanceof Delete) {
table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete)m);
} else {
throw new IllegalArgumentException(
"unsupported mutation " + m.getClass().getSimpleName());
}
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {
failedKeySet.add(keyBase);
String exceptionInfo;
if (e instanceof RetriesExhaustedWithDetailsException) {
RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
exceptionInfo = aggEx.getExhaustiveDescription();
} else {
StringWriter stackWriter = new StringWriter();
PrintWriter pw = new PrintWriter(stackWriter);
e.printStackTrace(pw);
pw.flush();
exceptionInfo = StringUtils.stringifyException(e);
}
LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) +
"ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
+ exceptionInfo);
}
}
private static void logExceptions(RetriesExhaustedWithDetailsException e) {
System.out.println(e.getExhaustiveDescription());
Set<String> codes = new TreeSet<>();
Set<String> messages = new TreeSet<>();
for (Throwable e1 : e.getCauses()) {
if (e1 instanceof StatusException) {
StatusException statusException = (StatusException) e1;
codes.add(statusException.getStatus().getCode().name());
messages.add(statusException.getMessage());
}
}
}
@Test
@Ignore
public void notExist() throws Exception {
try {
hbaseTemplate2.put(TableName.valueOf("NOT_EXIST"), new byte[] {0, 0, 0}, "familyName".getBytes(), "columnName".getBytes(), new byte[]{0, 0, 0});
Assert.fail("exceptions");
} catch (HbaseSystemException e) {
RetriesExhaustedWithDetailsException exception = (RetriesExhaustedWithDetailsException)(e.getCause());
if (!(exception.getCause(0) instanceof TableNotFoundException)) {
Assert.fail("unexpected exception :" + e.getCause());
}
}
}
@Override
public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) throws RetriesExhaustedWithDetailsException {
// fail the sink and skip the rest of the items
// if the failure handler decides to throw an exception
failureThrowable.compareAndSet(null, exception);
}
protected void flushTable(HTable table) throws InterruptedIOException, RetriesExhaustedWithDetailsException
{
table.flushCommits();
}
/**
* If this test times out, then we didn't fail quickly enough. {@link Indexer} maybe isn't
* rethrowing the exception correctly?
* <p>
* We use a custom codec to enforce the thrown exception.
* @throws Exception
*/
@Test(timeout = 300000)
public void testQuickFailure() throws Exception {
// incorrectly setup indexing for the primary table - target index table doesn't exist, which
// should quickly return to the client
byte[] family = Bytes.toBytes("family");
ColumnGroup fam1 = new ColumnGroup(getIndexTableName());
// values are [col1]
fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS));
CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
// add the index family
builder.addIndexGroup(fam1);
// usually, we would create the index table here, but we don't for the sake of the test.
// setup the primary table
String primaryTable = Bytes.toString(table.getTableName());
@SuppressWarnings("deprecation")
HTableDescriptor pTable = new HTableDescriptor(primaryTable);
pTable.addFamily(new HColumnDescriptor(family));
// override the codec so we can use our test one
builder.build(pTable, FailingTestCodec.class);
// create the primary table
HBaseAdmin admin = UTIL.getHBaseAdmin();
admin.createTable(pTable);
Configuration conf = new Configuration(UTIL.getConfiguration());
// up the number of retries/wait time to make it obvious that we are failing with retries here
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 1000);
HTable primary = new HTable(conf, primaryTable);
primary.setAutoFlush(false, true);
// do a simple put that should be indexed
Put p = new Put(Bytes.toBytes("row"));
p.add(family, null, Bytes.toBytes("value"));
primary.put(p);
try {
primary.flushCommits();
fail("Shouldn't have gotten a successful write to the primary table");
} catch (RetriesExhaustedWithDetailsException e) {
LOG.info("Correclty got a failure of the put!");
}
primary.close();
}
@Override
public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) throws RetriesExhaustedWithDetailsException {
// fail the sink and skip the rest of the items
// if the failure handler decides to throw an exception
failureThrowable.compareAndSet(null, exception);
}
@Test
public void testQuotaEnforcementsFromRS() throws Exception {
final long sizeLimit = 1024L * 8L; // 8KB
final long tableSize = 1024L * 10L; // 10KB
final int numRegions = 10;
final TableName tn = helper.createTableWithRegions(numRegions);
// Define the quota
QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
tn, sizeLimit, SpaceViolationPolicy.NO_INSERTS);
TEST_UTIL.getAdmin().setQuota(settings);
// Write at least `tableSize` data
try {
helper.writeData(tn, tableSize);
} catch (RetriesExhaustedWithDetailsException | SpaceLimitingException e) {
// Pass
}
final HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
final RegionServerSpaceQuotaManager manager = rs.getRegionServerSpaceQuotaManager();
Waiter.waitFor(TEST_UTIL.getConfiguration(), 30 * 1000, new Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
ActivePolicyEnforcement enforcements = manager.getActiveEnforcements();
SpaceViolationPolicyEnforcement enforcement = enforcements.getPolicyEnforcement(tn);
// Signifies that we're waiting on the quota snapshot to be fetched
if (enforcement instanceof MissingSnapshotViolationPolicyEnforcement) {
return false;
}
return enforcement.getQuotaSnapshot().getQuotaStatus().isInViolation();
}
});
// We obtain the violations for a RegionServer by observing the snapshots
@SuppressWarnings("unchecked")
Map<TableName, SpaceQuotaSnapshot> snapshots = (Map<TableName, SpaceQuotaSnapshot>) TEST_UTIL
.getAdmin().getRegionServerSpaceQuotaSnapshots(rs.getServerName());
SpaceQuotaSnapshot snapshot = snapshots.get(tn);
assertNotNull("Did not find snapshot for " + tn, snapshot);
assertTrue(snapshot.getQuotaStatus().isInViolation());
assertEquals(SpaceViolationPolicy.NO_INSERTS, snapshot.getQuotaStatus().getPolicy().get());
}
protected static void runMutationTests(Connection conn, TableName tableName, long rowCount,
int valueSize) throws IOException {
System.out.println("starting mutations");
Stopwatch uberStopwatch = Stopwatch.createUnstarted();
Stopwatch incrementalStopwatch = Stopwatch.createUnstarted();
try (BufferedMutator mutator = conn.getBufferedMutator(tableName)) {
// Use the same value over and over again. Creating new random data takes time. Don't count
// creating a large array towards Bigtable performance
byte[] value = Bytes.toBytes(RandomStringUtils.randomAlphanumeric(valueSize));
incrementalStopwatch.start();
for (long i = 1; i < 10; i++) {
// The first few writes are slow.
doPut(mutator, value);
}
mutator.flush();
BigtableUtilities.printPerformance("starter batch", incrementalStopwatch, 10);
uberStopwatch.reset();
incrementalStopwatch.reset();
uberStopwatch.start();
incrementalStopwatch.start();
for (int i = 0; i < rowCount - 10; i++) {
doPut(mutator, value);
if (i > 0 && i % PRINT_COUNT == 0) {
BigtableUtilities.printPerformance("one batch", incrementalStopwatch, PRINT_COUNT);
BigtableUtilities.printPerformance("average so far", uberStopwatch, i);
incrementalStopwatch.reset();
incrementalStopwatch.start();
}
}
incrementalStopwatch.reset();
incrementalStopwatch.start();
System.out.println("Flushing");
mutator.flush();
System.out.println(String.format("Flush took %d ms.",
incrementalStopwatch.elapsed(TimeUnit.MILLISECONDS)));
BigtableUtilities.printPerformance("full batch", uberStopwatch, Math.toIntExact(rowCount));
} catch (RetriesExhaustedWithDetailsException e) {
logExceptions(e);
}
}
/**
* If this test times out, then we didn't fail quickly enough. {@link Indexer} maybe isn't
* rethrowing the exception correctly?
* <p>
* We use a custom codec to enforce the thrown exception.
* @throws Exception
*/
@Test(timeout = 300000)
public void testQuickFailure() throws Exception {
// incorrectly setup indexing for the primary table - target index table doesn't exist, which
// should quickly return to the client
byte[] family = Bytes.toBytes("family");
ColumnGroup fam1 = new ColumnGroup(getIndexTableName());
// values are [col1]
fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS));
CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
// add the index family
builder.addIndexGroup(fam1);
// usually, we would create the index table here, but we don't for the sake of the test.
// setup the primary table
String primaryTable = Bytes.toString(table.getTableName());
HTableDescriptor pTable = new HTableDescriptor(primaryTable);
pTable.addFamily(new HColumnDescriptor(family));
// override the codec so we can use our test one
builder.build(pTable, FailingTestCodec.class);
// create the primary table
HBaseAdmin admin = UTIL.getHBaseAdmin();
admin.createTable(pTable);
Configuration conf = new Configuration(UTIL.getConfiguration());
// up the number of retries/wait time to make it obvious that we are failing with retries here
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 1000);
HTable primary = new HTable(conf, primaryTable);
primary.setAutoFlush(false, true);
// do a simple put that should be indexed
Put p = new Put(Bytes.toBytes("row"));
p.add(family, null, Bytes.toBytes("value"));
primary.put(p);
try {
primary.flushCommits();
fail("Shouldn't have gotten a successful write to the primary table");
} catch (RetriesExhaustedWithDetailsException e) {
LOG.info("Correclty got a failure of the put!");
}
primary.close();
}