@@ -129,6 +129,7 @@ def registerFunction(self, name, f, returnType=StringType()):
129129 >>> sqlCtx.registerFunction("stringLengthString", lambda x: len(x))
130130 >>> sqlCtx.sql("SELECT stringLengthString('test')").collect()
131131 [Row(c0=u'4')]
132+
132133 >>> from pyspark.sql.types import IntegerType
133134 >>> sqlCtx.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
134135 >>> sqlCtx.sql("SELECT stringLengthInt('test')").collect()
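(Editorial aside, not part of the diff: a minimal sketch of applying a UDF registered this way to a real column rather than a literal. The table name "words" and the sample data are assumptions.)

from pyspark.sql import Row

# Build a tiny DataFrame and expose it to SQL; "words" is an illustrative name.
words = sqlCtx.inferSchema(sc.parallelize([Row(word="test"), Row(word="spark")]))
sqlCtx.registerDataFrameAsTable(words, "words")

# The UDF runs per row; the column type follows the returnType given at
# registration time (IntegerType for stringLengthInt above).
sqlCtx.sql("SELECT stringLengthInt(word) FROM words").collect()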
@@ -197,31 +198,6 @@ def inferSchema(self, rdd, samplingRatio=None):
197198 >>> df = sqlCtx.inferSchema(rdd)
198199 >>> df.collect()[0]
199200 Row(field1=1, field2=u'row1')
200-
201- >>> NestedRow = Row("f1", "f2")
202- >>> nestedRdd1 = sc.parallelize([
203- ... NestedRow(array('i', [1, 2]), {"row1": 1.0}),
204- ... NestedRow(array('i', [2, 3]), {"row2": 2.0})])
205- >>> df = sqlCtx.inferSchema(nestedRdd1)
206- >>> df.collect()
207- [Row(f1=[1, 2], f2={u'row1': 1.0}), ..., f2={u'row2': 2.0})]
208-
209- >>> nestedRdd2 = sc.parallelize([
210- ... NestedRow([[1, 2], [2, 3]], [1, 2]),
211- ... NestedRow([[2, 3], [3, 4]], [2, 3])])
212- >>> df = sqlCtx.inferSchema(nestedRdd2)
213- >>> df.collect()
214- [Row(f1=[[1, 2], [2, 3]], f2=[1, 2]), ..., f2=[2, 3])]
215-
216- >>> from collections import namedtuple
217- >>> CustomRow = namedtuple('CustomRow', 'field1 field2')
218- >>> rdd = sc.parallelize(
219- ... [CustomRow(field1=1, field2="row1"),
220- ... CustomRow(field1=2, field2="row2"),
221- ... CustomRow(field1=3, field2="row3")])
222- >>> df = sqlCtx.inferSchema(rdd)
223- >>> df.collect()[0]
224- Row(field1=1, field2=u'row1')
225201 """
226202
227203 if isinstance(rdd, DataFrame):
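(Editorial aside: a hedged sketch of the samplingRatio argument in the signature above. When the first row does not pin down every type, e.g. because of a None value, a ratio makes the inference scan a sample of the RDD instead; the data and ratio here are illustrative.)

from pyspark.sql import Row

# "b" is None in the first row, so its type can only be inferred by looking
# further into the RDD; a ratio of 1.0 scans every row.
rows = sc.parallelize([Row(a=1, b=None), Row(a=2, b="text")])
df = sqlCtx.inferSchema(rows, samplingRatio=1.0)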
@@ -252,56 +228,8 @@ def applySchema(self, rdd, schema):
252228 >>> schema = StructType([StructField("field1", IntegerType(), False),
253229 ... StructField("field2", StringType(), False)])
254230 >>> df = sqlCtx.applySchema(rdd2, schema)
255- >>> sqlCtx.registerDataFrameAsTable(df, "table1")
256- >>> df2 = sqlCtx.sql("SELECT * from table1")
257- >>> df2.collect()
258- [Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')]
259-
260- >>> from datetime import date, datetime
261- >>> rdd = sc.parallelize([(127, -128L, -32768, 32767, 2147483647L, 1.0,
262- ... date(2010, 1, 1),
263- ... datetime(2010, 1, 1, 1, 1, 1),
264- ... {"a": 1}, (2,), [1, 2, 3], None)])
265- >>> schema = StructType([
266- ... StructField("byte1", ByteType(), False),
267- ... StructField("byte2", ByteType(), False),
268- ... StructField("short1", ShortType(), False),
269- ... StructField("short2", ShortType(), False),
270- ... StructField("int1", IntegerType(), False),
271- ... StructField("float1", FloatType(), False),
272- ... StructField("date1", DateType(), False),
273- ... StructField("time1", TimestampType(), False),
274- ... StructField("map1",
275- ... MapType(StringType(), IntegerType(), False), False),
276- ... StructField("struct1",
277- ... StructType([StructField("b", ShortType(), False)]), False),
278- ... StructField("list1", ArrayType(ByteType(), False), False),
279- ... StructField("null1", DoubleType(), True)])
280- >>> df = sqlCtx.applySchema(rdd, schema)
281- >>> results = df.map(
282- ... lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int1, x.float1, x.date1,
283- ... x.time1, x.map1["a"], x.struct1.b, x.list1, x.null1))
284- >>> results.collect()[0] # doctest: +NORMALIZE_WHITESPACE
285- (127, -128, -32768, 32767, 2147483647, 1.0, datetime.date(2010, 1, 1),
286- datetime.datetime(2010, 1, 1, 1, 1, 1), 1, 2, [1, 2, 3], None)
287-
288- >>> df.registerTempTable("table2")
289- >>> sqlCtx.sql(
290- ... "SELECT byte1 - 1 AS byte1, byte2 + 1 AS byte2, " +
291- ... "short1 + 1 AS short1, short2 - 1 AS short2, int1 - 1 AS int1, " +
292- ... "float1 + 1.5 as float1 FROM table2").collect()
293- [Row(byte1=126, byte2=-127, short1=-32767, short2=32766, int1=2147483646, float1=2.5)]
294-
295- >>> from pyspark.sql.types import _parse_schema_abstract, _infer_schema_type
296- >>> rdd = sc.parallelize([(127, -32768, 1.0,
297- ... datetime(2010, 1, 1, 1, 1, 1),
298- ... {"a": 1}, (2,), [1, 2, 3])])
299- >>> abstract = "byte1 short1 float1 time1 map1{} struct1(b) list1[]"
300- >>> schema = _parse_schema_abstract(abstract)
301- >>> typedSchema = _infer_schema_type(rdd.first(), schema)
302- >>> df = sqlCtx.applySchema(rdd, typedSchema)
303231 >>> df.collect()
304- [Row(byte1=127, short1=-32768, float1=1.0, time1=..., list1=[1, 2, 3])]
232+ [Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')]
305233 """
306234
307235 if isinstance(rdd, DataFrame):
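(Editorial aside, illustrative values: the same StructType object can be reused for any RDD of tuples with a matching shape, which is often simpler than re-declaring it.)

# "schema" is the StructType built in the doctest above; applySchema validates
# rows against it at runtime, so any (int, string) tuples can reuse it.
more_rows = sc.parallelize([(4, "row4"), (5, "row5")])
df2 = sqlCtx.applySchema(more_rows, schema)
df2.collect()  # expected: [Row(field1=4, field2=u'row4'), Row(field1=5, field2=u'row5')]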
@@ -459,46 +387,28 @@ def jsonFile(self, path, schema=None, samplingRatio=1.0):
459387 >>> import tempfile, shutil
460388 >>> jsonFile = tempfile.mkdtemp()
461389 >>> shutil.rmtree(jsonFile)
462- >>> ofn = open(jsonFile, 'w')
463- >>> for json in jsonStrings:
464- ... print>>ofn, json
465- >>> ofn.close()
390+ >>> with open(jsonFile, 'w') as f:
391+ ... f.writelines(jsonStrings)
466392 >>> df1 = sqlCtx.jsonFile(jsonFile)
467- >>> sqlCtx.registerDataFrameAsTable(df1, "table1")
468- >>> df2 = sqlCtx.sql(
469- ... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
470- ... "field6 as f4 from table1")
471- >>> for r in df2.collect():
472- ... print r
473- Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None)
474- Row(f1=2, f2=None, f3=Row(field4=22,..., f4=[Row(field7=u'row2')])
475- Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
476-
477- >>> df3 = sqlCtx.jsonFile(jsonFile, df1.schema)
478- >>> sqlCtx.registerDataFrameAsTable(df3, "table2")
479- >>> df4 = sqlCtx.sql(
480- ... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
481- ... "field6 as f4 from table2")
482- >>> for r in df4.collect():
483- ... print r
484- Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None)
485- Row(f1=2, f2=None, f3=Row(field4=22,..., f4=[Row(field7=u'row2')])
486- Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
393+ >>> df1.printSchema()
394+ root
395+ |-- field1: long (nullable = true)
396+ |-- field2: string (nullable = true)
397+ |-- field3: struct (nullable = true)
398+ | |-- field4: long (nullable = true)
487399
488400 >>> from pyspark.sql.types import *
489401 >>> schema = StructType([
490- ... StructField("field2", StringType(), True),
402+ ... StructField("field2", StringType()),
491403 ... StructField("field3",
492- ... StructType([
493- ... StructField("field5",
494- ... ArrayType(IntegerType(), False), True)]), False)])
495- >>> df5 = sqlCtx.jsonFile(jsonFile, schema)
496- >>> sqlCtx.registerDataFrameAsTable(df5, "table3")
497- >>> df6 = sqlCtx.sql(
498- ... "SELECT field2 AS f1, field3.field5 as f2, "
499- ... "field3.field5[0] as f3 from table3")
500- >>> df6.collect()
501- [Row(f1=u'row1', f2=None, f3=None)...Row(f1=u'row3', f2=[], f3=None)]
404+ ... StructType([StructField("field5", ArrayType(IntegerType()))]))])
405+ >>> df2 = sqlCtx.jsonFile(jsonFile, schema)
406+ >>> df2.printSchema()
407+ root
408+ |-- field2: string (nullable = true)
409+ |-- field3: struct (nullable = true)
410+ | |-- field5: array (nullable = true)
411+ | | |-- element: integer (containsNull = true)
502412 """
503413 if schema is None:
504414     df = self._ssql_ctx.jsonFile(path, samplingRatio)
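(Editorial aside, outside the diff: a small sketch of the layout jsonFile expects, namely one JSON document per line of the input file. The path and records below are made up.)

import json, os, tempfile

path = os.path.join(tempfile.mkdtemp(), "records.json")        # illustrative path
records = [{"field2": "row1", "field3": {"field5": [10, 20]}}]  # assumed sample data
with open(path, 'w') as f:
    f.write('\n'.join(json.dumps(r) for r in records))          # one document per line

df = sqlCtx.jsonFile(path)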
@@ -517,48 +427,23 @@ def jsonRDD(self, rdd, schema=None, samplingRatio=1.0):
517427 determine the schema.
518428
519429 >>> df1 = sqlCtx.jsonRDD(json)
520- >>> sqlCtx.registerDataFrameAsTable(df1, "table1")
521- >>> df2 = sqlCtx.sql(
522- ... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
523- ... "field6 as f4 from table1")
524- >>> for r in df2.collect():
525- ... print r
526- Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None)
527- Row(f1=2, f2=None, f3=Row(field4=22..., f4=[Row(field7=u'row2')])
528- Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
529-
530- >>> df3 = sqlCtx.jsonRDD(json, df1.schema)
531- >>> sqlCtx.registerDataFrameAsTable(df3, "table2")
532- >>> df4 = sqlCtx.sql(
533- ... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
534- ... "field6 as f4 from table2")
535- >>> for r in df4.collect():
536- ... print r
537- Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None)
538- Row(f1=2, f2=None, f3=Row(field4=22..., f4=[Row(field7=u'row2')])
539- Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
430+ >>> df1.first()
431+ Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None)
432+
433+ >>> df2 = sqlCtx.jsonRDD(json, df1.schema)
434+ >>> df2.first()
435+ Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None)
540436
541437 >>> from pyspark.sql.types import *
542438 >>> schema = StructType([
543- ... StructField("field2", StringType(), True),
439+ ... StructField("field2", StringType()),
544440 ... StructField("field3",
545- ... StructType([
546- ... StructField("field5",
547- ... ArrayType(IntegerType(), False), True)]), False)])
548- >>> df5 = sqlCtx.jsonRDD(json, schema)
549- >>> sqlCtx.registerDataFrameAsTable(df5, "table3")
550- >>> df6 = sqlCtx.sql(
551- ... "SELECT field2 AS f1, field3.field5 as f2, "
552- ... "field3.field5[0] as f3 from table3")
553- >>> df6.collect()
554- [Row(f1=u'row1', f2=None,...Row(f1=u'row3', f2=[], f3=None)]
555-
556- >>> sqlCtx.jsonRDD(sc.parallelize(['{}',
557- ... '{"key0": {"key1": "value1"}}'])).collect()
558- [Row(key0=None), Row(key0=Row(key1=u'value1'))]
559- >>> sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}',
560- ... '{"key0": {"key1": "value1"}}'])).collect()
561- [Row(key0=None), Row(key0=Row(key1=u'value1'))]
441+ ... StructType([StructField("field5", ArrayType(IntegerType()))]))
442+ ... ])
443+ >>> df3 = sqlCtx.jsonRDD(json, schema)
444+ >>> df3.first()
445+ Row(field2=u'row1', field3=Row(field5=None))
446+
562447 """
563448
564449 def func(iterator):
@@ -848,7 +733,8 @@ def _test():
848733 globs['jsonStrings'] = jsonStrings
849734 globs['json'] = sc.parallelize(jsonStrings)
850735 (failure_count, test_count) = doctest.testmod(
851-     pyspark.sql.context, globs=globs, optionflags=doctest.ELLIPSIS)
736+     pyspark.sql.context, globs=globs,
737+     optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
852738 globs['sc'].stop()
853739 if failure_count:
854740     exit(-1)
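(Editorial aside: a self-contained sketch of what the added NORMALIZE_WHITESPACE flag buys. Expected doctest output may be wrapped or re-indented and still match, which is what lets the long Row outputs above be split across lines.)

import doctest

def sample():
    """
    >>> list(range(6))
    [0, 1, 2,
     3, 4, 5]
    """

# Without NORMALIZE_WHITESPACE the wrapped expected output above would fail;
# with it, any run of whitespace in expected and actual output compares equal.
doctest.run_docstring_examples(
    sample, {}, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)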