下面列出了 org.apache.hadoop.util.HostsFileReader 类的 API 用法及示例代码，也可以点击链接到 GitHub 查看源代码。
/**
 * Creates and initializes the block placement policy selected by the
 * configuration key {@code dfs.block.replicator.classname}.
 * {@link BlockPlacementPolicyDefault} is passed to the configuration lookup
 * as the default class.
 *
 * @param conf the configuration to be used
 * @param stats an object that is used to retrieve the load on the cluster
 * @param clusterMap the network topology of the cluster
 * @param hostsReader the hosts file reader forwarded to the policy's
 *        {@code initialize} call
 * @param dnsToSwitchMapping the DNS-to-switch mapping forwarded to the
 *        policy's {@code initialize} call
 * @param namesystem the FSNamesystem
 * @return an initialized instance of BlockPlacementPolicy
 */
public static BlockPlacementPolicy getInstance(Configuration conf,
    FSClusterStats stats,
    NetworkTopology clusterMap,
    HostsFileReader hostsReader,
    DNSToSwitchMapping dnsToSwitchMapping,
    FSNamesystem namesystem) {
  final Class<? extends BlockPlacementPolicy> policyClass = conf.getClass(
      "dfs.block.replicator.classname",
      BlockPlacementPolicyDefault.class,
      BlockPlacementPolicy.class);
  // Instantiate reflectively, then hand every collaborator to the new policy.
  final BlockPlacementPolicy policy =
      (BlockPlacementPolicy) ReflectionUtils.newInstance(policyClass, conf);
  policy.initialize(conf, stats, clusterMap, hostsReader,
      dnsToSwitchMapping, namesystem);
  return policy;
}
/**
 * Builds a {@link HostsFileReader} for the given include and exclude files.
 * For each file name that is neither {@code null} nor empty, an input stream
 * is obtained from the RM context's configuration provider; otherwise
 * {@code null} is passed as that file's stream.
 *
 * @param includesFile name of the includes file, may be {@code null} or empty
 * @param excludesFile name of the excludes file, may be {@code null} or empty
 * @return the newly created reader
 * @throws IOException declared by the calls made here
 * @throws YarnException declared by the calls made here
 */
private HostsFileReader createHostsFileReader(String includesFile,
    String excludesFile) throws IOException, YarnException {
  return new HostsFileReader(
      includesFile,
      (includesFile != null && !includesFile.isEmpty())
          ? this.rmContext.getConfigurationProvider()
              .getConfigurationInputStream(this.conf, includesFile)
          : null,
      excludesFile,
      (excludesFile != null && !excludesFile.isEmpty())
          ? this.rmContext.getConfigurationProvider()
              .getConfigurationInputStream(this.conf, excludesFile)
          : null);
}
/**
 * Read set of host names from a file.
 *
 * The entries are loaded with {@code HostsFileReader.readFileToSet} and then
 * passed through {@code StringUtils.getTrimmedStrings}.
 *
 * @param fileName the file to read the host names from
 * @param type label forwarded unchanged to
 *        {@code HostsFileReader.readFileToSet}
 * @return set of host names
 * @throws IllegalArgumentException if reading the file fails; the underlying
 *         {@link IOException} is attached as the cause
 */
static Set<String> getHostListFromFile(String fileName, String type) {
  Set<String> nodes = new HashSet<String>();
  try {
    HostsFileReader.readFileToSet(type, fileName, nodes);
    return StringUtils.getTrimmedStrings(nodes);
  } catch (IOException e) {
    // Chain the IOException so the real reason for the failure is not lost.
    throw new IllegalArgumentException(
        "Failed to read host list from file: " + fileName, e);
  }
}
/**
 * Reads a hosts file into a {@link HostSet}.
 *
 * An empty file name yields an empty set without touching the file system.
 * Otherwise every entry loaded by {@code HostsFileReader.readFileToSet} is
 * run through {@code parseEntry}; entries for which it returns {@code null}
 * are skipped.
 *
 * @param type label forwarded to the reader and to {@code parseEntry}
 * @param filename the file to read; must not be {@code null}
 * @return the addresses parsed from the file
 * @throws IOException if the file cannot be read
 */
private static HostSet readFile(String type, String filename)
    throws IOException {
  HostSet hosts = new HostSet();
  if (filename.isEmpty()) {
    return hosts;
  }
  HashSet<String> entries = new HashSet<String>();
  HostsFileReader.readFileToSet(type, filename, entries);
  for (String entry : entries) {
    InetSocketAddress parsed = parseEntry(type, filename, entry);
    if (parsed == null) {
      continue;
    }
    hosts.add(parsed);
  }
  return hosts;
}
/**
 * Builds a {@link HostsFileReader} for the given include and exclude files.
 * For each file name that is neither {@code null} nor empty, an input stream
 * is obtained from the RM context's configuration provider; otherwise
 * {@code null} is passed as that file's stream.
 *
 * @param includesFile name of the includes file, may be {@code null} or empty
 * @param excludesFile name of the excludes file, may be {@code null} or empty
 * @return the newly created reader
 * @throws IOException declared by the calls made here
 * @throws YarnException declared by the calls made here
 */
private HostsFileReader createHostsFileReader(String includesFile,
    String excludesFile) throws IOException, YarnException {
  return new HostsFileReader(
      includesFile,
      (includesFile != null && !includesFile.isEmpty())
          ? this.rmContext.getConfigurationProvider()
              .getConfigurationInputStream(this.conf, includesFile)
          : null,
      excludesFile,
      (excludesFile != null && !excludesFile.isEmpty())
          ? this.rmContext.getConfigurationProvider()
              .getConfigurationInputStream(this.conf, excludesFile)
          : null);
}
/**
 * Read set of host names from a file.
 *
 * The entries are loaded with {@code HostsFileReader.readFileToSet} and then
 * passed through {@code StringUtils.getTrimmedStrings}.
 *
 * @param fileName the file to read the host names from
 * @param type label forwarded unchanged to
 *        {@code HostsFileReader.readFileToSet}
 * @return set of host names
 * @throws IllegalArgumentException if reading the file fails; the underlying
 *         {@link IOException} is attached as the cause
 */
static Set<String> getHostListFromFile(String fileName, String type) {
  Set<String> nodes = new HashSet<String>();
  try {
    HostsFileReader.readFileToSet(type, fileName, nodes);
    return StringUtils.getTrimmedStrings(nodes);
  } catch (IOException e) {
    // Chain the IOException so the real reason for the failure is not lost.
    throw new IllegalArgumentException(
        "Failed to read host list from file: " + fileName, e);
  }
}
/**
 * Reads a hosts file into a {@link HostSet}.
 *
 * An empty file name yields an empty set without touching the file system.
 * Otherwise every entry loaded by {@code HostsFileReader.readFileToSet} is
 * run through {@code parseEntry}; entries for which it returns {@code null}
 * are skipped.
 *
 * @param type label forwarded to the reader and to {@code parseEntry}
 * @param filename the file to read; must not be {@code null}
 * @return the addresses parsed from the file
 * @throws IOException if the file cannot be read
 */
private static HostSet readFile(String type, String filename)
    throws IOException {
  HostSet hosts = new HostSet();
  if (filename.isEmpty()) {
    return hosts;
  }
  HashSet<String> entries = new HashSet<String>();
  HostsFileReader.readFileToSet(type, filename, entries);
  for (String entry : entries) {
    InetSocketAddress parsed = parseEntry(type, filename, entry);
    if (parsed == null) {
      continue;
    }
    hosts.add(parsed);
  }
  return hosts;
}
/**
 * Exercises the mover's add / remove / restore operations on a hosts file
 * and checks, after each step, what a {@link HostsFileReader} pointed at
 * that file reports.
 */
public void testFileModifications() throws Exception {
  System.out.println(TEST_ROOT_DIR);
  Configuration conf = new Configuration();

  // Seed the hosts file with two entries.
  File hostsFile = new File(TEST_ROOT_DIR, "hosts.file");
  if (!hostsFile.exists()) {
    hostsFile.createNewFile();
  }
  FileWriter out = new FileWriter(hostsFile);
  out.write("host1.host.com\n");
  out.write("host2.host.com\n");
  out.close();
  final String hostsPath = hostsFile.getAbsolutePath();

  TTMover mover = new TTMoverTestStub(TEST_ROOT_DIR.toString());
  mover.setConf(conf);

  // Adding a third host must be visible to a freshly created reader.
  mover.addHostToFile(hostsPath, "host3.host.com");
  HostsFileReader reader = new HostsFileReader(hostsPath, hostsPath);
  System.out.println(reader.getHosts().toString());
  assertEquals(3, reader.getHosts().size());

  // Removing one host leaves two after a refresh.
  mover.removeHostFromFile(hostsPath, "host1.host.com");
  reader.refresh();
  assertEquals(2, reader.getHosts().size());

  // After restoreFile the reader sees host1 again and no longer sees host3.
  mover.restoreFile(hostsPath);
  reader.refresh();
  assertEquals(2, reader.getHosts().size());
  assertTrue(reader.getHosts().contains("host1.host.com"));
  assertFalse(reader.getHosts().contains("host3.host.com"));
}
/**
 * Runs the mover with {@code -remove host1} and checks that host1 ends up in
 * the excludes list and is gone from the includes list, while host2 stays
 * included.
 */
public void testHostRemove() throws Exception {
  Configuration conf = new Configuration();
  conf.set("mapred.hosts", "hosts.include");
  conf.set("mapred.hosts.exclude", "hosts.exclude");

  File includeFile = new File(TEST_ROOT_DIR, "hosts.include");
  File excludeFile = new File(TEST_ROOT_DIR, "hosts.exclude");
  File slavesFile = new File(TEST_ROOT_DIR, "slaves");

  // Start from a freshly created, empty excludes file.
  if (excludeFile.exists()) {
    excludeFile.delete();
  }
  excludeFile.createNewFile();

  FileWriter includeWriter = new FileWriter(includeFile);
  includeWriter.write("host1\nhost2\n");
  includeWriter.close();

  FileWriter slavesWriter = new FileWriter(slavesFile);
  slavesWriter.write("host1\nhost2\n");
  slavesWriter.close();

  TTMoverTestStub mover = new TTMoverTestStub(TEST_ROOT_DIR.toString());
  mover.setConf(conf);
  mover.run(new String[]{"-remove", "host1"});

  HostsFileReader reader = new HostsFileReader(
      includeFile.getAbsolutePath(), excludeFile.getAbsolutePath());
  assertTrue(reader.getExcludedHosts().contains("host1"));
  assertTrue(reader.getHosts().contains("host2"));
  assertFalse(reader.getHosts().contains("host1"));
}
/**
 * Runs the mover with {@code -add host3}, where host3 starts out in the
 * excludes file, and checks that after a refresh host3 is included and no
 * longer excluded.
 */
public void testHostAdd() throws Exception {
  Configuration conf = new Configuration();
  conf.set("mapred.hosts", "hosts.include");
  conf.set("mapred.hosts.exclude", "hosts.exclude");

  File includeFile = new File(TEST_ROOT_DIR, "hosts.include");
  File excludeFile = new File(TEST_ROOT_DIR, "hosts.exclude");
  File slavesFile = new File(TEST_ROOT_DIR, "slaves");

  FileWriter includeWriter = new FileWriter(includeFile);
  includeWriter.write("host1\nhost2\n");
  includeWriter.close();

  FileWriter slavesWriter = new FileWriter(slavesFile);
  slavesWriter.write("host1\nhost2\n");
  slavesWriter.close();

  FileWriter excludeWriter = new FileWriter(excludeFile);
  excludeWriter.write("host3\n");
  excludeWriter.close();

  // Before the mover runs, only the two included hosts are visible.
  HostsFileReader reader = new HostsFileReader(
      includeFile.getAbsolutePath(), excludeFile.getAbsolutePath());
  assertEquals(2, reader.getHosts().size());

  TTMoverTestStub mover = new TTMoverTestStub(TEST_ROOT_DIR.toString());
  mover.setConf(conf);
  mover.run(new String[]{"-add", "host3"});

  reader.refresh();
  assertFalse(reader.getExcludedHosts().contains("host3"));
  assertTrue(reader.getHosts().contains("host3"));
}
/**
 * {@inheritDoc}
 *
 * After delegating to the superclass, this keeps the configuration and
 * namesystem and creates the located-block and full-path-name caches from
 * them.
 */
@Override
public void initialize(Configuration conf, FSClusterStats stats,
    NetworkTopology clusterMap, HostsFileReader hostsReader,
    DNSToSwitchMapping dnsToSwitchMapping, FSNamesystem namesystem) {
  super.initialize(
      conf, stats, clusterMap, hostsReader, dnsToSwitchMapping, namesystem);
  this.conf = conf;
  this.namesystem = namesystem;
  // Both caches are built from the same configuration and namesystem.
  this.cachedLocatedBlocks = new CachedLocatedBlocks(conf, namesystem);
  this.cachedFullPathNames = new CachedFullPathNames(conf, namesystem);
}
/**
* Construct ClusterManager given {@link CoronaConf}.
*
* Creates the metrics object, session manager, session history manager,
* node manager, session notifier and scheduler, starts the scheduler, and
* then creates and starts the "cm" HTTP info server. All of this happens
* inside the constructor, so the statement order below matters.
*
* @param conf the configuration for the ClusterManager
* @throws IOException declared by this constructor; which of the calls below
*         raises it is not visible from here
*/
public ClusterManager(CoronaConf conf) throws IOException {
this.conf = conf;
initLegalTypes();
metrics = new ClusterManagerMetrics(getTypes());
sessionManager = new SessionManager(this);
sessionManager.setConf(conf);
sessionHistoryManager = new SessionHistoryManager();
sessionHistoryManager.setConf(conf);
// The include/exclude host lists come from the files named in the configuration.
HostsFileReader hostsReader =
new HostsFileReader(conf.getHostsFile(), conf.getExcludesFile());
nodeManager = new NodeManager(this, hostsReader);
nodeManager.setConf(conf);
sessionNotifier = new SessionNotifier(sessionManager, this, metrics);
sessionNotifier.setConf(conf);
scheduler = new Scheduler(nodeManager, sessionManager,
sessionNotifier, getTypes(), metrics, conf);
// The scheduler is started before the HTTP server is created.
scheduler.start();
metrics.registerUpdater(scheduler, sessionNotifier);
InetSocketAddress infoSocAddr =
NetUtils.createSocketAddr(conf.getClusterManagerHttpAddress());
// Fourth argument is true only when the configured port is 0 —
// presumably asks HttpServer to pick a free port; TODO confirm.
infoServer =
new HttpServer("cm", infoSocAddr.getHostName(), infoSocAddr.getPort(),
infoSocAddr.getPort() == 0, conf);
// Make this instance available to the web server under the "cm" attribute.
infoServer.setAttribute("cm", this);
infoServer.start();
startTime = clock.getTime();
hostName = infoSocAddr.getHostName();
safeMode = false;
}
/**
* NodeManager constructor given a cluster manager and a
* {@link HostsFileReader} for includes/excludes lists.
*
* Logs the sizes of the included and excluded host lists, starts the
* "expireNodes" daemon thread, and creates the fault manager.
*
* @param clusterManager the cluster manager
* @param hostsReader the host reader for includes/excludes
*/
public NodeManager(
ClusterManager clusterManager, HostsFileReader hostsReader) {
this.hostsReader = hostsReader;
LOG.info("Included hosts: " + hostsReader.getHostNames().size() +
" Excluded hosts: " + hostsReader.getExcludedHosts().size());
this.clusterManager = clusterManager;
// NOTE(review): the expireNodes thread is started before faultManager is
// assigned below — confirm the runnable does not read faultManager early.
this.expireNodesThread = new Thread(this.expireNodes,
"expireNodes");
this.expireNodesThread.setDaemon(true);
this.expireNodesThread.start();
this.faultManager = new FaultManager(this);
}
/**
 * Opens an HTTPS connection for {@code path} to every host listed by the
 * hosts file reader and collects the hosts that either did not answer with
 * {@code SC_OK} or raised an {@link IOException}.
 *
 * The hosts file name is read from the {@code hdfsproxy.hosts} configuration
 * key, defaulting to {@code hdfsproxy-hosts}. A summary of the failures is
 * printed to standard error.
 *
 * @param conf the configuration used for SSL setup, port and hosts file
 * @param path the path passed to {@code openConnection} for each host
 * @return {@code true} if the command failed on at least one host,
 *         {@code false} if no failure was recorded
 * @throws IOException declared by the setup calls made before the loop
 */
private static boolean sendCommand(Configuration conf, String path)
    throws IOException {
  setupSslProps(conf);
  final int sslPort = getSslAddr(conf).getPort();
  int failures = 0;
  StringBuilder report = new StringBuilder();
  final String hostsFile = conf.get("hdfsproxy.hosts", "hdfsproxy-hosts");
  HostsFileReader hostsReader = new HostsFileReader(hostsFile, "");
  for (String hostname : hostsReader.getHosts()) {
    HttpsURLConnection connection = null;
    try {
      connection = openConnection(hostname, sslPort, path);
      connection.connect();
      if (connection.getResponseCode() != HttpServletResponse.SC_OK) {
        // Build the whole line first so nothing partial is appended on error.
        String line = "\n\t" + hostname + ": " + connection.getResponseCode()
            + " " + connection.getResponseMessage();
        report.append(line);
        failures++;
      }
    } catch (IOException e) {
      // A per-host I/O failure is recorded; the remaining hosts are still tried.
      report.append("\n\t" + hostname + ": " + e.getLocalizedMessage());
      failures++;
    } finally {
      if (connection != null) {
        connection.disconnect();
      }
    }
  }
  if (failures == 0) {
    return false;
  }
  System.err.print("Command failed on the following "
      + failures + " host" + (failures == 1 ? ":" : "s:")
      + report.toString() + "\n");
  return true;
}
/**
 * {@inheritDoc}
 *
 * Stores the cluster stats and topology, and reads
 * {@code dfs.replication.considerLoad} (default {@code true}) from
 * {@code conf}. {@code hostsReader}, {@code dnsToSwitchMapping} and
 * {@code ns} are not used by this implementation.
 */
public void initialize(Configuration conf, FSClusterStats stats,
    NetworkTopology clusterMap, HostsFileReader hostsReader,
    DNSToSwitchMapping dnsToSwitchMapping, FSNamesystem ns) {
  this.considerLoad = conf.getBoolean("dfs.replication.considerLoad", true);
  this.stats = stats;
  this.clusterMap = clusterMap;
  // NOTE(review): the multiplier is read from a freshly constructed
  // Configuration rather than from the conf argument — confirm this is
  // intentional.
  this.attemptMultiplier =
      new Configuration().getInt("dfs.replication.attemptMultiplier", 200);
}
/**
 * Opens an HTTPS connection for {@code path} to every host listed by the
 * hosts file reader and collects the hosts that either did not answer with
 * {@code SC_OK} or raised an {@link IOException}.
 *
 * The hosts file name is read from the {@code hdfsproxy.hosts} configuration
 * key, defaulting to {@code hdfsproxy-hosts}. A summary of the failures is
 * printed to standard error.
 *
 * @param conf the configuration used for SSL setup, port and hosts file
 * @param path the path passed to {@code openConnection} for each host
 * @return {@code true} if the command failed on at least one host,
 *         {@code false} if no failure was recorded
 * @throws IOException declared by the setup calls made before the loop
 */
private static boolean sendCommand(Configuration conf, String path)
    throws IOException {
  setupSslProps(conf);
  final int sslPort = getSslAddr(conf).getPort();
  int failures = 0;
  StringBuilder report = new StringBuilder();
  final String hostsFile = conf.get("hdfsproxy.hosts", "hdfsproxy-hosts");
  HostsFileReader hostsReader = new HostsFileReader(hostsFile, "");
  for (String hostname : hostsReader.getHosts()) {
    HttpsURLConnection connection = null;
    try {
      connection = openConnection(hostname, sslPort, path);
      connection.connect();
      if (connection.getResponseCode() != HttpServletResponse.SC_OK) {
        // Build the whole line first so nothing partial is appended on error.
        String line = "\n\t" + hostname + ": " + connection.getResponseCode()
            + " " + connection.getResponseMessage();
        report.append(line);
        failures++;
      }
    } catch (IOException e) {
      // A per-host I/O failure is recorded; the remaining hosts are still tried.
      report.append("\n\t" + hostname + ": " + e.getLocalizedMessage());
      failures++;
    } finally {
      if (connection != null) {
        connection.disconnect();
      }
    }
  }
  if (failures == 0) {
    return false;
  }
  System.err.print("Command failed on the following "
      + failures + " host" + (failures == 1 ? ":" : "s:")
      + report.toString() + "\n");
  return true;
}
/**
* Returns the {@link HostsFileReader} held by this object.
* Marked {@code @VisibleForTesting}.
*
* @return the hosts file reader stored in the {@code hostsReader} field
*/
@VisibleForTesting
public HostsFileReader getHostsReader() {
return this.hostsReader;
}
/**
 * Prepares the fixture: an RM context backed by an inline dispatcher, a
 * mocked {@code NodesListManager} that returns a mocked
 * {@link HostsFileReader}, a mocked scheduler that records the handled event
 * type and completed containers, and a fresh {@code RMNodeImpl} for
 * "localhost".
 */
@Before
public void setUp() throws Exception {
  InlineDispatcher rmDispatcher = new InlineDispatcher();

  rmContext =
      new RMContextImpl(rmDispatcher, null, null, null,
          mock(DelegationTokenRenewer.class), null, null, null, null, null);
  NodesListManager mockedNodesListManager = mock(NodesListManager.class);
  HostsFileReader mockedReader = mock(HostsFileReader.class);
  when(mockedNodesListManager.getHostsReader()).thenReturn(mockedReader);
  ((RMContextImpl) rmContext).setNodesListManager(mockedNodesListManager);

  scheduler = mock(YarnScheduler.class);
  // Record the type of every event handled by the scheduler and, for node
  // updates, collect the completed containers the node reports.
  doAnswer(
      new Answer<Void>() {
        @Override
        public Void answer(InvocationOnMock invocation) throws Throwable {
          final SchedulerEvent event =
              (SchedulerEvent) (invocation.getArguments()[0]);
          eventType = event.getType();
          if (eventType == SchedulerEventType.NODE_UPDATE) {
            List<UpdatedContainerInfo> updates =
                ((NodeUpdateSchedulerEvent) event).getRMNode()
                    .pullContainerUpdates();
            for (UpdatedContainerInfo update : updates) {
              completedContainers.addAll(update.getCompletedContainers());
            }
          }
          return null;
        }
      }
  ).when(scheduler).handle(any(SchedulerEvent.class));

  rmDispatcher.register(SchedulerEventType.class,
      new TestSchedulerEventDispatcher());
  rmDispatcher.register(NodesListManagerEventType.class,
      new TestNodeListManagerEventDispatcher());

  NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
  node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
  nodesListManagerEvent = null;
}
/**
* Returns the {@link HostsFileReader} held by this object.
* Marked {@code @VisibleForTesting}.
*
* @return the hosts file reader stored in the {@code hostsReader} field
*/
@VisibleForTesting
public HostsFileReader getHostsReader() {
return this.hostsReader;
}
/**
 * Prepares the fixture: an RM context backed by an inline dispatcher, a
 * mocked {@code NodesListManager} that returns a mocked
 * {@link HostsFileReader}, a mocked scheduler that records the handled event
 * type and completed containers, and a fresh {@code RMNodeImpl} for
 * "localhost".
 */
@Before
public void setUp() throws Exception {
  InlineDispatcher rmDispatcher = new InlineDispatcher();

  rmContext =
      new RMContextImpl(rmDispatcher, null, null, null,
          mock(DelegationTokenRenewer.class), null, null, null, null, null);
  NodesListManager mockedNodesListManager = mock(NodesListManager.class);
  HostsFileReader mockedReader = mock(HostsFileReader.class);
  when(mockedNodesListManager.getHostsReader()).thenReturn(mockedReader);
  ((RMContextImpl) rmContext).setNodesListManager(mockedNodesListManager);

  scheduler = mock(YarnScheduler.class);
  // Record the type of every event handled by the scheduler and, for node
  // updates, collect the completed containers the node reports.
  doAnswer(
      new Answer<Void>() {
        @Override
        public Void answer(InvocationOnMock invocation) throws Throwable {
          final SchedulerEvent event =
              (SchedulerEvent) (invocation.getArguments()[0]);
          eventType = event.getType();
          if (eventType == SchedulerEventType.NODE_UPDATE) {
            List<UpdatedContainerInfo> updates =
                ((NodeUpdateSchedulerEvent) event).getRMNode()
                    .pullContainerUpdates();
            for (UpdatedContainerInfo update : updates) {
              completedContainers.addAll(update.getCompletedContainers());
            }
          }
          return null;
        }
      }
  ).when(scheduler).handle(any(SchedulerEvent.class));

  rmDispatcher.register(SchedulerEventType.class,
      new TestSchedulerEventDispatcher());
  rmDispatcher.register(NodesListManagerEventType.class,
      new TestNodeListManagerEventDispatcher());

  NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
  node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
  nodesListManagerEvent = null;
}
/**
* Creates a testable node manager, building the {@link HostsFileReader}
* from the hosts and excludes files named in the configuration and passing
* it to the superclass constructor.
*
* @param clusterManager the cluster manager handed to the superclass
* @param conf supplies the hosts file and excludes file names
* @throws IOException declared by this constructor; presumably raised when
*         the hosts files cannot be read — TODO confirm
*/
public NodeManagerTestable(
ClusterManager clusterManager, CoronaConf conf) throws IOException {
super(clusterManager,
new HostsFileReader(conf.getHostsFile(), conf.getExcludesFile()));
}
/**
* Creates the policy and immediately initializes it with the given
* collaborators, passing {@code null} as the namesystem.
*
* @param conf the configuration object
* @param stats the cluster stats forwarded to {@code initialize}
* @param clusterMap the cluster topology forwarded to {@code initialize}
* @param hostsReader the hosts file reader forwarded to {@code initialize}
* @param dnsToSwitchMapping the DNS-to-switch mapping forwarded to
*        {@code initialize}
*/
BlockPlacementPolicyConfigurable(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap, HostsFileReader hostsReader,
DNSToSwitchMapping dnsToSwitchMapping) {
// No FSNamesystem is available on this path, so null is passed for it.
initialize(conf, stats, clusterMap, hostsReader, dnsToSwitchMapping, null);
}
/**
* Used to setup a BlockPlacementPolicy object. This should be defined by
* all implementations of a BlockPlacementPolicy.
*
* @param conf the configuration object
* @param stats retrieve cluster status from here
* @param clusterMap cluster topology
* @param hostsReader the hosts file reader made available to the policy
* @param dnsToSwitchMapping the DNS-to-switch mapping made available to the
*        policy
* @param namesystem the FSNamesystem
*/
abstract protected void initialize(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap,
HostsFileReader hostsReader,
DNSToSwitchMapping dnsToSwitchMapping,
FSNamesystem namesystem);