Goal:
This article shares the steps to enable GpuKryoRegistrator on RAPIDS Accelerator for Spark.
Env:
Spark 3.1.1
RAPIDS Accelerator for Apache Spark 0.4.1
Solution:
As mentioned in the Spark Tuning documentation:

- Java serialization: By default, Spark serializes objects using Java's ObjectOutputStream framework, and can work with any class you create that implements java.io.Serializable. You can also control the performance of your serialization more closely by extending java.io.Externalizable. Java serialization is flexible but often quite slow, and leads to large serialized formats for many classes.
- Kryo serialization: Spark can also use the Kryo library (version 4) to serialize objects more quickly. Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you'll use in the program in advance for best performance.
The RAPIDS Accelerator provides a registrator class, com.nvidia.spark.rapids.GpuKryoRegistrator, which registers with Kryo the following classes from org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExec:
- SerializeConcatHostBuffersDeserializeBatch
- SerializeBatchDeserializeHostBuffer
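For reference, a Kryo registrator is simply a class implementing org.apache.spark.serializer.KryoRegistrator. The sketch below shows roughly what such a registrator looks like (the class name MyGpuKryoRegistrator is illustrative, and the exact implementation in the RAPIDS Accelerator source may differ); it registers the two classes above with Kryo's JavaSerializer, which delegates to their custom writeObject/readObject logic:

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.sql.rapids.execution.{SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch}

// Illustrative sketch of a registrator: register the broadcast-exchange
// helper classes with Kryo's JavaSerializer so they are serialized via
// their own writeObject/readObject methods (which is why ObjectOutputStream
// appears in the stack trace when the Kryo buffer overflows).
class MyGpuKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[SerializeConcatHostBuffersDeserializeBatch], new JavaSerializer())
    kryo.register(classOf[SerializeBatchDeserializeHostBuffer], new JavaSerializer())
  }
}
```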
How to enable?
Set the following two parameters (for example, in spark-defaults.conf):
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator com.nvidia.spark.rapids.GpuKryoRegistrator
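The same two settings can also be passed on the command line instead of spark-defaults.conf; for example, an illustrative spark-submit invocation (the rest of the arguments are elided):

```shell
spark-submit \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryo.registrator=com.nvidia.spark.rapids.GpuKryoRegistrator \
  ...
```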
Common Issues
A common issue with Kryo serialization is buffer overflow.
For example, when running Q7 of TPCDS/NDS, the job may fail with:
Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 636
at com.esotericsoftware.kryo.io.Output.require(Output.java:167)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:251)
at com.esotericsoftware.kryo.io.Output.write(Output.java:219)
at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1859)
at java.base/java.io.ObjectOutputStream.write(ObjectOutputStream.java:712)
at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:123)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
at ai.rapids.cudf.JCudfSerialization$DataOutputStreamWriter.copyDataFrom(JCudfSerialization.java:600)
at ai.rapids.cudf.JCudfSerialization$DataWriter.copyDataFrom(JCudfSerialization.java:546)
at ai.rapids.cudf.JCudfSerialization.copySlicedAndPad(JCudfSerialization.java:1104)
at ai.rapids.cudf.JCudfSerialization.copySlicedOffsets(JCudfSerialization.java:1332)
at ai.rapids.cudf.JCudfSerialization.writeSliced(JCudfSerialization.java:1464)
at ai.rapids.cudf.JCudfSerialization.writeSliced(JCudfSerialization.java:1517)
at ai.rapids.cudf.JCudfSerialization.writeToStream(JCudfSerialization.java:1567)
at org.apache.spark.sql.rapids.execution.SerializeBatchDeserializeHostBuffer.writeObject(GpuBroadcastExchangeExec.scala:153)
at jdk.internal.reflect.GeneratedMethodAccessor91.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at com.esotericsoftware.kryo.serializers.JavaSerializer.write(JavaSerializer.java:51)
... 9 more
The fix is to increase spark.kryoserializer.buffer.max from its default of 64m to a larger value, for example 512m:
spark.kryoserializer.buffer.max 512m
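Putting it all together, the relevant portion of spark-defaults.conf would look like this (512m is a starting point, not a universal value; size it to your largest broadcast):

```shell
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator           com.nvidia.spark.rapids.GpuKryoRegistrator
spark.kryoserializer.buffer.max  512m
```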