org.apache.hadoop.hbase.client.Connection#getTable() Code Examples

Listed below are example usages of org.apache.hadoop.hbase.client.Connection#getTable(), collected from open-source projects; the full sources can be viewed on GitHub.
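Before the project samples, here is a minimal, self-contained sketch of the typical call pattern; the table name demo_table and row key row1 are illustrative placeholders, not taken from any project below.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class GetTableDemo {
    public static void main(String[] args) throws IOException {
        // Reads hbase-site.xml from the classpath; the quorum must point at a live cluster.
        Configuration conf = HBaseConfiguration.create();
        // A Connection is heavyweight and thread-safe: create once, share, close at shutdown.
        try (Connection conn = ConnectionFactory.createConnection(conf);
             // A Table is lightweight and not thread-safe: obtain per task, close when done.
             Table table = conn.getTable(TableName.valueOf("demo_table"))) {
            Result r = table.get(new Get(Bytes.toBytes("row1")));
            System.out.println("row1 present: " + !r.isEmpty());
        }
    }
}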

Example 1  Project: super-cloudops  File: HfileBulkImporter.java
/**
 * e.g.<br/>
 * 
 * <pre>
 *  yarn jar super-devops-tool-hbase-migrator-master.jar \
 *  com.wl4g.devops.tool.hbase.migrator.HfileBulkImporter \
 *  -z emr-header-1:2181 \
 *  -t safeclound.tb_elec_power \
 *  -p /tmp-devops/safeclound.tb_elec_power
 * </pre>
 * 
 * @param args
 * @throws Exception
 */
public static void main(String[] args) throws Exception {
	HbaseMigrateUtils.showBanner();

	CommandLine line = new Builder().option("z", "zkaddr", null, "Zookeeper address.")
			.option("t", "tabname", null, "Hbase table name.")
			.option("p", "path", null, "Data hdfs path to be import. e.g. hdfs://localhost:9000/bak/safeclound.tb_air")
			.build(args);

	Configuration cfg = HBaseConfiguration.create();
	cfg.set("hbase.zookeeper.quorum", line.getOptionValue("z"));
	Connection conn = ConnectionFactory.createConnection(cfg);
	Admin admin = conn.getAdmin();
	Table table = conn.getTable(TableName.valueOf(line.getOptionValue("t")));
	LoadIncrementalHFiles load = new LoadIncrementalHFiles(cfg);
	load.doBulkLoad(new Path(line.getOptionValue("p")), admin, table,
			conn.getRegionLocator(TableName.valueOf(line.getOptionValue("t"))));
}
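Note that this importer never closes conn, admin, or table. That is tolerable in a one-shot yarn job whose JVM exits right after the bulk load, but anything longer-lived should close them, e.g. with try-with-resources as in the sketch above.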
 
Example 2  Project: hbase  File: TestThriftConnection.java
private void testScanWithFilters(Connection connection, String tableName) throws IOException {
  createTable(thriftAdmin, tableName);
  try (Table table = connection.getTable(TableName.valueOf(tableName))){
    FilterList filterList = new FilterList();
    PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("testrow"));
    ColumnValueFilter columnValueFilter = new ColumnValueFilter(FAMILYA, QUALIFIER_1,
        CompareOperator.EQUAL, VALUE_1);
    filterList.addFilter(prefixFilter);
    filterList.addFilter(columnValueFilter);
    Scan scan = new Scan();
    scan.readVersions(2);
    scan.setFilter(filterList);
    ResultScanner scanner = table.getScanner(scan);
    Iterator<Result> iterator = scanner.iterator();
    assertTrue(iterator.hasNext());
    int counter = 0;
    while (iterator.hasNext()) {
      Result result = iterator.next();
      counter += result.size();
    }
    assertEquals(2, counter);
  }
}
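A FilterList built with the no-argument constructor defaults to Operator.MUST_PASS_ALL, so a cell is returned only if its row passes the PrefixFilter and its FAMILYA:QUALIFIER_1 value equals VALUE_1. With scan.readVersions(2) the scan may return up to two versions per matching cell, which is presumably why this test's fixture yields a total of exactly two cells.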
 
Example 3  Project: hbase  File: QuotaTableUtil.java
/**
 * Returns a list of {@code Delete} to remove all entries returned by the passed scanner.
 * @param connection connection to re-use
 * @param scan the scanner to use to generate the list of deletes
 */
static List<Delete> createDeletesForExistingSnapshotsFromScan(Connection connection, Scan scan)
    throws IOException {
  List<Delete> deletes = new ArrayList<>();
  try (Table quotaTable = connection.getTable(QUOTA_TABLE_NAME);
      ResultScanner rs = quotaTable.getScanner(scan)) {
    for (Result r : rs) {
      CellScanner cs = r.cellScanner();
      while (cs.advance()) {
        Cell c = cs.current();
        byte[] family = Bytes.copy(c.getFamilyArray(), c.getFamilyOffset(), c.getFamilyLength());
        byte[] qual =
            Bytes.copy(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength());
        Delete d = new Delete(r.getRow());
        d.addColumns(family, qual);
        deletes.add(d);
      }
    }
    return deletes;
  }
}
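The choice of Delete#addColumns (plural) matters here: it deletes all versions of the column, whereas Delete#addColumn (singular) would delete only the most recent version and could leave older quota entries behind.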
 
Example 4  Project: hgraphdb  File: HBaseIndexTest.java
@Test
public void testPopulateEdgeIndex() throws Exception {
    assertEquals(0, count(graph.vertices()));
    Vertex v0 = graph.addVertex(T.id, id(0));
    Vertex v1 = graph.addVertex(T.id, id(1));
    Vertex v2 = graph.addVertex(T.id, id(2));
    Vertex v3 = graph.addVertex(T.id, id(3));
    Vertex v4 = graph.addVertex(T.id, id(4));
    v0.addEdge("b", v1, "key1", 1);
    v0.addEdge("b", v2, "key1", 2);
    v0.addEdge("b", v3, "key2", 3);
    v0.addEdge("a", v1, "key1", 1);
    v0.addEdge("b", v4, "key1", 4);

    HBaseGraphConfiguration hconf = graph.configuration();
    Connection conn = graph.connection();
    Table table = conn.getTable(HBaseGraphUtils.getTableName(hconf, Constants.EDGE_INDICES));

    verifyTableCount(table, 5*2);  // 5 edges, 2 endpoint rows each
    graph.createIndex(ElementType.EDGE, "b", "key1", false, true, false);
    verifyTableCount(table, 5*2 + 3*2);  // plus 3 indexed edges, 2 index rows each

    table.close();
}
 
Example 5  Project: hbase  File: QuotaTableUtil.java
/**
 * Fetches any persisted HBase snapshot sizes stored in the quota table. The sizes here are
 * computed relative to the table which the snapshot was created from. A snapshot's size will
 * not include the size of files to which the table still refers. These sizes, in bytes, are what
 * is used internally to compute quota violations for tables and namespaces.
 *
 * @return A map of snapshot name to size in bytes, as used for space quota computations
 */
public static Map<String,Long> getObservedSnapshotSizes(Connection conn) throws IOException {
  try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
      ResultScanner rs = quotaTable.getScanner(createScanForSpaceSnapshotSizes())) {
    final Map<String,Long> snapshotSizes = new HashMap<>();
    for (Result r : rs) {
      CellScanner cs = r.cellScanner();
      while (cs.advance()) {
        Cell c = cs.current();
        final String snapshot = extractSnapshotNameFromSizeCell(c);
        final long size = parseSnapshotSize(c);
        snapshotSizes.put(snapshot, size);
      }
    }
    return snapshotSizes;
  }
}
 
Example 6  Project: phoenix  File: ServerUtil.java
private static Table getTableFromSingletonPool(RegionCoprocessorEnvironment env, TableName tableName) throws IOException {
    // It's ok to never call pool.close(), as we're only storing a single
    // table. The HTablePool holds no resources other than this table,
    // which will itself be closed when it's no longer needed.
    Connection conn = ConnectionFactory.getConnection(ConnectionType.DEFAULT_SERVER_CONNECTION, env);
    try {
        return conn.getTable(tableName);
    } catch (RuntimeException t) {
        // handle cases where an IOE is wrapped inside a RuntimeException, as by HTableInterface#createHTableInterface
        if(t.getCause() instanceof IOException) {
            throw (IOException)t.getCause();
        } else {
            throw t;
        }
    }
}
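Rethrowing the unwrapped cause preserves the method's checked-exception contract: callers see the original IOException rather than an unchecked wrapper around it.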
 
Example 7
private List<RegionInfo> listRegionsInMeta() throws Exception {
  Connection connection = TEST_UTIL.getConnection();
  Table table = connection.getTable(TableName.META_TABLE_NAME);
  Scan scan = new Scan();
  scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
  ResultScanner scanner = table.getScanner(scan);
  final List<RegionInfo> regionInfos = new ArrayList<>();
  for(Result r : scanner) {
    regionInfos.add(RegionInfo.parseFrom(r.getValue(HConstants.CATALOG_FAMILY,
      HConstants.REGIONINFO_QUALIFIER)));
  }
  return regionInfos;
}
 
Example 8  Project: kylin  File: GridTableHBaseBenchmark.java
private static void prepareData(Connection conn) throws IOException {
    Table table = conn.getTable(TableName.valueOf(TEST_TABLE));

    try {
        // check how many rows already exist
        int nRows = 0;
        Scan scan = new Scan();
        scan.setFilter(new KeyOnlyFilter());
        ResultScanner scanner = table.getScanner(scan);
        for (Result r : scanner) {
            r.getRow(); // nothing to do
            nRows++;
        }

        if (nRows > 0) {
            logger.info("{} existing rows", nRows);
            if (nRows != N_ROWS)
                throw new IOException("Expected " + N_ROWS + " rows but found " + nRows);
            return;
        }

        // insert rows into empty table
        logger.info("Writing {} rows to {}", N_ROWS, TEST_TABLE);
        long nBytes = 0;
        for (int i = 0; i < N_ROWS; i++) {
            byte[] rowkey = Bytes.toBytes(i);
            Put put = new Put(rowkey);
            byte[] cell = randomBytes();
            put.addColumn(CF, QN, cell);
            table.put(put);
            nBytes += cell.length;
            dot(i, N_ROWS);
        }
        logger.info("Written {} rows, {} bytes", N_ROWS, nBytes);

    } finally {
        IOUtils.closeQuietly(table);
    }

}
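The row-count check at the top relies on KeyOnlyFilter, which strips cell values server-side, so counting existing rows transfers only keys over the wire. Note that the ResultScanner is never closed here, only the table (via IOUtils.closeQuietly); strictly, the scanner should be closed before the table.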
 
Example 9  Project: kylin-on-parquet-v2  File: HBaseLookupTable.java
public HBaseLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
    String tableName = extTableSnapshot.getStorageLocationIdentifier();
    this.lookupTableName = TableName.valueOf(tableName);
    KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
    Connection connection = HBaseConnection.get(kylinConfig.getStorageUrl());
    try {
        table = connection.getTable(lookupTableName);
    } catch (IOException e) {
        throw new RuntimeException("error when connect HBase", e);
    }

    String[] keyColumns = extTableSnapshot.getKeyColumns();
    encoder = new HBaseLookupRowEncoder(tableDesc, keyColumns, extTableSnapshot.getShardNum());
}
 
Example 10
private static void testColumnScan(Connection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
    Stats stats = new Stats("COLUMN_SCAN");

    Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
    try {
        stats.markStart();

        int nLogicCols = colScans.size();
        int nLogicRows = colScans.get(0).getSecond() - colScans.get(0).getFirst();

        Scan[] scans = new Scan[nLogicCols];
        ResultScanner[] scanners = new ResultScanner[nLogicCols];
        for (int i = 0; i < nLogicCols; i++) {
            scans[i] = new Scan();
            scans[i].addFamily(CF);
            scanners[i] = table.getScanner(scans[i]);
        }
        for (int i = 0; i < nLogicRows; i++) {
            for (int c = 0; c < nLogicCols; c++) {
                Result r = scanners[c].next();
                stats.consume(r);
            }
            dot(i, nLogicRows);
        }

        stats.markEnd();
    } finally {
        IOUtils.closeQuietly(table);
    }
}
 
Example 11  Project: hbase  File: QuotaTableUtil.java
/**
 * Returns a set of the names of all namespaces containing snapshot entries.
 * @param conn connection to re-use
 */
public static Set<String> getNamespaceSnapshots(Connection conn) throws IOException {
  try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
      ResultScanner rs = quotaTable.getScanner(createScanForNamespaceSnapshotSizes())) {
    Set<String> snapshots = new HashSet<>();
    for (Result r : rs) {
      CellScanner cs = r.cellScanner();
      while (cs.advance()) {
        cs.current();
        snapshots.add(getNamespaceFromRowKey(r.getRow()));
      }
    }
    return snapshots;
  }
}
 
Example 12  Project: antsdb  File: Helper.java
public static Result exist(Connection conn, TableName tableName, byte[] key) throws IOException {
    Table htable = conn.getTable(tableName);
    Get get = new Get(key);
    get.addColumn(DATA_COLUMN_FAMILY_BYTES, SYS_COLUMN_VERSION_BYTES);
    Result r = htable.get(get);
    return r;
}
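If a caller only needs a yes/no answer, Table#exists(Get) is cheaper still, since it checks presence server-side without shipping the cell value back. As written, the helper also leaves htable unclosed; since Table instances from Connection#getTable are lightweight, the usual pattern is to close them per call, e.g. with try-with-resources.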
 
Example 13  Project: hbase  File: QuotaTableUtil.java
/**
 * Fetches the computed size of all snapshots against tables in a namespace for space quotas.
 */
static long getNamespaceSnapshotSize(
    Connection conn, String namespace) throws IOException {
  try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
    Result r = quotaTable.get(createGetNamespaceSnapshotSize(namespace));
    if (r.isEmpty()) {
      return 0L;
    }
    r.advance();
    return parseSnapshotSize(r.current());
  } catch (InvalidProtocolBufferException e) {
    throw new IOException("Could not parse snapshot size value for namespace " + namespace, e);
  }
}
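Because the snapshot size for a namespace lives in a single cell, the code uses the Result's built-in cell cursor: r.advance() positions it on the first (and only) cell and r.current() hands that cell to parseSnapshotSize, with no scanner involved.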
 
Example 14  Project: hbase  File: RestartMetaTest.java
@Override
protected int doWork() throws Exception {
  ProcessBasedLocalHBaseCluster hbaseCluster =
      new ProcessBasedLocalHBaseCluster(conf, NUM_DATANODES, numRegionServers);
  hbaseCluster.startMiniDFS();

  // start the process based HBase cluster
  hbaseCluster.startHBase();

  // create tables if needed
  HBaseTestingUtility.createPreSplitLoadTestTable(conf, TABLE_NAME,
      HFileTestUtil.DEFAULT_COLUMN_FAMILY, Compression.Algorithm.NONE,
      DataBlockEncoding.NONE);

  LOG.debug("Loading data....\n\n");
  loadData();

  LOG.debug("Sleeping for " + SLEEP_SEC_AFTER_DATA_LOAD +
      " seconds....\n\n");
  Threads.sleep(1000 * SLEEP_SEC_AFTER_DATA_LOAD);

  Connection connection = ConnectionFactory.createConnection(conf);

  int metaRSPort = HBaseTestingUtility.getMetaRSPort(connection);

  LOG.debug("Killing hbase:meta region server running on port " + metaRSPort);
  hbaseCluster.killRegionServer(metaRSPort);
  Threads.sleep(2000);

  LOG.debug("Restarting region server running on port metaRSPort");
  hbaseCluster.startRegionServer(metaRSPort);
  Threads.sleep(2000);

  LOG.debug("Trying to scan meta");

  Table metaTable = connection.getTable(TableName.META_TABLE_NAME);
  ResultScanner scanner = metaTable.getScanner(new Scan());
  Result result;
  while ((result = scanner.next()) != null) {
    LOG.info("Region assignment from META: "
        + Bytes.toStringBinary(result.getRow())
        + " => "
        + Bytes.toStringBinary(result.getFamilyMap(HConstants.CATALOG_FAMILY)
            .get(HConstants.SERVER_QUALIFIER)));
  }
  metaTable.close();
  connection.close();
  return 0;
}
 
Example 15  Project: phoenix-omid  File: TTable.java
public TTable(Connection connection, String tableName, CommitTable.Client commitTableClient) throws IOException {
    this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, false);
}
 
Example 16
public SlicedRowFilterGTSDecoderIterator(long now, long timespan, List<Metadata> metadatas, Connection conn, TableName tableName, byte[] colfam, boolean writeTimestamp, KeyStore keystore, boolean useBlockCache) {
    
  this.keystore = keystore;
  this.now = now;
  this.timespan = timespan;
  this.hbaseAESKey = keystore.getKey(KeyStore.AES_HBASE_DATA);
  this.writeTimestamp = writeTimestamp;
  
  //
  // Check that if 'timespan' is < 0 then 'now' is either Long.MAX_VALUE or congruent to 0 modulo DEFAULT_MODULUS
  //
  
  if (timespan < 0) {
    if (Long.MAX_VALUE != now && 0 != (now % Constants.DEFAULT_MODULUS)) {
      throw new RuntimeException("Incompatible 'timespan' (" + timespan + ") and 'now' (" + now + ")");
    }
  }
  
  //
  // Create a SlicedRowFilter for the prefix, class id, labels id and ts
  // We include the prefix so we exit the filter early when the last
  // matching row has been reached
  //
  
  // 128BITS
  
  int[] bounds = { 0, 24 };
  
  //
  // Create singleton for each classId/labelsId combo
  //
  // TODO(hbs): we should really create multiple scanners, one per class Id for example,
  // 
  
  List<Pair<byte[], byte[]>> ranges = new ArrayList<Pair<byte[], byte[]>>();
  
  for (Metadata metadata: metadatas) {
    byte[][] keys = getKeys(metadata, now, timespan);
    byte[] lower = keys[0];
    byte[] upper = keys[1];
    
    this.metadatas.put(new String(Arrays.copyOfRange(lower, prefix.length, prefix.length + 16), StandardCharsets.ISO_8859_1), metadata);
    
    Pair<byte[],byte[]> range = new Pair<byte[],byte[]>(lower, upper);
    
    ranges.add(range);
  }
              
  SlicedRowFilter filter = new SlicedRowFilter(bounds, ranges, timespan < 0 ? -timespan : Long.MAX_VALUE);

  //
  // Create scanner. The start key is the lower bound of the first range
  //
  
  Scan scan = new Scan();
  scan.addFamily(colfam); // (HBaseStore.GTS_COLFAM, Longs.toByteArray(Long.MAX_VALUE - modulus));
  scan.setStartRow(filter.getStartKey());
  byte[] filterStopKey = filter.getStopKey();
  // Add one byte at the end (we can do that because we know the slice is the whole key)
  byte[] stopRow = Arrays.copyOf(filterStopKey, filterStopKey.length + 1);
  scan.setStopRow(stopRow);
  scan.setFilter(filter);
  
  scan.setMaxResultSize(1000000L);
  scan.setBatch(50000);
  scan.setCaching(50000);
  
  scan.setCacheBlocks(useBlockCache);

  Sensision.update(SensisionConstants.SENSISION_CLASS_CONTINUUM_HBASE_CLIENT_FILTERED_SCANNERS, Sensision.EMPTY_LABELS, 1);
  Sensision.update(SensisionConstants.SENSISION_CLASS_CONTINUUM_HBASE_CLIENT_FILTERED_SCANNERS_RANGES, Sensision.EMPTY_LABELS, ranges.size());

  try {
    this.htable = conn.getTable(tableName);
    this.scanner = this.htable.getScanner(scan);
    iter = scanner.iterator();          
  } catch (IOException ioe) {
    LOG.error("",ioe);
    this.iter = null;
  }
}
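The scan tuning at the end is worth noting: setCaching(50000) controls how many rows each RPC prefetches and setBatch(50000) caps how many cells a single Result may carry, while setMaxResultSize(1000000L) bounds the bytes buffered per request. Together they keep round trips rare without letting client memory grow unbounded, which suits this bulk-decode iterator.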
 
Example 17  Project: phoenix-omid  File: TTable.java
public TTable(Connection connection, byte[] tableName, CommitTable.Client commitTableClient) throws IOException {
    this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, false);
}
 
Example 18  Project: hbase  File: TestFlushWithThroughputController.java
/**
 * Test the tuning task of {@link PressureAwareFlushThroughputController}
 */
@Test
public void testFlushThroughputTuning() throws Exception {
  Configuration conf = hbtu.getConfiguration();
  setMaxMinThroughputs(20L * 1024 * 1024, 10L * 1024 * 1024);
  conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
  conf.setInt(PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD,
    3000);
  hbtu.startMiniCluster(1);
  Connection conn = ConnectionFactory.createConnection(conf);
  hbtu.getAdmin().createTable(TableDescriptorBuilder.newBuilder(tableName)
    .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
    .build());
  hbtu.waitTableAvailable(tableName);
  HRegionServer regionServer = hbtu.getRSForFirstRegionInTable(tableName);
  double pressure = regionServer.getFlushPressure();
  LOG.debug("Flush pressure before flushing: " + pressure);
  PressureAwareFlushThroughputController throughputController =
      (PressureAwareFlushThroughputController) regionServer.getFlushThroughputController();
  for (HRegion region : regionServer.getRegions()) {
    region.flush(true);
  }
  // We used to assert that the flush pressure is zero but after HBASE-15787 or HBASE-18294 we
  // changed to use heapSize instead of dataSize to calculate the flush pressure, and since
  // heapSize will never be zero, flush pressure will never be zero either. So we changed the
  // assertion here.
  assertTrue(regionServer.getFlushPressure() < pressure);
  Thread.sleep(5000);
  boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(hbtu.getConfiguration());
  if (tablesOnMaster) {
    // If no tables on the master, this math is off and I'm not sure what it is supposed to be
    // when meta is on the regionserver and not on the master.
    assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
  }
  Table table = conn.getTable(tableName);
  Random rand = new Random();
  for (int i = 0; i < 10; i++) {
    for (int j = 0; j < 10; j++) {
      byte[] value = new byte[256 * 1024];
      rand.nextBytes(value);
      table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
    }
  }
  Thread.sleep(5000);
  double expectedThroughPut = 10L * 1024 * 1024 * (1 + regionServer.getFlushPressure());
  assertEquals(expectedThroughPut, throughputController.getMaxThroughput(), EPSILON);

  conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
    NoLimitThroughputController.class.getName());
  regionServer.onConfigurationChange(conf);
  assertTrue(throughputController.isStopped());
  assertTrue(regionServer.getFlushThroughputController() instanceof NoLimitThroughputController);
  conn.close();
}
 
Example 19  Project: phoenix-omid  File: TTable.java
public TTable(Connection connection, byte[] tableName, boolean conflictFree) throws IOException {
    this(connection.getTable(TableName.valueOf(tableName)), conflictFree);
}
 
Example 20  Project: hbase  File: IntegrationTestLazyCfLoading.java
@Test
public void testReadersAndWriters() throws Exception {
  Configuration conf = util.getConfiguration();
  String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName());
  long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES);
  long serverCount = util.getHBaseClusterInterface().getClusterMetrics()
    .getLiveServerMetrics().size();
  long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER;
  Connection connection = ConnectionFactory.createConnection(conf);
  Table table = connection.getTable(TABLE_NAME);

  // Create multi-threaded writer and start it. We write multiple columns/CFs and verify
  // their integrity, therefore multi-put is necessary.
  MultiThreadedWriter writer =
    new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
  writer.setMultiPut(true);

  LOG.info("Starting writer; the number of keys to write is " + keysToWrite);
  // TODO : Need to see if tag support has to be given here in the integration test suite
  writer.start(1, keysToWrite, WRITER_THREADS);

  // Now, do scans.
  long now = EnvironmentEdgeManager.currentTime();
  long timeLimit = now + (maxRuntime * 60000);
  boolean isWriterDone = false;
  while (now < timeLimit && !isWriterDone) {
    LOG.info("Starting the scan; wrote approximately "
      + dataGen.getTotalNumberOfKeys() + " keys");
    isWriterDone = writer.isDone();
    if (isWriterDone) {
      LOG.info("Scanning full result, writer is done");
    }
    Scan scan = new Scan();
    for (byte[] cf : dataGen.getColumnFamilies()) {
      scan.addFamily(cf);
    }
    scan.setFilter(dataGen.getScanFilter());
    scan.setLoadColumnFamiliesOnDemand(true);
    // The number of keys we can expect from scan - lower bound (before scan).
    // Not a strict lower bound - writer knows nothing about filters, so we report
    // this from generator. Writer might have generated the value but not put it yet.
    long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys();
    long startTs = EnvironmentEdgeManager.currentTime();
    ResultScanner results = table.getScanner(scan);
    long resultCount = 0;
    Result result = null;
    // Verify and count the results.
    while ((result = results.next()) != null) {
      boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true);
      Assert.assertTrue("Failed to verify [" + Bytes.toString(result.getRow())+ "]", isOk);
      ++resultCount;
    }
    long timeTaken = EnvironmentEdgeManager.currentTime() - startTs;
    // Verify the result count.
    long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys();
    Assert.assertTrue("Read " + resultCount + " keys when at most " + onesGennedAfterScan
      + " were generated ", onesGennedAfterScan >= resultCount);
    if (isWriterDone) {
      Assert.assertTrue("Read " + resultCount + " keys; the writer is done and "
        + onesGennedAfterScan + " keys were generated", onesGennedAfterScan == resultCount);
    } else if (onesGennedBeforeScan * 0.9 > resultCount) {
      LOG.warn("Read way too few keys (" + resultCount + "/" + onesGennedBeforeScan
        + ") - there might be a problem, or the writer might just be slow");
    }
    LOG.info("Scan took " + timeTaken + "ms");
    if (!isWriterDone) {
      Thread.sleep(WAIT_BETWEEN_SCANS_MS);
      now = EnvironmentEdgeManager.currentTime();
    }
  }
  Assert.assertEquals("There are write failures", 0, writer.getNumWriteFailures());
  Assert.assertTrue("Writer is not done", isWriterDone);
  // Assert.fail("Boom!");
  connection.close();
}
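The key line is scan.setLoadColumnFamiliesOnDemand(true): HBase then reads only the column families the filter declares essential, and fetches the remaining families for a row only after the filter accepts it, so rows rejected by dataGen.getScanFilter() never pay the cost of loading the lazy families.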