下面列出了org.apache.hadoop.io.MultipleIOException#org.apache.hadoop.hdfs.server.protocol.NamespaceInfo 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private NamespaceInfo handshake(Configuration conf) throws IOException {
// connect to name node
InetSocketAddress nnAddress = NameNode.getServiceAddress(conf, true);
this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddress,
NamenodeProtocol.class, UserGroupInformation.getCurrentUser(),
true).getProxy();
this.nnRpcAddress = NetUtils.getHostPortString(nnAddress);
this.nnHttpAddress = DFSUtil.getInfoServer(nnAddress, conf,
DFSUtil.getHttpClientScheme(conf)).toURL();
// get version and id info from the name-node
NamespaceInfo nsInfo = null;
while(!isStopRequested()) {
try {
nsInfo = handshake(namenode);
break;
} catch(SocketTimeoutException e) { // name-node is busy
LOG.info("Problem connecting to server: " + nnAddress);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
LOG.warn("Encountered exception ", e);
}
}
}
return nsInfo;
}
private void connectToNNAndHandshake() throws IOException {
// get NN proxy
bpNamenode = dn.connectToNN(nnAddr);
// First phase of the handshake with NN - get the namespace
// info.
NamespaceInfo nsInfo = retrieveNamespaceInfo();
// Verify that this matches the other NN in this HA pair.
// This also initializes our block pool in the DN if we are
// the first NN connection for this BP.
bpos.verifyAndSetNamespaceInfo(nsInfo);
// Second phase of the handshake with the NN.
register(nsInfo);
}
private void connectToNNAndHandshake() throws IOException {
// get NN proxy
bpNamenode = dn.connectToNN(nnAddr);
// First phase of the handshake with NN - get the namespace
// info.
NamespaceInfo nsInfo = retrieveNamespaceInfo();
// Verify that this matches the other NN in this HA pair.
// This also initializes our block pool in the DN if we are
// the first NN connection for this BP.
bpos.verifyAndSetNamespaceInfo(nsInfo);
// Second phase of the handshake with the NN.
register(nsInfo);
}
/**
* One of the Block Pools has successfully connected to its NN.
* This initializes the local storage for that block pool,
* checks consistency of the NN's cluster ID, etc.
*
* If this is the first block pool to register, this also initializes
* the datanode-scoped storage.
*
* @param bpos Block pool offer service
* @throws IOException if the NN is inconsistent with the local storage.
*/
void initBlockPool(BPOfferService bpos) throws IOException {
NamespaceInfo nsInfo = bpos.getNamespaceInfo();
if (nsInfo == null) {
throw new IOException("NamespaceInfo not found: Block pool " + bpos
+ " should have retrieved namespace info before initBlockPool.");
}
setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
// Register the new block pool with the BP manager.
blockPoolManager.addBlockPool(bpos);
// In the case that this is the first block pool to connect, initialize
// the dataset, block scanners, etc.
initStorage(nsInfo);
// Exclude failed disks before initializing the block pools to avoid startup
// failures.
checkDiskError();
data.addBlockPool(nsInfo.getBlockPoolID(), conf);
blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
initDirectoryScanner(conf);
}
/**
* Analyze and load storage directories. Recover from previous transitions if
* required.
*
* The block pool storages are either all analyzed or none of them is loaded.
* Therefore, a failure on loading any block pool storage results a faulty
* data volume.
*
* @param datanode Datanode to which this storage belongs to
* @param nsInfo namespace information
* @param dataDirs storage directories of block pool
* @param startOpt startup option
* @return an array of loaded block pool directories.
* @throws IOException on error
*/
List<StorageDirectory> loadBpStorageDirectories(
DataNode datanode, NamespaceInfo nsInfo,
Collection<File> dataDirs, StartupOption startOpt) throws IOException {
List<StorageDirectory> succeedDirs = Lists.newArrayList();
try {
for (File dataDir : dataDirs) {
if (containsStorageDir(dataDir)) {
throw new IOException(
"BlockPoolSliceStorage.recoverTransitionRead: " +
"attempt to load an used block storage: " + dataDir);
}
StorageDirectory sd =
loadStorageDirectory(datanode, nsInfo, dataDir, startOpt);
succeedDirs.add(sd);
}
} catch (IOException e) {
LOG.warn("Failed to analyze storage directories for block pool "
+ nsInfo.getBlockPoolID(), e);
throw e;
}
return succeedDirs;
}
@Test
public void testTwoWriters() throws Exception {
long start = 1;
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
bkjm1.format(nsi);
BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
EditLogOutputStream out1 = bkjm1.startLogSegment(start,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
try {
bkjm2.startLogSegment(start,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
fail("Shouldn't have been able to open the second writer");
} catch (IOException ioe) {
LOG.info("Caught exception as expected", ioe);
}finally{
out1.close();
}
}
@Test
public void testSimpleWrite() throws Exception {
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi);
bkjm.format(nsi);
EditLogOutputStream out = bkjm.startLogSegment(1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(1, 100);
String zkpath = bkjm.finalizedLedgerZNode(1, 100);
assertNotNull(zkc.exists(zkpath, false));
assertNull(zkc.exists(bkjm.inprogressZNode(1), false));
}
private QuorumJournalManager createSpyingQJM()
throws IOException, URISyntaxException {
AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) {
AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, addr) {
protected ExecutorService createSingleThreadExecutor() {
// Don't parallelize calls to the quorum in the tests.
// This makes the tests more deterministic.
return MoreExecutors.sameThreadExecutor();
}
};
return Mockito.spy(logger);
}
};
return closeLater(new QuorumJournalManager(
conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, spyFactory));
}
@Before
public void setUp() throws IOException {
mockDnConf = mock(DNConf.class);
doReturn(VersionInfo.getVersion()).when(mockDnConf).getMinimumNameNodeVersion();
DataNode mockDN = mock(DataNode.class);
doReturn(true).when(mockDN).shouldRun();
doReturn(mockDnConf).when(mockDN).getDnConf();
BPOfferService mockBPOS = mock(BPOfferService.class);
doReturn(mockDN).when(mockBPOS).getDataNode();
actor = new BPServiceActor(INVALID_ADDR, mockBPOS);
fakeNsInfo = mock(NamespaceInfo.class);
// Return a a good software version.
doReturn(VersionInfo.getVersion()).when(fakeNsInfo).getSoftwareVersion();
// Return a good layout version for now.
doReturn(HdfsConstants.NAMENODE_LAYOUT_VERSION).when(fakeNsInfo)
.getLayoutVersion();
DatanodeProtocolClientSideTranslatorPB fakeDnProt =
mock(DatanodeProtocolClientSideTranslatorPB.class);
when(fakeDnProt.versionRequest()).thenReturn(fakeNsInfo);
actor.setNameNode(fakeDnProt);
}
void setupNS(Configuration conf, AbstractList<File> dataDirs)
throws IOException {
// get NN proxy
DatanodeProtocol dnp =
(DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
DatanodeProtocol.versionID, nnAddr, conf);
setNameNode(dnp);
// handshake with NN
NamespaceInfo nsInfo = handshake();
setNamespaceInfo(nsInfo);
synchronized(DataNode.this){
setupNSStorage();
}
nsRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
nsRegistration.setInfoPort(infoServer.getPort());
}
/**
* Format a namespace slice storage.
* @param sd the namespace storage
* @param nsInfo the name space info
* @throws IOException Signals that an I/O exception has occurred.
*/
private void format(StorageDirectory nsSdir, NamespaceInfo nsInfo) throws IOException {
LOG.info("Formatting namespace " + namespaceID + " directory "
+ nsSdir.getCurrentDir());
nsSdir.clearDirectory(); // create directory
File rbwDir = new File(nsSdir.getCurrentDir(), STORAGE_DIR_RBW);
File finalizedDir = new File(nsSdir.getCurrentDir(), STORAGE_DIR_FINALIZED);
LOG.info("Creating Directories : " + rbwDir + ", " + finalizedDir);
if (!rbwDir.mkdirs() || !finalizedDir.mkdirs()) {
throw new IOException("Cannot create directories : " + rbwDir + ", "
+ finalizedDir);
}
this.layoutVersion = FSConstants.LAYOUT_VERSION;
this.cTime = nsInfo.getCTime();
this.namespaceID = nsInfo.getNamespaceID();
this.storageType = NodeType.DATA_NODE;
nsSdir.write();
}
@Test
public void testNumberOfTransactions() throws Exception {
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsi);
bkjm.format(nsi);
EditLogOutputStream out = bkjm.startLogSegment(1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(1, 100);
long numTrans = bkjm.getNumberOfTransactions(1, true);
assertEquals(100, numTrans);
}
public static String buildPath(String journalId, long segmentTxId,
NamespaceInfo nsInfo) {
StringBuilder path = new StringBuilder("/getJournal?");
try {
path.append(JOURNAL_ID_PARAM).append("=")
.append(URLEncoder.encode(journalId, "UTF-8"));
path.append("&" + SEGMENT_TXID_PARAM).append("=")
.append(segmentTxId);
path.append("&" + STORAGEINFO_PARAM).append("=")
.append(URLEncoder.encode(nsInfo.toColonSeparatedString(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
// Never get here -- everyone supports UTF-8
throw new RuntimeException(e);
}
return path.toString();
}
private static QuorumJournalManager createRandomFaultyQJM(
MiniJournalCluster cluster, final Random seedGenerator)
throws IOException, URISyntaxException {
AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) {
return new RandomFaultyChannel(conf, nsInfo, journalId, addr,
seedGenerator.nextLong());
}
};
return new QuorumJournalManager(conf, cluster.getQuorumJournalURI(JID),
FAKE_NSINFO, spyFactory);
}
@Test
public void testAddStorageDirectories() throws IOException,
URISyntaxException {
final int numLocations = 3;
final int numNamespace = 3;
List<StorageLocation> locations = createStorageLocations(numLocations);
// Add volumes for multiple namespaces.
List<NamespaceInfo> namespaceInfos = createNamespaceInfos(numNamespace);
for (NamespaceInfo ni : namespaceInfos) {
storage.addStorageLocations(mockDN, ni, locations, START_OPT);
for (StorageLocation sl : locations) {
checkDir(sl.getFile());
checkDir(sl.getFile(), ni.getBlockPoolID());
}
}
assertEquals(numLocations, storage.getNumStorageDirs());
locations = createStorageLocations(numLocations);
List<StorageLocation> addedLocation =
storage.addStorageLocations(mockDN, namespaceInfos.get(0),
locations, START_OPT);
assertTrue(addedLocation.isEmpty());
// The number of active storage dirs has not changed, since it tries to
// add the storage dirs that are under service.
assertEquals(numLocations, storage.getNumStorageDirs());
// Add more directories.
locations = createStorageLocations(6);
storage.addStorageLocations(mockDN, nsInfo, locations, START_OPT);
assertEquals(6, storage.getNumStorageDirs());
}
private static QuorumJournalManager createInjectableQJM(MiniJournalCluster cluster)
throws IOException, URISyntaxException {
AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) {
return new InvocationCountingChannel(conf, nsInfo, journalId, addr);
}
};
return new QuorumJournalManager(conf, cluster.getQuorumJournalURI(JID),
FAKE_NSINFO, spyFactory);
}
/**
* Initializes the {@link #data}. The initialization is done only once, when
* handshake with the the first namenode is completed.
*/
private void initStorage(final NamespaceInfo nsInfo) throws IOException {
final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory
= FsDatasetSpi.Factory.getFactory(conf);
if (!factory.isSimulated()) {
final StartupOption startOpt = getStartupOption(conf);
if (startOpt == null) {
throw new IOException("Startup option not set.");
}
final String bpid = nsInfo.getBlockPoolID();
//read storage info, lock data dirs and transition fs state if necessary
synchronized (this) {
storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt);
}
final StorageInfo bpStorage = storage.getBPStorage(bpid);
LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
+ ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()
+ ";nsInfo=" + nsInfo + ";dnuuid=" + storage.getDatanodeUuid());
}
// If this is a newly formatted DataNode then assign a new DatanodeUuid.
checkDatanodeUuid();
synchronized(this) {
if (data == null) {
data = factory.newInstance(this, storage, conf);
}
}
}
/**
* Construct a custom journal manager.
* The class to construct is taken from the configuration.
* @param uri Uri to construct
* @return The constructed journal manager
* @throws IllegalArgumentException if no class is configured for uri
*/
private JournalManager createJournal(URI uri) {
Class<? extends JournalManager> clazz
= getJournalClass(conf, uri.getScheme());
try {
Constructor<? extends JournalManager> cons
= clazz.getConstructor(Configuration.class, URI.class,
NamespaceInfo.class);
return cons.newInstance(conf, uri, storage.getNamespaceInfo());
} catch (Exception e) {
throw new IllegalArgumentException("Unable to construct journal, "
+ uri, e);
}
}
/**
* Iterate over all the storage directories, checking if it should be
* formatted. Format the storage if necessary and allowed by the user.
* @return True if formatting is processed
*/
private boolean format(NNStorage storage, NamespaceInfo nsInfo)
throws IOException {
// Check with the user before blowing away data.
if (!Storage.confirmFormat(storage.dirIterable(null), force, interactive)) {
storage.close();
return false;
} else {
// Format the storage (writes VERSION file)
storage.format(nsInfo);
return true;
}
}
public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
List<Path> targetPaths, Configuration conf,
int maxNotChangedIterations)
throws IOException {
this.nameNodeUri = nameNodeUri;
this.idPath = idPath;
this.targetPaths = targetPaths == null || targetPaths.isEmpty() ? Arrays
.asList(new Path("/")) : targetPaths;
this.maxNotChangedIterations = maxNotChangedIterations;
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
NamenodeProtocol.class).getProxy();
this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
ClientProtocol.class, fallbackToSimpleAuth).getProxy();
this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
final NamespaceInfo namespaceinfo = namenode.versionRequest();
this.blockpoolID = namespaceinfo.getBlockPoolID();
final FsServerDefaults defaults = fs.getServerDefaults(new Path("/"));
this.keyManager = new KeyManager(blockpoolID, namenode,
defaults.getEncryptDataTransfer(), conf);
// if it is for test, we do not create the id file
out = checkAndMarkRunning();
if (out == null) {
// Exit if there is another one running.
throw new IOException("Another " + name + " is running.");
}
}
void reRegister() throws IOException {
if (shouldRun()) {
// re-retrieve namespace info to make sure that, if the NN
// was restarted, we still match its version (HDFS-2120)
NamespaceInfo nsInfo = retrieveNamespaceInfo();
// and re-register
register(nsInfo);
scheduler.scheduleHeartbeat();
}
}
/**
* Initializes the {@link #data}. The initialization is done only once, when
* handshake with the the first namenode is completed.
*/
private void initStorage(final NamespaceInfo nsInfo) throws IOException {
final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory
= FsDatasetSpi.Factory.getFactory(conf);
if (!factory.isSimulated()) {
final StartupOption startOpt = getStartupOption(conf);
if (startOpt == null) {
throw new IOException("Startup option not set.");
}
final String bpid = nsInfo.getBlockPoolID();
//read storage info, lock data dirs and transition fs state if necessary
synchronized (this) {
storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt);
}
final StorageInfo bpStorage = storage.getBPStorage(bpid);
LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
+ ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()
+ ";nsInfo=" + nsInfo + ";dnuuid=" + storage.getDatanodeUuid());
}
// If this is a newly formatted DataNode then assign a new DatanodeUuid.
checkDatanodeUuid();
synchronized(this) {
if (data == null) {
data = factory.newInstance(this, storage, conf);
}
}
}
private StorageDirectory loadStorageDirectory(DataNode datanode,
NamespaceInfo nsInfo, File dataDir, StartupOption startOpt)
throws IOException {
StorageDirectory sd = new StorageDirectory(dataDir, null, false);
try {
StorageState curState = sd.analyzeStorage(startOpt, this);
// sd is locked but not opened
switch (curState) {
case NORMAL:
break;
case NON_EXISTENT:
LOG.info("Storage directory " + dataDir + " does not exist");
throw new IOException("Storage directory " + dataDir
+ " does not exist");
case NOT_FORMATTED: // format
LOG.info("Storage directory " + dataDir + " is not formatted for "
+ nsInfo.getBlockPoolID());
LOG.info("Formatting ...");
format(sd, nsInfo, datanode.getDatanodeUuid());
break;
default: // recovery part is common
sd.doRecover(curState);
}
// 2. Do transitions
// Each storage directory is treated individually.
// During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup.
doTransition(datanode, sd, nsInfo, startOpt);
// 3. Update successfully loaded storage.
setServiceLayoutVersion(getServiceLayoutVersion());
writeProperties(sd);
return sd;
} catch (IOException ioe) {
sd.unlock();
throw ioe;
}
}
public DummyJournalManager(Configuration conf, URI u,
NamespaceInfo nsInfo) {
// Set static vars so the test case can verify them.
DummyJournalManager.conf = conf;
DummyJournalManager.uri = u;
DummyJournalManager.nsInfo = nsInfo;
}
private void setupNS() throws IOException {
// handshake with NN
NamespaceInfo nsInfo;
nsInfo = handshake(true);
setNamespaceInfo(nsInfo);
synchronized(AvatarDataNode.this){
setupNSStorage();
}
nsRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
nsRegistration.setInfoPort(infoServer.getPort());
}
@Before
public void setup() throws Exception {
spyLoggers = ImmutableList.of(
mockLogger(),
mockLogger(),
mockLogger());
qjm = new QuorumJournalManager(conf, new URI("qjournal://host/jid"), FAKE_NSINFO) {
@Override
protected List<AsyncLogger> createLoggers(AsyncLogger.Factory factory) {
return spyLoggers;
}
};
for (AsyncLogger logger : spyLoggers) {
futureReturns(GetJournalStateResponseProto.newBuilder()
.setLastPromisedEpoch(0)
.setHttpPort(-1)
.build())
.when(logger).getJournalState();
futureReturns(
NewEpochResponseProto.newBuilder().build()
).when(logger).newEpoch(Mockito.anyLong());
futureReturns(null).when(logger).format(Mockito.<NamespaceInfo>any());
}
qjm.recoverUnfinalizedSegments();
}
/**
* Format all configured journals which are not file-based.
*
* File-based journals are skipped, since they are formatted by the
* Storage format code.
*/
synchronized void formatNonFileJournals(NamespaceInfo nsInfo) throws IOException {
Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
"Bad state: %s", state);
for (JournalManager jm : journalSet.getJournalManagers()) {
if (!(jm instanceof FileJournalManager)) {
jm.format(nsInfo);
}
}
}
/**
* Verify the BKJM is creating the bookie available default path, when there
* is no 'dfs.namenode.bookkeeperjournal.zk.availablebookies' configured
*/
@Test
public void testDefaultBKAvailablePath() throws Exception {
Configuration conf = new Configuration();
Assert.assertNull(BK_ROOT_PATH + " already exists", zkc.exists(
BK_ROOT_PATH, false));
NamespaceInfo nsi = newNSInfo();
bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-DefaultBKPath"),
nsi);
bkjm.format(nsi);
Assert.assertNotNull("Bookie available path : " + BK_ROOT_PATH
+ " doesn't exists", zkc.exists(BK_ROOT_PATH, false));
}
public static NamespaceInfoProto convert(NamespaceInfo info) {
return NamespaceInfoProto.newBuilder()
.setBlockPoolID(info.getBlockPoolID())
.setBuildVersion(info.getBuildVersion())
.setUnused(0)
.setStorageInfo(PBHelper.convert((StorageInfo)info))
.setSoftwareVersion(info.getSoftwareVersion())
.setCapabilities(info.getCapabilities())
.build();
}
public static NamespaceInfoProto convert(NamespaceInfo info) {
return NamespaceInfoProto.newBuilder()
.setBlockPoolID(info.getBlockPoolID())
.setBuildVersion(info.getBuildVersion())
.setUnused(0)
.setStorageInfo(PBHelper.convert((StorageInfo)info))
.setSoftwareVersion(info.getSoftwareVersion())
.setCapabilities(info.getCapabilities())
.build();
}