Thursday, February 4, 2021

Spark Tuning -- How to use SparkMeasure to measure Spark job metrics

Goal:

This article explains how to use SparkMeasure to measure Spark job metrics.

Env:

Spark 2.4.4 with Scala 2.11.12 

SparkMeasure 0.17

Concept:

SparkMeasure is a handy tool for collecting aggregated stage-level or task-level metrics for Spark jobs and queries. Under the hood it registers custom Spark listeners that capture the metrics as stages and tasks complete.

Note: Collecting at the task level adds more performance overhead than collecting at the stage level. Unless you need to study task skew, I suggest collecting at the stage level.
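To illustrate the listener idea (this is not SparkMeasure's actual implementation, just a minimal sketch with a hypothetical class name), a custom stage-level listener could look like this:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Minimal sketch of a stage-level metrics listener (for illustration only).
// SparkMeasure's real listeners collect and aggregate many more fields.
class SimpleStageListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    val m = info.taskMetrics
    println(s"Stage ${info.stageId} '${info.name}': " +
      s"executorRunTime=${m.executorRunTime} ms, " +
      s"executorCpuTime=${m.executorCpuTime} ns, " +
      s"recordsRead=${m.inputMetrics.recordsRead}")
  }
}

// Register it on an existing SparkContext, e.g. inside spark-shell:
// spark.sparkContext.addSparkListener(new SimpleStageListener)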

To see where these metrics come from, look at the Spark source code under the "core/src/main/scala/org/apache/spark/executor" folder.

The metrics are documented inside TaskMetrics.scala, ShuffleReadMetrics.scala, ShuffleWriteMetrics.scala, etc.

For example:

/**
 * Time the executor spends actually running the task (including fetching shuffle data).
 */
def executorRunTime: Long = _executorRunTime.sum
 
/**
 * CPU Time the executor spends actually running the task
 * (including fetching shuffle data) in nanoseconds.
 */
def executorCpuTime: Long = _executorCpuTime.sum
Alternatively, you can find the explanation of these metrics in the documentation.

Installation:

In this post we will test with spark-shell or spark-submit, so we only need to follow this doc to build or download the jar file.

Note: Before downloading or building the jar, make sure it matches your Spark and Scala versions.
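If you are not sure which versions you are running, you can check quickly from an existing spark-shell session (the expected values here correspond to the Env section above):

// Inside spark-shell: confirm Spark and Scala versions before picking the jar
spark.version                         // e.g. "2.4.4"
scala.util.Properties.versionString   // e.g. "version 2.11.12"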

a. Download the Jar from Maven Central

For example, based on my Spark and Scala versions, I choose the following build:

wget https://repo1.maven.org/maven2/ch/cern/sparkmeasure/spark-measure_2.11/0.17/spark-measure_2.11-0.17.jar

b. Build the Jar using sbt from source code

git clone https://github.com/lucacanali/sparkmeasure
cd sparkmeasure
sbt +package
ls -l target/scala-2.11/spark-measure*.jar  # location of the compiled jar
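
Alternatively, if the host submitting the job can reach Maven Central, spark-shell can pull the jar automatically via --packages (the coordinates below are derived from the Maven URL above), so you can skip the manual download or build:

spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.17 --master yarn --deploy-mode client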

Solution:

In this post, we will reuse the sample data and queries from another post, "Predicate Pushdown for Parquet".
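For completeness, here is a rough sketch of how the "readdf" temporary view used in the queries below could be registered; the Parquet path is hypothetical, so point it to wherever your copy of the test data lives:

// Hypothetical path -- replace with the location of the Parquet data
// from the "Predicate Pushdown for Parquet" post.
val readdf = spark.read.format("parquet").load("/tmp/testparquet")
readdf.createOrReplaceTempView("readdf")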

1.  Interactive Mode using Spark-shell for single job/query

Please refer to this doc for Interactive Mode for Spark-shell.
spark-shell --jars spark-measure_2.11-0.17.jar --master yarn --deploy-mode client --executor-memory 1G --num-executors 4

Stage metrics:

val stageMetrics = new ch.cern.sparkmeasure.StageMetrics(spark)
val q1  = "SELECT * FROM readdf WHERE Index=20000"
stageMetrics.runAndMeasure(sql(q1).show)

Output:

21/02/04 15:08:55 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
21/02/04 15:08:55 WARN StageMetrics: Stage metrics data refreshed into temp view PerfStageMetrics
Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 2
numTasks => 4
elapsedTime => 3835 (4 s)
stageDuration => 3827 (4 s)
executorRunTime => 4757 (5 s)
executorCpuTime => 3672 (4 s)
executorDeserializeTime => 772 (0.8 s)
executorDeserializeCpuTime => 510 (0.5 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 239 (0.2 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 0 (0 ms)
resultSize => 5441 (5.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 6240991
bytesRead => 149260233 (142.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 0
shuffleTotalBlocksFetched => 0
shuffleLocalBlocksFetched => 0
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 0 (0 Bytes)
shuffleLocalBytesRead => 0 (0 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 0 (0 Bytes)
shuffleRecordsWritten => 0
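
One quick sanity check you can do from this report: executorCpuTime / executorRunTime = 3672 / 4757 ≈ 77%, so roughly three quarters of the executor run time was spent on CPU, while jvmGCTime (239 ms) accounts for about 5% of it; the rest is time not spent on CPU (e.g. I/O or other waits).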

Task Metrics:

val taskMetrics = new ch.cern.sparkmeasure.TaskMetrics(spark)
val q1  = "SELECT * FROM readdf WHERE Index=20000"
taskMetrics.runAndMeasure(spark.sql(q1).show)

Output:

21/02/04 16:52:59 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
21/02/04 16:52:59 WARN TaskMetrics: Stage metrics data refreshed into temp view PerfTaskMetrics
 
Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark task metrics:
numtasks => 4
elapsedTime => 3896 (4 s)
duration => 5268 (5 s)
schedulerDelayTime => 94 (94 ms)
executorRunTime => 4439 (4 s)
executorCpuTime => 3561 (4 s)
executorDeserializeTime => 734 (0.7 s)
executorDeserializeCpuTime => 460 (0.5 s)
resultSerializationTime => 1 (1 ms)
jvmGCTime => 237 (0.2 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 0 (0 ms)
gettingResultTime => 0 (0 ms)
resultSize => 2183 (2.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 6240991
bytesRead => 149260233 (142.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 0
shuffleTotalBlocksFetched => 0
shuffleLocalBlocksFetched => 0
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 0 (0 Bytes)
shuffleLocalBytesRead => 0 (0 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 0 (0 Bytes)
shuffleRecordsWritten => 0
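
Note how the task-level numbers roughly tie together: executorRunTime + executorDeserializeTime + resultSerializationTime + schedulerDelayTime + gettingResultTime = 4439 + 734 + 1 + 94 + 0 = 5268 ms, which matches the reported duration.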

2.  Interactive Mode using Spark-shell for multiple jobs/queries

Take Stage Metrics for example:

val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.begin()
 
//Run multiple jobs/queries
val q1  = "SELECT * FROM readdf WHERE Index=20000"
val q2  = "SELECT * FROM readdf where Index=9999999999"
spark.sql(q1).show()
spark.sql(q2).show()
 
stageMetrics.end()
stageMetrics.printReport()

Output:

21/02/04 17:00:59 WARN StageMetrics: Stage metrics data refreshed into temp view PerfStageMetrics
 
Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 4
numTasks => 8
elapsedTime => 3242 (3 s)
stageDuration => 1094 (1 s)
executorRunTime => 1779 (2 s)
executorCpuTime => 942 (0.9 s)
executorDeserializeTime => 96 (96 ms)
executorDeserializeCpuTime => 37 (37 ms)
resultSerializationTime => 1 (1 ms)
jvmGCTime => 42 (42 ms)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 0 (0 ms)
resultSize => 5441 (5.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 6240991
bytesRead => 149305675 (142.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 0
shuffleTotalBlocksFetched => 0
shuffleLocalBlocksFetched => 0
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 0 (0 Bytes)
shuffleLocalBytesRead => 0 (0 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 0 (0 Bytes)
shuffleRecordsWritten => 0

Furthermore, the command below prints additional accumulable metrics (including SQL metrics):

scala> stageMetrics.printAccumulables()
21/02/04 17:01:26 WARN StageMetrics: Accumulables metrics data refreshed into temp view AccumulablesStageMetrics
 
Aggregated Spark accumulables of type internal.metric. Sum of values grouped by metric name
Name => sum(value) [group by name]
 
executorCpuTime => 943 (0.9 s)
executorDeserializeCpuTime => 39 (39 ms)
executorDeserializeTime => 96 (96 ms)
executorRunTime => 1779 (2 s)
input.bytesRead => 149305675 (142.0 MB)
input.recordsRead => 6240991
jvmGCTime => 42 (42 ms)
resultSerializationTime => 1 (1 ms)
resultSize => 12780 (12.0 KB)
 
SQL Metrics and other non-internal metrics. Values grouped per accumulatorId and metric name.
Accid, Name => max(value) [group by accId, name]
 
  146, duration total => 1422 (1 s)
  147, number of output rows => 18
  148, number of output rows => 6240991
  151, scan time total => 1359 (1 s)
  202, duration total => 200 (0.2 s)
  207, scan time total => 198 (0.2 s)
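
As the WARN messages above indicate, the collected data is also refreshed into a temporary view (PerfStageMetrics here), so you can inspect or persist the raw per-stage rows yourself with plain Spark SQL. A small sketch (the output path is hypothetical):

// Inspect the per-stage metrics behind the aggregated report
val perfDF = spark.table("PerfStageMetrics")
perfDF.printSchema()
perfDF.show(truncate = false)

// Optionally save them for offline analysis (hypothetical path)
perfDF.write.mode("overwrite").json("/tmp/perf_stage_metrics")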

3.  Flight Recorder Mode

Please refer to this doc for Flight Recorder Mode.

This mode does not require any changes to your code; you only need to add the jar and a few configurations when submitting the job.

Take Stage Metrics for example:

spark-submit --conf spark.driver.extraClassPath=./spark-measure_2.11-0.17.jar  \
             --conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderStageMetrics \
             --conf spark.sparkmeasure.outputFormat=json    \
             --conf spark.sparkmeasure.outputFilename="/tmp/stageMetrics_flightRecorder"  \
             --conf spark.sparkmeasure.printToStdout=false   \
             --class "PredicatePushdownTest" \
             --master yarn  \
             ~/sbt/SparkScalaExample/target/scala-2.11/sparkscalaexample_2.11-1.0.jar
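
If you want task-level metrics in flight recorder mode instead, SparkMeasure also provides a FlightRecorderTaskMetrics listener; only the listener class and (optionally) the output file name change, for example:

--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderTaskMetrics \
--conf spark.sparkmeasure.outputFilename="/tmp/taskMetrics_flightRecorder"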

In the output of the stage-metrics run above, you will see a line like:

WARN FlightRecorderStageMetrics: Writing Stage Metrics data serialized as json to /tmp/stageMetrics_flightRecorder

The JSON output file looks like this:

$ cat /tmp/stageMetrics_flightRecorder
[ {
  "jobId" : 0,
  "jobGroup" : null,
  "stageId" : 0,
  "name" : "load at PredicatePushdownTest.scala:16",
  "submissionTime" : 1612488772250,
  "completionTime" : 1612488773352,
  "stageDuration" : 1102,
  "numTasks" : 1,
  "executorRunTime" : 352,
  "executorCpuTime" : 141,
  "executorDeserializeTime" : 589,
  "executorDeserializeCpuTime" : 397,
  "resultSerializationTime" : 3,
  "jvmGCTime" : 95,
  "resultSize" : 1969,
  "diskBytesSpilled" : 0,
  "memoryBytesSpilled" : 0,
  "peakExecutionMemory" : 0,
  "recordsRead" : 0,
  "bytesRead" : 0,
  "recordsWritten" : 0,
  "bytesWritten" : 0,
  "shuffleFetchWaitTime" : 0,
  "shuffleTotalBytesRead" : 0,
  "shuffleTotalBlocksFetched" : 0,
  "shuffleLocalBlocksFetched" : 0,
  "shuffleRemoteBlocksFetched" : 0,
  "shuffleLocalBytesRead" : 0,
  "shuffleRemoteBytesRead" : 0,
  "shuffleRemoteBytesReadToDisk" : 0,
  "shuffleRecordsRead" : 0,
  "shuffleWriteTime" : 0,
  "shuffleBytesWritten" : 0,
  "shuffleRecordsWritten" : 0
}, {
  "jobId" : 1,
  "jobGroup" : null,
  "stageId" : 1,
  "name" : "collect at PredicatePushdownTest.scala:25",
  "submissionTime" : 1612488774600,
  "completionTime" : 1612488776522,
  "stageDuration" : 1922,
  "numTasks" : 4,
  "executorRunTime" : 4962,
  "executorCpuTime" : 4446,
  "executorDeserializeTime" : 1679,
  "executorDeserializeCpuTime" : 1215,
  "resultSerializationTime" : 2,
  "jvmGCTime" : 309,
  "resultSize" : 7545,
  "diskBytesSpilled" : 0,
  "memoryBytesSpilled" : 0,
  "peakExecutionMemory" : 0,
  "recordsRead" : 6240991,
  "bytesRead" : 149260233,
  "recordsWritten" : 0,
  "bytesWritten" : 0,
  "shuffleFetchWaitTime" : 0,
  "shuffleTotalBytesRead" : 0,
  "shuffleTotalBlocksFetched" : 0,
  "shuffleLocalBlocksFetched" : 0,
  "shuffleRemoteBlocksFetched" : 0,
  "shuffleLocalBytesRead" : 0,
  "shuffleRemoteBytesRead" : 0,
  "shuffleRemoteBytesReadToDisk" : 0,
  "shuffleRecordsRead" : 0,
  "shuffleWriteTime" : 0,
  "shuffleBytesWritten" : 0,
  "shuffleRecordsWritten" : 0
}, {
  "jobId" : 2,
  "jobGroup" : null,
  "stageId" : 2,
  "name" : "collect at PredicatePushdownTest.scala:30",
  "submissionTime" : 1612488776656,
  "completionTime" : 1612488776833,
  "stageDuration" : 177,
  "numTasks" : 4,
  "executorRunTime" : 427,
  "executorCpuTime" : 261,
  "executorDeserializeTime" : 89,
  "executorDeserializeCpuTime" : 27,
  "resultSerializationTime" : 0,
  "jvmGCTime" : 0,
  "resultSize" : 5884,
  "diskBytesSpilled" : 0,
  "memoryBytesSpilled" : 0,
  "peakExecutionMemory" : 0,
  "recordsRead" : 0,
  "bytesRead" : 45442,
  "recordsWritten" : 0,
  "bytesWritten" : 0,
  "shuffleFetchWaitTime" : 0,
  "shuffleTotalBytesRead" : 0,
  "shuffleTotalBlocksFetched" : 0,
  "shuffleLocalBlocksFetched" : 0,
  "shuffleRemoteBlocksFetched" : 0,
  "shuffleLocalBytesRead" : 0,
  "shuffleRemoteBytesRead" : 0,
  "shuffleRemoteBytesReadToDisk" : 0,
  "shuffleRecordsRead" : 0,
  "shuffleWriteTime" : 0,
  "shuffleBytesWritten" : 0,
  "shuffleRecordsWritten" : 0
} ]
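
Since the recorded file is a single JSON array that spans multiple lines, it can be loaded back into Spark for further analysis with the multiLine option, for example:

val flightDF = spark.read.option("multiLine", "true").json("/tmp/stageMetrics_flightRecorder")
flightDF.select("jobId", "stageId", "name", "stageDuration", "executorRunTime", "recordsRead")
        .show(truncate = false)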

References:

On Measuring Apache Spark Workload Metrics for Performance Troubleshooting

Example analysis of Spark metrics collected with sparkMeasure
