
Commit 7237263

Add back msgpack serializer and hadoop file code lost during merging
1 parent 25da1ca

File tree

python/pyspark/context.py
python/pyspark/serializers.py

2 files changed: +115 −1 lines changed

python/pyspark/context.py

Lines changed: 104 additions & 1 deletion
@@ -29,7 +29,7 @@
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-    PairDeserializer, MsgPackDeserializer
+    PairDeserializer, MsgpackSerializer
 from pyspark.storagelevel import StorageLevel
 from pyspark import rdd
 from pyspark.rdd import RDD
@@ -297,6 +297,109 @@ def wholeTextFiles(self, path):
         return RDD(self._jsc.wholeTextFiles(path), self,
                    PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

+    ###
+
+    def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class="org.apache.hadoop.io.Text",
+                     key_wrapper="", value_wrapper="", minSplits=None):
+        """
+        Read a Hadoop SequenceFile with arbitrary key and value Writable classes from HDFS,
+        a local file system (available on all nodes), or any Hadoop-supported file system URI.
+        The mechanism is as follows:
+            1. A Java RDD is created from the SequenceFile, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgpackSerializer} is used to deserialize data on the Python side
+
+        >>> sc.sequenceFile("test_support/data/sfint/").collect()
+        [(1, 'aa'), (2, 'bb'), (2, 'aa'), (3, 'cc'), (2, 'bb'), (1, 'aa')]
+        >>> sc.sequenceFile("test_support/data/sfdouble/").collect()
+        [(1.0, 'aa'), (2.0, 'bb'), (2.0, 'aa'), (3.0, 'cc'), (2.0, 'bb'), (1.0, 'aa')]
+        >>> sc.sequenceFile("test_support/data/sftext/").collect()
+        [('1', 'aa'), ('2', 'bb'), ('2', 'aa'), ('3', 'cc'), ('2', 'bb'), ('1', 'aa')]
+        """
+        minSplits = minSplits or min(self.defaultParallelism, 2)
+        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, key_class, value_class, key_wrapper, value_wrapper,
+                                                minSplits)
+        return RDD(jrdd, self, MsgpackSerializer())
+
+    def newAPIHadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper="toString",
+                         value_wrapper="toString", conf={}):
+        """
+        Read a 'new API' Hadoop InputFormat with arbitrary key and value classes from HDFS,
+        a local file system (available on all nodes), or any Hadoop-supported file system URI.
+        The mechanism is as follows:
+            1. A Java RDD is created from the InputFormat, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgpackSerializer} is used to deserialize data on the Python side
+
+        A Hadoop configuration can be passed in as a Python dict; it will be converted into a Configuration in Java.
+        """
+        jconf = self._jvm.java.util.HashMap()
+        for k, v in conf.iteritems():
+            jconf[k] = v
+        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputformat_class, key_class, value_class,
+                                                    key_wrapper, value_wrapper, jconf)
+        return RDD(jrdd, self, MsgpackSerializer())
+
+    def newAPIHadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper="toString",
+                        value_wrapper="toString", conf={}):
+        """
+        Read a 'new API' Hadoop InputFormat with arbitrary key and value classes from an arbitrary Hadoop
+        configuration, which is passed in as a Python dict and converted into a Configuration in Java.
+        The mechanism is as follows:
+            1. A Java RDD is created from the InputFormat, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgpackSerializer} is used to deserialize data on the Python side
+        """
+        jconf = self._jvm.java.util.HashMap()
+        for k, v in conf.iteritems():
+            jconf[k] = v
+        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
+                                                   value_wrapper, jconf)
+        return RDD(jrdd, self, MsgpackSerializer())
+
+    def hadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper="toString",
+                   value_wrapper="toString", conf={}):
+        """
+        Read an 'old' Hadoop InputFormat with arbitrary key and value classes from HDFS,
+        a local file system (available on all nodes), or any Hadoop-supported file system URI.
+        The mechanism is as follows:
+            1. A Java RDD is created from the InputFormat, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgpackSerializer} is used to deserialize data on the Python side
+
+        A Hadoop configuration can be passed in as a Python dict; it will be converted into a Configuration in Java.
+        """
+        jconf = self._jvm.java.util.HashMap()
+        for k, v in conf.iteritems():
+            jconf[k] = v
+        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, name, inputformat_class, key_class, value_class, key_wrapper,
+                                              value_wrapper, jconf)
+        return RDD(jrdd, self, MsgpackSerializer())
+
+    def hadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper="toString",
+                  value_wrapper="toString", conf={}):
+        """
+        Read an 'old' Hadoop InputFormat with arbitrary key and value classes from an arbitrary Hadoop
+        configuration, which is passed in as a Python dict and converted into a Configuration in Java.
+        The mechanism is as follows:
+            1. A Java RDD is created from the InputFormat, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgpackSerializer} is used to deserialize data on the Python side
+        """
+        jconf = self._jvm.java.util.HashMap()
+        for k, v in conf.iteritems():
+            jconf[k] = v
+        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
+                                             value_wrapper, jconf)
+        return RDD(jrdd, self, MsgpackSerializer())
+
+    ####
+
     def _checkpointFile(self, name, input_deserializer):
         jrdd = self._jsc.checkpointFile(name)
         return RDD(jrdd, self, input_deserializer)
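
For orientation, a minimal usage sketch of the restored readers, assuming a SparkContext `sc` built from this branch. The paths, the IntWritable value class, and the configuration key are hypothetical placeholders, not part of the commit:

    # Read a SequenceFile, overriding the default Text key/value classes
    # (hypothetical path and value class).
    pairs = sc.sequenceFile("hdfs:///data/counts.seq",
                            key_class="org.apache.hadoop.io.Text",
                            value_class="org.apache.hadoop.io.IntWritable")

    # Read via a 'new API' InputFormat; the conf dict is copied entry by entry
    # into a java.util.HashMap and passed to the JVM as the Hadoop Configuration.
    lines = sc.newAPIHadoopRDD(
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        conf={"mapreduce.input.fileinputformat.inputdir": "hdfs:///data/logs"})  # hypothetical key

    # Records come back as Python tuples decoded by MsgpackSerializer, or as
    # strings produced by the toString fallback.
    print pairs.take(5)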

python/pyspark/serializers.py

Lines changed: 11 additions & 0 deletions
@@ -309,6 +309,17 @@ def load_stream(self, stream):
                 return


+class MsgpackSerializer(FramedSerializer):
+    """
+    Deserializes streams written by Scala/Java MsgPack
+    """
+    def loads(self, obj):
+        return msgpack.loads(obj, use_list=0)
+
+    def dumps(self, obj):
+        return msgpack.dumps(obj)
+
+
 def read_long(stream):
     length = stream.read(8)
     if length == "":
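
As a sanity check on the framing, the same round trip can be exercised directly with the msgpack-python package that this module imports as C{msgpack}; the record below is illustrative only:

    import msgpack

    record = (1, 'aa')                         # shaped like the doctest pairs above
    payload = msgpack.dumps(record)            # the bytes written for one frame
    # use_list=0 decodes msgpack arrays as tuples rather than lists, which is
    # why sequenceFile returns (key, value) tuples on the Python side.
    assert msgpack.loads(payload, use_list=0) == (1, 'aa')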
