Goal:
This article explains column projection for the Parquet format (and other columnar formats) in Spark.
Solution:
Spark can do column projection for columnar formats such as Parquet.
The idea is to read only the needed columns instead of all of them.
This can greatly reduce I/O and improve performance.
Below is one example.
Note: To show the performance difference from column projection alone, I disabled the Parquet filter pushdown feature by setting spark.sql.parquet.filterPushdown=false in my configuration.
I will discuss the Parquet filter pushdown feature in another article.
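For reference, the same setting can also be applied per session in spark-shell; this is a minimal sketch equivalent to my configuration-file change:
// Disable Parquet filter pushdown for this session, so the I/O
// difference shown below comes from column projection alone.
spark.conf.set("spark.sql.parquet.filterPushdown", "false")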
1. Save a sample DataFrame as parquet files.
val df = spark.read.json("/data/activity-data/")
val targetdir = "/tmp/test_column_projection/newdf"
df.write.mode("overwrite").format("parquet").save(targetdir)
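The queries in the next steps reference a temporary view named readdf. Here is a minimal sketch of that registration step, assumed to match the Complete Sample Code linked at the end:
// Read the Parquet files back and register them as the "readdf" temp view
// so they can be queried via spark.sql.
val readdf = spark.read.format("parquet").load(targetdir)
readdf.createOrReplaceTempView("readdf")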
2. Select only 1 column
val somecols = "SELECT Device FROM readdf WHERE Model='something_not_exist'"
val goodresult = spark.sql(somecols)
goodresult.explain
goodresult.collect
Output:
scala> goodresult.explain
== Physical Plan ==
*(1) Project [Device#48]
+- *(1) Filter (isnotnull(Model#50) && (Model#50 = something_not_exist))
+- *(1) FileScan parquet [Device#48,Model#50] Batched: true, Format: Parquet, Location: InMemoryFileIndex[maprfs:///tmp/test_column_projection/newdf], PartitionFilters: [], PushedFilters: [IsNotNull(Model), EqualTo(Model,something_not_exist)], ReadSchema: struct<Device:string,Model:string>
scala> goodresult.collect
res5: Array[org.apache.spark.sql.Row] = Array()
3. Select ALL columns
val allcols = "SELECT * FROM readdf WHERE Model='something_not_exist'"
val badresult = spark.sql(allcols)
badresult.explain
badresult.collect
Output:
scala> badresult.explain
== Physical Plan ==
*(1) Project [Arrival_Time#46L, Creation_Time#47L, Device#48, Index#49L, Model#50, User#51, gt#52, x#53, y#54, z#55]
+- *(1) Filter (isnotnull(Model#50) && (Model#50 = something_not_exist))
+- *(1) FileScan parquet [Arrival_Time#46L,Creation_Time#47L,Device#48,Index#49L,Model#50,User#51,gt#52,x#53,y#54,z#55] Batched: true, Format: Parquet, Location: InMemoryFileIndex[maprfs:///tmp/test_column_projection/newdf], PartitionFilters: [], PushedFilters: [IsNotNull(Model), EqualTo(Model,something_not_exist)], ReadSchema: struct<Arrival_Time:bigint,Creation_Time:bigint,Device:string,Index:bigint,Model:string,User:stri...
scala> badresult.collect
res7: Array[org.apache.spark.sql.Row] = Array()
Analysis:
1. Explain plan
In the FileScan node, we can see that only the needed columns are read, thanks to the column projection feature:
FileScan parquet [Device#48,Model#50]
vs
FileScan parquet [Arrival_Time#46L,Creation_Time#47L,Device#48,Index#49L,Model#50,User#51,gt#52,x#53,y#54,z#55]
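Column projection is not tied to SQL syntax. As a sketch (reusing targetdir from step 1), the DataFrame API produces the same pruned FileScan when only one column is selected:
// Equivalent DataFrame API query; explain should show a FileScan
// with only [Device, Model] in its pruned ReadSchema.
val dfProjection = spark.read.parquet(targetdir)
  .filter("Model = 'something_not_exist'")
  .select("Device")
dfProjection.explain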
2. Event log/Web UI
The "SELECT only 1 column"'s stage shows sum of Input Size=868.3KB.
The "SELECT ALL columns"'s stage shows sum of Input Size=142.3MB.
Note: Here is the Complete Sample Code.