org.apache.hadoop.hdfs.DFSClient Source Code Examples

Listed below are example usages of org.apache.hadoop.hdfs.DFSClient; you can also follow each example's link to view the full source on GitHub.

Example 1 - Project: twister2, File: DataNodeLocatorUtils.java
/**
 * This method retrieves all the datanodes of an HDFS cluster
 */
private List<String> getDataNodes() throws IOException {

  Configuration conf = new Configuration(false);
  conf.addResource(new org.apache.hadoop.fs.Path(HdfsDataContext.getHdfsConfigDirectory(config)));

  List<String> datanodesList = new ArrayList<>();
  InetSocketAddress namenodeAddress = new InetSocketAddress(
      HdfsDataContext.getHdfsNamenodeDefault(config),
      HdfsDataContext.getHdfsNamenodePortDefault(config));
  DFSClient dfsClient = new DFSClient(namenodeAddress, conf);
  ClientProtocol nameNode = dfsClient.getNamenode();
  DatanodeInfo[] datanodeReport =
      nameNode.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
  for (DatanodeInfo di : datanodeReport) {
    datanodesList.add(di.getHostName());
  }
  return datanodesList;
}
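One caveat worth noting: the method above never closes the DFSClient it creates, which leaks the client's RPC connection. Since DFSClient implements java.io.Closeable, a hedged variant of the same lookup (reusing the twister2 HdfsDataContext helpers from above) could release it deterministically:

private List<String> getDataNodes() throws IOException {
  Configuration conf = new Configuration(false);
  conf.addResource(new org.apache.hadoop.fs.Path(
      HdfsDataContext.getHdfsConfigDirectory(config)));

  List<String> datanodesList = new ArrayList<>();
  DFSClient dfsClient = new DFSClient(new InetSocketAddress(
      HdfsDataContext.getHdfsNamenodeDefault(config),
      HdfsDataContext.getHdfsNamenodePortDefault(config)), conf);
  try {
    // Ask the namenode for all datanodes, live and dead.
    for (DatanodeInfo di : dfsClient.getNamenode()
        .getDatanodeReport(HdfsConstants.DatanodeReportType.ALL)) {
      datanodesList.add(di.getHostName());
    }
  } finally {
    dfsClient.close(); // release the client's resources
  }
  return datanodesList;
}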
 
Example 2 - Project: hadoop, File: WriteManager.java
Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle dirHandle,
    String fileName) throws IOException {
  String fileIdPath = Nfs3Utils.getFileIdPath(dirHandle) + "/" + fileName;
  Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);

  if ((attr != null) && (attr.getType() == NfsFileType.NFSREG.toValue())) {
    OpenFileCtx openFileCtx = fileContextCache.get(new FileHandle(attr
        .getFileId()));

    if (openFileCtx != null) {
      attr.setSize(openFileCtx.getNextOffset());
      attr.setUsed(openFileCtx.getNextOffset());
    }
  }
  return attr;
}
 
Example 3 - Project: hadoop, File: DFSClientCache.java
private CacheLoader<String, DFSClient> clientLoader() {
  return new CacheLoader<String, DFSClient>() {
    @Override
    public DFSClient load(String userName) throws Exception {
      UserGroupInformation ugi = getUserGroupInformation(
              userName,
              UserGroupInformation.getCurrentUser());

        // Guava requires that a CacheLoader never return null.
      return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
        @Override
        public DFSClient run() throws IOException {
          return new DFSClient(NameNode.getAddress(config), config);
        }
      });
    }
  };
}
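A loader like this is meant to be handed to Guava's CacheBuilder, which enforces the size bound and fires a removal listener (Example 20 below) as entries are evicted. A minimal sketch of that wiring, treating clientLoader() and clientRemovalListener() as the helpers shown in these examples and clientCacheSize as a placeholder capacity:

// Illustrative wiring only.
private final LoadingCache<String, DFSClient> clientCache =
    CacheBuilder.newBuilder()
        .maximumSize(clientCacheSize)              // bound the cache
        .removalListener(clientRemovalListener())  // close evicted clients
        .build(clientLoader());

// Lazily creates the per-user client on first access.
// LoadingCache.get wraps loader failures in ExecutionException.
DFSClient client = clientCache.get(userName);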
 
Example 4 - Project: hadoop, File: TestDFSClientCache.java
@Test
public void testEviction() throws IOException {
  NfsConfiguration conf = new NfsConfiguration();
  conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost");

  // Only one entry will be in the cache
  final int MAX_CACHE_SIZE = 1;

  DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE);

  DFSClient c1 = cache.getDfsClient("test1");
  assertTrue(cache.getDfsClient("test1").toString().contains("ugi=test1"));
  assertEquals(c1, cache.getDfsClient("test1"));
  assertFalse(isDfsClientClose(c1));

  cache.getDfsClient("test2");
  assertTrue(isDfsClientClose(c1));
  assertTrue("cache size should be the max size or less",
      cache.clientCache.size() <= MAX_CACHE_SIZE);
}
 
Example 5 - Project: big-c, File: FileChecksumServlets.java
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response
    ) throws ServletException, IOException {
  final PrintWriter out = response.getWriter();
  final String path = ServletUtil.getDecodedPath(request, "/getFileChecksum");
  final XMLOutputter xml = new XMLOutputter(out, "UTF-8");
  xml.declaration();

  final ServletContext context = getServletContext();
  final DataNode datanode = (DataNode) context.getAttribute("datanode");
  final Configuration conf = 
    new HdfsConfiguration(datanode.getConf());
  
  try {
    final DFSClient dfs = DatanodeJspHelper.getDFSClient(request, 
        datanode, conf, getUGI(request, conf));
    final MD5MD5CRC32FileChecksum checksum = dfs.getFileChecksum(path, Long.MAX_VALUE);
    MD5MD5CRC32FileChecksum.write(xml, checksum);
  } catch(IOException ioe) {
    writeXml(ioe, path, xml);
  } catch (InterruptedException e) {
    writeXml(e, path, xml);
  }
  xml.endDocument();
}
 
Example 6 - Project: hadoop, File: DomainSocketFactory.java
/**
 * Get information about a domain socket path.
 *
 * @param addr         The inet address to use.
 * @param conf         The client configuration.
 *
 * @return             Information about the socket path.
 */
public PathInfo getPathInfo(InetSocketAddress addr, DFSClient.Conf conf) {
  // If there is no domain socket path configured, we can't use domain
  // sockets.
  if (conf.getDomainSocketPath().isEmpty()) return PathInfo.NOT_CONFIGURED;
  // If we can't do anything with the domain socket, don't create it.
  if (!conf.isDomainSocketDataTraffic() &&
      (!conf.isShortCircuitLocalReads() || conf.isUseLegacyBlockReaderLocal())) {
    return PathInfo.NOT_CONFIGURED;
  }
  // If the DomainSocket code is not loaded, we can't create
  // DomainSocket objects.
  if (DomainSocket.getLoadingFailureReason() != null) {
    return PathInfo.NOT_CONFIGURED;
  }
  // UNIX domain sockets can only be used to talk to local peers
  if (!DFSClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED;
  String escapedPath = DomainSocket.getEffectivePath(
      conf.getDomainSocketPath(), addr.getPort());
  PathState status = pathMap.getIfPresent(escapedPath);
  if (status == null) {
    return new PathInfo(escapedPath, PathState.VALID);
  } else {
    return new PathInfo(escapedPath, status);
  }
}
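The checks above correspond to standard HDFS client settings. As a hedged illustration (the key names are the stock HDFS ones; the socket path itself is a placeholder), a client that should use domain sockets for short-circuit reads is typically configured like this:

Configuration conf = new Configuration();
// Path of the UNIX domain socket shared by the DataNode and local clients.
// A literal "_PORT" in the path is replaced with the DataNode's port,
// which is the substitution DomainSocket.getEffectivePath(...) performs.
conf.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket");
// Enable short-circuit local reads over that socket.
conf.setBoolean("dfs.client.read.shortcircuit", true);
// Optionally route ordinary block data traffic over the socket as well.
conf.setBoolean("dfs.client.domain.socket.data.traffic", false);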
 
Example 7 - Project: big-c, File: TestRetryCacheWithHA.java
private DFSClient genClientWithDummyHandler() throws IOException {
  URI nnUri = dfs.getUri();
  FailoverProxyProvider<ClientProtocol> failoverProxyProvider = 
      NameNodeProxies.createFailoverProxyProvider(conf, 
          nnUri, ClientProtocol.class, true, null);
  InvocationHandler dummyHandler = new DummyRetryInvocationHandler(
      failoverProxyProvider, RetryPolicies
      .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
          Integer.MAX_VALUE,
          DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT,
          DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT));
  ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance(
      failoverProxyProvider.getInterface().getClassLoader(),
      new Class[] { ClientProtocol.class }, dummyHandler);
  
  DFSClient client = new DFSClient(null, proxy, conf, null);
  return client;
}
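The key technique here is the JDK dynamic proxy: wrapping the failover proxy in an InvocationHandler lets the test observe or perturb every ClientProtocol call before it reaches the namenode. A stripped-down sketch of the same pattern (DummyRetryInvocationHandler is the test's own class; the counting handler below is illustrative only):

static ClientProtocol countingProxy(final ClientProtocol delegate,
    final AtomicInteger calls) {
  InvocationHandler handler = new InvocationHandler() {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable {
      calls.incrementAndGet();              // observe every RPC
      return method.invoke(delegate, args); // then delegate to the real proxy
    }
  };
  return (ClientProtocol) Proxy.newProxyInstance(
      ClientProtocol.class.getClassLoader(),
      new Class<?>[] { ClientProtocol.class }, handler);
}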
 
Example 8 - Project: hadoop, File: NamenodeFsck.java
private void lostFoundInit(DFSClient dfs) {
  lfInited = true;
  try {
    String lfName = "/lost+found";
    
    final HdfsFileStatus lfStatus = dfs.getFileInfo(lfName);
    if (lfStatus == null) { // not exists
      lfInitedOk = dfs.mkdirs(lfName, null, true);
      lostFound = lfName;
    } else if (!lfStatus.isDir()) { // exists but not a directory
      LOG.warn("Cannot use /lost+found : a regular file with this name exists.");
      lfInitedOk = false;
    } else { // exists and is a directory
      lostFound = lfName;
      lfInitedOk = true;
    }
  } catch (Exception e) {
    e.printStackTrace();
    lfInitedOk = false;
  }
  if (lostFound == null) {
    LOG.warn("Cannot initialize /lost+found .");
    lfInitedOk = false;
    internalError = true;
  }
}
 
Example 9 - Project: hadoop, File: WebHdfsHandler.java
private void onGetFileChecksum(ChannelHandlerContext ctx) throws IOException {
  MD5MD5CRC32FileChecksum checksum = null;
  final String nnId = params.namenodeId();
  DFSClient dfsclient = newDfsClient(nnId, conf);
  try {
    checksum = dfsclient.getFileChecksum(path, Long.MAX_VALUE);
    dfsclient.close();
    dfsclient = null;
  } finally {
    IOUtils.cleanup(LOG, dfsclient);
  }
  final byte[] js = JsonUtil.toJsonString(checksum).getBytes(Charsets.UTF_8);
  DefaultFullHttpResponse resp =
    new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(js));

  resp.headers().set(CONTENT_TYPE, APPLICATION_JSON_UTF8);
  resp.headers().set(CONTENT_LENGTH, js.length);
  resp.headers().set(CONNECTION, CLOSE);
  ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
}
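The close-then-null sequence around the finally block is a deliberate idiom: on the success path the client is closed and the reference cleared, so IOUtils.cleanup(LOG, dfsclient) sees null and does nothing; if getFileChecksum throws, the reference is still set and cleanup closes it while logging rather than masking the original exception. The idiom in miniature (a sketch, not the handler's code; doWork is a placeholder):

DFSClient dfsclient = newDfsClient(nnId, conf);
try {
  doWork(dfsclient);
  dfsclient.close(); // normal path: let close() failures propagate
  dfsclient = null;  // mark as already closed
} finally {
  // No-op when dfsclient is null; otherwise closes it and swallows
  // (logs) IOExceptions so the original exception is not masked.
  IOUtils.cleanup(LOG, dfsclient);
}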
 
Example 10 - Project: hadoop, File: TestResolveHdfsSymlink.java
/**
 * Verifies that attempting to resolve a non-symlink results in a
 * client-side exception
 */
@Test
public void testLinkTargetNonSymlink() throws UnsupportedFileSystemException,
    IOException {
  FileContext fc = null;
  Path notSymlink = new Path("/notasymlink");
  try {
    fc = FileContext.getFileContext(cluster.getFileSystem().getUri());
    fc.create(notSymlink, EnumSet.of(CreateFlag.CREATE));
    DFSClient client = new DFSClient(cluster.getFileSystem().getUri(),
        cluster.getConfiguration(0));
    try {
      client.getLinkTarget(notSymlink.toString());
      fail("Expected exception for resolving non-symlink");
    } catch (IOException e) {
      GenericTestUtils.assertExceptionContains("is not a symbolic link", e);
    }
  } finally {
    if (fc != null) {
      fc.delete(notSymlink, false);
    }
  }
}
 
Example 11 - Project: terrapin, File: TerrapinControllerServiceImpl.java
public TerrapinControllerServiceImpl(PropertiesConfiguration configuration,
                                     ZooKeeperManager zkManager,
                                     DFSClient hdfsClient,
                                     HelixAdmin helixAdmin,
                                     String clusterName) {
  this.configuration = configuration;
  this.zkManager = zkManager;
  this.hdfsClient = hdfsClient;
  this.helixAdmin = helixAdmin;
  this.clusterName = clusterName;

  ExecutorService threadPool = new ThreadPoolExecutor(100,
      100,
      0,
      TimeUnit.SECONDS,
      new LinkedBlockingDeque<Runnable>(1000),
      new ThreadFactoryBuilder().setDaemon(false)
                    .setNameFormat("controller-pool-%d")
                    .build());
  this.futurePool = new ExecutorServiceFuturePool(threadPool);
}
 
Example 12 - Project: hadoop-gpu, File: NamenodeFsck.java
private void lostFoundInit(DFSClient dfs) {
  lfInited = true;
  try {
    String lfName = "/lost+found";
    // check that /lost+found exists
    if (!dfs.exists(lfName)) {
      lfInitedOk = dfs.mkdirs(lfName);
      lostFound = lfName;
    } else if (!dfs.isDirectory(lfName)) {
      LOG.warn("Cannot use /lost+found : a regular file with this name exists.");
      lfInitedOk = false;
    } else { // exists and isDirectory
      lostFound = lfName;
      lfInitedOk = true;
    }
  } catch (Exception e) {
    e.printStackTrace();
    lfInitedOk = false;
  }
  if (lostFound == null) {
    LOG.warn("Cannot initialize /lost+found .");
    lfInitedOk = false;
  }
}
 
Example 13 - Project: hadoop-gpu, File: FileChecksumServlets.java
/** {@inheritDoc} */
public void doGet(HttpServletRequest request, HttpServletResponse response
    ) throws ServletException, IOException {
  final UnixUserGroupInformation ugi = getUGI(request);
  final PrintWriter out = response.getWriter();
  final String filename = getFilename(request, response);
  final XMLOutputter xml = new XMLOutputter(out, "UTF-8");
  xml.declaration();

  final Configuration conf = new Configuration(DataNode.getDataNode().getConf());
  final int socketTimeout = conf.getInt("dfs.socket.timeout", HdfsConstants.READ_TIMEOUT);
  final SocketFactory socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
  UnixUserGroupInformation.saveToConf(conf,
      UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
  final ClientProtocol nnproxy = DFSClient.createNamenode(conf);

  try {
    final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
        filename, nnproxy, socketFactory, socketTimeout);
    MD5MD5CRC32FileChecksum.write(xml, checksum);
  } catch(IOException ioe) {
    new RemoteException(ioe.getClass().getName(), ioe.getMessage()
        ).writeXml(filename, xml);
  }
  xml.endDocument();
}
 
Example 14 - Project: big-c, File: RpcProgramMountd.java
public RpcProgramMountd(NfsConfiguration config,
    DatagramSocket registrationSocket, boolean allowInsecurePorts)
    throws IOException {
  // Note that RPC cache is not enabled
  super("mountd", "localhost", config.getInt(
      NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY,
      NfsConfigKeys.DFS_NFS_MOUNTD_PORT_DEFAULT), PROGRAM, VERSION_1,
      VERSION_3, registrationSocket, allowInsecurePorts);
  exports = new ArrayList<String>();
  exports.add(config.get(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY,
      NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT));
  this.hostsMatcher = NfsExports.getInstance(config);
  this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
  UserGroupInformation.setConfiguration(config);
  SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY,
      NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY);
  this.dfsClient = new DFSClient(NameNode.getAddress(config), config);
}
 
Example 15 - Project: big-c, File: TestRetryCacheWithHA.java
@Test (timeout=60000)
public void testRemoveCacheDescriptor() throws Exception {
  DFSClient client = genClientWithDummyHandler();
  AtMostOnceOp op = new RemoveCacheDirectiveInfoOp(client, "pool",
      "/path");
  testClientRetryWithFailover(op);
}
 
Example 16 - Project: hadoop, File: TestRetryCacheWithHA.java
@Test (timeout=60000)
public void testAddCacheDirectiveInfo() throws Exception {
  DFSClient client = genClientWithDummyHandler();
  AtMostOnceOp op = new AddCacheDirectiveInfoOp(client, 
      new CacheDirectiveInfo.Builder().
          setPool("pool").
          setPath(new Path("/path")).
          build());
  testClientRetryWithFailover(op);
}
 
Example 17 - Project: big-c, File: TestDisallowModifyROSnapshot.java
@Test(timeout=60000, expected = SnapshotAccessControlException.class)
public void testCreateSymlink() throws Exception {
  @SuppressWarnings("deprecation")
  DFSClient dfsclient = new DFSClient(conf);
  dfsclient.createSymlink(sub2.toString(), "/TestSnapshot/sub1/.snapshot",
      false);
}
 
Example 18 - Project: RDFS, File: TestBalancer.java
private void test(long[] capacities, String[] racks, 
    long newCapacity, String newRack) throws Exception {
  int numOfDatanodes = capacities.length;
  assertEquals(numOfDatanodes, racks.length);
  cluster = new MiniDFSCluster(0, CONF, capacities.length, true, true, null, 
      racks, capacities);
  try {
    cluster.waitActive();
    client = DFSClient.createNamenode(CONF);

    long totalCapacity=0L;
    for(long capacity:capacities) {
      totalCapacity += capacity;
    }
    // fill up the cluster to be 30% full
    long totalUsedSpace = totalCapacity*3/10;
    createFile(totalUsedSpace/numOfDatanodes, (short)numOfDatanodes);
    // start up an empty node with the same capacity and on the same rack
    cluster.startDataNodes(CONF, 1, true, null,
        new String[]{newRack}, new long[]{newCapacity});

    totalCapacity += newCapacity;

    // run balancer and validate results
    runBalancer(CONF, totalUsedSpace, totalCapacity);
  } finally {
    cluster.shutdown();
  }
}
 
Example 19 - Project: hadoop, File: TestFsck.java
public CorruptedTestFile(String name, Set<Integer> blocksToCorrupt,
    DFSClient dfsClient, int numDataNodes, int blockSize)
        throws IOException {
  this.name = name;
  this.blocksToCorrupt = blocksToCorrupt;
  this.dfsClient = dfsClient;
  this.numDataNodes = numDataNodes;
  this.blockSize = blockSize;
  this.initialContents = cacheInitialContents();
}
 
Example 20 - Project: hadoop, File: DFSClientCache.java
private RemovalListener<String, DFSClient> clientRemovalListener() {
  return new RemovalListener<String, DFSClient>() {
    @Override
    public void onRemoval(RemovalNotification<String, DFSClient> notification) {
      DFSClient client = notification.getValue();
      try {
        client.close();
      } catch (IOException e) {
        LOG.warn(String.format(
            "IOException when closing the DFSClient(%s), cause: %s", client,
            e));
      }
    }
  };
}
 
Example 21 - Project: hbase, File: TestHBaseWalOnEC.java
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  try {
    MiniDFSCluster cluster = UTIL.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
    DistributedFileSystem fs = cluster.getFileSystem();

    Method enableAllECPolicies =
      DFSTestUtil.class.getMethod("enableAllECPolicies", DistributedFileSystem.class);
    enableAllECPolicies.invoke(null, fs);

    DFSClient client = fs.getClient();
    Method setErasureCodingPolicy =
      DFSClient.class.getMethod("setErasureCodingPolicy", String.class, String.class);
    setErasureCodingPolicy.invoke(client, "/", "RS-3-2-1024k"); // try a built-in policy

    try (FSDataOutputStream out = fs.create(new Path("/canary"))) {
      // If this comes back as having hflush then some test setup assumption is wrong.
      // Fail the test so that a developer has to look and triage
      assertFalse("Did not enable EC!", out.hasCapability(StreamCapabilities.HFLUSH));
    }
  } catch (NoSuchMethodException e) {
    // We're not testing anything interesting if EC is not available, so skip the rest of the test
    Assume.assumeNoException("Using an older version of hadoop; EC not available.", e);
  }

  UTIL.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);

}
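The reflection here is a compatibility shim: DFSTestUtil.enableAllECPolicies and DFSClient.setErasureCodingPolicy only exist on Hadoop versions with erasure coding (3.x), so resolving them with getMethod lets the test compile against older Hadoop and skip itself at runtime instead of failing to link. The pattern in general form (ClassWithFeature, featureMethod, and targetInstance are placeholders):

try {
  // Resolve the optional API at runtime instead of linking against it.
  Method m = ClassWithFeature.class.getMethod("featureMethod", String.class);
  m.invoke(targetInstance, "arg");
} catch (NoSuchMethodException e) {
  // Dependency is too old to have the feature: skip the test, don't fail it.
  Assume.assumeNoException("feature not available in this version", e);
}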
 
Example 22 - Project: hadoop-gpu, File: TestBalancer.java
private void testUnevenDistribution(
    long distribution[], long capacities[], String[] racks) throws Exception {
  int numDatanodes = distribution.length;
  if (capacities.length != numDatanodes || racks.length != numDatanodes) {
    throw new IllegalArgumentException("Array length is not the same");
  }

  // calculate total space that need to be filled
  long totalUsedSpace=0L;
  for(int i=0; i<distribution.length; i++) {
    totalUsedSpace += distribution[i];
  }

  // fill the cluster
  Block[] blocks = generateBlocks(totalUsedSpace, (short)numDatanodes);

  // redistribute blocks
  Block[][] blocksDN = distributeBlocks(
      blocks, (short)(numDatanodes-1), distribution);

  // restart the cluster: do NOT format the cluster
  CONF.set("dfs.safemode.threshold.pct", "0.0f"); 
  cluster = new MiniDFSCluster(0, CONF, numDatanodes,
      false, true, null, racks, capacities);
  cluster.waitActive();
  client = DFSClient.createNamenode(CONF);

  cluster.injectBlocks(blocksDN);

  long totalCapacity = 0L;
  for(long capacity:capacities) {
    totalCapacity += capacity;
  }
  runBalancer(totalUsedSpace, totalCapacity);
}
 
Example 23 - Project: RDFS, File: NamenodeFsck.java
private DatanodeInfo bestNode(DFSClient dfs, DatanodeInfo[] nodes,
                              TreeSet<DatanodeInfo> deadNodes) throws IOException {
  if ((nodes == null) ||
      (nodes.length - deadNodes.size() < 1)) {
    throw new IOException("No live nodes contain current block");
  }
  DatanodeInfo chosenNode;
  do {
    chosenNode = nodes[r.nextInt(nodes.length)];
  } while (deadNodes.contains(chosenNode));
  return chosenNode;
}
 
Example 24 - Project: big-c, File: Nfs3Utils.java
public static WccAttr getWccAttr(DFSClient client, String fileIdPath)
    throws IOException {
  HdfsFileStatus fstat = getFileStatus(client, fileIdPath);
  if (fstat == null) {
    return null;
  }

  long size = fstat.isDir() ? getDirSize(fstat.getChildrenNum()) : fstat
      .getLen();
  return new WccAttr(size, new NfsTime(fstat.getModificationTime()),
      new NfsTime(fstat.getModificationTime()));
}
 
Example 25 - Project: hadoop-gpu, File: TestBalancer.java
private void test(long[] capacities, String[] racks, 
    long newCapacity, String newRack) throws Exception {
  int numOfDatanodes = capacities.length;
  assertEquals(numOfDatanodes, racks.length);
  cluster = new MiniDFSCluster(0, CONF, capacities.length, true, true, null, 
      racks, capacities);
  try {
    cluster.waitActive();
    client = DFSClient.createNamenode(CONF);

    long totalCapacity=0L;
    for(long capacity:capacities) {
      totalCapacity += capacity;
    }
    // fill up the cluster to be 30% full
    long totalUsedSpace = totalCapacity*3/10;
    createFile(totalUsedSpace/numOfDatanodes, (short)numOfDatanodes);
    // start up an empty node with the same capacity and on the same rack
    cluster.startDataNodes(CONF, 1, true, null,
        new String[]{newRack}, new long[]{newCapacity});

    totalCapacity += newCapacity;

    // run balancer and validate results
    runBalancer(totalUsedSpace, totalCapacity);
  } finally {
    cluster.shutdown();
  }
}
 
Example 26 - File: TestFailoverWithBlockTokensEnabled.java
@Test
public void ensureInvalidBlockTokensAreRejected() throws IOException,
    URISyntaxException {
  cluster.transitionToActive(0);
  FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
  
  DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA);
  assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH));
  
  DFSClient dfsClient = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs);
  DFSClient spyDfsClient = Mockito.spy(dfsClient);
  Mockito.doAnswer(
      new Answer<LocatedBlocks>() {
        @Override
        public LocatedBlocks answer(InvocationOnMock arg0) throws Throwable {
          LocatedBlocks locatedBlocks = (LocatedBlocks)arg0.callRealMethod();
          for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
            Token<BlockTokenIdentifier> token = lb.getBlockToken();
            BlockTokenIdentifier id = lb.getBlockToken().decodeIdentifier();
            // This will make the token invalid, since the password
            // won't match anymore
            id.setExpiryDate(Time.now() + 10);
            Token<BlockTokenIdentifier> newToken =
                new Token<BlockTokenIdentifier>(id.getBytes(),
                    token.getPassword(), token.getKind(), token.getService());
            lb.setBlockToken(newToken);
          }
          return locatedBlocks;
        }
      }).when(spyDfsClient).getLocatedBlocks(Mockito.anyString(),
          Mockito.anyLong(), Mockito.anyLong());
  DFSClientAdapter.setDFSClient((DistributedFileSystem)fs, spyDfsClient);
  
  try {
    assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH));
    fail("Shouldn't have been able to read a file with invalid block tokens");
  } catch (IOException ioe) {
    GenericTestUtils.assertExceptionContains("Could not obtain block", ioe);
  }
}
 
Example 27 - Project: big-c, File: OpenFileCtx.java
/**
 * Check the commit status with the given offset
 * @param commitOffset the offset to commit
 * @param channel the channel to return response
 * @param xid the xid of the commit request
 * @param preOpAttr the preOp attribute
 * @param fromRead whether the commit is triggered from read request
 * @return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
 * COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR
 */
public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset,
    Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) {
  if (!fromRead) {
    Preconditions.checkState(channel != null && preOpAttr != null);
    // Keep stream active
    updateLastAccessTime();
  }
  Preconditions.checkState(commitOffset >= 0);

  COMMIT_STATUS ret = checkCommitInternal(commitOffset, channel, xid,
      preOpAttr, fromRead);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Got commit status: " + ret.name());
  }
  // Do the sync outside the lock
  if (ret == COMMIT_STATUS.COMMIT_DO_SYNC
      || ret == COMMIT_STATUS.COMMIT_FINISHED) {
    try {
      // Sync file data and length
      fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
      ret = COMMIT_STATUS.COMMIT_FINISHED; // Remove COMMIT_DO_SYNC status 
      // Nothing to do for metadata since attr related change is pass-through
    } catch (ClosedChannelException cce) {
      if (pendingWrites.isEmpty()) {
        ret = COMMIT_STATUS.COMMIT_FINISHED;
      } else {
        ret = COMMIT_STATUS.COMMIT_ERROR;
      }
    } catch (IOException e) {
      LOG.error("Got stream error during data sync: " + e);
      // Do nothing. Stream will be closed eventually by StreamMonitor.
      // status = Nfs3Status.NFS3ERR_IO;
      ret = COMMIT_STATUS.COMMIT_ERROR;
    }
  }
  return ret;
}
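The sync step leans on HdfsDataOutputStream.hsync taking a set of SyncFlags: UPDATE_LENGTH flushes the data to the datanodes and also persists the new file length on the namenode, so other readers observe the grown file. In isolation the call looks like this (a sketch assuming fs is a DistributedFileSystem, whose create returns an HdfsDataOutputStream):

HdfsDataOutputStream out =
    (HdfsDataOutputStream) fs.create(new Path("/nfs/exported-file"));
out.write(data);
// Durably flush the bytes and update the namenode's length metadata.
out.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));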
 
Example 28 - Project: big-c, File: TestFailoverWithBlockTokensEnabled.java
@Test
public void ensureInvalidBlockTokensAreRejected() throws IOException,
    URISyntaxException {
  cluster.transitionToActive(0);
  FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
  
  DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA);
  assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH));
  
  DFSClient dfsClient = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs);
  DFSClient spyDfsClient = Mockito.spy(dfsClient);
  Mockito.doAnswer(
      new Answer<LocatedBlocks>() {
        @Override
        public LocatedBlocks answer(InvocationOnMock arg0) throws Throwable {
          LocatedBlocks locatedBlocks = (LocatedBlocks)arg0.callRealMethod();
          for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
            Token<BlockTokenIdentifier> token = lb.getBlockToken();
            BlockTokenIdentifier id = lb.getBlockToken().decodeIdentifier();
            // This will make the token invalid, since the password
            // won't match anymore
            id.setExpiryDate(Time.now() + 10);
            Token<BlockTokenIdentifier> newToken =
                new Token<BlockTokenIdentifier>(id.getBytes(),
                    token.getPassword(), token.getKind(), token.getService());
            lb.setBlockToken(newToken);
          }
          return locatedBlocks;
        }
      }).when(spyDfsClient).getLocatedBlocks(Mockito.anyString(),
          Mockito.anyLong(), Mockito.anyLong());
  DFSClientAdapter.setDFSClient((DistributedFileSystem)fs, spyDfsClient);
  
  try {
    assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH));
    fail("Shouldn't have been able to read a file with invalid block tokens");
  } catch (IOException ioe) {
    GenericTestUtils.assertExceptionContains("Could not obtain block", ioe);
  }
}
 
Example 29 - Project: big-c, File: TestWrites.java
@Test
public void testCheckCommitAixCompatMode() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  // Enable AIX compatibility mode.
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(new NfsConfiguration()), true, conf);
  
  // Test fall-through to pendingWrites check in the event that commitOffset
  // is greater than the number of bytes we've so far flushed.
  Mockito.when(fos.getPos()).thenReturn((long) 2);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_FINISHED);
  
  // Test the case when we actually have received more bytes than we're trying
  // to commit.
  ctx.getPendingWritesForTest().put(new OffsetRange(0, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest((long)10);
  status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
}
 
Example 30 - Project: big-c, File: DFSClientCache.java
private CacheLoader<DFSInputStreamCaheKey, FSDataInputStream> inputStreamLoader() {
  return new CacheLoader<DFSInputStreamCaheKey, FSDataInputStream>() {

    @Override
    public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception {
      DFSClient client = getDfsClient(key.userId);
      DFSInputStream dis = client.open(key.inodePath);
      return client.createWrappedInputStream(dis);
    }
  };
}