Skip to content

Commit 26dfeeb

Browse files
authored
[cdc-connector][oceanbase] Fallback to current timestamp when query gts failure (#2868)
1 parent 2c557c6 commit 26dfeeb

File tree

2 files changed

+38
-14
lines changed

2 files changed

+38
-14
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseConnection.java

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,26 @@
2020

2121
import io.debezium.jdbc.JdbcConfiguration;
2222
import io.debezium.jdbc.JdbcConnection;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
2325

2426
import java.sql.DatabaseMetaData;
2527
import java.sql.ResultSet;
2628
import java.sql.SQLException;
29+
import java.sql.Timestamp;
2730
import java.time.Duration;
2831
import java.util.ArrayList;
2932
import java.util.List;
33+
import java.util.Optional;
3034
import java.util.Properties;
3135
import java.util.regex.Pattern;
3236
import java.util.stream.Collectors;
3337

3438
/** {@link JdbcConnection} extension to be used with OceanBase server. */
3539
public class OceanBaseConnection extends JdbcConnection {
3640

41+
private static final Logger LOG = LoggerFactory.getLogger(OceanBaseConnection.class);
42+
3743
private static final String QUOTED_CHARACTER = "`";
3844
private static final Properties DEFAULT_JDBC_PROPERTIES = initializeDefaultJdbcProperties();
3945
private static final String MYSQL_URL_PATTERN =
@@ -105,20 +111,42 @@ private static JdbcConnection.ConnectionFactory factory(
105111
formatJdbcUrl(jdbcDriver, jdbcProperties), jdbcDriver, classLoader);
106112
}
107113

108-
private String getSystemSchema() {
109-
return "mysql".equalsIgnoreCase(compatibleMode) ? "oceanbase" : "SYS";
110-
}
111-
112114
/**
113-
* Get current timestamp in nanoseconds from GTS (Global Timestamp Service).
115+
* Get current timestamp number in seconds.
114116
*
115-
* @return the global timestamp.
117+
* @return current timestamp number.
116118
* @throws SQLException If a database access error occurs.
117119
*/
118-
public Long getGlobalTimestamp() throws SQLException {
120+
public long getCurrentTimestampS() throws SQLException {
121+
try {
122+
long globalTimestamp = getGlobalTimestamp();
123+
LOG.info("Global timestamp: {}", globalTimestamp);
124+
return Long.parseLong(String.valueOf(globalTimestamp).substring(0, 10));
125+
} catch (Exception e) {
126+
LOG.warn("Failed to get global timestamp, use local timestamp instead");
127+
}
128+
return getCurrentTimestamp()
129+
.orElseThrow(IllegalStateException::new)
130+
.toInstant()
131+
.getEpochSecond();
132+
}
133+
134+
private long getGlobalTimestamp() throws SQLException {
135+
String schema = "mysql".equalsIgnoreCase(compatibleMode) ? "oceanbase" : "SYS";
136+
return querySingleValue(
137+
connection(),
138+
"SELECT TS_VALUE FROM " + schema + ".V$OB_TIMESTAMP_SERVICE",
139+
ps -> {},
140+
rs -> rs.getLong(1));
141+
}
142+
143+
@Override
144+
public Optional<Timestamp> getCurrentTimestamp() throws SQLException {
119145
return queryAndMap(
120-
String.format("SELECT TS_VALUE FROM %s.V$OB_TIMESTAMP_SERVICE", getSystemSchema()),
121-
rs -> rs.next() ? rs.getLong(1) : null);
146+
"mysql".equalsIgnoreCase(compatibleMode)
147+
? "SELECT CURRENT_TIMESTAMP"
148+
: "SELECT CURRENT_TIMESTAMP FROM DUAL",
149+
rs -> rs.next() ? Optional.of(rs.getTimestamp(1)) : Optional.empty());
122150
}
123151

124152
/**

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,7 @@ public void run(SourceContext<T> ctx) throws Exception {
152152
initTableWhiteList();
153153

154154
if (shouldReadSnapshot()) {
155-
Long globalTimestamp = getSnapshotConnection().getGlobalTimestamp();
156-
if (globalTimestamp == null || globalTimestamp <= 0) {
157-
throw new RuntimeException("Got invalid global timestamp: " + globalTimestamp);
158-
}
159-
long startTimestamp = globalTimestamp / 1000_000_000;
155+
long startTimestamp = getSnapshotConnection().getCurrentTimestampS();
160156
LOG.info("Snapshot reading started from timestamp: {}", startTimestamp);
161157
readSnapshotRecords();
162158
LOG.info("Snapshot reading finished");

0 commit comments

Comments
 (0)