-
Notifications
You must be signed in to change notification settings - Fork 2.2k
[Feature][Jdbc] Support read multiple tables by regular expressions #9380
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ssions in PostgreSQL/mysql/oracle/sqlserver, and fix the null pointer issue in the regular expression
Hisoka-X
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to add a option to mark whether the TablePath is a regular expression.
1、Simplified Path Matching Design,If you enter a path like /project/table_123, the system first checks for this exact path,If no exact match exists, it automatically treats your input as a regex pattern.
2、Like Flink CDC (where you directly use exact table names OR regex patterns without special parameters), we keep things simple. |
docs/en/connector-v2/source/Jdbc.md
Outdated
|
|
||
| Examples: | ||
| - `testdb.table+` - Matches all tables starting with "table" followed by numbers | ||
| - `testdb.*` - Matches all tables in the `testdb` database |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this expression will match database testdb2 too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The regular expression testdb.* will only match all the tables in the testdb database and will not match the testdb2 database.
| for (JdbcSourceTableConfig tableConfig : tablePaths) { | ||
| List<String> schemaTables = new ArrayList<>(); | ||
| String tablePath = tableConfig.getTablePath(); | ||
| String query = tableConfig.getQuery(); | ||
| LOG.info("Processing table path: {}, custom query: {}", tablePath, query); | ||
| String sql; | ||
| if (StringUtils.isBlank(query)) { | ||
| String schemaName; | ||
| if (jdbcDialect.dialectName().startsWith(DatabaseTypeEnum.ORACLE.getValue())) { | ||
| schemaName = tablePath.split("\\.")[0]; | ||
| sql = "SELECT OWNER, TABLE_NAME FROM dba_tables where OWNER=?"; | ||
| ps = connection.prepareStatement(sql); | ||
| ps.setString(1, schemaName); | ||
| rs = ps.executeQuery(); | ||
| while (rs.next()) { | ||
| // For Oracle: schema.table | ||
| String foundTable = | ||
| rs.getString("OWNER") + POINT + rs.getString("TABLE_NAME"); | ||
| schemaTables.add(foundTable); | ||
| LOG.info("Found table in Oracle: {}", foundTable); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not reuse JDBCCatalog to scan all should read tables? This way support all database type with catalog. https://github.com/apache/seatunnel/blob/dev/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java#L102
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current approach of manually implementing table scanning in JdbcSource offers several key advantages:
Precise Control
- Custom SQL
- queries tailored for each database type (Oracle, MySQL, SQLServer, PostgreSQL)
- Direct access to system tables for accurate table information retrieval
- Detailed Logging
- Comprehensive logging for debugging and troubleshooting
- Database-specific logging (e.g., LOG.info("Found table in PostgreSQL: {}", foundTable))
- Specialized Format Handling
-
Dedicated handling for different database table path formats:
MySQL: database.table
PostgreSQL: database.schema.table
Oracle: schema.table
SQLServer: database.schema.table
- Flexible Pattern Matching
- Two-step matching strategy in filterCapturedTablesByRegrex:First attempts exact matches
Falls back to regex pattern matching
-
Reduced Dependencies
Functions independently of complete Catalog implementations
Works even when some databases have incomplete Catalog implementations -
Compatibility Assurance
Explicitly defines supported databases via isSupportedDatabase
Provides fallback to JdbcCatalogUtils.getTables for unsupported database types
While using the Catalog interface might lead to more unified code, the current implementation provides greater flexibility and adaptability to specific database behaviors, especially when handling differences in system table structures and naming conventions.
# Conflicts: # docs/zh/connector-v2/source/Mysql.md
|
The modification has been made. I hope you can review it when you are free. |
| .withDescription("The database names RegEx of the database to capture."); | ||
|
|
||
| Option<String> SCHEMA_PATTERN = | ||
| Options.key("schema-pattern") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused?
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| @Disabled("Please Test it in your local environment") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why disabled the test case? You can reuse
Line 43 in 2f96f2e
| public class TestResultSet implements ResultSet { |
|
good job. This is already close to merge. |
| boolean isOracleDialect = | ||
| jdbcDialect.dialectName().equalsIgnoreCase("oracle") | ||
| || jdbcDialect.dialectName().equalsIgnoreCase("oceanbase-oracle"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We cannot add this special processing logic. There are more than two Oracle-like databases.
E2E tests for MySQL and Oracle have been added. |
|
|
||
| default Optional<String> getDefaultDatabase() { | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| default boolean useThreePartTablePath() { | ||
| return false; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not reuse
Line 296 in cbb777e
| default TablePath parse(String tablePath) { |
Hisoka-X
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM except two minior problems. Thanks @yzeng1618
| public Boolean getUseRegex() { | ||
| return useRegex != null ? useRegex : false; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| public Boolean getUseRegex() { | |
| return useRegex != null ? useRegex : false; | |
| } |
| boolean isRegexPath = tableConfig.getUseRegex(); | ||
|
|
||
| if (StringUtils.isNotEmpty(tableConfig.getTablePath()) | ||
| && StringUtils.isEmpty(tableConfig.getQuery()) | ||
| && isRegexPath) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| boolean isRegexPath = tableConfig.getUseRegex(); | |
| if (StringUtils.isNotEmpty(tableConfig.getTablePath()) | |
| && StringUtils.isEmpty(tableConfig.getQuery()) | |
| && isRegexPath) { | |
| if (StringUtils.isNotEmpty(tableConfig.getTablePath()) | |
| && StringUtils.isEmpty(tableConfig.getQuery()) | |
| && tableConfig.getUseRegex()) { |
Hisoka-X
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thansk @yzeng1618 ! LGTM if ci passes.
davidzollo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
good job
…pache#9380) Co-authored-by: zengyi <[email protected]>
…ssions in PostgreSQL/mysql/oracle/sqlserver, and fix the null pointer issue in the regular expression
#9209
Purpose of this pull request
added contents about multiple tables and regular expressions in PostgreSQL/mysql/oracle/sqlserver, and fix the null pointer issue in the regular expression.
Does this PR introduce any user-facing change?
Yes.
This PR enhances the table_list parameter to directly support regex patterns for table filtering, while maintaining backward compatibility.
Here's the detailed breakdown:
New Feature: Direct Use of Regular Expressions in table_path
Purpose: Allow users to write regular expressions directly in the table_path field within table_list to filter tables.
Example Configuration:
"table_list"=[
{
"table_path"="TEST.TEST_DB_*" # Matches all tables with the "TEST_DB" prefix
}
]
This configuration matches all tables prefixed with TEST_DB (e.g., TEST_DB_2023, TEST_DB_2024).
Improvement and Fix:
Enhanced the robustness of the approximateRowCntStatement method in OracleDialect.
Fixed a null pointer error in Oracle when executing queries with empty or invalid parameters.
Key Notes:
The table_path now supports regex syntax (e.g., TEST.TEST_DB_* → matches all tables under the TEST schema with names starting with TEST_DB_).
The Oracle fix ensures stable execution of row count estimation logic, avoiding crashes due to unhandled edge cases.
How was this patch tested?
Testing Environment
OS: Linux (Ubuntu 20.04)
SeaTunnel Version: 2.3.9
Execution Mode: Flink on YARN (yarn-application)
Databases:
PostgreSQL 16.0(Source)
Iceberg (Sink)
Test Configuration:
Created a configuration file pg2iceberg.conf to read tables matching the regex TEST.TEST_DB_* from PostgreSQL:
{
env {
execution.parallelism = 1
job.mode = "BATCH"
job.name = "seatunnel_batch_job"
}
source配置
source {
JDBC {
url = "jdbc:postgresql://xxxxxxx:xxxxx/xxxxx"
driver = "org.postgresql.Driver"
user = "xxxxxxxx"
password = "xxxxxxx"
"table_list" = [
{
"table_path" = "postgres.public.test_db_2.*"
}
]
split.size = 5000
fetch_size = 2000
}
}
sink配置
sink {
Iceberg {
........
}
}
}
Execution Command:
./bin/start-seatunnel-flink-15-connector-v2.sh --config pg2iceberg.conf --deploy-mode run-application --target yarn-application --name multitable_pg2Iceberg
Check log:
Filtering tables with regex pattern: postgres.public.test_db_2.*
Found regex match table: postgres.public.test_db_20
Found regex match table: postgres.public.test_db_21
Found regex match table: postgres.public.test_db_22
Found regex match table: postgres.public.test_db_20250324
Found regex match table: postgres.public.test_db_202502
Found regex match table: postgres.public.test_db_202501
Total tables matched after filtering: 6
Check list
New License Guide