Spark2.1.0——创建SparkUI的分析
阅读建议:阅读本文前,最好先阅读《Spark2.1.0——SparkUI的实现》和《Spark2.1.0——WebUI框架体系》。 在SparkContext的初始化过程中,会创建SparkUI。有了对WebUI的总体认识,现在是时候了解SparkContext是如何构造SparkUI的了。SparkUI是WebUI框架的使用范例,了解了SparkUI的创建过程,读者对Mast..
阅读建议:阅读本文前,最好先阅读《Spark2.1.0——SparkUI的实现》和《Spark2.1.0——WebUI框架体系》。
在SparkContext的初始化过程中,会创建SparkUI。有了对WebUI的总体认识,现在是时候了解SparkContext是如何构造SparkUI的了。SparkUI是WebUI框架的使用范例,了解了SparkUI的创建过程,读者对MasterWebUI、WorkerWebUI及HistoryServer的创建过程也必然了然于心。创建SparkUI的代码如下:
_statusTracker = new SparkStatusTracker(this)
_progressBar =
if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
Some(new ConsoleProgressBar(this))
} else {
None
}
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
_env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
None
}
_ui.foreach(_.bind())
这段代码的执行步骤如下。
- 创建Spark状态跟踪器SparkStatusTracker。
- 创建ConsoleProgressBar。可以配置spark.ui.showConsoleProgress属性为false取消对ConsoleProgressBar的创建,此属性默认为true。
- 调用SparkUI的createLiveUI方法创建SparkUI。
- 给SparkUI绑定端口。SparkUI继承自WebUI,因此调用了代码清单4-12中WebUI的bind方法启动SparkUI底层的Jetty服务。
上述步骤中,第1、2、4步都很简单,所以着重来分析第3步。SparkUI的createLiveUI的实现如下。
def createLiveUI(
sc: SparkContext,
conf: SparkConf,
listenerBus: SparkListenerBus,
jobProgressListener: JobProgressListener,
securityManager: SecurityManager,
appName: String,
startTime: Long): SparkUI = {
create(Some(sc), conf, listenerBus, securityManager, appName,
jobProgressListener = Some(jobProgressListener), startTime = startTime)
}
可以看到SparkUI的createLiveUI方法中调用了create方法。create的实现如下。
private def create(
sc: Option[SparkContext],
conf: SparkConf,
listenerBus: SparkListenerBus,
securityManager: SecurityManager,
appName: String,
basePath: String = "",
jobProgressListener: Option[JobProgressListener] = None,
startTime: Long): SparkUI = {
val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
val listener = new JobProgressListener(conf)
listenerBus.addListener(listener)
listener
}
val environmentListener = new EnvironmentListener
val storageStatusListener = new StorageStatusListener(conf)
val executorsListener = new ExecutorsListener(storageStatusListener, conf)
val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new RDDOperationGraphListener(conf)
listenerBus.addListener(environmentListener)
listenerBus.addListener(storageStatusListener)
listenerBus.addListener(executorsListener)
listenerBus.addListener(storageListener)
listenerBus.addListener(operationGraphListener)
new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
executorsListener, _jobProgressListener, storageListener, operationGraphListener,
appName, basePath, startTime)
}
可以看到create方法里除了JobProgressListener是外部传入的之外,又增加了一些SparkListener,例如用于对JVM参数、Spark属性、Java系统属性、classpath等进行监控的EnvironmentListener;用于维护Executor的存储状态的StorageStatusListener;用于准备将Executor的信息展示在ExecutorsTab的ExecutorsListener;用于准备将Executor相关存储信息展示在BlockManagerUI的StorageListener;用于构建RDD的DAG(有向无关图)的RDDOperationGraphListener等。根据《Spark2.1.0——SparkContext初始化之Spark环境的创建》一文中的代码清单1,JobProgressListener早已被添加到listenerBus的监听器列表中,所以此处只需要将另外5个SparkListener的实现添加到listenerBus的监听器列表中。最后使用SparkUI的构造器创建SparkUI。
SparkUI的初始化
调用SparkUI的构造器创建SparkUI,实际也是对SparkUI的初始化过程。在介绍初始化之前,先来看看SparkUI中的两个成员属性。
- killEnabled:标记当前SparkUI能否提供杀死Stage或者Job的链接。
- appId:当前应用的ID。
SparkUI的构造过程中会执行initialize方法,其实现见代码清单1。
代码清单1 SparkUI的初始化
def initialize() {
val jobsTab = new JobsTab(this)
attachTab(jobsTab)
val stagesTab = new StagesTab(this)
attachTab(stagesTab)
attachTab(new StorageTab(this))
attachTab(new EnvironmentTab(this))
attachTab(new ExecutorsTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
attachHandler(ApiRootResource.getServletHandler(this))
// These should be POST only, but, the YARN AM proxy won't proxy POSTs
attachHandler(createRedirectHandler(
"/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
attachHandler(createRedirectHandler(
"/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
httpMethods = Set("GET", "POST")))
}
initialize()
根据代码清单1,SparkUI的初始化步骤如下。
- 构建页面布局并给每个WebUITab中的所有WebUIPage创建对应的ServletContextHandler。这一步使用了《Spark2.1.0——WebUI框架体系》一文的代码清单7中展示的attachTab方法。
- 调用JettyUtils的createStaticHandler方法创建对静态目录org/apache/spark/ui/static提供文件服务的ServletContextHandler,并使用attachHandler方法追加到SparkUI的服务中。
- 调用JettyUtils的createRedirectHandler方法创建几个将用户对源路径的请求重定向到目标路径的ServletContextHandler。例如,将用户对根路径"/"的请求重定向到目标路径"/jobs/"的ServletContextHandler。
SparkUI的页面布局与展示
SparkUI究竟是如何实现页面布局及展示的? 由于所有标签页都继承了SparkUITab,所以我们先来看看SparkUITab的实现:
private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)
extends WebUITab(parent, prefix) {
def appName: String = parent.getAppName
}
根据上述代码,我们知道SparkUITab继承了WebUITab,并在实现中增加了一个用于获取当前应用名称的方法appName。EnvironmentTab是用于展示JVM、Spark属性、系统属性、类路径等相关信息的标签页,由于其实现简单且能说明问题,所以本节挑选EnvironmentTab作为示例解答本节一开始提出的问题。
EnvironmentTab的实现见代码清单2。
代码清单2 EnvironmentTab的实现
private[ui] class EnvironmentTab(parent: SparkUI) extends SparkUITab(parent, "environment") {
val listener = parent.environmentListener
attachPage(new EnvironmentPage(this))
}
根据代码清单2,我们知道EnvironmentTab引用了SparkUI的environmentListener(类型为EnvironmentListener),并且包含EnvironmentPage这个页面。EnvironmentTab通过调用attachPage方法将EnvironmentPage与Jetty服务关联起来。根据《Spark2.1.0——WebUI框架体系》一文的代码清单5中attachPage的实现,创建的renderHandler将采用偏函数(request: HttpServletRequest) => page.render(request) 处理请求,因而会调用EnvironmentPage的render方法。EnvironmentPage的render方法将会渲染页面元素。EnvironmentPage的实现见代码清单3。
代码清单3 EnvironmentPage的实现
private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
private val listener = parent.listener
private def removePass(kv: (String, String)): (String, String) = {
if (kv._1.toLowerCase.contains("password") || kv._1.toLowerCase.contains("secret")) {
(kv._1, "******")
} else kv
}
def render(request: HttpServletRequest): Seq[Node] = {
// 调用UIUtils的listingTable方法生成JVM运行时信息、Spark属性信息、系统属性信息、类路径信息的表格
val runtimeInformationTable = UIUtils.listingTable(
propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
val sparkPropertiesTable = UIUtils.listingTable(
propertyHeader, propertyRow, listener.sparkProperties.map(removePass), fixedWidth = true)
val systemPropertiesTable = UIUtils.listingTable(
propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true)
val classpathEntriesTable = UIUtils.listingTable(
classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true)
val content =
<span>
<h4>Runtime Information</h4> {runtimeInformationTable}
<h4>Spark Properties</h4> {sparkPropertiesTable}
<h4>System Properties</h4> {systemPropertiesTable}
<h4>Classpath Entries</h4> {classpathEntriesTable}
</span>
// 调用UIUtils的headerSparkPage方法封装好css、js、header及页面布局等
UIUtils.headerSparkPage("Environment", content, parent)
}
// 定义JVM运行时信息、Spark属性信息、系统属性信息的表格头部propertyHeader和类路径信息的表格头部
// classPathHeaders
private def propertyHeader = Seq("Name", "Value")
private def classPathHeaders = Seq("Resource", "Source")
// 定义JVM运行时信息的表格中每行数据的生成方法jvmRow
private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
}
根据代码清单3,EnvironmentPage的render方法利用从父节点EnvironmentTab中得到的EnvironmentListener中的统计监控数据生成JVM运行时、Spark属性、系统属性以及类路径等状态的摘要信息。以JVM运行时为例,页面渲染的步骤如下:
- 定义JVM运行时信息、Spark属性信息、系统属性信息的表格头部propertyHeader和类路径信息的表格头部classPathHeaders。
- 定义JVM运行时信息的表格中每行数据的生成方法jvmRow。
- 调用UIUtils的listingTable方法生成JVM运行时信息、Spark属性信息、系统属性信息、类路径信息的表格。
- 调用UIUtils的headerSparkPage方法封装好css、js、header及页面布局等。
UIUtils工具类的实现细节留给感兴趣的读者自行查阅,本文不多赘述。
更多推荐


所有评论(0)