Skip to content

Prepared statement invalidation loopholes #20860

@dkropachev

Description

@dkropachev

Currently, Scylla derives the prepared statement id from a hash of the query text alone.
However, there are three cases in which the query text (and therefore its hash) stays the same while the schema of the returned data changes:

  1. `select * from table` — a column is added to or removed from the table.
  2. `select udt from table` — `ALTER TYPE udt` adds or renames a field.
  3. `select col from table` — `ALTER TABLE DROP col` is followed by `ALTER TABLE ADD col <another_type>`.

Implications

Consider the following scenario:
Client#1 ==(prepare select * from table)==> Server (registers a new prepared statement)
Client#1 <==(prepared statement id X and schema)== Server
Client#2 ==(prepare select * from table)==> Server
Client#2 <==(prepared statement id X and schema)== Server
Client#3 ==(alter table)==> Server (invalidates X)
Client#1 ==(execute prepared statement with id X)==> Server
Client#1 <==(unknown prepared statement)== Server
Client#1 ==(prepare select * from table)==> Server
Client#1 <==(prepared statement id X and schema)== Server
Client#1 now has prepared statement id X with the correct schema.
Client#2 ==(execute prepared statement with id X)==> Server (the server already knows this id, since it was re-prepared by Client#1)
Client#2 <==(results with new schema)== Server
Client#2 can't properly unpack the data, because the schema of the returned data and the schema cached on the client mismatch.

Mitigation on drivers side

All drivers can be configured to ask the server to send result metadata with every query; some drivers enable this option by default, others disable it.
However, this mitigation creates excessive load on the network, the server, and the client.

Solutions

  1. Include the schema in the prepared statement id calculation.
  2. Follow the CQLv5 approach and ship a result metadata id along with the prepared statement id.

Repros (from gocql)

package gocql

import (
	"github.com/google/go-cmp/cmp"
	"sort"
	"strings"
	"testing"
	"time"
)

// TestPreparedStatementInvalidation reproduces prepared statement invalidation
// loopholes (issue #20860): the server derives the prepared statement id from
// the query text only, so schema changes that leave the text untouched keep
// the same id.
//
// Each subtest runs the same query on two sessions and then changes the
// schema. It re-runs the query on the second session so the statement is
// re-prepared on the server. Finally it checks whether the first session also
// observes the new result metadata, or still decodes rows with the metadata it
// cached before the schema change.
func TestPreparedStatementInvalidation(t *testing.T) {
	t.Run("select * from table", func(t *testing.T) {
		session := createSession(t)
		defer session.Close()

		err := session.Query(`DROP TABLE IF EXISTS gocql_test.prepared_stmt_test`).Exec()
		if err != nil {
			t.Fatalf("failed to drop table: %v", err)
		}

		if err := createTable(session, `CREATE TABLE gocql_test.prepared_stmt_test (pkey text,value text, PRIMARY KEY (pkey, value));`); err != nil {
			t.Fatal("create table:", err)
		}

		err = session.Query(`INSERT INTO gocql_test.prepared_stmt_test (pkey, value) values (?,?)`,
			"key", "value").Exec()
		if err != nil {
			t.Fatal("failed to insert record:", err)
		}

		ret := map[string]interface{}{}
		query := session.Query(`SELECT * FROM gocql_test.prepared_stmt_test where pkey = ?`, "key")
		err = query.MapScan(ret)
		if err != nil {
			t.Fatal("session 1 query failed:", err)
		}

		otherSession := createSession(t)
		// The second session must be closed as well; otherwise its connections
		// outlive the subtest.
		defer otherSession.Close()
		otherSessionQuery := otherSession.Query(`SELECT * FROM gocql_test.prepared_stmt_test where pkey = ?`, "key")

		otherSessionRet := map[string]interface{}{}
		// Obtain prepared statement id
		err = otherSessionQuery.MapScan(otherSessionRet)
		if err != nil {
			t.Fatal("session 2 query failed:", err)
		}

		if diff := cmp.Diff(ret, otherSessionRet); diff != "" {
			t.Fatalf("expected the results to be the same: %s", diff)
		}

		err = otherSession.Query(`ALTER TABLE gocql_test.prepared_stmt_test ADD value2 text`).Exec()
		if err != nil {
			t.Fatalf("failed to alter table: %v", err)
		}

		time.Sleep(100 * time.Millisecond)

		// Re-run on session 2 so the statement invalidated by the ALTER gets
		// re-prepared on the server. The iterator is closed so it does not leak
		// and so any query error is surfaced.
		iter := otherSessionQuery.Iter()
		otherSessionColumns := iter.Columns()
		if err := iter.Close(); err != nil {
			t.Fatal("session 2 query failed:", err)
		}
		if len(otherSessionColumns) != 3 {
			t.Fatalf("expected the results to have 3 columns, but it has %d: %v", len(otherSessionColumns), otherSessionColumns)
		}

		iter = query.Iter()
		originalSessionColumns := iter.Columns()
		if err := iter.Close(); err != nil {
			t.Fatal("session 1 query failed:", err)
		}
		if len(originalSessionColumns) != 3 {
			t.Errorf("session 1 expected results to have 3 columns, but it has %d: %v", len(originalSessionColumns), originalSessionColumns)
		}
	})

	t.Run("udt", func(t *testing.T) {
		session := createSession(t)
		defer session.Close()

		err := session.Query(`DROP TABLE IF EXISTS gocql_test.prepared_stmt_test_udt`).Exec()
		if err != nil {
			t.Fatalf("failed to drop table: %v", err)
		}

		err = session.Query(`DROP TYPE IF EXISTS gocql_test.phone`).Exec()
		if err != nil {
			t.Fatalf("failed to drop udt type: %v", err)
		}

		err = session.Query(`CREATE TYPE gocql_test.phone (country_code int, number text);`).Exec()
		if err != nil {
			t.Fatal("failed to create udt type:", err)
		}

		if err := createTable(session, `CREATE TABLE gocql_test.prepared_stmt_test_udt (pkey text, value frozen<phone>, PRIMARY KEY (pkey));`); err != nil {
			t.Fatal("create table:", err)
		}

		udtvalue := map[string]interface{}{
			"country_code": 10,
			"number":       "value",
		}

		err = session.Query(`INSERT INTO gocql_test.prepared_stmt_test_udt (pkey, value) values (?,?)`,
			"testtext", udtvalue).Exec()
		if err != nil {
			t.Fatal("insert:", err)
		}

		originalResult := map[string]interface{}{}
		query := session.Query(`SELECT value FROM gocql_test.prepared_stmt_test_udt where pkey = ?`, "testtext")
		err = query.MapScan(originalResult)
		if err != nil {
			t.Fatal("session 1 query failed:", err)
		}

		otherSession := createSession(t)
		defer otherSession.Close()
		otherSessionQuery := otherSession.Query(`SELECT value FROM gocql_test.prepared_stmt_test_udt where pkey = ?`, "testtext")

		otherSessionResult := map[string]interface{}{}
		// Obtain prepared statement id
		err = otherSessionQuery.MapScan(otherSessionResult)
		if err != nil {
			t.Fatal("session 2 query failed:", err)
		}

		if diff := cmp.Diff(originalResult, otherSessionResult); diff != "" {
			t.Fatalf("expected the results to be the same: %s", diff)
		}

		// The query text is unchanged, but the UDT gains a third field.
		err = session.Query(`ALTER TYPE gocql_test.phone ADD number2 text;`).Exec()
		if err != nil {
			t.Fatal("failed to alter udt type:", err)
		}

		iter := otherSessionQuery.Iter()
		otherSessionColumns := iter.Columns()
		err = iter.Close()
		if err != nil {
			t.Fatal("session 2 query failed:", err)
		}
		if len(otherSessionColumns) != 1 {
			t.Fatalf("session 2 expected 1 column, but got %d: %v", len(otherSessionColumns), otherSessionColumns)
		}
		// Checked assertion: an unexpected type must fail the test, not panic.
		otherSessionUDT, ok := otherSessionColumns[0].TypeInfo.(UDTTypeInfo)
		if !ok {
			t.Fatalf("session 2 expected a UDT column, but got %v", otherSessionColumns[0].TypeInfo)
		}
		if len(otherSessionUDT.Elements) != 3 {
			t.Fatalf("session 2 has old metadata")
		}

		iter = query.Iter()
		originalSessionColumns := iter.Columns()
		err = iter.Close()
		if err != nil {
			t.Fatal("session 1 query failed:", err)
		}
		if len(originalSessionColumns) != 1 {
			t.Fatalf("session 1 expected 1 column, but got %d: %v", len(originalSessionColumns), originalSessionColumns)
		}
		originalSessionUDT, ok := originalSessionColumns[0].TypeInfo.(UDTTypeInfo)
		if !ok {
			t.Fatalf("session 1 expected a UDT column, but got %v", originalSessionColumns[0].TypeInfo)
		}
		if len(originalSessionUDT.Elements) != 3 {
			t.Fatalf("session 1 has old metadata")
		}
	})

	t.Run("change column definition", func(t *testing.T) {
		session := createSession(t)
		defer session.Close()

		err := session.Query(`DROP TABLE IF EXISTS gocql_test.prepared_stmt_test_change_column_definition`).Exec()
		if err != nil {
			t.Fatalf("failed to drop table: %v", err)
		}

		if err := createTable(session, `CREATE TABLE gocql_test.prepared_stmt_test_change_column_definition (pkey text, value int, PRIMARY KEY (pkey));`); err != nil {
			t.Fatal("create table:", err)
		}

		err = session.Query(`INSERT INTO gocql_test.prepared_stmt_test_change_column_definition (pkey, value) values (?,?)`,
			"testtext", 10).Exec()
		if err != nil {
			t.Fatal("insert:", err)
		}

		originalResult := map[string]interface{}{}
		query := session.Query(`SELECT value FROM gocql_test.prepared_stmt_test_change_column_definition where pkey = ?`, "testtext")
		err = query.MapScan(originalResult)
		if err != nil {
			t.Fatal("session 1 query failed:", err)
		}

		otherSession := createSession(t)
		defer otherSession.Close()
		otherSessionQuery := otherSession.Query(`SELECT value FROM gocql_test.prepared_stmt_test_change_column_definition where pkey = ?`, "testtext")

		otherSessionResult := map[string]interface{}{}
		// Obtain prepared statement id
		err = otherSessionQuery.MapScan(otherSessionResult)
		if err != nil {
			t.Fatal("session 2 query failed:", err)
		}

		if diff := cmp.Diff(originalResult, otherSessionResult); diff != "" {
			t.Fatalf("expected the results to be the same: %s", diff)
		}

		// Re-create the column with a different type (int -> text) while the
		// query text stays identical.
		err = otherSession.Query(`ALTER TABLE gocql_test.prepared_stmt_test_change_column_definition DROP value`).Exec()
		if err != nil {
			t.Fatalf("failed to alter table: %v", err)
		}
		err = otherSession.Query(`ALTER TABLE gocql_test.prepared_stmt_test_change_column_definition ADD value text`).Exec()
		if err != nil {
			t.Fatalf("failed to alter table: %v", err)
		}

		iter := otherSessionQuery.Iter()
		otherSessionColumns := iter.Columns()
		err = iter.Close()
		if err != nil {
			t.Fatal("session 2 query failed:", err)
		}
		if len(otherSessionColumns) != 1 {
			t.Fatalf("session 2 expected 1 column, but got %d: %v", len(otherSessionColumns), otherSessionColumns)
		}
		if otherSessionColumns[0].TypeInfo.Type() != TypeVarchar {
			t.Errorf("session 2 has old metadata: %v", otherSessionColumns[0].TypeInfo)
		}

		iter = query.Iter()
		originalSessionColumns := iter.Columns()
		err = iter.Close()
		if err != nil {
			t.Fatal("session 1 query failed:", err)
		}
		if len(originalSessionColumns) != 1 {
			t.Fatalf("session 1 expected 1 column, but got %d: %v", len(originalSessionColumns), originalSessionColumns)
		}
		if originalSessionColumns[0].TypeInfo.Type() != TypeVarchar {
			t.Errorf("session 1 has old metadata: %v", originalSessionColumns[0].TypeInfo)
		}
	})
}

Metadata

Metadata

Labels

Type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions