 import org.apache.hadoop.fs.cosn.OperationCancellingStatusProvider;
 import org.apache.hadoop.fs.cosn.ReadBufferHolder;
 import org.apache.hadoop.fs.cosn.Unit;
+import org.apache.hadoop.fs.cosn.common.Pair;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
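
Note on the new import: this diff only exercises Pair through a no-argument constructor plus getFirst/setFirst and getSecond/setSecond. A minimal sketch consistent with those call sites (the actual class in org.apache.hadoop.fs.cosn.common may carry more than this) could look like:

    // Hypothetical sketch of the Pair holder; only the members used in this diff are shown.
    package org.apache.hadoop.fs.cosn.common;

    public class Pair<F, S> {
      private F first;
      private S second;

      public F getFirst() { return first; }
      public void setFirst(F first) { this.first = first; }

      public S getSecond() { return second; }
      public void setSecond(S second) { this.second = second; }
    }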
@@ -739,7 +740,7 @@ public FileStatus[] listStatus(Path f) throws IOException {
       listMaxLength = CosNFileSystem.POSIX_BUCKET_LIST_LIMIT;
     }
 
-    if (key.length() > 0) {
+    if (!key.isEmpty()) {
       FileStatus fileStatus = this.getFileStatus(f);
       if (fileStatus.isFile() || fileStatus.isSymlink()) {
         return new FileStatus[]{fileStatus};
@@ -810,12 +811,14 @@ public FileStatus[] listStatus(Path f) throws IOException {
     return status.toArray(new FileStatus[status.size()]);
   }
 
+
+
   private FileStatus newFile(FileMetadata meta, Path path) {
     return new CosNFileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
         meta.getLastModified(), 0, null, this.owner, this.group,
         path.makeQualified(this.getUri(), this.getWorkingDirectory()),
         meta.getETag(), meta.getCrc64ecm(), meta.getCrc32cm(),
-        meta.getVersionId(), meta.getStorageClass());
+        meta.getVersionId(), meta.getStorageClass(), meta.getUserAttributes());
   }
 
   private FileStatus newDirectory(Path path) {
@@ -838,7 +841,7 @@ private FileStatus newDirectory(FileMetadata meta, Path path) {
         meta.getLastModified(), 0, null, this.owner, this.group,
         path.makeQualified(this.getUri(), this.getWorkingDirectory()),
         meta.getETag(), meta.getCrc64ecm(), meta.getCrc32cm(),
-        meta.getVersionId(), meta.getStorageClass());
+        meta.getVersionId(), meta.getStorageClass(), meta.getUserAttributes());
   }
 
   /**
@@ -980,103 +983,133 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
980
983
981
984
@ Override
982
985
public boolean rename (Path src , Path dst ) throws IOException {
983
-
986
+ Preconditions .checkNotNull (src );
987
+ Preconditions .checkNotNull (dst );
984
988
// Renaming the root directory is not allowed
985
989
if (src .isRoot ()) {
986
990
LOG .debug ("Cannot rename the root directory of a filesystem." );
987
991
return false ;
988
992
}
989
993
990
- // check the source path whether exists or not, if not return false.
991
- FileStatus srcFileStatus ;
992
- try {
993
- srcFileStatus = this .getFileStatus (src );
994
- } catch (FileNotFoundException e ) {
995
- LOG .debug ("The source path [{}] is not exist." , src );
994
+ // the preconditions for the rename operation.
995
+ // reference: https://hadoop.apache.org/docs/r3.3.0/hadoop-project-dist/hadoop-common/filesystem/filesystem.html#rename
996
+ Pair <CosNFileStatus , CosNFileStatus > renameFileStatusPair = renameInitiate (src , dst );
997
+
998
+ // the postconditions for the rename operation.
999
+ // reference: https://hadoop.apache.org/docs/r3.3.0/hadoop-project-dist/hadoop-common/filesystem/filesystem.html#rename
1000
+ if (src .equals (dst )) {
1001
+ if (renameFileStatusPair .getFirst () != null ) {
1002
+ if (renameFileStatusPair .getFirst ().isDirectory ()) {
1003
+ //Renaming a directory onto itself is no-op; return value is not specified.
1004
+ //In POSIX the result is False; in HDFS the result is True.
1005
+ return true ;
1006
+ }
1007
+ if (renameFileStatusPair .getFirst ().isFile ()) {
1008
+ // Renaming a file to itself is a no-op; the result is True.
1009
+ return true ;
1010
+ }
1011
+ // For symlink types, the Hadoop file system specification does not provide clear instructions,
1012
+ // I tested the soft connection in the POSIX file system, and the same behavior is also true.
1013
+ return true ;
1014
+ }
996
1015
return false ;
997
1016
}
998
1017
999
- // Source path and destination path are not allowed to be the same
1000
- if ( src . equals ( dst )) {
1001
- LOG . debug ( "The source path and the dest path refer to the same file or " +
1002
- "directory: {}" , dst );
1003
- throw new IOException ( "the source path and dest path refer to the " +
1004
- "same file or directory" );
1018
+ if (! isPosixBucket ) {
1019
+ return internalCopyAndDelete (
1020
+ src , renameFileStatusPair . getFirst (),
1021
+ dst , renameFileStatusPair . getSecond () );
1022
+ } else {
1023
+ return internalRename ( src , dst );
1005
1024
}
1025
+ }
1026
+
1027
+ private Pair <CosNFileStatus , CosNFileStatus > renameInitiate (Path srcPath , Path dstPath )
1028
+ throws PathIOException , IOException {
1029
+ // Preconditions
1030
+ Preconditions .checkNotNull (srcPath );
1031
+ Preconditions .checkNotNull (dstPath );
1032
+ Preconditions .checkArgument (srcPath .isAbsolute ());
1033
+ Preconditions .checkArgument (dstPath .isAbsolute ());
1034
+
1035
+ Pair <CosNFileStatus , CosNFileStatus > renameFileStatusPair = new Pair <>();
1036
+
1037
+ // Hadoop FileSystem Specification: if not exists(FS, src) : raise FileNotFoundException
1038
+ CosNFileStatus srcFileStatus = null ;
1039
+ try {
1040
+ srcFileStatus = (CosNFileStatus ) this .getFileStatus (srcPath );
1041
+ } catch (FileNotFoundException e ) {
1042
+ LOG .error ("The source path [{}] is not exist." , srcPath );
1043
+ throw e ;
1044
+ }
1045
+ renameFileStatusPair .setFirst (srcFileStatus );
1006
1046
1007
- // It is not allowed to rename a parent directory to its subdirectory
1008
- Path dstParentPath ;
1009
- dstParentPath = dst .getParent ();
1010
- while (null != dstParentPath && !src .equals (dstParentPath )) {
1047
+ // Hadoop FileSystem Specification: if isDescendant(FS, src, dest) : raise IOException
1048
+ Path dstParentPath = dstPath .getParent ();
1049
+ while (null != dstParentPath && !srcPath .equals (dstParentPath )) {
1011
1050
dstParentPath = dstParentPath .getParent ();
1012
1051
}
1013
1052
if (null != dstParentPath ) {
1014
- LOG .debug ("It is not allowed to rename a parent directory:{} to " +
1015
- "its subdirectory:{}." , src , dst );
1016
- throw new PathIOException ( String . format (
1017
- "It is not allowed to rename a parent directory:%s to its" +
1018
- " subdirectory:%s" ,
1019
- src , dst )) ;
1053
+ LOG .error ("It is not allowed to rename a parent directory:{} to its subdirectory:{}." , srcPath , dstPath );
1054
+ PathIOException pathIOException = new PathIOException ( srcPath . toString (),
1055
+ "It is not allowed to rename a parent directory to its subdirectory" );
1056
+ pathIOException . setOperation ( " rename" );
1057
+ pathIOException . setTargetPath ( dstPath . toString ());
1058
+ throw pathIOException ;
1020
1059
}
1021
1060
1022
- FileStatus dstFileStatus = null ;
1061
+ // Hadoop FileSystem Specification: isRoot(FS, dest) or exists(FS, parent(dest))
1062
+ CosNFileStatus dstFileStatus = null ;
1023
1063
try {
1024
- dstFileStatus = this .getFileStatus (dst );
1025
-
1026
- // The destination path exists and is a file,
1027
- // and the rename operation is not allowed.
1028
- //
1064
+ dstFileStatus = (CosNFileStatus ) this .getFileStatus (dstPath );
1029
1065
if (dstFileStatus .isFile ()) {
1030
- LOG .debug ("File: {} already exists." , dstFileStatus .getPath ());
1031
- return false ;
1066
+ throw new FileAlreadyExistsException (dstPath .toString ());
1032
1067
} else {
1033
1068
// The destination path is an existing directory,
1034
- // and it is checked whether there is a file or directory
1035
- // with the same name as the source path under the
1036
- // destination path
1037
- dst = new Path (dst , src .getName ());
1038
- FileStatus [] statuses ;
1069
+ Path tempDstPath = new Path (dstPath , srcPath .getName ());
1039
1070
try {
1040
- statuses = this .listStatus (dst );
1041
- } catch (FileNotFoundException e ) {
1042
- statuses = null ;
1043
- }
1044
- if (null != statuses && statuses .length > 0 ) {
1045
- LOG .debug ("Cannot rename {} to {}, the destination directory is non-empty." ,
1046
- src , dst );
1047
- return false ;
1071
+ // FileStatus tempDstFileStatus = this.getFileStatus(tempDstPath);
1072
+ // if (tempDstFileStatus.isDirectory()) {
1073
+ // throw new FileAlreadyExistsException(tempDstPath.toString());
1074
+ // }
1075
+ FileStatus [] fileStatuses = this .listStatus (tempDstPath );
1076
+ if (null != fileStatuses && fileStatuses .length > 0 ) {
1077
+ throw new FileAlreadyExistsException (tempDstPath .toString ());
1078
+ }
1079
+ } catch (FileNotFoundException ignore ) {
1080
+ // OK, expects Not Found.
1048
1081
}
1049
1082
}
1083
+ renameFileStatusPair .setSecond (dstFileStatus );
1050
1084
} catch (FileNotFoundException e ) {
1051
- // destination path not exists
1052
- Path tempDstParentPath = dst .getParent ();
1085
+ // Hadoop FileSystem Specification: if isFile(FS, parent(dest)) : raise IOException
1086
+ Path tempDstParentPath = dstPath .getParent ();
1053
1087
FileStatus dstParentStatus = this .getFileStatus (tempDstParentPath );
1054
1088
if (!dstParentStatus .isDirectory ()) {
1055
- throw new IOException (String .format (
1056
- "Cannot rename %s to %s, %s is a file" , src , dst , dst .getParent ()
1057
- ));
1089
+ PathIOException pathIOException = new PathIOException (tempDstParentPath .toString (),
1090
+ String .format ("Can not rename into a file [%s]" , tempDstParentPath ));
1091
+ pathIOException .setTargetPath (dstPath .toString ());
1092
+ throw pathIOException ;
1058
1093
}
1059
- // The default root directory is definitely there.
1060
1094
}
1061
1095
1062
- if (!isPosixBucket ) {
1063
- return internalCopyAndDelete (src , dst , srcFileStatus .isDirectory (),
1064
- srcFileStatus .isSymlink ());
1065
- } else {
1066
- return internalRename (src , dst );
1067
- }
1096
+ return renameFileStatusPair ;
1068
1097
}
1069
1098
1070
- private boolean internalCopyAndDelete (Path srcPath , Path dstPath ,
1071
- boolean isDir , boolean isSymlink ) throws IOException {
1072
- boolean result = false ;
1073
- if (isDir ) {
1074
- result = this .copyDirectory (srcPath , dstPath );
1099
+ private boolean internalCopyAndDelete (Path srcPath , CosNFileStatus srcFileStatus ,
1100
+ Path destPath , CosNFileStatus destFileStatus ) throws IOException {
1101
+ Preconditions .checkNotNull (srcPath );
1102
+ Preconditions .checkNotNull (srcFileStatus );
1103
+ boolean result ;
1104
+ if (srcFileStatus .isDirectory ()) {
1105
+ result = this .copyDirectory (
1106
+ srcPath , srcFileStatus ,
1107
+ destPath , destFileStatus );
1075
1108
} else {
1076
- if (isSymlink ) {
1077
- result = this .copySymlink (srcPath , dstPath );
1109
+ if (srcFileStatus . isSymlink () ) {
1110
+ result = this .copySymlink (srcPath , destPath );
1078
1111
} else {
1079
- result = this .copyFile (srcPath , dstPath );
1112
+ result = this .copyFile (srcPath , destPath );
1080
1113
}
1081
1114
}
1082
1115
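
For context, renameInitiate maps the Hadoop FileSystem specification's rename contract onto explicit exceptions instead of silent boolean returns. A minimal caller-side sketch of that contract, assuming a configured cosn:// FileSystem; the bucket name and paths are illustrative and not part of this PR:

    // Illustrative only: exercises the rename contract the refactored rename()/renameInitiate() enforces.
    import java.io.FileNotFoundException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileAlreadyExistsException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RenameContractSketch {
      public static void main(String[] args) throws Exception {
        Path src = new Path("cosn://example-bucket/src/file.txt"); // example path
        Path dst = new Path("cosn://example-bucket/dst/file.txt"); // example path
        FileSystem fs = src.getFileSystem(new Configuration());

        try {
          boolean renamed = fs.rename(src, dst);
          System.out.println("rename returned: " + renamed);
        } catch (FileNotFoundException e) {
          // thrown when the source path does not exist
        } catch (FileAlreadyExistsException e) {
          // thrown when the destination file, or a non-empty same-named child
          // of the destination directory, already exists
        }
      }
    }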
@@ -1106,26 +1139,28 @@ private boolean copySymlink(Path srcSymlink, Path dstSymlink) throws IOException
     return true;
   }
 
-  private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
+  private boolean copyDirectory(Path srcPath, CosNFileStatus srcFileStatus,
+      Path destPath, CosNFileStatus destFileStatus) throws IOException {
     String srcKey = pathToKey(srcPath);
     if (!srcKey.endsWith(PATH_DELIMITER)) {
       srcKey += PATH_DELIMITER;
     }
-    String dstKey = pathToKey(dstPath);
-    if (!dstKey.endsWith(PATH_DELIMITER)) {
-      dstKey += PATH_DELIMITER;
+    String destKey = pathToKey(destPath);
+    if (!destKey.endsWith(PATH_DELIMITER)) {
+      destKey += PATH_DELIMITER;
     }
 
-    if (dstKey.startsWith(srcKey)) {
-      throw new IOException("can not copy a directory to a subdirectory" +
-          " of self");
+    if (destKey.startsWith(srcKey)) {
+      throw new IOException("can not copy a directory to a subdirectory of self");
     }
     // This method is only called for normal (non-POSIX) buckets, which strictly separate file objects from directory objects; srcKey carries the trailing delimiter here,
     // so using retrieveMetadata could mask a missing directory object and cause a 404 error when srcKey is copied later.
-    if (this.nativeStore.queryObjectMetadata(srcKey) == null) {
+    if (srcFileStatus.getETag() == null) {
+      // It would also be correct to always PUT an empty object for srcKey, whether or not one already exists.
+      // Checking srcFileStatus.getETag() == null simply skips the extra PUT when a directory object (not just a prefix) is known to exist.
       this.nativeStore.storeEmptyFile(srcKey);
     } else {
-      this.nativeStore.copy(srcKey, dstKey);
+      this.nativeStore.copy(srcKey, FileMetadata.fromCosNFileStatus(srcFileStatus), destKey);
     }
 
     CosNCopyFileContext copyFileContext = new CosNCopyFileContext();
@@ -1139,7 +1174,7 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
         checkPermission(new Path(file.getKey()), RangerAccessType.DELETE);
         this.boundedCopyThreadPool.execute(new CosNCopyFileTask(
             this.nativeStore,
-            file.getKey(), dstKey.concat(file.getKey().substring(srcKey.length())),
+            file.getKey(), destKey.concat(file.getKey().substring(srcKey.length())),
             copyFileContext));
         copiesToFinishes++;
       }
@@ -1152,9 +1187,9 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
       if (this.operationCancellingStatusProviderThreadLocal.get() != null
           && this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
         LOG.warn("The copy operation is cancelled. Stop copying the directory. srcKey: {}, dstKey: {}",
-            srcKey, dstKey);
+            srcKey, destKey);
         throw new IOException(String.format("The copy operation is cancelled. srcKey: %s, dstKey: %s",
-            srcKey, dstKey));
+            srcKey, destKey));
       }
     } while (null != priorLastKey && !Thread.currentThread().isInterrupted());
 
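
The cancellation check above only calls isCancelled() on whatever OperationCancellingStatusProvider is installed in the thread-local. Assuming the imported type is an interface whose only relevant member is isCancelled() (nothing more is visible in this diff), a provider driven by a flag flipped from another thread might look like:

    // Hypothetical provider; only isCancelled() is exercised by the hunk above,
    // and treating OperationCancellingStatusProvider as an interface is an assumption.
    import org.apache.hadoop.fs.cosn.OperationCancellingStatusProvider;

    public class FlagCancellingStatusProvider implements OperationCancellingStatusProvider {
      private volatile boolean cancelled = false;

      public void cancel() {
        this.cancelled = true;
      }

      @Override
      public boolean isCancelled() {
        return cancelled;
      }
    }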