Skip to content

Commit

Permalink
[SPARK-1386] Web UI for Spark Streaming
Browse files Browse the repository at this point in the history
When debugging Spark Streaming applications it is necessary to monitor certain metrics that are not shown in the Spark application UI. For example, what is average processing time of batches? What is the scheduling delay? Is the system able to process as fast as it is receiving data? How many records I am receiving through my receivers?

While the StreamingListener interface introduced in the 0.9 provided some of this information, it could only be accessed programmatically. A UI that shows information specific to the streaming applications is necessary for easier debugging. This PR introduces such a UI. It shows various statistics related to the streaming application. Here is a screenshot of the UI running on my local machine.

http://i.imgur.com/1ooDGhm.png

This UI is integrated into the Spark UI running at 4040.

Author: Tathagata Das <[email protected]>
Author: Andrew Or <[email protected]>

Closes #290 from tdas/streaming-web-ui and squashes the following commits:

fc73ca5 [Tathagata Das] Merge pull request amplab#9 from andrewor14/ui-refactor
642dd88 [Andrew Or] Merge SparkUISuite.scala into UISuite.scala
eb30517 [Andrew Or] Merge github.com:apache/spark into ui-refactor
f4f4cbe [Tathagata Das] More minor fixes.
34bb364 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
252c566 [Tathagata Das] Merge pull request amplab#8 from andrewor14/ui-refactor
e038b4b [Tathagata Das] Addressed Patrick's comments.
125a054 [Andrew Or] Disable serving static resources with gzip
90feb8d [Andrew Or] Address Patrick's comments
89dae36 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
72fe256 [Tathagata Das] Merge pull request amplab#6 from andrewor14/ui-refactor
2fc09c8 [Tathagata Das] Added binary check exclusions
aa396d4 [Andrew Or] Rename tabs and pages (No more IndexPage.scala)
f8e1053 [Tathagata Das] Added Spark and Streaming UI unit tests.
caa5e05 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
585cd65 [Tathagata Das] Merge pull request amplab#5 from andrewor14/ui-refactor
914b8ff [Tathagata Das] Moved utils functions to UIUtils.
548c98c [Andrew Or] Wide refactoring of WebUI, UITab, and UIPage (see commit message)
6de06b0 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-web-ui
ee6543f [Tathagata Das] Minor changes based on Andrew's comments.
fa760fe [Tathagata Das] Fixed long line.
1c0bcef [Tathagata Das] Refactored streaming UI into two files.
1af239b [Tathagata Das] Changed streaming UI to attach itself as a tab with the Spark UI.
827e81a [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
168fe86 [Tathagata Das] Merge pull request amplab#2 from andrewor14/ui-refactor
3e986f8 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-web-ui
c78c92d [Andrew Or] Remove outdated comment
8f7323b [Andrew Or] End of file new lines, indentation, and imports (minor)
0d61ee8 [Andrew Or] Merge branch 'streaming-web-ui' of github.com:tdas/spark into ui-refactor
9a48fa1 [Andrew Or] Allow adding tabs to SparkUI dynamically + add example
61358e3 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-web-ui
53be2c5 [Tathagata Das] Minor style updates.
ed25dfc [Andrew Or] Generalize SparkUI header to display tabs dynamically
a37ad4f [Andrew Or] Comments, imports and formatting (minor)
cd000b0 [Andrew Or] Merge github.com:apache/spark into ui-refactor
7d57444 [Andrew Or] Refactoring the UI interface to add flexibility
aef4dd5 [Tathagata Das] Added Apache licenses.
db27bad [Tathagata Das] Added last batch processing time to StreamingUI.
4d86e98 [Tathagata Das] Added basic stats to the StreamingUI and refactored the UI to a Page to make it easier to transition to using SparkUI later.
93f1c69 [Tathagata Das] Added network receiver information to the Streaming UI.
56cc7fb [Tathagata Das] First cut implementation of Streaming UI.
  • Loading branch information
tdas authored and pwendell committed Apr 12, 2014
1 parent 165e06a commit 6aa08c3
Show file tree
Hide file tree
Showing 54 changed files with 1,426 additions and 846 deletions.
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ class SparkContext(config: SparkConf) extends Logging {
// Initialize the Spark UI, registering all associated listeners
private[spark] val ui = new SparkUI(this)
ui.bind()
ui.start()

// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
Expand Down
50 changes: 0 additions & 50 deletions core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.ui.{UIUtils, WebUI}
import org.apache.spark.ui.{WebUIPage, UIUtils}

private[spark] class IndexPage(parent: HistoryServer) {
private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {

def render(request: HttpServletRequest): Seq[Node] = {
val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
Expand Down Expand Up @@ -62,13 +62,13 @@ private[spark] class IndexPage(parent: HistoryServer) {
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val appName = if (info.started) info.name else info.logDirPath.getName
val uiAddress = parent.getAddress + info.ui.basePath
val startTime = if (info.started) WebUI.formatDate(info.startTime) else "Not started"
val endTime = if (info.completed) WebUI.formatDate(info.endTime) else "Not completed"
val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started"
val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed"
val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
val duration = if (difference > 0) WebUI.formatDuration(difference) else "---"
val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---"
val sparkUser = if (info.started) info.sparkUser else "Unknown user"
val logDirectory = info.logDirPath.getName
val lastUpdated = WebUI.formatDate(info.lastUpdated)
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
<tr>
<td><a href={uiAddress}>{appName}</a></td>
<td>{startTime}</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,13 @@

package org.apache.spark.deploy.history

import javax.servlet.http.HttpServletRequest

import scala.collection.mutable

import org.apache.hadoop.fs.{FileStatus, Path}
import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkUIContainer
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.{WebUI, SparkUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils

Expand All @@ -46,17 +42,15 @@ import org.apache.spark.util.Utils
*/
class HistoryServer(
val baseLogDir: String,
securityManager: SecurityManager,
conf: SparkConf)
extends SparkUIContainer("History Server") with Logging {
extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging {

import HistoryServer._

private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
private val localHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
private val port = WEB_UI_PORT
private val securityManager = new SecurityManager(conf)
private val indexPage = new IndexPage(this)

// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTime = -1L
Expand Down Expand Up @@ -90,37 +84,23 @@ class HistoryServer(
}
}

private val handlers = Seq[ServletContextHandler](
createStaticHandler(STATIC_RESOURCE_DIR, "/static"),
createServletHandler("/",
(request: HttpServletRequest) => indexPage.render(request), securityMgr = securityManager)
)

// A mapping of application ID to its history information, which includes the rendered UI
val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()

initialize()

/**
* Start the history server.
* Initialize the history server.
*
* This starts a background thread that periodically synchronizes information displayed on
* this UI with the event logs in the provided base directory.
*/
def start() {
def initialize() {
attachPage(new HistoryPage(this))
attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static"))
logCheckingThread.start()
}

/** Bind to the HTTP server behind this web interface. */
override def bind() {
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort))
} catch {
case e: Exception =>
logError("Failed to bind HistoryServer", e)
System.exit(1)
}
}

/**
* Check for any updates to event logs in the base directory. This is only effective once
* the server has been bound.
Expand Down Expand Up @@ -151,7 +131,7 @@ class HistoryServer(
// Remove any applications that should no longer be retained
appIdToInfo.foreach { case (appId, info) =>
if (!retainedAppIds.contains(appId)) {
detachUI(info.ui)
detachSparkUI(info.ui)
appIdToInfo.remove(appId)
}
}
Expand Down Expand Up @@ -186,15 +166,14 @@ class HistoryServer(
val path = logDir.getPath
val appId = path.getName
val replayBus = new ReplayListenerBus(logInfo.logPaths, fileSystem, logInfo.compressionCodec)
val ui = new SparkUI(replayBus, appId, "/history/" + appId)
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)
val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId)

// Do not call ui.bind() to avoid creating a new server for each application
ui.start()
replayBus.replay()
if (appListener.applicationStarted) {
attachUI(ui)
attachSparkUI(ui)
val appName = appListener.appName
val sparkUser = appListener.sparkUser
val startTime = appListener.startTime
Expand All @@ -213,6 +192,18 @@ class HistoryServer(
fileSystem.close()
}

/** Attach a reconstructed UI to this server. Only valid after bind(). */
private def attachSparkUI(ui: SparkUI) {
assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs")
ui.getHandlers.foreach(attachHandler)
}

/** Detach a reconstructed UI from this server. Only valid after bind(). */
private def detachSparkUI(ui: SparkUI) {
assert(serverInfo.isDefined, "HistoryServer must be bound before detaching SparkUIs")
ui.getHandlers.foreach(detachHandler)
}

/** Return the address of this server. */
def getAddress: String = "http://" + publicHost + ":" + boundPort

Expand Down Expand Up @@ -262,9 +253,9 @@ object HistoryServer {

def main(argStrings: Array[String]) {
val args = new HistoryServerArguments(argStrings)
val server = new HistoryServer(args.logDir, conf)
val securityManager = new SecurityManager(conf)
val server = new HistoryServer(args.logDir, securityManager, conf)
server.bind()
server.start()

// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ private[spark] class Master(
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach( a => {
appIdToUI.remove(a.id).foreach { ui => webUi.detachUI(ui) }
appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
applicationMetricsSystem.removeSource(a.appSource)
})
completedApps.trimStart(toRemove)
Expand Down Expand Up @@ -667,12 +667,12 @@ private[spark] class Master(
if (!eventLogPaths.isEmpty) {
try {
val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
val ui = new SparkUI(replayBus, appName + " (completed)", "/history/" + app.id)
ui.start()
val ui = new SparkUI(
new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id)
replayBus.replay()
app.desc.appUiUrl = ui.basePath
appIdToUI(app.id) = ui
webUi.attachUI(ui)
webUi.attachSparkUI(ui)
return true
} catch {
case t: Throwable =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@ import org.json4s.JValue
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.ui.UIUtils
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class ApplicationPage(parent: MasterWebUI) {
val master = parent.masterActorRef
val timeout = parent.timeout
private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") {

private val master = parent.masterActorRef
private val timeout = parent.timeout

/** Executor details for a particular application */
def renderJson(request: HttpServletRequest): JValue = {
override def renderJson(request: HttpServletRequest): JValue = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
Expand Down Expand Up @@ -96,7 +97,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
}

def executorRow(executor: ExecutorInfo): Seq[Node] = {
private def executorRow(executor: ExecutorInfo): Seq[Node] = {
<tr>
<td>{executor.id}</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ import scala.xml.Node
import akka.pattern.ask
import org.json4s.JValue

import org.apache.spark.deploy.{JsonProtocol}
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.masterActorRef
val timeout = parent.timeout
private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
private val master = parent.masterActorRef
private val timeout = parent.timeout

def renderJson(request: HttpServletRequest): JValue = {
override def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
JsonProtocol.writeMasterState(state)
Expand Down Expand Up @@ -139,7 +139,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
}

def workerRow(worker: WorkerInfo): Seq[Node] = {
private def workerRow(worker: WorkerInfo): Seq[Node] = {
<tr>
<td>
<a href={worker.webUiAddress}>{worker.id}</a>
Expand All @@ -154,8 +154,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
</tr>
}


def appRow(app: ApplicationInfo): Seq[Node] = {
private def appRow(app: ApplicationInfo): Seq[Node] = {
<tr>
<td>
<a href={"app?appId=" + app.id}>{app.id}</a>
Expand All @@ -169,14 +168,14 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
{Utils.megabytesToString(app.desc.memoryPerSlave)}
</td>
<td>{WebUI.formatDate(app.submitDate)}</td>
<td>{UIUtils.formatDate(app.submitDate)}</td>
<td>{app.desc.user}</td>
<td>{app.state.toString}</td>
<td>{WebUI.formatDuration(app.duration)}</td>
<td>{UIUtils.formatDuration(app.duration)}</td>
</tr>
}

def driverRow(driver: DriverInfo): Seq[Node] = {
private def driverRow(driver: DriverInfo): Seq[Node] = {
<tr>
<td>{driver.id} </td>
<td>{driver.submitDate}</td>
Expand Down
Loading

0 comments on commit 6aa08c3

Please sign in to comment.