下面列出了 org.apache.hadoop.hbase.Coprocessor 类的 API 用法示例代码；也可以点击链接到 GitHub 查看源代码。
/**
 * Creates the named table with the given column families unless it already exists.
 *
 * <p>Each family is configured with {@code MAX_VERSIONS} versions. Two coprocessors are attached
 * to the table, {@code OmidSnapshotFilter} first and HBase's {@code AggregateImplementation}
 * second, with priorities {@code Coprocessor.PRIORITY_HIGHEST + 1} and
 * {@code Coprocessor.PRIORITY_HIGHEST + 2}. After creation the method waits for the table to
 * become available (timeout argument 5000).
 *
 * @param tableName name of the table to create
 * @param families column families the new table should contain
 * @throws IOException if one of the admin calls fails
 */
private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
    TableName name = TableName.valueOf(tableName);
    if (admin.tableExists(name)) {
        return;
    }
    LOG.info("Creating {} table...", tableName);
    HTableDescriptor desc = new HTableDescriptor(name);
    for (byte[] family : families) {
        HColumnDescriptor datafam = new HColumnDescriptor(family);
        datafam.setMaxVersions(MAX_VERSIONS);
        desc.addFamily(datafam);
    }
    int priority = Coprocessor.PRIORITY_HIGHEST;
    desc.addCoprocessor(OmidSnapshotFilter.class.getName(), null, ++priority, null);
    desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation", null, ++priority, null);
    admin.createTable(desc);
    try {
        hbaseTestUtil.waitTableAvailable(name, 5000);
    } catch (InterruptedException e) {
        // Keep the best-effort behaviour (no rethrow) but do not lose the interrupt:
        // restore the flag so callers further up the stack can react to it.
        Thread.currentThread().interrupt();
        LOG.warn("Interrupted while waiting for table {} to become available", tableName, e);
    }
}
/**
 * Creates the named table with the given column families unless it already exists.
 *
 * <p>Each family is configured with {@code MAX_VERSIONS} versions. Two coprocessors are attached
 * to the table, {@code OmidSnapshotFilter} first and HBase's {@code AggregateImplementation}
 * second, with priorities {@code Coprocessor.PRIORITY_HIGHEST + 1} and
 * {@code Coprocessor.PRIORITY_HIGHEST + 2}. After creation the method waits for the table to
 * become available (timeout argument 5000).
 *
 * @param tableName name of the table to create
 * @param families column families the new table should contain
 * @throws IOException if one of the admin calls fails
 */
private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
    TableName name = TableName.valueOf(tableName);
    if (admin.tableExists(name)) {
        return;
    }
    LOG.info("Creating {} table...", tableName);
    HTableDescriptor desc = new HTableDescriptor(name);
    for (byte[] family : families) {
        HColumnDescriptor datafam = new HColumnDescriptor(family);
        datafam.setMaxVersions(MAX_VERSIONS);
        desc.addFamily(datafam);
    }
    int priority = Coprocessor.PRIORITY_HIGHEST;
    desc.addCoprocessor(OmidSnapshotFilter.class.getName(), null, ++priority, null);
    desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation", null, ++priority, null);
    admin.createTable(desc);
    try {
        hbaseTestUtil.waitTableAvailable(name, 5000);
    } catch (InterruptedException e) {
        // Keep the best-effort behaviour (no rethrow) but do not lose the interrupt:
        // restore the flag so callers further up the stack can react to it.
        Thread.currentThread().interrupt();
        LOG.warn("Interrupted while waiting for table {} to become available", tableName, e);
    }
}
/**
 * Creates a test table and returns a {@code Table} handle for it.
 *
 * <p>Every column family keeps {@code Integer.MAX_VALUE} versions and carries a
 * {@code TxConstants.PROPERTY_TTL} value of 100000 (milliseconds). Coprocessors are registered
 * with ascending priorities starting at {@code Coprocessor.PRIORITY_USER + 1}, following the
 * order of the list.
 *
 * @param tableName name of the table to create
 * @param columnFamilies column families to add to the table
 * @param existingData when {@code true}, {@code TxConstants.READ_NON_TX_DATA} is set to "true"
 * @param coprocessors coprocessor class names to attach, in invocation order
 * @return a table handle obtained from the test utility's connection
 * @throws Exception if table creation or the availability wait fails
 */
protected static Table createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
                                   List<String> coprocessors) throws Exception {
    TableName name = TableName.valueOf(tableName);
    HTableDescriptor tableDescriptor = new HTableDescriptor(name);

    for (byte[] columnFamily : columnFamilies) {
        HColumnDescriptor familyDescriptor = new HColumnDescriptor(columnFamily);
        familyDescriptor.setMaxVersions(Integer.MAX_VALUE);
        familyDescriptor.setValue(TxConstants.PROPERTY_TTL, Integer.toString(100000)); // in millis
        tableDescriptor.addFamily(familyDescriptor);
    }

    if (existingData) {
        tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, "true");
    }

    // The first coprocessor in the list gets the lowest priority value and is invoked first.
    int nextPriority = Coprocessor.PRIORITY_USER + 1;
    for (String coprocessorClass : coprocessors) {
        tableDescriptor.addCoprocessor(coprocessorClass, null, nextPriority, null);
        nextPriority++;
    }

    hBaseAdmin.createTable(tableDescriptor);
    testUtil.waitTableAvailable(tableName, 5000);
    return testUtil.getConnection().getTable(name);
}
/**
 * Creates a test table and returns an {@code HTable} handle for it.
 *
 * <p>Every column family keeps {@code Integer.MAX_VALUE} versions and carries a
 * {@code TxConstants.PROPERTY_TTL} value of 100000 (milliseconds). Coprocessors are registered
 * with ascending priorities starting at {@code Coprocessor.PRIORITY_USER + 1}, following the
 * order of the list.
 *
 * @param tableName name of the table to create
 * @param columnFamilies column families to add to the table
 * @param existingData when {@code true}, {@code TxConstants.READ_NON_TX_DATA} is set to "true"
 * @param coprocessors coprocessor class names to attach, in invocation order
 * @return a new {@code HTable} built from the test utility's configuration
 * @throws Exception if table creation or the availability wait fails
 */
protected static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
                                    List<String> coprocessors) throws Exception {
    HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));

    for (byte[] columnFamily : columnFamilies) {
        HColumnDescriptor familyDescriptor = new HColumnDescriptor(columnFamily);
        familyDescriptor.setMaxVersions(Integer.MAX_VALUE);
        familyDescriptor.setValue(TxConstants.PROPERTY_TTL, Integer.toString(100000)); // in millis
        tableDescriptor.addFamily(familyDescriptor);
    }

    if (existingData) {
        tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, "true");
    }

    // The first coprocessor in the list gets the lowest priority value and is invoked first.
    int nextPriority = Coprocessor.PRIORITY_USER + 1;
    for (String coprocessorClass : coprocessors) {
        tableDescriptor.addCoprocessor(coprocessorClass, null, nextPriority, null);
        nextPriority++;
    }

    hBaseAdmin.createTable(tableDescriptor);
    testUtil.waitTableAvailable(tableName, 5000);
    return new HTable(testUtil.getConfiguration(), tableName);
}
/**
 * Creates a test table and returns a {@code Table} handle for it.
 *
 * <p>Every column family keeps {@code Integer.MAX_VALUE} versions and carries a
 * {@code TxConstants.PROPERTY_TTL} value of 100000 (milliseconds). Coprocessors are registered
 * with ascending priorities starting at {@code Coprocessor.PRIORITY_USER + 1}, following the
 * order of the list.
 *
 * @param tableName name of the table to create
 * @param columnFamilies column families to add to the table
 * @param existingData when {@code true}, {@code TxConstants.READ_NON_TX_DATA} is set to "true"
 * @param coprocessors coprocessor class names to attach, in invocation order
 * @return a table handle obtained from the test utility's connection
 * @throws Exception if table creation or the availability wait fails
 */
protected static Table createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
                                   List<String> coprocessors) throws Exception {
    TableName name = TableName.valueOf(tableName);
    HTableDescriptor tableDescriptor = new HTableDescriptor(name);

    for (byte[] columnFamily : columnFamilies) {
        HColumnDescriptor familyDescriptor = new HColumnDescriptor(columnFamily);
        familyDescriptor.setMaxVersions(Integer.MAX_VALUE);
        familyDescriptor.setValue(TxConstants.PROPERTY_TTL, Integer.toString(100000)); // in millis
        tableDescriptor.addFamily(familyDescriptor);
    }

    if (existingData) {
        tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, "true");
    }

    // The first coprocessor in the list gets the lowest priority value and is invoked first.
    int nextPriority = Coprocessor.PRIORITY_USER + 1;
    for (String coprocessorClass : coprocessors) {
        tableDescriptor.addCoprocessor(coprocessorClass, null, nextPriority, null);
        nextPriority++;
    }

    hBaseAdmin.createTable(tableDescriptor);
    testUtil.waitTableAvailable(tableName, 5000);
    return testUtil.getConnection().getTable(name);
}
/**
 * Creates a test table and returns a {@code Table} handle for it.
 *
 * <p>Every column family keeps {@code Integer.MAX_VALUE} versions and carries a
 * {@code TxConstants.PROPERTY_TTL} value of 100000 (milliseconds). Coprocessors are registered
 * with ascending priorities starting at {@code Coprocessor.PRIORITY_USER + 1}, following the
 * order of the list.
 *
 * @param tableName name of the table to create
 * @param columnFamilies column families to add to the table
 * @param existingData when {@code true}, {@code TxConstants.READ_NON_TX_DATA} is set to "true"
 * @param coprocessors coprocessor class names to attach, in invocation order
 * @return a table handle obtained from the test utility's connection
 * @throws Exception if table creation or the availability wait fails
 */
protected static Table createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
                                   List<String> coprocessors) throws Exception {
    TableName name = TableName.valueOf(tableName);
    HTableDescriptor tableDescriptor = new HTableDescriptor(name);

    for (byte[] columnFamily : columnFamilies) {
        HColumnDescriptor familyDescriptor = new HColumnDescriptor(columnFamily);
        familyDescriptor.setMaxVersions(Integer.MAX_VALUE);
        familyDescriptor.setValue(TxConstants.PROPERTY_TTL, Integer.toString(100000)); // in millis
        tableDescriptor.addFamily(familyDescriptor);
    }

    if (existingData) {
        tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, "true");
    }

    // The first coprocessor in the list gets the lowest priority value and is invoked first.
    int nextPriority = Coprocessor.PRIORITY_USER + 1;
    for (String coprocessorClass : coprocessors) {
        tableDescriptor.addCoprocessor(coprocessorClass, null, nextPriority, null);
        nextPriority++;
    }

    hBaseAdmin.createTable(tableDescriptor);
    testUtil.waitTableAvailable(tableName, 5000);
    return testUtil.getConnection().getTable(name);
}
/**
 * Creates a test table and returns an {@code HTable} handle for it.
 *
 * <p>Every column family keeps {@code Integer.MAX_VALUE} versions and carries a
 * {@code TxConstants.PROPERTY_TTL} value of 100000 (milliseconds). Coprocessors are registered
 * with ascending priorities starting at {@code Coprocessor.PRIORITY_USER + 1}, following the
 * order of the list.
 *
 * @param tableName name of the table to create
 * @param columnFamilies column families to add to the table
 * @param existingData when {@code true}, {@code TxConstants.READ_NON_TX_DATA} is set to "true"
 * @param coprocessors coprocessor class names to attach, in invocation order
 * @return a new {@code HTable} built from the test utility's configuration
 * @throws Exception if table creation or the availability wait fails
 */
protected static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
                                    List<String> coprocessors) throws Exception {
    HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));

    for (byte[] columnFamily : columnFamilies) {
        HColumnDescriptor familyDescriptor = new HColumnDescriptor(columnFamily);
        familyDescriptor.setMaxVersions(Integer.MAX_VALUE);
        familyDescriptor.setValue(TxConstants.PROPERTY_TTL, Integer.toString(100000)); // in millis
        tableDescriptor.addFamily(familyDescriptor);
    }

    if (existingData) {
        tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, "true");
    }

    // The first coprocessor in the list gets the lowest priority value and is invoked first.
    int nextPriority = Coprocessor.PRIORITY_USER + 1;
    for (String coprocessorClass : coprocessors) {
        tableDescriptor.addCoprocessor(coprocessorClass, null, nextPriority, null);
        nextPriority++;
    }

    hBaseAdmin.createTable(tableDescriptor);
    testUtil.waitTableAvailable(tableName, 5000);
    return new HTable(testUtil.getConfiguration(), tableName);
}
/**
 * Creates a test table and returns an {@code HTable} handle for it.
 *
 * <p>Every column family keeps {@code Integer.MAX_VALUE} versions and carries a
 * {@code TxConstants.PROPERTY_TTL} value of 100000 (milliseconds). Coprocessors are registered
 * with ascending priorities starting at {@code Coprocessor.PRIORITY_USER + 1}, following the
 * order of the list.
 *
 * @param tableName name of the table to create
 * @param columnFamilies column families to add to the table
 * @param existingData when {@code true}, {@code TxConstants.READ_NON_TX_DATA} is set to "true"
 * @param coprocessors coprocessor class names to attach, in invocation order
 * @return a new {@code HTable} built from the test utility's configuration
 * @throws Exception if table creation or the availability wait fails
 */
protected static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
                                    List<String> coprocessors) throws Exception {
    HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));

    for (byte[] columnFamily : columnFamilies) {
        HColumnDescriptor familyDescriptor = new HColumnDescriptor(columnFamily);
        familyDescriptor.setMaxVersions(Integer.MAX_VALUE);
        familyDescriptor.setValue(TxConstants.PROPERTY_TTL, Integer.toString(100000)); // in millis
        tableDescriptor.addFamily(familyDescriptor);
    }

    if (existingData) {
        tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, "true");
    }

    // The first coprocessor in the list gets the lowest priority value and is invoked first.
    int nextPriority = Coprocessor.PRIORITY_USER + 1;
    for (String coprocessorClass : coprocessors) {
        tableDescriptor.addCoprocessor(coprocessorClass, null, nextPriority, null);
        nextPriority++;
    }

    hBaseAdmin.createTable(tableDescriptor);
    testUtil.waitTableAvailable(tableName, 5000);
    return new HTable(testUtil.getConfiguration(), tableName);
}
/**
 * Creates a test table and returns an {@code HTable} handle for it.
 *
 * <p>Every column family keeps {@code Integer.MAX_VALUE} versions and carries a
 * {@code TxConstants.PROPERTY_TTL} value of 100000 (milliseconds). Coprocessors are registered
 * with ascending priorities starting at {@code Coprocessor.PRIORITY_USER + 1}, following the
 * order of the list.
 *
 * @param tableName name of the table to create
 * @param columnFamilies column families to add to the table
 * @param existingData when {@code true}, {@code TxConstants.READ_NON_TX_DATA} is set to "true"
 * @param coprocessors coprocessor class names to attach, in invocation order
 * @return a new {@code HTable} built from the test utility's configuration
 * @throws Exception if table creation or the availability wait fails
 */
protected static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
                                    List<String> coprocessors) throws Exception {
    HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));

    for (byte[] columnFamily : columnFamilies) {
        HColumnDescriptor familyDescriptor = new HColumnDescriptor(columnFamily);
        familyDescriptor.setMaxVersions(Integer.MAX_VALUE);
        familyDescriptor.setValue(TxConstants.PROPERTY_TTL, Integer.toString(100000)); // in millis
        tableDescriptor.addFamily(familyDescriptor);
    }

    if (existingData) {
        tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, "true");
    }

    // The first coprocessor in the list gets the lowest priority value and is invoked first.
    int nextPriority = Coprocessor.PRIORITY_USER + 1;
    for (String coprocessorClass : coprocessors) {
        tableDescriptor.addCoprocessor(coprocessorClass, null, nextPriority, null);
        nextPriority++;
    }

    hBaseAdmin.createTable(tableDescriptor);
    testUtil.waitTableAvailable(tableName, 5000);
    return new HTable(testUtil.getConfiguration(), tableName);
}
/**
 * Creates a test table and returns an {@code HTable} handle for it.
 *
 * <p>Every column family keeps {@code Integer.MAX_VALUE} versions and carries a
 * {@code TxConstants.PROPERTY_TTL} value of 100000 (milliseconds). Coprocessors are registered
 * with ascending priorities starting at {@code Coprocessor.PRIORITY_USER + 1}, following the
 * order of the list.
 *
 * @param tableName name of the table to create
 * @param columnFamilies column families to add to the table
 * @param existingData when {@code true}, {@code TxConstants.READ_NON_TX_DATA} is set to "true"
 * @param coprocessors coprocessor class names to attach, in invocation order
 * @return a new {@code HTable} built from the test utility's configuration
 * @throws Exception if table creation or the availability wait fails
 */
protected static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
                                    List<String> coprocessors) throws Exception {
    HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));

    for (byte[] columnFamily : columnFamilies) {
        HColumnDescriptor familyDescriptor = new HColumnDescriptor(columnFamily);
        familyDescriptor.setMaxVersions(Integer.MAX_VALUE);
        familyDescriptor.setValue(TxConstants.PROPERTY_TTL, Integer.toString(100000)); // in millis
        tableDescriptor.addFamily(familyDescriptor);
    }

    if (existingData) {
        tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, "true");
    }

    // The first coprocessor in the list gets the lowest priority value and is invoked first.
    int nextPriority = Coprocessor.PRIORITY_USER + 1;
    for (String coprocessorClass : coprocessors) {
        tableDescriptor.addCoprocessor(coprocessorClass, null, nextPriority, null);
        nextPriority++;
    }

    hBaseAdmin.createTable(tableDescriptor);
    testUtil.waitTableAvailable(tableName, 5000);
    return new HTable(testUtil.getConfiguration(), tableName);
}
/**
 * Creates the {@code demo_copro} table with a single column family {@code cf} and a
 * table-level coprocessor attribute that points at a jar on the default file system.
 *
 * @param args command-line arguments (not used)
 * @throws IOException if the file system or the HBase admin call fails
 */
public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    // Location of the jar that contains the coprocessor
    Path path = new Path(fs.getUri() + Path.SEPARATOR + "micmiu/coprocessor/demo.jar");

    // Table descriptor with one column family
    HTableDescriptor htd = new HTableDescriptor("demo_copro");
    htd.addFamily(new HColumnDescriptor("cf"));

    // Register the coprocessor to load, encoded as "<jar path>|<class name>|<priority>"
    htd.setValue("COPROCESSOR$1", path.toString() +
        "|" + RegionObserverDemo.class.getCanonicalName() +
        "|" + Coprocessor.PRIORITY_USER);

    // Create the table "demo_copro"; try-with-resources releases the admin
    // even when createTable throws.
    try (HBaseAdmin admin = new HBaseAdmin(conf)) {
        admin.createTable(htd);
    }
    System.out.println("finished.");
}
@Test
// HBASE-3516: Test CP Class loading from local file system
public void testClassLoadingFromLocalFS() throws Exception {
    File coprocessorJar = buildCoprocessorJar(cpName3);

    // Describe a table whose COPROCESSOR$1 attribute references the jar:
    // "<local jar path>|<class name>|<priority>".
    TableDescriptor descriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(cpName3))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("test")).build())
        .setValue("COPROCESSOR$1",
            getLocalPath(coprocessorJar) + "|" + cpName3 + "|" + Coprocessor.PRIORITY_USER)
        .build();

    Admin admin = TEST_UTIL.getAdmin();
    admin.createTable(descriptor);
    waitForTable(descriptor.getTableName());

    // Look on region server 0 for regions of the new table and check that the
    // coprocessor is registered with the region's coprocessor host.
    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
    boolean loaded = false;
    for (HRegion onlineRegion : cluster.getRegionServer(0).getOnlineRegionsLocalContext()) {
        String regionName = onlineRegion.getRegionInfo().getRegionNameAsString();
        if (!regionName.startsWith(cpName3)) {
            continue;
        }
        loaded = onlineRegion.getCoprocessorHost().findCoprocessor(cpName3) != null;
    }
    assertTrue("Class " + cpName3 + " was missing on a region", loaded);
}
/**
 * Initialize the environment.
 *
 * <p>Starts the wrapped coprocessor only when the current state is {@code INSTALLED} or
 * {@code STOPPED}; otherwise a warning is logged and nothing else happens. While the
 * coprocessor's {@code start} runs, the thread's context class loader is switched to this
 * environment's class loader and restored afterwards.
 *
 * @throws IOException propagated from the coprocessor's {@code start}
 */
public void startup() throws IOException {
    boolean startable =
        state == Coprocessor.State.INSTALLED || state == Coprocessor.State.STOPPED;
    if (!startable) {
        LOG.warn("Not starting coprocessor " + impl.getClass().getName() +
            " because not inactive (state=" + state.toString() + ")");
        return;
    }

    state = Coprocessor.State.STARTING;
    final Thread thread = Thread.currentThread();
    final ClassLoader previousLoader = thread.getContextClassLoader();
    try {
        thread.setContextClassLoader(this.getClassLoader());
        impl.start(this);
        state = Coprocessor.State.ACTIVE;
    } finally {
        // Always hand the original context class loader back to the thread.
        thread.setContextClassLoader(previousLoader);
    }
}
/**
 * Clean up the environment.
 *
 * <p>Stops the wrapped coprocessor only when the current state is {@code ACTIVE}; otherwise a
 * warning is logged and nothing else happens. An {@code IOException} from the coprocessor's
 * {@code stop} is logged and not rethrown. The thread's context class loader is switched to
 * this environment's class loader for the duration of the call and restored afterwards.
 */
public void shutdown() {
    if (state != Coprocessor.State.ACTIVE) {
        LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
            " because not active (state="+state.toString()+")");
        return;
    }

    state = Coprocessor.State.STOPPING;
    final Thread thread = Thread.currentThread();
    final ClassLoader previousLoader = thread.getContextClassLoader();
    try {
        thread.setContextClassLoader(this.getClassLoader());
        impl.stop(this);
        state = Coprocessor.State.STOPPED;
    } catch (IOException ioe) {
        LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
    } finally {
        // Always hand the original context class loader back to the thread.
        thread.setContextClassLoader(previousLoader);
    }
}
/**
 * Starts a mini cluster configured with the {@code AccessController} region coprocessor and
 * EXEC permission checks, loads {@code AccessController} on the master coprocessor host, and
 * creates the test users.
 */
@BeforeClass
public static void setupBeforeClass() throws Exception {
    final Configuration configuration = TEST_UTIL.getConfiguration();

    // Security must be enabled before the cluster starts.
    enableSecurity(configuration);
    configuration.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName());
    // Check that enableSecurity produced the configuration the tests rely on.
    verifyConfiguration(configuration);
    // Turn on EXEC permission checking.
    configuration.setBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY, true);

    TEST_UTIL.startMiniCluster();
    TEST_UTIL.waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);

    TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterCoprocessorHost()
        .load(AccessController.class, Coprocessor.PRIORITY_HIGHEST, configuration);

    // Test users, none of them in any group.
    USER_OWNER = User.createUserForTesting(configuration, "owner", new String[0]);
    USER_RW = User.createUserForTesting(configuration, "rwuser", new String[0]);
    USER_RO = User.createUserForTesting(configuration, "rouser", new String[0]);
    USER_NONE = User.createUserForTesting(configuration, "usernone", new String[0]);
}
/**
 * Loads {@code NoDataFromScan} (highest priority) and {@code EmptyRegionObsever} (user
 * priority) on a fresh region, writes one row and asserts that a subsequent get returns no
 * cells.
 */
@Test
public void testRegionObserverScanTimeStacking() throws Exception {
    byte[] ROW = Bytes.toBytes("testRow");
    byte[] TABLE = Bytes.toBytes(getClass().getName());
    byte[] A = Bytes.toBytes("A");
    byte[][] FAMILIES = new byte[][] { A };
    // Use new HTU to not overlap with the DFS cluster started in #CompactionStacking
    Configuration conf = new HBaseTestingUtility().getConfiguration();
    HRegion region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES);
    try {
        RegionCoprocessorHost h = region.getCoprocessorHost();
        h.load(NoDataFromScan.class, Coprocessor.PRIORITY_HIGHEST, conf);
        h.load(EmptyRegionObsever.class, Coprocessor.PRIORITY_USER, conf);
        Put put = new Put(ROW);
        put.addColumn(A, A, A);
        region.put(put);
        Get get = new Get(ROW);
        Result r = region.get(get);
        assertNull(
            "Got an unexpected number of rows - no data should be returned with the NoDataFromScan coprocessor. Found: "
                + r, r.listCells());
    } finally {
        // Release the region and its WAL even when the put, get or assertion fails.
        HBaseTestingUtility.closeRegionAndWAL(region);
    }
}
/**
 * Loads {@code NoDataFromFlush} (highest priority) and {@code EmptyRegionObsever} (user
 * priority) on a fresh region, writes one row, flushes the region and asserts that a
 * subsequent get returns no cells.
 */
@Test
public void testRegionObserverFlushTimeStacking() throws Exception {
    byte[] ROW = Bytes.toBytes("testRow");
    byte[] TABLE = Bytes.toBytes(getClass().getName());
    byte[] A = Bytes.toBytes("A");
    byte[][] FAMILIES = new byte[][] { A };
    // Use new HTU to not overlap with the DFS cluster started in #CompactionStacking
    Configuration conf = new HBaseTestingUtility().getConfiguration();
    HRegion region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES);
    try {
        RegionCoprocessorHost h = region.getCoprocessorHost();
        h.load(NoDataFromFlush.class, Coprocessor.PRIORITY_HIGHEST, conf);
        h.load(EmptyRegionObsever.class, Coprocessor.PRIORITY_USER, conf);
        // put a row and flush it to disk
        Put put = new Put(ROW);
        put.addColumn(A, A, A);
        region.put(put);
        region.flush(true);
        Get get = new Get(ROW);
        Result r = region.get(get);
        // The message names the coprocessor actually loaded by this test (NoDataFromFlush).
        assertNull(
            "Got an unexpected number of rows - no data should be returned with the NoDataFromFlush coprocessor. Found: "
                + r, r.listCells());
    } finally {
        // Release the region and its WAL even when the put, flush, get or assertion fails.
        HBaseTestingUtility.closeRegionAndWAL(region);
    }
}
/**
 * A loaded WAL coprocessor won't break existing WAL test cases.
 *
 * <p>Creates a WAL and asserts that its coprocessor host can find
 * {@code SampleRegionWALCoprocessor}; the WAL is closed afterwards if it was created.
 */
@Test
public void testWALCoprocessorLoaded() throws Exception {
    AbstractFSWAL<?> log = null;
    try {
        log = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
            HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
        // The coprocessor counts as loaded when the WAL's host can look it up by class.
        WALCoprocessorHost coprocessorHost = log.getCoprocessorHost();
        Coprocessor loaded = coprocessorHost.findCoprocessor(SampleRegionWALCoprocessor.class);
        assertNotNull(loaded);
    } finally {
        if (log != null) {
            log.close();
        }
    }
}
/**
 * Ensures the coprocessor jar is present on the file system and (re-)registers
 * {@code AggregateProtocolEndPoint} on the given table.
 *
 * <p>If the jar is missing at {@code jarPath} it is copied from {@code localJarPath}. If the
 * table already has the coprocessor registered, it is removed first and the table descriptor
 * is re-read before the coprocessor is added again with {@code Coprocessor.PRIORITY_USER}.
 *
 * @param jarPath path of the coprocessor jar, relative to the file system URI
 * @param tableName name of the HBase table to modify
 * @param localJarPath local jar to upload when the remote one is missing; may be {@code null}
 * @throws IOException if the jar is missing and no local jar was given, or if a file system or
 *     admin call fails
 */
private void registerCoprocessor(String jarPath, String tableName, String localJarPath) throws IOException {
    final Configuration configuration = getConf();
    final String endpointClass = AggregateProtocolEndPoint.class.getName();

    try (FileSystem fs = FileSystem.get(configuration); HBaseAdmin admin = new HBaseAdmin(configuration)) {
        final Path remoteJar = new Path(fs.getUri() + Path.SEPARATOR + jarPath);
        LOGGER.info("Checking path {} ... ", remoteJar.toString());

        if (fs.exists(remoteJar)) {
            LOGGER.info("Path {} already exists", remoteJar.toString());
        } else {
            LOGGER.info("Path: {} not exist, uploading jar ...", remoteJar.toString());
            if (localJarPath == null) {
                throw new IOException("local jar path is not given, please manually upload coprocessor jar onto hdfs at " + jarPath
                    + " and retry, or provide local coprocessor jar path through CLI argument and upload automatically");
            }
            LOGGER.info("Copying from local {} to {}", localJarPath, jarPath);
            fs.copyFromLocalFile(new Path(localJarPath), remoteJar);
            LOGGER.info("Succeed to copied coprocessor jar to {}", remoteJar.toString());
        }

        LOGGER.info("Checking hbase table {}", tableName);
        final TableName table = TableName.valueOf(tableName);
        HTableDescriptor descriptor = admin.getTableDescriptor(table);
        LOGGER.info("Table {} found", tableName);

        if (descriptor.hasCoprocessor(endpointClass)) {
            // Drop the existing registration and reload the descriptor before adding it again.
            LOGGER.warn("Table '" + tableName + "' already registered coprocessor: " + endpointClass + ", removing firstly");
            descriptor.removeCoprocessor(endpointClass);
            admin.modifyTable(table, descriptor);
            descriptor = admin.getTableDescriptor(table);
        }

        descriptor.addCoprocessor(endpointClass, remoteJar, Coprocessor.PRIORITY_USER, new HashMap<>());
        admin.modifyTable(table, descriptor);
        LOGGER.info("Succeed to enable coprocessor on table " + tableName);
    }
}
@Test
// HBASE-6308: Test CP classloader is the CoprocessorClassLoader
public void testPrivateClassLoader() throws Exception {
    File jarFile = buildCoprocessorJar(cpName4);
    // create a table that references the jar
    TableDescriptorBuilder tdb = TableDescriptorBuilder.newBuilder(TableName.valueOf(cpName4));
    tdb.setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes("test")).build());
    // Coprocessor attribute format: "<local jar path>|<class name>|<priority>"
    tdb.setValue("COPROCESSOR$1", getLocalPath(jarFile) + "|" + cpName4 + "|" +
        Coprocessor.PRIORITY_USER);
    TableDescriptor tableDescriptor = tdb.build();
    Admin admin = TEST_UTIL.getAdmin();
    admin.createTable(tableDescriptor);
    waitForTable(tableDescriptor.getTableName());
    // verify that the coprocessor was loaded correctly
    boolean found = false;
    MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
    for (HRegion region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
        if (region.getRegionInfo().getRegionNameAsString().startsWith(cpName4)) {
            Coprocessor cp = region.getCoprocessorHost().findCoprocessor(cpName4);
            if (cp != null) {
                found = true;
                // Expected value goes first so a failure reports expected/actual correctly.
                assertEquals("Class " + cpName4 + " was not loaded by CoprocessorClassLoader",
                    CoprocessorClassLoader.class, cp.getClass().getClassLoader().getClass());
            }
        }
    }
    assertTrue("Class " + cpName4 + " was missing on a region", found);
}
/**
 * Orders coprocessor classes by their class name; equal classes compare as 0.
 *
 * @param c1 first coprocessor class
 * @param c2 second coprocessor class
 * @return 0 when the classes are equal, otherwise the result of comparing their names
 */
@Override
public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) {
    return c1.equals(c2) ? 0 : c1.getName().compareTo(c2.getName());
}
/**
 * Constructor. The environment starts in state {@code Coprocessor.State.INSTALLED}, uses the
 * class loader that loaded {@code impl}, and wraps {@code conf} in a
 * {@code ReadOnlyConfiguration}.
 *
 * @param impl the coprocessor instance
 * @param priority chaining priority
 * @param seq sequence value stored with this environment
 * @param conf configuration to wrap
 */
public BaseEnvironment(final C impl, final int priority, final int seq, final Configuration conf) {
    // Coprocessor instance and the class loader it came from.
    this.impl = impl;
    this.classLoader = impl.getClass().getClassLoader();

    // Ordering information.
    this.priority = priority;
    this.seq = seq;

    // Initial lifecycle state and read-only view of the configuration.
    this.state = Coprocessor.State.INSTALLED;
    this.conf = new ReadOnlyConfiguration(conf);
}
/**
 * Starts a secured mini cluster, loads {@code AccessController} on the master coprocessor
 * host, creates master and region-server environments for it, waits for the ACL table and
 * creates the test users.
 */
@BeforeClass
public static void setupBeforeClass() throws Exception {
    // Configuration has to be complete before the cluster is started.
    conf = TEST_UTIL.getConfiguration();
    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    enableSecurity(conf);
    // Check that enableSecurity produced the configuration the tests rely on.
    verifyConfiguration(conf);

    TEST_UTIL.startMiniCluster();

    final int priority = Coprocessor.PRIORITY_HIGHEST;
    MasterCoprocessorHost masterHost =
        TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterCoprocessorHost();
    masterHost.load(AccessController.class, priority, conf);
    AccessController accessController =
        (AccessController) masterHost.findCoprocessor(AccessController.class.getName());
    masterHost.createEnvironment(accessController, priority, 1, conf);

    RegionServerCoprocessorHost regionServerHost =
        TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getRegionServerCoprocessorHost();
    regionServerHost.createEnvironment(accessController, priority, 1, conf);

    // The ACL table must be enabled before the tests run.
    TEST_UTIL.waitTableEnabled(PermissionStorage.ACL_TABLE_NAME);

    // Test users, none of them in any group.
    USER_OWNER = User.createUserForTesting(conf, "owner", new String[0]);
    USER_OTHER = User.createUserForTesting(conf, "other", new String[0]);
}
/**
 * Collects the {@code AccessController} coprocessors found on the online regions of all live
 * region servers in the cluster.
 *
 * @param cluster the mini cluster to inspect
 * @return the access controllers found, one entry per region that has one
 */
private static List<AccessController> getAccessControllers(MiniHBaseCluster cluster) {
    List<AccessController> controllers = Lists.newArrayList();
    for (RegionServerThread serverThread : cluster.getLiveRegionServerThreads()) {
        for (HRegion onlineRegion : serverThread.getRegionServer().getOnlineRegionsLocalContext()) {
            Coprocessor candidate =
                onlineRegion.getCoprocessorHost().findCoprocessor(AccessController.class);
            if (candidate == null) {
                // This region has no AccessController loaded.
                continue;
            }
            controllers.add((AccessController) candidate);
        }
    }
    return controllers;
}
/**
 * Loads {@code MyAccessController} on the master coprocessor host and creates master and
 * region-server environments for it; the test passes when none of these calls throws.
 */
@Test
public void testCoprocessorLoading() throws Exception {
    final int priority = Coprocessor.PRIORITY_HIGHEST;

    MasterCoprocessorHost masterHost =
        TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterCoprocessorHost();
    masterHost.load(MyAccessController.class, priority, conf);
    AccessController accessController = masterHost.findCoprocessor(MyAccessController.class);

    MasterCoprocessorEnvironment masterEnv =
        masterHost.createEnvironment(accessController, priority, 1, conf);

    RegionServerCoprocessorHost regionServerHost =
        TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getRegionServerCoprocessorHost();
    RegionServerCoprocessorEnvironment regionServerEnv =
        regionServerHost.createEnvironment(accessController, priority, 1, conf);
}
/**
 * Starts a secured mini cluster, loads {@code AccessController} on the master coprocessor
 * host, creates master and region-server environments for it, waits for the ACL table and
 * creates the test users and groups.
 */
@BeforeClass
public static void setupBeforeClass() throws Exception {
    // Configuration has to be complete before the cluster is started.
    conf = TEST_UTIL.getConfiguration();
    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    enableSecurity(conf);
    // Check that enableSecurity produced the configuration the tests rely on.
    verifyConfiguration(conf);
    // We expect 0.98 cell ACL semantics
    conf.setBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT, false);

    TEST_UTIL.startMiniCluster();

    final int priority = Coprocessor.PRIORITY_HIGHEST;
    MasterCoprocessorHost masterHost =
        TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterCoprocessorHost();
    masterHost.load(AccessController.class, priority, conf);
    AccessController accessController = masterHost.findCoprocessor(AccessController.class);
    masterHost.createEnvironment(accessController, priority, 1, conf);

    RegionServerCoprocessorHost regionServerHost =
        TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getRegionServerCoprocessorHost();
    regionServerHost.createEnvironment(accessController, priority, 1, conf);

    // The ACL table must be enabled before the tests run.
    TEST_UTIL.waitTableEnabled(PermissionStorage.ACL_TABLE_NAME);

    // Test users; only GROUP_USER belongs to a group.
    USER_OWNER = User.createUserForTesting(conf, "owner", new String[0]);
    USER_OTHER = User.createUserForTesting(conf, "other", new String[0]);
    GROUP_USER = User.createUserForTesting(conf, "group_user", new String[] { GROUP });

    usersAndGroups = new String[] { USER_OTHER.getShortName(), AuthUtil.toGroupEntry(GROUP) };
}
/**
 * Test a table modification adding a coprocessor path which is not whitelisted.
 * The modification is expected to fail with an {@code IOException}, which is caught here,
 * and the table must end up with no coprocessor descriptors.
 *
 * @param whitelistedPaths A String array of paths to add in
 *         for the whitelisting configuration
 * @param coprocessorPath A String to use as the
 *         path for a mock coprocessor
 * @throws Exception if cluster setup or any call other than the expected failure throws
 */
private static void positiveTestCase(String[] whitelistedPaths,
    String coprocessorPath) throws Exception {
    Configuration conf = UTIL.getConfiguration();
    // load coprocessor under test
    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
        CoprocessorWhitelistMasterObserver.class.getName());
    conf.setStrings(
        CoprocessorWhitelistMasterObserver.CP_COPROCESSOR_WHITELIST_PATHS_KEY,
        whitelistedPaths);
    // set retries low to raise exception quickly
    conf.setInt("hbase.client.retries.number", 5);
    UTIL.startMiniCluster();
    UTIL.createTable(TEST_TABLE, new byte[][] { TEST_FAMILY });
    UTIL.waitUntilAllRegionsAssigned(TEST_TABLE);
    // try-with-resources closes the connection, admin and table even when an assertion fails
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin();
         Table t = connection.getTable(TEST_TABLE)) {
        HTableDescriptor htd = new HTableDescriptor(t.getDescriptor());
        htd.addCoprocessor("net.clayb.hbase.coprocessor.NotWhitelisted",
            new Path(coprocessorPath),
            Coprocessor.PRIORITY_USER, null);
        LOG.info("Modifying Table");
        try {
            admin.modifyTable(htd);
            fail("Expected coprocessor to raise IOException");
        } catch (IOException e) {
            // swallow exception from coprocessor
        }
        LOG.info("Done Modifying Table");
        assertEquals(0, t.getDescriptor().getCoprocessorDescriptors().size());
    }
}
/**
 * Test a table modification adding a coprocessor path
 * which is whitelisted. The coprocessor should be added to
 * the table descriptor successfully.
 *
 * @param whitelistedPaths A String array of paths to add in
 *         for the whitelisting configuration
 * @param coprocessorPath A String to use as the
 *         path for a mock coprocessor
 * @throws Exception if cluster setup or the table modification fails
 */
private static void negativeTestCase(String[] whitelistedPaths,
    String coprocessorPath) throws Exception {
    Configuration conf = UTIL.getConfiguration();
    // set retries low to raise exception quickly
    conf.setInt("hbase.client.retries.number", 5);
    // load coprocessor under test
    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
        CoprocessorWhitelistMasterObserver.class.getName());
    // set a coprocessor whitelist path for test
    conf.setStrings(
        CoprocessorWhitelistMasterObserver.CP_COPROCESSOR_WHITELIST_PATHS_KEY,
        whitelistedPaths);
    UTIL.startMiniCluster();
    UTIL.createTable(TEST_TABLE, new byte[][] { TEST_FAMILY });
    UTIL.waitUntilAllRegionsAssigned(TEST_TABLE);
    // try-with-resources closes the connection and admin even when an assertion fails
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
        // disable table so we do not actually try loading non-existant
        // coprocessor file
        admin.disableTable(TEST_TABLE);
        try (Table t = connection.getTable(TEST_TABLE)) {
            HTableDescriptor htd = new HTableDescriptor(t.getDescriptor());
            htd.addCoprocessor("net.clayb.hbase.coprocessor.Whitelisted",
                new Path(coprocessorPath),
                Coprocessor.PRIORITY_USER, null);
            LOG.info("Modifying Table");
            admin.modifyTable(htd);
            assertEquals(1, t.getDescriptor().getCoprocessorDescriptors().size());
            LOG.info("Done Modifying Table");
        }
    }
}
/**
 * Test a table creation including a coprocessor path
 * which is not whitelisted. Table will not be created due to the
 * offending coprocessor.
 *
 * <p>The whitelist is configured as an empty array; the creation attempt is expected to fail
 * with an {@code IOException}, after which no table matching the test table name may exist.
 */
@Test
public void testCreationNonWhitelistedCoprocessorPath() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    // load coprocessor under test
    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
        CoprocessorWhitelistMasterObserver.class.getName());
    conf.setStrings(CoprocessorWhitelistMasterObserver.CP_COPROCESSOR_WHITELIST_PATHS_KEY,
        new String[]{});
    // set retries low to raise exception quickly
    conf.setInt("hbase.client.retries.number", 5);
    UTIL.startMiniCluster();
    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
        new TableDescriptorBuilder.ModifyableTableDescriptor(TEST_TABLE);
    ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
        new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(TEST_FAMILY);
    tableDescriptor.setColumnFamily(familyDescriptor);
    tableDescriptor.setCoprocessor(
        CoprocessorDescriptorBuilder.newBuilder("net.clayb.hbase.coprocessor.NotWhitelisted")
            .setJarPath("file:///notpermitted/couldnotpossiblyexist.jar")
            .setPriority(Coprocessor.PRIORITY_USER)
            .setProperties(Collections.emptyMap())
            .build());
    // try-with-resources closes the connection and admin even when an assertion fails
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
        LOG.info("Creating Table");
        try {
            admin.createTable(tableDescriptor);
            fail("Expected coprocessor to raise IOException");
        } catch (IOException e) {
            // swallow exception from coprocessor
        }
        LOG.info("Done Creating Table");
        // ensure table was not created
        assertEquals(0,
            admin.listTableDescriptors(Pattern.compile("^" + TEST_TABLE.getNameAsString() + "$")).size());
    }
}
/**
 * Starts a secured mini cluster, loads {@code AccessController} on the master coprocessor
 * host, creates master and region-server environments for it, waits for the ACL table and
 * creates the test users and groups.
 */
@BeforeClass
public static void setupBeforeClass() throws Exception {
    // Configuration has to be complete before the cluster is started.
    conf = TEST_UTIL.getConfiguration();
    enableSecurity(conf);
    // Check that enableSecurity produced the configuration the tests rely on.
    verifyConfiguration(conf);
    // We expect 0.98 cell ACL semantics
    conf.setBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT, false);

    TEST_UTIL.startMiniCluster();

    final int priority = Coprocessor.PRIORITY_HIGHEST;
    MasterCoprocessorHost masterHost =
        TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterCoprocessorHost();
    masterHost.load(AccessController.class, priority, conf);
    AccessController accessController = masterHost.findCoprocessor(AccessController.class);
    masterHost.createEnvironment(accessController, priority, 1, conf);

    RegionServerCoprocessorHost regionServerHost =
        TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getRegionServerCoprocessorHost();
    regionServerHost.createEnvironment(accessController, priority, 1, conf);

    // The ACL table must be enabled before the tests run.
    TEST_UTIL.waitTableEnabled(PermissionStorage.ACL_TABLE_NAME);

    // Test users; only GROUP_USER belongs to a group.
    USER_OWNER = User.createUserForTesting(conf, "owner", new String[0]);
    USER_OTHER = User.createUserForTesting(conf, "other", new String[0]);
    USER_OTHER2 = User.createUserForTesting(conf, "other2", new String[0]);
    GROUP_USER = User.createUserForTesting(conf, "group_user", new String[] { GROUP });

    usersAndGroups = new String[] { USER_OTHER.getShortName(), AuthUtil.toGroupEntry(GROUP) };
}