Goal:
This article explains how to use SparkMeasure to measure Spark job metrics.
Env:
Spark 2.4.4 with Scala 2.11.12
SparkMeasure 0.17
Concept:
SparkMeasure is a handy tool for collecting aggregated stage-level or task-level metrics for Spark jobs and queries. Under the hood, it works by registering custom Spark listeners.
Note: Collecting at task level incurs extra performance overhead compared to collecting at stage level. Unless you need to study skew effects across tasks, I suggest collecting at stage level.
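To make the listener idea concrete, here is a minimal hand-rolled sketch of a stage-level listener. This is only an illustration of the mechanism, not sparkMeasure's actual implementation, and the class name SimpleStageListener is made up for this example:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Illustration only (not sparkMeasure's code): print two metrics
// every time a stage completes.
class SimpleStageListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    val m = info.taskMetrics
    // executorRunTime is reported in milliseconds, executorCpuTime in nanoseconds
    println(s"Stage ${info.stageId} (${info.name}): " +
      s"executorRunTime=${m.executorRunTime} ms, " +
      s"executorCpuTime=${m.executorCpuTime / 1000000} ms")
  }
}

// Register it on a running SparkContext, e.g. inside spark-shell:
// spark.sparkContext.addSparkListener(new SimpleStageListener)

sparkMeasure does essentially this, but accumulates all the task and stage metrics and aggregates them into the reports shown below.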
To see where these metrics come from, look into the Spark source code under the "core/src/main/scala/org/apache/spark/executor" folder.
You will find the metrics explained inside TaskMetrics.scala, ShuffleReadMetrics.scala, ShuffleWriteMetrics.scala, etc.
For example:
/**
 * Time the executor spends actually running the task (including fetching shuffle data).
 */
def executorRunTime: Long = _executorRunTime.sum

/**
 * CPU Time the executor spends actually running the task
 * (including fetching shuffle data) in nanoseconds.
 */
def executorCpuTime: Long = _executorCpuTime.sum
Installation:
In this post, we will use spark-shell or spark-submit for testing, so we just need to follow this doc to build or download the jar file.
Note: Before downloading/building the jar, make sure it matches your Spark and Scala versions.
a. Download the Jar from Maven Central
For example, based on my Spark and Scala versions, I will choose the version below:
wget https://repo1.maven.org/maven2/ch/cern/sparkmeasure/spark-measure_2.11/0.17/spark-measure_2.11-0.17.jar
b. Build the Jar using sbt from source code
git clone https://github.com/lucacanali/sparkmeasure
cd sparkmeasure
sbt +package
ls -l target/scala-2.11/spark-measure*.jar  # location of the compiled jar
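Alternatively, instead of downloading or building the jar yourself, you can let Spark resolve the dependency from Maven Central at launch time with the --packages option (using the same Maven coordinates as above):

spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.17

This requires network access from the driver, so the manual jar download is the safer option on locked-down clusters.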
Solution:
In this post, we will use the sample data and queries from another post "Predicate Pushdown for Parquet".
1. Interactive Mode using Spark-shell for single job/query
spark-shell --jars spark-measure_2.11-0.17.jar --master yarn --deploy-mode client --executor-memory 1G --num-executors 4
Stage metrics:
val stageMetrics = new ch.cern.sparkmeasure.StageMetrics(spark)
val q1 = "SELECT * FROM readdf WHERE Index=20000"
stageMetrics.runAndMeasure(spark.sql(q1).show)
Output:
21/02/04 15:08:55 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
21/02/04 15:08:55 WARN StageMetrics: Stage metrics data refreshed into temp view PerfStageMetrics

Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 2
numTasks => 4
elapsedTime => 3835 (4 s)
stageDuration => 3827 (4 s)
executorRunTime => 4757 (5 s)
executorCpuTime => 3672 (4 s)
executorDeserializeTime => 772 (0.8 s)
executorDeserializeCpuTime => 510 (0.5 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 239 (0.2 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 0 (0 ms)
resultSize => 5441 (5.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 6240991
bytesRead => 149260233 (142.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 0
shuffleTotalBlocksFetched => 0
shuffleLocalBlocksFetched => 0
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 0 (0 Bytes)
shuffleLocalBytesRead => 0 (0 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 0 (0 Bytes)
shuffleRecordsWritten => 0
Task Metrics:
val taskMetrics = new ch.cern.sparkmeasure.TaskMetrics(spark)
val q1 = "SELECT * FROM readdf WHERE Index=20000"
taskMetrics.runAndMeasure(spark.sql(q1).show)
Output:
21/02/04 16:52:59 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
21/02/04 16:52:59 WARN TaskMetrics: Stage metrics data refreshed into temp view PerfTaskMetrics

Scheduling mode = FIFO
Spark Contex default degree of parallelism = 4
Aggregated Spark task metrics:
numtasks => 4
elapsedTime => 3896 (4 s)
duration => 5268 (5 s)
schedulerDelayTime => 94 (94 ms)
executorRunTime => 4439 (4 s)
executorCpuTime => 3561 (4 s)
executorDeserializeTime => 734 (0.7 s)
executorDeserializeCpuTime => 460 (0.5 s)
resultSerializationTime => 1 (1 ms)
jvmGCTime => 237 (0.2 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 0 (0 ms)
gettingResultTime => 0 (0 ms)
resultSize => 2183 (2.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 6240991
bytesRead => 149260233 (142.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 0
shuffleTotalBlocksFetched => 0
shuffleLocalBlocksFetched => 0
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 0 (0 Bytes)
shuffleLocalBytesRead => 0 (0 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 0 (0 Bytes)
shuffleRecordsWritten => 0
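Compared to the stage-level report, the task-level report adds a few task-only metrics such as schedulerDelayTime and gettingResultTime, but its real value is inspecting per-task distributions for skew. A sketch, assuming the createTaskMetricsDF API documented in the sparkMeasure README (verify it against the version you use):

import org.apache.spark.sql.functions.desc

// Expose the collected per-task metrics as a DataFrame
// (createTaskMetricsDF is documented in the sparkMeasure README)
val tmDF = taskMetrics.createTaskMetricsDF("PerfTaskMetrics")
// Sorting by task duration surfaces stragglers or data skew
tmDF.orderBy(desc("duration")).show(10)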
2. Interactive Mode using Spark-shell for multiple jobs/queries
Take Stage Metrics for example:
val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.begin()

// Run multiple jobs/queries
val q1 = "SELECT * FROM readdf WHERE Index=20000"
val q2 = "SELECT * FROM readdf WHERE Index=9999999999"
spark.sql(q1).show()
spark.sql(q2).show()

stageMetrics.end()
stageMetrics.printReport()
Output:
21/02/04 17:00:59 WARN StageMetrics: Stage metrics data refreshed into temp view PerfStageMetrics

Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 4
numTasks => 8
elapsedTime => 3242 (3 s)
stageDuration => 1094 (1 s)
executorRunTime => 1779 (2 s)
executorCpuTime => 942 (0.9 s)
executorDeserializeTime => 96 (96 ms)
executorDeserializeCpuTime => 37 (37 ms)
resultSerializationTime => 1 (1 ms)
jvmGCTime => 42 (42 ms)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 0 (0 ms)
resultSize => 5441 (5.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 6240991
bytesRead => 149305675 (142.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 0
shuffleTotalBlocksFetched => 0
shuffleLocalBlocksFetched => 0
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 0 (0 Bytes)
shuffleLocalBytesRead => 0 (0 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 0 (0 Bytes)
shuffleRecordsWritten => 0
Furthermore, the command below can print additional accumulables metrics (including SQL metrics):
scala> stageMetrics.printAccumulables()
21/02/04 17:01:26 WARN StageMetrics: Accumulables metrics data refreshed into temp view AccumulablesStageMetrics

Aggregated Spark accumulables of type internal.metric. Sum of values grouped by metric name
Name => sum(value) [group by name]

executorCpuTime => 943 (0.9 s)
executorDeserializeCpuTime => 39 (39 ms)
executorDeserializeTime => 96 (96 ms)
executorRunTime => 1779 (2 s)
input.bytesRead => 149305675 (142.0 MB)
input.recordsRead => 6240991
jvmGCTime => 42 (42 ms)
resultSerializationTime => 1 (1 ms)
resultSize => 12780 (12.0 KB)

SQL Metrics and other non-internal metrics. Values grouped per accumulatorId and metric name.
Accid, Name => max(value) [group by accId, name]

146, duration total => 1422 (1 s)
147, number of output rows => 18
148, number of output rows => 6240991
151, scan time total => 1359 (1 s)
202, duration total => 200 (0.2 s)
207, scan time total => 198 (0.2 s)
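Beyond printed reports, the collected stage metrics can also be exported for offline analysis. A sketch based on the createStageMetricsDF and saveData methods documented in the sparkMeasure README (assumed available in 0.17; the output path is just an example):

// Expose the collected stage metrics as a DataFrame
val metricsDF = stageMetrics.createStageMetricsDF("PerfStageMetrics")
// Persist them (json by default) for later analysis
stageMetrics.saveData(metricsDF.orderBy("jobId", "stageId"), "/tmp/stagemetrics_test")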
3. Flight Recorder Mode
Please refer to this doc for Flight Recorder Mode.
This mode does not touch your code or program at all; you only need to add the jar file (plus a few configuration properties) when submitting the job.
Take Stage Metrics for example:
spark-submit --conf spark.driver.extraClassPath=./spark-measure_2.11-0.17.jar \
--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderStageMetrics \
--conf spark.sparkmeasure.outputFormat=json \
--conf spark.sparkmeasure.outputFilename="/tmp/stageMetrics_flightRecorder" \
--conf spark.sparkmeasure.printToStdout=false \
--class "PredicatePushdownTest" \
--master yarn \
~/sbt/SparkScalaExample/target/scala-2.11/sparkscalaexample_2.11-1.0.jar
The output will include:
WARN FlightRecorderStageMetrics: Writing Stage Metrics data serialized as json to /tmp/stageMetrics_flightRecorder
The JSON output file looks like this:
$ cat /tmp/stageMetrics_flightRecorder
[ {
  "jobId" : 0,
  "jobGroup" : null,
  "stageId" : 0,
  "name" : "load at PredicatePushdownTest.scala:16",
  "submissionTime" : 1612488772250,
  "completionTime" : 1612488773352,
  "stageDuration" : 1102,
  "numTasks" : 1,
  "executorRunTime" : 352,
  "executorCpuTime" : 141,
  "executorDeserializeTime" : 589,
  "executorDeserializeCpuTime" : 397,
  "resultSerializationTime" : 3,
  "jvmGCTime" : 95,
  "resultSize" : 1969,
  "diskBytesSpilled" : 0,
  "memoryBytesSpilled" : 0,
  "peakExecutionMemory" : 0,
  "recordsRead" : 0,
  "bytesRead" : 0,
  "recordsWritten" : 0,
  "bytesWritten" : 0,
  "shuffleFetchWaitTime" : 0,
  "shuffleTotalBytesRead" : 0,
  "shuffleTotalBlocksFetched" : 0,
  "shuffleLocalBlocksFetched" : 0,
  "shuffleRemoteBlocksFetched" : 0,
  "shuffleLocalBytesRead" : 0,
  "shuffleRemoteBytesRead" : 0,
  "shuffleRemoteBytesReadToDisk" : 0,
  "shuffleRecordsRead" : 0,
  "shuffleWriteTime" : 0,
  "shuffleBytesWritten" : 0,
  "shuffleRecordsWritten" : 0
}, {
  "jobId" : 1,
  "jobGroup" : null,
  "stageId" : 1,
  "name" : "collect at PredicatePushdownTest.scala:25",
  "submissionTime" : 1612488774600,
  "completionTime" : 1612488776522,
  "stageDuration" : 1922,
  "numTasks" : 4,
  "executorRunTime" : 4962,
  "executorCpuTime" : 4446,
  "executorDeserializeTime" : 1679,
  "executorDeserializeCpuTime" : 1215,
  "resultSerializationTime" : 2,
  "jvmGCTime" : 309,
  "resultSize" : 7545,
  "diskBytesSpilled" : 0,
  "memoryBytesSpilled" : 0,
  "peakExecutionMemory" : 0,
  "recordsRead" : 6240991,
  "bytesRead" : 149260233,
  "recordsWritten" : 0,
  "bytesWritten" : 0,
  "shuffleFetchWaitTime" : 0,
  "shuffleTotalBytesRead" : 0,
  "shuffleTotalBlocksFetched" : 0,
  "shuffleLocalBlocksFetched" : 0,
  "shuffleRemoteBlocksFetched" : 0,
  "shuffleLocalBytesRead" : 0,
  "shuffleRemoteBytesRead" : 0,
  "shuffleRemoteBytesReadToDisk" : 0,
  "shuffleRecordsRead" : 0,
  "shuffleWriteTime" : 0,
  "shuffleBytesWritten" : 0,
  "shuffleRecordsWritten" : 0
}, {
  "jobId" : 2,
  "jobGroup" : null,
  "stageId" : 2,
  "name" : "collect at PredicatePushdownTest.scala:30",
  "submissionTime" : 1612488776656,
  "completionTime" : 1612488776833,
  "stageDuration" : 177,
  "numTasks" : 4,
  "executorRunTime" : 427,
  "executorCpuTime" : 261,
  "executorDeserializeTime" : 89,
  "executorDeserializeCpuTime" : 27,
  "resultSerializationTime" : 0,
  "jvmGCTime" : 0,
  "resultSize" : 5884,
  "diskBytesSpilled" : 0,
  "memoryBytesSpilled" : 0,
  "peakExecutionMemory" : 0,
  "recordsRead" : 0,
  "bytesRead" : 45442,
  "recordsWritten" : 0,
  "bytesWritten" : 0,
  "shuffleFetchWaitTime" : 0,
  "shuffleTotalBytesRead" : 0,
  "shuffleTotalBlocksFetched" : 0,
  "shuffleLocalBlocksFetched" : 0,
  "shuffleRemoteBlocksFetched" : 0,
  "shuffleLocalBytesRead" : 0,
  "shuffleRemoteBytesRead" : 0,
  "shuffleRemoteBytesReadToDisk" : 0,
  "shuffleRecordsRead" : 0,
  "shuffleWriteTime" : 0,
  "shuffleBytesWritten" : 0,
  "shuffleRecordsWritten" : 0
} ]
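The serialized metrics can later be read back for post-processing. The sparkMeasure README documents an IOUtils helper for this (assumed available in 0.17; verify against your version):

// Read the flight-recorder json output back into Scala objects
val metrics = ch.cern.sparkmeasure.IOUtils.readSerializedStageMetricsJSON("/tmp/stageMetrics_flightRecorder")
metrics.foreach(m => println(s"stageId=${m.stageId}, stageDuration=${m.stageDuration}"))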
References:
On Measuring Apache Spark Workload Metrics for Performance Troubleshooting
Example analysis of Spark metrics collected with sparkMeasure