-
Notifications
You must be signed in to change notification settings - Fork 52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf: improve the rename performance. #170
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
import org.apache.hadoop.fs.cosn.OperationCancellingStatusProvider; | ||
import org.apache.hadoop.fs.cosn.ReadBufferHolder; | ||
import org.apache.hadoop.fs.cosn.Unit; | ||
import org.apache.hadoop.fs.cosn.common.Pair; | ||
import org.apache.hadoop.fs.permission.FsPermission; | ||
import org.apache.hadoop.security.AccessControlException; | ||
import org.apache.hadoop.security.UserGroupInformation; | ||
|
@@ -739,7 +740,7 @@ public FileStatus[] listStatus(Path f) throws IOException { | |
listMaxLength = CosNFileSystem.POSIX_BUCKET_LIST_LIMIT; | ||
} | ||
|
||
if (key.length() > 0) { | ||
if (!key.isEmpty()) { | ||
FileStatus fileStatus = this.getFileStatus(f); | ||
if (fileStatus.isFile() || fileStatus.isSymlink()) { | ||
return new FileStatus[]{fileStatus}; | ||
|
@@ -810,12 +811,14 @@ public FileStatus[] listStatus(Path f) throws IOException { | |
return status.toArray(new FileStatus[status.size()]); | ||
} | ||
|
||
|
||
|
||
private FileStatus newFile(FileMetadata meta, Path path) { | ||
return new CosNFileStatus(meta.getLength(), false, 1, getDefaultBlockSize(), | ||
meta.getLastModified(), 0, null, this.owner, this.group, | ||
path.makeQualified(this.getUri(), this.getWorkingDirectory()), | ||
meta.getETag(), meta.getCrc64ecm(), meta.getCrc32cm(), | ||
meta.getVersionId(), meta.getStorageClass()); | ||
meta.getVersionId(), meta.getStorageClass(), meta.getUserAttributes()); | ||
} | ||
|
||
private FileStatus newDirectory(Path path) { | ||
|
@@ -838,7 +841,7 @@ private FileStatus newDirectory(FileMetadata meta, Path path) { | |
meta.getLastModified(), 0, null, this.owner, this.group, | ||
path.makeQualified(this.getUri(), this.getWorkingDirectory()), | ||
meta.getETag(), meta.getCrc64ecm(), meta.getCrc32cm(), | ||
meta.getVersionId(), meta.getStorageClass()); | ||
meta.getVersionId(), meta.getStorageClass(), meta.getUserAttributes()); | ||
} | ||
|
||
/** | ||
|
@@ -980,103 +983,133 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException { | |
|
||
@Override | ||
public boolean rename(Path src, Path dst) throws IOException { | ||
|
||
Preconditions.checkNotNull(src); | ||
Preconditions.checkNotNull(dst); | ||
// Renaming the root directory is not allowed | ||
if (src.isRoot()) { | ||
LOG.debug("Cannot rename the root directory of a filesystem."); | ||
return false; | ||
} | ||
|
||
// check the source path whether exists or not, if not return false. | ||
FileStatus srcFileStatus; | ||
try { | ||
srcFileStatus = this.getFileStatus(src); | ||
} catch (FileNotFoundException e) { | ||
LOG.debug("The source path [{}] is not exist.", src); | ||
// the preconditions for the rename operation. | ||
// reference: https://hadoop.apache.org/docs/r3.3.0/hadoop-project-dist/hadoop-common/filesystem/filesystem.html#rename | ||
Pair<CosNFileStatus, CosNFileStatus> renameFileStatusPair = renameInitiate(src, dst); | ||
|
||
// the postconditions for the rename operation. | ||
// reference: https://hadoop.apache.org/docs/r3.3.0/hadoop-project-dist/hadoop-common/filesystem/filesystem.html#rename | ||
if (src.equals(dst)) { | ||
if (renameFileStatusPair.getFirst() != null) { | ||
if (renameFileStatusPair.getFirst().isDirectory()) { | ||
//Renaming a directory onto itself is no-op; return value is not specified. | ||
//In POSIX the result is False; in HDFS the result is True. | ||
return true; | ||
} | ||
if (renameFileStatusPair.getFirst().isFile()) { | ||
// Renaming a file to itself is a no-op; the result is True. | ||
return true; | ||
} | ||
// For symlink types, the Hadoop file system specification does not provide clear instructions, | ||
// I tested the soft connection in the POSIX file system, and the same behavior is also true. | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
// Source path and destination path are not allowed to be the same | ||
if (src.equals(dst)) { | ||
LOG.debug("The source path and the dest path refer to the same file or " + | ||
"directory: {}", dst); | ||
throw new IOException("the source path and dest path refer to the " + | ||
"same file or directory"); | ||
if (!isPosixBucket) { | ||
return internalCopyAndDelete( | ||
src, renameFileStatusPair.getFirst(), | ||
dst, renameFileStatusPair.getSecond()); | ||
} else { | ||
return internalRename(src, dst); | ||
} | ||
} | ||
|
||
// It is not allowed to rename a parent directory to its subdirectory | ||
Path dstParentPath; | ||
dstParentPath = dst.getParent(); | ||
while (null != dstParentPath && !src.equals(dstParentPath)) { | ||
private Pair<CosNFileStatus, CosNFileStatus> renameInitiate(Path srcPath, Path dstPath) | ||
throws PathIOException, IOException { | ||
// Preconditions | ||
Preconditions.checkNotNull(srcPath); | ||
Preconditions.checkNotNull(dstPath); | ||
Preconditions.checkArgument(srcPath.isAbsolute()); | ||
Preconditions.checkArgument(dstPath.isAbsolute()); | ||
|
||
Pair<CosNFileStatus, CosNFileStatus> renameFileStatusPair = new Pair<>(); | ||
|
||
// Hadoop FileSystem Specification: if not exists(FS, src) : raise FileNotFoundException | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the source file src does not exist, FileNotFoundException should be raised. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 嗯,这里要测试下 |
||
CosNFileStatus srcFileStatus = null; | ||
try { | ||
srcFileStatus = (CosNFileStatus) this.getFileStatus(srcPath); | ||
} catch (FileNotFoundException e) { | ||
LOG.error("The source path [{}] is not exist.", srcPath); | ||
throw e; | ||
} | ||
renameFileStatusPair.setFirst(srcFileStatus); | ||
|
||
// Hadoop FileSystem Specification: if isDescendant(FS, src, dest) : raise IOException | ||
Path dstParentPath = dstPath.getParent(); | ||
while (null != dstParentPath && !srcPath.equals(dstParentPath)) { | ||
dstParentPath = dstParentPath.getParent(); | ||
} | ||
if (null != dstParentPath) { | ||
LOG.debug("It is not allowed to rename a parent directory:{} to " + | ||
"its subdirectory:{}.", src, dst); | ||
throw new PathIOException(String.format( | ||
"It is not allowed to rename a parent directory:%s to its" + | ||
" subdirectory:%s", | ||
src, dst)); | ||
LOG.error("It is not allowed to rename a parent directory:{} to its subdirectory:{}.", srcPath, dstPath); | ||
PathIOException pathIOException = new PathIOException(srcPath.toString(), | ||
"It is not allowed to rename a parent directory to its subdirectory"); | ||
pathIOException.setOperation("rename"); | ||
pathIOException.setTargetPath(dstPath.toString()); | ||
throw pathIOException; | ||
} | ||
|
||
FileStatus dstFileStatus = null; | ||
// Hadoop FileSystem Specification: isRoot(FS, dest) or exists(FS, parent(dest)) | ||
CosNFileStatus dstFileStatus = null; | ||
try { | ||
dstFileStatus = this.getFileStatus(dst); | ||
|
||
// The destination path exists and is a file, | ||
// and the rename operation is not allowed. | ||
// | ||
dstFileStatus = (CosNFileStatus) this.getFileStatus(dstPath); | ||
if (dstFileStatus.isFile()) { | ||
LOG.debug("File: {} already exists.", dstFileStatus.getPath()); | ||
return false; | ||
throw new FileAlreadyExistsException(dstPath.toString()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Renaming a file atop an existing file is specified as failing, raising an exception. |
||
} else { | ||
// The destination path is an existing directory, | ||
// and it is checked whether there is a file or directory | ||
// with the same name as the source path under the | ||
// destination path | ||
dst = new Path(dst, src.getName()); | ||
FileStatus[] statuses; | ||
Path tempDstPath = new Path(dstPath, srcPath.getName()); | ||
try { | ||
statuses = this.listStatus(dst); | ||
} catch (FileNotFoundException e) { | ||
statuses = null; | ||
} | ||
if (null != statuses && statuses.length > 0) { | ||
LOG.debug("Cannot rename {} to {}, the destination directory is non-empty.", | ||
src, dst); | ||
return false; | ||
// FileStatus tempDstFileStatus = this.getFileStatus(tempDstPath); | ||
// if (tempDstFileStatus.isDirectory()) { | ||
// throw new FileAlreadyExistsException(tempDstPath.toString()); | ||
// } | ||
FileStatus[] fileStatuses = this.listStatus(tempDstPath); | ||
if (null != fileStatuses && fileStatuses.length > 0) { | ||
throw new FileAlreadyExistsException(tempDstPath.toString()); | ||
} | ||
} catch (FileNotFoundException ignore) { | ||
// OK, expects Not Found. | ||
} | ||
} | ||
renameFileStatusPair.setSecond(dstFileStatus); | ||
} catch (FileNotFoundException e) { | ||
// destination path not exists | ||
Path tempDstParentPath = dst.getParent(); | ||
// Hadoop FileSystem Specification: if isFile(FS, parent(dest)) : raise IOException | ||
Path tempDstParentPath = dstPath.getParent(); | ||
FileStatus dstParentStatus = this.getFileStatus(tempDstParentPath); | ||
if (!dstParentStatus.isDirectory()) { | ||
throw new IOException(String.format( | ||
"Cannot rename %s to %s, %s is a file", src, dst, dst.getParent() | ||
)); | ||
PathIOException pathIOException = new PathIOException(tempDstParentPath.toString(), | ||
String.format("Can not rename into a file [%s]", tempDstParentPath)); | ||
pathIOException.setTargetPath(dstPath.toString()); | ||
throw pathIOException; | ||
} | ||
// The default root directory is definitely there. | ||
} | ||
|
||
if (!isPosixBucket) { | ||
return internalCopyAndDelete(src, dst, srcFileStatus.isDirectory(), | ||
srcFileStatus.isSymlink()); | ||
} else { | ||
return internalRename(src, dst); | ||
} | ||
return renameFileStatusPair; | ||
} | ||
|
||
private boolean internalCopyAndDelete(Path srcPath, Path dstPath, | ||
boolean isDir, boolean isSymlink) throws IOException { | ||
boolean result = false; | ||
if (isDir) { | ||
result = this.copyDirectory(srcPath, dstPath); | ||
private boolean internalCopyAndDelete(Path srcPath, CosNFileStatus srcFileStatus, | ||
Path destPath, CosNFileStatus destFileStatus) throws IOException { | ||
Preconditions.checkNotNull(srcPath); | ||
Preconditions.checkNotNull(srcFileStatus); | ||
boolean result; | ||
if (srcFileStatus.isDirectory()) { | ||
result = this.copyDirectory( | ||
srcPath, srcFileStatus, | ||
destPath, destFileStatus); | ||
} else { | ||
if (isSymlink) { | ||
result = this.copySymlink(srcPath, dstPath); | ||
if (srcFileStatus.isSymlink()) { | ||
result = this.copySymlink(srcPath, destPath); | ||
} else { | ||
result = this.copyFile(srcPath, dstPath); | ||
result = this.copyFile(srcPath, destPath); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. src是不是也可以用status |
||
} | ||
} | ||
|
||
|
@@ -1106,26 +1139,28 @@ private boolean copySymlink(Path srcSymlink, Path dstSymlink) throws IOException | |
return true; | ||
} | ||
|
||
private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException { | ||
private boolean copyDirectory(Path srcPath, CosNFileStatus srcFileStatus, | ||
Path destPath, CosNFileStatus destFileStatus) throws IOException { | ||
String srcKey = pathToKey(srcPath); | ||
if (!srcKey.endsWith(PATH_DELIMITER)) { | ||
srcKey += PATH_DELIMITER; | ||
} | ||
String dstKey = pathToKey(dstPath); | ||
if (!dstKey.endsWith(PATH_DELIMITER)) { | ||
dstKey += PATH_DELIMITER; | ||
String destKey = pathToKey(destPath); | ||
if (!destKey.endsWith(PATH_DELIMITER)) { | ||
destKey += PATH_DELIMITER; | ||
} | ||
|
||
if (dstKey.startsWith(srcKey)) { | ||
throw new IOException("can not copy a directory to a subdirectory" + | ||
" of self"); | ||
if (destKey.startsWith(srcKey)) { | ||
throw new IOException("can not copy a directory to a subdirectory of self"); | ||
} | ||
// 这个方法是普通桶调用,普通桶严格区分文件对象和目录对象,这里srcKey是带后缀的,如果使用retrieveMetadata | ||
// 可能会吞掉目录对象不存在的问题。导致后面的copy srcKey时,报404错误。 | ||
if (this.nativeStore.queryObjectMetadata(srcKey) == null) { | ||
if (srcFileStatus.getETag() == null) { | ||
// 这里其实无论是否存在对应的 srcKey 空对象,都 put 一个进去,也是对的。 | ||
// srcFileStatus.getETag() == null 只是为了在确定存在一个目录对象而非前缀的时候,就不需要在 PUT 一次了。 | ||
this.nativeStore.storeEmptyFile(srcKey); | ||
} else { | ||
this.nativeStore.copy(srcKey, dstKey); | ||
this.nativeStore.copy(srcKey, FileMetadata.fromCosNFileStatus(srcFileStatus), destKey); | ||
} | ||
|
||
CosNCopyFileContext copyFileContext = new CosNCopyFileContext(); | ||
|
@@ -1135,11 +1170,11 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException { | |
do { | ||
CosNPartialListing objectList = this.nativeStore.list(srcKey, | ||
BUCKET_LIST_LIMIT, priorLastKey, true); | ||
for (FileMetadata file : objectList.getFiles()) { | ||
checkPermission(new Path(file.getKey()), RangerAccessType.DELETE); | ||
for (FileMetadata fileMetadata : objectList.getFiles()) { | ||
checkPermission(new Path(fileMetadata.getKey()), RangerAccessType.DELETE); | ||
this.boundedCopyThreadPool.execute(new CosNCopyFileTask( | ||
this.nativeStore, | ||
file.getKey(), dstKey.concat(file.getKey().substring(srcKey.length())), | ||
fileMetadata.getKey(), fileMetadata, destKey.concat(fileMetadata.getKey().substring(srcKey.length())), | ||
copyFileContext)); | ||
copiesToFinishes++; | ||
} | ||
|
@@ -1152,9 +1187,9 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException { | |
if (this.operationCancellingStatusProviderThreadLocal.get() != null | ||
&& this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) { | ||
LOG.warn("The copy operation is cancelled. Stop copying the directory. srcKey: {}, dstKey: {}", | ||
srcKey, dstKey); | ||
srcKey, destKey); | ||
throw new IOException(String.format("The copy operation is cancelled. srcKey: %s, dstKey: %s", | ||
srcKey, dstKey)); | ||
srcKey, destKey)); | ||
} | ||
} while (null != priorLastKey && !Thread.currentThread().isInterrupted()); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dst&src 都为空?上面不有checknull