Skip to content

Commit 4686a24

Browse files
author
stczwd
committed
change dropPartition returns
Change-Id: Id45c2cfd3acbbd6fbcb88d0ac1452fc8aa4c19fa
1 parent c96e0fc commit 4686a24

File tree

4 files changed

+12
-19
lines changed

4 files changed

+12
-19
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.spark.annotation.Experimental;
2323
import org.apache.spark.sql.catalyst.InternalRow;
24-
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException;
2524
import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
2625
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException;
2726

@@ -54,12 +53,7 @@ default void createPartition(
5453

5554
@Override
5655
default boolean dropPartition(InternalRow ident) {
57-
try {
58-
dropPartitions(new InternalRow[]{ident});
59-
return true;
60-
} catch (NoSuchPartitionsException e) {
61-
return false;
62-
}
56+
return dropPartitions(new InternalRow[]{ident});
6357
}
6458

6559
/**
@@ -85,8 +79,7 @@ void createPartitions(
8579
* the operation of dropPartitions need to be safely rolled back.
8680
*
8781
* @param idents an array of partition identifiers
88-
* @throws NoSuchPartitionsException If any partition identifier to drop doesn't exist
82+
* @return true if partitions were deleted, false if any partition not exists
8983
*/
90-
void dropPartitions(
91-
InternalRow[] idents) throws NoSuchPartitionsException;
84+
boolean dropPartitions(InternalRow[] idents);
9285
}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
public interface SupportsPartitionManagement extends Table {
4646

4747
/**
48+
* Get the partition schema of table,
49+
* this must be consistent with ${@link Table#partitioning()}
4850
* @return the partition schema of table
4951
*/
5052
StructType partitionSchema();
@@ -71,7 +73,7 @@ void createPartition(
7173
boolean dropPartition(InternalRow ident);
7274

7375
/**
74-
* Test whether a partition exists using an {@link Identifier identifier} from the table.
76+
* Test whether a partition exists using an {@link InternalRow ident} from the table.
7577
*
7678
* @param ident a partition identifier
7779
* @return true if the partition exists, false otherwise

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.connector
2020
import java.util
2121

2222
import org.apache.spark.sql.catalyst.InternalRow
23-
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
23+
import org.apache.spark.sql.catalyst.analysis.{PartitionAlreadyExistsException, PartitionsAlreadyExistException}
2424
import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement
2525
import org.apache.spark.sql.connector.expressions.Transform
2626
import org.apache.spark.sql.types.StructType
@@ -67,11 +67,10 @@ class InMemoryAtomicPartitionTable (
6767
}
6868
}
6969

70-
override def dropPartitions(idents: Array[InternalRow]): Unit = {
70+
override def dropPartitions(idents: Array[InternalRow]): Boolean = {
7171
if (!idents.forall(partitionExists)) {
72-
throw new NoSuchPartitionsException(
73-
name, idents.filterNot(partitionExists), partitionSchema)
72+
return false;
7473
}
75-
idents.foreach(dropPartition)
74+
idents.forall(dropPartition)
7675
}
7776
}

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.util
2121

2222
import org.apache.spark.SparkFunSuite
2323
import org.apache.spark.sql.catalyst.InternalRow
24-
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException}
24+
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
2525
import org.apache.spark.sql.connector.{InMemoryAtomicPartitionTable, InMemoryTableCatalog}
2626
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference}
2727
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
@@ -117,8 +117,7 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
117117
assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1)
118118

119119
val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
120-
assertThrows[NoSuchPartitionsException](
121-
partTable.dropPartitions(partIdents))
120+
assert(!partTable.dropPartitions(partIdents))
122121
assert(partTable.partitionExists(partIdent))
123122

124123
partTable.dropPartition(partIdent)

0 commit comments

Comments
 (0)