Skip to content

Commit 26652a0

Browse files
committed
Add a regression test
1 parent 4f38e3b commit 26652a0

File tree

2 files changed

+26
-12
lines changed

2 files changed

+26
-12
lines changed

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala

Lines changed: 0 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,6 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
2121
import java.sql.{Date, Timestamp}
2222

2323
import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
24-
import org.apache.spark.sql.catalyst.expressions.NamedExpression
2524
import org.apache.spark.sql.catalyst.util.sideBySide
2625
import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SortExec}
2726
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange}
@@ -899,17 +898,6 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
899898
(1, 2), (1, 1), (2, 1), (2, 2))
900899
}
901900

902-
test("SPARK-19065 dropDuplicates should not create expressions using the same id") {
903-
val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS().dropDuplicates("_1")
904-
var exprs = Set.empty[NamedExpression]
905-
ds.logicalPlan.transformAllExpressions { case e: NamedExpression =>
906-
exprs += e
907-
e
908-
}
909-
val duplicatedExprs = exprs.groupBy(expr => expr.exprId).filter(_._2.size > 1).values
910-
assert(duplicatedExprs.isEmpty)
911-
}
912-
913901
test("SPARK-16097: Encoders.tuple should handle null object correctly") {
914902
val enc = Encoders.tuple(Encoders.tuple(Encoders.STRING, Encoders.STRING), Encoders.STRING)
915903
val data = Seq((("a", "b"), "c"), (null, "d"))

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -304,6 +304,32 @@ class StreamSuite extends StreamTest {
304304
q.stop()
305305
}
306306
}
307+
308+
test("SPARK-19065: dropDuplicates should not create expressions using the same id") {
309+
withTempPath { testPath =>
310+
val data = Seq((1, 2), (2, 3), (3, 4))
311+
data.toDS.write.mode("overwrite").json(testPath.getCanonicalPath)
312+
val schema = spark.read.json(testPath.getCanonicalPath).schema
313+
val query = spark
314+
.readStream
315+
.schema(schema)
316+
.json(testPath.getCanonicalPath)
317+
.dropDuplicates("_1")
318+
.writeStream
319+
.format("memory")
320+
.queryName("testquery")
321+
.outputMode("complete")
322+
.start()
323+
try {
324+
query.processAllAvailable()
325+
if (query.exception.isDefined) {
326+
throw query.exception.get
327+
}
328+
} finally {
329+
query.stop()
330+
}
331+
}
332+
}
307333
}
308334

309335
/**

0 commit comments

Comments
 (0)