0%

Hadoop-Intellij-Plugin 插件设计和源码分析----HDFS文件系统常用操作类

  本节介绍如何使用Hadoop的Java API进行HDFS的相关操作,包括:判断目录、文件是否存在;获取目录、文件的ACL权限;获取目录或文件的属性信息;下载目录或文件;上传目录或文件等。这里不做详细分析,直接贴出整个类的代码。

1
/**
2
 * HDFS文件操作工具类
3
 * Created by fangyuzhong on 17-7-27.
4
 */
5
public class HDFSUtil
6
{
7
    /*
8
     定义文件读取的缓冲区长度
9
     */
10
    // Length of the byte[] buffer allocated by the stream-copy loops in this class.
    private static final int BUFFERSIZE=4096;
11
    /**
12
     * 判断目录是否存在
13
     * @param strPath 目录路径
14
     * @param fileSystem 文件系统
15
     * @return 存在返回true,否则返回false
16
     */
17
    public static boolean dirExists(String strPath,FileSystem fileSystem)
18
    {
19
        try
20
        {
21
            return fileSystem.getFileStatus(new Path(strPath)).isDirectory();
22
        }
23
        catch (Exception ex)
24
        {
25
            return false;
26
        }
27
    }
28
    /**
29
     * 判断文件是否存在
30
     * @param strPath 文件路径
31
     * @param fileSystem 文件系统
32
     * @return 存在返回true,否则返回false
33
     */
34
    public static boolean fileExists(String strPath,FileSystem fileSystem)
35
    {
36
        try
37
        {
38
            return fileSystem.getFileStatus(new Path(strPath)).isFile();
39
        }
40
        catch (Exception ex)
41
        {
42
            return false;
43
        }
44
    }
45
    /**
46
     *获取指定目录或者文件的ACL权限对象
47
     * @param fileSystem
48
     * @param strDirPath
49
     * @param strUserName
50
     * @return
51
     */
52
    public static FsAction getDirFileActionByUser(FileSystem fileSystem,String strDirPath,String strUserName)
53
    {
54
        Path path = new Path(strDirPath);
55
        try
56
        {
57
            AclStatus aclStatus= fileSystem.getAclStatus(path);
58
            if(aclStatus.getOwner().equals(strUserName))
59
            {
60
               return aclStatus.getPermission().getUserAction();
61
            }
62
            List<AclEntry> aclEntries= aclStatus.getEntries();
63
            for(AclEntry aclEntry:aclEntries)
64
            {
65
                if(aclEntry.getType()== AclEntryType.USER)
66
                {
67
                    if(aclEntry.getName().equals(strUserName))
68
                    {
69
                       return aclEntry.getPermission();
70
                    }
71
                }
72
            }
73
            return  aclStatus.getPermission().getOtherAction();
74
        }
75
        catch (IOException ex)
76
        {
77
            return  null;
78
        }
79
    }
80
    /**
81
     * 获取文件系统的相关信息
82
     *
83
     * @param fileStatus
84
     * @return
85
     */
86
    public synchronized static Map<String, String> getFileSystemInformation(FileStatus fileStatus)
87
    {
88
        Map<String, String> fileSystemInformations = new TreeMap();
89
        ResourceBundle resourceBundle = LocaleLanguageManager.getInstance().getResourceBundle();
90
        if (fileStatus.isDirectory())
91
        {
92
            fileSystemInformations.put(resourceBundle.getString(LanguageKeyWord.DIRECTORYPATH), fileStatus.getPath().toString());
93
            fileSystemInformations.put(resourceBundle.getString(LanguageKeyWord.DIRECTORYOWNER), fileStatus.getOwner());
94
            fileSystemInformations.put(resourceBundle.getString(LanguageKeyWord.DIRECTORYPERMISSION), fileStatus.getPermission().toString());
95
            fileSystemInformations.put(resourceBundle.getString(LanguageKeyWord.DIRECTORYGROUP), fileStatus.getGroup());
96
            fileSystemInformations.put(resourceBundle.getString(LanguageKeyWord.DIRECTORYMODIFICATIONTIME), new Timestamp(fileStatus.getModificationTime()).toString());
97
            fileSystemInformations.put(resourceBundle.getString(LanguageKeyWord.DIRECTORYACCESSTIME), new Timestamp(fileStatus.getAccessTime()).toString());
98
        } else
99
        {
100
            fileSystemInformations.put(resourceBundle.getString(LanguageKeyWord.FILEPATH), fileStatus.getPath().toString());
101
            fileSystemInformations.put(resourceBundle.getString(LanguageKeyWord.FILELEN), FormatUtil.sizeFormatNum2String(fileStatus.getLen()));
102
            fileSystemInformations.put(resourceBundle.getString(LanguageKeyWord.FILEREPLICATION), new Short(fileStatus.getReplication()).toString());
103
            fileSystemInformations.put(resourceBundle.getString(LanguageKeyWord.FILEBLOCKSIZE), FormatUtil.sizeFormatNum2String(fileStatus.getBlockSize()));
104
            fileSystemInformations.put(resourceBundle.getString(LanguageKeyWord.FILEOWNER), fileStatus.getOwner());
105
            fileSystemInformations.put(resourceBundle.getString(LanguageKeyWord.FILEGROUP), fileStatus.getGroup());
106
            fileSystemInformations.put(resourceBundle.getString(LanguageKeyWord.FILEPERMISSION), fileStatus.getPermission().toString());
107
            fileSystemInformations.put(resourceBundle.getString(LanguageKeyWord.FILEMODIFICATIONTIME), new Timestamp(fileStatus.getModificationTime()).toString());
108
            fileSystemInformations.put(resourceBundle.getString(LanguageKeyWord.FILEACCESSTIME), new Timestamp(fileStatus.getAccessTime()).toString());
109
110
        }
111
        return fileSystemInformations;
112
    }
113
    /**
114
     * @param strhdfspath       HDFS中选中的单个文件路径
115
     * @param strLocalPath      下载要保存到本地的文件路径
116
     * @param fileSystem        HDFS文件系统对象
117
     * @param progressIndicator IDEA进度对象
118
     * @return 是否下载成功
119
     * @throws IOException
120
     */
121
    @Deprecated
122
    public synchronized static boolean copyFile(String strhdfspath, String strLocalPath, FileSystem fileSystem,
123
                                                ProgressIndicator progressIndicator) throws IOException
124
    {
125
        OutputStream out = null;
126
        InputStream in = null;
127
        try
128
        {
129
            progressIndicator.setIndeterminate(false);//确定进度
130
            String fileName = strhdfspath.trim().substring(strhdfspath.lastIndexOf("/") + 1);
131
            //事先获取文件的大小
132
            long fileSize = fileSystem.getFileStatus(new Path(strhdfspath)).getLen() / 1024L;
133
            int pageSize = (int) Math.ceil(fileSize / 100.00);//进度条进度设置100
134
            in = fileSystem.open(new Path(strhdfspath), 0);
135
            out = new FileOutputStream(strLocalPath + "/" + fileName);
136
            PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
137
            byte[] buf = new byte[HDFSUtil.BUFFERSIZE];
138
            long count = 0;
139
            int progressCount = 0;
140
            for (int bytesRead = in.read(buf); bytesRead >= 0; bytesRead = in.read(buf))
141
            {
142
                count++;
143
                if (count % pageSize == 0)
144
                {
145
                    progressCount++;
146
                    progressIndicator.setIndeterminate(false);
147
                    progressIndicator.setFraction(progressCount * 0.1);
148
                }
149
                out.write(buf, 0, bytesRead);
150
                if (ps != null && ps.checkError())
151
                {
152
                    throw new IOException("Unable to write to output stream.");
153
                }
154
            }
155
            if (true)
156
            {
157
                out.close();
158
                out = null;
159
                in.close();
160
                in = null;
161
            }
162
        } finally
163
        {
164
            if (true)
165
            {
166
                closeStream(out);
167
                closeStream(in);
168
            }
169
        }
170
        return true;
171
    }
172
    /**
173
     * 下载单个文件
174
     *
175
     * @param strhdfspath       HDFS中选中的单个文件路径
176
     * @param strLocalPath      下载要保存到本地的文件路径
177
     * @param fileSystem        HDFS文件系统对象
178
     * @param progressIndicator IDEA进度对象
179
     * @param project           IDEA工程
180
     */
181
    @Deprecated
182
    public synchronized static void copyFile(String strhdfspath, String strLocalPath, FileSystem fileSystem,
183
                                             ProgressIndicator progressIndicator, Project project)
184
    {
185
        boolean isSuccess = false;
186
        try
187
        {
188
            isSuccess = copyFile(strhdfspath, strLocalPath, fileSystem, progressIndicator);
189
190
        } catch (Exception ex)
191
        {
192
            isSuccess = false;
193
194
        }
195
        if (!isSuccess)
196
        {
197
            MessageUtil.showErrorDialog(project, LocaleLanguageManager.getInstance().getResourceBundle().getString(LanguageKeyWord.MESSAGETILE),
198
                    LocaleLanguageManager.getInstance().getResourceBundle().getString(LanguageKeyWord.DOWNFAILED));
199
        } else
200
        {
201
            MessageUtil.showInfoDialog(project, LocaleLanguageManager.getInstance().getResourceBundle().getString(LanguageKeyWord.MESSAGETILE),
202
                    LocaleLanguageManager.getInstance().getResourceBundle().getString(LanguageKeyWord.DOWNSUCCESS));
203
204
        }
205
    }
206
    /**
207
     * 将文件或目录拷贝到本地系统(即下载目录或文件)
208
     * @param src 远程的文件或者目录
209
     * @param dst 本地的文件或者目录
210
     * @param conf 系统配置
211
     * @param overwrite 是否覆盖
212
     * @param srcFileSystem 源系统对象
213
     * @param progressIndicator 进度对象
214
     *
215
     */
216
    public synchronized static boolean  copyToLocalFile(String src,String dst,Configuration conf,boolean overwrite,
217
                                           FileSystem srcFileSystem,ProgressIndicator progressIndicator)
218
    {
219
        boolean isSuccess=false;
220
        try
221
        {
222
            FileSystem dstFileSystem = getLocal(conf);//获取目标(本地文件系统)
223
            FileStatus fileStatus = srcFileSystem.getFileStatus(new Path(src));
224
            isSuccess= copyFile(srcFileSystem,fileStatus,dstFileSystem,new Path(dst),
225
                    overwrite,progressIndicator,LocaleLanguageManager.getInstance().getResourceBundle().getString(LanguageKeyWord.DOWNPROGRESSTEXT));
226
        }
227
        catch (Exception ex)
228
        {
229
            LoggerFactory.createLogger(HDFSUtil.class).error("Error",ex);
230
        }
231
        return isSuccess;
232
    }
233
    /**
234
     * 将本地文件或者目录拷贝到目标系统中(HDFS)
235
     * @param src 源路径(本地路径)
236
     * @param dst 目标路径(HDFS路径)
237
     * @param conf 配置
238
     * @param overwrite 是否覆盖
239
     * @param dstFileSystem 目标文件系统(HDFS)
240
     * @param progressIndicator 进度对象
241
     * @return
242
     */
243
    public synchronized static boolean copyFromLocalFile(String src,String dst,Configuration conf,boolean overwrite,
244
                                                         FileSystem dstFileSystem,ProgressIndicator progressIndicator)
245
    {
246
        boolean isSuccess=false;
247
        try
248
        {
249
            FileSystem srcFileSystem = getLocal(conf);//获取本地选中的目录或者文件
250
            FileStatus fileStatus = srcFileSystem.getFileStatus(new Path(src));
251
            isSuccess= copyFile(srcFileSystem,fileStatus,dstFileSystem,new Path(dst),
252
                    overwrite,progressIndicator,LocaleLanguageManager.getInstance().getResourceBundle().getString(LanguageKeyWord.UPPROGRESSTEXT));
253
        }
254
        catch (Exception ex)
255
        {
256
257
        }
258
        return isSuccess;
259
    }
260
    /**
261
     *
262
     * @param srcFS
263
     * @param srcStatus
264
     * @param dstFS
265
     * @param dst
266
     * @param overwrite
267
     * @param progressIndicator
268
     * @param strProgressText
269
     * @return
270
     * @throws IOException
271
     */
272
    public synchronized static boolean copyFile(FileSystem srcFS, FileStatus srcStatus,
273
                               FileSystem dstFS, Path dst,
274
                               boolean overwrite,ProgressIndicator progressIndicator,String strProgressText) throws IOException
275
    {
276
        Path src = srcStatus.getPath();
277
        dst = checkDest(src.getName(), dstFS, dst, overwrite);
278
        if (srcStatus.isDirectory())
279
        {
280
            checkDependencies(srcFS, src, dstFS, dst);
281
            if (!dstFS.mkdirs(dst))
282
            {
283
                return false;
284
            }
285
            FileStatus contents[] = srcFS.listStatus(src);
286
            int fileCount= (int)Math.ceil(contents.length/100.00);
287
            int k=0;
288
            String dir = LocaleLanguageManager.getInstance().getResourceBundle().getString(LanguageKeyWord.OBJECTTYPEDIRECTORY);
289
            for (int i = 0; i < contents.length; i++)
290
            {
291
                if(i%fileCount==0)
292
                {
293
                    k++;
294
                    progressIndicator.setIndeterminate(false);
295
                    progressIndicator.setFraction(k * 0.01);
296
                    String strMessage = String.format("%s:%s",strProgressText+dir,contents[i].getPath().getParent().toString());
297
                    progressIndicator.setText(strMessage);
298
                }
299
                copyFile(srcFS, contents[i], dstFS,
300
                        new Path(dst, contents[i].getPath().getName()), overwrite, progressIndicator,strProgressText);
301
            }
302
        } else
303
        {
304
            InputStream in = null;
305
            OutputStream out = null;
306
            try
307
            {
308
                in = srcFS.open(src);
309
                out = dstFS.create(dst, overwrite);
310
                PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
311
                byte[] buf = new byte[HDFSUtil.BUFFERSIZE];
312
                long fileSize=srcStatus.getLen();
313
                int bufCount =(int) Math.ceil(fileSize / (HDFSUtil.BUFFERSIZE*1.00));//缓冲区个数
314
                int count =0;
315
                int showProgressCount= (int)Math.ceil(bufCount/100.00);//计算多少个缓冲区后,显示进度条的一个刻度
316
                int k=0;
317
                String strFile=LocaleLanguageManager.getInstance().getResourceBundle().getString(LanguageKeyWord.OBJECTTYPEFILE);
318
                for (int bytesRead = in.read(buf); bytesRead >= 0; bytesRead = in.read(buf))
319
                {
320
                    count++;
321
                    if (count % showProgressCount == 0)
322
                    {
323
                        k++;
324
                        progressIndicator.setIndeterminate(false);
325
                        progressIndicator.setFraction(k * 0.01);
326
                        progressIndicator.setText2(strProgressText+strFile+"("+k+"%):"+src.toString());
327
                    }
328
                    out.write(buf, 0, bytesRead);
329
                    if (ps != null && ps.checkError())
330
                    {
331
                        throw new IOException("Unable to write to output stream.");
332
                    }
333
                }
334
                out.close();
335
                out = null;
336
                in.close();
337
                in = null;
338
            } catch (IOException e)
339
            {
340
                IOUtils.closeStream(out);
341
                IOUtils.closeStream(in);
342
                throw e;
343
            }
344
        }
345
        return true;
346
    }
347
348
349
    /**
350
     *
351
     * @param srcName
352
     * @param dstFS
353
     * @param dst
354
     * @param overwrite
355
     * @return
356
     * @throws IOException
357
     */
358
    private synchronized static Path checkDest(String srcName, FileSystem dstFS, Path dst,
359
                                  boolean overwrite) throws IOException
360
    {
361
        FileStatus sdst;
362
        try
363
        {
364
            sdst = dstFS.getFileStatus(dst);
365
        } catch (FileNotFoundException e)
366
        {
367
            sdst = null;
368
        }
369
        if (null != sdst)
370
        {
371
            if (sdst.isDirectory())
372
            {
373
                if (null == srcName)
374
                {
375
                    throw new IOException("Target " + dst + " is a directory");
376
                }
377
                return checkDest(null, dstFS, new Path(dst, srcName), overwrite);
378
            } else if (!overwrite)
379
            {
380
                throw new IOException("Target " + dst + " already exists");
381
            }
382
        }
383
        return dst;
384
    }
385
386
    /**
387
     *
388
     * @param srcFS
389
     * @param src
390
     * @param dstFS
391
     * @param dst
392
     * @throws IOException
393
     */
394
    private synchronized static void checkDependencies(FileSystem srcFS, Path src,
395
                                          FileSystem dstFS, Path dst)
396
            throws IOException
397
    {
398
        if (srcFS == dstFS)
399
        {
400
            String srcq = src.makeQualified(srcFS).toString() + Path.SEPARATOR;
401
            String dstq = dst.makeQualified(dstFS).toString() + Path.SEPARATOR;
402
            if (dstq.startsWith(srcq))
403
            {
404
                if (srcq.length() == dstq.length())
405
                {
406
                    throw new IOException("Cannot copy " + src + " to itself.");
407
                } else
408
                {
409
                    throw new IOException("Cannot copy " + src + " to its subdirectory " +
410
                            dst);
411
                }
412
            }
413
        }
414
    }
415
}
----------------------- 本文结束 感谢阅读 -----------------------
坚持原创技术分享,您的支持将鼓励我继续创作!