@@ -164,7 +164,7 @@ object ScalaReflection extends ScalaReflection {
164164
165165 /** Returns the current path or `GetColumnByOrdinal`. */
166166 def getPath : Expression = {
167- val dataType = schemaFor (tpe).dataType
167+ val dataType = schemaForDefaultBinaryType (tpe).dataType
168168 if (path.isDefined) {
169169 path.get
170170 } else {
@@ -409,7 +409,8 @@ object ScalaReflection extends ScalaReflection {
409409 val cls = getClassFromType(tpe)
410410
411411 val arguments = params.zipWithIndex.map { case ((fieldName, fieldType), i) =>
412- val Schema (dataType, nullable) = schemaFor(fieldType)
412+ val Schema (dataType, nullablity) = schemaForDefaultBinaryType(fieldType)
413+
413414 val clsName = getClassNameFromType(fieldType)
414415 val newTypePath = s """ - field (class: " $clsName", name: " $fieldName") """ +: walkedTypePath
415416 // For tuples, we based grab the inner fields by ordinal instead of name.
@@ -424,7 +425,7 @@ object ScalaReflection extends ScalaReflection {
424425 Some (addToPath(fieldName, dataType, newTypePath)),
425426 newTypePath)
426427
427- if (! nullable ) {
428+ if (! nullablity ) {
428429 AssertNotNull (constructor, newTypePath)
429430 } else {
430431 constructor
@@ -445,6 +446,7 @@ object ScalaReflection extends ScalaReflection {
445446 }
446447
447448 case _ =>
449+ // default kryo deserializer
448450 DecodeUsingSerializer (getPath, ClassTag (getClassFromType(tpe)), true )
449451 }
450452 }
@@ -644,7 +646,8 @@ object ScalaReflection extends ScalaReflection {
644646 val nullOutput = expressions.Literal .create(null , nonNullOutput.dataType)
645647 expressions.If (IsNull (inputObject), nullOutput, nonNullOutput)
646648
647- case other =>
649+ case _ =>
650+ // default kryo serializer
648651 EncodeUsingSerializer (inputObject, true )
649652 }
650653
@@ -712,6 +715,13 @@ object ScalaReflection extends ScalaReflection {
712715 s.toAttributes
713716 }
714717
718+ /**
719+ * Returns a catalyst DataType and its nullability for the given Scala Type using reflection.
720+ * If the tpe mismatched in schemaFor function, the default BinaryType returned
721+ */
722+ def schemaForDefaultBinaryType (tpe : `Type`): Schema = scala.util.Try (schemaFor(tpe)).toOption
723+ .getOrElse(Schema (BinaryType , nullable = true ))
724+
715725 /** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */
716726 def schemaFor [T : TypeTag ]: Schema = schemaFor(localTypeOf[T ])
717727
@@ -775,7 +785,8 @@ object ScalaReflection extends ScalaReflection {
775785 StructField (fieldName, dataType, nullable)
776786 }), nullable = true )
777787 case other =>
778- Schema (BinaryType , nullable = false )
788+ throw new UnsupportedOperationException (s " Schema for type $other is not supported " )
789+ // Schema(BinaryType, nullable = false)
779790 }
780791 }
781792
0 commit comments