下面列出了 org.apache.hadoop.hbase.client.Connection#close() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Puts the specified RegionInfo into META with replica related columns.
 *
 * <p>A single {@link Put} is built for {@code hri}. For every replica id from 1 up to
 * {@code numReplicas - 1} a location column is added that points at a server picked at random
 * from {@code servers}.
 *
 * @param conf configuration used to open the connection
 * @param hri region to write into the meta table
 * @param servers candidate servers for the replica location columns; must not be empty when
 *          {@code numReplicas > 1}, because a random index is drawn from it
 * @param numReplicas total number of replicas, the primary included
 * @throws IOException if the connection cannot be opened or the put fails
 */
public static void fixMetaHoleOnlineAndAddReplicas(Configuration conf,
    RegionInfo hri, Collection<ServerName> servers, int numReplicas) throws IOException {
  // try-with-resources closes the table and then the connection, also when the put fails.
  try (Connection conn = ConnectionFactory.createConnection(conf);
      Table meta = conn.getTable(TableName.META_TABLE_NAME)) {
    Put put = HBCKMetaTableAccessor.makePutFromRegionInfo(hri, System.currentTimeMillis());
    if (numReplicas > 1) {
      Random r = new Random();
      ServerName[] serversArr = servers.toArray(new ServerName[0]);
      for (int i = 1; i < numReplicas; i++) {
        ServerName sn = serversArr[r.nextInt(serversArr.length)];
        // the column added here is just to make sure the master is able to
        // see the additional replicas when it is asked to assign. The
        // final value of these columns will be different and will be updated
        // by the actual regionservers that start hosting the respective replicas
        HBCKMetaTableAccessor.addLocation(put, sn, sn.getStartcode(), i);
      }
    }
    meta.put(put);
  }
}
/**
 * Writes one cell to {@code TABLE_NAME}: row "someRow", family {@code CF}, qualifier "qual",
 * holding the double 42.0.
 *
 * @param argv command-line arguments; not read by this method
 * @return always 0
 * @throws IOException if connecting, writing or closing fails
 */
public int run(String[] argv) throws IOException {
  setConf(HBaseConfiguration.create(getConf()));

  // Connection: a single connection shared by all application threads.
  // Table: a lightweight handle to a specific table, used from a single thread.
  // try-with-resources closes the table first and then the connection, and still closes the
  // connection when closing the table throws.
  try (Connection connection = ConnectionFactory.createConnection(getConf());
      Table table = connection.getTable(TABLE_NAME)) {
    // describe the data we want to write.
    Put p = new Put(Bytes.toBytes("someRow"));
    p.addColumn(CF, Bytes.toBytes("qual"), Bytes.toBytes(42.0d));
    // send the data.
    table.put(p);
  }
  return 0;
}
/**
 * Reads row "row1" from table "temp" and checks that colfam1:col1 holds "val1".
 */
@Test
public void testReadRowAsProcessOwner() throws Exception {
  final Configuration conf = HBaseConfiguration.create();
  conf.set("hbase.zookeeper.quorum", "localhost");
  conf.set("hbase.zookeeper.property.clientPort", "" + port);
  conf.set("zookeeper.znode.parent", "/hbase-unsecure");

  // Both handles are released even when the get or the assertion fails.
  try (Connection conn = ConnectionFactory.createConnection(conf);
      Table table = conn.getTable(TableName.valueOf("temp"))) {
    // Read a row
    Get get = new Get(Bytes.toBytes("row1"));
    Result result = table.get(get);
    byte[] valResult = result.getValue(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"));
    Assert.assertTrue(Arrays.equals(valResult, Bytes.toBytes("val1")));
  }
}
/**
 * Reads row "row1" from table "temp" and checks that colfam2:col1 holds "val2".
 */
@Test
public void testReadRowFromColFam2AsProcessOwner() throws Exception {
  final Configuration conf = HBaseConfiguration.create();
  conf.set("hbase.zookeeper.quorum", "localhost");
  conf.set("hbase.zookeeper.property.clientPort", "" + port);
  conf.set("zookeeper.znode.parent", "/hbase-unsecure");

  // Both handles are released even when the get or the assertion fails.
  try (Connection conn = ConnectionFactory.createConnection(conf);
      Table table = conn.getTable(TableName.valueOf("temp"))) {
    // Read a row
    Get get = new Get(Bytes.toBytes("row1"));
    Result result = table.get(get);
    byte[] valResult = result.getValue(Bytes.toBytes("colfam2"), Bytes.toBytes("col1"));
    Assert.assertTrue(Arrays.equals(valResult, Bytes.toBytes("val2")));
  }
}
/**
 * Writes "val2" to colfam1:col1 of row "row2" in table "temp"; the test passes when the put
 * completes without throwing.
 */
@Test
public void testWriteRowAsProcessOwner() throws Exception {
  final Configuration conf = HBaseConfiguration.create();
  conf.set("hbase.zookeeper.quorum", "localhost");
  conf.set("hbase.zookeeper.property.clientPort", "" + port);
  conf.set("zookeeper.znode.parent", "/hbase-unsecure");

  // Both handles are released even when the put fails.
  try (Connection conn = ConnectionFactory.createConnection(conf);
      Table table = conn.getTable(TableName.valueOf("temp"))) {
    // Add a new row
    Put put = new Put(Bytes.toBytes("row2"));
    put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), Bytes.toBytes("val2"));
    table.put(put);
  }
}
/**
 * Flushes the imported table when its data bypassed the Write Ahead Log.
 *
 * <p>With {@link Durability#SKIP_WAL} the imported data is held in memory only and cannot be
 * replayed from the WAL after a crash, so the table is flushed explicitly. Nothing happens when
 * a bulk output location is configured or when the configured durability is anything other than
 * {@link Durability#SKIP_WAL}.
 *
 * @param conf job configuration carrying the table name, the durability setting and the
 *          optional bulk output location
 * @throws IOException if connecting to the cluster or flushing fails
 * @throws InterruptedException declared for callers; kept as part of the method contract
 */
public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
    InterruptedException {
  final String tableName = conf.get(TABLE_NAME);
  final String durability = conf.get(WAL_DURABILITY);

  // Bulk output writes files instead of going through the region servers: nothing to flush.
  if (conf.get(BULK_OUTPUT_CONF_KEY) != null) {
    return;
  }
  // equalsIgnoreCase(null) is false, so a missing durability setting also ends up here.
  if (!Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
    return;
  }

  LOG.info("Flushing all data that skipped the WAL.");
  Connection connection = null;
  Admin admin = null;
  try {
    connection = ConnectionFactory.createConnection(conf);
    admin = connection.getAdmin();
    admin.flush(TableName.valueOf(tableName));
  } finally {
    if (admin != null) {
      admin.close();
    }
    if (connection != null) {
      connection.close();
    }
  }
}
/**
 * Test things basically work: starts a {@link LocalHBaseCluster}, creates one table named after
 * the cluster class, then shuts everything down again.
 *
 * @param args command-line arguments; not read by this method
 * @throws IOException if the cluster cannot be started or the table cannot be created
 */
public static void main(String[] args) throws IOException {
  Configuration conf = HBaseConfiguration.create();
  LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
  cluster.startup();
  // The admin and the connection are closed before the finally block runs, so the cluster is
  // shut down last, and it is shut down even when table creation fails.
  try (Connection connection = ConnectionFactory.createConnection(conf);
      Admin admin = connection.getAdmin()) {
    HTableDescriptor htd =
        new HTableDescriptor(TableName.valueOf(cluster.getClass().getName()));
    admin.createTable(htd);
  } finally {
    cluster.shutdown();
  }
}
/**
 * Drops the cached admin and async connection, creates a new {@link MiniHBaseCluster} from the
 * given options and blocks until one full scan of the meta table has succeeded.
 *
 * @param option number of masters and region servers, ports and implementation classes for the
 *          new cluster
 * @throws IOException if the cluster cannot be created or the meta scan fails
 * @throws InterruptedException declared for callers; kept as part of the method contract
 */
public void restartHBaseCluster(StartMiniClusterOption option)
    throws IOException, InterruptedException {
  if (hbaseAdmin != null) {
    hbaseAdmin.close();
    hbaseAdmin = null;
  }
  if (this.asyncConnection != null) {
    this.asyncConnection.close();
    this.asyncConnection = null;
  }
  this.hbaseCluster =
      new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
          option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
          option.getRsClass());
  // Don't leave here till we've done a successful scan of the hbase:meta.
  // Scanner, table and connection are closed in that order, also when the scan throws.
  try (Connection conn = ConnectionFactory.createConnection(this.conf);
      Table t = conn.getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
    while (s.next() != null) {
      // do nothing
    }
    LOG.info("HBase has been restarted");
  }
}
/**
 * Creates a table with three region replicas and memstore replication switched off, then loads
 * three batches of 100 rows. After each load the new rows are verified with the last
 * {@code verifyReplication} argument set to {@code false}; after a flush all rows written so far
 * are verified with it set to {@code true}.
 */
@Test
public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
  final int replicaCount = 3;
  final TableName tableName = TableName.valueOf(name.getMethodName());

  HTableDescriptor descriptor = HTU.createTableDescriptor(tableName);
  descriptor.setRegionReplication(replicaCount);
  descriptor.setRegionMemstoreReplication(false);
  HTU.getAdmin().createTable(descriptor);

  Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
  Table table = connection.getTable(tableName);
  try {
    // write data to the primary. The replicas should not receive the data
    final int rowsPerBatch = 100;
    int startRow = 0;
    for (int batch = 0; batch < 3; batch++) {
      final int endRow = startRow + rowsPerBatch;
      LOG.info("Writing data from " + startRow + " to " + endRow);
      HTU.loadNumericRows(table, HBaseTestingUtility.fam1, startRow, endRow);
      verifyReplication(tableName, replicaCount, startRow, endRow, false);

      // Flush the table, now the data should show up in the replicas
      LOG.info("flushing table");
      HTU.flush(tableName);
      verifyReplication(tableName, replicaCount, 0, endRow, true);

      startRow = endRow;
    }
  } finally {
    table.close();
    connection.close();
  }
}
/**
 * Closes every connection held in {@code connections} and then empties it, all while holding
 * the {@code RegionConnectionFactory} class lock. A connection that fails to close is logged at
 * WARN level and does not stop the remaining ones from being closed.
 */
public static void shutdown() {
  synchronized (RegionConnectionFactory.class) {
    connections.values().forEach(open -> {
      try {
        open.close();
      } catch (IOException e) {
        LOG.warn("Unable to close coprocessor connection", e);
      }
    });
    connections.clear();
  }
}
/**
 * Checks that HBase is configured and reachable, printing the outcome of each step.
 *
 * <p>The process exits when a configured HBase config file does not exist, or when neither a
 * config file nor a "hbase.zookeeper.quorum" property is present. Otherwise a connection is
 * opened with client retries set to 0 and the namespaces are listed as a connectivity probe.
 *
 * @param conf service configuration to inspect
 * @throws Exception declared for callers; failures of the connectivity probe are caught and
 *           reported as "unable to connect"
 */
private void checkHbase(ConfigService conf) throws Exception {
  // check configuration
  String hbaseConfPath = conf.getHBaseConf();
  if (hbaseConfPath != null) {
    if (!new File(hbaseConfPath).exists()) {
      println("error: hbase config %s is not found", hbaseConfPath);
      // NOTE(review): exit status 0 on an error path looks unintended - confirm with callers.
      System.exit(0);
    }
    println("hbase config: %s", hbaseConfPath);
  } else if (conf.getProperty("hbase.zookeeper.quorum", null) == null) {
    println("error: hbase is not configured");
    System.exit(0);
  }

  // check the connection
  Configuration hbaseConf = HBaseStorageService.getHBaseConfig(conf);
  hbaseConf.set("hbase.client.retries.number", "0");
  String quorum = hbaseConf.get("hbase.zookeeper.quorum");
  if (quorum != null) {
    println("zookeeper quorum: %s", quorum);
  }

  try {
    // The inner try releases the admin and the connection on every path; success is reported
    // only after both have been closed without error.
    try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
        Admin admin = conn.getAdmin()) {
      admin.listNamespaceDescriptors();
    }
    println("quorum is connected");
  } catch (Exception x) {
    println("error: unable to connect to quorum");
  }
}
/**
 * Reads the region start and end keys of the hashed table, lets {@code tableHash} select its
 * partitions from them and writes the resulting partition file.
 *
 * @param partitionsPath destination of the partition file
 * @throws IOException if the region keys cannot be read or the file cannot be written
 */
private void generatePartitions(Path partitionsPath) throws IOException {
  Pair<byte[][], byte[][]> regionKeys;
  // The locator and the connection are only needed for the key lookup; both are released
  // before any partition work starts, and also when the lookup throws.
  try (Connection connection = ConnectionFactory.createConnection(getConf());
      RegionLocator locator =
          connection.getRegionLocator(TableName.valueOf(tableHash.tableName))) {
    regionKeys = locator.getStartEndKeys();
  }

  tableHash.selectPartitions(regionKeys);
  LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);
  tableHash.writePartitionFile(getConf(), partitionsPath);
}
/**
 * Walks the linked list backwards from the node at {@code startKeyIn}, following each node's
 * {@code prev} pointer and counting every step as SUCCESS, TERMINATING or UNDEFINED.
 *
 * @param startKeyIn key of the node to start walking from
 * @param maxQueriesIn maximum number of steps; a value of 0 or less means no limit
 * @throws IOException if the start node cannot be found or a table operation fails
 */
public void run(long startKeyIn, long maxQueriesIn) throws IOException {
  long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE;
  byte[] startKey = Bytes.toBytes(startKeyIn);

  // The table and the connection are released on every exit path, including the
  // "Start node not found" failure below.
  try (Connection connection = ConnectionFactory.createConnection(getConf());
      Table table = connection.getTable(getTableName(getConf()))) {
    // If isSpecificStart is set, only walk one list from that particular node.
    // Note that in case of circular (or P-shaped) list it will walk forever, as is
    // the case in normal run without startKey.
    CINode node = findStartNode(table, startKey);
    if (node == null) {
      LOG.error("Start node not found: " + Bytes.toStringBinary(startKey));
      throw new IOException("Start node not found: " + startKeyIn);
    }

    long numQueries = 0;
    while (numQueries < maxQueries) {
      numQueries++;
      byte[] prev = node.prev;
      node = getNode(prev, table, node);
      if (node == null) {
        LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev));
        context.getCounter(Counts.UNDEFINED).increment(1L);
        // There is no node left to follow; the next iteration would dereference null.
        break;
      } else if (node.prev.length == NO_KEY.length) {
        LOG.error("ConcurrentWalker found TERMINATING NODE: " +
            Bytes.toStringBinary(node.key));
        context.getCounter(Counts.TERMINATING).increment(1L);
      } else {
        // Increment for successful walk
        context.getCounter(Counts.SUCCESS).increment(1L);
      }
    }
  }
}
/**
 * Verifies that deleting a table is denied for the read/write, read-only, no-permission and
 * group users, and allowed for a user that was granted ADMIN on that table.
 */
@Test
public void testTableDeletion() throws Exception {
  User tableAdmin = User.createUserForTesting(conf, "TestUser", new String[0]);
  final TableName tableName = TableName.valueOf(name.getMethodName());
  createTestTable(tableName);

  // Grant TABLE ADMIN privs
  grantOnTable(TEST_UTIL, tableAdmin.getShortName(), tableName, null, null,
      Permission.Action.ADMIN);

  AccessTestAction deleteTableAction = new AccessTestAction() {
    @Override
    public Object run() throws Exception {
      // Each invocation opens its own connection; try-with-resources closes the admin and
      // then the connection, also when the delete is denied and throws.
      try (Connection unmanagedConnection =
              ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
          Admin admin = unmanagedConnection.getAdmin()) {
        deleteTable(TEST_UTIL, admin, tableName);
      }
      return null;
    }
  };

  verifyDenied(deleteTableAction, USER_RW, USER_RO, USER_NONE, USER_GROUP_READ,
      USER_GROUP_WRITE);
  verifyAllowed(deleteTableAction, tableAdmin);
}
/**
 * Prepares the security credentials of a job.
 *
 * <p>With Hadoop security enabled, the location in the HADOOP_TOKEN_FILE_LOCATION environment
 * variable (when set) is copied into the job configuration. With HBase security enabled, a token
 * for the current user is added to the job for the cluster in the job configuration and, when an
 * output quorum address is configured, for that peer cluster as well.
 *
 * @param job job whose configuration and credentials are updated
 * @throws IOException if a connection cannot be opened or closed, or a token cannot be obtained
 */
public static void initCredentials(Job job) throws IOException {
  final Configuration jobConf = job.getConfiguration();
  final UserProvider userProvider = UserProvider.instantiate(jobConf);

  if (userProvider.isHadoopSecurityEnabled()) {
    // propagate delegation related props from launcher job to MR job
    final String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
    if (tokenFile != null) {
      jobConf.set("mapreduce.job.credentials.binary", tokenFile);
    }
  }

  if (!userProvider.isHBaseSecurityEnabled()) {
    return;
  }

  try {
    // init credentials for remote cluster
    final String quorumAddress = jobConf.get(TableOutputFormat.QUORUM_ADDRESS);
    final User user = userProvider.getCurrent();
    if (quorumAddress != null) {
      Configuration peerConf = HBaseConfiguration.createClusterConf(jobConf,
          quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
      Connection peerConn = ConnectionFactory.createConnection(peerConf);
      try {
        TokenUtil.addTokenForJob(peerConn, user, job);
      } finally {
        peerConn.close();
      }
    }

    Connection localConn = ConnectionFactory.createConnection(jobConf);
    try {
      TokenUtil.addTokenForJob(localConn, user, job);
    } finally {
      localConn.close();
    }
  } catch (InterruptedException ie) {
    LOG.info("Interrupted obtaining user authentication token");
    // Restore the interrupt flag for whoever called us.
    Thread.currentThread().interrupt();
  }
}
/**
 * Collects the authorizations stored for the given groups.
 *
 * <p>For every group one row is read, keyed by {@code AuthUtil.toGroupEntry(group)}; the
 * qualifier of each returned cell is added to the result as a string. The row is read from
 * {@code labelsRegion} when that is set, otherwise from the labels table over a fresh
 * connection.
 *
 * @param groups group names to look up; {@code null} or empty yields an empty list
 * @param systemCall only checked by the assertion: must be true when {@code labelsRegion} is
 *          not available
 * @return the authorizations of all given groups, in lookup order; never {@code null}
 * @throws IOException if a read fails
 */
@Override
public List<String> getGroupAuths(String[] groups, boolean systemCall) throws IOException {
  assert (labelsRegion != null || systemCall);
  List<String> auths = new ArrayList<>();
  if (groups == null) {
    return auths;
  }
  for (String group : groups) {
    Get get = new Get(Bytes.toBytes(AuthUtil.toGroupEntry(group)));
    List<Cell> cells;
    if (labelsRegion == null) {
      // try-with-resources closes the connection even when getTable() itself throws; the
      // previous finally block only closed it when a table had been obtained.
      try (Connection connection = ConnectionFactory.createConnection(conf);
          Table table = connection.getTable(VisibilityConstants.LABELS_TABLE_NAME)) {
        Result result = table.get(get);
        cells = result.listCells();
      }
    } else {
      cells = this.labelsRegion.get(get, false);
    }
    if (cells != null) {
      for (Cell cell : cells) {
        String auth = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
            cell.getQualifierLength());
        auths.add(auth);
      }
    }
  }
  return auths;
}
/**
 * Cleans the expired MOB files of one column family.
 *
 * <p>Expects exactly two arguments, the table name and the column family name. The family must
 * exist, be MOB enabled and have a minVersions of 0; otherwise an {@link IOException} is thrown.
 *
 * @param args {@code args[0]} is the table name, {@code args[1]} the column family name
 * @return 1 after printing the usage when the argument count is wrong, 0 on success
 * @throws Exception if the family is unsuitable or the cleaning fails
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
    justification="Intentional")
@Override
public int run(String[] args) throws Exception {
  if (args.length != 2) {
    printUsage();
    return 1;
  }

  final String tableName = args[0];
  final String familyName = args[1];
  final TableName table = TableName.valueOf(tableName);

  final Connection connection = ConnectionFactory.createConnection(getConf());
  final Admin admin = connection.getAdmin();
  try {
    TableDescriptor descriptor = admin.getDescriptor(table);
    ColumnFamilyDescriptor mobFamily = descriptor.getColumnFamily(Bytes.toBytes(familyName));

    boolean isMobFamily = mobFamily != null && mobFamily.isMobEnabled();
    if (!isMobFamily) {
      throw new IOException("Column family " + familyName + " is not a MOB column family");
    }
    if (mobFamily.getMinVersions() > 0) {
      throw new IOException(
          "The minVersions of the column family is not 0, could not be handled by this cleaner");
    }

    cleanExpiredMobFiles(tableName, mobFamily);
  } finally {
    admin.close();
    try {
      connection.close();
    } catch (IOException e) {
      // A failed close must not change the outcome of the cleaning itself.
      LOG.error("Failed to close the connection.", e);
    }
  }
  return 0;
}
/**
 * Starts a process-based local cluster, loads data, kills and restarts the region server that
 * hosts hbase:meta, then scans hbase:meta and logs the server recorded for every row.
 *
 * @return always 0
 * @throws Exception if the cluster cannot be started, the load fails or the meta scan fails
 */
@Override
protected int doWork() throws Exception {
  ProcessBasedLocalHBaseCluster hbaseCluster =
      new ProcessBasedLocalHBaseCluster(conf, NUM_DATANODES, numRegionServers);
  hbaseCluster.startMiniDFS();

  // start the process based HBase cluster
  hbaseCluster.startHBase();

  // create tables if needed
  HBaseTestingUtility.createPreSplitLoadTestTable(conf, TABLE_NAME,
      HFileTestUtil.DEFAULT_COLUMN_FAMILY, Compression.Algorithm.NONE,
      DataBlockEncoding.NONE);

  LOG.debug("Loading data....\n\n");
  loadData();

  LOG.debug("Sleeping for " + SLEEP_SEC_AFTER_DATA_LOAD +
      " seconds....\n\n");
  // NOTE(review): the message announces SLEEP_SEC_AFTER_DATA_LOAD seconds while the sleep
  // argument is 5 * SLEEP_SEC_AFTER_DATA_LOAD - confirm the unit Threads.sleep expects.
  Threads.sleep(5 * SLEEP_SEC_AFTER_DATA_LOAD);

  // The connection is released even when the kill, the restart or the scan throws.
  try (Connection connection = ConnectionFactory.createConnection(conf)) {
    int metaRSPort = HBaseTestingUtility.getMetaRSPort(connection);
    LOG.debug("Killing hbase:meta region server running on port " + metaRSPort);
    hbaseCluster.killRegionServer(metaRSPort);
    Threads.sleep(2000);

    LOG.debug("Restarting region server running on port " + metaRSPort);
    hbaseCluster.startRegionServer(metaRSPort);
    Threads.sleep(2000);

    LOG.debug("Trying to scan meta");
    // The scanner was never closed before; it is now closed ahead of the table.
    try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME);
        ResultScanner scanner = metaTable.getScanner(new Scan())) {
      Result result;
      while ((result = scanner.next()) != null) {
        LOG.info("Region assignment from META: "
            + Bytes.toStringBinary(result.getRow())
            + " => "
            + Bytes.toStringBinary(result.getFamilyMap(HConstants.CATALOG_FAMILY)
                .get(HConstants.SERVER_QUALIFIER)));
      }
    }
  }
  return 0;
}
/**
 * Runs one full scan over {@code tablename} and prints timings, scan metrics and throughput in
 * bytes, rows and cells per second to standard output.
 *
 * @throws IOException if connecting or scanning fails
 */
public void testScan() throws IOException {
  Stopwatch tableOpenTimer = Stopwatch.createUnstarted();
  Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
  Stopwatch scanTimer = Stopwatch.createUnstarted();

  long numRows = 0;
  long numCells = 0;
  ScanMetrics metrics;

  tableOpenTimer.start();
  // The table and the connection are released even when the scan throws.
  try (Connection connection = ConnectionFactory.createConnection(getConf());
      Table table = connection.getTable(TableName.valueOf(tablename))) {
    tableOpenTimer.stop();

    Scan scan = getScan();
    scanOpenTimer.start();
    ResultScanner scanner = table.getScanner(scan);
    scanOpenTimer.stop();

    try {
      scanTimer.start();
      Result result;
      while ((result = scanner.next()) != null) {
        numRows++;
        numCells += result.rawCells().length;
      }
      scanTimer.stop();
    } finally {
      scanner.close();
    }
    // As before, the metrics are read only after the scanner has been closed.
    metrics = scanner.getScanMetrics();
  }

  long totalBytes = metrics.countOfBytesInResults.get();
  // elapsed(TimeUnit.SECONDS) truncates to whole seconds, which is 0 for a sub-second scan and
  // turned every throughput into infinity. Use fractional seconds, clamped away from zero.
  double scanSeconds = Math.max(scanTimer.elapsed(TimeUnit.NANOSECONDS), 1L) / 1e9;
  double throughput = totalBytes / scanSeconds;
  double throughputRows = numRows / scanSeconds;
  double throughputCells = numCells / scanSeconds;

  System.out.println("HBase scan: ");
  System.out.println("total time to open table: " +
      tableOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
  System.out.println("total time to open scanner: " +
      scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
  System.out.println("total time to scan: " +
      scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
  System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
  System.out.println("total bytes: " + totalBytes + " bytes ("
      + StringUtils.humanReadableInt(totalBytes) + ")");
  System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
  System.out.println("total rows : " + numRows);
  System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
  System.out.println("total cells : " + numCells);
  System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
}
/**
 * Prints nodes of the table as {@code key:prev:count:client} lines.
 *
 * <p>Options: {@code -s} start key, {@code -e} end key, {@code -l} number of nodes to print
 * (100 when absent). Positional arguments are rejected; on a parse failure the usage is printed
 * and the process exits with status -1.
 *
 * @param args command-line options
 * @return always 0
 * @throws Exception if the limit is not a number or the scan fails
 */
@Override
public int run(String[] args) throws Exception {
  Options options = new Options();
  options.addOption("s", "start", true, "start key");
  options.addOption("e", "end", true, "end key");
  options.addOption("l", "limit", true, "number to print");

  GnuParser parser = new GnuParser();
  CommandLine cmd = null;
  try {
    cmd = parser.parse(options, args);
    if (cmd.getArgs().length != 0) {
      throw new ParseException("Command takes no arguments");
    }
  } catch (ParseException e) {
    System.err.println("Failed to parse command line " + e.getMessage());
    System.err.println();
    HelpFormatter formatter = new HelpFormatter();
    formatter.printHelp(getClass().getSimpleName(), options);
    System.exit(-1);
  }

  // Turn the options into a scan and a limit before any cluster resource is opened, so that a
  // malformed limit fails without ever creating a connection.
  Scan scan = new Scan();
  scan.setBatch(10000);
  if (cmd.hasOption("s")) {
    scan.withStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));
  }
  if (cmd.hasOption("e")) {
    scan.withStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));
  }
  int limit = cmd.hasOption("l") ? Integer.parseInt(cmd.getOptionValue("l")) : 100;

  // Scanner, table and connection are closed in that order, also when the scan throws.
  try (Connection connection = ConnectionFactory.createConnection(getConf());
      Table table = connection.getTable(getTableName(getConf()));
      ResultScanner scanner = table.getScanner(scan)) {
    CINode node = new CINode();
    Result result = scanner.next();
    int count = 0;
    while (result != null && count++ < limit) {
      node = getCINode(result, node);
      System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
          Bytes.toStringBinary(node.prev), node.count, node.client);
      result = scanner.next();
    }
  }
  return 0;
}