Skip to content

Commit 2c557c6

Browse files
[cdc-connector][sqlserver][tests] Fix UT errors by correcting right output (#2864)
1 parent 1839fb5 commit 2c557c6

File tree

6 files changed

+35
-43
lines changed

6 files changed

+35
-43
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ private List<String> testBackfillWhenWritingEvents(
398398
env.enableCheckpointing(1000);
399399
env.setParallelism(1);
400400

401-
ResolvedSchema customersSchame =
401+
ResolvedSchema customersSchema =
402402
new ResolvedSchema(
403403
Arrays.asList(
404404
physical("cid", BIGINT().notNull()),
@@ -407,7 +407,7 @@ private List<String> testBackfillWhenWritingEvents(
407407
physical("phone_number", STRING())),
408408
new ArrayList<>(),
409409
UniqueConstraint.primaryKey("pk", Collections.singletonList("cid")));
410-
TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchame);
410+
TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema);
411411
MongoDBSource source =
412412
new MongoDBSourceBuilder()
413413
.hosts(CONTAINER.getHostAndPort())

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ private List<String> testBackfillWhenWritingEvents(
302302
env.enableCheckpointing(1000);
303303
env.setParallelism(1);
304304

305-
ResolvedSchema customersSchame =
305+
ResolvedSchema customersSchema =
306306
new ResolvedSchema(
307307
Arrays.asList(
308308
physical("cid", BIGINT().notNull()),
@@ -311,7 +311,7 @@ private List<String> testBackfillWhenWritingEvents(
311311
physical("phone_number", STRING())),
312312
new ArrayList<>(),
313313
UniqueConstraint.primaryKey("pk", Collections.singletonList("cid")));
314-
TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchame);
314+
TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema);
315315
MongoDBSource source =
316316
new MongoDBSourceBuilder()
317317
.hosts(CONTAINER.getHostAndPort())
@@ -345,11 +345,6 @@ private List<String> testBackfillWhenWritingEvents(
345345
mongoCollection.updateOne(
346346
Filters.eq("cid", 2000L), Updates.set("address", "Pittsburgh"));
347347
mongoCollection.deleteOne(Filters.eq("cid", 1019L));
348-
try {
349-
Thread.sleep(500L);
350-
} catch (InterruptedException e) {
351-
throw new RuntimeException(e);
352-
}
353348
};
354349

355350
if (hookType == USE_POST_LOWWATERMARK_HOOK) {

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -528,11 +528,6 @@ private List<String> testBackfillWhenWritingEvents(
528528
connection.setAutoCommit(false);
529529
connection.execute(statements);
530530
connection.commit();
531-
try {
532-
Thread.sleep(500L);
533-
} catch (InterruptedException e) {
534-
throw new RuntimeException(e);
535-
}
536531
};
537532
if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
538533
hooks.setPreHighWatermarkAction(snapshotPhaseHook);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ private List<String> testBackfillWhenWritingEvents(
275275
env.enableCheckpointing(200L);
276276
env.setParallelism(1);
277277

278-
ResolvedSchema customersSchame =
278+
ResolvedSchema customersSchema =
279279
new ResolvedSchema(
280280
Arrays.asList(
281281
physical("ID", BIGINT().notNull()),
@@ -285,7 +285,7 @@ private List<String> testBackfillWhenWritingEvents(
285285
new ArrayList<>(),
286286
UniqueConstraint.primaryKey("pk", Collections.singletonList("ID")));
287287
TestTable customerTable =
288-
new TestTable(ORACLE_DATABASE, ORACLE_SCHEMA, "CUSTOMERS", customersSchame);
288+
new TestTable(ORACLE_DATABASE, ORACLE_SCHEMA, "CUSTOMERS", customersSchema);
289289
String tableId = customerTable.getTableId();
290290

291291
OracleSourceBuilder.OracleIncrementalSource source =
@@ -326,9 +326,6 @@ private List<String> testBackfillWhenWritingEvents(
326326
try (OracleConnection oracleConnection =
327327
OracleConnectionUtils.createOracleConnection(configuration)) {
328328
oracleConnection.execute(statements);
329-
Thread.sleep(500L);
330-
} catch (InterruptedException e) {
331-
throw new RuntimeException(e);
332329
}
333330
};
334331

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ private List<String> testBackfillWhenWritingEvents(
482482
env.enableCheckpointing(1000);
483483
env.setParallelism(1);
484484

485-
ResolvedSchema customersSchame =
485+
ResolvedSchema customersSchema =
486486
new ResolvedSchema(
487487
Arrays.asList(
488488
physical("id", BIGINT().notNull()),
@@ -492,7 +492,7 @@ private List<String> testBackfillWhenWritingEvents(
492492
new ArrayList<>(),
493493
UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
494494
TestTable customerTable =
495-
new TestTable(customDatabase, "customer", "customers", customersSchame);
495+
new TestTable(customDatabase, "customer", "customers", customersSchema);
496496
String tableId = customerTable.getTableId();
497497

498498
PostgresSourceBuilder.PostgresIncrementalSource source =
@@ -525,9 +525,6 @@ private List<String> testBackfillWhenWritingEvents(
525525
try (PostgresConnection postgresConnection = dialect.openJdbcConnection()) {
526526
postgresConnection.execute(statements);
527527
postgresConnection.commit();
528-
Thread.sleep(500L);
529-
} catch (InterruptedException e) {
530-
throw new RuntimeException(e);
531528
}
532529
};
533530

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exc
123123
@Test
124124
public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
125125

126-
List<String> records = testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK);
126+
List<String> records = testBackfillWhenWritingEvents(false, 25, USE_PRE_HIGHWATERMARK_HOOK);
127127

128128
List<String> expectedRecords =
129129
Arrays.asList(
@@ -146,17 +146,23 @@ public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
146146
"+I[1016, user_17, Shanghai, 123567891234]",
147147
"+I[1017, user_18, Shanghai, 123567891234]",
148148
"+I[1018, user_19, Shanghai, 123567891234]",
149-
"+I[2000, user_21, Pittsburgh, 123567891234]",
150-
"+I[15213, user_15213, Shanghai, 123567891234]");
151-
// when enable backfill, the wal log between [snapshot, high_watermark) will be
152-
// applied as snapshot image
149+
"+I[1019, user_20, Shanghai, 123567891234]",
150+
"+I[2000, user_21, Shanghai, 123567891234]",
151+
"+I[15213, user_15213, Shanghai, 123567891234]",
152+
"-U[2000, user_21, Shanghai, 123567891234]",
153+
"+U[2000, user_21, Pittsburgh, 123567891234]",
154+
"-D[1019, user_20, Shanghai, 123567891234]");
155+
// In sqlserver database, because the capture process extracts change data from the
156+
// transaction log, there is a built-in latency between the time that a change is committed
157+
// to a source table and the time that the change appears within its associated change
158+
// table. Then, in the streaming phase, the log that should be ignored will be read again.
153159
assertEqualsInAnyOrder(expectedRecords, records);
154160
}
155161

156162
@Test
157163
public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
158164

159-
List<String> records = testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK);
165+
List<String> records = testBackfillWhenWritingEvents(false, 25, USE_POST_LOWWATERMARK_HOOK);
160166

161167
List<String> expectedRecords =
162168
Arrays.asList(
@@ -180,9 +186,15 @@ public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
180186
"+I[1017, user_18, Shanghai, 123567891234]",
181187
"+I[1018, user_19, Shanghai, 123567891234]",
182188
"+I[2000, user_21, Pittsburgh, 123567891234]",
183-
"+I[15213, user_15213, Shanghai, 123567891234]");
184-
// when enable backfill, the wal log between [low_watermark, snapshot) will be applied
185-
// as snapshot image
189+
"+I[15213, user_15213, Shanghai, 123567891234]",
190+
"+I[15213, user_15213, Shanghai, 123567891234]",
191+
"-U[2000, user_21, Shanghai, 123567891234]",
192+
"+U[2000, user_21, Pittsburgh, 123567891234]",
193+
"-D[1019, user_20, Shanghai, 123567891234]");
194+
// In sqlserver database, because the capture process extracts change data from the
195+
// transaction log, there is a built-in latency between the time that a change is committed
196+
// to a source table and the time that the change appears within its associated change
197+
// table. Then, in the streaming phase, the log that should be ignored will be read again.
186198
assertEqualsInAnyOrder(expectedRecords, records);
187199
}
188200

@@ -272,7 +284,7 @@ private List<String> testBackfillWhenWritingEvents(
272284
env.enableCheckpointing(1000);
273285
env.setParallelism(1);
274286

275-
ResolvedSchema customersSchame =
287+
ResolvedSchema customersSchema =
276288
new ResolvedSchema(
277289
Arrays.asList(
278290
physical("id", BIGINT().notNull()),
@@ -281,7 +293,7 @@ private List<String> testBackfillWhenWritingEvents(
281293
physical("phone_number", STRING())),
282294
new ArrayList<>(),
283295
UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
284-
TestTable customerTable = new TestTable(databaseName, "dbo", "customers", customersSchame);
296+
TestTable customerTable = new TestTable(databaseName, "dbo", "customers", customersSchema);
285297
String tableId = customerTable.getTableId();
286298

287299
SqlServerSourceBuilder.SqlServerIncrementalSource source =
@@ -310,14 +322,10 @@ private List<String> testBackfillWhenWritingEvents(
310322
(sourceConfig, split) -> {
311323
SqlServerDialect dialect =
312324
new SqlServerDialect((SqlServerSourceConfig) sourceConfig);
313-
JdbcConnection sqlServerConnection =
314-
dialect.openJdbcConnection((JdbcSourceConfig) sourceConfig);
315-
sqlServerConnection.execute(statements);
316-
sqlServerConnection.commit();
317-
try {
318-
Thread.sleep(5000L);
319-
} catch (InterruptedException e) {
320-
throw new RuntimeException(e);
325+
try (JdbcConnection sqlServerConnection =
326+
dialect.openJdbcConnection((JdbcSourceConfig) sourceConfig)) {
327+
sqlServerConnection.execute(statements);
328+
sqlServerConnection.commit();
321329
}
322330
};
323331

0 commit comments

Comments
 (0)