|
20 | 20 |
|
21 | 21 | import io.debezium.jdbc.JdbcConfiguration; |
22 | 22 | import io.debezium.jdbc.JdbcConnection; |
| 23 | +import org.slf4j.Logger; |
| 24 | +import org.slf4j.LoggerFactory; |
23 | 25 |
|
24 | 26 | import java.sql.DatabaseMetaData; |
25 | 27 | import java.sql.ResultSet; |
26 | 28 | import java.sql.SQLException; |
| 29 | +import java.sql.Timestamp; |
27 | 30 | import java.time.Duration; |
28 | 31 | import java.util.ArrayList; |
29 | 32 | import java.util.List; |
| 33 | +import java.util.Optional; |
30 | 34 | import java.util.Properties; |
31 | 35 | import java.util.regex.Pattern; |
32 | 36 | import java.util.stream.Collectors; |
33 | 37 |
|
34 | 38 | /** {@link JdbcConnection} extension to be used with OceanBase server. */ |
35 | 39 | public class OceanBaseConnection extends JdbcConnection { |
36 | 40 |
|
| 41 | + private static final Logger LOG = LoggerFactory.getLogger(OceanBaseConnection.class); |
| 42 | + |
37 | 43 | private static final String QUOTED_CHARACTER = "`"; |
38 | 44 | private static final Properties DEFAULT_JDBC_PROPERTIES = initializeDefaultJdbcProperties(); |
39 | 45 | private static final String MYSQL_URL_PATTERN = |
@@ -105,20 +111,42 @@ private static JdbcConnection.ConnectionFactory factory( |
105 | 111 | formatJdbcUrl(jdbcDriver, jdbcProperties), jdbcDriver, classLoader); |
106 | 112 | } |
107 | 113 |
|
108 | | - private String getSystemSchema() { |
109 | | - return "mysql".equalsIgnoreCase(compatibleMode) ? "oceanbase" : "SYS"; |
110 | | - } |
111 | | - |
112 | 114 | /** |
113 | | - * Get current timestamp in nanoseconds from GTS (Global Timestamp Service). |
| 115 | + * Get current timestamp number in seconds. |
114 | 116 | * |
115 | | - * @return the global timestamp. |
| 117 | + * @return current timestamp number. |
116 | 118 | * @throws SQLException If a database access error occurs. |
117 | 119 | */ |
118 | | - public Long getGlobalTimestamp() throws SQLException { |
| 120 | + public long getCurrentTimestampS() throws SQLException { |
| 121 | + try { |
| 122 | + long globalTimestamp = getGlobalTimestamp(); |
| 123 | + LOG.info("Global timestamp: {}", globalTimestamp); |
| 124 | + return Long.parseLong(String.valueOf(globalTimestamp).substring(0, 10)); |
| 125 | + } catch (Exception e) { |
| 126 | + LOG.warn("Failed to get global timestamp, use local timestamp instead"); |
| 127 | + } |
| 128 | + return getCurrentTimestamp() |
| 129 | + .orElseThrow(IllegalStateException::new) |
| 130 | + .toInstant() |
| 131 | + .getEpochSecond(); |
| 132 | + } |
| 133 | + |
| 134 | + private long getGlobalTimestamp() throws SQLException { |
| 135 | + String schema = "mysql".equalsIgnoreCase(compatibleMode) ? "oceanbase" : "SYS"; |
| 136 | + return querySingleValue( |
| 137 | + connection(), |
| 138 | + "SELECT TS_VALUE FROM " + schema + ".V$OB_TIMESTAMP_SERVICE", |
| 139 | + ps -> {}, |
| 140 | + rs -> rs.getLong(1)); |
| 141 | + } |
| 142 | + |
| 143 | + @Override |
| 144 | + public Optional<Timestamp> getCurrentTimestamp() throws SQLException { |
119 | 145 | return queryAndMap( |
120 | | - String.format("SELECT TS_VALUE FROM %s.V$OB_TIMESTAMP_SERVICE", getSystemSchema()), |
121 | | - rs -> rs.next() ? rs.getLong(1) : null); |
| 146 | + "mysql".equalsIgnoreCase(compatibleMode) |
| 147 | + ? "SELECT CURRENT_TIMESTAMP" |
| 148 | + : "SELECT CURRENT_TIMESTAMP FROM DUAL", |
| 149 | + rs -> rs.next() ? Optional.of(rs.getTimestamp(1)) : Optional.empty()); |
122 | 150 | } |
123 | 151 |
|
124 | 152 | /** |
|
0 commit comments