[dagster-mariadb] Add MariaDB Integration & Support #32618
Conversation
Hi @aluminates, it's likely possible to use mariadb without adding it to the

Hey @cmpadden, this was an experiment for a hackathon. Please see below. This PR introduces first-class support for MariaDB within Dagster. The goal is to enable Dagster users to integrate MariaDB into ETL/ELT pipelines with minimal boilerplate.

Gotcha. It looks like not all of the files were checked in to version control, so I'm only seeing a single change to the setup.py. That's exciting though, thank you!

Ah yep, it's a WIP/draft still (I like seeing my changes in PRs during development). Should've clarified. Thanks anyway.
1. remove trailing whitespace 2. refactor get_engine() to avoid creating engines repeatedly

@revanthsreeram FYI.
```python
if not rows:
    return None

cluster_status = {row[0]: row[1] for row in rows}
```
dict(rows)?
| "local_state": cluster_status.get("wsrep_local_state_comment", "Unknown"), | ||
| } | ||
| except SQLAlchemyError: | ||
| return None |
maybe still forward the exception instead?

How I see the user accessing this function: they would have some bigger try block catching the connection etc. errors, so the call to check_galera_cluster_status would also sit inside that block. Forwarding the exception allows them to handle those errors in one place, with a single catch, minimizing the coding effort. Some of your functions already forward the exception, so it's better to have a single way of handling connection issues. I could understand returning None for other (non-connection) reasons, if they are very function-specific.

Don't forget to fix the return annotation.
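Something like this, for illustration (a sketch only; the query and signature are my assumptions based on the wsrep_* variables visible in the diff, not the PR's code):

```python
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

def check_galera_cluster_status(engine) -> dict:
    """Return Galera status; connection errors propagate to the caller."""
    with engine.connect() as connection:
        rows = connection.execute(text("SHOW STATUS LIKE 'wsrep_%'")).fetchall()
    # Not being in a cluster is a function-specific, non-error case:
    return dict(rows) if rows else {}

# The caller then handles all connection issues in one place:
engine = create_engine("mariadb+pymysql://user:pass@localhost/db")  # placeholder URL
try:
    status = check_galera_cluster_status(engine)
except SQLAlchemyError:
    ...  # single catch for connection errors
```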
```python
    return None


def optimize_for_write_heavy_load(
```
I am not sure how typical such functions are for dagster... imo this one looks like a hint for the user, but it doesn't make clear that it performs a heavy operation, which may run VERY long.

My suggestion: make a function change_engine or alter_engine, where any mariadb engine can be accepted. Make it very clear that the function may run for days. One good note is that reads won't be blocked by such an ALTER TABLE; in many cases, writes wouldn't be either. A very cool, mariadb-specific feature would be to allow the user to "force" the non-write-blocking alter with LOCK=NONE (it's the default behavior anyway, but an explicit LOCK=NONE will fail if that isn't possible).

More info on LOCK=NONE:
https://mariadb.com/docs/server/reference/sql-statements/data-definition/alter/alter-table/online-schema-change

Note that if the table has no indices, parallel writes may slow down an online ALTER TABLE dramatically (worth mentioning in the function doc). You can still keep this function, but re-use change_engine/alter_engine inside it, or maybe it's better to just document the use-cases in the readme:
- Optimize table for write-heavy load: change_engine(table, StorageEngine.MyRocks if use_myrocks else StorageEngine.InnoDB)
- Use Spider for multi-tenant workload: change_engine(table, StorageEngine.Spider, spider_params)
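A sketch of what that could look like (the function name, StorageEngine enum, and force_online flag are my proposal here, not existing API):

```python
from enum import Enum
from sqlalchemy import text

class StorageEngine(Enum):
    InnoDB = "InnoDB"
    Aria = "Aria"
    MyRocks = "ROCKSDB"
    Spider = "SPIDER"

def change_engine(connection, table: str, engine: StorageEngine,
                  force_online: bool = False) -> None:
    """Convert a table to another storage engine.

    WARNING: this ALTER TABLE may run for days on large tables. With
    force_online=True a LOCK=NONE clause is added, so the statement fails
    fast if a non-write-blocking alter is not possible. On tables without
    indices, heavy parallel writes can dramatically slow down an online alter.
    """
    query = f"ALTER TABLE `{table}` ENGINE={engine.value}"
    if force_online:
        query += ", LOCK=NONE"
    connection.execute(text(query))
```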
```python
# Note: This is a simplified example
# Real Spider setup requires additional configuration
query = f"""
```
Same as above: it's better to avoid typing the CREATE query manually in a function that is supposed to be a shortcut. Make a general function (or maybe one already exists?) and reuse it to follow the DRY principle.
```python
    connection.execute(text(query))
    connection.commit()
except SQLAlchemyError as e:
    connection.rollback()
```
This kinda catches my eye in many different ways.
- If the connection is broken, rollback may cause a cascading exception. Seems safe, since you forward the exception anyway, but the user will likely see "an exception happened while handling another exception".
- CREATE TABLE is a serialization point -- it issues an implicit commit both before and after itself. That means the commit() is obsolete. But, perhaps, future-proof.
- If CREATE TABLE fails, its side effects will be rolled back no matter what happens.
- This pattern is repeated all around.

So I'd be fine if there were some function to reuse for the repeated pattern. Perhaps it could then (not necessarily now, but in perspective) be improved to roll back only when the connection is still alive. Or otherwise CREATE TABLE could be treated specially, keeping in mind that rollback is not needed. I'd go with the with connection.begin() as transaction pattern, which is simple, pythonic, future-proof and does what we want.
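A minimal sketch of that pattern (engine and query stand in for the PR's own objects):

```python
from sqlalchemy import text

# begin() opens a transaction that commits when the block exits cleanly
# and rolls back automatically if the block raises -- no manual
# commit()/rollback() bookkeeping required:
with engine.connect() as connection:
    with connection.begin():
        connection.execute(text(query))
```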
So overall for this function: extract execute_create_table, which will either use execute_dml (which has no commit), or the with-pattern, which commits and also rolls back on exception.
Oh, you already have one, called create_partitioned_table. See the note below.
```python
}


def create_partitioned_table(
```
There is no reason to have partitioned in the name -- it's just create_table. A table can have partitions, or can have none, and then partition_config could be None.
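Roughly (a hypothetical signature):

```python
from typing import Any, Optional

def create_table(
    self,
    table_name: str,
    columns: dict[str, str],
    partition_config: Optional[dict[str, Any]] = None,  # None = unpartitioned
) -> None:
    ...
```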
```python
table_name = self._get_table_name(context)

# Check if table exists
check_query = f"""
```
This query is pretty fine for a general MariaDBPandasIOManager. Note that it doesn't check for partitions; for that you might want to SELECT from information_schema.PARTITIONS:
https://mariadb.com/docs/server/reference/system-tables/information-schema/information-schema-tables/information-schema-partitions-table
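For example, a sketch of such a check (column names per the linked docs; assumes a live connection and schema_name/table_name variables):

```python
from sqlalchemy import text

partition_check = text("""
    SELECT PARTITION_NAME, PARTITION_METHOD, PARTITION_EXPRESSION
    FROM information_schema.PARTITIONS
    WHERE TABLE_SCHEMA = :schema AND TABLE_NAME = :table
""")
rows = connection.execute(
    partition_check, {"schema": schema_name, "table": table_name}
).fetchall()
# PARTITION_NAME is NULL (None) on every row of an unpartitioned table.
```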
```python
    WeeklyPartitionsDefinition,
    MonthlyPartitionsDefinition,
)):
    columns[col] = "DATE"
```
Make constants out of these to avoid typing errors.
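For example (hypothetical constant names):

```python
# Module-level constants: a typo then raises NameError at import time
# instead of silently producing a wrong column type.
SQL_TYPE_DATE = "DATE"
SQL_TYPE_DATETIME = "DATETIME"
SQL_TYPE_TEXT = "TEXT"

# ...later:
# columns[col] = SQL_TYPE_DATE
```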
```python
def _infer_sql_type(self, dtype) -> str:
    """Infer SQL type from pandas dtype."""
    dtype_str = str(dtype)
    if 'int' in dtype_str:
```
Int64 is case-sensitive and won't be caught by this (incorrect) heuristic. The correct way is to use introspection checks like is_integer_dtype etc.:
https://pandas.pydata.org/docs/reference/arrays.html#data-type-introspection
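A sketch of the introspection-based version (the SQL type choices are illustrative, and self is dropped for brevity):

```python
from pandas.api.types import (
    is_bool_dtype,
    is_datetime64_any_dtype,
    is_float_dtype,
    is_integer_dtype,
)

def _infer_sql_type(dtype) -> str:
    """Map a pandas dtype to a SQL type using introspection, not str(dtype)."""
    if is_bool_dtype(dtype):
        return "BOOLEAN"
    if is_integer_dtype(dtype):  # also matches the nullable Int64 dtype
        return "BIGINT"
    if is_float_dtype(dtype):
        return "DOUBLE"
    if is_datetime64_any_dtype(dtype):
        return "DATETIME"
    return "TEXT"
```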
```python
result = conn.execute(text(query), coerced_params or {})
rows = [tuple(r) for r in result.fetchall()]
try:
    result.close()
```
won't `with` do that?
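That is, something like this should already release the cursor (assuming a plain, non-streaming result):

```python
with engine.connect() as conn:
    result = conn.execute(text(query), coerced_params or {})
    rows = [tuple(r) for r in result.fetchall()]
# Leaving the with-block closes the connection and releases the result;
# fetchall() has already exhausted (and closed) the underlying cursor.
```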
Hi! I am a mariadb server developer. I came across your work through the hackathon. Here are some of my notes. I'm not too familiar with dagster itself.
```python
    enabled: Whether to enable parallel execution
"""
try:
    query = f"SET SESSION sql_log_bin=0; SET GLOBAL innodb_parallel_read_threads={64 if enabled else 1}; SET SESSION max_parallel_tables={10 if enabled else 1}"
```
there is no innodb_parallel_read_threads in MariaDB. There are some other tuning levers, but I doubt dagster is the right tool for configuring them. These variables are applied globally and will affect everyone using the already-running server. The right place for them is the server config.
Summary
This PR introduces first-class MariaDB support for Dagster by providing a complete integration package that enables seamless use of MariaDB as a backend for data orchestration pipelines.
- MariaDBResource (dagster_mariadb/resource.py)
- MariaDBPandasIOManager (dagster_mariadb/io_manager.py): supports the replace, append, and fail write modes

Motivation
MariaDB is a widely used, production-tested database that shares MySQL compatibility while offering enhanced features and performance. Currently, Dagster has excellent support for databases like Postgres and DuckDB, but lacks first-class MariaDB support. This integration fills that gap.
This integration follows Dagster's established patterns for database resources and I/O managers, ensuring consistency with the broader ecosystem.
How I Tested These Changes
Unit Tests
MariaDBResource Tests:
MariaDBPandasIOManager Tests:
All tests use mock objects to avoid requiring a live MariaDB instance.
Integration Testing
Manual Testing
1. Basic Resource Usage
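(The original snippet is not captured here; below is a hypothetical sketch of what basic usage might look like, assuming MariaDBResource takes standard connection parameters and exposes a get_connection() context manager -- both assumptions about this PR's API.)

```python
from dagster import Definitions, asset
from sqlalchemy import text
from dagster_mariadb import MariaDBResource  # from this PR

@asset
def row_count(mariadb: MariaDBResource) -> int:
    # Assumed API: a context manager yielding a SQLAlchemy connection.
    with mariadb.get_connection() as conn:
        return conn.execute(text("SELECT COUNT(*) FROM my_table")).scalar_one()

defs = Definitions(
    assets=[row_count],
    resources={
        "mariadb": MariaDBResource(
            host="localhost", port=3306, user="root",
            password="...", database="test",  # placeholder credentials
        )
    },
)
```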
2. DataFrame I/O Operations
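(Again a hypothetical sketch, assuming the IO manager is configured with standard connection fields; the PR's actual config schema may differ.)

```python
import pandas as pd
from dagster import Definitions, asset
from dagster_mariadb import MariaDBPandasIOManager  # from this PR

@asset
def sales() -> pd.DataFrame:
    # The IO manager writes the returned DataFrame to a MariaDB table.
    return pd.DataFrame({"day": ["2024-01-01", "2024-01-02"], "amount": [100.0, 150.0]})

@asset
def sales_total(sales: pd.DataFrame) -> pd.DataFrame:
    # The upstream table is loaded back from MariaDB as a DataFrame.
    return sales.groupby("day", as_index=False)["amount"].sum()

defs = Definitions(
    assets=[sales, sales_total],
    resources={
        "io_manager": MariaDBPandasIOManager(
            host="localhost", port=3306, user="root",
            password="...", database="test",  # assumed config fields
        )
    },
)
```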
Other Testing
Successfully Created Test Assets:
✅ daily_sales - Daily partitioned (RANGE partitioning)
⚠️ regional_inventory - Static partitioned (LIST COLUMNS) - Had errors with LIST partitioning
⚠️ daily_sales_summary - Dependent asset - Had errors loading partition column
✅ monthly_revenue - Monthly partitioned (RANGE partitioning)
✅ weekly_metrics - Weekly partitioned (RANGE partitioning)
✅ multi_dim_transactions - Multi-dimensional (application-level)
✅ partition_metadata_report - Inspect partition info
✅ query_specific_partition - Test partition pruning
✅ partition_maintenance - Check/optimize partitions
✅ partition_performance_test - Performance comparison
✅ partition_test_summary - Overall summary
What Was Tested:
✅ Daily RANGE partitioning with TO_DAYS()
⚠️ Partition column type inference (had issues, fixed to force DATE type)
✅ Monthly RANGE partitioning
✅ Weekly RANGE partitioning
✅ Native partition table creation
✅ Partition metadata retrieval using get_partition_info()
Advanced Feature Testing for MariaDB Summary
Storage Engine Information (storage_engine_info)
Tested: get_storage_engine_info()
Validates: Retrieves list of available and supported storage engines
Output: DataFrame showing which engines are available in your MariaDB installation
InnoDB Table Creation (test_innodb_table)
Tested: create_table_with_storage_engine() with InnoDB
Validates: Creates ACID-compliant table with row-level locking
Output: Test table with sample log entries
Aria Table Creation (test_aria_table)
Tested: create_table_with_storage_engine() with Aria
Validates: Creates crash-safe table (improved MyISAM)
Output: Cache table with key-value pairs
Write-Heavy Optimization (test_optimize_write_heavy)
Tested: optimize_for_write_heavy_load()
Validates: Engine conversion for better write performance
Output: Comparison of table engines before/after optimization
Galera Cluster Status (test_galera_cluster_status)
Tested: check_galera_cluster_status()
Validates: Cluster status detection (will show "Not in cluster" if standalone)
Output: Cluster metrics or N/A status
Analytics Warehouse (test_analytics_warehouse)
Tested: setup_analytics_warehouse()
Validates: Database creation for analytics workloads
Output: Confirmation of warehouse database creation
Engine Comparison (compare_storage_engines)
Tested: Multiple engines side-by-side
Validates: Metadata retrieval for different engine types
Output: Comprehensive comparison of all test tables
Multi-Engine Performance (test_multi_engine_performance)
Tested: InnoDB vs Aria insert performance
Validates: Practical performance differences between engines
Output: Timing data for 300 row inserts per engine
ColumnStore Table (test_columnstore_table)
Tested: create_columnstore_table()
Validates: Columnar storage for analytics (if available)
Output: ColumnStore table or availability warning
Comprehensive Summary (advanced_features_summary)
Tested: All features combined
Validates: Complete integration test
Output: Summary statistics across all test tables