HadoopIntellijPlugin插件HDFS文件系统连接Connection设计和实现

  本节,主要介绍Hadoop-IntelliJ-Plugin 的HDFS连接处理的设计和实现。主要包括HDFS连接Connection的接口定义、HDFS连接的实现、HDFS连接的缓存处理、和连接相关配置项。整个类的设计如下:

HDFS连接接口ConnectionHandler和其实现类ConnectionHandlerImpl

ConnectionHandler接口。

  该接口主要定义了获取HDFS连接对象Configuration、HDFS文件系统对象FileSystem、设置连接的相关信息、获取连接关联的文件系统对象的集合、获取连接对应的文件系统版本、判断HDFS文件系统是否可以连接上、测试HDFS连接、获取HDFS连接的状态等。接口定义的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/**
* 定义文件系统的连接处理接口
* Created by fangyuzhong on 17-7-15.
*/
public interface ConnectionHandler extends Disposable,ConnectionProvider, Presentable
{
/**
* 获取当前的intellij的工程
*
* @return
*/
Project getProject();
/**
* 测试HDFS是否可以连接
*@param
* @return
*/
boolean createTestConnection() throws IOException;

/**
* 获取当前HDFS的连接Configuration对象
*
* @return
*/
Configuration getMainConnection();

/**
* 获取当前连接的HDFS文件系统对象
*
* @return
*/
FileSystem getMainFileSystem();

/**
* 获取连接状态
*
* @return
*/
ConnectionStatus getConnectionStatus();

/**
* 获取连接字符串
*
* @return
*/
ConnectionInfo getConnectionInfo();

/**
* 设置连接字符串
*
* @param paramConnectionInfo
*/
void setConnectionInfo(ConnectionInfo paramConnectionInfo);
/**
* 连接是否激活
*
* @return
*/
boolean isActive();

/**
* 获取连接对应的文件系统
*
* @return
*/
FileSystemType getFileSystemType();

/**
* 获取文件系统版本
*
* @return
*/
double getFileSystemVersion();
/**
* 连接是否连接上
*
* @return
*/
boolean isConnected();

/**
* @return
*/
int getIdleMinutes();

/**
* 获取文件系统的相关信息
*
* @return
*/
FileSystemInfo getFileSystemInfo();
/**
* 释放连接
*/
void disconnect();

/**
* 获取连接的ID
*
* @return
*/
String getId();

/**
* 获取连接的用户名称
*
* @return
*/
String getUserName();

/**
* 获取连接显示名称
*
* @return
*/
String getPresentableText();

/**
* 获取连接引用对象
* @return
*/
ConnectionHandlerRef getRef();

/**
* 是否是虚拟连接
* @return
*/
boolean isVirtual();

/**
* 获取连接加载监控对象
* @return
*/
ConnectionLoadMonitor getLoadMonitor();
/**
* 获取连接集合
* @return
*/
ConnectionBundle getConnectionBundle();
/**
* 是否可连接
* @return
*/
boolean canConnect();
/**
*是否是有效连接
* @param paramBoolean
* @return
*/
boolean isValid(boolean paramBoolean);
/**
* 是否是有效连接
* @return
*/
boolean isValid();
/**
*连接关联对象的集合
* @return
*/
FileSystemObjectBundle getObjectBundle();
/**
*
* @return
*/
Filter<FileSystemBrowserTreeNode> getObjectTypeFilter();
/**
* 连接配置
* @return
*/
ConnectionSettings getSettings();
}

HDFS连接的实现ConnectionHandlerImpl

  在实现类中,我们重点看下 如何通过外部配置实现连接到HDFS的。在实现类初始化方法中,首先创建一个HDFS的Configuration对象,由于IDEA插件的启动是由IDEA框架去自动维护,HDFS读取配置的时候是默认当前线程类的加载器,这里其实就是IDEA的主程序的线程,也就是说,如果使用IDEA的主线程类的加载器加载HDFS配置,那么在运行的时候,HDFS实例化的时候会找不到HDFS 的相关的jar 包。因此这里需要绕一下,IDEA框架启动装配插件后,在插件初始化HDFS配置代码处,重新设置HDFS配置的类加载器,然后再设置Configuration的相关配置对象,这样才能从插件路径下找到HDFS的jar包。设置HDFS的Configuration后,调用HDFS的FileSystem.get(Configuration) ,获取到HDFS的文件系统对象FileSystem,然后测试HDFS是否能够连接上。测试HDFS连接,这里实现比较简单,使用FileSystem.exties(path) 判断根目录”/“,是否存在即可。如果“/”目录不存在或者该方法如果抛出一个IOException 异常,表明HDFS因某种原因,连接不上。代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* 初始化连接实现
* @param connectionBundle
* @param connectionSettings
*/
public ConnectionHandlerImpl(ConnectionBundle connectionBundle, ConnectionSettings connectionSettings)
{
this.connectionBundle = connectionBundle;
this.connectionSettings = connectionSettings;
configurationHdfs = new Configuration(false);
ClassLoader pClassLoader=null;
try
{
pClassLoader= Class.forName(com.fangyuzhong.intelliJ.hadoop.fsconnection.ConnectionManager.class.getName()).getClassLoader();
}
catch (Exception ex)
{
LOGGER.error("获取当前类的加载器错误",ex);
}
if(pClassLoader!=null)
{
String hdfsPath = connectionSettings.getFileSystemSettings().getHDFSUrl();
String yarnResourceAMPath = connectionSettings.getFileSystemSettings().getMapReducelUrl();
if(!StringUtil.isEmptyOrSpaces(hdfsPath))
{
//设置HDFS的配置类加载器
configurationHdfs.setClassLoader(pClassLoader);
//设置HDFS相关属性
configurationHdfs.set(Constants.FS_HDFS_IMPL_KEY, Constants.FS_HDFS_IMPL_VALUE);
configurationHdfs.set(Constants.FS_FILE_IMPL_KEY,Constants.FS_FILE_IMPL_VALUE);
configurationHdfs.set(Constants.FS_DEFAULTFS_KEY, hdfsPath);
configurationHdfs.set(Constants.YARN_RESOURCEMANAGER_SCHEDULER_ADDRESS,yarnResourceAMPath);
//通过配置初始化HDFS的FileSystem
InitiFileSystem(configurationHdfs);
}
}
try
{
canConnection = createTestConnection();
}
catch (IOException ex)
{
canConnection=false;
LOGGER.error("创建连接测试异常",ex);
}
connectionInfo = new ConnectionInfo(connectionSettings);
connectionStatus = new ConnectionStatus();
ref = new ConnectionHandlerRef(this);
connectionStatus.setConnected(canConnection);
}

private void InitiFileSystem(Configuration configuration)
{
try
{
fileSystem = FileSystem.get(configuration);
}
catch (Exception ex)
{
LOGGER.error("通过configuration获取HDFS系统对象异常",ex);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
public boolean createTestConnection() throws IOException
{
boolean canConnection = false;
if (fileSystem == null)
{
canConnection = false;
}
else
{
canConnection = fileSystem.exists(new Path("/"));
}
return canConnection;
}

  该类中还有个重要的方法:FileSystemObjectBundle getObjectBundle(),标示该连接到HDFS后,获取根目录下所有的对象,包括目录和文件对象

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
*获取连接对应的HDFS根目录下所有的对象集合
* @return
*/
@Override
public FileSystemObjectBundle getObjectBundle()
{
if (objectBundle == null)
{
objectBundle = new FileSystemObjectBundleImpl(this, connectionBundle);
}
return objectBundle;
}

关于类FIleSystemObjectBundleImpl 将在后面介绍文件系统对象的时候介绍。

ConnectionBundle类,表示连接集合。

  用户可以进行多个HDFS连接设置,这样就会产生多个连接Connection,那使用ConnectionBundle类来定义和处理这些连接。ConnectionBundle类继承FIleSystemBrowserTreeNodeBase,表示他也是个树节点,当用户选择以单个树多个根节点来展示的时候,ConnectionBundle就以节点的方式来展示。这里还要注意,实例化 ConnectionBundle的时候,默认加载一个虚拟的连接 virtualConnectons对象。相关代码片段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 初始化
* @param project
*/
public ConnectionBundle(Project project)
{
this.projectRef = new ProjectRef(project);
this.virtualConnections.add(new VirtualConnectionHandler("virtual-hdfs-fsconnection",
"Virtual - hdfs 3.0", FileSystemType.HDFS,3.0, project));
}

/**
* 添加连接到集合
* @param connectionHandler
*/
public void addConnection(ConnectionHandler connectionHandler)
{
this.connectionHandlers.add(connectionHandler);
Disposer.register(this, connectionHandler);
}

HDFS连接Connection的管理类 ConnectionManager。

  ConnectionManager属于Project级别插件,实现了抽象类AbstractProjectComponent,维护整个HDFS的连接管理,包括:连接配置修改后,重新构建UI对象更新Connection连接、获取连接的集合ConnectionBundle、测试HDFS连接、根据连接的ID获取ConnectionHandle 等等。相关代码片段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 获取ConnectionManager对象
* @param project
* @return
*/
public static ConnectionManager getInstance(@NotNull Project project)
{
return getComponent(project);
}
/**
*获取ConnectionManager对象
* @param project
* @return
*/
private static ConnectionManager getComponent(@NotNull Project project)
{

return FailsafeUtil.getComponent(project, ConnectionManager.class);
}
/**
* 初始化
* @param project
*/
private ConnectionManager(Project project)
{
super(project);
this.connectionBundle = ConnectionBundleSettings.getInstance(getProject()).getConnectionBundle()
Disposer.register(this, this.connectionBundle);
}
/**
* 初始化加载
*/
public void initComponent()
{
super.initComponent();
Project project = getProject();
//注册连接配置修改监听
EventUtil.subscribe(project, this, ConnectionSettingsListener.TOPIC, this.connectionSettingsListener);
this.idleConnectionCleaner = new Timer("HDFS - Idle Connection Cleaner [" + project.getName() + "]");
this.idleConnectionCleaner.schedule(new CloseIdleConnectionTask(), TimeUtil.ONE_MINUTE, TimeUtil.ONE_MINUTE);
}
/*
连接配置修改后处理
*/
private ConnectionSettingsListener connectionSettingsListener = new ConnectionSettingsAdapter()
{
public void connectionChanged(String connectionId)
{
final ConnectionHandler connectionHandler = getConnectionHandler(connectionId);
connectionHandler.getObjectBundle().refreshTreeChildren();
}
};

HDFS连接connection缓存类 ConnectionCache

  ConnectionCache,缓存类,使用HashMap 以连接的ID为Key,对应的连接ConfigurationHandler为Value进行存储。该类是Application级别的插件,IDEA启动时进行初始化。该类中提供按照连接的ID查找连接ConnectionHandler。初始化时注册了Project生命周期的相关事件处理。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* 定义HDFS连接的缓存类,Application级别插件,IDEA系统启动进行初始化
* Created by fangyuzhong on 17-7-15.
*/
public class ConnectionCache
implements ApplicationComponent
{
/*
使用HashMap 来存储Connection的缓存
*/
private static Map<String, ConnectionHandler> CACHE = new THashMap();

/**
* 根据connection的ID查找ConnectionHandler
* @param connectionId
* @return
*/
@Nullable
public static ConnectionHandler findConnectionHandler(String connectionId)
{
ConnectionHandler connectionHandler = CACHE.get(connectionId);
ProjectManager projectManager = ProjectManager.getInstance();
if ((connectionHandler == null) && (projectManager != null))
{
synchronized (ConnectionCache.class)
{
connectionHandler = CACHE.get(connectionId);
if (connectionHandler == null)
{
for (Project project : projectManager.getOpenProjects())
{
ConnectionManager connectionManager = ConnectionManager.getInstance(project);
connectionHandler = connectionManager.getConnectionHandler(connectionId);
if ((connectionHandler != null) && (!connectionHandler.isDisposed()))
{
CACHE.put(connectionId, connectionHandler);
return connectionHandler;
}
}
}
}
}
return (connectionHandler == null) || (connectionHandler.isDisposed()) ? null : connectionHandler;
}
/**
* 初始化插件组件
*/
public void initComponent()
{
//注册项目生命周期通知
EventUtil.subscribe(null, ProjectLifecycleListener.TOPIC, this.projectLifecycleListener);
}
public void disposeComponent()
{
}
@NotNull
public String getComponentName()
{
return "HadoopNavigator.ConnectionCache";

}
/**
* 定义工程项目的生命周期处理事件类
*/
private ProjectLifecycleListener projectLifecycleListener = new ProjectLifecycleListener.Adapter()
{
/**
* 工程Project组件初始化后处理
* @param project
*/
public void projectComponentsInitialized(@NotNull Project project)
{
ConnectionManager connectionManager = ConnectionManager.getInstance(project);
if (connectionManager == null) return;
List<ConnectionHandler> connectionHandlers = connectionManager.getConnectionHandlers();
for (ConnectionHandler connectionHandler : connectionHandlers)
{
ConnectionCache.CACHE.put(connectionHandler.getId(), connectionHandler);
}
}

/**
* 关闭工程Project后处理
* @param project
*/
public void afterProjectClosed(@NotNull Project project)
{
Iterator<String> connectionIds = ConnectionCache.CACHE.keySet().iterator();
while (connectionIds.hasNext())
{
String connectionId = (String) connectionIds.next();
ConnectionHandler connectionHandler = (ConnectionHandler) ConnectionCache.CACHE.get(connectionId);
if ((connectionHandler.isDisposed()) || (connectionHandler.getProject() == project))
{
connectionIds.remove();
}
}
}
};
}