|
18 | 18 | package org.apache.spark.api.python |
19 | 19 |
|
20 | 20 | import org.apache.spark.rdd.RDD |
21 | | -import org.apache.spark.SparkContext |
| 21 | +import org.apache.spark.{Logging, SparkContext} |
22 | 22 | import org.apache.hadoop.conf.Configuration |
23 | 23 | import org.apache.hadoop.io._ |
| 24 | +import scala.util.{Failure, Success, Try} |
24 | 25 |
|
25 | 26 |
|
| 27 | +trait Converter { |
| 28 | + def convert(obj: Any): Any |
| 29 | +} |
| 30 | + |
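// Illustrative sketch, not part of this diff: a custom converter only has to
// implement convert. It also needs a public no-arg constructor, because the
// ConverterRegistry below instantiates converters reflectively via
// Class.forName(...).newInstance(). The class name here is hypothetical.
class UpperCaseTextConverter extends Converter {
  def convert(obj: Any): Any = obj match {
    case t: Text => t.toString.toUpperCase  // unwrap Text and normalise the casing
    case other   => other                   // pass anything else through unchanged
  }
}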
| 31 | +object DefaultConverter extends Converter { |
| 32 | + |
| 33 | + /** |
| 34 | + * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or |
| 35 | + * object representation |
| 36 | + */ |
| 37 | + private def convertWritable(writable: Writable): Any = { |
| 38 | + import collection.JavaConversions._ |
| 39 | + writable match { |
| 40 | + case iw: IntWritable => SparkContext.intWritableConverter().convert(iw) |
| 41 | + case dw: DoubleWritable => SparkContext.doubleWritableConverter().convert(dw) |
| 42 | + case lw: LongWritable => SparkContext.longWritableConverter().convert(lw) |
| 43 | + case fw: FloatWritable => SparkContext.floatWritableConverter().convert(fw) |
| 44 | + case t: Text => SparkContext.stringWritableConverter().convert(t) |
| 45 | + case bw: BooleanWritable => SparkContext.booleanWritableConverter().convert(bw) |
| 46 | + case byw: BytesWritable => SparkContext.bytesWritableConverter().convert(byw) |
| 47 | + case n: NullWritable => null |
| 48 | + case aw: ArrayWritable => aw.get().map(convertWritable(_)) |
| 49 | + case mw: MapWritable => mapAsJavaMap(mw.map{ case (k, v) => |
| 50 | + (convertWritable(k), convertWritable(v)) |
| 51 | + }.toMap) |
| 52 | + case other => other |
| 53 | + } |
| 54 | + } |
| 55 | + |
| 56 | + def convert(obj: Any): Any = { |
| 57 | + obj match { |
| 58 | + case writable: Writable => |
| 59 | + convertWritable(writable) |
| 60 | + case _ => |
| 61 | + obj |
| 62 | + } |
| 63 | + } |
| 64 | +} |
| 65 | + |
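// Illustrative REPL-style sketch, not part of this diff: what the default
// conversion above produces for a few common Writable types.
DefaultConverter.convert(new IntWritable(42))   // 42 (a plain Int)
DefaultConverter.convert(new Text("spark"))     // "spark" (a plain String)
DefaultConverter.convert(NullWritable.get())    // null
DefaultConverter.convert("already converted")   // non-Writable values pass through unchanged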
| 66 | +class ConverterRegistry extends Logging { |
| 67 | + |
| 68 | + var keyConverter: Converter = DefaultConverter |
| 69 | + var valueConverter: Converter = DefaultConverter |
| 70 | + |
| 71 | + def convertKey(obj: Any): Any = keyConverter.convert(obj) |
| 72 | + |
| 73 | + def convertValue(obj: Any): Any = valueConverter.convert(obj) |
| 74 | + |
| 75 | + def registerKeyConverter(converterClass: String) = { |
| 76 | + keyConverter = register(converterClass) |
| 77 | + logInfo(s"Loaded and registered key converter ($converterClass)") |
| 78 | + } |
| 79 | + |
| 80 | + def registerValueConverter(converterClass: String) = { |
| 81 | + valueConverter = register(converterClass) |
| 82 | + logInfo(s"Loaded and registered value converter ($converterClass)") |
| 83 | + } |
| 84 | + |
| 85 | + private def register(converterClass: String): Converter = { |
| 86 | + Try { |
| 87 | + val converter = Class.forName(converterClass).newInstance().asInstanceOf[Converter] |
| 88 | + converter |
| 89 | + } match { |
| 90 | + case Success(s) => s |
| 91 | + case Failure(err) => |
| 92 | + logError(s"Failed to register converter: $converterClass") |
| 93 | + throw err |
| 94 | + } |
| 95 | + |
| 96 | + } |
| 97 | +} |
| 98 | + |
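// Illustrative REPL-style sketch, not part of this diff: converters are loaded
// by fully-qualified class name, and any slot left unset keeps DefaultConverter.
// The class name below is hypothetical; registration throws if it cannot be loaded.
val registry = new ConverterRegistry
registry.registerKeyConverter("org.example.UpperCaseTextConverter")
registry.convertKey(new Text("id-1"))       // handled by the custom key converter
registry.convertValue(new IntWritable(7))   // still handled by DefaultConverter -> 7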
26 | 99 | /** Utilities for working with Python objects -> Hadoop-related objects */ |
27 | 100 | private[python] object PythonHadoopUtil { |
28 | 101 |
|
@@ -51,33 +124,18 @@ private[python] object PythonHadoopUtil { |
51 | 124 | * Converts an RDD of key-value pairs, where key and/or value could be instances of |
52 | 125 | * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)] |
53 | 126 | */ |
54 | | - def convertRDD[K, V](rdd: RDD[(K, V)]) = { |
55 | | - rdd.map{ |
56 | | - case (k: Writable, v: Writable) => (convert(k).asInstanceOf[K], convert(v).asInstanceOf[V]) |
57 | | - case (k: Writable, v) => (convert(k).asInstanceOf[K], v.asInstanceOf[V]) |
58 | | - case (k, v: Writable) => (k.asInstanceOf[K], convert(v).asInstanceOf[V]) |
59 | | - case (k, v) => (k.asInstanceOf[K], v.asInstanceOf[V]) |
60 | | - } |
61 | | - } |
62 | | - |
63 | | - /** |
64 | | - * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or |
65 | | - * object representation |
66 | | - */ |
67 | | - private def convert(writable: Writable): Any = { |
68 | | - import collection.JavaConversions._ |
69 | | - writable match { |
70 | | - case iw: IntWritable => SparkContext.intWritableConverter().convert(iw) |
71 | | - case dw: DoubleWritable => SparkContext.doubleWritableConverter().convert(dw) |
72 | | - case lw: LongWritable => SparkContext.longWritableConverter().convert(lw) |
73 | | - case fw: FloatWritable => SparkContext.floatWritableConverter().convert(fw) |
74 | | - case t: Text => SparkContext.stringWritableConverter().convert(t) |
75 | | - case bw: BooleanWritable => SparkContext.booleanWritableConverter().convert(bw) |
76 | | - case byw: BytesWritable => SparkContext.bytesWritableConverter().convert(byw) |
77 | | - case n: NullWritable => null |
78 | | - case aw: ArrayWritable => aw.get().map(convert(_)) |
79 | | - case mw: MapWritable => mapAsJavaMap(mw.map{ case (k, v) => (convert(k), convert(v)) }.toMap) |
80 | | - case other => other |
| 127 | + def convertRDD[K, V](rdd: RDD[(K, V)], |
| 128 | + keyClass: String, |
| 129 | + keyConverter: Option[String], |
| 130 | + valueClass: String, |
| 131 | + valueConverter: Option[String]) = { |
| 132 | + rdd.mapPartitions { case iter => |
| 133 | + val registry = new ConverterRegistry |
| 134 | + keyConverter.foreach(registry.registerKeyConverter(_)) |
| 135 | + valueConverter.foreach(registry.registerValueConverter(_)) |
| 136 | + iter.map { case (k, v) => |
| 137 | + (registry.convertKey(k).asInstanceOf[K], registry.convertValue(v).asInstanceOf[V]) |
| 138 | + } |
81 | 139 | } |
82 | 140 | } |
83 | 141 |
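// Illustrative sketch, not part of this diff: calling the new convertRDD
// signature from Scala (the object is package-private, so from within
// org.apache.spark.api.python). rawRdd is a hypothetical RDD[(Text, IntWritable)],
// e.g. read from a SequenceFile; passing None for both converters falls back to
// DefaultConverter, so at runtime each key/value becomes a plain String/Int.
val converted = PythonHadoopUtil.convertRDD(
  rawRdd,
  classOf[Text].getName,          // keyClass
  None,                           // keyConverter: use DefaultConverter
  classOf[IntWritable].getName,   // valueClass
  None)                           // valueConverter: use DefaultConverter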
|
|