@@ -34,10 +34,10 @@ import org.apache.spark.api.java.function.PairFunction
 import scala.util.{Success, Failure, Try}
 import org.msgpack
 import org.msgpack.ScalaMessagePack
-import org.apache.hadoop.mapreduce.InputFormat
+import org.apache.hadoop.mapred.InputFormat
 
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.hadoop.mapred.SequenceFileOutputFormat
+import org.apache.hadoop.mapred.{JobConf, SequenceFileOutputFormat}
 import org.apache.hadoop.conf.Configuration
 import java.util
 
@@ -196,69 +196,6 @@ private object SpecialLengths {
   val TIMING_DATA = -3
 }
 
-case class TestClass(var id: String, var number: Int) {
-  def this() = this("", 0)
-}
-
-object TestHadoop extends App {
-
-  // PythonRDD.writeToStream((1, "bar"), new DataOutputStream(new FileOutputStream("/tmp/test.out")))
-
-
-  // val n = new NullWritable
-
-  import SparkContext._
-
-  val path = "/tmp/spark/test/sfarray/"
-  // val path = "/Users/Nick/workspace/java/faunus/output/job-0/"
-
-  val sc = new SparkContext("local[2]", "test")
-
-  // val rdd = sc.sequenceFile[NullWritable, FaunusVertex](path)
-  // val data = Seq((1.0, "aa"), (2.0, "bb"), (2.0, "aa"), (3.0, "cc"), (2.0, "bb"), (1.0, "aa"))
-  val data = Seq(
-    (1, Array(1.0, 2.0, 3.0)),
-    (2, Array(3.0, 4.0, 5.0)),
-    (3, Array(4.0, 5.0, 6.0))
-  )
-  val d = new DoubleWritable(5.0)
-  val a = new ArrayWritable(classOf[DoubleWritable], Array(d))
-
-  val rdd = sc.parallelize(data, numSlices = 2)
-    // .map({ case (k, v) => (new IntWritable(k), v.map(new DoubleWritable(_))) })
-    .map{ case (k, v) => (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_)))) }
-  rdd.saveAsNewAPIHadoopFile[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[IntWritable, ArrayWritable]](path)
-
-  /*
-  val data = Seq(
-    ("1", TestClass("test1", 123)),
-    ("2", TestClass("test2", 456)),
-    ("1", TestClass("test3", 123)),
-    ("3", TestClass("test56", 456)),
-    ("2", TestClass("test2", 123))
-  )
-  val rdd = sc.parallelize(data, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
-  rdd.saveAsNewAPIHadoopFile(path,
-    classOf[Text], classOf[TestClass],
-    classOf[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[Text, TestClass]])
-
-  //val rdd2 = Seq((1, ))
-
-  val seq = sc.sequenceFile[Double, String](path)
-  val seqR = seq.collect()
-
-  val packed = PythonRDD.serMsgPack(rdd)
-  val packedR = packed.collect()
-  val packed2 = PythonRDD.serMsgPack(seq)
-  val packedR2 = packed2.collect()
-
-  println(seqR.mkString(","))
-  println(packedR.mkString(","))
-  println(packedR2.mkString(","))
-  */
-
-}
-
 private[spark] object PythonRDD extends Logging {
 
   def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
@@ -281,7 +218,7 @@ private[spark] object PythonRDD extends Logging {
 
   // PySpark / Hadoop InputFormat stuff
 
-  // SequenceFile
+  /** Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]] */
   def sequenceFile[K, V](sc: JavaSparkContext,
       path: String,
       keyClass: String,
@@ -299,8 +236,11 @@ private[spark] object PythonRDD extends Logging {
     JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
   }
 
-  // Arbitrary Hadoop InputFormat, key class and value class
-  def newHadoopFile[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
+  /**
+   * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
+   * key and value class
+   */
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
       path: String,
       inputFormatClazz: String,
       keyClazz: String,
@@ -312,30 +252,103 @@ private[spark] object PythonRDD extends Logging {
     val baseConf = sc.hadoopConfiguration()
     val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
     val rdd =
-      newHadoopFileFromClassNames[K, V, F](sc,
-        path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper, mergedConf)
+      newAPIHadoopRDDFromClassNames[K, V, F](sc,
+        Some(path), inputFormatClazz, keyClazz, valueClazz, mergedConf)
+    val converted = SerDeUtil.convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
+  }
+
+  /**
+   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is passed in from Python,
+   * using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]], key and value class
+   */
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val rdd =
+      newAPIHadoopRDDFromClassNames[K, V, F](sc,
+        None, inputFormatClazz, keyClazz, valueClazz, conf)
     val converted = SerDeUtil.convertRDD[K, V](rdd)
     JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
   }
 
-  private def newHadoopFileFromClassNames[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
-      path: String,
+  private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
+      path: Option[String] = None,
       inputFormatClazz: String,
       keyClazz: String,
       valueClazz: String,
-      keyWrapper: String,
-      valueWrapper: String,
       conf: Configuration) = {
     implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
     implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
     implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz)).asInstanceOf[ClassManifest[F]]
     val kc = kcm.erasure.asInstanceOf[Class[K]]
     val vc = vcm.erasure.asInstanceOf[Class[V]]
     val fc = fcm.erasure.asInstanceOf[Class[F]]
-    sc.sc.newAPIHadoopFile(path, fc, kc, vc, conf)
+    val rdd = if (path.isDefined) {
+      sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
+    } else {
+      sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc)
+    }
+    rdd
+  }
+
+  def hadoopFile[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
+      path: String,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val baseConf = sc.hadoopConfiguration()
+    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+    val rdd =
+      hadoopRDDFromClassNames[K, V, F](sc,
+        Some(path), inputFormatClazz, keyClazz, valueClazz, mergedConf)
+    val converted = SerDeUtil.convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
+  }
+
+  def hadoopRDD[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val rdd =
+      hadoopRDDFromClassNames[K, V, F](sc,
+        None, inputFormatClazz, keyClazz, valueClazz, conf)
+    val converted = SerDeUtil.convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
   }
 
-  //
+  private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
+      path: Option[String] = None,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      conf: Configuration) = {
+    implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
+    implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
+    implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz)).asInstanceOf[ClassManifest[F]]
+    val kc = kcm.erasure.asInstanceOf[Class[K]]
+    val vc = vcm.erasure.asInstanceOf[Class[V]]
+    val fc = fcm.erasure.asInstanceOf[Class[F]]
+    val rdd = if (path.isDefined) {
+      sc.sc.hadoopFile(path.get, fc, kc, vc)
+    } else {
+      sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc)
+    }
+    rdd
+  }
 
   def writeToStream(elem: Any, dataOut: DataOutputStream)(implicit m: ClassManifest[Any]) {
     elem match {
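
For context, a minimal driver sketch (not part of this commit) of how the new old-API `hadoopFile` entry point added above could be exercised from the JVM side. The path, the `IntWritable`/`Text` key and value classes, the empty wrapper arguments, and the `HadoopFileSketch` object name are assumptions for illustration only; the object is placed under `org.apache.spark.api.python` because `PythonRDD` is `private[spark]`.

// Hypothetical usage sketch (not part of this patch). Assumes a SequenceFile of
// (IntWritable, Text) records at the placeholder path; keyWrapper/valueWrapper
// are passed as empty strings since no custom wrapping is exercised here.
package org.apache.spark.api.python

import org.apache.spark.api.java.JavaSparkContext
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.SequenceFileInputFormat

object HadoopFileSketch {
  def main(args: Array[String]) {
    val sc = new JavaSparkContext("local[2]", "hadoopFile-sketch")
    // Extra configuration from "Python"; merged with sc.hadoopConfiguration() inside hadoopFile
    val confAsMap = new java.util.HashMap[String, String]()
    // Returns a JavaRDD of MessagePack-serialized (key, value) records, ready to ship to the Python worker
    val packed = PythonRDD.hadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](
      sc,
      "/tmp/spark/test/sequencefile/",                      // placeholder path
      "org.apache.hadoop.mapred.SequenceFileInputFormat",
      "org.apache.hadoop.io.IntWritable",
      "org.apache.hadoop.io.Text",
      "", "",                                               // keyWrapper / valueWrapper placeholders
      confAsMap)
    println(packed.count())
    sc.stop()
  }
}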