下面列出了怎么用org.apache.hadoop.hbase.client.HTable的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public Map<String, Object> count(String task) {
MappingConfig config = hbaseMapping.get(task);
String hbaseTable = config.getHbaseMapping().getHbaseTable();
long rowCount = 0L;
try {
HTable table = (HTable) hbaseTemplate.getConnection().getTable(TableName.valueOf(hbaseTable));
Scan scan = new Scan();
scan.setFilter(new FirstKeyOnlyFilter());
ResultScanner resultScanner = table.getScanner(scan);
for (Result result : resultScanner) {
rowCount += result.size();
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
Map<String, Object> res = new LinkedHashMap<>();
res.put("hbaseTable", hbaseTable);
res.put("count", rowCount);
return res;
}
private static void createHBaseTable1() throws IOException {
// create a table
TableName tableName = TableName.valueOf(TEST_TABLE_1);
createTable(tableName, FAMILIES, SPLIT_KEYS);
// get the HTable instance
HTable table = openTable(tableName);
List<Put> puts = new ArrayList<>();
// add some data
puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
// append rows to table
table.put(puts);
table.close();
}
public static boolean isMoved(HBaseAdmin admin, String tableName, String regionName, String serverNameTarget) {
try (HTable table = new HTable(admin.getConfiguration(), tableName)) {
NavigableMap<HRegionInfo, ServerName> regionLocations = table.getRegionLocations();
for (Map.Entry<HRegionInfo, ServerName> regionLocation : regionLocations.entrySet()) {
if (regionLocation.getKey().getEncodedName().equals(regionName)) {
return regionLocation.getValue().getServerName().equals(serverNameTarget);
}
}
if (!existsRegion(regionName, regionLocations.keySet()))
return true; // skip moving
} catch (IOException e) {
return false;
}
return false;
}
public static void main(String[] args) throws IOException {
foo(6, 5);
foo(5, 2);
foo(3, 0);
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hbase_host");
conf.set("zookeeper.znode.parent", "/hbase-unsecure");
HTable table = new HTable(conf, "test1");
Put put = new Put(Bytes.toBytes("row1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));
table.put(put);
table.close();
}
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
System.out.println("preparing HBaseStreamPartitioner for streamId " + stream.get_streamId());
this.targetTasks = targetTasks;
this.targetTasksSize = this.targetTasks.size();
Configuration conf = HBaseConfiguration.create();
try {
hTable = new HTable(conf, tableName);
refreshRegionInfo(tableName);
System.out.println("regionStartKeyRegionNameMap: " + regionStartKeyRegionNameMap);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
protected void jobSetup(Job job) throws IOException, ImportException {
super.jobSetup(job);
// we shouldn't have gotten here if bulk load dir is not set
// so let's throw a ImportException
if(getContext().getDestination() == null){
throw new ImportException("Can't run HBaseBulkImportJob without a " +
"valid destination directory.");
}
TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Preconditions.class);
FileOutputFormat.setOutputPath(job, getContext().getDestination());
HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
HFileOutputFormat.configureIncrementalLoad(job, hTable);
}
protected void verifyHBaseCell(String tableName, String rowKey,
String colFamily, String colName, String val) throws IOException {
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(colName));
HTable table = new HTable(new Configuration(
hbaseTestUtil.getConfiguration()), Bytes.toBytes(tableName));
try {
Result r = table.get(get);
byte [] actualVal = r.getValue(Bytes.toBytes(colFamily),
Bytes.toBytes(colName));
if (null == val) {
assertNull("Got a result when expected null", actualVal);
} else {
assertNotNull("No result, but we expected one", actualVal);
assertEquals(val, Bytes.toString(actualVal));
}
} finally {
table.close();
}
}
protected int countHBaseTable(String tableName, String colFamily)
throws IOException {
int count = 0;
HTable table = new HTable(new Configuration(
hbaseTestUtil.getConfiguration()), Bytes.toBytes(tableName));
try {
ResultScanner scanner = table.getScanner(Bytes.toBytes(colFamily));
for(Result result = scanner.next();
result != null;
result = scanner.next()) {
count++;
}
} finally {
table.close();
}
return count;
}
protected static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
List<String> coprocessors) throws Exception {
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
for (byte[] family : columnFamilies) {
HColumnDescriptor columnDesc = new HColumnDescriptor(family);
columnDesc.setMaxVersions(Integer.MAX_VALUE);
columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis
desc.addFamily(columnDesc);
}
if (existingData) {
desc.setValue(TxConstants.READ_NON_TX_DATA, "true");
}
// Divide individually to prevent any overflow
int priority = Coprocessor.PRIORITY_USER;
// order in list is the same order that coprocessors will be invoked
for (String coprocessor : coprocessors) {
desc.addCoprocessor(coprocessor, null, ++priority, null);
}
hBaseAdmin.createTable(desc);
testUtil.waitTableAvailable(tableName, 5000);
return new HTable(testUtil.getConfiguration(), tableName);
}
private NavigableMap<HRegionInfo, ServerName> getRegionLocations(String table) throws IOException {
long startTimestamp = System.currentTimeMillis();
Util.printVerboseMessage(args, Util.getMethodName() + " - start");
NavigableMap<HRegionInfo, ServerName> result = regionLocations.get(table);
if (result == null) {
try (HTable htable = new HTable(admin.getConfiguration(), table)) {
result = htable.getRegionLocations();
regionLocations.put(table, result);
}
}
Util.printVerboseMessage(args, Util.getMethodName() + " - end", startTimestamp);
return result;
}
public static void main(String[] args) throws IOException {
foo(6, 5);
foo(5, 2);
foo(3, 0);
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hbase_host");
conf.set("zookeeper.znode.parent", "/hbase-unsecure");
HTable table = new HTable(conf, "test1");
Put put = new Put(Bytes.toBytes("row1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));
table.put(put);
table.close();
}
@Test
public void testLocalIndexTableRegionSplitPolicyAndSplitKeys() throws Exception {
createBaseTable(TestUtil.DEFAULT_DATA_TABLE_NAME, null,"('e','i','o')");
Connection conn1 = DriverManager.getConnection(getUrl());
Connection conn2 = DriverManager.getConnection(getUrl());
conn1.createStatement().execute("CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_NAME + "(v1)");
conn2.createStatement().executeQuery("SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME).next();
HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
HTableDescriptor htd = admin.getTableDescriptor(TableName.valueOf(MetaDataUtil.getLocalIndexTableName(TestUtil.DEFAULT_DATA_TABLE_NAME)));
assertEquals(IndexRegionSplitPolicy.class.getName(), htd.getValue(HTableDescriptor.SPLIT_POLICY));
try (HTable userTable = new HTable(admin.getConfiguration(),TableName.valueOf(TestUtil.DEFAULT_DATA_TABLE_NAME))) {
try (HTable indexTable = new HTable(admin.getConfiguration(),TableName.valueOf(MetaDataUtil.getLocalIndexTableName(TestUtil.DEFAULT_DATA_TABLE_NAME)))) {
assertArrayEquals("Both user table and index table should have same split keys.", userTable.getStartKeys(), indexTable.getStartKeys());
}
}
}
private NavigableMap<HRegionInfo, ServerName> getRegionLocations(String table) throws IOException {
long startTimestamp = System.currentTimeMillis();
Util.printVerboseMessage(args, Util.getMethodName() + " - start");
NavigableMap<HRegionInfo, ServerName> result = regionLocations.get(table);
if (result == null) {
try (HTable htable = new HTable(admin.getConfiguration(), table)) {
result = htable.getRegionLocations();
regionLocations.put(table, result);
}
}
Util.printVerboseMessage(args, Util.getMethodName() + " - end", startTimestamp);
return result;
}
/**
* Truncates HTable while preserving the region pre-splits
* @param table HTable to truncate
* @return new instance of the truncated HTable
* @throws IOException throws IOException in case of any HBase IO problems
*/
public static HTable truncateTable(HTable table) throws IOException {
Configuration conf = table.getConfiguration();
byte[][] presplits = table.getRegionLocator().getStartKeys();
if (presplits.length > 0 && presplits[0].length == 0) {
presplits = Arrays.copyOfRange(presplits, 1, presplits.length);
}
HTableDescriptor desc = table.getTableDescriptor();
table.close();
try (Connection con = ConnectionFactory.createConnection(conf)) {
try (Admin admin = con.getAdmin()) {
admin.disableTable(desc.getTableName());
admin.deleteTable(desc.getTableName());
admin.createTable(desc, presplits);
}
}
return HalyardTableUtils.getTable(conf, desc.getTableName().getNameAsString(), false, 0);
}
@Before
public void beforeTest() throws Exception {
pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
// Prune state table is a non-transactional table, hence no transaction co-processor
Collections.<String>emptyList());
table.close();
dataJanitorState =
new DataJanitorState(new DataJanitorState.TableSupplier() {
@Override
public Table get() throws IOException {
return testUtil.getConnection().getTable(pruneStateTable);
}
});
}
public static boolean isMoved(HBaseAdmin admin, String tableName, String regionName, String serverNameTarget) {
try (HTable table = new HTable(admin.getConfiguration(), tableName)) {
NavigableMap<HRegionInfo, ServerName> regionLocations = table.getRegionLocations();
for (Map.Entry<HRegionInfo, ServerName> regionLocation : regionLocations.entrySet()) {
if (regionLocation.getKey().getEncodedName().equals(regionName)) {
return regionLocation.getValue().getServerName().equals(serverNameTarget);
}
}
if (!existsRegion(regionName, regionLocations.keySet()))
return true; // skip moving
} catch (IOException e) {
return false;
}
return false;
}
@Before
public void beforeTest() throws Exception {
pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
// Prune state table is a non-transactional table, hence no transaction co-processor
Collections.<String>emptyList());
table.close();
dataJanitorState =
new DataJanitorState(new DataJanitorState.TableSupplier() {
@Override
public Table get() throws IOException {
return testUtil.getConnection().getTable(pruneStateTable);
}
});
}
public static boolean verifyRowsForEmptyColValue(Connection conn, String tableName, byte[] valueBytes)
throws IOException, SQLException {
PTable table = PhoenixRuntime.getTable(conn, tableName);
byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table);
byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
HTable htable = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(table.getPhysicalName().getBytes());
Scan scan = new Scan();
scan.addColumn(emptyCF, emptyCQ);
ResultScanner resultScanner = htable.getScanner(scan);
for (Result result = resultScanner.next(); result != null; result = resultScanner.next()) {
if (Bytes.compareTo(result.getValue(emptyCF, emptyCQ), 0, valueBytes.length,
valueBytes, 0, valueBytes.length) != 0) {
return false;
}
}
return true;
}
@BeforeClass
public static void startMiniCluster() throws Exception {
// Setup the configuration to start HBase cluster with the invalid list pruning enabled
conf = HBaseConfiguration.create();
conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
// Flush prune data to table quickly, so that tests don't need have to wait long to see updates
conf.setLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, 0L);
AbstractHBaseTableTest.startMiniCluster();
TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
TransactionManager txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
txManager.startAndWait();
// Do some transactional data operations
txDataTable1 = TableName.valueOf("invalidListPruneTestTable1");
HTable hTable = createTable(txDataTable1.getName(), new byte[][]{family}, false,
Collections.singletonList(TestTransactionProcessor.class.getName()));
try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
txContext.start();
for (int i = 0; i < MAX_ROWS; ++i) {
txTable.put(new Put(Bytes.toBytes(i)).add(family, qualifier, Bytes.toBytes(i)));
}
txContext.finish();
}
testUtil.flush(txDataTable1);
txManager.stopAndWait();
pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
connection = HConnectionManager.createConnection(conf);
dataJanitorState =
new DataJanitorState(new DataJanitorState.TableSupplier() {
@Override
public HTableInterface get() throws IOException {
return connection.getTable(pruneStateTable);
}
});
}
@Test
public void testOneEventWithDefaults() throws Exception {
Map<String,String> ctxMap = new HashMap<String,String>();
ctxMap.put("table", tableName);
ctxMap.put("columnFamily", columnFamily);
ctxMap.put("serializer",
"org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer");
ctxMap.put("keep-alive", "0");
ctxMap.put("timeout", "10000");
Context tmpctx = new Context();
tmpctx.putAll(ctxMap);
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
deleteTable = true;
AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration());
Configurables.configure(sink, tmpctx);
Channel channel = new MemoryChannel();
Configurables.configure(channel, tmpctx);
sink.setChannel(channel);
sink.start();
Transaction tx = channel.getTransaction();
tx.begin();
Event e = EventBuilder.withBody(
Bytes.toBytes(valBase));
channel.put(e);
tx.commit();
tx.close();
Assert.assertFalse(sink.isConfNull());
sink.process();
sink.stop();
HTable table = new HTable(testUtility.getConfiguration(), tableName);
byte[][] results = getResults(table, 1);
byte[] out = results[0];
Assert.assertArrayEquals(e.getBody(), out);
out = results[1];
Assert.assertArrayEquals(Longs.toByteArray(1), out);
}
@Test
public void testSimpleDeletes() throws Exception {
HTable primary = createSetupTables(fam1);
// do a simple Put
long ts = 10;
Put p = new Put(row1);
p.add(FAM, indexed_qualifer, ts, value1);
p.add(FAM, regular_qualifer, ts, value2);
primary.put(p);
primary.flushCommits();
Delete d = new Delete(row1);
primary.delete(d);
HTable index = new HTable(UTIL.getConfiguration(), fam1.getTable());
List<KeyValue> expected = Collections.<KeyValue> emptyList();
// scan over all time should cause the delete to be covered
IndexTestingUtils.verifyIndexTableAtTimestamp(index, expected, 0, Long.MAX_VALUE, value1,
HConstants.EMPTY_END_ROW);
// scan at the older timestamp should still show the older value
List<Pair<byte[], CoveredColumn>> pairs = new ArrayList<Pair<byte[], CoveredColumn>>();
pairs.add(new Pair<byte[], CoveredColumn>(value1, col1));
pairs.add(new Pair<byte[], CoveredColumn>(EMPTY_BYTES, col2));
expected = CoveredColumnIndexCodec.getIndexKeyValueForTesting(row1, ts, pairs);
IndexTestingUtils.verifyIndexTableAtTimestamp(index, expected, ts, value1);
// cleanup
closeAndCleanupTables(index, primary);
}
@Test
public void testSimpleDeletes() throws Exception {
HTable primary = createSetupTables(fam1);
// do a simple Put
long ts = 10;
Put p = new Put(row1);
p.add(FAM, indexed_qualifer, ts, value1);
p.add(FAM, regular_qualifer, ts, value2);
primary.put(p);
primary.flushCommits();
Delete d = new Delete(row1);
primary.delete(d);
HTable index = new HTable(UTIL.getConfiguration(), fam1.getTable());
List<KeyValue> expected = Collections.<KeyValue> emptyList();
// scan over all time should cause the delete to be covered
IndexTestingUtils.verifyIndexTableAtTimestamp(index, expected, 0, Long.MAX_VALUE, value1,
HConstants.EMPTY_END_ROW);
// scan at the older timestamp should still show the older value
List<Pair<byte[], CoveredColumn>> pairs = new ArrayList<Pair<byte[], CoveredColumn>>();
pairs.add(new Pair<byte[], CoveredColumn>(value1, col1));
pairs.add(new Pair<byte[], CoveredColumn>(EMPTY_BYTES, col2));
expected = CoveredColumnIndexCodec.getIndexKeyValueForTesting(row1, ts, pairs);
IndexTestingUtils.verifyIndexTableAtTimestamp(index, expected, ts, value1);
// cleanup
closeAndCleanupTables(index, primary);
}
/**
* Create an {@link HTable} instance and set it into this format.
*/
private HTable createTable() {
LOG.info("Initializing HBaseConfiguration");
org.apache.hadoop.conf.Configuration hConf = getHadoopConfiguration();
try {
return new HTable(hConf, getTableName());
} catch (Exception e) {
LOG.error("Error instantiating a new HTable instance", e);
}
return null;
}
/**
* Create an {@link HTable} instance and set it into this format.
*/
private HTable createTable() {
LOG.info("Initializing HBaseConfiguration");
//use files found in the classpath
org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
try {
return new HTable(hConf, getTableName());
} catch (Exception e) {
LOG.error("Error instantiating a new HTable instance", e);
}
return null;
}
@Test
public void testDropLocalIndexShouldDeleteDataFromLocalIndexTable() throws Exception {
createBaseTable(TestUtil.DEFAULT_DATA_TABLE_NAME, null, "('e','i','o')");
Connection conn1 = DriverManager.getConnection(getUrl());
try {
conn1.createStatement().execute("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_NAME + " values('b',1,2,4,'z')");
conn1.createStatement().execute("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_NAME + " values('f',1,2,3,'a')");
conn1.createStatement().execute("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_NAME + " values('j',2,4,2,'a')");
conn1.createStatement().execute("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_NAME + " values('q',3,1,1,'c')");
conn1.commit();
conn1.createStatement().execute("CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_NAME + "(v1)");
conn1.createStatement().execute("DROP INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_NAME);
HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
HTable indexTable = new HTable(admin.getConfiguration() ,TableName.valueOf(MetaDataUtil.getLocalIndexTableName(TestUtil.DEFAULT_DATA_TABLE_NAME)));
Pair<byte[][], byte[][]> startEndKeys = indexTable.getStartEndKeys();
byte[][] startKeys = startEndKeys.getFirst();
byte[][] endKeys = startEndKeys.getSecond();
// No entry should be present in local index table after drop index.
for (int i = 0; i < startKeys.length; i++) {
Scan s = new Scan();
s.setStartRow(startKeys[i]);
s.setStopRow(endKeys[i]);
ResultScanner scanner = indexTable.getScanner(s);
int count = 0;
for(Result r:scanner){
count++;
}
scanner.close();
assertEquals(0, count);
}
indexTable.close();
} finally {
conn1.close();
}
}
protected ResultScanner getResultScanner() throws IOException
{
if (resultScanner != null) {
return resultScanner;
}
HTable table = store.getTable();
Scan scan = new Scan();
resultScanner = table.getScanner(scan);
return resultScanner;
}
private static void createHBaseTable1() throws IOException {
// create a table
TableName tableName = TableName.valueOf(TEST_TABLE_1);
createTable(tableName, FAMILIES, SPLIT_KEYS);
// get the HTable instance
HTable table = openTable(tableName);
List<Put> puts = new ArrayList<>();
// add some data
puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1",
Timestamp.valueOf("2019-08-18 19:00:00"), Date.valueOf("2019-08-18"),
Time.valueOf("19:00:00"), new BigDecimal(12345678.0001)));
puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2",
Timestamp.valueOf("2019-08-18 19:01:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:01:00"),
new BigDecimal(12345678.0002)));
puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3",
Timestamp.valueOf("2019-08-18 19:02:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:02:00"),
new BigDecimal(12345678.0003)));
puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4",
Timestamp.valueOf("2019-08-18 19:03:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:03:00"),
new BigDecimal(12345678.0004)));
puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5",
Timestamp.valueOf("2019-08-19 19:10:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:10:00"),
new BigDecimal(12345678.0005)));
puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6",
Timestamp.valueOf("2019-08-19 19:20:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:20:00"),
new BigDecimal(12345678.0006)));
puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7",
Timestamp.valueOf("2019-08-19 19:30:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:30:00"),
new BigDecimal(12345678.0007)));
puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8",
Timestamp.valueOf("2019-08-19 19:40:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:40:00"),
new BigDecimal(12345678.0008)));
// append rows to table
table.put(puts);
table.close();
}
private void split(HTable table, byte[] splitPoint) throws IOException, InterruptedException, DecoderException {
int regionCountPrev = table.getRegionLocations().size();
admin.split(table.getTableName(), splitPoint);
for (int j = 0; j < Constant.TRY_MAX; j++) {
int regionCountNow = table.getRegionLocations().size();
if (regionCountPrev < regionCountNow) {
break;
} else {
Thread.sleep(Constant.WAIT_INTERVAL_MS);
}
}
}
@SuppressWarnings("rawtypes")
public static void startWrite(RecordReceiver lineReceiver, HTable table, Configuration configuration) {
List<Map> columns = configuration.getList(Key.COLUMN, Map.class);
Integer batchSize = configuration.getInt(Key.BATCH_SIZE, 100);
boolean writeToWAL = configuration.getBool(Key.WRITE_TO_WAL, true);
List<HbaseColumnCell> hbaseColumnCells = parseColumns(columns);
try {
Record record = null;
List<Put> puts = new ArrayList<Put>();
while ((record = lineReceiver.getFromReader()) != null) {
puts.add(getPut(hbaseColumnCells, record, writeToWAL));
if (puts.size() % batchSize == 0) {
table.put(puts);
table.flushCommits();
puts.clear();
}
}
if (!puts.isEmpty()) {
table.put(puts);
table.flushCommits();
}
table.close();
} catch (Exception e) {
String message = String.format("写hbase[%s]时发生IO异常,请检查您的网络是否正常!", table.getName());
LOG.error(message, e);
ErrorRecord.addError(message+"->"+e.getMessage());
throw DataXException.asDataXException(HBaseWriter98ErrorCode.WRITE_HBASE_IO_ERROR, e);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "t");
String startKey = "1";
String stopKey = "3";
boolean isParallel = true;
String familyName = "f";
String columnName = "id";
String remainder = "2";
Scan scan = new Scan(Bytes.toBytes(startKey), Bytes.toBytes(stopKey));
int count = 0;
if (isParallel) {
scan.setParallel(true);
}
scan.setFilter(new SingleColumnValueFilter(Bytes.toBytes(familyName), Bytes
.toBytes(columnName), CompareOp.LESS, Bytes.toBytes(remainder)));
ResultScanner scanner = table.getScanner(scan);
Result r = scanner.next();
while (r != null) {
count++;
r = scanner.next();
}
System.out.println("++ Scanning finished with count : " + count + " ++");
scanner.close();
table.close();
}