Goal:
This article explains Predicate Pushdown for Parquet in Spark.
Solution:
Spark can push predicates down into the Parquet scan phase so that it reduces the amount of data to be read.
This is done by checking the metadata (for example, the column min/max statistics) of the Parquet files and filtering out data that cannot match.
Note: Refer to this blog on How to use pyarrow to view the metadata information inside a Parquet file.
This feature is controlled by a parameter named spark.sql.parquet.filterPushdown (default is true).
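For example, you can check the current value and toggle it at runtime in spark-shell (a minimal sketch):
// Check the current value of the Parquet filter pushdown flag
spark.conf.get("spark.sql.parquet.filterPushdown")
// Toggle it at runtime if needed
spark.conf.set("spark.sql.parquet.filterPushdown", true)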
Let's use the parquet files created in another blog as an example.
1. Create a DataFrame on parquet files
val targetdir = "/tmp/test_column_projection/newdf"
val readdf = spark.read.format("parquet").load(targetdir)
readdf.createOrReplaceTempView("readdf")
2. Let's look at the data distribution for column "Index".
scala> spark.sql("SELECT min(Index), max(Index), count(distinct Index),count(*) FROM readdf").show
+----------+----------+---------------------+--------+
|min(Index)|max(Index)|count(DISTINCT Index)|count(1)|
+----------+----------+---------------------+--------+
| 0| 396342| 396343| 6240991|
+----------+----------+---------------------+--------+
As shown above, the value range of column "Index" is 0~396342.
Knowing this, we can design the tests below to show the different performance results for different filters.
3. Query 1 and its explain plan
val q1 = "SELECT * FROM readdf WHERE Index=20000"
val result1 = spark.sql(q1)
result1.explain
result1.collect
Query 1 has to scan a lot of data because the value 20000 falls within the min/max range of most of the Parquet chunks, so they cannot be skipped.
The explain plan:
== Physical Plan ==
*(1) Project [Arrival_Time#26L, Creation_Time#27L, Device#28, Index#29L, Model#30, User#31, gt#32, x#33, y#34, z#35]
+- *(1) Filter (isnotnull(Index#29L) && (Index#29L = 20000))
+- *(1) FileScan parquet [Arrival_Time#26L,Creation_Time#27L,Device#28,Index#29L,Model#30,User#31,gt#32,x#33,y#34,z#35] Batched: true, Format: Parquet, Location: InMemoryFileIndex[maprfs:///tmp/test_column_projection/newdf], PartitionFilters: [], PushedFilters: [IsNotNull(Index), EqualTo(Index,20000)], ReadSchema: struct<Arrival_Time:bigint,Creation_Time:bigint,Device:string,Index:bigint,Model:string,User:stri...
4. Query 2 and its explain plan
val q2 = "SELECT * FROM readdf where Index=9999999999"
val result2 = spark.sql(q2)
result2.explain
result2.collect
Query 2 needs to scan very little data because the value 9999999999 is outside the min/max range of that column in every Parquet chunk, so the data can be skipped.
The explain plan:
== Physical Plan ==
*(1) Project [Arrival_Time#26L, Creation_Time#27L, Device#28, Index#29L, Model#30, User#31, gt#32, x#33, y#34, z#35]
+- *(1) Filter (isnotnull(Index#29L) && (Index#29L = 9999999999))
+- *(1) FileScan parquet [Arrival_Time#26L,Creation_Time#27L,Device#28,Index#29L,Model#30,User#31,gt#32,x#33,y#34,z#35] Batched: true, Format: Parquet, Location: InMemoryFileIndex[maprfs:///tmp/test_column_projection/newdf], PartitionFilters: [], PushedFilters: [IsNotNull(Index), EqualTo(Index,9999999999)], ReadSchema: struct<Arrival_Time:bigint,Creation_Time:bigint,Device:string,Index:bigint,Model:string,User:stri...
5. Query 3 and its explain plan after disabling spark.sql.parquet.filterPushdown
spark.conf.set("spark.sql.parquet.filterPushdown", false)
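Then we re-run the same query as Query 3. This is a minimal sketch; the names q3 and result3 are only illustrative:
val q3 = "SELECT * FROM readdf WHERE Index=9999999999"
val result3 = spark.sql(q3)
result3.explain
result3.collect
The explain plan: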
== Physical Plan ==
*(1) Project [Arrival_Time#26L, Creation_Time#27L, Device#28, Index#29L, Model#30, User#31, gt#32, x#33, y#34, z#35]
+- *(1) Filter (isnotnull(Index#29L) && (Index#29L = 9999999999))
+- *(1) FileScan parquet [Arrival_Time#26L,Creation_Time#27L,Device#28,Index#29L,Model#30,User#31,gt#32,x#33,y#34,z#35] Batched: true, Format: Parquet, Location: InMemoryFileIndex[maprfs:///tmp/test_column_projection/newdf], PartitionFilters: [], PushedFilters: [IsNotNull(Index), EqualTo(Index,9999999999)], ReadSchema: struct<Arrival_Time:bigint,Creation_Time:bigint,Device:string,Index:bigint,Model:string,User:stri...
Analysis:
1. Explain plan
As we can see, all of the explain plans look the same.
Even after we disabled spark.sql.parquet.filterPushdown, the explain plan did not show any difference between Query 2 and Query 3.
This means that, at least from the query plan, we cannot tell whether the predicate is actually pushed down or not.
All of the explain plans show that the predicate is pushed down:
PushedFilters: [IsNotNull(Index), EqualTo(Index,9999999999)]
Note: These tests were done in Spark 2.4.4; this behavior may change in future releases.
2. Event log/Web UI
Query 1's stage shows a total Input Size of 142.3MB and 6240991 records.
Query 2's stage shows a total Input Size of 44.4KB and 0 records.
Query 3's stage shows a total Input Size of 142.3MB and 6240991 records.
The above metrics clearly show the selectivity of this predicate pushdown feature, which depends on both the filter and the metadata of the Parquet files.
The performance difference between Query 2 and Query 3 shows how powerful this feature is.
Note: If the filter value falls within the metadata ranges of most or all of the Parquet files, then this feature may not provide good selectivity. So data distribution also matters here.
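As an illustration of why data distribution matters, here is a minimal sketch (the output path is hypothetical) that sorts by "Index" before writing, so that each file's min/max statistics cover a narrower range and filters on Index can skip more data:
// Hypothetical path for a re-sorted copy of the data
val sorteddir = "/tmp/test_column_projection/newdf_sorted"
// Sorting by Index before writing narrows each file's min/max range for that column
readdf.sort("Index").write.format("parquet").mode("overwrite").save(sorteddir)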
Note: Here is the Complete Sample Code.