Commit 5f601bc
[SPARK-34039][SQL] ReplaceTable should invalidate cache
This changes `ReplaceTableExec`/`AtomicReplaceTableExec` to uncache the target table before it is dropped. In addition, it includes some refactoring: the `uncacheTable` method moves to `DataSourceV2Strategy` so that we no longer need to pass a Spark session to the v2 execs. This is similar to SPARK-33492 (#30429).

Why the change is needed: when a table is replaced, the associated cache should be invalidated to avoid potentially incorrect results.

Does this introduce a user-facing change? Yes: when a data source v2 table is cached (either directly or indirectly), all the relevant caches are now refreshed or invalidated if the table is replaced.

How this was tested: a new unit test was added.

Closes #31081 from sunchao/SPARK-34039.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Parent: d124af5
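To make the behavior concrete, here is a sketch of the scenario this commit fixes, mirroring the new unit test below. It assumes the `DataSourceV2SQLSuite` fixtures (the `testcat` catalog, the `foo` provider, and the `source` table), which are not generally available names:

    // Cache a temp view that indirectly references a DSv2 table.
    spark.sql("CREATE TABLE testcat.ns.t USING foo AS SELECT id, data FROM source")
    spark.sql("CACHE TABLE v AS SELECT id FROM testcat.ns.t")

    // Replace the table. Before this commit the stale cache entry survived and
    // reads through `v` could return incorrect results; now the replace uncaches
    // the table with cascade = true, invalidating dependent caches as well.
    spark.sql("REPLACE TABLE testcat.ns.t (a bigint) USING foo")
    assert(spark.sharedState.cacheManager.lookupCachedData(spark.table("v")).isEmpty)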

File tree

4 files changed: +48 -26 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 14 additions & 7 deletions
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedPartit
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, TableCapability, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, Table, TableCapability, TableCatalog, TableChange}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy

@@ -78,6 +78,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     }
   }

+  private def invalidateCache(catalog: TableCatalog, table: Table, ident: Identifier): Unit = {
+    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+    session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
+  }
+
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(project, filters,
         relation @ DataSourceV2ScanRelation(_, V1ScanWrapper(scan, translated, pushed), output)) =>

@@ -161,10 +166,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicReplaceTableExec(
-            staging, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
+            staging, ident, schema, parts, propsWithOwner, orCreate = orCreate,
+            invalidateCache) :: Nil
         case _ =>
           ReplaceTableExec(
-            catalog, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
+            catalog, ident, schema, parts, propsWithOwner, orCreate = orCreate,
+            invalidateCache) :: Nil
       }

     case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) =>

@@ -173,26 +180,26 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicReplaceTableAsSelectExec(
-            session,
             staging,
             ident,
             parts,
             query,
             planLater(query),
             propsWithOwner,
             writeOptions,
-            orCreate = orCreate) :: Nil
+            orCreate = orCreate,
+            invalidateCache) :: Nil
         case _ =>
           ReplaceTableAsSelectExec(
-            session,
             catalog,
             ident,
             parts,
             query,
             planLater(query),
             propsWithOwner,
             writeOptions,
-            orCreate = orCreate) :: Nil
+            orCreate = orCreate,
+            invalidateCache) :: Nil
       }

     case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
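The refactoring above replaces the per-exec `uncacheTable(session, ...)` helper with a `(TableCatalog, Table, Identifier) => Unit` callback that closes over the strategy's session. A minimal, self-contained sketch of the pattern (illustrative names, not Spark source):

    import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}

    // The planner owns the session-bound behavior and hands the physical node a
    // plain function, so SparkSession never leaks into the exec's constructor.
    final case class ReplaceSketch(
        catalog: TableCatalog,
        ident: Identifier,
        invalidateCache: (TableCatalog, Table, Identifier) => Unit) {

      def run(): Unit = {
        if (catalog.tableExists(ident)) {
          // Invalidate while the old table can still be loaded and matched
          // against cached plans, i.e. before it is dropped or replaced.
          invalidateCache(catalog, catalog.loadTable(ident), ident)
          catalog.dropTable(ident)
        }
        // ... create or stage the replacement table ...
      }
    }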

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala

Lines changed: 11 additions & 3 deletions
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils

@@ -33,10 +33,13 @@ case class ReplaceTableExec(
     tableSchema: StructType,
     partitioning: Seq[Transform],
     tableProperties: Map[String, String],
-    orCreate: Boolean) extends V2CommandExec {
+    orCreate: Boolean,
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends V2CommandExec {

   override protected def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
+      val table = catalog.loadTable(ident)
+      invalidateCache(catalog, table, ident)
       catalog.dropTable(ident)
     } else if (!orCreate) {
       throw new CannotReplaceMissingTableException(ident)

@@ -54,9 +57,14 @@ case class AtomicReplaceTableExec(
     tableSchema: StructType,
     partitioning: Seq[Transform],
     tableProperties: Map[String, String],
-    orCreate: Boolean) extends V2CommandExec {
+    orCreate: Boolean,
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends V2CommandExec {

   override protected def run(): Seq[InternalRow] = {
+    if (catalog.tableExists(identifier)) {
+      val table = catalog.loadTable(identifier)
+      invalidateCache(catalog, table, identifier)
+    }
     val staged = if (orCreate) {
       catalog.stageCreateOrReplace(
         identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
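Note the asymmetry above: the non-atomic exec invalidates and then eagerly drops the table, while the atomic exec only invalidates, leaving the actual swap to `stageCreateOrReplace`. When exercising these nodes in isolation, a no-op callback is enough to satisfy the new parameter (hypothetical test helper, not part of this commit):

    import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}

    // Hypothetical no-op invalidation callback for tests that don't involve caching.
    val noOpInvalidateCache: (TableCatalog, Table, Identifier) => Unit =
      (_, _, _) => ()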

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala

Lines changed: 6 additions & 16 deletions
@@ -26,7 +26,6 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.expressions.Attribute

@@ -131,15 +130,15 @@ case class AtomicCreateTableAsSelectExec(
  * ReplaceTableAsSelectStagingExec.
  */
 case class ReplaceTableAsSelectExec(
-    session: SparkSession,
     catalog: TableCatalog,
     ident: Identifier,
     partitioning: Seq[Transform],
     plan: LogicalPlan,
     query: SparkPlan,
     properties: Map[String, String],
     writeOptions: CaseInsensitiveStringMap,
-    orCreate: Boolean) extends TableWriteExecHelper {
+    orCreate: Boolean,
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {

   override protected def run(): Seq[InternalRow] = {
     // Note that this operation is potentially unsafe, but these are the strict semantics of

@@ -152,7 +151,7 @@ case class ReplaceTableAsSelectExec(
     // 3. The table returned by catalog.createTable doesn't support writing.
     if (catalog.tableExists(ident)) {
       val table = catalog.loadTable(ident)
-      uncacheTable(session, catalog, table, ident)
+      invalidateCache(catalog, table, ident)
       catalog.dropTable(ident)
     } else if (!orCreate) {
       throw new CannotReplaceMissingTableException(ident)

@@ -177,21 +176,21 @@
  * is left untouched.
  */
 case class AtomicReplaceTableAsSelectExec(
-    session: SparkSession,
     catalog: StagingTableCatalog,
     ident: Identifier,
     partitioning: Seq[Transform],
     plan: LogicalPlan,
     query: SparkPlan,
     properties: Map[String, String],
     writeOptions: CaseInsensitiveStringMap,
-    orCreate: Boolean) extends TableWriteExecHelper {
+    orCreate: Boolean,
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {

   override protected def run(): Seq[InternalRow] = {
     val schema = CharVarcharUtils.getRawSchema(query.schema).asNullable
     if (catalog.tableExists(ident)) {
       val table = catalog.loadTable(ident)
-      uncacheTable(session, catalog, table, ident)
+      invalidateCache(catalog, table, ident)
     }
     val staged = if (orCreate) {
       catalog.stageCreateOrReplace(

@@ -393,15 +392,6 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {

     Nil
   }
-
-  protected def uncacheTable(
-      session: SparkSession,
-      catalog: TableCatalog,
-      table: Table,
-      ident: Identifier): Unit = {
-    val plan = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
-    session.sharedState.cacheManager.uncacheQuery(session, plan, cascade = true)
-  }
 }

 object DataWritingSparkTask extends Logging {

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 17 additions & 0 deletions
@@ -785,6 +785,23 @@ class DataSourceV2SQLSuite
     }
   }

+  test("SPARK-34039: ReplaceTable (atomic or non-atomic) should invalidate cache") {
+    Seq("testcat.ns.t", "testcat_atomic.ns.t").foreach { t =>
+      val view = "view"
+      withTable(t) {
+        withTempView(view) {
+          sql(s"CREATE TABLE $t USING foo AS SELECT id, data FROM source")
+          sql(s"CACHE TABLE $view AS SELECT id FROM $t")
+          checkAnswer(sql(s"SELECT * FROM $t"), spark.table("source"))
+          checkAnswer(sql(s"SELECT * FROM $view"), spark.table("source").select("id"))
+
+          sql(s"REPLACE TABLE $t (a bigint) USING foo")
+          assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(view)).isEmpty)
+        }
+      }
+    }
+  }
+
   test("SPARK-33492: ReplaceTableAsSelect (atomic or non-atomic) should invalidate cache") {
     Seq("testcat.ns.t", "testcat_atomic.ns.t").foreach { t =>
       val view = "view"