Monday, May 3, 2021

Understand Decimal precision and scale calculation in Spark using GPU or CPU mode

Goal:

This article looks at how Spark calculates Decimal precision and scale in GPU and CPU mode.

We will test Addition, Subtraction, Multiplication, Division, Modulo and Union in this post.

Env:

Spark 3.1.1

Rapids accelerator 0.5 snapshot with cuDF 0.19 snapshot jar

Concept:

Spark's logic for calculating Decimal precision and scale lives in DecimalPrecision.scala.

In particular, if we have expressions e1 and e2 with precision/scale p1/s1 and p2/s2 respectively, then the operations produce the following result precision/scale:

Operation     Result Precision                        Result Scale
-------------------------------------------------------------------
e1 + e2       max(s1, s2) + max(p1-s1, p2-s2) + 1     max(s1, s2)
e1 - e2       max(s1, s2) + max(p1-s1, p2-s2) + 1     max(s1, s2)
e1 * e2       p1 + p2 + 1                             s1 + s2
e1 / e2       p1 - s1 + s2 + max(6, s1 + p2 + 1)      max(6, s1 + p2 + 1)
e1 % e2       min(p1-s1, p2-s2) + max(s1, s2)         max(s1, s2)
e1 union e2   max(s1, s2) + max(p1-s1, p2-s2)         max(s1, s2)

This matches Hive's rules described in the Hive Decimal Precision/Scale Support document (see References).

In addition, Spark has a parameter spark.sql.decimalOperations.allowPrecisionLoss (default true). When the required precision/scale is out of the range of available values and this parameter is true, the scale is reduced (to no less than 6) in order to prevent truncation of the integer part of the decimal.
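To make those rules easier to apply by hand, here is a minimal standalone Scala sketch of the formulas in the table above (my own helper, not Spark's actual code; the allowPrecisionLoss adjustment described above is not applied here):

import scala.math.{max, min}

// Each function returns the (precision, scale) of the result type.
def addType(p1: Int, s1: Int, p2: Int, s2: Int)      = (max(s1, s2) + max(p1 - s1, p2 - s2) + 1, max(s1, s2))
def multiplyType(p1: Int, s1: Int, p2: Int, s2: Int) = (p1 + p2 + 1, s1 + s2)
def divideType(p1: Int, s1: Int, p2: Int, s2: Int)   = (p1 - s1 + s2 + max(6, s1 + p2 + 1), max(6, s1 + p2 + 1))
def moduloType(p1: Int, s1: Int, p2: Int, s2: Int)   = (min(p1 - s1, p2 - s2) + max(s1, s2), max(s1, s2))
def unionType(p1: Int, s1: Int, p2: Int, s2: Int)    = (max(s1, s2) + max(p1 - s1, p2 - s2), max(s1, s2))

// For example, decimal(8,2) + decimal(6,3) and decimal(8,2) * decimal(6,3):
addType(8, 2, 6, 3)       // (10,3)
multiplyType(8, 2, 6, 3)  // (15,5)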

 

Now let's look at the limit in GPU mode (with the Rapids accelerator):

Currently, in the Rapids accelerator 0.4.1/0.5 snapshot releases, decimal support is limited to 18 digits (64-bit) as per this doc.

So if the precision is > 18, the operation will fall back to CPU mode.
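As a side note (my own illustration, not from the Rapids doc), the 18-digit limit lines up with what fits in a signed 64-bit integer:

// The largest 18-digit unscaled value is below Long.MaxValue (which has 19 digits),
// so any decimal with precision <= 18 fits in a 64-bit long; 19 digits may overflow it.
val largest18Digits = BigInt("999999999999999999")                // 10^18 - 1
println(largest18Digits <= BigInt(Long.MaxValue))                 // true
println(BigInt("9999999999999999999") <= BigInt(Long.MaxValue))   // false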

Below let's do some tests to confirm the theory matches practice.

Solution:

1. Prepare an example DataFrame with columns of different decimal types

import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.types._
spark.conf.set("spark.rapids.sql.enabled", true)
spark.conf.set("spark.rapids.sql.decimalType.enabled", true)

val df = spark.sparkContext.parallelize(Seq(1)).toDF()
val df2 = df.withColumn("value82", lit("123456.78").cast(DecimalType(8,2))).
  withColumn("value63", lit("123.456").cast(DecimalType(6,3))).
  withColumn("value1510", lit("12345.0123456789").cast(DecimalType(15,10))).
  withColumn("value2510", lit("123456789012345.0123456789").cast(DecimalType(25,10)))

df2.write.parquet("/tmp/df2.parquet")
val newdf2=spark.read.parquet("/tmp/df2.parquet")
newdf2.createOrReplaceTempView("df2")
newdf2's schema:
scala> newdf2.printSchema
root
|-- value: integer (nullable = false)
|-- value82: decimal(8,2) (nullable = true)
|-- value63: decimal(6,3) (nullable = true)
|-- value1510: decimal(15,10) (nullable = true)
|-- value2510: decimal(25,10) (nullable = true)

2. GPU Mode (Result Decimal within GPU's limit: <= 18 digits)

The tests below keep every result decimal's precision within the GPU's limit, which is 18 digits in this Rapids accelerator version.

So we only use two fields of df2 -- value82: decimal(8,2) and value63: decimal(6,3).

This is to confirm whether the theory holds in GPU mode.

To apply the concept/theory above and calculate the expected result precision and scale, let's define the precision and scale of both inputs up front:

import scala.math.{max, min}
val (p1,s1)=(8,2)
val (p2,s2)=(6,3)

2.1 Addition

val df_plus=spark.sql("SELECT value82+value63 FROM df2")
df_plus.printSchema
df_plus.explain
df_plus.collect

Output:

scala> val df_plus=spark.sql("SELECT value82+value63 FROM df2")
df_plus: org.apache.spark.sql.DataFrame = [(CAST(value82 AS DECIMAL(10,3)) + CAST(value63 AS DECIMAL(10,3))): decimal(10,3)]

scala> df_plus.printSchema
root
|-- (CAST(value82 AS DECIMAL(10,3)) + CAST(value63 AS DECIMAL(10,3))): decimal(10,3) (nullable = true)


scala> df_plus.explain
== Physical Plan ==
GpuColumnarToRow false
+- GpuProject [gpucheckoverflow((gpupromoteprecision(cast(value82#58 as decimal(10,3))) + gpupromoteprecision(cast(value63#59 as decimal(10,3)))), DecimalType(10,3), true) AS (CAST(value82 AS DECIMAL(10,3)) + CAST(value63 AS DECIMAL(10,3)))#88]
+- GpuFileGpuScan parquet [value82#58,value63#59] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/df2.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value82:decimal(8,2),value63:decimal(6,3)>



scala> df_plus.collect
res21: Array[org.apache.spark.sql.Row] = Array([123580.236])

The result Decimal is (10,3), which matches the theory, and the query runs on GPU as shown in the explain output.

scala> max(s1, s2) + max(p1-s1, p2-s2) + 1
res7: Int = 10

scala> max(s1, s2)
res8: Int = 3

Note: in the following tests I will mostly just show the result type instead of printing all of the output, to keep this post short, but feel free to do the math yourself.

2.2 Subtraction

# Result Decimal (10,3)
val df_minus=spark.sql("SELECT value82-value63 FROM df2")
df_minus.printSchema
df_minus.explain
df_minus.collect
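Checking against the formulas: subtraction uses the same rule as addition, so the result is again max(s1, s2) + max(p1-s1, p2-s2) + 1 = 10 digits with scale max(s1, s2) = 3, i.e. decimal(10,3).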

2.3 Multiplication

# Result Decimal (15,5) 
val df_multi=spark.sql("SELECT value82*value63 FROM df2")
df_multi.printSchema
df_multi.explain
df_multi.collect
Output:
scala> val df_multi=spark.sql("SELECT value82*value63 FROM df2")
df_multi: org.apache.spark.sql.DataFrame = [(CAST(value82 AS DECIMAL(9,3)) * CAST(value63 AS DECIMAL(9,3))): decimal(15,5)]

scala> df_multi.printSchema
root
|-- (CAST(value82 AS DECIMAL(9,3)) * CAST(value63 AS DECIMAL(9,3))): decimal(15,5) (nullable = true)


scala> df_multi.explain
21/05/04 18:02:21 WARN GpuOverrides:
!Exec <ProjectExec> cannot run on GPU because not all expressions can be replaced
@Expression <Alias> CheckOverflow((promote_precision(cast(value82#58 as decimal(9,3))) * promote_precision(cast(value63#59 as decimal(9,3)))), DecimalType(15,5), true) AS (CAST(value82 AS DECIMAL(9,3)) * CAST(value63 AS DECIMAL(9,3)))#96 could run on GPU
@Expression <CheckOverflow> CheckOverflow((promote_precision(cast(value82#58 as decimal(9,3))) * promote_precision(cast(value63#59 as decimal(9,3)))), DecimalType(15,5), true) could run on GPU
!Expression <Multiply> (promote_precision(cast(value82#58 as decimal(9,3))) * promote_precision(cast(value63#59 as decimal(9,3)))) cannot run on GPU because The actual output precision of the multiply is too large to fit on the GPU DecimalType(19,6)
@Expression <PromotePrecision> promote_precision(cast(value82#58 as decimal(9,3))) could run on GPU
@Expression <Cast> cast(value82#58 as decimal(9,3)) could run on GPU
@Expression <AttributeReference> value82#58 could run on GPU
@Expression <PromotePrecision> promote_precision(cast(value63#59 as decimal(9,3))) could run on GPU
@Expression <Cast> cast(value63#59 as decimal(9,3)) could run on GPU
@Expression <AttributeReference> value63#59 could run on GPU

== Physical Plan ==
*(1) Project [CheckOverflow((promote_precision(cast(value82#58 as decimal(9,3))) * promote_precision(cast(value63#59 as decimal(9,3)))), DecimalType(15,5), true) AS (CAST(value82 AS DECIMAL(9,3)) * CAST(value63 AS DECIMAL(9,3)))#96]
+- GpuColumnarToRow false
+- GpuFileGpuScan parquet [value82#58,value63#59] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/df2.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value82:decimal(8,2),value63:decimal(6,3)>



scala> df_multi.collect
res27: Array[org.apache.spark.sql.Row] = Array([15241480.23168])
Here, even though the result Decimal is just (15,5), it still falls back to CPU.
This is because Spark inserts "PromotePrecision" to CAST both sides to the same type -- Decimal(9,3).
Currently the GPU has to be very cautious about this PromotePrecision, so it treats the result as Decimal(19,6) instead of (15,5).
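The two numbers line up with the multiply formula: the declared output uses the original inputs, p1 + p2 + 1 = 8 + 6 + 1 = 15 and s1 + s2 = 2 + 3 = 5, while the conservative GPU estimate uses the promoted decimal(9,3) inputs, 9 + 9 + 1 = 19 and 3 + 3 = 6, which exceeds the 18-digit limit.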

2.4 Division

# Result Decimal (18,9) -- Fallback on CPU
val df_div=spark.sql("SELECT value82/value63 FROM df2")
df_div.printSchema
df_div.explain
df_div.collect
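Checking the division formula: precision = p1 - s1 + s2 + max(6, s1 + p2 + 1) = 8 - 2 + 3 + max(6, 9) = 18 and scale = max(6, s1 + p2 + 1) = 9, so the result is decimal(18,9).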

2.5 Modulo

# Result Decimal (6,3) -- Fallback on CPU
val df_mod=spark.sql("SELECT value82 % value63 FROM df2")
df_mod.printSchema
df_mod.explain
df_mod.collect
Note: this is because Modulo is not supported for Decimal on GPU as per this supported_ops.md.
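Checking the modulo formula: precision = min(p1-s1, p2-s2) + max(s1, s2) = min(6, 3) + 3 = 6 and scale = max(s1, s2) = 3, giving decimal(6,3).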

2.6 Union

# Result Decimal (9,3) 
val df_union=spark.sql("SELECT value82 from df2 union SELECT value63 from df2")
df_union.printSchema
df_union.explain
df_union.collect
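Checking the union formula: precision = max(s1, s2) + max(p1-s1, p2-s2) = 3 + 6 = 9 and scale = max(s1, s2) = 3, giving decimal(9,3).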

3. GPU Mode fallback to CPU (19 ~ 38 digits)

The tests below may fall back to CPU when the result decimal's precision is above the GPU's limit.

So we only use two fields of df2 -- value82: decimal(8,2) and value1510: decimal(15,10).

3.1 Addition

# Result Decimal (17,10) -- within GPU limit
val df_plus=spark.sql("SELECT value82+value1510 FROM df2")
df_plus.printSchema
df_plus.explain
df_plus.collect
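Checking: with (p1,s1) = (8,2) and (p2,s2) = (15,10), precision = max(2,10) + max(6,5) + 1 = 17 and scale = max(2,10) = 10, giving decimal(17,10), which still fits on the GPU.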

3.2 Subtraction

# Result Decimal (17,10) -- within GPU limit
val df_minus=spark.sql("SELECT value82-value1510 FROM df2")
df_minus.printSchema
df_minus.explain
df_minus.collect

3.3 Multiplication

# Result Decimal (24,12) -- outside of GPU limit
val df_multi=spark.sql("SELECT value82*value1510 FROM df2")
df_multi.printSchema
df_multi.explain
Output:
scala> val df_multi=spark.sql("SELECT value82*value1510 FROM df2")
df_multi: org.apache.spark.sql.DataFrame = [(CAST(value82 AS DECIMAL(16,10)) * CAST(value1510 AS DECIMAL(16,10))): decimal(24,12)]

scala> df_multi.printSchema
root
|-- (CAST(value82 AS DECIMAL(16,10)) * CAST(value1510 AS DECIMAL(16,10))): decimal(24,12) (nullable = true)


scala> df_multi.explain
21/05/04 18:44:46 WARN GpuOverrides:
!Exec <ProjectExec> cannot run on GPU because not all expressions can be replaced; unsupported data types in output: DecimalType(24,12)
!Expression <Alias> CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) AS (CAST(value82 AS DECIMAL(16,10)) * CAST(value1510 AS DECIMAL(16,10)))#132 cannot run on GPU because expression Alias CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) AS (CAST(value82 AS DECIMAL(16,10)) * CAST(value1510 AS DECIMAL(16,10)))#132 produces an unsupported type DecimalType(24,12); expression CheckOverflow CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) produces an unsupported type DecimalType(24,12)
!Expression <CheckOverflow> CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) cannot run on GPU because expression CheckOverflow CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) produces an unsupported type DecimalType(24,12)
!Expression <Multiply> (promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))) cannot run on GPU because The actual output precision of the multiply is too large to fit on the GPU DecimalType(33,20)
@Expression <PromotePrecision> promote_precision(cast(value82#58 as decimal(16,10))) could run on GPU
@Expression <Cast> cast(value82#58 as decimal(16,10)) could run on GPU
@Expression <AttributeReference> value82#58 could run on GPU
@Expression <PromotePrecision> promote_precision(cast(value1510#60 as decimal(16,10))) could run on GPU
@Expression <Cast> cast(value1510#60 as decimal(16,10)) could run on GPU
@Expression <AttributeReference> value1510#60 could run on GPU

== Physical Plan ==
*(1) Project [CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) AS (CAST(value82 AS DECIMAL(16,10)) * CAST(value1510 AS DECIMAL(16,10)))#132]
+- GpuColumnarToRow false
+- GpuFileGpuScan parquet [value82#58,value1510#60] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/df2.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value82:decimal(8,2),value1510:decimal(15,10)>



scala> df_multi.collect
res51: Array[org.apache.spark.sql.Row] = Array([1524075473.257763907942])
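The expected type follows the multiply formula: p1 + p2 + 1 = 8 + 15 + 1 = 24 and s1 + s2 = 2 + 10 = 12, i.e. decimal(24,12). Since 24 > 18, even the output column type is unsupported on the GPU, so the whole Project runs on CPU and only the Parquet scan stays on GPU.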

3.4 Division

# Result Decimal (34,18) -- outside of GPU limit
val df_div=spark.sql("SELECT value82/value1510 FROM df2")
df_div.printSchema
df_div.explain
df_div.collect
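Checking the division formula: precision = p1 - s1 + s2 + max(6, s1 + p2 + 1) = 8 - 2 + 10 + max(6, 18) = 34 and scale = max(6, 18) = 18, giving decimal(34,18), well above the 18-digit GPU limit.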

3.5 Modulo

# Result Decimal(15,10) -- within GPU limit, but fallback on CPU
val df_mod=spark.sql("SELECT value82 % value1510 FROM df2")
df_mod.printSchema
df_mod.explain
df_mod.collect
Note: this is because Modulo is not supported for Decimal on GPU as per this supported_ops.md
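Checking the modulo formula: precision = min(p1-s1, p2-s2) + max(s1, s2) = min(6, 5) + 10 = 15 and scale = max(s1, s2) = 10, giving decimal(15,10).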

3.6 Union

# Result Decimal (16,10) -- within GPU limit
val df_union=spark.sql("SELECT value82 from df2 union SELECT value1510 from df2")
df_union.printSchema
df_union.explain
df_union.collect
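Checking the union formula: precision = max(s1, s2) + max(p1-s1, p2-s2) = 10 + 6 = 16 and scale = max(s1, s2) = 10, giving decimal(16,10).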

4. Above decimal max range (> 38 digits)

If the result decimal is above 38 digits, spark.sql.decimalOperations.allowPrecisionLoss can be used to control the behavior.
So we only use two fields of df2 -- value1510: decimal(15,10) and value2510: decimal(25,10).
# Result Decimal (38,17)
val df_multi=spark.sql("SELECT value1510*value2510 FROM df2")
df_multi.printSchema
df_multi.explain
df_multi.collect
As per the theory, the result decimal should be (41,20):
scala> val (p1,s1)=(15,10)
p1: Int = 15
s1: Int = 10

scala> val (p2,s2)=(25,10)
p2: Int = 25
s2: Int = 10

scala> p1 + p2 + 1
res31: Int = 41

scala> s1 + s2
res32: Int = 20
However, since 41 > 38, another function, adjustPrecisionScale inside DecimalType.scala, is called to adjust the precision and scale.
For this specific example, the following code logic is applied:
    } else {
      // Precision/scale exceed maximum precision. Result must be adjusted to MAX_PRECISION.
      val intDigits = precision - scale
      // If original scale is less than MINIMUM_ADJUSTED_SCALE, use original scale value; otherwise
      // preserve at least MINIMUM_ADJUSTED_SCALE fractional digits
      val minScaleValue = Math.min(scale, MINIMUM_ADJUSTED_SCALE)
      // The resulting scale is the maximum between what is available without causing a loss of
      // digits for the integer part of the decimal and the minimum guaranteed scale, which is
      // computed above
      val adjustedScale = Math.max(MAX_PRECISION - intDigits, minScaleValue)

      DecimalType(MAX_PRECISION, adjustedScale)
    }
So intDigits = 41 - 20 = 21, minScaleValue = min(20, 6) = 6, and adjustedScale = max(38 - 21, 6) = 17.
That is why the result decimal is (38,17).
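For reference, here is a minimal standalone sketch of that adjustment (my own re-implementation, not Spark's class, assuming the constants Spark uses: MAX_PRECISION = 38 and MINIMUM_ADJUSTED_SCALE = 6):

val MAX_PRECISION = 38
val MINIMUM_ADJUSTED_SCALE = 6

// Returns the adjusted (precision, scale) when the requested precision exceeds 38.
def adjustPrecisionScale(precision: Int, scale: Int): (Int, Int) =
  if (precision <= MAX_PRECISION) {
    (precision, scale)
  } else {
    val intDigits = precision - scale
    val minScaleValue = math.min(scale, MINIMUM_ADJUSTED_SCALE)
    val adjustedScale = math.max(MAX_PRECISION - intDigits, minScaleValue)
    (MAX_PRECISION, adjustedScale)
  }

adjustPrecisionScale(41, 20)  // (38,17), matching the schema above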
 
Since the function above is only called when spark.sql.decimalOperations.allowPrecisionLoss=true, if we set it to false the query returns null instead:
scala> df_multi.collect
res67: Array[org.apache.spark.sql.Row] = Array([null])

References:

https://cwiki.apache.org/confluence/download/attachments/27362075/Hive_Decimal_Precision_Scale_Support.pdf

 
