下面列出了org.apache.hadoop.io.MultipleIOException#org.apache.hadoop.hdfs.DFSClient 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Retrieves the host names of all datanodes reported by the HDFS namenode.
 *
 * <p>The Hadoop configuration is loaded from the resource returned by
 * {@code HdfsDataContext.getHdfsConfigDirectory(config)}, and the namenode is
 * contacted at the host/port returned by {@code HdfsDataContext}.
 *
 * @return the host name of every entry in the {@code ALL} datanode report
 * @throws IOException if creating the client, querying the namenode or
 *         closing the client fails
 */
private List<String> getDataNodes() throws IOException {
  Configuration conf = new Configuration(false);
  conf.addResource(new org.apache.hadoop.fs.Path(HdfsDataContext.getHdfsConfigDirectory(config)));
  InetSocketAddress namenodeAddress = new InetSocketAddress(
      HdfsDataContext.getHdfsNamenodeDefault(config),
      HdfsDataContext.getHdfsNamenodePortDefault(config));
  List<String> datanodesList = new ArrayList<>();
  // DFSClient is Closeable: close it here instead of leaking one client
  // per invocation of this method.
  try (DFSClient dfsClient = new DFSClient(namenodeAddress, conf)) {
    ClientProtocol nameNode = dfsClient.getNamenode();
    DatanodeInfo[] datanodeReport =
        nameNode.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
    for (DatanodeInfo di : datanodeReport) {
      datanodesList.add(di.getHostName());
    }
  }
  return datanodesList;
}
/**
 * Looks up the NFS3 attributes of {@code fileName} under the directory
 * identified by {@code dirHandle}.
 *
 * <p>When the entry is a regular file that has an entry in the open-file
 * context cache, its size and used values are replaced by that context's
 * next offset.
 *
 * @param client    client passed through to {@code Nfs3Utils.getFileAttr}
 * @param dirHandle handle of the parent directory
 * @param fileName  name of the entry inside the directory
 * @return whatever {@code Nfs3Utils.getFileAttr} returned (may be null),
 *         possibly with size/used adjusted
 * @throws IOException propagated from the attribute lookup
 */
Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle dirHandle,
    String fileName) throws IOException {
  final String fileIdPath = Nfs3Utils.getFileIdPath(dirHandle) + "/" + fileName;
  final Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);

  // Only regular files can have an open-file context worth consulting.
  if (attr == null || attr.getType() != NfsFileType.NFSREG.toValue()) {
    return attr;
  }

  final FileHandle fileHandle = new FileHandle(attr.getFileId());
  final OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
  if (openFileCtx == null) {
    return attr;
  }

  attr.setSize(openFileCtx.getNextOffset());
  attr.setUsed(openFileCtx.getNextOffset());
  return attr;
}
/**
 * Builds the cache loader that creates a {@link DFSClient} for a user name.
 * The client is constructed inside {@code doAs} of the
 * {@code UserGroupInformation} obtained for that user.
 *
 * @return a loader keyed by user name
 */
private CacheLoader<String, DFSClient> clientLoader() {
  return new CacheLoader<String, DFSClient>() {
    @Override
    public DFSClient load(String userName) throws Exception {
      final UserGroupInformation ugi = getUserGroupInformation(
          userName, UserGroupInformation.getCurrentUser());

      final PrivilegedExceptionAction<DFSClient> createClient =
          new PrivilegedExceptionAction<DFSClient>() {
            @Override
            public DFSClient run() throws IOException {
              return new DFSClient(NameNode.getAddress(config), config);
            }
          };

      // Guava requires that a CacheLoader never returns null.
      return ugi.doAs(createClient);
    }
  };
}
/**
 * Checks that, with a cache limited to one entry, requesting a client for a
 * second user leaves the first user's client closed and the cache within
 * its size limit.
 */
@Test
public void testEviction() throws IOException {
  // Only one entry will be in the cache
  final int maxCacheSize = 1;
  NfsConfiguration nfsConf = new NfsConfiguration();
  nfsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost");
  DFSClientCache cache = new DFSClientCache(nfsConf, maxCacheSize);

  DFSClient firstClient = cache.getDfsClient("test1");
  String description = cache.getDfsClient("test1").toString();
  assertTrue(description.contains("ugi=test1"));
  assertEquals(firstClient, cache.getDfsClient("test1"));
  assertFalse(isDfsClientClose(firstClient));

  // Asking for a second user's client must push the first one out.
  cache.getDfsClient("test2");
  assertTrue(isDfsClientClose(firstClient));

  boolean withinLimit = cache.clientCache.size() <= maxCacheSize;
  assertTrue("cache size should be the max size or less", withinLimit);
}
/**
 * Writes the checksum of the file named in the request path to the response
 * as an XML document. An {@code IOException} or {@code InterruptedException}
 * raised while obtaining the checksum is written into the same document via
 * {@code writeXml} instead of being propagated.
 */
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response
    ) throws ServletException, IOException {
  final PrintWriter writer = response.getWriter();
  final String path = ServletUtil.getDecodedPath(request, "/getFileChecksum");
  final XMLOutputter xml = new XMLOutputter(writer, "UTF-8");
  xml.declaration();

  final ServletContext servletContext = getServletContext();
  final DataNode datanode = (DataNode) servletContext.getAttribute("datanode");
  final Configuration conf = new HdfsConfiguration(datanode.getConf());

  try {
    final DFSClient dfs = DatanodeJspHelper.getDFSClient(
        request, datanode, conf, getUGI(request, conf));
    final MD5MD5CRC32FileChecksum fileChecksum =
        dfs.getFileChecksum(path, Long.MAX_VALUE);
    MD5MD5CRC32FileChecksum.write(xml, fileChecksum);
  } catch (IOException ioe) {
    writeXml(ioe, path, xml);
  } catch (InterruptedException ie) {
    writeXml(ie, path, xml);
  }

  xml.endDocument();
}
/**
 * Get information about a domain socket path.
 *
 * <p>Returns {@code PathInfo.NOT_CONFIGURED} when no socket path is set, when
 * neither domain-socket data traffic nor non-legacy short-circuit reads are
 * enabled, when the DomainSocket code reports a loading failure, or when
 * {@code addr} is not a local address. Otherwise the effective path is
 * returned together with its recorded state, or {@code VALID} if no state is
 * recorded.
 *
 * @param addr         The inet address to use.
 * @param conf         The client configuration.
 *
 * @return             Information about the socket path.
 */
public PathInfo getPathInfo(InetSocketAddress addr, DFSClient.Conf conf) {
  // Without a configured domain socket path, domain sockets are unusable.
  if (conf.getDomainSocketPath().isEmpty()) {
    return PathInfo.NOT_CONFIGURED;
  }

  // If we can't do anything with the domain socket, don't create it.
  final boolean socketUseful = conf.isDomainSocketDataTraffic()
      || (conf.isShortCircuitLocalReads() && !conf.isUseLegacyBlockReaderLocal());
  if (!socketUseful) {
    return PathInfo.NOT_CONFIGURED;
  }

  // DomainSocket objects cannot be created if the DomainSocket code
  // failed to load.
  if (DomainSocket.getLoadingFailureReason() != null) {
    return PathInfo.NOT_CONFIGURED;
  }

  // UNIX domain sockets can only be used to talk to local peers
  if (!DFSClient.isLocalAddress(addr)) {
    return PathInfo.NOT_CONFIGURED;
  }

  final String escapedPath = DomainSocket.getEffectivePath(
      conf.getDomainSocketPath(), addr.getPort());
  final PathState recorded = pathMap.getIfPresent(escapedPath);
  return new PathInfo(escapedPath,
      recorded == null ? PathState.VALID : recorded);
}
/**
 * Creates a {@link DFSClient} whose {@code ClientProtocol} is a dynamic
 * proxy backed by a {@code DummyRetryInvocationHandler}. The handler wraps a
 * failover proxy provider for the test file system's URI and a
 * fail-over-on-network-exception retry policy.
 *
 * @return the client built on the proxied protocol
 * @throws IOException if the failover proxy provider or client cannot be built
 */
private DFSClient genClientWithDummyHandler() throws IOException {
  final URI nnUri = dfs.getUri();
  final FailoverProxyProvider<ClientProtocol> provider =
      NameNodeProxies.createFailoverProxyProvider(
          conf, nnUri, ClientProtocol.class, true, null);

  final InvocationHandler handler = new DummyRetryInvocationHandler(
      provider,
      RetryPolicies.failoverOnNetworkException(
          RetryPolicies.TRY_ONCE_THEN_FAIL,
          Integer.MAX_VALUE,
          DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT,
          DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT));

  final ClassLoader loader = provider.getInterface().getClassLoader();
  final ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance(
      loader, new Class<?>[] { ClientProtocol.class }, handler);

  return new DFSClient(null, proxy, conf, null);
}
/**
 * Prepares the {@code /lost+found} directory and records the outcome in
 * {@code lfInited}, {@code lfInitedOk}, {@code lostFound} and
 * {@code internalError}.
 *
 * <p>If the path does not exist it is created; if it exists as a directory
 * it is used as is; if it exists but is not a directory it is rejected.
 * Any exception during the check is logged and treated as a failure.
 *
 * @param dfs client used to inspect and create the directory
 */
private void lostFoundInit(DFSClient dfs) {
  lfInited = true;
  try {
    String lfName = "/lost+found";
    final HdfsFileStatus lfStatus = dfs.getFileInfo(lfName);
    if (lfStatus == null) { // not exists
      lfInitedOk = dfs.mkdirs(lfName, null, true);
      lostFound = lfName;
    } else if (!lfStatus.isDir()) { // exists but not a directory
      LOG.warn("Cannot use /lost+found : a regular file with this name exists.");
      lfInitedOk = false;
    } else { // exists and is a directory
      lostFound = lfName;
      lfInitedOk = true;
    }
  } catch (Exception e) {
    // Report through the logger (with stack trace) rather than stderr.
    LOG.warn("Exception while checking or creating /lost+found", e);
    lfInitedOk = false;
  }
  if (lostFound == null) {
    LOG.warn("Cannot initialize /lost+found .");
    lfInitedOk = false;
    internalError = true;
  }
}
/**
 * Computes the checksum of {@code path} with a freshly created client,
 * serialises it as JSON and writes it to the channel, closing the channel
 * once the write completes.
 *
 * @param ctx channel context the HTTP response is written to
 * @throws IOException if the checksum cannot be obtained or the client
 *         fails to close on the success path
 */
private void onGetFileChecksum(ChannelHandlerContext ctx) throws IOException {
  final String nnId = params.namenodeId();
  MD5MD5CRC32FileChecksum checksum = null;
  DFSClient dfsclient = newDfsClient(nnId, conf);
  try {
    checksum = dfsclient.getFileChecksum(path, Long.MAX_VALUE);
    // On success, close explicitly so a close failure propagates; the
    // reference is then cleared so the cleanup below sees null.
    dfsclient.close();
    dfsclient = null;
  } finally {
    IOUtils.cleanup(LOG, dfsclient);
  }

  final byte[] payload =
      JsonUtil.toJsonString(checksum).getBytes(Charsets.UTF_8);
  final DefaultFullHttpResponse resp = new DefaultFullHttpResponse(
      HTTP_1_1, OK, Unpooled.wrappedBuffer(payload));

  resp.headers().set(CONTENT_TYPE, APPLICATION_JSON_UTF8);
  resp.headers().set(CONTENT_LENGTH, payload.length);
  resp.headers().set(CONNECTION, CLOSE);

  ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
}
/**
 * Verifies that attempting to resolve a non-symlink results in client
 * exception.
 *
 * <p>The file created for the test is deleted afterwards, and the
 * {@link DFSClient} opened for the lookup is closed.
 */
@Test
public void testLinkTargetNonSymlink() throws UnsupportedFileSystemException,
    IOException {
  FileContext fc = null;
  Path notSymlink = new Path("/notasymlink");
  try {
    fc = FileContext.getFileContext(cluster.getFileSystem().getUri());
    fc.create(notSymlink, EnumSet.of(CreateFlag.CREATE));
    DFSClient client = new DFSClient(cluster.getFileSystem().getUri(),
        cluster.getConfiguration(0));
    try {
      client.getLinkTarget(notSymlink.toString());
      fail("Expected exception for resolving non-symlink");
    } catch (IOException e) {
      GenericTestUtils.assertExceptionContains("is not a symbolic link", e);
    } finally {
      // Close the client created by this test so it is not leaked.
      client.close();
    }
  } finally {
    if (fc != null) {
      fc.delete(notSymlink, false);
    }
  }
}
/**
 * Stores the supplied collaborators and creates the future pool used by the
 * service. The pool is backed by a fixed 100-thread executor with a
 * 1000-slot work queue and non-daemon threads named
 * {@code controller-pool-N}.
 *
 * @param configuration service configuration
 * @param zkManager     ZooKeeper manager
 * @param hdfsClient    HDFS client
 * @param helixAdmin    Helix admin handle
 * @param clusterName   name of the cluster
 */
public TerrapinControllerServiceImpl(PropertiesConfiguration configuration,
                                     ZooKeeperManager zkManager,
                                     DFSClient hdfsClient,
                                     HelixAdmin helixAdmin,
                                     String clusterName) {
  this.configuration = configuration;
  this.zkManager = zkManager;
  this.hdfsClient = hdfsClient;
  this.helixAdmin = helixAdmin;
  this.clusterName = clusterName;

  final int poolSize = 100;
  final int queueCapacity = 1000;
  final long keepAliveSeconds = 0;
  ExecutorService controllerPool = new ThreadPoolExecutor(
      poolSize,
      poolSize,
      keepAliveSeconds,
      TimeUnit.SECONDS,
      new LinkedBlockingDeque<Runnable>(queueCapacity),
      new ThreadFactoryBuilder()
          .setDaemon(false)
          .setNameFormat("controller-pool-%d")
          .build());
  this.futurePool = new ExecutorServiceFuturePool(controllerPool);
}
/**
 * Prepares the {@code /lost+found} directory and records the outcome in
 * {@code lfInited}, {@code lfInitedOk} and {@code lostFound}.
 *
 * <p>If the path does not exist it is created; if it exists as a directory
 * it is used as is; if it exists but is not a directory it is rejected.
 * Any exception during the check is logged and treated as a failure.
 *
 * @param dfs client used to inspect and create the directory
 */
private void lostFoundInit(DFSClient dfs) {
  lfInited = true;
  try {
    String lfName = "/lost+found";
    // check that /lost+found exists
    if (!dfs.exists(lfName)) {
      lfInitedOk = dfs.mkdirs(lfName);
      lostFound = lfName;
    } else if (!dfs.isDirectory(lfName)) {
      LOG.warn("Cannot use /lost+found : a regular file with this name exists.");
      lfInitedOk = false;
    } else { // exists and isDirectory
      lostFound = lfName;
      lfInitedOk = true;
    }
  } catch (Exception e) {
    // Report through the logger (with stack trace) rather than stderr.
    LOG.warn("Exception while checking or creating /lost+found", e);
    lfInitedOk = false;
  }
  if (lostFound == null) {
    LOG.warn("Cannot initialize /lost+found .");
    lfInitedOk = false;
  }
}
/**
 * {@inheritDoc}
 *
 * <p>Writes the checksum of the requested file to the response as XML. An
 * {@code IOException} raised while computing the checksum is written into
 * the document as a {@code RemoteException} instead of being thrown.
 */
public void doGet(HttpServletRequest request, HttpServletResponse response
    ) throws ServletException, IOException {
  final UnixUserGroupInformation ugi = getUGI(request);
  final PrintWriter writer = response.getWriter();
  final String filename = getFilename(request, response);
  final XMLOutputter xml = new XMLOutputter(writer, "UTF-8");
  xml.declaration();

  // Work on a copy of the datanode configuration, tagged with the caller's UGI.
  final Configuration conf =
      new Configuration(DataNode.getDataNode().getConf());
  final int socketTimeout =
      conf.getInt("dfs.socket.timeout", HdfsConstants.READ_TIMEOUT);
  final SocketFactory socketFactory =
      NetUtils.getSocketFactory(conf, ClientProtocol.class);
  UnixUserGroupInformation.saveToConf(
      conf, UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
  final ClientProtocol nnproxy = DFSClient.createNamenode(conf);

  try {
    final MD5MD5CRC32FileChecksum fileChecksum = DFSClient.getFileChecksum(
        filename, nnproxy, socketFactory, socketTimeout);
    MD5MD5CRC32FileChecksum.write(xml, fileChecksum);
  } catch (IOException ioe) {
    final RemoteException remote =
        new RemoteException(ioe.getClass().getName(), ioe.getMessage());
    remote.writeXml(filename, xml);
  }

  xml.endDocument();
}
public RpcProgramMountd(NfsConfiguration config,
DatagramSocket registrationSocket, boolean allowInsecurePorts)
throws IOException {
// Note that RPC cache is not enabled
super("mountd", "localhost", config.getInt(
NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY,
NfsConfigKeys.DFS_NFS_MOUNTD_PORT_DEFAULT), PROGRAM, VERSION_1,
VERSION_3, registrationSocket, allowInsecurePorts);
exports = new ArrayList<String>();
exports.add(config.get(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY,
NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT));
this.hostsMatcher = NfsExports.getInstance(config);
this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
UserGroupInformation.setConfiguration(config);
SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY,
NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY);
this.dfsClient = new DFSClient(NameNode.getAddress(config), config);
}
/**
 * Runs the retry-with-failover scenario for a remove-cache-directive
 * operation on pool "pool" and path "/path".
 */
@Test (timeout=60000)
public void testRemoveCacheDescriptor() throws Exception {
  final DFSClient dummyClient = genClientWithDummyHandler();
  final AtMostOnceOp removeOp =
      new RemoveCacheDirectiveInfoOp(dummyClient, "pool", "/path");
  testClientRetryWithFailover(removeOp);
}
/**
 * Runs the retry-with-failover scenario for an add-cache-directive
 * operation on pool "pool" and path "/path".
 */
@Test (timeout=60000)
public void testAddCacheDirectiveInfo() throws Exception {
  final DFSClient dummyClient = genClientWithDummyHandler();
  final CacheDirectiveInfo directive = new CacheDirectiveInfo.Builder()
      .setPool("pool")
      .setPath(new Path("/path"))
      .build();
  final AtMostOnceOp addOp = new AddCacheDirectiveInfoOp(dummyClient, directive);
  testClientRetryWithFailover(addOp);
}
/**
 * Expects a {@code SnapshotAccessControlException} when creating a symlink
 * whose link path is "/TestSnapshot/sub1/.snapshot".
 */
@Test(timeout=60000, expected = SnapshotAccessControlException.class)
public void testCreateSymlink() throws Exception {
  @SuppressWarnings("deprecation")
  DFSClient symlinkClient = new DFSClient(conf);
  final String target = sub2.toString();
  final String link = "/TestSnapshot/sub1/.snapshot";
  symlinkClient.createSymlink(target, link, false);
}
/**
 * Starts a mini cluster with the given capacities and racks, fills it to 30%
 * of its total capacity, adds one more datanode, and runs the balancer. The
 * cluster is shut down afterwards.
 *
 * @param capacities  capacity of each initial datanode
 * @param racks       rack of each initial datanode; same length as capacities
 * @param newCapacity capacity of the datanode added before balancing
 * @param newRack     rack of the datanode added before balancing
 */
private void test(long[] capacities, String[] racks,
    long newCapacity, String newRack) throws Exception {
  final int numOfDatanodes = capacities.length;
  assertEquals(numOfDatanodes, racks.length);
  cluster = new MiniDFSCluster(0, CONF, numOfDatanodes, true, true, null,
      racks, capacities);
  try {
    cluster.waitActive();
    client = DFSClient.createNamenode(CONF);

    long totalCapacity = 0L;
    for (int i = 0; i < capacities.length; i++) {
      totalCapacity += capacities[i];
    }

    // fill up the cluster to be 30% full
    final long totalUsedSpace = totalCapacity * 3 / 10;
    final long perNodeFileSize = totalUsedSpace / numOfDatanodes;
    createFile(perNodeFileSize, (short) numOfDatanodes);

    // bring up one additional, empty datanode
    cluster.startDataNodes(CONF, 1, true, null,
        new String[] { newRack }, new long[] { newCapacity });
    totalCapacity += newCapacity;

    // run balancer and validate results
    runBalancer(CONF, totalUsedSpace, totalCapacity);
  } finally {
    cluster.shutdown();
  }
}
/**
 * Records the description of a test file that is to be corrupted and then
 * caches its initial contents.
 *
 * @param name            name of the file
 * @param blocksToCorrupt indexes of the blocks to corrupt
 * @param dfsClient       client used to access the file
 * @param numDataNodes    number of datanodes
 * @param blockSize       block size
 * @throws IOException if caching the initial contents fails
 */
public CorruptedTestFile(String name, Set<Integer> blocksToCorrupt,
    DFSClient dfsClient, int numDataNodes, int blockSize)
    throws IOException {
  this.dfsClient = dfsClient;
  this.name = name;
  this.blockSize = blockSize;
  this.numDataNodes = numDataNodes;
  this.blocksToCorrupt = blocksToCorrupt;

  // Runs last, after every other field has been assigned.
  this.initialContents = cacheInitialContents();
}
/**
 * Builds the listener that closes a {@link DFSClient} when its cache entry
 * is removed. A failure to close is logged as a warning and not rethrown.
 *
 * @return the removal listener
 */
private RemovalListener<String, DFSClient> clientRemovalListener() {
  return new RemovalListener<String, DFSClient>() {
    @Override
    public void onRemoval(RemovalNotification<String, DFSClient> notification) {
      final DFSClient evicted = notification.getValue();
      try {
        evicted.close();
      } catch (IOException e) {
        final String message = String.format(
            "IOException when closing the DFSClient(%s), cause: %s",
            evicted, e);
        LOG.warn(message);
      }
    }
  };
}
/**
 * Starts a three-datanode mini DFS cluster, enables all erasure coding
 * policies, applies "RS-3-2-1024k" to the root directory, and checks that a
 * newly created stream does not report the HFLUSH capability. The EC methods
 * are looked up reflectively; if one is missing the test class is skipped.
 * Finally, stream capability enforcement is switched on.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  try {
    // Need 3 DNs for RS-3-2 policy
    MiniDFSCluster dfsCluster = UTIL.startMiniDFSCluster(3);
    DistributedFileSystem dfs = dfsCluster.getFileSystem();

    Method enableAll = DFSTestUtil.class.getMethod(
        "enableAllECPolicies", DistributedFileSystem.class);
    enableAll.invoke(null, dfs);

    DFSClient dfsClient = dfs.getClient();
    Method setPolicy = DFSClient.class.getMethod(
        "setErasureCodingPolicy", String.class, String.class);
    // try a built-in policy
    setPolicy.invoke(dfsClient, "/", "RS-3-2-1024k");

    try (FSDataOutputStream canary = dfs.create(new Path("/canary"))) {
      // An hflush-capable stream here means a setup assumption is wrong;
      // fail so that a developer has to look and triage.
      assertFalse("Did not enable EC!",
          canary.hasCapability(StreamCapabilities.HFLUSH));
    }
  } catch (NoSuchMethodException e) {
    // Nothing interesting to test if EC is not available, so skip the rest.
    Assume.assumeNoException("Using an older version of hadoop; EC not available.", e);
  }

  UTIL.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
}
/**
 * Generates blocks for the requested per-datanode usage, distributes them,
 * restarts the cluster without formatting, injects the blocks and runs the
 * balancer.
 *
 * @param distribution space to be used on each datanode
 * @param capacities   capacity of each datanode
 * @param racks        rack of each datanode
 * @throws IllegalArgumentException if the three arrays differ in length
 */
private void testUnevenDistribution(
    long distribution[], long capacities[], String[] racks) throws Exception {
  final int numDatanodes = distribution.length;
  final boolean sameLength =
      capacities.length == numDatanodes && racks.length == numDatanodes;
  if (!sameLength) {
    throw new IllegalArgumentException("Array length is not the same");
  }

  // calculate total space that need to be filled
  long totalUsedSpace = 0L;
  for (long used : distribution) {
    totalUsedSpace += used;
  }

  // fill the cluster
  final Block[] blocks = generateBlocks(totalUsedSpace, (short) numDatanodes);

  // redistribute blocks
  final Block[][] blocksDN =
      distributeBlocks(blocks, (short) (numDatanodes - 1), distribution);

  // restart the cluster: do NOT format the cluster
  CONF.set("dfs.safemode.threshold.pct", "0.0f");
  cluster = new MiniDFSCluster(
      0, CONF, numDatanodes, false, true, null, racks, capacities);
  cluster.waitActive();
  client = DFSClient.createNamenode(CONF);

  cluster.injectBlocks(blocksDN);

  long totalCapacity = 0L;
  for (int i = 0; i < capacities.length; i++) {
    totalCapacity += capacities[i];
  }
  runBalancer(totalUsedSpace, totalCapacity);
}
/**
 * Picks a random entry of {@code nodes} that is not in {@code deadNodes}.
 *
 * @param dfs       not referenced by this method
 * @param nodes     candidate datanodes
 * @param deadNodes datanodes that must not be chosen
 * @return a randomly selected node that is not in {@code deadNodes}
 * @throws IOException if {@code nodes} is null or the number of nodes minus
 *         the number of dead nodes is less than one
 */
private DatanodeInfo bestNode(DFSClient dfs, DatanodeInfo[] nodes,
    TreeSet<DatanodeInfo> deadNodes) throws IOException {
  final boolean noLiveNode =
      nodes == null || nodes.length - deadNodes.size() < 1;
  if (noLiveNode) {
    throw new IOException("No live nodes contain current block");
  }

  // Keep drawing until the candidate is not a known-dead node.
  DatanodeInfo candidate = nodes[r.nextInt(nodes.length)];
  while (deadNodes.contains(candidate)) {
    candidate = nodes[r.nextInt(nodes.length)];
  }
  return candidate;
}
/**
 * Builds the {@code WccAttr} for the file at {@code fileIdPath}.
 *
 * <p>The size is the directory size derived from the children count for
 * directories, and the file length otherwise. Both time arguments are built
 * from the file's modification time.
 *
 * @param client     client used for the status lookup
 * @param fileIdPath path of the file
 * @return the attributes, or {@code null} if no file status was found
 * @throws IOException propagated from the status lookup
 */
public static WccAttr getWccAttr(DFSClient client, String fileIdPath)
    throws IOException {
  final HdfsFileStatus fstat = getFileStatus(client, fileIdPath);
  if (fstat == null) {
    return null;
  }

  final long size;
  if (fstat.isDir()) {
    size = getDirSize(fstat.getChildrenNum());
  } else {
    size = fstat.getLen();
  }

  final NfsTime firstTime = new NfsTime(fstat.getModificationTime());
  final NfsTime secondTime = new NfsTime(fstat.getModificationTime());
  return new WccAttr(size, firstTime, secondTime);
}
/**
 * Starts a mini cluster with the given capacities and racks, fills it to 30%
 * of its total capacity, adds one more datanode, and runs the balancer. The
 * cluster is shut down afterwards.
 *
 * @param capacities  capacity of each initial datanode
 * @param racks       rack of each initial datanode; same length as capacities
 * @param newCapacity capacity of the datanode added before balancing
 * @param newRack     rack of the datanode added before balancing
 */
private void test(long[] capacities, String[] racks,
    long newCapacity, String newRack) throws Exception {
  final int numOfDatanodes = capacities.length;
  assertEquals(numOfDatanodes, racks.length);
  cluster = new MiniDFSCluster(0, CONF, numOfDatanodes, true, true, null,
      racks, capacities);
  try {
    cluster.waitActive();
    client = DFSClient.createNamenode(CONF);

    long totalCapacity = 0L;
    for (int i = 0; i < capacities.length; i++) {
      totalCapacity += capacities[i];
    }

    // fill up the cluster to be 30% full
    final long totalUsedSpace = totalCapacity * 3 / 10;
    final long perNodeFileSize = totalUsedSpace / numOfDatanodes;
    createFile(perNodeFileSize, (short) numOfDatanodes);

    // bring up one additional, empty datanode
    cluster.startDataNodes(CONF, 1, true, null,
        new String[] { newRack }, new long[] { newCapacity });
    totalCapacity += newCapacity;

    // run balancer and validate results
    runBalancer(totalUsedSpace, totalCapacity);
  } finally {
    cluster.shutdown();
  }
}
/**
 * Writes and reads a file normally, then installs a spied {@link DFSClient}
 * whose located blocks carry tokens with an altered expiry date, and expects
 * the next read to fail with "Could not obtain block".
 */
@Test
public void ensureInvalidBlockTokensAreRejected() throws IOException,
    URISyntaxException {
  cluster.transitionToActive(0);
  FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);

  DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA);
  assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH));

  DFSClient realClient = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs);
  DFSClient spyDfsClient = Mockito.spy(realClient);

  // Re-issues every block token with a changed identifier but the old password.
  Answer<LocatedBlocks> tamperWithTokens = new Answer<LocatedBlocks>() {
    @Override
    public LocatedBlocks answer(InvocationOnMock invocation) throws Throwable {
      LocatedBlocks located = (LocatedBlocks) invocation.callRealMethod();
      for (LocatedBlock block : located.getLocatedBlocks()) {
        Token<BlockTokenIdentifier> original = block.getBlockToken();
        BlockTokenIdentifier id = block.getBlockToken().decodeIdentifier();
        // This will make the token invalid, since the password
        // won't match anymore
        id.setExpiryDate(Time.now() + 10);
        Token<BlockTokenIdentifier> forged = new Token<BlockTokenIdentifier>(
            id.getBytes(), original.getPassword(), original.getKind(),
            original.getService());
        block.setBlockToken(forged);
      }
      return located;
    }
  };
  Mockito.doAnswer(tamperWithTokens)
      .when(spyDfsClient)
      .getLocatedBlocks(Mockito.anyString(), Mockito.anyLong(), Mockito.anyLong());

  DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyDfsClient);

  try {
    assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH));
    fail("Shouldn't have been able to read a file with invalid block tokens");
  } catch (IOException ioe) {
    GenericTestUtils.assertExceptionContains("Could not obtain block", ioe);
  }
}
/**
 * Check the commit status with the given offset, and sync the output stream
 * when the internal check reports COMMIT_DO_SYNC or COMMIT_FINISHED.
 *
 * @param dfsClient not referenced by this method
 * @param commitOffset the offset to commit; must be non-negative
 * @param channel the channel to return response
 * @param xid the xid of the commit request
 * @param preOpAttr the preOp attribute
 * @param fromRead whether the commit is triggered from read request
 * @return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
 * COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR
 */
public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset,
    Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) {
  if (!fromRead) {
    Preconditions.checkState(channel != null && preOpAttr != null);
    // Keep stream active
    updateLastAccessTime();
  }
  Preconditions.checkState(commitOffset >= 0);

  final COMMIT_STATUS internalStatus =
      checkCommitInternal(commitOffset, channel, xid, preOpAttr, fromRead);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Got commit status: " + internalStatus.name());
  }

  final boolean needsSync = internalStatus == COMMIT_STATUS.COMMIT_DO_SYNC
      || internalStatus == COMMIT_STATUS.COMMIT_FINISHED;
  if (!needsSync) {
    return internalStatus;
  }

  // Do the sync outside the lock
  try {
    // Sync file data and length
    fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
    // Nothing to do for metadata since attr related change is pass-through.
    // Returning COMMIT_FINISHED also replaces a COMMIT_DO_SYNC status.
    return COMMIT_STATUS.COMMIT_FINISHED;
  } catch (ClosedChannelException cce) {
    return pendingWrites.isEmpty()
        ? COMMIT_STATUS.COMMIT_FINISHED
        : COMMIT_STATUS.COMMIT_ERROR;
  } catch (IOException e) {
    LOG.error("Got stream error during data sync: " + e);
    // Do nothing. Stream will be closed eventually by StreamMonitor.
    return COMMIT_STATUS.COMMIT_ERROR;
  }
}
/**
 * Writes and reads a file normally, then installs a spied {@link DFSClient}
 * whose located blocks carry tokens with an altered expiry date, and expects
 * the next read to fail with "Could not obtain block".
 */
@Test
public void ensureInvalidBlockTokensAreRejected() throws IOException,
    URISyntaxException {
  cluster.transitionToActive(0);
  FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);

  DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA);
  assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH));

  DFSClient realClient = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs);
  DFSClient spyDfsClient = Mockito.spy(realClient);

  // Re-issues every block token with a changed identifier but the old password.
  Answer<LocatedBlocks> tamperWithTokens = new Answer<LocatedBlocks>() {
    @Override
    public LocatedBlocks answer(InvocationOnMock invocation) throws Throwable {
      LocatedBlocks located = (LocatedBlocks) invocation.callRealMethod();
      for (LocatedBlock block : located.getLocatedBlocks()) {
        Token<BlockTokenIdentifier> original = block.getBlockToken();
        BlockTokenIdentifier id = block.getBlockToken().decodeIdentifier();
        // This will make the token invalid, since the password
        // won't match anymore
        id.setExpiryDate(Time.now() + 10);
        Token<BlockTokenIdentifier> forged = new Token<BlockTokenIdentifier>(
            id.getBytes(), original.getPassword(), original.getKind(),
            original.getService());
        block.setBlockToken(forged);
      }
      return located;
    }
  };
  Mockito.doAnswer(tamperWithTokens)
      .when(spyDfsClient)
      .getLocatedBlocks(Mockito.anyString(), Mockito.anyLong(), Mockito.anyLong());

  DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyDfsClient);

  try {
    assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH));
    fail("Shouldn't have been able to read a file with invalid block tokens");
  } catch (IOException ioe) {
    GenericTestUtils.assertExceptionContains("Could not obtain block", ioe);
  }
}
/**
 * Exercises {@code checkCommitInternal} on a context created with AIX
 * compatibility mode enabled. It first commits past the flushed position,
 * then commits below it with a pending write present.
 */
@Test
public void testCheckCommitAixCompatMode() throws IOException {
  final DFSClient mockClient = Mockito.mock(DFSClient.class);
  final HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  final Nfs3FileAttributes attr = new Nfs3FileAttributes();

  final NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);

  // Enable AIX compatibility mode.
  final OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath",
      mockClient, new ShellBasedIdMapping(new NfsConfiguration()), true, conf);

  // Test fall-through to pendingWrites check in the event that commitOffset
  // is greater than the number of bytes we've so far flushed.
  Mockito.when(fos.getPos()).thenReturn(2L);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_FINISHED);

  // Test the case when we actually have received more bytes than we're trying
  // to commit.
  final WriteCtx pendingWrite =
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null);
  ctx.getPendingWritesForTest().put(new OffsetRange(0, 10), pendingWrite);
  Mockito.when(fos.getPos()).thenReturn(10L);
  ctx.setNextOffsetForTest(10L);
  status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
}
/**
 * Builds the cache loader that opens an input stream for a cache key. It
 * obtains the client for the key's user id, opens the key's inode path and
 * returns the wrapped stream.
 *
 * @return a loader keyed by {@code DFSInputStreamCaheKey}
 */
private CacheLoader<DFSInputStreamCaheKey, FSDataInputStream> inputStreamLoader() {
  return new CacheLoader<DFSInputStreamCaheKey, FSDataInputStream>() {
    @Override
    public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception {
      final DFSClient userClient = getDfsClient(key.userId);
      final DFSInputStream rawStream = userClient.open(key.inodePath);
      final FSDataInputStream wrapped =
          userClient.createWrappedInputStream(rawStream);
      return wrapped;
    }
  };
}