下面列出了org.apache.hadoop.fs.ParentNotDirectoryException#org.apache.commons.net.ftp.FTPClient 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Opens a stream that appends to an existing remote file.
 *
 * <p>A dedicated FTP connection is opened for the transfer and handed to the
 * returned stream wrapper. If the stream cannot be opened, the dedicated
 * connection is disconnected again before the exception propagates.
 *
 * @param path the file to append to; must exist and must not be a directory
 * @return a stream writing to the end of the remote file
 * @throws XenonException if the path checks fail, the server refuses the append,
 *         or an I/O error occurs (the I/O error is kept as the cause)
 */
@Override
public OutputStream appendToFile(Path path) throws XenonException {
    LOGGER.debug("appendToFile path = {}", path);
    assertIsOpen();
    Path absPath = toAbsolutePath(path);
    assertPathExists(absPath);
    assertPathIsNotDirectory(absPath);

    FTPClient newClient = null;
    boolean handedOff = false;
    try {
        // Since FTP connections can only do a single thing a time, we need
        // a new FTPClient to handle the stream.
        newClient = adaptor.connect(getLocation(), credential);
        newClient.enterLocalPassiveMode();
        OutputStream out = newClient.appendFileStream(absPath.toString());
        if (out == null) {
            // Inspect the reply of the client that actually issued the command.
            checkClientReply(newClient, "Failed to append to path: " + absPath.toString());
            // Never wrap a null stream, even if the reply code looked acceptable.
            throw new XenonException(ADAPTOR_NAME, "Failed to append to path: " + absPath);
        }
        OutputStream result = new TransferClientOutputStream(out, new CloseableClient(newClient));
        handedOff = true;
        return result;
    } catch (IOException e) {
        throw new XenonException(ADAPTOR_NAME, "Failed to append to path: " + absPath, e);
    } finally {
        if (!handedOff && newClient != null) {
            try {
                newClient.disconnect();
            } catch (IOException ignored) {
                // Best effort: the original failure is the one worth reporting.
            }
        }
    }
}
/**
 * Downloads a single remote file in binary mode.
 *
 * <p>The parent directory of {@code local} is created when it is missing. The
 * elapsed time and resulting file size are logged whether or not the transfer
 * succeeded.
 *
 * @param remote path of the file on the FTP server
 * @param local  local destination file
 * @return {@code true} only if the server reported a completed transfer;
 *         {@code false} if it refused the transfer or an I/O error occurred
 */
public boolean downloadFile(String remote, File local) {
    boolean success = false;
    long fr = System.currentTimeMillis();
    try {
        client.setFileType(FTPClient.BINARY_FILE_TYPE);
        File localDir = local.getParentFile();
        // getParentFile() is null when 'local' is a bare file name.
        if (localDir != null && !localDir.exists()) {
            localDir.mkdirs();
        }
        // try-with-resources closes the local file even if the transfer throws.
        try (OutputStream out = new FileOutputStream(local)) {
            success = client.retrieveFile(remote, out);
        }
    } catch (IOException e) {
        log.warn("ftp download failed, remote: " + remote, e);
    }
    log.warn("[ftp download file][耗时:{}][length:{}][remote:{}][local:{}]",DateUtil.conversion(System.currentTimeMillis()-fr),FileUtil.length(local.length()),remote, local.getAbsolutePath());
    return success;
}
/**
 * Recursively removes a directory and its contents from the FTP server.
 *
 * <p>This is a best-effort operation: failures are logged and not rethrown.
 * The client's working directory is changed while the tree is walked.
 *
 * @param ftpClient     connected client to operate with
 * @param directoryName directory to remove, resolved against the current working directory
 */
public static void removeFTPDirectory(FTPClient ftpClient, String directoryName) {
    try {
        ftpClient.changeWorkingDirectory(directoryName);
        for (FTPFile file : ftpClient.listFiles()) {
            if (file == null) {
                // Unparseable listing line.
                continue;
            }
            String name = file.getName();
            if (".".equals(name) || "..".equals(name)) {
                // Some servers list these; recursing into them would never terminate.
                continue;
            }
            if (file.isDirectory()) {
                FileUtils.removeFTPDirectory(ftpClient, name);
            } else {
                log.debug("Deleting " + name);
                ftpClient.deleteFile(name);
            }
        }
        ftpClient.changeWorkingDirectory(directoryName);
        ftpClient.changeToParentDirectory();
        log.debug("Deleting " + directoryName);
        ftpClient.removeDirectory(directoryName);
    } catch (Exception ex) {
        // Still best-effort, but leave a trace instead of failing silently.
        log.warn("Failed to remove FTP directory " + directoryName, ex);
    }
}
/**
 * Downloads the file addressed by an FTP URL to a local path.
 *
 * <p>Empty credential or encoding arguments fall back to the values from
 * {@code ConfigConstants}. The local stream and the FTP connection are
 * released even when the transfer fails.
 *
 * @param ftpUrl             URL of the remote file; host, port and path are taken from it
 * @param localFilePath      local destination path
 * @param ftpUsername        user name, or empty to use the configured default
 * @param ftpPassword        password, or empty to use the configured default
 * @param ftpControlEncoding control-connection encoding, or empty to use the configured default
 * @throws IOException if connecting, transferring or writing the local file fails
 */
public static void download(String ftpUrl, String localFilePath, String ftpUsername, String ftpPassword, String ftpControlEncoding) throws IOException {
    String username = StringUtils.isEmpty(ftpUsername) ? ConfigConstants.getFtpUsername() : ftpUsername;
    String password = StringUtils.isEmpty(ftpPassword) ? ConfigConstants.getFtpPassword() : ftpPassword;
    String controlEncoding = StringUtils.isEmpty(ftpControlEncoding) ? ConfigConstants.getFtpControlEncoding() : ftpControlEncoding;
    URL url = new URL(ftpUrl);
    String host = url.getHost();
    int port = (url.getPort() == -1) ? url.getDefaultPort() : url.getPort();
    String remoteFilePath = url.getPath();
    // The password is deliberately kept out of the log.
    LOGGER.debug("FTP connection url:{}, username:{}, controlEncoding:{}, localFilePath:{}", ftpUrl, username, controlEncoding, localFilePath);
    FTPClient ftpClient = connect(host, port, username, password, controlEncoding);
    try (OutputStream outputStream = new FileOutputStream(localFilePath)) {
        ftpClient.enterLocalPassiveMode();
        // Re-encode the path so the control connection sends the bytes of 'controlEncoding'.
        boolean downloadResult = ftpClient.retrieveFile(new String(remoteFilePath.getBytes(controlEncoding), StandardCharsets.ISO_8859_1), outputStream);
        LOGGER.debug("FTP download result {}", downloadResult);
        outputStream.flush();
    } finally {
        try {
            if (ftpClient.isConnected()) {
                ftpClient.logout();
            }
        } finally {
            if (ftpClient.isConnected()) {
                ftpClient.disconnect();
            }
        }
    }
}
/**
 * Opens a stream that reads a remote file.
 *
 * <p>A dedicated FTP connection is opened for the transfer and handed to the
 * returned stream wrapper. If the stream cannot be opened, the dedicated
 * connection is disconnected again before the exception propagates.
 *
 * @param path the file to read; must exist and be a regular file
 * @return a stream over the remote file's content
 * @throws XenonException if the path checks fail, the server refuses the retrieval,
 *         or an I/O error occurs (the I/O error is kept as the cause)
 */
@Override
public InputStream readFromFile(Path path) throws XenonException {
    LOGGER.debug("newInputStream path = {}", path);
    assertIsOpen();
    Path absPath = toAbsolutePath(path);
    assertPathExists(absPath);
    assertPathIsFile(absPath);
    // Since FTP connections can only do a single thing a time, we need a
    // new FTPClient to handle the stream.
    FTPClient newClient = adaptor.connect(getLocation(), credential);
    boolean handedOff = false;
    try {
        newClient.enterLocalPassiveMode();
        InputStream in = newClient.retrieveFileStream(absPath.toString());
        checkClientReply(newClient, "Failed to read from path: " + absPath.toString());
        InputStream result = new TransferClientInputStream(in, new CloseableClient(newClient));
        handedOff = true;
        return result;
    } catch (IOException e) {
        throw new XenonException(ADAPTOR_NAME, "Failed to read from path: " + absPath, e);
    } finally {
        if (!handedOff) {
            try {
                newClient.disconnect();
            } catch (IOException ignored) {
                // Best effort: the original failure is the one worth reporting.
            }
        }
    }
}
/**
 * Makes sure the given directory exists on the server, creating missing
 * ancestors first (outermost to innermost).
 *
 * @param flowFile      flow file used to obtain the FTP client
 * @param directoryName directory that must exist remotely
 * @throws IOException if a missing directory cannot be created
 */
@Override
public void ensureDirectoryExists(final FlowFile flowFile, final File directoryName) throws IOException {
    // Handle ancestors before this level; stop at the filesystem root.
    final File parentDir = directoryName.getParentFile();
    if (parentDir != null && !parentDir.equals(new File(File.separator))) {
        ensureDirectoryExists(flowFile, parentDir);
    }

    // Normalise to forward slashes and strip a leading drive designator such as "C:".
    final String remoteDirectory = directoryName.getAbsolutePath().replace("\\", "/").replaceAll("^.\\:", "");
    final FTPClient client = getClient(flowFile);

    if (setWorkingDirectory(remoteDirectory)) {
        // Able to change into it, so it already exists.
        return;
    }

    logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
    if (!client.makeDirectory(remoteDirectory)) {
        throw new IOException("Failed to create remote directory " + remoteDirectory);
    }
    logger.debug("Created {}", new Object[] {remoteDirectory});
}
/**
 * Upload directory to specified FTP server without retries.
 *
 * <p>The FTP connection is released whether or not the upload succeeds.
 *
 * @param ftpServer           host of the FTP server
 * @param username            FTP user name
 * @param password            FTP password
 * @param sourceDirectoryPath local directory to upload
 * @param targetDirectoryPath remote directory to upload into
 * @return Boolean to indicate whether uploading is successful.
 */
protected boolean uploadDirectory(final String ftpServer, final String username, final String password,
        final String sourceDirectoryPath, final String targetDirectoryPath) {
    logger.debug("FTP username: " + username);
    FTPClient ftpClient = null;
    try {
        ftpClient = getFTPClient(ftpServer, username, password);
        logger.quiet(String.format(UPLOAD_DIR_START, sourceDirectoryPath, targetDirectoryPath));
        uploadDirectory(ftpClient, sourceDirectoryPath, targetDirectoryPath, "");
        logger.quiet(String.format(UPLOAD_DIR_FINISH, sourceDirectoryPath, targetDirectoryPath));
        return true;
    } catch (Exception e) {
        logger.error(String.format(UPLOAD_DIR_FAILURE, sourceDirectoryPath, targetDirectoryPath), e);
        return false;
    } finally {
        // Release the connection on every path, not only after a successful upload.
        if (ftpClient != null && ftpClient.isConnected()) {
            try {
                ftpClient.disconnect();
            } catch (IOException e) {
                logger.debug("Failed to disconnect from FTP server: " + e.getMessage());
            }
        }
    }
}
/**
 * Connects and logs in to an FTP server, switches to binary transfers and
 * changes into the given directory.
 *
 * @param path     remote directory to change into after login
 * @param addr     server host name or address
 * @param port     server port
 * @param username login user
 * @param password login password
 * @return the connected client, or {@code null} if the server rejected the
 *         connection, the login or the binary file type
 * @throws Exception if a network error occurs; the socket is closed before rethrowing
 */
FTPClient connect(String path, String addr, int port, String username, String password) throws Exception {
    FTPClient ftp = new FTPClient();
    // The control encoding only takes effect when set before the connection is opened.
    ftp.setControlEncoding("UTF-8");
    try {
        ftp.connect(addr, port);
        // Check the greeting first, then the login result, then the file type.
        if (!FTPReply.isPositiveCompletion(ftp.getReplyCode())
                || !ftp.login(username, password)) {
            ftp.disconnect();
            return null;
        }
        ftp.configure(new FTPClientConfig(ftp.getSystemType()));
        if (!ftp.setFileType(FTPClient.BINARY_FILE_TYPE)) {
            ftp.disconnect();
            return null;
        }
        ftp.changeWorkingDirectory(path);
        return ftp;
    } catch (Exception e) {
        if (ftp.isConnected()) {
            try {
                ftp.disconnect();
            } catch (IOException ignored) {
                // Best effort: the original failure is rethrown below.
            }
        }
        throw e;
    }
}
/**
 * Creates the resource addressed by {@code target} on the FTP server.
 *
 * @param target URI of the resource to create
 * @param params creation options; requesting a last-modified date is rejected
 * @return the normalized URI of the created resource, or {@code null} if nothing was created
 * @throws IOException on connection or transfer failure
 * @throws UnsupportedOperationException if {@code params} carries a last-modified date
 */
@Override
public SingleCloverURI create(SingleCloverURI target, CreateParameters params) throws IOException {
    if (params.getLastModified() != null) {
        throw new UnsupportedOperationException(FileOperationMessages.getString("FTPOperationHandler.setting_date_not_supported")); //$NON-NLS-1$
    }
    final URI normalized = target.toURI().normalize();
    FTPClient connection = null;
    try {
        connection = connect(normalized);
        final boolean created = create(connection, normalized, params);
        return created ? CloverURI.createSingleURI(normalized) : null;
    } finally {
        // Runs even when connect() failed and 'connection' is still null.
        disconnect(connection);
    }
}
/**
 * Deletes a file or directory using an already-open client, so that callers
 * inside this class do not pay for a new TCP connection on every operation.
 *
 * @param client    connected FTP client
 * @param file      path to delete, resolved against the client's working directory
 * @param recursive whether a non-empty directory may be deleted with its contents
 * @return the server's result of the final delete/remove command, or
 *         {@code false} if the path does not exist
 * @throws IOException if the directory is not empty and {@code recursive} is false,
 *         or on communication failure
 */
private boolean delete(FTPClient client, Path file, boolean recursive)
        throws IOException {
    final Path cwd = new Path(client.printWorkingDirectory());
    final Path target = makeAbsolute(cwd, file);
    final String targetPath = target.toUri().getPath();

    try {
        if (getFileStatus(client, target).isFile()) {
            return client.deleteFile(targetPath);
        }
    } catch (FileNotFoundException notThere) {
        // Nothing to delete.
        return false;
    }

    // From here on the target is a directory.
    final FileStatus[] children = listStatus(client, target);
    final boolean hasChildren = children != null && children.length > 0;
    if (hasChildren && !recursive) {
        throw new IOException("Directory: " + file + " is not empty.");
    }
    for (FileStatus child : children) {
        delete(client, new Path(target, child.getPath()), recursive);
    }
    return client.removeDirectory(targetPath);
}
/**
 * Delete given directory from FTP server (directory must be empty).
 * @param hostName the FTP server host name to connect
 * @param port the port to connect
 * @param userName the user name
 * @param password the password
 * @param remotePath the path to the directory on the FTP to be removed
 * @return true if the directory has been removed and false otherwise.
 * @throws RuntimeException wrapping the underlying {@link IOException} if the operation fails.
 */
public static boolean deleteDirectoryFromFTPServer(String hostName, Integer port, String userName, String password, String remotePath) {
    boolean deleted = false;
    FTPClient ftpClient = new FTPClient();
    String errorMessage = "Could not delete the directory '%s' from FTP server '%s'. Cause: %s";
    try {
        connectAndLoginOnFTPServer(ftpClient, hostName, port, userName, password);
        deleted = ftpClient.removeDirectory(remotePath);
    } catch (IOException ex) {
        // The template has three placeholders; leaving one unfilled would raise
        // MissingFormatArgumentException and hide the real failure.
        throw new RuntimeException(String.format(errorMessage, remotePath, hostName, ex.getMessage()), ex);
    } finally {
        disconnectAndLogoutFromFTPServer(ftpClient, hostName);
    }
    return deleted;
}
/**
 * Uploads a local file to a directory on an FTP server in binary mode.
 *
 * <p>The local file stream and the connection are released on every path.
 *
 * @param config         host, port and credentials of the server
 * @param directory      remote directory to change into before storing
 * @param file           local file to upload
 * @param remoteFileName name to store the file under
 * @throws IOException on communication or local read failure
 */
public static void ftpUpload(FTPConfig config, String directory, File file, String remoteFileName)
        throws IOException
{
    FTPClient server = new FTPClient();
    try {
        server.connect(config.host, config.port);
        assertValidReplyCode(server.getReplyCode(), server);
        server.login(config.userName, config.password);
        assertValidReplyCode(server.getReplyCode(), server);
        assertValidReplyCode(server.cwd(directory), server);
        // No setFileTransferMode call: it takes a *_TRANSFER_MODE constant, not a
        // file type, and the default transfer mode is what we want here.
        server.setFileType(FTP.BINARY_FILE_TYPE);
        try (InputStream in = new FileInputStream(file)) {
            server.storeFile(remoteFileName, in);
        }
        assertValidReplyCode(server.getReplyCode(), server);
        server.sendNoOp();
    } finally {
        if (server.isConnected()) {
            try {
                server.disconnect();
            } catch (IOException ignored) {
                // Do not let a failed disconnect mask the primary outcome.
            }
        }
    }
}
/**
 * Lists a path using an already-open client, so that callers inside this
 * class do not pay for a new TCP connection on every operation.
 *
 * @param client connected FTP client
 * @param file   path to list, resolved against the client's working directory
 * @return a single-element array for a file, otherwise one status per directory entry
 * @throws IOException on communication failure
 */
private FileStatus[] listStatus(FTPClient client, Path file)
        throws IOException {
    final Path target = makeAbsolute(new Path(client.printWorkingDirectory()), file);

    final FileStatus status = getFileStatus(client, target);
    if (status.isFile()) {
        return new FileStatus[] { status };
    }

    final FTPFile[] entries = client.listFiles(target.toUri().getPath());
    final FileStatus[] result = new FileStatus[entries.length];
    int idx = 0;
    for (FTPFile entry : entries) {
        result[idx++] = getFileStatus(entry, target);
    }
    return result;
}
/**
 * Verifies that an FTP reply code is in one of the positive classes
 * (completion, intermediate or preliminary) and logs which one.
 *
 * @param code reply code to check
 * @param ftp  client whose reply string is included in the failure message
 * @throws Error if the code is not a positive reply
 */
private static void assertValidReplyCode(int code, FTPClient ftp)
{
    final String label;
    if (FTPReply.isPositiveCompletion(code)) {
        label = "Good Completion code ";
    } else if (FTPReply.isPositiveIntermediate(code)) {
        label = "Good Intermediate code ";
    } else if (FTPReply.isPositivePreliminary(code)) {
        label = "Good Preliminary code ";
    } else {
        throw new Error("Problem encountered with FTP Server, returned Code " + code + ", replied '"
                + ftp.getReplyString() + "'");
    }
    SimpleLogger.variable(label + code);
}
/**
 * Logs out of the FTP server.
 * <p>
 * This utility class tracks the FTPClient for the current thread itself, so
 * calling this method is enough to log out and release the connection.
 * Nothing happens if no client is registered or it is no longer connected.
 * </p>
 */
public static void logout(){
    FTPClient ftpClient = ftpClientMap.get();
    ftpClientMap.remove();
    // Asking an unconnected client for its remote address can fail,
    // and there is nothing to log out from or release in that case.
    if(null == ftpClient || !ftpClient.isConnected()){
        return;
    }
    String ftpRemoteAddress = String.valueOf(ftpClient.getRemoteAddress());
    try{
        ftpClient.logout();
        LogUtil.getLogger().debug("FTP服务器[" + ftpRemoteAddress + "]登出成功...");
    }catch (IOException e){
        LogUtil.getLogger().warn("FTP服务器[" + ftpRemoteAddress + "]登出时发生异常,堆栈轨迹如下", e);
    }finally{
        if(ftpClient.isConnected()){
            try {
                ftpClient.disconnect();
                LogUtil.getLogger().debug("FTP服务器[" + ftpRemoteAddress + "]连接释放完毕...");
            } catch (IOException ioe) {
                LogUtil.getLogger().warn("FTP服务器[" + ftpRemoteAddress + "]连接释放时发生异常,堆栈轨迹如下", ioe);
            }
        }
    }
}
/**
 * Creates a remote directory path one level at a time.
 * <p>
 * For each level it first tries to change into the directory and only issues
 * a make-directory command when that fails; both outcomes are logged.
 * </p>
 * @param ftpClient  connected client
 * @param remotePath remote path without a file name (format: /a/b/c)
 */
private static void createRemoteFolder(FTPClient ftpClient, String remotePath) throws IOException{
    final StringBuilder pathSoFar = new StringBuilder();
    for(String segment : remotePath.split("/")){
        if(!StringUtils.isNotBlank(segment)){
            // Skip empty segments produced by leading or doubled slashes.
            continue;
        }
        pathSoFar.append("/").append(segment);
        final String current = pathSoFar.toString();
        boolean ok = ftpClient.changeWorkingDirectory(current);
        LogUtil.getLogger().info("change working directory : " + current + "-->" + (ok?"SUCCESS":"FAIL"));
        if(ok){
            continue;
        }
        ok = ftpClient.makeDirectory(current);
        LogUtil.getLogger().info("make directory : " + current + "-->" + (ok?"SUCCESS":"FAIL"));
    }
}
/**
 * Uploads a file.
 * <p>
 * Unlike {@link #uploadAndLogout(String, String, String, String, InputStream)},
 * this method does not log out or release the connection afterwards, so several
 * files can be uploaded over one FTP connection. The input stream is always
 * closed, including when the login fails.
 * </p>
 * @param hostname  target host address
 * @param username  FTP login user
 * @param password  FTP login password
 * @param remoteURL full remote file name including path and extension
 * @param is        input stream of the file content; closed by this method
 * @return True if successfully completed, false if not.
 */
public static boolean upload(String hostname, String username, String password, String remoteURL, InputStream is){
    try{
        if(!login(hostname, username, password, DEFAULT_DEFAULT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT, DEFAULT_DATA_TIMEOUT)){
            return false;
        }
        FTPClient ftpClient = ftpClientMap.get();
        remoteURL = FilenameUtils.separatorsToUnix(remoteURL);
        String remoteDir = FilenameUtils.getFullPathNoEndSeparator(remoteURL);
        if(!ftpClient.changeWorkingDirectory(remoteDir)){
            createRemoteFolder(ftpClient, remoteDir);
            if(!ftpClient.changeWorkingDirectory(remoteDir)){
                // Storing now would put the file into whatever directory happens to be current.
                LogUtil.getLogger().error("Unable to enter remote directory [" + remoteDir + "] on FTP server [" + hostname + "]");
                return false;
            }
        }
        // Re-encode the name so the control connection sends the bytes of DEFAULT_CHARSET.
        String remoteFile = new String(FilenameUtils.getName(remoteURL).getBytes(DEFAULT_CHARSET), "ISO-8859-1");
        ftpClient.setCopyStreamListener(new FTPProcess(is.available(), System.currentTimeMillis()));
        return ftpClient.storeFile(remoteFile, is);
    }catch(IOException e){
        LogUtil.getLogger().error("文件["+remoteURL+"]上传到FTP服务器["+hostname+"]失败,堆栈轨迹如下", e);
        return false;
    }finally{
        IOUtils.closeQuietly(is);
    }
}
/**
 * Checks that two plain FTP clients can connect to the local test server at
 * the same time.
 */
@Test
public void testUnderlyingConnect() throws SocketException, IOException {
    final FTPClient first = new FTPClient();
    final FTPClient second = new FTPClient();
    final String host = "localhost";
    try {
        first.connect(host, FtpProviderTestCase.getSocketPort());
        second.connect(host, FtpProviderTestCase.getSocketPort());
    } finally {
        first.disconnect();
        second.disconnect();
    }
}
/**
 * Downloads the regular files of a remote directory into a local directory.
 *
 * @param cwdDirectory  remote directory to change into first; null or empty keeps the current one
 * @param storeDir      local directory that receives the files
 * @param head          required file-name prefix; null or empty disables the filter
 * @param deleteFtpFile whether to delete each remote file after it was retrieved successfully
 *
 * @throws SocketException on socket failure
 * @throws IOException     on transfer or local write failure
 * @throws Exception       if the client is not connected
 */
public void get(String cwdDirectory, File storeDir, String head,
        boolean deleteFtpFile) throws SocketException, IOException, Exception {
    if (!this.ftpClient.isConnected()) {
        this.logger.error("FTP not connection...");
        throw new Exception("FTP not connection...");
    }
    // Non-binary mode can cause problems on vsFTPd-like servers.
    this.ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE);
    if (cwdDirectory != null && !"".equals(cwdDirectory)) {
        this.ftpClient.cwd(cwdDirectory);
    }
    FTPFile[] ftpFiles = this.ftpClient.listFiles();
    if (ftpFiles == null) {
        return;
    }
    final boolean filterByHead = head != null && !"".equals(head);
    for (FTPFile ftpFile : ftpFiles) {
        if (ftpFile == null) {
            // Unparseable listing line.
            continue;
        }
        String name = ftpFile.getName();
        if (filterByHead && !name.startsWith(head)) {
            logger.info("not get : " + name);
            continue;
        }
        logger.info(ftpFile);
        if (!ftpFile.isFile()) {
            continue;
        }
        File downloadFile = new File(storeDir.getPath() + "/" + name);
        boolean retrieved;
        // try-with-resources closes the local file even if the transfer throws.
        try (FileOutputStream fos = new FileOutputStream(downloadFile)) {
            retrieved = this.ftpClient.retrieveFile(name, fos);
        }
        if (retrieved) {
            logger.info("ftp GET (save to) : " + storeDir.getPath() + "/" + name);
            // Delete the remote copy only after the local file has been closed.
            if (deleteFtpFile) {
                this.delete(name);
            }
        }
    }
}
/**
 * Verifies that connecting creates one client from the factory, then issues
 * connect, user, pass and a binary file-type selection on it, once each.
 */
@Test
public void test_connect() throws IOException {
    final Mockery mockery = constructMockContext();
    final FTPClient ftpClient = mockery.mock(FTPClient.class);
    final FtpClientFactory clientFactory = mockery.mock(FtpClientFactory.class);

    mockery.checking(new Expectations() {
        {
            one(clientFactory).createInstance();
            will(returnValue(ftpClient));

            one(ftpClient).connect(with(any(String.class)));

            one(ftpClient).user(with(any(String.class)));
            will(returnValue(1));

            one(ftpClient).pass(with(any(String.class)));
            one(ftpClient).setFileType(FTP.BINARY_FILE_TYPE);
        }
    });

    final FtpFileMover mover = new FtpFileMover(clientFactory);
    mover.connect(new WctDepositParameter());

    mockery.assertIsSatisfied();
}
/**
 * Opens a remote file for reading over a newly established FTP connection.
 *
 * @param file       path of the file to open, resolved against the server's working directory
 * @param bufferSize value passed to {@code client.allocate(...)}
 * @return a stream over the remote file
 * @throws IOException if the path is a directory, or the reply code after requesting
 *         the file is not a positive preliminary reply
 */
@Override
public FSDataInputStream open(Path file, int bufferSize) throws IOException {
FTPClient client = connect();
Path workDir = new Path(client.printWorkingDirectory());
Path absolute = makeAbsolute(workDir, file);
FileStatus fileStat = getFileStatus(client, absolute);
if (fileStat.isDir()) {
// Directories cannot be opened as a stream; release the connection first.
disconnect(client);
throw new IOException("Path " + file + " is a directory.");
}
client.allocate(bufferSize);
Path parent = absolute.getParent();
// Change to the parent directory on the server. Only then can we read the
// file on the server by opening up an InputStream. As a side effect the
// working directory on the server is changed to the parent directory of
// the file. The FTP client connection is closed when close() is called on
// the FSDataInputStream.
client.changeWorkingDirectory(parent.toUri().getPath());
InputStream is = client.retrieveFileStream(file.getName());
FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is,
client, statistics));
// The reply code read here is the one left by retrieveFileStream above.
if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
// The ftpClient is an inconsistent state. Must close the stream
// which in turn will logout and disconnect from FTP server
fis.close();
throw new IOException("Unable to open file: " + file + ", Aborting");
}
return fis;
}
/**
 * Logs out and disconnects the client, ignoring any I/O failure.
 *
 * <p>The disconnect is attempted even when the logout fails, so the socket
 * is not left open.
 *
 * @param ftpClient client to close
 */
static private void closeSilently(FTPClient ftpClient) {
    try {
        ftpClient.logout();
    } catch (IOException ignored) {
        // Ignore: closing is best effort.
    } finally {
        try {
            if (ftpClient.isConnected()) {
                ftpClient.disconnect();
            }
        } catch (IOException ignored) {
            // Ignore: closing is best effort.
        }
    }
}
/**
 * Opens a stream over a remote file.
 *
 * @param remoteFileName name of the remote file to retrieve
 * @param flowFile       flow file used to obtain the FTP client
 * @return the stream returned by the client for the remote file
 * @throws IOException carrying the server's reply string if no stream could be opened
 */
@Override
public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
    // Use the caller's flow file to obtain the client rather than discarding it.
    final FTPClient client = getClient(flowFile);
    InputStream in = client.retrieveFileStream(remoteFileName);
    if (in == null) {
        throw new IOException(client.getReplyString());
    }
    return in;
}
/**
 * Changes the client's working directory to the dataset derived from
 * {@code datasetName}.
 *
 * @param context     context passed to the dataset-name lookup
 * @param ftp         connected client
 * @param datasetName dataset name to resolve and change into
 * @throws IOException if the server refuses the directory change
 */
public static void setWorkingDirectory(TransferableContext context, FTPClient ftp, String datasetName)
        throws IOException {
    final String targetDataset = MainframeUtils.getDatasetName(context, datasetName);
    if (ftp.changeWorkingDirectory(targetDataset)) {
        return;
    }
    throw new IOException("Unable to change working directory to " + targetDataset + " for ftp transfer with ftp client = " + ftp + ". " + ftp.getReplyString());
}
/**
 * Closes the given FTP connection (if one is open).
 *
 * <p>The disconnect is attempted even when the logout fails, so the socket
 * is not left open. I/O failures are ignored.
 *
 * @param ftp authenticated client object
 */
private static void closeFTPConnection(FTPClient ftp) {
    if (!ftp.isConnected()) {
        return;
    }
    try {
        ftp.logout();
    } catch (IOException ioe) {
        // do nothing
    } finally {
        try {
            ftp.disconnect();
        } catch (IOException ioe) {
            // do nothing
        }
    }
}
/**
 * Logs out and disconnects the client without ever throwing.
 *
 * <p>The disconnect is attempted even when the logout fails. The success
 * message is logged only if no step failed.
 *
 * @param ftp client to disconnect
 */
public static void ftpDisconnect(FTPClient ftp) {
    boolean clean = true;
    try {
        if (ftp.isConnected()) {
            try {
                ftp.logout();
            } catch (Exception ioe) {
                clean = false;
                LOGGER.error("Exception while disconnecting ftp", ioe);
            } finally {
                try {
                    ftp.disconnect();
                } catch (Exception ioe) {
                    clean = false;
                    LOGGER.error("Exception while disconnecting ftp", ioe);
                }
            }
        }
    } catch (Throwable thr) {
        clean = false;
        LOGGER.error("Throwable while disconnecting ftp", thr);
    }
    if (clean) {
        LOGGER.debug("FTP has been successfully disconnected.");
    }
}
/**
 * Checks whether an FTPClient object is still usable by sending a NOOP.
 *
 * @param ftpClient client to validate
 * @return {@code true} if the client is connected and the server answered the NOOP
 *         positively; {@code false} otherwise, including on I/O failure
 */
@Override
public boolean validateObject(FTPClient ftpClient) {
    if (ftpClient == null || !ftpClient.isConnected()) {
        return false;
    }
    try {
        return ftpClient.sendNoOp();
    } catch (IOException e) {
        // A client that cannot complete a NOOP is simply not valid.
        return false;
    }
}
/**
 * Creates the given directory over a connection that is opened for this call
 * and released afterwards.
 *
 * @param file       directory to create
 * @param permission permission passed through to the client-based overload
 * @return the result of the client-based {@code mkdirs} overload
 * @throws IOException on communication failure
 */
@Override
public boolean mkdirs(Path file, FsPermission permission) throws IOException {
    final FTPClient connection = connect();
    try {
        return mkdirs(connection, file, permission);
    } finally {
        disconnect(connection);
    }
}
/**
 * Uploads a file over FTP using a borrowed client, which is always returned
 * afterwards.
 *
 * @param remote remote file name; re-encoded from GBK for the control connection
 * @param local  stream of the content to upload (not closed by this method)
 * @return {@code true} if the server reported a completed transfer
 * @throws RuntimeException wrapping the {@link IOException} if the transfer fails
 */
public boolean uploadFile(String remote, InputStream local) {
    FTPClient ftpClient = borrowObject();
    try {
        byte[] bytes = remote.getBytes("GBK");
        return ftpClient.storeFile(new String(bytes, "iso-8859-1"), local);
    } catch (IOException e) {
        // The rethrown exception carries the full trace; no separate printStackTrace.
        throw new RuntimeException(e);
    } finally {
        returnObject(ftpClient);
    }
}
/**
 * Creates a new FTP client, connects it to the server and logs in.
 *
 * @param ip   server address
 * @param port server port; non-positive values fall back to the client's default port
 * @param user login user
 * @param pwd  login password
 * @return {@code true} if the login succeeded; {@code false} if it was refused
 *         or an I/O error occurred
 */
private boolean connectServer(String ip,int port,String user,String pwd){
    boolean isSuccess = false;
    ftpClient = new FTPClient();
    try {
        // Honour the requested port instead of always using the default one.
        if (port > 0) {
            ftpClient.connect(ip, port);
        } else {
            ftpClient.connect(ip);
        }
        isSuccess = ftpClient.login(user,pwd);
    } catch (IOException e) {
        logger.error("连接FTP服务器异常",e);
    }
    return isSuccess;
}