Goal:
This article explains how to control the parallelism of a Spark job, using a Spark on YARN job as a demonstration.
Env:
Spark 2.1.0
MapR 5.2
Solution:
A Spark job's overall parallelism is decided by the number of executors (YARN containers, for a Spark on YARN job).
As described in the previous article -- Resource allocation configurations for Spark on YARN -- this is controlled by the option spark.executor.instances (--num-executors).
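For example, a fixed number of executors can be requested at submit time. Below is a minimal sketch; the application class and jar names are placeholders, and it assumes dynamic allocation is disabled:
# Sketch: request 2 executors explicitly (class/jar names are placeholders)
/opt/mapr/spark/spark-2.1.0/bin/spark-submit --master yarn --deploy-mode client \
  --num-executors 2 \
  --class com.example.MyApp myapp.jar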
Each executor's parallelism is decided by the number of tasks that can run concurrently inside it.
Suppose N tasks are assigned to an executor; only X of them can run at the same time. Here X is the parallelism of the task threads inside that executor.
The total number of CPU cores for each executor (YARN container, for Spark on YARN) is determined by spark.executor.cores (--executor-cores).
The number of CPU cores needed by each task is determined by spark.task.cpus (default=1).
So:
X = spark.executor.cores / spark.task.cpus
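For example, if spark.executor.cores=4 and spark.task.cpus=2, then X = 4 / 2 = 2, so at most 2 tasks run at the same time inside each executor. Both options can be passed at submit time; here is a sketch reusing the SparkPi example below, with illustrative numbers only:
# Sketch: 4 cores per executor, 2 cores per task => X = 2 concurrent tasks per executor
/opt/mapr/spark/spark-2.1.0/bin/run-example --master yarn --deploy-mode client \
  --executor-cores 4 --conf spark.task.cpus=2 SparkPi 10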
Demo
Let's run a Spark on YARN example job with 10 tasks:
/opt/mapr/spark/spark-2.1.0/bin/run-example --master yarn --deploy-mode client SparkPi 10
1. Default: spark.executor.cores=1, spark.task.cpus=1
ResourceManager allocates 3 YARN containers -- the first, smaller one is the ApplicationMaster, and the other 2 are executors:
2018-06-08 12:47:41,369 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_e07_1511207998241_0010_01_000001 of capacity <memory:1024, vCores:1, disks:0.0> on host s2.poc.com:27338, which has 1 containers, <memory:1024, vCores:1, disks:0.0> used and <memory:4096, vCores:1, disks:1.33> available after allocation
2018-06-08 12:47:48,201 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_e07_1511207998241_0010_01_000002 of capacity <memory:3072, vCores:1, disks:0.0> on host s4.poc.com:53027, which has 1 containers, <memory:3072, vCores:1, disks:0.0> used and <memory:10849, vCores:1, disks:1.33> available after allocation
2018-06-08 12:47:48,202 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_e07_1511207998241_0010_01_000003 of capacity <memory:3072, vCores:1, disks:0.0> on host s3.poc.com:11447, which has 1 containers, <memory:3072, vCores:1, disks:0.0> used and <memory:2048, vCores:1, disks:1.33> available after allocation
From the Spark UI, 5 tasks were assigned to each executor: 5 on s4 and 5 on s3.
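These "Assigned container" messages come from the ResourceManager log on the ResourceManager node and can be pulled out with a simple grep. A sketch -- the log path below is an assumption and varies by install:
# Sketch: find this application's container allocations in the RM log
# (the log path is an assumption; adjust for your cluster)
grep "SchedulerNode: Assigned container container_e07_1511207998241_0010" \
  /opt/mapr/hadoop/hadoop-*/logs/yarn-*-resourcemanager-*.log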
Now let's look at the YARN container log for s4:
[root@s4 container_e07_1511207998241_0010_01_000002]# egrep "Running task|Finished task" stderr
2018-06-08 12:47:55,273 INFO [Executor task launch worker-0] executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
2018-06-08 12:47:55,712 INFO [Executor task launch worker-0] executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 1856 bytes result sent to driver
2018-06-08 12:47:55,723 INFO [Executor task launch worker-0] executor.Executor: Running task 3.0 in stage 0.0 (TID 3)
2018-06-08 12:47:55,750 INFO [Executor task launch worker-0] executor.Executor: Finished task 3.0 in stage 0.0 (TID 3). 1128 bytes result sent to driver
2018-06-08 12:47:55,759 INFO [Executor task launch worker-0] executor.Executor: Running task 4.0 in stage 0.0 (TID 4)
2018-06-08 12:47:55,780 INFO [Executor task launch worker-0] executor.Executor: Finished task 4.0 in stage 0.0 (TID 4). 1041 bytes result sent to driver
2018-06-08 12:47:55,792 INFO [Executor task launch worker-0] executor.Executor: Running task 6.0 in stage 0.0 (TID 6)
2018-06-08 12:47:55,812 INFO [Executor task launch worker-0] executor.Executor: Finished task 6.0 in stage 0.0 (TID 6). 1128 bytes result sent to driver
2018-06-08 12:47:55,821 INFO [Executor task launch worker-0] executor.Executor: Running task 8.0 in stage 0.0 (TID 8)
2018-06-08 12:47:55,839 INFO [Executor task launch worker-0] executor.Executor: Finished task 8.0 in stage 0.0 (TID 8). 1041 bytes result sent to driver
From the timestamps above, only 1 task was running at any moment inside each executor (YARN container); note that every task ran on the same thread, "Executor task launch worker-0".
2. spark.executor.cores=2, spark.task.cpus=1
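This setting can be passed on the command line when rerunning the same example (a sketch; --conf spark.executor.cores=2 is equivalent):
/opt/mapr/spark/spark-2.1.0/bin/run-example --master yarn --deploy-mode client \
  --executor-cores 2 SparkPi 10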
Again, ResourceManager allocates 2 executors plus the ApplicationMaster container, but this time each executor container has 2 vCores:
2018-06-08 13:28:36,122 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_e07_1511207998241_0012_01_000001 of capacity <memory:1024, vCores:1, disks:0.0> on host s3.poc.com:11447, which has 1 containers, <memory:1024, vCores:1, disks:0.0> used and <memory:4096, vCores:1, disks:1.33> available after allocation
2018-06-08 13:28:42,288 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_e07_1511207998241_0012_01_000003 of capacity <memory:2048, vCores:2, disks:0.0> on host s2.poc.com:27338, which has 1 containers, <memory:2048, vCores:2, disks:0.0> used and <memory:3072, vCores:0, disks:1.33> available after allocation
2018-06-08 13:28:42,683 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_e07_1511207998241_0012_01_000004 of capacity <memory:2048, vCores:2, disks:0.0> on host s1.poc.com:39579, which has 1 containers, <memory:2048, vCores:2, disks:0.0> used and <memory:3072, vCores:0, disks:1.33> available after allocation
From the Spark UI, 7 tasks were assigned to the executor on s1 and 3 tasks to the executor on s2:
YARN container log for s1:
# egrep "Running task|Finished task" stderr 2018-06-08 13:28:49,451 INFO [Executor task launch worker-0] executor.Executor: Running task 1.0 in stage 0.0 (TID 1) 2018-06-08 13:28:49,451 INFO [Executor task launch worker-1] executor.Executor: Running task 3.0 in stage 0.0 (TID 3) 2018-06-08 13:28:49,927 INFO [Executor task launch worker-1] executor.Executor: Finished task 3.0 in stage 0.0 (TID 3). 1041 bytes result sent to driver 2018-06-08 13:28:49,929 INFO [Executor task launch worker-0] executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 1856 bytes result sent to driver 2018-06-08 13:28:49,940 INFO [Executor task launch worker-0] executor.Executor: Running task 4.0 in stage 0.0 (TID 4) 2018-06-08 13:28:49,950 INFO [Executor task launch worker-1] executor.Executor: Running task 5.0 in stage 0.0 (TID 5) 2018-06-08 13:28:49,963 INFO [Executor task launch worker-0] executor.Executor: Finished task 4.0 in stage 0.0 (TID 4). 1041 bytes result sent to driver 2018-06-08 13:28:49,970 INFO [Executor task launch worker-0] executor.Executor: Running task 6.0 in stage 0.0 (TID 6) 2018-06-08 13:28:50,021 INFO [Executor task launch worker-1] executor.Executor: Finished task 5.0 in stage 0.0 (TID 5). 1041 bytes result sent to driver 2018-06-08 13:28:50,023 INFO [Executor task launch worker-0] executor.Executor: Finished task 6.0 in stage 0.0 (TID 6). 1041 bytes result sent to driver 2018-06-08 13:28:50,033 INFO [Executor task launch worker-1] executor.Executor: Running task 8.0 in stage 0.0 (TID 8) 2018-06-08 13:28:50,036 INFO [Executor task launch worker-0] executor.Executor: Running task 7.0 in stage 0.0 (TID 7) 2018-06-08 13:28:50,061 INFO [Executor task launch worker-0] executor.Executor: Finished task 7.0 in stage 0.0 (TID 7). 1041 bytes result sent to driver 2018-06-08 13:28:50,078 INFO [Executor task launch worker-1] executor.Executor: Finished task 8.0 in stage 0.0 (TID 8). 1041 bytes result sent to driverYARN container log for s2:
# egrep "Running task|Finished task" stderr 2018-06-08 13:28:49,450 INFO [Executor task launch worker-1] executor.Executor: Running task 2.0 in stage 0.0 (TID 2) 2018-06-08 13:28:49,450 INFO [Executor task launch worker-0] executor.Executor: Running task 0.0 in stage 0.0 (TID 0) 2018-06-08 13:28:50,055 INFO [Executor task launch worker-1] executor.Executor: Finished task 2.0 in stage 0.0 (TID 2). 1041 bytes result sent to driver 2018-06-08 13:28:50,056 INFO [Executor task launch worker-0] executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1856 bytes result sent to driver 2018-06-08 13:28:50,071 INFO [Executor task launch worker-0] executor.Executor: Running task 9.0 in stage 0.0 (TID 9) 2018-06-08 13:28:50,113 INFO [Executor task launch worker-0] executor.Executor: Finished task 9.0 in stage 0.0 (TID 9). 1041 bytes result sent to driverFrom the timestamp above, inside each executor(YARN container), 2 task was running concurrently.
In all, the test results above confirm the theory that the parallelism of tasks/threads inside each executor is:
X = spark.executor.cores / spark.task.cpus
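It follows that an upper bound on the number of tasks a job can run at the same time is spark.executor.instances * X. In the second test above, for example: 2 executors * (2 cores / 1 cpu per task) = at most 4 concurrent tasks.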