

public static void handleHBaseException(
    RetriesExhaustedWithDetailsException rex,
    Record record,
    Map<String, Record> rowKeyToRecord,
    ErrorRecordHandler errorRecordHandler
) throws StageException {
  for (int i = 0; i < rex.getNumExceptions(); i++) {
    if (rex.getCause(i) instanceof NoSuchColumnFamilyException) {
      Row r = rex.getRow(i);
      Record errorRecord = record != null ? record : rowKeyToRecord.get(Bytes.toString(r.getRow()));
      OnRecordErrorException exception = new OnRecordErrorException(errorRecord,
          getErrorDescription(rex.getCause(i), r, rex.getHostnamePort(i))
    } else {
      // If at least 1 non NoSuchColumnFamilyException exception,
      // consider as stage exception
      throw new StageException(Errors.HBASE_02, rex);
private void recordFailure(final Mutation m, final long keyBase,
    final long start, IOException e) {
  String exceptionInfo;
  if (e instanceof RetriesExhaustedWithDetailsException) {
    RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
    exceptionInfo = aggEx.getExhaustiveDescription();
  } else {
    StringWriter stackWriter = new StringWriter();
    PrintWriter pw = new PrintWriter(stackWriter);
    exceptionInfo = StringUtils.stringifyException(e);
  LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start)
      + "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
      + exceptionInfo);
private void recordFailure(final Table table, final Put put, final long keyBase,
    final long start, IOException e) {
  String exceptionInfo;
  if (e instanceof RetriesExhaustedWithDetailsException) {
    RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
    exceptionInfo = aggEx.getExhaustiveDescription();
  } else {
    StringWriter stackWriter = new StringWriter();
    PrintWriter pw = new PrintWriter(stackWriter);
    exceptionInfo = StringUtils.stringifyException(e);
  LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start)
      + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: "
      + exceptionInfo);
public void flushTuples()
  try {
  } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
    throw Throwables.propagate(e);
public void flushTables() throws InterruptedIOException, RetriesExhaustedWithDetailsException
  if (table != null) {
  for (Map.Entry<String, HTable> entry : tableCache.asMap().entrySet()) {
public void commitTransaction()
  try {
  } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
    throw Throwables.propagate(e);
private void performPut(Table table, Record record, Put p) throws StageException, IOException {
  try {
    // HTable internally keeps a buffer, a put() will keep on buffering till the buffer
    // limit is reached
    // Once it hits the buffer limit or autoflush is set to true, commit will happen
  } catch (RetriesExhaustedWithDetailsException rex) {
    // There may be more than one row which failed to persist
    AbstractHBaseConnectionHelper.handleHBaseException(rex, record, null, errorRecordHandler);
private void performPut(HTable hTable, Record record, Put p) throws StageException, IOException {
  try {
    // HTable internally keeps a buffer, a put() will keep on buffering till the buffer
    // limit is reached
    // Once it hits the buffer limit or autoflush is set to true, commit will happen
  } catch (RetriesExhaustedWithDetailsException rex) {
    // There may be more than one row which failed to persist
    AbstractHBaseConnectionHelper.handleHBaseException(rex, record, null, errorRecordHandler);
public void mutate(Table table, Mutation m,
    long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
  long start = System.currentTimeMillis();
  try {
    m = dataGenerator.beforeMutate(keyBase, m);
    if (m instanceof Increment) {
    } else if (m instanceof Append) {
    } else if (m instanceof Put) {
      table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put)m);
    } else if (m instanceof Delete) {
      table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete)m);
    } else {
      throw new IllegalArgumentException(
        "unsupported mutation " + m.getClass().getSimpleName());
    totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
  } catch (IOException e) {
    if (ignoreNonceConflicts) {
      LOG.info("Detected nonce conflict, ignoring: " + e.getMessage());
      totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
    String exceptionInfo;
    if (e instanceof RetriesExhaustedWithDetailsException) {
      RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
      exceptionInfo = aggEx.getExhaustiveDescription();
    } else {
      exceptionInfo = StringUtils.stringifyException(e);
    LOG.error("Failed to mutate: " + keyBase + " after " +
        (System.currentTimeMillis() - start) +
      "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
        + exceptionInfo);
public void mutate(Table table, Mutation m,
    long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
  long start = System.currentTimeMillis();
  try {
    m = dataGenerator.beforeMutate(keyBase, m);
    if (m instanceof Increment) {
    } else if (m instanceof Append) {
    } else if (m instanceof Put) {
      table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put)m);
    } else if (m instanceof Delete) {
      table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete)m);
    } else {
      throw new IllegalArgumentException(
        "unsupported mutation " + m.getClass().getSimpleName());
    totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
  } catch (IOException e) {
    String exceptionInfo;
    if (e instanceof RetriesExhaustedWithDetailsException) {
      RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
      exceptionInfo = aggEx.getExhaustiveDescription();
    } else {
      StringWriter stackWriter = new StringWriter();
      PrintWriter pw = new PrintWriter(stackWriter);
      exceptionInfo = StringUtils.stringifyException(e);
    LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) +
      "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
        + exceptionInfo);
private static void logExceptions(RetriesExhaustedWithDetailsException e) {
  Set<String> codes = new TreeSet<>();
  Set<String> messages = new TreeSet<>();
  for (Throwable e1 : e.getCauses()) {
    if (e1 instanceof StatusException) {
      StatusException statusException = (StatusException) e1;
源代码12 项目: pinpoint   文件: HbaseTemplate2IT.java
public void notExist() throws Exception {
    try {
        hbaseTemplate2.put(TableName.valueOf("NOT_EXIST"), new byte[] {0, 0, 0}, "familyName".getBytes(), "columnName".getBytes(), new byte[]{0, 0, 0});
    } catch (HbaseSystemException e) {
        RetriesExhaustedWithDetailsException exception = (RetriesExhaustedWithDetailsException)(e.getCause());
        if (!(exception.getCause(0) instanceof TableNotFoundException)) {
            Assert.fail("unexpected exception :" + e.getCause()); 

public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) throws RetriesExhaustedWithDetailsException {
	// fail the sink and skip the rest of the items
	// if the failure handler decides to throw an exception
	failureThrowable.compareAndSet(null, exception);
源代码14 项目: attic-apex-malhar   文件: HBaseStore.java
protected void flushTable(HTable table) throws InterruptedIOException, RetriesExhaustedWithDetailsException
 * If this test times out, then we didn't fail quickly enough. {@link Indexer} maybe isn't
 * rethrowing the exception correctly?
 * <p>
 * We use a custom codec to enforce the thrown exception.
 * @throws Exception
@Test(timeout = 300000)
public void testQuickFailure() throws Exception {
  // incorrectly setup indexing for the primary table - target index table doesn't exist, which
  // should quickly return to the client
  byte[] family = Bytes.toBytes("family");
  ColumnGroup fam1 = new ColumnGroup(getIndexTableName());
  // values are [col1]
  fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS));
  CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
  // add the index family
  // usually, we would create the index table here, but we don't for the sake of the test.

  // setup the primary table
  String primaryTable = Bytes.toString(table.getTableName());
  HTableDescriptor pTable = new HTableDescriptor(primaryTable);
  pTable.addFamily(new HColumnDescriptor(family));
  // override the codec so we can use our test one
  builder.build(pTable, FailingTestCodec.class);

  // create the primary table
  HBaseAdmin admin = UTIL.getHBaseAdmin();
  Configuration conf = new Configuration(UTIL.getConfiguration());
  // up the number of retries/wait time to make it obvious that we are failing with retries here
  conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
  conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 1000);
  HTable primary = new HTable(conf, primaryTable);
  primary.setAutoFlush(false, true);

  // do a simple put that should be indexed
  Put p = new Put(Bytes.toBytes("row"));
  p.add(family, null, Bytes.toBytes("value"));
  try {
    fail("Shouldn't have gotten a successful write to the primary table");
  } catch (RetriesExhaustedWithDetailsException e) {
    LOG.info("Correclty got a failure of the put!");
源代码16 项目: flink   文件: HBaseSinkFunction.java
public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) throws RetriesExhaustedWithDetailsException {
	// fail the sink and skip the rest of the items
	// if the failure handler decides to throw an exception
	failureThrowable.compareAndSet(null, exception);
public void testQuotaEnforcementsFromRS() throws Exception {
  final long sizeLimit = 1024L * 8L; // 8KB
  final long tableSize = 1024L * 10L; // 10KB
  final int numRegions = 10;
  final TableName tn = helper.createTableWithRegions(numRegions);

  // Define the quota
  QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
      tn, sizeLimit, SpaceViolationPolicy.NO_INSERTS);

  // Write at least `tableSize` data
  try {
    helper.writeData(tn, tableSize);
  } catch (RetriesExhaustedWithDetailsException | SpaceLimitingException e) {
    // Pass

  final HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
  final RegionServerSpaceQuotaManager manager = rs.getRegionServerSpaceQuotaManager();
  Waiter.waitFor(TEST_UTIL.getConfiguration(), 30 * 1000, new Predicate<Exception>() {
    public boolean evaluate() throws Exception {
      ActivePolicyEnforcement enforcements = manager.getActiveEnforcements();
      SpaceViolationPolicyEnforcement enforcement = enforcements.getPolicyEnforcement(tn);
      // Signifies that we're waiting on the quota snapshot to be fetched
      if (enforcement instanceof MissingSnapshotViolationPolicyEnforcement) {
        return false;
      return enforcement.getQuotaSnapshot().getQuotaStatus().isInViolation();

  // We obtain the violations for a RegionServer by observing the snapshots
  Map<TableName, SpaceQuotaSnapshot> snapshots = (Map<TableName, SpaceQuotaSnapshot>) TEST_UTIL
  SpaceQuotaSnapshot snapshot = snapshots.get(tn);
  assertNotNull("Did not find snapshot for " + tn, snapshot);
  assertEquals(SpaceViolationPolicy.NO_INSERTS, snapshot.getQuotaStatus().getPolicy().get());
protected static void runMutationTests(Connection conn, TableName tableName, long rowCount,
    int valueSize) throws IOException {
  System.out.println("starting mutations");
  Stopwatch uberStopwatch = Stopwatch.createUnstarted();
  Stopwatch incrementalStopwatch = Stopwatch.createUnstarted();
  try (BufferedMutator mutator = conn.getBufferedMutator(tableName)) {
    // Use the same value over and over again. Creating new random data takes time. Don't count
    // creating a large array towards Bigtable performance
    byte[] value = Bytes.toBytes(RandomStringUtils.randomAlphanumeric(valueSize));
    for (long i = 1; i < 10; i++) {
      // The first few writes are slow.
      doPut(mutator, value);
    BigtableUtilities.printPerformance("starter batch", incrementalStopwatch, 10);

    for (int i = 0; i < rowCount - 10; i++) {
      doPut(mutator, value);
      if (i > 0 && i % PRINT_COUNT == 0) {
        BigtableUtilities.printPerformance("one batch", incrementalStopwatch, PRINT_COUNT);
        BigtableUtilities.printPerformance("average so far", uberStopwatch, i);
    System.out.println(String.format("Flush took %d ms.",
    BigtableUtilities.printPerformance("full batch", uberStopwatch, Math.toIntExact(rowCount));
  } catch (RetriesExhaustedWithDetailsException e) {
 * If this test times out, then we didn't fail quickly enough. {@link Indexer} maybe isn't
 * rethrowing the exception correctly?
 * <p>
 * We use a custom codec to enforce the thrown exception.
 * @throws Exception
@Test(timeout = 300000)
public void testQuickFailure() throws Exception {
  // incorrectly setup indexing for the primary table - target index table doesn't exist, which
  // should quickly return to the client
  byte[] family = Bytes.toBytes("family");
  ColumnGroup fam1 = new ColumnGroup(getIndexTableName());
  // values are [col1]
  fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS));
  CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
  // add the index family
  // usually, we would create the index table here, but we don't for the sake of the test.

  // setup the primary table
  String primaryTable = Bytes.toString(table.getTableName());
  HTableDescriptor pTable = new HTableDescriptor(primaryTable);
  pTable.addFamily(new HColumnDescriptor(family));
  // override the codec so we can use our test one
  builder.build(pTable, FailingTestCodec.class);

  // create the primary table
  HBaseAdmin admin = UTIL.getHBaseAdmin();
  Configuration conf = new Configuration(UTIL.getConfiguration());
  // up the number of retries/wait time to make it obvious that we are failing with retries here
  conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
  conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 1000);
  HTable primary = new HTable(conf, primaryTable);
  primary.setAutoFlush(false, true);

  // do a simple put that should be indexed
  Put p = new Put(Bytes.toBytes("row"));
  p.add(family, null, Bytes.toBytes("value"));
  try {
    fail("Shouldn't have gotten a successful write to the primary table");
  } catch (RetriesExhaustedWithDetailsException e) {
    LOG.info("Correclty got a failure of the put!");