Goal:
This article explains how to use SparkMeasure to measure Spark job metrics.
Env:
Spark 2.4.4 with Scala 2.11.12
SparkMeasure 0.17
Concept:
SparkMeasure is a very handy tool for collecting aggregated stage-level or task-level metrics for Spark jobs or queries. Under the hood, it works by registering custom Spark listeners.
Note: Collecting at the task level has additional performance overhead compared to collecting at the stage level. Unless you want to study task skew effects, I would suggest collecting at the stage level.
To see where these metrics come from, look at the Spark source code under the "core/src/main/scala/org/apache/spark/executor" folder. The metric definitions and their explanations are in TaskMetrics.scala, ShuffleReadMetrics.scala, ShuffleWriteMetrics.scala, etc.
For example:
/**
* Time the executor spends actually running the task (including fetching shuffle data).
*/
def executorRunTime: Long = _executorRunTime.sum
/**
* CPU Time the executor spends actually running the task
* (including fetching shuffle data) in nanoseconds.
*/
def executorCpuTime: Long = _executorCpuTime.sum
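To make the listener idea concrete, below is a minimal sketch of my own (not sparkMeasure's actual implementation) of a custom Spark listener that prints a couple of these metrics when each stage completes. sparkMeasure's listeners work on the same principle, but aggregate the metrics into temporary views for reporting.
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Minimal illustration of a custom Spark listener (Spark 2.4 APIs assumed).
// sparkMeasure's own listeners are richer, but the principle is the same.
class SimpleStageListener extends SparkListener {
  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val info = event.stageInfo
    val m = info.taskMetrics
    // executorRunTime is reported in ms, executorCpuTime in ns (see TaskMetrics.scala)
    println(s"Stage ${info.stageId} '${info.name}': " +
      s"runTime=${m.executorRunTime} ms, cpuTime=${m.executorCpuTime / 1000000} ms")
  }
}

// Register it on an existing SparkContext, e.g. in spark-shell:
// spark.sparkContext.addSparkListener(new SimpleStageListener)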
Installation:
In this post, we will test with spark-shell and spark-submit, so we just need to follow this doc to download or build the jar file.
Note: Before downloading/building the jar, make sure it matches your Spark and Scala versions.
a. Download the Jar from Maven Central
For example, based on my Spark and Scala versions, I chose the version below:
wget https://repo1.maven.org/maven2/ch/cern/sparkmeasure/spark-measure_2.11/0.17/spark-measure_2.11-0.17.jar
b. Build the Jar using sbt from source code
git clone https://github.com/lucacanali/sparkmeasure
cd sparkmeasure
sbt +package
ls -l target/scala-2.11/spark-measure*.jar # location of the compiled jar
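c. Let Spark resolve the jar from Maven Central with --packages
Alternatively, if the driver host has internet access, you can skip the manual download and let Spark pull the dependency at launch time (the coordinates below match my Spark/Scala versions):
spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.17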
Solution:
In this post, we will use the sample data and queries from another post "Predicate Pushdown for Parquet".
1. Interactive Mode using spark-shell for a single job/query
spark-shell --jars spark-measure_2.11-0.17.jar --master yarn --deploy-mode client --executor-memory 1G --num-executors 4
Stage metrics:
val stageMetrics = new ch.cern.sparkmeasure.StageMetrics(spark)
val q1 = "SELECT * FROM readdf WHERE Index=20000"
stageMetrics.runAndMeasure(sql(q1).show)
Output:
21/02/04 15:08:55 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
21/02/04 15:08:55 WARN StageMetrics: Stage metrics data refreshed into temp view PerfStageMetrics
Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 2
numTasks => 4
elapsedTime => 3835 (4 s)
stageDuration => 3827 (4 s)
executorRunTime => 4757 (5 s)
executorCpuTime => 3672 (4 s)
executorDeserializeTime => 772 (0.8 s)
executorDeserializeCpuTime => 510 (0.5 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 239 (0.2 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 0 (0 ms)
resultSize => 5441 (5.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 6240991
bytesRead => 149260233 (142.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 0
shuffleTotalBlocksFetched => 0
shuffleLocalBlocksFetched => 0
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 0 (0 Bytes)
shuffleLocalBytesRead => 0 (0 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 0 (0 Bytes)
shuffleRecordsWritten => 0
Task Metrics:
val taskMetrics = new ch.cern.sparkmeasure.TaskMetrics(spark)
val q1 = "SELECT * FROM readdf WHERE Index=20000"
taskMetrics.runAndMeasure(spark.sql(q1).show)
Output:
21/02/04 16:52:59 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
21/02/04 16:52:59 WARN TaskMetrics: Stage metrics data refreshed into temp view PerfTaskMetrics
Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark task metrics:
numtasks => 4
elapsedTime => 3896 (4 s)
duration => 5268 (5 s)
schedulerDelayTime => 94 (94 ms)
executorRunTime => 4439 (4 s)
executorCpuTime => 3561 (4 s)
executorDeserializeTime => 734 (0.7 s)
executorDeserializeCpuTime => 460 (0.5 s)
resultSerializationTime => 1 (1 ms)
jvmGCTime => 237 (0.2 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 0 (0 ms)
gettingResultTime => 0 (0 ms)
resultSize => 2183 (2.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 6240991
bytesRead => 149260233 (142.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 0
shuffleTotalBlocksFetched => 0
shuffleLocalBlocksFetched => 0
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 0 (0 Bytes)
shuffleLocalBytesRead => 0 (0 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 0 (0 Bytes)
shuffleRecordsWritten => 0
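Note that runAndMeasure (for both StageMetrics and TaskMetrics) is not limited to SQL strings: it takes an arbitrary code block, so you can wrap any Spark action directly. A quick sketch (the parquet path below is just a placeholder for illustration):
// Measure an arbitrary DataFrame action instead of a SQL string
stageMetrics.runAndMeasure {
  spark.read.parquet("/tmp/sample_parquet").filter("Index = 20000").show()
}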
2. Interactive Mode using spark-shell for multiple jobs/queries
Take stage metrics as an example:
val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.begin()
//Run multiple jobs/queries
val q1 = "SELECT * FROM readdf WHERE Index=20000"
val q2 = "SELECT * FROM readdf where Index=9999999999"
spark.sql(q1).show()
spark.sql(q2).show()
stageMetrics.end()
stageMetrics.printReport()
Output:
21/02/04 17:00:59 WARN StageMetrics: Stage metrics data refreshed into temp view PerfStageMetrics
Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 4
numTasks => 8
elapsedTime => 3242 (3 s)
stageDuration => 1094 (1 s)
executorRunTime => 1779 (2 s)
executorCpuTime => 942 (0.9 s)
executorDeserializeTime => 96 (96 ms)
executorDeserializeCpuTime => 37 (37 ms)
resultSerializationTime => 1 (1 ms)
jvmGCTime => 42 (42 ms)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 0 (0 ms)
resultSize => 5441 (5.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 6240991
bytesRead => 149305675 (142.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 0
shuffleTotalBlocksFetched => 0
shuffleLocalBlocksFetched => 0
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 0 (0 Bytes)
shuffleLocalBytesRead => 0 (0 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 0 (0 Bytes)
shuffleRecordsWritten => 0
Furthermore, the command below can print additional accumulable metrics (including SQL metrics):
scala> stageMetrics.printAccumulables()
21/02/04 17:01:26 WARN StageMetrics: Accumulables metrics data refreshed into temp view AccumulablesStageMetrics
Aggregated Spark accumulables of type internal.metric. Sum of values grouped by metric name
Name => sum(value) [group by name]
executorCpuTime => 943 (0.9 s)
executorDeserializeCpuTime => 39 (39 ms)
executorDeserializeTime => 96 (96 ms)
executorRunTime => 1779 (2 s)
input.bytesRead => 149305675 (142.0 MB)
input.recordsRead => 6240991
jvmGCTime => 42 (42 ms)
resultSerializationTime => 1 (1 ms)
resultSize => 12780 (12.0 KB)
SQL Metrics and other non-internal metrics. Values grouped per accumulatorId and metric name.
Accid, Name => max(value) [group by accId, name]
146, duration total => 1422 (1 s)
147, number of output rows => 18
148, number of output rows => 6240991
151, scan time total => 1359 (1 s)
202, duration total => 200 (0.2 s)
207, scan time total => 198 (0.2 s)
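Besides printing, the collected stage metrics can also be exported for offline analysis. Per the sparkMeasure documentation, something like the snippet below creates a DataFrame from the collected metrics and saves it; the output path is just an example, and the exact API may differ slightly between versions:
// Export collected stage metrics (method names per the sparkMeasure docs)
val metricsDF = stageMetrics.createStageMetricsDF("PerfStageMetrics")
stageMetrics.saveData(metricsDF.orderBy("jobId", "stageId"), "/tmp/stagemetrics_export")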
3. Flight Recorder Mode
Please refer to this doc for Flight Recorder Mode.
This mode does not require any changes to your code/program; you only need to add the jar file when submitting the job.
Take stage metrics as an example:
spark-submit --conf spark.driver.extraClassPath=./spark-measure_2.11-0.17.jar \
--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderStageMetrics \
--conf spark.sparkmeasure.outputFormat=json \
--conf spark.sparkmeasure.outputFilename="/tmp/stageMetrics_flightRecorder" \
--conf spark.sparkmeasure.printToStdout=false \
--class "PredicatePushdownTest" \
--master yarn \
~/sbt/SparkScalaExample/target/scala-2.11/sparkscalaexample_2.11-1.0.jar
The output will include a line like:
WARN FlightRecorderStageMetrics: Writing Stage Metrics data serialized as json to /tmp/stageMetrics_flightRecorder
The JSON output file looks like this:
$ cat /tmp/stageMetrics_flightRecorder
[ {
"jobId" : 0,
"jobGroup" : null,
"stageId" : 0,
"name" : "load at PredicatePushdownTest.scala:16",
"submissionTime" : 1612488772250,
"completionTime" : 1612488773352,
"stageDuration" : 1102,
"numTasks" : 1,
"executorRunTime" : 352,
"executorCpuTime" : 141,
"executorDeserializeTime" : 589,
"executorDeserializeCpuTime" : 397,
"resultSerializationTime" : 3,
"jvmGCTime" : 95,
"resultSize" : 1969,
"diskBytesSpilled" : 0,
"memoryBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"recordsRead" : 0,
"bytesRead" : 0,
"recordsWritten" : 0,
"bytesWritten" : 0,
"shuffleFetchWaitTime" : 0,
"shuffleTotalBytesRead" : 0,
"shuffleTotalBlocksFetched" : 0,
"shuffleLocalBlocksFetched" : 0,
"shuffleRemoteBlocksFetched" : 0,
"shuffleLocalBytesRead" : 0,
"shuffleRemoteBytesRead" : 0,
"shuffleRemoteBytesReadToDisk" : 0,
"shuffleRecordsRead" : 0,
"shuffleWriteTime" : 0,
"shuffleBytesWritten" : 0,
"shuffleRecordsWritten" : 0
}, {
"jobId" : 1,
"jobGroup" : null,
"stageId" : 1,
"name" : "collect at PredicatePushdownTest.scala:25",
"submissionTime" : 1612488774600,
"completionTime" : 1612488776522,
"stageDuration" : 1922,
"numTasks" : 4,
"executorRunTime" : 4962,
"executorCpuTime" : 4446,
"executorDeserializeTime" : 1679,
"executorDeserializeCpuTime" : 1215,
"resultSerializationTime" : 2,
"jvmGCTime" : 309,
"resultSize" : 7545,
"diskBytesSpilled" : 0,
"memoryBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"recordsRead" : 6240991,
"bytesRead" : 149260233,
"recordsWritten" : 0,
"bytesWritten" : 0,
"shuffleFetchWaitTime" : 0,
"shuffleTotalBytesRead" : 0,
"shuffleTotalBlocksFetched" : 0,
"shuffleLocalBlocksFetched" : 0,
"shuffleRemoteBlocksFetched" : 0,
"shuffleLocalBytesRead" : 0,
"shuffleRemoteBytesRead" : 0,
"shuffleRemoteBytesReadToDisk" : 0,
"shuffleRecordsRead" : 0,
"shuffleWriteTime" : 0,
"shuffleBytesWritten" : 0,
"shuffleRecordsWritten" : 0
}, {
"jobId" : 2,
"jobGroup" : null,
"stageId" : 2,
"name" : "collect at PredicatePushdownTest.scala:30",
"submissionTime" : 1612488776656,
"completionTime" : 1612488776833,
"stageDuration" : 177,
"numTasks" : 4,
"executorRunTime" : 427,
"executorCpuTime" : 261,
"executorDeserializeTime" : 89,
"executorDeserializeCpuTime" : 27,
"resultSerializationTime" : 0,
"jvmGCTime" : 0,
"resultSize" : 5884,
"diskBytesSpilled" : 0,
"memoryBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"recordsRead" : 0,
"bytesRead" : 45442,
"recordsWritten" : 0,
"bytesWritten" : 0,
"shuffleFetchWaitTime" : 0,
"shuffleTotalBytesRead" : 0,
"shuffleTotalBlocksFetched" : 0,
"shuffleLocalBlocksFetched" : 0,
"shuffleRemoteBlocksFetched" : 0,
"shuffleLocalBytesRead" : 0,
"shuffleRemoteBytesRead" : 0,
"shuffleRemoteBytesReadToDisk" : 0,
"shuffleRecordsRead" : 0,
"shuffleWriteTime" : 0,
"shuffleBytesWritten" : 0,
"shuffleRecordsWritten" : 0
} ]
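Since the flight recorder output is a plain JSON array, it can be loaded back into Spark for analysis. For example (this is just one way to read it; the multiLine option is needed because each record spans multiple lines):
// Read the flight recorder output back into a DataFrame for analysis
val fr = spark.read.option("multiLine", "true").json("/tmp/stageMetrics_flightRecorder")
fr.select("stageId", "name", "stageDuration", "executorRunTime", "recordsRead").show(false)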
References:
On Measuring Apache Spark Workload Metrics for Performance Troubleshooting
Example analysis of Spark metrics collected with sparkMeasure