0%

Hadoop-Intellij-Plugin 插件设计和源码分析----HDFS文件系统连接Connection设计和实现

  本节,主要介绍Hadoop-IntelliJ-Plugin 的HDFS连接处理的设计和实现。主要包括HDFS连接Connection的接口定义、HDFS连接的实现、HDFS连接的缓存处理、和连接相关配置项。整个类的设计如下:

HDFS连接接口ConnectionHandler和其实现类ConnectionHandlerImpl

ConnectionHandler接口。

  该接口主要定义了获取HDFS连接对象Configuration、HDFS文件系统对象FileSystem、设置连接的相关信息、获取连接关联的文件系统对象的集合、获取连接对应的文件系统版本、判断HDFS文件系统是否可以连接上、测试HDFS连接、获取HDFS连接的状态等。接口定义的代码:

1
/**
2
 * 定义文件系统的连接处理接口
3
 * Created by fangyuzhong on 17-7-15.
4
 */
5
public interface ConnectionHandler extends Disposable,ConnectionProvider, Presentable
6
{
7
    /**
8
     * 获取当前的intellij的工程
9
     *
10
     * @return
11
     */
12
    Project getProject();
13
    /**
14
     * 测试HDFS是否可以连接
15
     *@param
16
     * @return
17
     */
18
    boolean createTestConnection() throws IOException;
19
20
    /**
21
     * 获取当前HDFS的连接Configuration对象
22
     *
23
     * @return
24
     */
25
    Configuration getMainConnection();
26
27
    /**
28
     * 获取当前连接的HDFS文件系统对象
29
     *
30
     * @return
31
     */
32
    FileSystem getMainFileSystem();
33
34
    /**
35
     * 获取连接状态
36
     *
37
     * @return
38
     */
39
    ConnectionStatus getConnectionStatus();
40
41
    /**
42
     * 获取连接字符串
43
     *
44
     * @return
45
     */
46
    ConnectionInfo getConnectionInfo();
47
48
    /**
49
     * 设置连接字符串
50
     *
51
     * @param paramConnectionInfo
52
     */
53
    void setConnectionInfo(ConnectionInfo paramConnectionInfo);
54
    /**
55
     * 连接是否激活
56
     *
57
     * @return
58
     */
59
    boolean isActive();
60
61
    /**
62
     * 获取连接对应的文件系统
63
     *
64
     * @return
65
     */
66
    FileSystemType getFileSystemType();
67
68
    /**
69
     * 获取文件系统版本
70
     *
71
     * @return
72
     */
73
    double getFileSystemVersion();
74
    /**
75
     * 连接是否连接上
76
     *
77
     * @return
78
     */
79
    boolean isConnected();
80
81
    /**
82
     * @return
83
     */
84
    int getIdleMinutes();
85
86
    /**
87
     * 获取文件系统的相关信息
88
     *
89
     * @return
90
     */
91
    FileSystemInfo getFileSystemInfo();
92
    /**
93
     * 释放连接
94
     */
95
    void disconnect();
96
97
    /**
98
     * 获取连接的ID
99
     *
100
     * @return
101
     */
102
    String getId();
103
104
    /**
105
     * 获取连接的用户名称
106
     *
107
     * @return
108
     */
109
    String getUserName();
110
111
    /**
112
     * 获取连接显示名称
113
     *
114
     * @return
115
     */
116
    String getPresentableText();
117
118
    /**
119
     * 获取连接引用对象
120
     * @return
121
     */
122
    ConnectionHandlerRef getRef();
123
124
    /**
125
     * 是否是虚拟连接
126
     * @return
127
     */
128
    boolean isVirtual();
129
130
    /**
131
     * 获取连接加载监控对象
132
     * @return
133
     */
134
    ConnectionLoadMonitor getLoadMonitor();
135
    /**
136
     * 获取连接集合
137
     * @return
138
     */
139
    ConnectionBundle getConnectionBundle();
140
    /**
141
     * 是否可连接
142
     * @return
143
     */
144
    boolean canConnect();
145
    /**
146
     *是否是有效连接
147
     * @param paramBoolean
148
     * @return
149
     */
150
    boolean isValid(boolean paramBoolean);
151
    /**
152
     * 是否是有效连接
153
     * @return
154
     */
155
    boolean isValid();
156
    /**
157
     *连接关联对象的集合
158
     * @return
159
     */
160
     FileSystemObjectBundle getObjectBundle();
161
    /**
162
     *
163
     * @return
164
     */
165
     Filter<FileSystemBrowserTreeNode> getObjectTypeFilter();
166
    /**
167
     * 连接配置
168
     * @return
169
     */
170
    ConnectionSettings getSettings();
171
}

HDFS连接的实现ConnectionHandlerImpl

  在实现类中,我们重点看下 如何通过外部配置实现连接到HDFS的。在实现类初始化方法中,首先创建一个HDFS的Configuration对象,由于IDEA插件的启动是由IDEA框架去自动维护,HDFS读取配置的时候是默认当前线程类的加载器,这里其实就是IDEA的主程序的线程,也就是说,如果使用IDEA的主线程类的加载器加载HDFS配置,那么在运行的时候,HDFS实例化的时候会找不到HDFS 的相关的jar 包。因此这里需要绕一下,IDEA框架启动装配插件后,在插件初始化HDFS配置代码处,重新设置HDFS配置的类加载器,然后再设置Configuration的相关配置对象,这样才能从插件路径下找到HDFS的jar包。设置HDFS的Configuration后,调用HDFS的FileSystem.get(Configuration) ,获取到HDFS的文件系统对象FileSystem,然后测试HDFS是否能够连接上。测试HDFS连接,这里实现比较简单,使用FileSystem.exties(path) 判断根目录”/“,是否存在即可。如果“/”目录不存在或者该方法如果抛出一个IOException 异常,表明HDFS因某种原因,连接不上。代码实现如下:

1
/**
2
 * 初始化连接实现
3
 * @param connectionBundle
4
 * @param connectionSettings
5
 */
6
public ConnectionHandlerImpl(ConnectionBundle connectionBundle, ConnectionSettings connectionSettings)
7
{
8
    this.connectionBundle = connectionBundle;
9
    this.connectionSettings = connectionSettings;
10
    configurationHdfs = new Configuration(false);
11
    ClassLoader pClassLoader=null;
12
    try
13
    {
14
        pClassLoader= Class.forName(com.fangyuzhong.intelliJ.hadoop.fsconnection.ConnectionManager.class.getName()).getClassLoader();
15
    }
16
    catch (Exception ex)
17
    {
18
        LOGGER.error("获取当前类的加载器错误",ex);
19
    }
20
    if(pClassLoader!=null)
21
    {
22
        String hdfsPath = connectionSettings.getFileSystemSettings().getHDFSUrl();
23
        String yarnResourceAMPath = connectionSettings.getFileSystemSettings().getMapReducelUrl();
24
        if(!StringUtil.isEmptyOrSpaces(hdfsPath))
25
        {
26
            //设置HDFS的配置类加载器
27
            configurationHdfs.setClassLoader(pClassLoader);
28
            //设置HDFS相关属性
29
            configurationHdfs.set(Constants.FS_HDFS_IMPL_KEY, Constants.FS_HDFS_IMPL_VALUE);
30
            configurationHdfs.set(Constants.FS_FILE_IMPL_KEY,Constants.FS_FILE_IMPL_VALUE);
31
            configurationHdfs.set(Constants.FS_DEFAULTFS_KEY, hdfsPath);
32
            configurationHdfs.set(Constants.YARN_RESOURCEMANAGER_SCHEDULER_ADDRESS,yarnResourceAMPath);
33
            //通过配置初始化HDFS的FileSystem
34
            InitiFileSystem(configurationHdfs);
35
        }
36
    }
37
    try
38
    {
39
        canConnection = createTestConnection();
40
    }
41
    catch (IOException ex)
42
    {
43
        canConnection=false;
44
        LOGGER.error("创建连接测试异常",ex);
45
    }
46
    connectionInfo = new ConnectionInfo(connectionSettings);
47
    connectionStatus = new ConnectionStatus();
48
    ref = new ConnectionHandlerRef(this);
49
    connectionStatus.setConnected(canConnection);
50
}
51
52
private  void  InitiFileSystem(Configuration configuration)
53
{
54
    try
55
    {
56
        fileSystem = FileSystem.get(configuration);
57
    }
58
    catch (Exception ex)
59
    {
60
        LOGGER.error("通过configuration获取HDFS系统对象异常",ex);
61
    }
62
}
1
public boolean createTestConnection() throws IOException
2
{
3
    boolean canConnection = false;
4
    if (fileSystem == null)
5
    {
6
        canConnection = false;
7
    }
8
    else
9
    {
10
        canConnection = fileSystem.exists(new Path("/"));
11
    }
12
    return canConnection;
13
}

  该类中还有个重要的方法:FileSystemObjectBundle getObjectBundle(),标示该连接到HDFS后,获取根目录下所有的对象,包括目录和文件对象

1
/**
2
 *获取连接对应的HDFS根目录下所有的对象集合 
3
 * @return
4
 */
5
@Override
6
public FileSystemObjectBundle getObjectBundle()
7
{
8
    if (objectBundle == null)
9
    {
10
        objectBundle = new FileSystemObjectBundleImpl(this, connectionBundle);
11
    }
12
    return objectBundle;
13
}

关于类FIleSystemObjectBundleImpl 将在后面介绍文件系统对象的时候介绍。

ConnectionBundle类,表示连接集合。

  用户可以进行多个HDFS连接设置,这样就会产生多个连接Connection,那使用ConnectionBundle类来定义和处理这些连接。ConnectionBundle类继承FIleSystemBrowserTreeNodeBase,表示他也是个树节点,当用户选择以单个树多个根节点来展示的时候,ConnectionBundle就以节点的方式来展示。这里还要注意,实例化 ConnectionBundle的时候,默认加载一个虚拟的连接 virtualConnectons对象。相关代码片段如下:

1
/**
2
 * 初始化
3
 * @param project
4
 */
5
public ConnectionBundle(Project project)
6
{
7
    this.projectRef = new ProjectRef(project);
8
    this.virtualConnections.add(new VirtualConnectionHandler("virtual-hdfs-fsconnection",
9
          "Virtual - hdfs 3.0", FileSystemType.HDFS,3.0, project));
10
}
11
12
/**
13
 * 添加连接到集合
14
 * @param connectionHandler
15
 */
16
public void addConnection(ConnectionHandler connectionHandler)
17
{
18
    this.connectionHandlers.add(connectionHandler);
19
    Disposer.register(this, connectionHandler);
20
}

HDFS连接Connection的管理类 ConnectionManager。

  ConnectionManager属于Project级别插件,实现了抽象类AbstractProjectComponent,维护整个HDFS的连接管理,包括:连接配置修改后,重新构建UI对象更新Connection连接、获取连接的集合ConnectionBundle、测试HDFS连接、根据连接的ID获取ConnectionHandle 等等。相关代码片段如下:

1
/**
2
 * 获取ConnectionManager对象
3
 * @param project
4
 * @return
5
 */
6
public static ConnectionManager getInstance(@NotNull Project project)
7
{
8
    return getComponent(project);
9
}
10
/**
11
 *获取ConnectionManager对象
12
 * @param project
13
 * @return
14
 */
15
private static ConnectionManager getComponent(@NotNull Project project)
16
{
17
18
    return FailsafeUtil.getComponent(project, ConnectionManager.class);
19
}
20
/**
21
 * 初始化
22
 * @param project
23
 */
24
private ConnectionManager(Project project)
25
{
26
    super(project);
27
    this.connectionBundle = ConnectionBundleSettings.getInstance(getProject()).getConnectionBundle()
28
    Disposer.register(this, this.connectionBundle);
29
}
30
/**
31
 * 初始化加载
32
 */
33
public void initComponent()
34
{
35
    super.initComponent();
36
    Project project = getProject();
37
    //注册连接配置修改监听
38
    EventUtil.subscribe(project, this, ConnectionSettingsListener.TOPIC, this.connectionSettingsListener);
39
    this.idleConnectionCleaner = new Timer("HDFS - Idle Connection Cleaner [" + project.getName() + "]");
40
    this.idleConnectionCleaner.schedule(new CloseIdleConnectionTask(), TimeUtil.ONE_MINUTE, TimeUtil.ONE_MINUTE);
41
}
42
/*
43
连接配置修改后处理
44
 */
45
private ConnectionSettingsListener connectionSettingsListener = new ConnectionSettingsAdapter()
46
{
47
    public void connectionChanged(String connectionId)
48
    {
49
        final ConnectionHandler connectionHandler = getConnectionHandler(connectionId);
50
        connectionHandler.getObjectBundle().refreshTreeChildren();
51
    }
52
};

HDFS连接connection缓存类 ConnectionCache

  ConnectionCache,缓存类,使用HashMap 以连接的ID为Key,对应的连接ConfigurationHandler为Value进行存储。该类是Application级别的插件,IDEA启动时进行初始化。该类中提供按照连接的ID查找连接ConnectionHandler。初始化时注册了Project生命周期的相关事件处理。代码如下:

1
/**
2
 * 定义HDFS连接的缓存类,Application级别插件,IDEA系统启动进行初始化
3
 * Created by fangyuzhong on 17-7-15.
4
 */
5
public class ConnectionCache
6
        implements ApplicationComponent
7
{
8
    /*
9
     使用HashMap 来存储Connection的缓存
10
     */
11
    private static Map<String, ConnectionHandler> CACHE = new THashMap();
12
13
    /**
14
     * 根据connection的ID查找ConnectionHandler
15
     * @param connectionId
16
     * @return
17
     */
18
    @Nullable
19
    public static ConnectionHandler findConnectionHandler(String connectionId)
20
    {
21
        ConnectionHandler connectionHandler =  CACHE.get(connectionId);
22
        ProjectManager projectManager = ProjectManager.getInstance();
23
        if ((connectionHandler == null) && (projectManager != null))
24
        {
25
            synchronized (ConnectionCache.class)
26
            {
27
                connectionHandler = CACHE.get(connectionId);
28
                if (connectionHandler == null)
29
                {
30
                    for (Project project : projectManager.getOpenProjects())
31
                    {
32
                        ConnectionManager connectionManager = ConnectionManager.getInstance(project);
33
                        connectionHandler = connectionManager.getConnectionHandler(connectionId);
34
                        if ((connectionHandler != null) && (!connectionHandler.isDisposed()))
35
                        {
36
                            CACHE.put(connectionId, connectionHandler);
37
                            return connectionHandler;
38
                        }
39
                    }
40
                }
41
            }
42
        }
43
        return (connectionHandler == null) || (connectionHandler.isDisposed()) ? null : connectionHandler;
44
    }
45
    /**
46
     * 初始化插件组件
47
     */
48
    public void initComponent()
49
    {
50
        //注册项目生命周期通知
51
        EventUtil.subscribe(null, ProjectLifecycleListener.TOPIC, this.projectLifecycleListener);
52
    }
53
    public void disposeComponent()
54
    {
55
    }
56
    @NotNull
57
    public String getComponentName()
58
    {
59
        return  "HadoopNavigator.ConnectionCache";
60
61
    }
62
    /**
63
     * 定义工程项目的生命周期处理事件类
64
     */
65
    private ProjectLifecycleListener projectLifecycleListener = new ProjectLifecycleListener.Adapter()
66
    {
67
        /**
68
         * 工程Project组件初始化后处理
69
         * @param project
70
         */
71
        public void projectComponentsInitialized(@NotNull Project project)
72
        {
73
            ConnectionManager connectionManager = ConnectionManager.getInstance(project);
74
            if (connectionManager == null) return;
75
            List<ConnectionHandler> connectionHandlers = connectionManager.getConnectionHandlers();
76
            for (ConnectionHandler connectionHandler : connectionHandlers)
77
            {
78
                ConnectionCache.CACHE.put(connectionHandler.getId(), connectionHandler);
79
            }
80
        }
81
82
        /**
83
         * 关闭工程Project后处理
84
         * @param project
85
         */
86
        public void afterProjectClosed(@NotNull Project project)
87
        {
88
            Iterator<String> connectionIds = ConnectionCache.CACHE.keySet().iterator();
89
            while (connectionIds.hasNext())
90
            {
91
                String connectionId = (String) connectionIds.next();
92
                ConnectionHandler connectionHandler = (ConnectionHandler) ConnectionCache.CACHE.get(connectionId);
93
                if ((connectionHandler.isDisposed()) || (connectionHandler.getProject() == project))
94
                {
95
                    connectionIds.remove();
96
                }
97
            }
98
        }
99
    };
100
}
----------------------- 本文结束 感谢阅读 -----------------------
坚持原创技术分享,您的支持将鼓励我继续创作!