Skip to content

Commit d34fe02

Browse files
author
TOGASHI Tomoki
authored
feat(spanner/spansql): add support for missing DDL syntax for ALTER CHANGE STREAM (#7429)
1 parent cf85910 commit d34fe02

File tree

5 files changed

+272
-53
lines changed

5 files changed

+272
-53
lines changed

spanner/spansql/parser.go

Lines changed: 74 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2081,39 +2081,15 @@ func (p *parser) parseCreateChangeStream() (*CreateChangeStream, *parseError) {
20812081
return nil, err
20822082
}
20832083

2084-
if err := p.expect("FOR"); err != nil {
2085-
return nil, err
2086-
}
2087-
20882084
cs := &CreateChangeStream{Name: csname, Position: pos}
20892085

2090-
if p.eat("ALL") {
2091-
cs.WatchAllTables = true
2092-
} else {
2093-
for {
2094-
tname, err := p.parseTableOrIndexOrColumnName()
2095-
if err != nil {
2096-
return nil, err
2097-
}
2098-
pos := p.Pos()
2099-
wd := WatchDef{Table: tname, Position: pos}
2100-
2101-
if p.sniff("(") {
2102-
columns, err := p.parseColumnNameList()
2103-
if err != nil {
2104-
return nil, err
2105-
}
2106-
wd.Columns = columns
2107-
} else {
2108-
wd.WatchAllCols = true
2109-
}
2110-
2111-
cs.Watch = append(cs.Watch, wd)
2112-
if p.eat(",") {
2113-
continue
2114-
}
2115-
break
2086+
if p.sniff("FOR") {
2087+
watch, watchAllTables, err := p.parseChangeStreamWatches()
2088+
if err != nil {
2089+
return nil, err
21162090
}
2091+
cs.Watch = watch
2092+
cs.WatchAllTables = watchAllTables
21172093
}
21182094

21192095
if p.sniff("OPTIONS") {
@@ -2145,19 +2121,79 @@ func (p *parser) parseAlterChangeStream() (*AlterChangeStream, *parseError) {
21452121
}
21462122

21472123
acs := &AlterChangeStream{Name: csname, Position: pos}
2148-
if err := p.expect("SET"); err != nil {
2149-
return nil, err
2124+
2125+
tok := p.next()
2126+
if tok.err != nil {
2127+
return nil, tok.err
21502128
}
2151-
// TODO: Support for altering watch
2152-
if p.sniff("OPTIONS") {
2153-
options, err := p.parseChangeStreamOptions()
2154-
if err != nil {
2129+
switch {
2130+
default:
2131+
return nil, p.errorf("got %q, expected SET or DROP", tok.value)
2132+
case tok.caseEqual("SET"):
2133+
if p.sniff("OPTIONS") {
2134+
options, err := p.parseChangeStreamOptions()
2135+
if err != nil {
2136+
return nil, err
2137+
}
2138+
acs.Alteration = AlterChangeStreamOptions{Options: options}
2139+
return acs, nil
2140+
}
2141+
if p.sniff("FOR") {
2142+
watch, watchAllTables, err := p.parseChangeStreamWatches()
2143+
if err != nil {
2144+
return nil, err
2145+
}
2146+
acs.Alteration = AlterWatch{Watch: watch, WatchAllTables: watchAllTables}
2147+
return acs, nil
2148+
}
2149+
return nil, p.errorf("got %q, expected FOR or OPTIONS", p.next())
2150+
case tok.caseEqual("DROP"):
2151+
if err := p.expect("FOR", "ALL"); err != nil {
21552152
return nil, err
21562153
}
2157-
acs.Alteration = AlterChangeStreamOptions{Options: options}
2154+
acs.Alteration = DropChangeStreamWatch{}
21582155
return acs, nil
21592156
}
2160-
return nil, p.errorf("got %q, expected OPTIONS", p.next())
2157+
}
2158+
2159+
func (p *parser) parseChangeStreamWatches() ([]WatchDef, bool, *parseError) {
2160+
debugf("parseChangeStreamWatches: %v", p)
2161+
2162+
if err := p.expect("FOR"); err != nil {
2163+
return nil, false, err
2164+
}
2165+
2166+
if p.eat("ALL") {
2167+
return nil, true, nil
2168+
}
2169+
2170+
watchDefs := []WatchDef{}
2171+
for {
2172+
tname, err := p.parseTableOrIndexOrColumnName()
2173+
if err != nil {
2174+
return nil, false, err
2175+
}
2176+
pos := p.Pos()
2177+
wd := WatchDef{Table: tname, Position: pos}
2178+
2179+
if p.sniff("(") {
2180+
columns, err := p.parseColumnNameList()
2181+
if err != nil {
2182+
return nil, false, err
2183+
}
2184+
wd.Columns = columns
2185+
} else {
2186+
wd.WatchAllCols = true
2187+
}
2188+
2189+
watchDefs = append(watchDefs, wd)
2190+
if p.eat(",") {
2191+
continue
2192+
}
2193+
break
2194+
}
2195+
2196+
return watchDefs, false, nil
21612197
}
21622198

21632199
func (p *parser) parseChangeStreamOptions() (ChangeStreamOptions, *parseError) {

spanner/spansql/parser_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1164,6 +1164,106 @@ func TestParseDDL(t *testing.T) {
11641164
},
11651165
},
11661166
},
1167+
{
1168+
`CREATE CHANGE STREAM csname;
1169+
CREATE CHANGE STREAM csname FOR ALL;
1170+
CREATE CHANGE STREAM csname FOR tname, tname2(cname);
1171+
CREATE CHANGE STREAM csname FOR ALL OPTIONS (retention_period = '36h', value_capture_type = 'NEW_VALUES');`,
1172+
&DDL{
1173+
Filename: "filename",
1174+
List: []DDLStmt{
1175+
&CreateChangeStream{
1176+
Name: "csname",
1177+
Position: line(1),
1178+
},
1179+
&CreateChangeStream{
1180+
Name: "csname",
1181+
WatchAllTables: true,
1182+
Position: line(2),
1183+
},
1184+
&CreateChangeStream{
1185+
Name: "csname",
1186+
Watch: []WatchDef{
1187+
{Table: "tname", WatchAllCols: true, Position: line(3)},
1188+
{Table: "tname2", Columns: []ID{ID("cname")}, Position: line(3)},
1189+
},
1190+
Position: line(3),
1191+
},
1192+
&CreateChangeStream{
1193+
Name: "csname",
1194+
WatchAllTables: true,
1195+
Position: line(4),
1196+
Options: ChangeStreamOptions{
1197+
RetentionPeriod: func(b string) *string { return &b }("36h"),
1198+
ValueCaptureType: func(b string) *string { return &b }("NEW_VALUES"),
1199+
},
1200+
},
1201+
},
1202+
},
1203+
},
1204+
{
1205+
`ALTER CHANGE STREAM csname SET FOR ALL;
1206+
ALTER CHANGE STREAM csname SET FOR tname, tname2(cname);
1207+
ALTER CHANGE STREAM csname DROP FOR ALL;
1208+
ALTER CHANGE STREAM csname SET OPTIONS (retention_period = '36h', value_capture_type = 'NEW_VALUES');`,
1209+
&DDL{
1210+
Filename: "filename",
1211+
List: []DDLStmt{
1212+
&AlterChangeStream{
1213+
Name: "csname",
1214+
Alteration: AlterWatch{
1215+
WatchAllTables: true,
1216+
},
1217+
Position: line(1),
1218+
},
1219+
&AlterChangeStream{
1220+
Name: "csname",
1221+
Alteration: AlterWatch{
1222+
Watch: []WatchDef{
1223+
{
1224+
Table: "tname",
1225+
WatchAllCols: true,
1226+
Position: Position{Line: 2, Offset: 78},
1227+
},
1228+
{
1229+
Table: "tname2",
1230+
Columns: []ID{"cname"},
1231+
Position: Position{Line: 2, Offset: 85},
1232+
},
1233+
},
1234+
},
1235+
Position: line(2),
1236+
},
1237+
&AlterChangeStream{
1238+
Name: "csname",
1239+
Alteration: DropChangeStreamWatch{},
1240+
Position: line(3),
1241+
},
1242+
&AlterChangeStream{
1243+
Name: "csname",
1244+
Alteration: AlterChangeStreamOptions{
1245+
Options: ChangeStreamOptions{
1246+
RetentionPeriod: func(b string) *string { return &b }("36h"),
1247+
ValueCaptureType: func(b string) *string { return &b }("NEW_VALUES"),
1248+
},
1249+
},
1250+
Position: line(4),
1251+
},
1252+
},
1253+
},
1254+
},
1255+
{
1256+
`DROP CHANGE STREAM csname`,
1257+
&DDL{
1258+
Filename: "filename",
1259+
List: []DDLStmt{
1260+
&DropChangeStream{
1261+
Name: "csname",
1262+
Position: line(1),
1263+
},
1264+
},
1265+
},
1266+
},
11671267
}
11681268
for _, test := range tests {
11691269
got, err := ParseDDL("filename", test.in)

spanner/spansql/sql.go

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -106,17 +106,7 @@ func (cs CreateChangeStream) SQL() string {
106106
if i > 0 {
107107
str += ", "
108108
}
109-
str += table.Table.SQL()
110-
if !table.WatchAllCols {
111-
str += "("
112-
for i, c := range table.Columns {
113-
if i > 0 {
114-
str += ", "
115-
}
116-
str += c.SQL()
117-
}
118-
str += ")"
119-
}
109+
str += table.SQL()
120110
}
121111
}
122112
if cs.Options != (ChangeStreamOptions{}) {
@@ -126,6 +116,21 @@ func (cs CreateChangeStream) SQL() string {
126116
return str
127117
}
128118

119+
func (w WatchDef) SQL() string {
120+
str := w.Table.SQL()
121+
if !w.WatchAllCols {
122+
str += "("
123+
for i, c := range w.Columns {
124+
if i > 0 {
125+
str += ", "
126+
}
127+
str += c.SQL()
128+
}
129+
str += ")"
130+
}
131+
return str
132+
}
133+
129134
func (dt DropTable) SQL() string {
130135
return "DROP TABLE " + dt.Name.SQL()
131136
}
@@ -143,11 +148,29 @@ func (dc DropChangeStream) SQL() string {
143148
}
144149

145150
func (acs AlterChangeStream) SQL() string {
146-
return "ALTER CHANGE STREAM " + acs.Name.SQL() + " SET " + acs.Alteration.SQL()
151+
return "ALTER CHANGE STREAM " + acs.Name.SQL() + " " + acs.Alteration.SQL()
152+
}
153+
154+
func (scsw AlterWatch) SQL() string {
155+
str := "SET FOR "
156+
if scsw.WatchAllTables {
157+
return str + "ALL"
158+
}
159+
for i, table := range scsw.Watch {
160+
if i > 0 {
161+
str += ", "
162+
}
163+
str += table.SQL()
164+
}
165+
return str
147166
}
148167

149168
func (ao AlterChangeStreamOptions) SQL() string {
150-
return ao.Options.SQL()
169+
return "SET " + ao.Options.SQL()
170+
}
171+
172+
func (dcsw DropChangeStreamWatch) SQL() string {
173+
return "DROP FOR ALL"
151174
}
152175

153176
func (cso ChangeStreamOptions) SQL() string {

spanner/spansql/sql_test.go

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,26 @@ func TestSQL(t *testing.T) {
415415
"ALTER DATABASE dbname SET OPTIONS (optimizer_version=null, optimizer_statistics_package=null, version_retention_period=null, enable_key_visualizer=null, default_leader=null)",
416416
reparseDDL,
417417
},
418+
{
419+
&CreateChangeStream{
420+
Name: "csname",
421+
Watch: []WatchDef{
422+
{Table: "Ta", WatchAllCols: true, Position: line(1)},
423+
{Table: "Tsub", Columns: []ID{ID("Hash")}, Position: line(1)},
424+
},
425+
Position: line(1),
426+
},
427+
"CREATE CHANGE STREAM csname FOR Ta, Tsub(`Hash`)",
428+
reparseDDL,
429+
},
430+
{
431+
&DropChangeStream{
432+
Name: "csname",
433+
Position: line(1),
434+
},
435+
"DROP CHANGE STREAM csname",
436+
reparseDDL,
437+
},
418438
{
419439
&CreateChangeStream{
420440
Name: "csname",
@@ -440,17 +460,52 @@ func TestSQL(t *testing.T) {
440460
"CREATE CHANGE STREAM csname FOR ALL OPTIONS (retention_period='7d', value_capture_type='NEW_VALUES')",
441461
reparseDDL,
442462
},
463+
{
464+
&AlterChangeStream{
465+
Name: "csname",
466+
Alteration: AlterWatch{
467+
WatchAllTables: true,
468+
},
469+
Position: line(1),
470+
},
471+
"ALTER CHANGE STREAM csname SET FOR ALL",
472+
reparseDDL,
473+
},
474+
{
475+
&AlterChangeStream{
476+
Name: "csname",
477+
Alteration: AlterWatch{
478+
Watch: []WatchDef{
479+
{Table: "Ta", WatchAllCols: true, Position: Position{Line: 1, Offset: 35}},
480+
{Table: "Tsub", Columns: []ID{ID("Hash")}, Position: Position{Line: 1, Offset: 39}},
481+
},
482+
},
483+
Position: line(1),
484+
},
485+
"ALTER CHANGE STREAM csname SET FOR Ta, Tsub(`Hash`)",
486+
reparseDDL,
487+
},
443488
{
444489
&AlterChangeStream{
445490
Name: "csname",
446491
Alteration: AlterChangeStreamOptions{
447492
Options: ChangeStreamOptions{
493+
RetentionPeriod: func(s string) *string { return &s }("7d"),
448494
ValueCaptureType: func(s string) *string { return &s }("NEW_VALUES"),
449495
},
450496
},
451497
Position: line(1),
452498
},
453-
"ALTER CHANGE STREAM csname SET OPTIONS (value_capture_type='NEW_VALUES')",
499+
"ALTER CHANGE STREAM csname SET OPTIONS (retention_period='7d', value_capture_type='NEW_VALUES')",
500+
reparseDDL,
501+
},
502+
{
503+
&AlterChangeStream{
504+
Name: "csname",
505+
Alteration: DropChangeStreamWatch{},
506+
Position: line(1),
507+
},
508+
"ALTER CHANGE STREAM csname DROP FOR ALL",
454509
reparseDDL,
455510
},
456511
{

0 commit comments

Comments
 (0)