下面列出了 org.apache.hadoop.hbase.ipc.RpcControllerFactory 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Writes a single row through a table configured with the counting RPC controller factory and
 * checks that exactly one write was recorded at the default index priority.
 *
 * @throws Exception if creating, writing to, or closing the table fails
 */
@Test
public void testClientWritesWithPriority() throws Exception {
  Configuration conf = new Configuration(UTIL.getConfiguration());
  // add the keys for our rpc factory
  conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
      CountingIndexClientRpcFactory.class.getName());
  // and set the index table as the current table
  conf.setStrings(IndexQosRpcControllerFactory.INDEX_TABLE_NAMES_KEY,
      TestTable.getTableNameString());
  HTable table = new HTable(conf, TestTable.getTableName());
  try {
    // do a write to the table
    Put p = new Put(row);
    p.add(family, qual, new byte[] { 1, 0, 1, 0 });
    table.put(p);
    table.flushCommits();
    // check the counts on the rpc controller
    assertEquals("Didn't get the expected number of index priority writes!", 1,
        (int) CountingIndexClientRpcController.priorityCounts
            .get(QueryServicesOptions.DEFAULT_INDEX_MIN_PRIORITY));
  } finally {
    // release the table even when the write or the assertion above fails
    table.close();
  }
}
/**
 * Creates the registry from the given configuration: reads the hedged-request fan-out and the
 * RPC timeout, creates the RPC client and controller factory, and builds one
 * {@code ClientMetaService} stub per parsed master address.
 *
 * @param conf configuration the fan-out, RPC timeout and master addresses are read from
 * @throws IOException declared by this constructor; the visible code does not show which call
 *         raises it
 */
MasterRegistry(Configuration conf) throws IOException {
  // The fan-out is never allowed to drop below 1.
  int configuredFanOut = conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
      MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
  this.hedgedReadFanOut = Math.max(1, configuredFanOut);

  // The timeout is configured as a long; clamp it so the narrowing cast cannot overflow.
  long configuredTimeoutMs =
      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
  int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, configuredTimeoutMs);

  // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
  // this through the master registry...
  // This is a problem as we will use the cluster id to determine the authentication method
  this.rpcClient = RpcClientFactory.createClient(conf, null);
  this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);

  Set<ServerName> masters = parseMasterAddrs(conf);
  ImmutableMap.Builder<ServerName, ClientMetaService.Interface> stubs =
      ImmutableMap.builderWithExpectedSize(masters.size());
  User currentUser = User.getCurrent();
  for (ServerName master : masters) {
    ClientMetaService.Interface stub = ClientMetaService.newStub(
        rpcClient.createRpcChannel(master, currentUser, rpcTimeoutMs));
    stubs.put(master, stub);
  }
  this.masterAddr2Stub = stubs.build();
}
/**
 * Returns a clone of {@code conf} adjusted for index writer connections. The clone uses
 * {@code InterRegionServerIndexRpcControllerFactory} as its RPC controller factory and takes
 * its client retry count and pause from the index-writer specific settings in {@code conf}.
 *
 * @param conf configuration to clone; the visible code only reads from it
 * @return the adjusted clone
 */
public static Configuration getIndexWriterConnection(Configuration conf) {
  Configuration indexWriterConf = PropertiesUtil.cloneConfig(conf);

  // Set the rpc controller factory so that the HTables used by IndexWriter would
  // set the correct priorities on the remote RPC calls.
  indexWriterConf.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
      InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);

  // Lower the number of rpc retries. We inherit config from
  // HConnectionManager#setServerSideHConnectionRetries, which by default uses a multiplier
  // of 10. That is too many retries for our synchronous index writes.
  int writerRetries =
      conf.getInt(INDEX_WRITER_RPC_RETRIES_NUMBER, DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER);
  indexWriterConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, writerRetries);

  int writerPause = conf.getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE);
  indexWriterConf.setInt(HConstants.HBASE_CLIENT_PAUSE, writerPause);

  return indexWriterConf;
}
/**
 * Sets the index rpc controller on the given configuration, if the rpc controller exists.
 * No-op if {@code rpcControllerExists()} reports that it is not available.
 *
 * @param conf configuration to update
 */
public static void setPhoenixIndexRpcController(Configuration conf) {
  if (!rpcControllerExists()) {
    // nothing to configure when the controller class cannot be found
    return;
  }
  // then we can load the class just fine
  // NOTE(review): the class stored under the controller-factory key is named
  // PhoenixIndexRpcSchedulerFactory; confirm that this is the intended class.
  String factoryClassName = PhoenixIndexRpcSchedulerFactory.class.getName();
  conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, factoryClassName);
}
/**
 * Configures a controller factory class name that cannot be loaded and checks that
 * {@code RpcControllerFactory.instantiate} still returns a plain {@code RpcControllerFactory}.
 */
@Test
public void testFallbackToDefaultRpcControllerFactory() {
  Configuration conf = new Configuration(UTIL.getConfiguration());
  conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, "foo.bar.Baz");
  // Should not fail
  RpcControllerFactory factory = RpcControllerFactory.instantiate(conf);
  assertNotNull(factory);
  // expected value goes first so that a failure message reports the right sides
  assertEquals(RpcControllerFactory.class, factory.getClass());
}
/**
 * Initialises the key-value builder and the derived configurations (upsert-select, compaction
 * and index-write) from the coprocessor environment's configuration.
 *
 * @param e coprocessor environment whose configuration is cloned and read
 * @throws IOException declared by the overridden method
 */
@Override
public void start(CoprocessorEnvironment e) throws IOException {
  // Can't use ClientKeyValueBuilder on server-side because the memstore expects to
  // be able to get a single backing buffer for a KeyValue.
  this.kvBuilder = GenericKeyValueBuilder.INSTANCE;

  // Work on a copy of the region's configuration so that setting the RpcControllerFactory
  // below has no side effect on the original.
  upsertSelectConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
  // Till PHOENIX-3995 is fixed, we need to use the InterRegionServerIndexRpcControllerFactory.
  // Although this would cause remote RPCs to use index handlers on the destination region
  // servers, it is better than using the regular priority handlers which could result in a
  // deadlock.
  upsertSelectConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
      InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);

  compactionConfig = ServerUtil.getCompactionConfig(e.getConfiguration());

  // For retries of index write failures, use the same # of retries as the rebuilder
  indexWriteConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
  int rebuildRetries = e.getConfiguration().getInt(
      QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
      QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER);
  indexWriteConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, rebuildRetries);
  indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator());
}
/**
 * Instantiates an {@code RpcControllerFactory} from {@code conf} and asks it for a new
 * controller.
 *
 * @return the newly created controller
 */
private static HBaseRpcController getRpcController() {
  RpcControllerFactory controllerFactory = RpcControllerFactory.instantiate(conf);
  return controllerFactory.newController();
}
/**
 * Wires up the connection: stores the configuration, user and registry, optionally creates
 * client-side metrics, creates the RPC client and controller factory, the region locator and
 * the retrying-caller factory, selects the nonce generator, and optionally creates a
 * {@code ClusterStatusListener}.
 *
 * @param conf configuration every setting below is read from
 * @param registry connection registry stored on this instance
 * @param clusterId cluster id handed to {@code RpcClientFactory.createClient}
 * @param localAddress local address handed to {@code RpcClientFactory.createClient}
 * @param user user stored on this instance; a renewal chore is spawned when it reports a
 *        keytab login
 */
public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
SocketAddress localAddress, User user) {
this.conf = conf;
this.user = user;
// Keytab logins get a renewal chore for their UGI.
if (user.isLoginFromKeytab()) {
spawnRenewalChore(user.getUGI());
}
this.connConf = new AsyncConnectionConfiguration(conf);
this.registry = registry;
// Client-side metrics are opt-in (the flag defaults to false here).
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
this.metrics = Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null));
} else {
this.metrics = Optional.empty();
}
// The RPC client receives the metrics object, or null when metrics are disabled.
this.rpcClient = RpcClientFactory.createClient(
conf, clusterId, localAddress, metrics.orElse(null));
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
// Convert the configured timeout from nanoseconds to milliseconds, clamped to the int range.
this.rpcTimeout =
(int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
// NOTE(review): `this` is handed to the locator and the caller factory before the constructor
// has finished, so the order of the assignments in this constructor matters.
this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
// Nonces are enabled unless explicitly switched off in the configuration.
if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
nonceGenerator = PerClientRandomNonceGenerator.get();
} else {
nonceGenerator = NO_NONCE_GENERATOR;
}
// ofNullable: the tracker is treated as possibly null here.
this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
// Stays null unless status publishing is enabled and the listener is created successfully.
ClusterStatusListener listener = null;
if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
// TODO: this maybe a blocking operation, better to create it outside the constructor and pass
// it in, just like clusterId. Not a big problem for now as the default value is false.
Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(
STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
if (listenerClass == null) {
LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
} else {
try {
listener = new ClusterStatusListener(
new ClusterStatusListener.DeadServerHandler() {
@Override
public void newDead(ServerName sn) {
// On a dead-server notification, drop its cached locations and cancel its connections.
locator.clearCache(sn);
rpcClient.cancelConnections(sn);
}
}, conf, listenerClass);
} catch (IOException e) {
// Best effort: a failure to create the listener is logged and the connection carries on.
LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
}
}
}
this.clusterStatusListener = listener;
}
/**
 * Stores the blocking hbck stub and the RPC controller factory on this instance.
 *
 * @param hbck blocking hbck service stub
 * @param rpcControllerFactory RPC controller factory kept alongside the stub
 */
HBaseHbck(BlockingInterface hbck, RpcControllerFactory rpcControllerFactory) {
  this.rpcControllerFactory = rpcControllerFactory;
  this.hbck = hbck;
}
/**
 * Builds a {@code QueryServicesOptions} over the configuration obtained from
 * {@code HBaseFactoryProvider}, applying {@code setIfUnset} with the default value for each
 * listed property, and then adjusts the scan cache size on that configuration.
 *
 * <p>Each property is listed exactly once; repeating a {@code setIfUnset} call for the same
 * key is redundant.
 *
 * @return the options populated with defaults
 */
public static QueryServicesOptions withDefaults() {
  Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
  QueryServicesOptions options = new QueryServicesOptions(config)
      .setIfUnset(STATS_USE_CURRENT_TIME_ATTRIB, DEFAULT_STATS_USE_CURRENT_TIME)
      .setIfUnset(RUN_UPDATE_STATS_ASYNC, DEFAULT_RUN_UPDATE_STATS_ASYNC)
      .setIfUnset(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC)
      .setIfUnset(KEEP_ALIVE_MS_ATTRIB, DEFAULT_KEEP_ALIVE_MS)
      .setIfUnset(THREAD_POOL_SIZE_ATTRIB, DEFAULT_THREAD_POOL_SIZE)
      .setIfUnset(QUEUE_SIZE_ATTRIB, DEFAULT_QUEUE_SIZE)
      .setIfUnset(THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS)
      .setIfUnset(CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES)
      .setIfUnset(SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB, DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES)
      .setIfUnset(SPOOL_DIRECTORY, DEFAULT_SPOOL_DIRECTORY)
      .setIfUnset(MAX_MEMORY_PERC_ATTRIB, DEFAULT_MAX_MEMORY_PERC)
      .setIfUnset(MAX_TENANT_MEMORY_PERC_ATTRIB, DEFAULT_MAX_TENANT_MEMORY_PERC)
      .setIfUnset(MAX_SERVER_CACHE_SIZE_ATTRIB, DEFAULT_MAX_SERVER_CACHE_SIZE)
      .setIfUnset(SCAN_CACHE_SIZE_ATTRIB, DEFAULT_SCAN_CACHE_SIZE)
      .setIfUnset(DATE_FORMAT_ATTRIB, DEFAULT_DATE_FORMAT)
      .setIfUnset(DATE_FORMAT_TIMEZONE_ATTRIB, DEFAULT_DATE_FORMAT_TIMEZONE)
      .setIfUnset(STATS_UPDATE_FREQ_MS_ATTRIB, DEFAULT_STATS_UPDATE_FREQ_MS)
      .setIfUnset(MIN_STATS_UPDATE_FREQ_MS_ATTRIB, DEFAULT_MIN_STATS_UPDATE_FREQ_MS)
      .setIfUnset(STATS_CACHE_THREAD_POOL_SIZE, DEFAULT_STATS_CACHE_THREAD_POOL_SIZE)
      .setIfUnset(CALL_QUEUE_ROUND_ROBIN_ATTRIB, DEFAULT_CALL_QUEUE_ROUND_ROBIN)
      .setIfUnset(MAX_MUTATION_SIZE_ATTRIB, DEFAULT_MAX_MUTATION_SIZE)
      .setIfUnset(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER)
      .setIfUnset(USE_INDEXES_ATTRIB, DEFAULT_USE_INDEXES)
      .setIfUnset(IMMUTABLE_ROWS_ATTRIB, DEFAULT_IMMUTABLE_ROWS)
      .setIfUnset(INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD)
      .setIfUnset(MAX_SPOOL_TO_DISK_BYTES_ATTRIB, DEFAULT_MAX_SPOOL_TO_DISK_BYTES)
      .setIfUnset(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA)
      .setIfUnset(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE)
      .setIfUnset(GROUPBY_MAX_CACHE_SIZE_ATTRIB, DEFAULT_GROUPBY_MAX_CACHE_MAX)
      .setIfUnset(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES)
      .setIfUnset(SEQUENCE_CACHE_SIZE_ATTRIB, DEFAULT_SEQUENCE_CACHE_SIZE)
      .setIfUnset(SCAN_RESULT_CHUNK_SIZE, DEFAULT_SCAN_RESULT_CHUNK_SIZE)
      .setIfUnset(ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE)
      .setIfUnset(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK)
      .setIfUnset(DELAY_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK)
      .setIfUnset(GLOBAL_METRICS_ENABLED, DEFAULT_IS_GLOBAL_METRICS_ENABLED)
      .setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY)
      .setIfUnset(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX)
      .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER)
      .setIfUnset(COLLECT_REQUEST_LEVEL_METRICS, DEFAULT_REQUEST_LEVEL_METRICS_ENABLED)
      .setIfUnset(ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE, DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE)
      .setIfUnset(RENEW_LEASE_THRESHOLD_MILLISECONDS, DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS)
      .setIfUnset(RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS, DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS)
      .setIfUnset(RENEW_LEASE_THREAD_POOL_SIZE, DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE)
      .setIfUnset(IS_NAMESPACE_MAPPING_ENABLED, DEFAULT_IS_NAMESPACE_MAPPING_ENABLED)
      .setIfUnset(IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, DEFAULT_IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE)
      .setIfUnset(LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)
      .setIfUnset(AUTO_UPGRADE_ENABLED, DEFAULT_AUTO_UPGRADE_ENABLED)
      .setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING)
      .setIfUnset(TRACING_ENABLED, DEFAULT_TRACING_ENABLED)
      .setIfUnset(TRACING_BATCH_SIZE, DEFAULT_TRACING_BATCH_SIZE)
      .setIfUnset(TRACING_THREAD_POOL_SIZE, DEFAULT_TRACING_THREAD_POOL_SIZE)
      .setIfUnset(STATS_COLLECTION_ENABLED, DEFAULT_STATS_COLLECTION_ENABLED)
      .setIfUnset(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION)
      .setIfUnset(COST_BASED_OPTIMIZER_ENABLED, DEFAULT_COST_BASED_OPTIMIZER_ENABLED)
      .setIfUnset(PHOENIX_ACLS_ENABLED, DEFAULT_PHOENIX_ACLS_ENABLED)
      .setIfUnset(LOG_LEVEL, DEFAULT_LOGGING_LEVEL)
      .setIfUnset(LOG_SAMPLE_RATE, DEFAULT_LOG_SAMPLE_RATE)
      .setIfUnset(TxConstants.TX_PRE_014_CHANGESET_KEY, Boolean.FALSE.toString())
      .setIfUnset(CLIENT_METRICS_TAG, DEFAULT_CLIENT_METRICS_TAG)
      .setIfUnset(CLIENT_INDEX_ASYNC_THRESHOLD, DEFAULT_CLIENT_INDEX_ASYNC_THRESHOLD)
      ;
  // HBase sets this to 1, so we reset it to something more appropriate.
  // Hopefully HBase will change this, because we can't know if a user set
  // it to 1, so we'll change it.
  int scanCaching = config.getInt(SCAN_CACHE_SIZE_ATTRIB, 0);
  if (scanCaching == 1) {
    config.setInt(SCAN_CACHE_SIZE_ATTRIB, DEFAULT_SCAN_CACHE_SIZE);
  } else if (scanCaching <= 0) { // Provides the user with a way of setting it to 1
    config.setInt(SCAN_CACHE_SIZE_ATTRIB, 1);
  }
  return options;
}