
Commit c304cc8

Adding supporting sequence files for tests. Cleaning up
1 parent 4b0a43f commit c304cc8

30 files changed: +213 -112 lines changed

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 89 additions & 76 deletions
@@ -34,10 +34,10 @@ import org.apache.spark.api.java.function.PairFunction
 import scala.util.{Success, Failure, Try}
 import org.msgpack
 import org.msgpack.ScalaMessagePack
-import org.apache.hadoop.mapreduce.InputFormat
+import org.apache.hadoop.mapred.InputFormat
 
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.hadoop.mapred.SequenceFileOutputFormat
+import org.apache.hadoop.mapred.{JobConf, SequenceFileOutputFormat}
 import org.apache.hadoop.conf.Configuration
 import java.util
 
@@ -196,69 +196,6 @@ private object SpecialLengths {
   val TIMING_DATA = -3
 }
 
-case class TestClass(var id: String, var number: Int) {
-  def this() = this("", 0)
-}
-
-object TestHadoop extends App {
-
-  //PythonRDD.writeToStream((1, "bar"), new DataOutputStream(new FileOutputStream("/tmp/test.out")))
-
-
-  //val n = new NullWritable
-
-  import SparkContext._
-
-  val path = "/tmp/spark/test/sfarray/"
-  //val path = "/Users/Nick/workspace/java/faunus/output/job-0/"
-
-  val sc = new SparkContext("local[2]", "test")
-
-  //val rdd = sc.sequenceFile[NullWritable, FaunusVertex](path)
-  //val data = Seq((1.0, "aa"), (2.0, "bb"), (2.0, "aa"), (3.0, "cc"), (2.0, "bb"), (1.0, "aa"))
-  val data = Seq(
-    (1, Array(1.0, 2.0, 3.0)),
-    (2, Array(3.0, 4.0, 5.0)),
-    (3, Array(4.0, 5.0, 6.0))
-  )
-  val d = new DoubleWritable(5.0)
-  val a = new ArrayWritable(classOf[DoubleWritable], Array(d))
-
-  val rdd = sc.parallelize(data, numSlices = 2)
-    //.map({ case (k, v) => (new IntWritable(k), v.map(new DoubleWritable(_))) })
-    .map{ case (k, v) => (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_)))) }
-  rdd.saveAsNewAPIHadoopFile[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[IntWritable, ArrayWritable]](path)
-
-  /*
-  val data = Seq(
-    ("1", TestClass("test1", 123)),
-    ("2", TestClass("test2", 456)),
-    ("1", TestClass("test3", 123)),
-    ("3", TestClass("test56", 456)),
-    ("2", TestClass("test2", 123))
-  )
-  val rdd = sc.parallelize(data, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
-  rdd.saveAsNewAPIHadoopFile(path,
-    classOf[Text], classOf[TestClass],
-    classOf[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[Text, TestClass]])
-
-  //val rdd2 = Seq((1, ))
-
-  val seq = sc.sequenceFile[Double, String](path)
-  val seqR = seq.collect()
-
-  val packed = PythonRDD.serMsgPack(rdd)
-  val packedR = packed.collect()
-  val packed2 = PythonRDD.serMsgPack(seq)
-  val packedR2 = packed2.collect()
-
-  println(seqR.mkString(","))
-  println(packedR.mkString(","))
-  println(packedR2.mkString(","))
-  */
-
-}
-
 private[spark] object PythonRDD extends Logging {
 
   def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
@@ -281,7 +218,7 @@ private[spark] object PythonRDD extends Logging {
 
   // PySpark / Hadoop InputFormat stuff
 
-  // SequenceFile
+  /** Create and RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]] */
   def sequenceFile[K ,V](sc: JavaSparkContext,
       path: String,
       keyClass: String,
@@ -299,8 +236,11 @@ private[spark] object PythonRDD extends Logging {
     JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
   }
 
-  // Arbitrary Hadoop InputFormat, key class and value class
-  def newHadoopFile[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
+  /**
+   * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
+   * key and value class
+   */
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
       path: String,
       inputFormatClazz: String,
       keyClazz: String,
@@ -312,30 +252,103 @@ private[spark] object PythonRDD extends Logging {
     val baseConf = sc.hadoopConfiguration()
     val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
     val rdd =
-      newHadoopFileFromClassNames[K, V, F](sc,
-        path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper, mergedConf)
+      newAPIHadoopRDDFromClassNames[K, V, F](sc,
+        Some(path), inputFormatClazz, keyClazz, valueClazz, mergedConf)
+    val converted = SerDeUtil.convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
+  }
+
+  /**
+   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is passed in from Python,
+   * using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]], key and value class
+   */
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val rdd =
+      newAPIHadoopRDDFromClassNames[K, V, F](sc,
+        None, inputFormatClazz, keyClazz, valueClazz, conf)
     val converted = SerDeUtil.convertRDD[K, V](rdd)
     JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
   }
 
-  private def newHadoopFileFromClassNames[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
-      path: String,
+  private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
+      path: Option[String] = None,
       inputFormatClazz: String,
       keyClazz: String,
       valueClazz: String,
-      keyWrapper: String,
-      valueWrapper: String,
       conf: Configuration) = {
     implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
     implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
     implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz)).asInstanceOf[ClassManifest[F]]
     val kc = kcm.erasure.asInstanceOf[Class[K]]
     val vc = vcm.erasure.asInstanceOf[Class[V]]
     val fc = fcm.erasure.asInstanceOf[Class[F]]
-    sc.sc.newAPIHadoopFile(path, fc, kc, vc, conf)
+    val rdd = if (path.isDefined) {
+      sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
+    } else {
+      sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc)
+    }
+    rdd
+  }
+
+  def hadoopFile[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
+      path: String,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val baseConf = sc.hadoopConfiguration()
+    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+    val rdd =
+      hadoopRDDFromClassNames[K, V, F](sc,
+        Some(path), inputFormatClazz, keyClazz, valueClazz, mergedConf)
+    val converted = SerDeUtil.convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
+  }
+
+  def hadoopRDD[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val rdd =
+      hadoopRDDFromClassNames[K, V, F](sc,
+        None, inputFormatClazz, keyClazz, valueClazz, conf)
+    val converted = SerDeUtil.convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
   }
 
-  //
+  private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
+      path: Option[String] = None,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      conf: Configuration) = {
+    implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
+    implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
+    implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz)).asInstanceOf[ClassManifest[F]]
+    val kc = kcm.erasure.asInstanceOf[Class[K]]
+    val vc = vcm.erasure.asInstanceOf[Class[V]]
+    val fc = fcm.erasure.asInstanceOf[Class[F]]
+    val rdd = if (path.isDefined) {
+      sc.sc.hadoopFile(path.get, fc, kc, vc)
+    } else {
+      sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc)
+    }
+    rdd
+  }
 
   def writeToStream(elem: Any, dataOut: DataOutputStream)(implicit m: ClassManifest[Any]) {
     elem match {
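These helpers are meant to be called from PySpark via Py4J, but they can also be driven directly from the JVM side. Below is a minimal Scala sketch of how the new old-API hadoopFile entry point might be exercised; the input path, the TextInputFormat/LongWritable/Text classes and the driver object are illustrative assumptions, not part of this commit (the sketch sits in org.apache.spark.api.python only because PythonRDD is private[spark]).

package org.apache.spark.api.python

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.api.java.JavaSparkContext

// Hypothetical driver: exercises PythonRDD.hadoopFile the way the Python side would,
// passing class names as strings and getting back MsgPack-serialized pairs.
object HadoopFileSketch extends App {
  val jsc = new JavaSparkContext("local[2]", "hadoopFile-sketch")

  val packed = PythonRDD.hadoopFile[LongWritable, Text, TextInputFormat](
    jsc,
    "/tmp/input.txt",                          // hypothetical input path
    "org.apache.hadoop.mapred.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    "",                                        // keyWrapper (unused by this code path)
    "",                                        // valueWrapper (unused by this code path)
    new java.util.HashMap[String, String]())   // extra Configuration entries, merged with the context's conf

  // The result is a JavaRDD[Array[Byte]] of MsgPack-serialized (key, value) records.
  println(packed.count())

  jsc.stop()
}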

core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala

Lines changed: 41 additions & 23 deletions
@@ -15,43 +15,60 @@ import scala.util.Failure
  */
 private[python] object SerDeUtil extends Logging {
 
+  /** Attempts to register a class with MsgPack, only if it is not a primitive or a String */
   def register[T](clazz: Class[T], msgpack: ScalaMessagePack) {
+    //implicit val kcm = ClassManifest.fromClass(clazz)
+    //val kc = kcm.erasure
     Try {
-      log.info("%s".format(clazz))
-      clazz match {
-        case c if c.isPrimitive =>
-        case c if c.isInstanceOf[java.lang.String] =>
-        case _ => msgpack.register(clazz)
-      }
-    }.getOrElse(log.warn("Failed to register class (%s) with MsgPack. ".format(clazz.getName) +
-      "Falling back to default MsgPack serialization, or 'toString' as last resort"))
+      //if (kc.isInstance("") || kc.isPrimitive) {
+      //  log.info("Class: %s doesn't need to be registered".format(kc.getName))
+      //} else {
+      msgpack.register(clazz)
+      log.info("Registered key/value class with MsgPack: %s".format(clazz))
+      //}
+
+    } match {
+      case Failure(err) =>
+        log.warn("Failed to register class (%s) with MsgPack. ".format(clazz.getName) +
+          "Falling back to default MsgPack serialization, or 'toString' as last resort. " +
+          "Error: %s".format(err.getMessage))
+      case Success(result) =>
+    }
   }
 
-  // serialize and RDD[(K, V)] -> RDD[Array[Byte]] using MsgPack
+  /** Serializes an RDD[(K, V)] -> RDD[Array[Byte]] using MsgPack */
   def serMsgPack[K, V](rdd: RDD[(K, V)]) = {
     import org.msgpack.ScalaMessagePack._
-    val msgpack = new ScalaMessagePack with Serializable
-    val first = rdd.first()
-    val kc = ClassManifest.fromClass(first._1.getClass).asInstanceOf[ClassManifest[K]].erasure.asInstanceOf[Class[K]]
-    val vc = ClassManifest.fromClass(first._2.getClass).asInstanceOf[ClassManifest[V]].erasure.asInstanceOf[Class[V]]
-    register(kc, msgpack)
-    register(vc, msgpack)
-    rdd.map{ pair =>
+    rdd.mapPartitions{ pairs =>
+      val mp = new ScalaMessagePack
+      var triedReg = false
+      pairs.map{ pair =>
         Try {
-          msgpack.write(pair)
+          if (!triedReg) {
+            register(pair._1.getClass, mp)
+            register(pair._2.getClass, mp)
+            triedReg = true
+          }
+          mp.write(pair)
         } match {
           case Failure(err) =>
-            Try {
-              write((pair._1.toString, pair._2.toString))
-            } match {
-              case Success(result) => result
-              case Failure(e) => throw e
-            }
+            log.debug("Failed to write", err)
+            Try {
+              write((pair._1.toString, pair._2.toString))
+            } match {
+              case Success(result) => result
+              case Failure(e) => throw e
+            }
           case Success(result) => result
         }
       }
     }
+  }
 
+  /**
+   * Converts an RDD of (K, V) pairs, where K and/or V could be instances of [[org.apache.hadoop.io.Writable]],
+   * into an RDD[(K, V)]
+   */
   def convertRDD[K, V](rdd: RDD[(K, V)]) = {
     rdd.map{
       case (k: Writable, v: Writable) => (convert(k).asInstanceOf[K], convert(v).asInstanceOf[V])
@@ -61,6 +78,7 @@ private[python] object SerDeUtil extends Logging {
     }
   }
 
+  /** Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or object representation */
   def convert(writable: Writable): Any = {
     import collection.JavaConversions._
     writable match {
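The match cases inside convert fall outside this hunk, so they are not visible in the diff above. As a rough, assumed sketch of what a Writable-to-Scala conversion of the kind the new doc comment describes typically looks like (not the commit's actual body):

import org.apache.hadoop.io._

// Illustrative only: unwrap the common Hadoop Writable wrappers into plain values,
// recursing into ArrayWritable and falling back to the Writable itself otherwise.
def convertSketch(writable: Writable): Any = writable match {
  case iw: IntWritable      => iw.get()
  case dw: DoubleWritable   => dw.get()
  case lw: LongWritable     => lw.get()
  case fw: FloatWritable    => fw.get()
  case bw: BooleanWritable  => bw.get()
  case bytes: BytesWritable => bytes.getBytes
  case text: Text           => text.toString
  case _: NullWritable      => null
  case arr: ArrayWritable   => arr.get().map(convertSketch)
  case other                => other
}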
New file

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+package org.apache.spark.api.python
+
+import org.apache.spark.SparkContext
+import org.apache.hadoop.io._
+import scala.Array
+import java.io.{DataOutput, DataInput}
+
+case class TestWritable(var str: String, var numi: Int, var numd: Double) extends Writable {
+  def this() = this("", 0, 0.0)
+
+  def write(p1: DataOutput) = {
+    p1.writeUTF(str)
+    p1.writeInt(numi)
+    p1.writeDouble(numd)
+  }
+
+  def readFields(p1: DataInput) = {
+    str = p1.readUTF()
+    numi = p1.readInt()
+    numd = p1.readDouble()
+  }
+}
+
+object WriteInputFormatTests extends App {
+  import SparkContext._
+
+  val sc = new SparkContext("local[2]", "test")
+
+  val textPath = "../python/test_support/data/sftext/"
+  val intPath = "../python/test_support/data/sfint/"
+  val doublePath = "../python/test_support/data/sfdouble/"
+  val arrPath = "../python/test_support/data/sfarray/"
+  val classPath = "../python/test_support/data/sfclass/"
+
+  val intKeys = Seq((1.0, "aa"), (2.0, "bb"), (2.0, "aa"), (3.0, "cc"), (2.0, "bb"), (1.0, "aa"))
+  sc.parallelize(intKeys).saveAsSequenceFile(intPath)
+  sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
+  sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
+
+  val data = Seq(
+    (1, Array(1.0, 2.0, 3.0)),
+    (2, Array(3.0, 4.0, 5.0)),
+    (3, Array(4.0, 5.0, 6.0))
+  )
+  sc.parallelize(data, numSlices = 2)
+    .map{ case (k, v) =>
+      (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
+    }
+    .saveAsNewAPIHadoopFile[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
+
+  val testClass = Seq(
+    ("1", TestWritable("test1", 123, 54.0)),
+    ("2", TestWritable("test2", 456, 8762.3)),
+    ("1", TestWritable("test3", 123, 423.1)),
+    ("3", TestWritable("test56", 456, 423.5)),
+    ("2", TestWritable("test2", 123, 5435.2))
+  )
+  val rdd = sc.parallelize(testClass, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
+  rdd.saveAsNewAPIHadoopFile(classPath,
+    classOf[Text], classOf[TestWritable],
+    classOf[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[Text, TestWritable]])
+
+
+}
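After this generator has been run, the SequenceFiles under ../python/test_support/data/ can be read back from Scala to sanity-check what was written. A small sketch assuming the same relative paths; the read-back object itself is an assumption, not part of this commit:

import org.apache.hadoop.io.Text
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.api.python.TestWritable

object ReadBackSketch extends App {
  val sc = new SparkContext("local[2]", "read-back")

  // (Double, String) pairs written via saveAsSequenceFile; the implicit
  // WritableConverters in SparkContext unwrap the Writables for us.
  val doubles = sc.sequenceFile[Double, String]("../python/test_support/data/sfdouble/")
  println(doubles.collect().mkString(","))

  // (Text, TestWritable) records written via saveAsNewAPIHadoopFile; copy the fields out
  // of the reused Writable instances before collecting them to the driver.
  val classes = sc.sequenceFile("../python/test_support/data/sfclass/", classOf[Text], classOf[TestWritable])
    .map { case (k, v) => (k.toString, (v.str, v.numi, v.numd)) }
  println(classes.collect().mkString(","))

  sc.stop()
}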

0 commit comments
