Skip to content

Commit bb11c93

Browse files
rootroot
authored andcommitted
[SPARK-18122][SQL]Fallback to Kryo for unsupported encoder for class's subfiled
1 parent c8c0906 commit bb11c93

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.catalyst
1919

20+
import scala.reflect.ClassTag
21+
2022
import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedAttribute, UnresolvedExtractValue}
2123
import org.apache.spark.sql.catalyst.expressions._
2224
import org.apache.spark.sql.catalyst.expressions.objects._
@@ -403,6 +405,9 @@ object ScalaReflection extends ScalaReflection {
403405
} else {
404406
newInstance
405407
}
408+
409+
case _ =>
410+
DecodeUsingSerializer(getPath, ClassTag(getClassFromType(tpe)), true)
406411
}
407412
}
408413

@@ -583,8 +588,7 @@ object ScalaReflection extends ScalaReflection {
583588
expressions.If(IsNull(inputObject), nullOutput, nonNullOutput)
584589

585590
case other =>
586-
throw new UnsupportedOperationException(
587-
s"No Encoder found for $tpe\n" + walkedTypePath.mkString("\n"))
591+
EncodeUsingSerializer(inputObject, true)
588592
}
589593

590594
}
@@ -701,7 +705,7 @@ object ScalaReflection extends ScalaReflection {
701705
StructField(fieldName, dataType, nullable)
702706
}), nullable = true)
703707
case other =>
704-
throw new UnsupportedOperationException(s"Schema for type $other is not supported")
708+
Schema(BinaryType, nullable = false)
705709
}
706710
}
707711

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ case class NestedArray(a: Array[Array[Int]]) {
5050
}
5151
}
5252

53+
case class KryoUnsupportedEncoderForSubFiled(
54+
a: String,
55+
b: Seq[Int],
56+
c: Option[Set[Int]])
57+
5358
case class BoxedData(
5459
intField: java.lang.Integer,
5560
longField: java.lang.Long,
@@ -179,6 +184,10 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
179184
encodeDecodeTest(new KryoSerializable(15), "kryo object")(
180185
encoderFor(Encoders.kryo[KryoSerializable]))
181186

187+
// use kryo to ser/deser the type which has a unsupported Encoder
188+
encodeDecodeTest(Seq(KryoUnsupportedEncoderForSubFiled("a", Seq(1), Some(Set(2))),
189+
KryoUnsupportedEncoderForSubFiled("b", Seq(3), None)), "type with unsupported encoder,use kryo")
190+
182191
// Java encoders
183192
encodeDecodeTest("hello", "java string")(encoderFor(Encoders.javaSerialization[String]))
184193
encodeDecodeTest(new JavaSerializable(15), "java object")(

0 commit comments

Comments
 (0)