下面列出了怎么用org.apache.hadoop.fs.CommonConfigurationKeysPublic的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Starts a mini Ozone cluster with three datanodes, creates a volume and a
 * bucket in it, and opens a FileSystem whose fs.defaultFS is the root URI of
 * that bucket. The volume and bucket names are remembered in the fields
 * {@code volumeName} and {@code bucketName}.
 *
 * @throws IOException declared by the signature
 * @throws TimeoutException declared by the signature
 * @throws InterruptedException declared by the signature
 */
private void setupOzoneFileSystem()
    throws IOException, TimeoutException, InterruptedException {
  OzoneConfiguration ozoneConf = new OzoneConfiguration();
  cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build();
  cluster.waitForClusterToBeReady();

  // The OzoneFileSystem under test is backed by a newly created
  // volume/bucket pair.
  OzoneBucket createdBucket = TestDataUtil.createVolumeAndBucket(cluster);
  volumeName = createdBucket.getVolumeName();
  bucketName = createdBucket.getName();

  // fs.defaultFS is <scheme>://<bucket>.<volume>/
  String bucketRootUri = String.format("%s://%s.%s/",
      OzoneConsts.OZONE_URI_SCHEME,
      createdBucket.getName(),
      createdBucket.getVolumeName());
  ozoneConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
      bucketRootUri);

  // Number of keys to be processed per batch operation.
  ozoneConf.setInt(OZONE_FS_ITERATE_BATCH_SIZE, 5);

  fs = FileSystem.get(ozoneConf);
}
/**
 * Builds the ConnectionId for a connection to the given address.
 * <p>
 * When no retry policy is supplied, a fixed-sleep policy is derived from the
 * IPC client connect max-retries and retry-interval (milliseconds) settings
 * found in {@code conf}.
 *
 * @param addr Remote address for the connection.
 * @param protocol Protocol for RPC.
 * @param ticket UGI
 * @param rpcTimeout timeout
 * @param connectionRetryPolicy retry policy for connecting, or {@code null}
 *        to derive one from {@code conf}
 * @param conf Configuration object
 * @return A ConnectionId instance
 * @throws IOException declared by the signature
 */
static ConnectionId getConnectionId(InetSocketAddress addr,
    Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
    RetryPolicy connectionRetryPolicy, Configuration conf) throws IOException {
  RetryPolicy effectivePolicy = connectionRetryPolicy;
  if (effectivePolicy == null) {
    final int maxRetries = conf.getInt(
        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT);
    final int sleepMillis = conf.getInt(
        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY,
        CommonConfigurationKeysPublic
            .IPC_CLIENT_CONNECT_RETRY_INTERVAL_DEFAULT);
    effectivePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        maxRetries, sleepMillis, TimeUnit.MILLISECONDS);
  }
  return new ConnectionId(addr, protocol, ticket, rpcTimeout,
      effectivePolicy, conf);
}
/**
 * Helper function for testOtherDefaultFS(),
 * run fs -ls o3fs:/// against different fs.defaultFS input.
 * <p>
 * Builds a client configuration from the shared {@code conf}, registers the
 * o3fs implementation, applies the given fs.defaultFS and asserts that the
 * shell command returns -1.
 *
 * @param defaultFS Desired fs.defaultFS to be used in the test
 * @throws Exception if running the shell command or closing the shell fails
 */
private void testWithDefaultFS(String defaultFS) throws Exception {
  OzoneConfiguration clientConf = new OzoneConfiguration(conf);
  clientConf.setQuietMode(false);
  clientConf.set(o3fsImplKey, o3fsImplValue);
  // Apply the fs.defaultFS value under test.
  clientConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
      defaultFS);
  FsShell shell = new FsShell(clientConf);
  try {
    // Test case: ozone fs -ls o3fs:///
    // Expectation: Fail. fs.defaultFS is not a qualified o3fs URI.
    int res = ToolRunner.run(shell, new String[] {"-ls", "o3fs:///"});
    // Expected value goes first so that a failure is reported as
    // "expected -1 but was <res>" and not the other way round.
    Assert.assertEquals(-1, res);
  } finally {
    shell.close();
  }
}
/**
 * Test OM HA URLs with some unqualified fs.defaultFS.
 * <p>
 * Each call hands one fs.defaultFS value to testWithDefaultFS.
 *
 * @throws Exception propagated from testWithDefaultFS
 */
@Test
public void testOtherDefaultFS() throws Exception {
  // Scenarios where fs.defaultFS isn't a fully qualified o3fs URI.

  // fs.defaultFS = file:///
  testWithDefaultFS(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT);

  // fs.defaultFS = hdfs://ns1/
  testWithDefaultFS("hdfs://ns1/");

  // fs.defaultFS = o3fs:///
  testWithDefaultFS(
      String.format("%s:///", OzoneConsts.OZONE_URI_SCHEME));

  // fs.defaultFS = o3fs://bucketName.volumeName/
  testWithDefaultFS(
      String.format("%s://%s.%s/",
          OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName));
}
/**
 * Writes Kerberos security settings into the given configuration:
 * authentication is set to "kerberos", authorization to "true", and the
 * RM/NM keytab, principal and web-app SPNEGO entries are set to the supplied
 * values. A fixed auth_to_local rule string is applied last.
 *
 * @param conf configuration to modify in place
 * @param principal value stored under every principal / SPNEGO user-name key
 * @param keytab value stored under every keytab key
 */
public static void populateYarnSecureConfigurations(Configuration conf, String principal, String keytab) {
  // Key/value pairs, applied strictly in this order.
  final String[][] securitySettings = {
      {CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"},
      {CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"},
      {YarnConfiguration.RM_KEYTAB, keytab},
      {YarnConfiguration.RM_PRINCIPAL, principal},
      {YarnConfiguration.NM_KEYTAB, keytab},
      {YarnConfiguration.NM_PRINCIPAL, principal},
      {YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY, principal},
      {YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY, keytab},
      {YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY, principal},
      {YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY, keytab},
      {"hadoop.security.auth_to_local", "RULE:[1:$1] RULE:[2:$1]"},
  };
  for (String[] setting : securitySettings) {
    conf.set(setting[0], setting[1]);
  }
}
/**
 * Initializes this provider with the RM proxy and protocol to use.
 * <p>
 * Copies the given configuration into a YarnConfiguration, records the RM HA
 * ids, selects the id at {@code currentProxyIndex}, and overrides the IPC
 * client connect retry limits with the client failover retry settings.
 *
 * @param configuration source configuration; it is copied, not modified
 * @param rmProxy proxy used to validate the protocol
 * @param protocol protocol class, checked via rmProxy.checkAllowedProtocols
 */
@Override
public void init(Configuration configuration, RMProxy<T> rmProxy,
    Class<T> protocol) {
  this.rmProxy = rmProxy;
  this.protocol = protocol;
  this.rmProxy.checkAllowedProtocols(this.protocol);
  this.conf = new YarnConfiguration(configuration);

  Collection<String> haIds = HAUtil.getRMHAIds(conf);
  this.rmServiceIds = haIds.toArray(new String[haIds.size()]);
  conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]);

  // IPC connect retries mirror the client failover retry count.
  int failoverRetries = conf.getInt(
      YarnConfiguration.CLIENT_FAILOVER_RETRIES,
      YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES);
  conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
      failoverRetries);

  // Same for the retry count applied on socket timeouts.
  int failoverRetriesOnTimeouts = conf.getInt(
      YarnConfiguration.CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS,
      YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS);
  conf.setInt(
      CommonConfigurationKeysPublic
          .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
      failoverRetriesOnTimeouts);
}
/**
 * Instantiates the configured DNSToSwitchMapping once; later calls return
 * immediately because {@code initCalled} is already set.
 * <p>
 * The implementation class is read from the node-switch-mapping key, with
 * ScriptBasedMapping as the default. Any exception raised while creating the
 * mapping is rethrown wrapped in a RuntimeException.
 *
 * @param conf configuration that names the mapping implementation
 */
public synchronized static void init(Configuration conf) {
  if (initCalled) {
    return;
  }
  initCalled = true;

  Class<? extends DNSToSwitchMapping> mappingClass = conf.getClass(
      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
      ScriptBasedMapping.class,
      DNSToSwitchMapping.class);
  try {
    DNSToSwitchMapping configuredMapping =
        ReflectionUtils.newInstance(mappingClass, conf);
    if (configuredMapping instanceof CachedDNSToSwitchMapping) {
      // Already a caching implementation; wrapping it again would only
      // cache twice.
      dnsToSwitchMapping = configuredMapping;
    } else {
      // Wrap with the cached implementation to save on repeated lookups.
      dnsToSwitchMapping = new CachedDNSToSwitchMapping(configuredMapping);
    }
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
/**
 * Initializes RackResolver with MyResolver and checks that "host1" resolves
 * to "/rack1" on two consecutive lookups, and that {@code invalidHost}
 * resolves to the default rack.
 */
@Test
public void testCaching() {
  Configuration resolverConf = new Configuration();
  resolverConf.setClass(
      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
      MyResolver.class, DNSToSwitchMapping.class);
  RackResolver.init(resolverConf);

  try {
    InetAddress host1Address = InetAddress.getByName("host1");
    MyResolver.resolvedHost1 = host1Address.getHostAddress();
  } catch (UnknownHostException e) {
    // Ignore if not found
  }

  Node firstLookup = RackResolver.resolve("host1");
  Assert.assertEquals("/rack1", firstLookup.getNetworkLocation());

  Node secondLookup = RackResolver.resolve("host1");
  Assert.assertEquals("/rack1", secondLookup.getNetworkLocation());

  Node invalidLookup = RackResolver.resolve(invalidHost);
  Assert.assertEquals(NetworkTopology.DEFAULT_RACK,
      invalidLookup.getNetworkLocation());
}
/**
 * Creates the RPC server for LocalizationProtocol bound to
 * {@code localizationServerAddress}.
 * <p>
 * A LocalizerTokenSecretManager is installed when security is enabled, and
 * the service ACL is refreshed with an NMPolicyProvider when
 * hadoop.security.authorization is true.
 *
 * @return the created server; this method does not start it
 */
Server createServer() {
  Configuration serverConf = getConfig();
  YarnRPC yarnRpc = YarnRPC.create(serverConf);
  if (UserGroupInformation.isSecurityEnabled()) {
    secretManager = new LocalizerTokenSecretManager();
  }

  int clientThreadCount = serverConf.getInt(
      YarnConfiguration.NM_LOCALIZER_CLIENT_THREAD_COUNT,
      YarnConfiguration.DEFAULT_NM_LOCALIZER_CLIENT_THREAD_COUNT);
  Server localizationServer = yarnRpc.getServer(
      LocalizationProtocol.class, this, localizationServerAddress,
      serverConf, secretManager, clientThreadCount);

  // Enable service authorization?
  boolean authorizationEnabled = serverConf.getBoolean(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
  if (authorizationEnabled) {
    localizationServer.refreshServiceAcl(serverConf, new NMPolicyProvider());
  }
  return localizationServer;
}
ConnectionManager() {
this.idleScanTimer = new Timer(
"IPC Server idle connection scanner for port " + getPort(), true);
this.idleScanThreshold = conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT);
this.idleScanInterval = conf.getInt(
CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY,
CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_DEFAULT);
this.maxIdleTime = 2 * conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
this.maxIdleToClose = conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT);
this.maxConnections = conf.getInt(
CommonConfigurationKeysPublic.IPC_SERVER_MAX_CONNECTIONS_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_MAX_CONNECTIONS_DEFAULT);
// create a set with concurrency -and- a thread-safe iterator, add 2
// for listener and idle closer threads
this.connections = Collections.newSetFromMap(
new ConcurrentHashMap<Connection,Boolean>(
maxQueueSize, 0.75f, readThreads+2));
}
/**
 * Creates the launcher thread pool and initializes the service with a copy
 * of the configuration in which the IPC client's max-retries-on-socket-
 * timeouts value is replaced by the RM-to-NodeManager connect retries value.
 *
 * @param conf service configuration; it is copied, not modified
 * @throws Exception propagated from super.serviceInit
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  int launcherThreads = conf.getInt(
      YarnConfiguration.RM_AMLAUNCHER_THREAD_COUNT,
      YarnConfiguration.DEFAULT_RM_AMLAUNCHER_THREAD_COUNT);
  ThreadFactory launcherThreadFactory = new ThreadFactoryBuilder()
      .setNameFormat("ApplicationMasterLauncher #%d")
      .build();
  // Fixed-size pool (core == max) fed by an unbounded queue.
  launcherPool = new ThreadPoolExecutor(
      launcherThreads, launcherThreads,
      1, TimeUnit.HOURS,
      new LinkedBlockingQueue<Runnable>());
  launcherPool.setThreadFactory(launcherThreadFactory);

  Configuration launcherConf = new YarnConfiguration(conf);
  int nodeManagerConnectRetries = conf.getInt(
      YarnConfiguration.RM_NODEMANAGER_CONNECT_RETIRES,
      YarnConfiguration.DEFAULT_RM_NODEMANAGER_CONNECT_RETIRES);
  launcherConf.setInt(
      CommonConfigurationKeysPublic
          .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
      nodeManagerConnectRetries);

  setConfig(launcherConf);
  super.serviceInit(launcherConf);
}
/**
 * Creates a ClientDatanodeProtocolPB proxy for the given datanode, using a
 * remote user named after the located block and carrying its block token.
 *
 * @param datanodeid datanode whose IPC address is used
 * @param conf base configuration; a copy with the IPC max idle time set to 0
 *        is used for the proxy
 * @param socketTimeout passed through to the proxy factory overload
 * @param connectToDnViaHostname passed to DatanodeID.getIpcAddr
 * @param locatedBlock supplies the remote user name and the block token
 * @return the proxy produced by the address-based overload
 * @throws IOException declared by the signature
 */
static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy(
    DatanodeID datanodeid, Configuration conf, int socketTimeout,
    boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
  final String ipcAddress = datanodeid.getIpcAddr(connectToDnViaHostname);
  InetSocketAddress socketAddress = NetUtils.createSocketAddr(ipcAddress);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Connecting to datanode " + ipcAddress + " addr=" + socketAddress);
  }

  // A new UserGroupInformation is created below, so no future RPC proxy can
  // re-use this connection, and such proxies tend to serve one-off calls.
  // Setting the max idle time to 0 is a temporary fix: callers should
  // really use RPC.stopProxy() on the returned object, but that is
  // currently not working in trunk. See the discussion on HDFS-1965.
  Configuration noIdleConf = new Configuration(conf);
  noIdleConf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);

  String blockUserName = locatedBlock.getBlock().getLocalBlock().toString();
  UserGroupInformation blockUser =
      UserGroupInformation.createRemoteUser(blockUserName);
  blockUser.addToken(locatedBlock.getBlockToken());

  return createClientDatanodeProtocolProxy(socketAddress, blockUser,
      noIdleConf, NetUtils.getDefaultSocketFactory(conf), socketTimeout);
}
/**
 * Switches the shared configuration to Kerberos authentication, creates the
 * event queue, dispatcher and delegation token renewer, wires the renewer to
 * a mocked RMContext, and then initializes and starts the renewer.
 *
 * @throws Exception declared by the signature
 */
@Before
public void setUp() throws Exception {
  counter = new AtomicInteger(0);
  conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
      "kerberos");
  UserGroupInformation.setConfiguration(conf);

  eventQueue = new LinkedBlockingQueue<Event>();
  dispatcher = new AsyncDispatcher(eventQueue);
  Renewer.reset();
  delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);

  // Mocked client RM service that reports a fixed, unresolved bind address.
  ClientRMService clientRMService = mock(ClientRMService.class);
  InetSocketAddress bindAddress =
      InetSocketAddress.createUnresolved("localhost", 1234);
  when(clientRMService.getBindAddress()).thenReturn(bindAddress);

  // Mocked RM context exposing the collaborators created above.
  RMContext rmContext = mock(RMContext.class);
  ConcurrentHashMap<ApplicationId, ByteBuffer> systemCredentials =
      new ConcurrentHashMap<ApplicationId, ByteBuffer>();
  when(rmContext.getSystemCredentialsForApps()).thenReturn(systemCredentials);
  when(rmContext.getDelegationTokenRenewer())
      .thenReturn(delegationTokenRenewer);
  when(rmContext.getDispatcher()).thenReturn(dispatcher);
  when(rmContext.getClientRMService()).thenReturn(clientRMService);

  delegationTokenRenewer.setRMContext(rmContext);
  delegationTokenRenewer.init(conf);
  delegationTokenRenewer.start();
}
/**
 * Builds a Kerberos-enabled configuration for a MockRM (FifoScheduler, YARN
 * ACLs on, web app enabled, SPNEGO principal and keytab as the RM
 * credentials), installs it as the UGI configuration and starts the RM.
 *
 * @throws Exception declared by the signature
 */
private static void setupAndStartRM() throws Exception {
  Configuration rmConfig = new Configuration();

  // Scheduler and attempt settings.
  rmConfig.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
      YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
  rmConfig.setClass(YarnConfiguration.RM_SCHEDULER,
      FifoScheduler.class, ResourceScheduler.class);
  rmConfig.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);

  // Security settings.
  rmConfig.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
      "kerberos");
  rmConfig.set("yarn.resourcemanager.principal", spnegoPrincipal);
  String keytabPath = spnegoKeytabFile.getAbsolutePath();
  rmConfig.set("yarn.resourcemanager.keytab", keytabPath);

  rmConfig.setBoolean("mockrm.webapp.enabled", true);

  UserGroupInformation.setConfiguration(rmConfig);
  rm = new MockRM(rmConfig);
  rm.start();
}
/**
 * Enables hadoop.security.authorization, starts a ResourceManager with the
 * shared configuration and checks that refreshing the service ACLs through
 * the admin service raises no exception. The ResourceManager is stopped
 * afterwards if it was created.
 */
@Test
public void testServiceAclsRefreshWithLocalConfigurationProvider() {
  configuration.setBoolean(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true);
  ResourceManager resourceManager = null;
  try {
    resourceManager = new ResourceManager();
    resourceManager.init(configuration);
    resourceManager.start();
    resourceManager.adminService.refreshServiceAcls(RefreshServiceAclsRequest
        .newInstance());
  } catch (Exception ex) {
    // Fail with the same message as before, but keep the caught exception as
    // the cause so the test report shows what actually went wrong.
    throw new AssertionError(
        "Using localConfigurationProvider. Should not get any exception.",
        ex);
  } finally {
    if (resourceManager != null) {
      resourceManager.stop();
    }
  }
}
/**
 * Adds a container request that names two hosts and the rack "/rack2", then
 * verifies resource requests for both hosts, for "/rack1" and "/rack2", and
 * for ResourceRequest.ANY. "/rack1" is not named in the request, so it
 * presumably comes from MyResolver's mapping of the hosts.
 */
@Test
public void testFillInRacks() {
  Configuration resolverConf = new Configuration();
  resolverConf.setClass(
      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
      MyResolver.class, DNSToSwitchMapping.class);

  AMRMClientImpl<ContainerRequest> amClient =
      new AMRMClientImpl<ContainerRequest>();
  amClient.init(resolverConf);

  Resource capability = Resource.newInstance(1024, 1, 1);
  String[] hosts = new String[] {"host1", "host2"};
  String[] racks = new String[] {"/rack2"};
  ContainerRequest containerRequest =
      new ContainerRequest(capability, hosts, racks, Priority.newInstance(1));
  amClient.addContainerRequest(containerRequest);

  // Checked in this order: hosts, racks, then ANY.
  String[] expectedLocations = new String[] {
      "host1", "host2", "/rack1", "/rack2", ResourceRequest.ANY};
  for (String location : expectedLocations) {
    verifyResourceRequest(amClient, containerRequest, location, true);
  }
}
/**
 * Adds two host-level container requests with the same priority but
 * different final boolean arguments (false, then true) and expects an
 * InvalidContainerRequestException.
 */
@Test (expected = InvalidContainerRequestException.class)
public void testDifferentLocalityRelaxationSamePriority() {
  Configuration resolverConf = new Configuration();
  resolverConf.setClass(
      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
      MyResolver.class, DNSToSwitchMapping.class);

  AMRMClientImpl<ContainerRequest> amClient =
      new AMRMClientImpl<ContainerRequest>();
  amClient.init(resolverConf);

  Resource capability = Resource.newInstance(1024, 1, 1);

  // First request: two hosts, final flag false.
  String[] firstHosts = new String[] {"host1", "host2"};
  ContainerRequest strictRequest = new ContainerRequest(
      capability, firstHosts, null, Priority.newInstance(1), false);
  amClient.addContainerRequest(strictRequest);

  // Second request: another host at the same priority, final flag true.
  String[] secondHosts = new String[] {"host3"};
  ContainerRequest relaxedRequest = new ContainerRequest(
      capability, secondHosts, null, Priority.newInstance(1), true);
  amClient.addContainerRequest(relaxedRequest);
}
/**
 * Adds a host-level container request with final flag false, followed by a
 * rack-level request ("rack1") at the same priority with final flag true,
 * and expects an InvalidContainerRequestException.
 */
@Test (expected = InvalidContainerRequestException.class)
public void testLocalityRelaxationDifferentLevels() {
  Configuration resolverConf = new Configuration();
  resolverConf.setClass(
      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
      MyResolver.class, DNSToSwitchMapping.class);

  AMRMClientImpl<ContainerRequest> amClient =
      new AMRMClientImpl<ContainerRequest>();
  amClient.init(resolverConf);

  Resource capability = Resource.newInstance(1024, 1, 1);

  // Host-level request, final flag false.
  String[] hosts = new String[] {"host1", "host2"};
  ContainerRequest hostRequest = new ContainerRequest(
      capability, hosts, null, Priority.newInstance(1), false);
  amClient.addContainerRequest(hostRequest);

  // Rack-level request at the same priority, final flag true.
  String[] racks = new String[] {"rack1"};
  ContainerRequest rackRequest = new ContainerRequest(
      capability, null, racks, Priority.newInstance(1), true);
  amClient.addContainerRequest(rackRequest);
}
/**
 * Checks JobHistoryUtils.getHistoryIntermediateDoneDirForUser in two steps:
 * first with an intermediate-done dir that carries no scheme or authority,
 * then after writing a configuration whose fs.defaultFS is the mini cluster
 * URI to {@code coreSitePath} while the in-memory fs.defaultFS is switched
 * to file:///. In the second step the result is expected to be prefixed
 * with the mini cluster URI.
 *
 * @throws IOException if writing the configuration file fails
 */
@Test
public void testGetHistoryIntermediateDoneDirForUser() throws IOException {
  // Test a path without scheme or authority
  Configuration conf = new Configuration();
  conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
      "/mapred/history/done_intermediate");
  conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
  String pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
  Assert.assertEquals("/mapred/history/done_intermediate/" +
      System.getProperty("user.name"), pathStr);

  // Test fully qualified path
  // Create default configuration pointing to the minicluster
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
      dfsCluster.getURI().toString());
  // try-with-resources closes the stream even when writeXml throws, so the
  // file handle is never leaked.
  try (FileOutputStream os = new FileOutputStream(coreSitePath)) {
    conf.writeXml(os);
  }

  // Simulate execution under a non-default namenode
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
      "file:///");
  pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
  Assert.assertEquals(dfsCluster.getURI().toString() +
      "/mapred/history/done_intermediate/" + System.getProperty("user.name"),
      pathStr);
}
public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
JobID jobId, MRClientProtocol historyServerProxy) {
this.conf = new Configuration(conf); // Cloning for modifying.
// For faster redirects from AM to HS.
this.conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
this.conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES_ON_TIMEOUTS,
MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES_ON_TIMEOUTS));
this.rm = rm;
this.jobId = jobId;
this.historyServerProxy = historyServerProxy;
this.appId = TypeConverter.toYarn(jobId).getAppId();
notRunningJobs = new HashMap<JobState, HashMap<String, NotRunningJob>>();
}
/**
 * Sets up the SCM security protocol RPC server: resolves the listen address,
 * registers the protobuf RPC engine, creates the protocol metrics and the
 * blocking service, and obtains the RPC server from
 * StorageContainerManager.startRpcServer. When hadoop.security.authorization
 * is true, the service ACL is refreshed with the SCM policy provider.
 *
 * @param conf configuration supplying the handler count, the address and
 *        the authorization flag
 * @param certificateServer certificate server stored for later use
 * @throws IOException declared by the signature
 */
SCMSecurityProtocolServer(OzoneConfiguration conf,
    CertificateServer certificateServer) throws IOException {
  this.certificateServer = certificateServer;

  final int rpcHandlerCount = conf.getInt(
      ScmConfigKeys.OZONE_SCM_SECURITY_HANDLER_COUNT_KEY,
      ScmConfigKeys.OZONE_SCM_SECURITY_HANDLER_COUNT_DEFAULT);
  rpcAddress = HddsServerUtil.getScmSecurityInetAddress(conf);

  // SCM security service RPC service.
  RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
      ProtobufRpcEngine.class);
  metrics = new ProtocolMessageMetrics(
      "ScmSecurityProtocol",
      "SCM Security protocol metrics",
      SCMSecurityProtocolProtos.Type.values());

  SCMSecurityProtocolServerSideTranslatorPB translator =
      new SCMSecurityProtocolServerSideTranslatorPB(this, metrics);
  BlockingService securityService =
      SCMSecurityProtocolProtos.SCMSecurityProtocolService
          .newReflectiveBlockingService(translator);

  this.rpcServer = StorageContainerManager.startRpcServer(
      conf, rpcAddress, SCMSecurityProtocolPB.class, securityService,
      rpcHandlerCount);

  boolean authorizationEnabled = conf.getBoolean(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
  if (authorizationEnabled) {
    rpcServer.refreshServiceAcl(conf, SCMPolicyProvider.getInstance());
  }
}
/**
 * Returns the hostname for this datanode. If the hostname is not
 * explicitly configured in the given config, then it is determined
 * via the DNS class.
 * <p>
 * The HADOOP_SECURITY_DNS_* interface and nameserver keys are consulted
 * first. When the interface key is unset, the legacy DFS_DATANODE_DNS_*
 * keys are used for both values and hosts-file fallback stays disabled.
 *
 * @param conf Configuration
 *
 * @return the hostname (NB: may not be a FQDN)
 * @throws UnknownHostException if the dfs.datanode.dns.interface
 *    option is used and the hostname can not be determined
 */
public static String getHostName(ConfigurationSource conf)
    throws UnknownHostException {
  String name = conf.get(DFS_DATANODE_HOST_NAME_KEY);
  if (name == null) {
    String dnsInterface = conf.get(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_DNS_INTERFACE_KEY);
    String nameServer = conf.get(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_DNS_NAMESERVER_KEY);
    boolean fallbackToHosts = false;

    if (dnsInterface == null) {
      // Try the legacy configuration keys. Each key is read exactly once.
      dnsInterface = conf.get(DFS_DATANODE_DNS_INTERFACE_KEY);
      nameServer = conf.get(DFS_DATANODE_DNS_NAMESERVER_KEY);
    } else {
      // If HADOOP_SECURITY_DNS_* is set then also attempt hosts file
      // resolution if DNS fails. We will not use hosts file resolution
      // by default to avoid breaking existing clusters.
      fallbackToHosts = true;
    }

    name = DNS.getDefaultHost(dnsInterface, nameServer, fallbackToHosts);
  }
  return name;
}
/**
 * Starts a three-datanode mini Ozone cluster, creates a randomly named
 * volume and bucket, and opens the file system either through fs.defaultFS
 * or through an explicit URI, depending on {@code setDefaultFs}. Also caches
 * the OzoneFileSystem view, its operation-count statistics and the OM
 * metrics in fields.
 *
 * @throws Exception declared by the signature
 */
@Before
public void init() throws Exception {
  volumeName = RandomStringUtils.randomAlphabetic(10).toLowerCase();
  bucketName = RandomStringUtils.randomAlphabetic(10).toLowerCase();

  OzoneConfiguration ozoneConf = new OzoneConfiguration();
  cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build();
  cluster.waitForClusterToBeReady();

  // Create the volume and bucket used by OzoneFileSystem; the returned
  // bucket handle is not needed because only the names are used below.
  TestDataUtil.createVolumeAndBucket(cluster, volumeName, bucketName);

  rootPath = String.format("%s://%s.%s/",
      OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName);

  if (!setDefaultFs) {
    // Open the file system through an explicit URI under the bucket root.
    fs = FileSystem.get(new URI(rootPath + "/test.txt"), ozoneConf);
  } else {
    // Set the fs.defaultFS and start the filesystem
    ozoneConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
    fs = FileSystem.get(ozoneConf);
  }

  o3fs = (OzoneFileSystem) fs;
  statistics = (OzoneFSStorageStatistics) o3fs.getOzoneFSOpsCountStatistics();
  omMetrics = cluster.getOzoneManager().getMetrics();
}
/**
 * Creates a fresh HdfsConfiguration for each test with the IPC client
 * connection max idle time set to 0.
 */
@Before
public void createConfiguration() {
  HdfsConfiguration freshConf = new HdfsConfiguration();
  // Turn off persistent IPC, so that the DFSClient can survive NN restart
  freshConf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
  conf = freshConf;
}
/**
 * Creates a MiniOzoneCluster for testing, opens a FileSystem rooted at a new
 * bucket, and writes one file of random alphanumeric data (length
 * 30 * 1024 * 1024) that is kept in {@code data} and {@code filePath}.
 *
 * @throws Exception declared by the signature
 */
@BeforeClass
public static void init() throws Exception {
  OzoneConfiguration ozoneConf = new OzoneConfiguration();
  cluster = MiniOzoneCluster.newBuilder(ozoneConf)
      .setNumDatanodes(3)
      .setChunkSize(2) // MB
      .setBlockSize(8) // MB
      .setStreamBufferFlushSize(2) // MB
      .setStreamBufferMaxSize(4) // MB
      .build();
  cluster.waitForClusterToBeReady();

  // create a volume and a bucket to be used by OzoneFileSystem
  OzoneBucket testBucket = TestDataUtil.createVolumeAndBucket(cluster);

  // Set the fs.defaultFS and start the filesystem
  String bucketRootUri = String.format("%s://%s.%s/",
      OzoneConsts.OZONE_URI_SCHEME,
      testBucket.getName(),
      testBucket.getVolumeName());
  ozoneConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
      bucketRootUri);
  fs = FileSystem.get(ozoneConf);

  // Write the test file once for the whole class.
  int fileLength = 30 * 1024 * 1024;
  data = string2Bytes(RandomStringUtils.randomAlphanumeric(fileLength));
  filePath = new Path("/" + RandomStringUtils.randomAlphanumeric(5));
  try (FSDataOutputStream out = fs.create(filePath)) {
    out.write(data);
  }
}
/**
 * Builds and starts the RPC server for StreamingContainerUmbilicalProtocol
 * on 0.0.0.0 with port 0, then records the connect address in
 * {@code this.address}.
 * <p>
 * When hadoop.security.authorization is true, the service ACL is refreshed
 * with a policy provider that lists only the umbilical protocol. An
 * IOException is rethrown as a YarnRuntimeException.
 */
protected void startRpcServer()
{
  Configuration rpcConf = getConfig();
  LOG.info("Config: " + rpcConf);
  LOG.info("Listener thread count " + listenerThreadCount);
  try {
    server = new RPC.Builder(rpcConf)
        .setProtocol(StreamingContainerUmbilicalProtocol.class)
        .setInstance(this)
        .setBindAddress("0.0.0.0")
        .setPort(0)
        .setNumHandlers(listenerThreadCount)
        .setSecretManager(tokenSecretManager)
        .setVerbose(false)
        .build();

    // Enable service authorization?
    boolean authorizationEnabled = rpcConf.getBoolean(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
    if (authorizationEnabled) {
      PolicyProvider umbilicalPolicy = new PolicyProvider()
      {
        @Override
        public Service[] getServices()
        {
          Service umbilicalService = new Service(
              StreamingContainerUmbilicalProtocol.class.getName(),
              StreamingContainerUmbilicalProtocol.class);
          return new Service[] {umbilicalService};
        }
      };
      server.refreshServiceAcl(rpcConf, umbilicalPolicy);
    }

    server.start();
    this.address = NetUtils.getConnectAddress(server);
    LOG.info("Container callback server listening at " + this.address);
  } catch (IOException e) {
    throw new YarnRuntimeException(e);
  }
}
/**
 * Captures the identity of a connection together with the IPC client
 * settings read from {@code conf}: max idle time, max retries on SASL and on
 * socket timeouts, TCP no-delay, and ping on/off. The ping interval is taken
 * from Client.getPingInterval only when ping is enabled, otherwise it is 0.
 *
 * @param address remote address
 * @param protocol RPC protocol class
 * @param ticket user information for the connection
 * @param rpcTimeout RPC timeout
 * @param connectionRetryPolicy retry policy stored as given
 * @param conf configuration the settings are read from; also stored
 */
ConnectionId(InetSocketAddress address, Class<?> protocol,
    UserGroupInformation ticket, int rpcTimeout,
    RetryPolicy connectionRetryPolicy, Configuration conf) {
  // Identity of the connection.
  this.protocol = protocol;
  this.address = address;
  this.ticket = ticket;
  this.rpcTimeout = rpcTimeout;
  this.connectionRetryPolicy = connectionRetryPolicy;

  // Client settings from configuration.
  this.maxIdleTime = conf.getInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
  this.maxRetriesOnSasl = conf.getInt(
      CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY,
      CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT);
  this.maxRetriesOnSocketTimeouts = conf.getInt(
      CommonConfigurationKeysPublic
          .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
      CommonConfigurationKeysPublic
          .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
  this.tcpNoDelay = conf.getBoolean(
      CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,
      CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_DEFAULT);
  this.doPing = conf.getBoolean(
      CommonConfigurationKeys.IPC_CLIENT_PING_KEY,
      CommonConfigurationKeys.IPC_CLIENT_PING_DEFAULT);

  // The ping interval is only looked up when pinging is enabled.
  if (this.doPing) {
    this.pingInterval = Client.getPingInterval(conf);
  } else {
    this.pingInterval = 0;
  }
  this.conf = conf;
}
/**
 * Looks up a clear text password stored directly in this configuration.
 * The lookup only happens when the clear-text fallback setting is true.
 *
 * @param name configuration key holding the password
 * @return the password characters, or null when the fallback is disabled or
 *         the key has no value
 */
protected char[] getPasswordFromConfig(String name) {
  boolean clearTextAllowed = getBoolean(
      CredentialProvider.CLEAR_TEXT_FALLBACK,
      CommonConfigurationKeysPublic
          .HADOOP_SECURITY_CREDENTIAL_CLEAR_TEXT_FALLBACK_DEFAULT);
  if (!clearTextAllowed) {
    return null;
  }
  String clearText = get(name);
  if (clearText == null) {
    return null;
  }
  return clearText.toCharArray();
}
/**
 * Fallback that reads a clear text password straight from configuration.
 * Nothing is read unless the clear-text fallback setting evaluates to true.
 *
 * @param name key under which the password is stored
 * @return clear text password as a char array, or null if the fallback is
 *         off or no value is stored under {@code name}
 */
protected char[] getPasswordFromConfig(String name) {
  final boolean fallbackEnabled = getBoolean(
      CredentialProvider.CLEAR_TEXT_FALLBACK,
      CommonConfigurationKeysPublic
          .HADOOP_SECURITY_CREDENTIAL_CLEAR_TEXT_FALLBACK_DEFAULT);
  final String storedValue = fallbackEnabled ? get(name) : null;
  return (storedValue == null) ? null : storedValue.toCharArray();
}
/**
 * Returns the stream token cached for {@code pathString}, creating and
 * caching one first when none exists.
 * <p>
 * A missing file is created with the configured I/O buffer size and the
 * literal arguments {@code false}, {@code (short) 3} and
 * {@code 64 * 1024 * 1024L} (presumably overwrite flag, replication and
 * block size, to be confirmed against the create overload in use). An
 * existing file is opened for append. The lookup and creation run while
 * holding the per-path lock from FileLockUtils.
 *
 * @param pathString HDFS path of the file; also the token cache key
 * @param mediaSourceInfo media source used to resolve the HDFS config
 * @return the cached or newly created token
 * @throws Exception declared by the signature
 */
private FileStreamToken getStreamTokenInternal(String pathString, MediaSourceInfo mediaSourceInfo)
    throws Exception {
  HdfsConfig hdfsConfig = HdfsConfigManager.getHdfsConfig(mediaSourceInfo, hdfsWriterParameter);
  DistributedFileSystem hadoopFS = (DistributedFileSystem) FileSystemManager.getFileSystem(hdfsConfig);

  ReentrantLock lock = FileLockUtils.getLock(pathString);
  // Acquire the lock BEFORE entering the try block: if lock() itself fails,
  // the finally clause must not call unlock() on a lock this thread does not
  // hold, which would raise IllegalMonitorStateException and hide the
  // original failure.
  lock.lock();
  try {
    FileStreamToken token = tokens.get(pathString);
    if (token == null) {
      FSDataOutputStream fileStream;
      Path path = new Path(pathString);
      if (!hadoopFS.exists(path)) {
        fileStream = hadoopFS.create(path, false,
            hdfsConfig.getConfiguration().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
                CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT),
            (short) 3, 64 * 1024 * 1024L);
        logger.info("stream create succeeded for file : " + pathString);
      } else {
        fileStream = hadoopFS.append(path);
        logger.info("stream append succeeded for file : " + pathString);
      }
      token = new FileStreamToken(pathString, path, hadoopFS, fileStream);
      tokens.put(pathString, token);
    }
    return token;
  } finally {
    lock.unlock();
  }
}