@aluminates aluminates commented Oct 24, 2025

Summary

This PR introduces first-class MariaDB support for Dagster by providing a complete integration package that enables seamless use of MariaDB as a backend for data orchestration pipelines.

  1. MariaDBResource (dagster_mariadb/resource.py):

    • Configurable resource for MariaDB database connections using SQLAlchemy
    • Connection pooling with automatic retry logic
    • Support for environment variable fallbacks
    • Compatible with PyMySQL driver for MariaDB/MySQL compatibility
  2. MariaDBPandasIOManager (dagster_mariadb/io_manager.py):

    • I/O manager for seamless read/write of Pandas DataFrames to MariaDB
    • Supports replace, append, and fail write modes
    • Automatic table and schema creation
    • Column filtering and metadata support for efficient data loading
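
For orientation, here is a minimal sketch of how the two pieces could be wired together in a Dagster project (the class names come from this PR; the Definitions wiring and constructor arguments are assumptions mirroring the manual-testing examples below):

import pandas as pd
from dagster import Definitions, asset
from dagster_mariadb import MariaDBPandasIOManager, MariaDBResource

@asset
def raw_orders() -> pd.DataFrame:
    # Materialized into MariaDB by the I/O manager
    return pd.DataFrame({"order_id": [1, 2], "amount": [9.99, 24.50]})

defs = Definitions(
    assets=[raw_orders],
    resources={
        "mariadb": MariaDBResource(
            host="localhost", user="test", password="test", database="test_db"
        ),
        # Assumed: the I/O manager takes the same connection arguments
        "io_manager": MariaDBPandasIOManager(
            host="localhost", user="test", password="test", database="test_db"
        ),
    },
)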

Motivation

MariaDB is a widely used, production-tested database that is MySQL-compatible while offering enhanced features and performance. Currently, Dagster has excellent support for databases like Postgres and DuckDB, but lacks first-class MariaDB support. This integration fills that gap by:

  • Enabling modern data pipelines: MariaDB is a robust, high-performance database that many organizations already use
  • Reducing boilerplate: Provides standardized, Dagster-native patterns for MariaDB interactions
  • Improving developer experience: Offers type-safe, resource-based connection management
  • Supporting ETL/ELT workflows: Seamless DataFrame-based operations with automatic schema management

This integration follows Dagster's established patterns for database resources and I/O managers, ensuring consistency with the broader ecosystem.

How I Tested These Changes

Unit Tests

  • MariaDBResource Tests:

    • Connection string generation with various configurations
    • Engine creation and caching
    • Connection context manager functionality
  • MariaDBPandasIOManager Tests:

    • Table name extraction from asset keys
    • Full table name construction with/without schemas
    • Schema creation functionality
    • DataFrame writing in all three modes (replace, append, fail)
    • Input loading with and without column filtering
    • Metadata handling

All tests use mock objects to avoid requiring a live MariaDB instance.
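
For illustration, a hedged sketch of that mocking approach (the patch target depends on how resource.py imports create_engine, and the engine-caching behavior is taken from the PR description):

from unittest import mock

from dagster_mariadb import MariaDBResource

def test_engine_created_once():
    resource = MariaDBResource(
        host="localhost", user="test", password="test", database="test_db"
    )
    # Assumes resource.py does `from sqlalchemy import create_engine`
    with mock.patch("dagster_mariadb.resource.create_engine") as create_engine:
        resource.get_engine()
        resource.get_engine()  # cached: factory should only be called once
        create_engine.assert_called_once()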

Integration Testing

  • OpenFlights Pipeline Tests:
    • Validates data loading from CSV files
    • Tests data cleaning and transformation logic
    • Verifies aggregation and analytics functions
    • Tests route statistics calculations
    • Validates top routes ranking

Manual Testing

1. Basic Resource Usage

from dagster_mariadb import MariaDBResource
from sqlalchemy import text

resource = MariaDBResource(
    host="localhost",
    user="test",
    password="test",
    database="test_db"
)

with resource.get_connection() as conn:
    result = conn.execute(text("SELECT 1"))  # text() wrapper required by SQLAlchemy 2.x
    assert result.fetchone()[0] == 1

2. DataFrame I/O Operations

# Tested DataFrame round-trip operations
import pandas as pd
from dagster_mariadb import MariaDBResource, MariaDBPandasIOManager

df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})

# Write to MariaDB, then read back (reusing `resource` from step 1;
# assumes get_connection() yields a SQLAlchemy connection pandas accepts)
with resource.get_connection() as conn:
    df.to_sql("roundtrip_test", con=conn, if_exists="replace", index=False)
    loaded = pd.read_sql("SELECT * FROM roundtrip_test", con=conn)

# Verified data integrity and schema creation
assert loaded.equals(df)

Other Testing

Successfully Created Test Assets:

✅ daily_sales - Daily partitioned (RANGE partitioning)
✅ monthly_revenue - Monthly partitioned (RANGE partitioning)
✅ weekly_metrics - Weekly partitioned (RANGE partitioning)
⚠️ regional_inventory - Static partitioned (LIST COLUMNS) - Had errors with LIST partitioning
✅ multi_dim_transactions - Multi-dimensional (application-level)
⚠️ daily_sales_summary - Dependent asset - Had errors loading partition column
✅ partition_metadata_report - Inspect partition info
✅ query_specific_partition - Test partition pruning
✅ partition_maintenance - Check/optimize partitions
✅ partition_performance_test - Performance comparison
✅ partition_test_summary - Overall summary

What Was Tested:

✅ Daily RANGE partitioning with TO_DAYS()
✅ Monthly RANGE partitioning
✅ Weekly RANGE partitioning
✅ Native partition table creation
✅ Partition metadata retrieval using get_partition_info()
⚠️ Partition column type inference (had issues, fixed to force DATE type)
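
For reference, the daily RANGE scheme exercised above corresponds to DDL along these lines (a hedged sketch; the table, columns, and partition names are illustrative, not the PR's generated SQL):

from sqlalchemy import text

# Illustrative daily RANGE partitioning with TO_DAYS()
create_daily_sales = text("""
    CREATE TABLE daily_sales (
        sale_date DATE NOT NULL,
        amount DECIMAL(10, 2)
    )
    PARTITION BY RANGE (TO_DAYS(sale_date)) (
        PARTITION p20251101 VALUES LESS THAN (TO_DAYS('2025-11-02')),
        PARTITION p20251102 VALUES LESS THAN (TO_DAYS('2025-11-03')),
        PARTITION pmax VALUES LESS THAN MAXVALUE
    )
""")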

Advanced MariaDB Feature Testing Summary

  1. Storage Engine Information (storage_engine_info)
    Tested: get_storage_engine_info()
    Validates: Retrieves list of available and supported storage engines
    Output: DataFrame showing which engines are available in your MariaDB installation

  2. InnoDB Table Creation (test_innodb_table)
    Tested: create_table_with_storage_engine() with InnoDB
    Validates: Creates ACID-compliant table with row-level locking
    Output: Test table with sample log entries

  3. Aria Table Creation (test_aria_table)
    Tested: create_table_with_storage_engine() with Aria
    Validates: Creates crash-safe table (improved MyISAM)
    Output: Cache table with key-value pairs

  4. Write-Heavy Optimization (test_optimize_write_heavy)
    Tested: optimize_for_write_heavy_load()
    Validates: Engine conversion for better write performance
    Output: Comparison of table engines before/after optimization

  5. Galera Cluster Status (test_galera_cluster_status)
    Tested: check_galera_cluster_status()
    Validates: Cluster status detection (will show "Not in cluster" if standalone)
    Output: Cluster metrics or N/A status

  6. Analytics Warehouse (test_analytics_warehouse)
    Tested: setup_analytics_warehouse()
    Validates: Database creation for analytics workloads
    Output: Confirmation of warehouse database creation

  7. Engine Comparison (compare_storage_engines)
    Tested: Multiple engines side-by-side
    Validates: Metadata retrieval for different engine types
    Output: Comprehensive comparison of all test tables

  8. Multi-Engine Performance (test_multi_engine_performance)
    Tested: InnoDB vs Aria insert performance
    Validates: Practical performance differences between engines
    Output: Timing data for 300 row inserts per engine

  9. ColumnStore Table (test_columnstore_table)
    Tested: create_columnstore_table()
    Validates: Columnar storage for analytics (if available)
    Output: ColumnStore table or availability warning

  10. Comprehensive Summary (advanced_features_summary)
    Tested: All features combined
    Validates: Complete integration test
    Output: Summary statistics across all test tables
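
To make the above concrete, a rough usage sketch for a few of these helpers (the signatures are guessed from the function names in this list, not taken from the code):

with resource.get_connection() as conn:
    # Assumed signatures, inferred only from the names above
    engines = get_storage_engine_info(conn)       # DataFrame of engines
    create_table_with_storage_engine(conn, "test_logs", engine="InnoDB")
    create_table_with_storage_engine(conn, "kv_cache", engine="Aria")
    status = check_galera_cluster_status(conn)    # "Not in cluster" if standalone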

@cmpadden
Contributor

Hi @aluminates, it's likely possible to use mariadb without adding it to the dagster setup.py; instead, you can install it into the environment where you're running dagster. If that's not what you're experiencing, can you please share more details as to what you're attempting here? Thanks.

@aluminates aluminates changed the title Add MariaDB Integration & Support [DRAFT] Add MariaDB Integration & Support Oct 24, 2025
@aluminates
Author

Hey @cmpadden, this was an experiment for a hackathon. Please see below:

This PR introduces first-class support for MariaDB within Dagster by providing:

  • A MariaDBResource for connection management.
  • A MariaDBPandasIOManager for seamless read/write of Pandas DataFrames.

The goal is to enable Dagster users to integrate MariaDB into ETL/ELT pipelines with minimal boilerplate.

@cmpadden
Contributor

> Hey @cmpadden, this was an experiment for a hackathon. Please see below:
>
> This PR introduces first-class support for MariaDB within Dagster by providing:
>
> * A `MariaDBResource` for connection management.
> * A `MariaDBPandasIOManager` for seamless read/write of Pandas DataFrames.
>
> The goal is to enable Dagster users to integrate MariaDB into ETL/ELT pipelines with minimal boilerplate.

Gotcha. It looks like not all of the files were checked in to version control. So I'm only seeing a single change to the setup.py.

That's exciting though, thank you!

@aluminates
Author

> Gotcha. It looks like not all of the files were checked in to version control. So I'm only seeing a single change to the setup.py.
>
> That's exciting though, thank you!

Ah yep, it's a WIP/draft still (I like seeing my changes in PRs during development). Should've clarified. Thanks anyways.

@aluminates aluminates marked this pull request as draft October 26, 2025 07:23
1. remove trailing whitespace
2. refactor get_engine() to avoid creating engines repeatedly
@aluminates
Author

@revanthsreeram FYI.

@aluminates aluminates marked this pull request as ready for review November 2, 2025 15:15
aluminates and others added 5 commits November 2, 2025 20:50
…ced_features.py

Co-authored-by: graphite-app[bot] <96075541+graphite-app[bot]@users.noreply.github.com>
Co-authored-by: graphite-app[bot] <96075541+graphite-app[bot]@users.noreply.github.com>
Co-authored-by: graphite-app[bot] <96075541+graphite-app[bot]@users.noreply.github.com>
@aluminates aluminates changed the title [DRAFT] Add MariaDB Integration & Support [dagster-mariadb] Add MariaDB Integration & Support Nov 2, 2025
Co-authored-by: graphite-app[bot] <96075541+graphite-app[bot]@users.noreply.github.com>
if not rows:
    return None

cluster_status = {row[0]: row[1] for row in rows}


dict(rows)?

"local_state": cluster_status.get("wsrep_local_state_comment", "Unknown"),
}
except SQLAlchemyError:
return None


maybe still forward the exception instead?

Here is how I see a user accessing this function: they would have some bigger try block catching the connection etc. errors, so the call to check_galera_cluster_status would also lie inside that block.

Forwarding the exception allows them to handle those errors in one place, with a single catch, minimizing the coding effort.

Some of your functions already forward the exception, so it's better to have a single way to handle connection issues.

I could understand returning None for other (non-connection) reasons, if they are very function-specific.

Don't forget to fix the return annotation.
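
A sketch of the usage pattern this comment describes (assuming check_galera_cluster_status takes a connection and forwards SQLAlchemyError):

from sqlalchemy.exc import SQLAlchemyError

try:
    with resource.get_connection() as conn:
        status = check_galera_cluster_status(conn)
        ...  # other calls that can hit connection errors
except SQLAlchemyError:
    ...  # one place to handle all connection issues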

    return None


def optimize_for_write_heavy_load(


I am not sure how typical such functions are for dagster... imo this one looks like a hint for the user, but it doesn't make clear that it performs a heavy operation which may run VERY long.

My suggestion: make a function change_engine or alter_engine that accepts any mariadb engine. Make it very clear that the function may run for days.

One good note is that reads won't be blocked by such an ALTER TABLE; in many cases, writes wouldn't be either. A very cool, mariadb-specific feature would be to let the user "force" the non-write-blocking alter with LOCK=NONE (the server picks the least restrictive lock by default, but an explicit LOCK=NONE fails if a non-blocking alter isn't possible).
More info on LOCK=NONE:
https://mariadb.com/docs/server/reference/sql-statements/data-definition/alter/alter-table/online-schema-change

Note that if the table has no indices, parallel writes may slow down an online ALTER TABLE dramatically (worth mentioning in the function doc).

You can still keep this function but re-use change_engine/alter_engine inside it, or maybe it's better to just document the use-case in the readme:

  • Optimize a table for write-heavy load: change_engine(table, StorageEngine.MyRocks if use_myrocks else StorageEngine.InnoDB)
  • Use Spider for a multi-tenant workload: change_engine(table, StorageEngine.Spider, spider_params)
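
A minimal sketch of the suggested helper (hypothetical name and signature, as proposed above):

from sqlalchemy import text

def change_engine(connection, table: str, engine: str, force_online: bool = False) -> None:
    """Convert a table to another storage engine. May run for days on large tables."""
    query = f"ALTER TABLE `{table}` ENGINE={engine}"
    if force_online:
        # LOCK=NONE makes the server fail fast instead of falling back
        # to a write-blocking alter
        query += ", LOCK=NONE"
    connection.execute(text(query))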


# Note: This is a simplified example
# Real Spider setup requires additional configuration
query = f"""


Same as above: it's better to avoid hand-writing the CREATE query in a function that is meant to be a shortcut. Make a general function (or maybe one already exists?) and reuse it to follow the DRY principle.

    connection.execute(text(query))
    connection.commit()
except SQLAlchemyError as e:
    connection.rollback()


This kinda catches my eye in several ways.

  1. If the connection is broken, rollback may raise a cascading exception. That seems safe, since you forward the exception anyway, but the user will likely see "an exception happened while handling another exception".
  2. CREATE TABLE is a serialization point: it issues an implicit commit both before and after itself, which means the explicit commit() is obsolete (but, perhaps, future-proof).
  3. If CREATE TABLE fails, its side-effects will be rolled back no matter what happens.
  4. This pattern is repeated all around.

So I'd be fine with a single reusable function for this repeated pattern. It could then (not necessarily now, but down the line) be improved to roll back only when the connection is still alive. Or CREATE TABLE could be treated specially, keeping in mind that a rollback is not needed.

I'd go with the with connection.begin() as transaction pattern, which is simple, pythonic, future-proof, and does what we want.
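
That pattern, for reference (begin() commits when the block exits normally and rolls back if it raises; `engine` and `query` stand in for the surrounding code's objects):

from sqlalchemy import text

with engine.connect() as connection:
    with connection.begin():  # commit on success, rollback on exception
        connection.execute(text(query))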


So overall for this function: extract an execute_create_table helper, which either uses execute_dml (which has no commit) or the with-pattern, which commits and also rolls back on exception.


Oh, you already have one, called create_partitioned_table. See the note below.

}


def create_partitioned_table(


There is no reason to have partitioned in the name -- it's just create_table. A table can have partitions or have none, in which case partition_config could be None.

        table_name = self._get_table_name(context)

        # Check if table exists
        check_query = f"""


This query is fine for a general MariaDBPandasIOManager. Note that it doesn't check for partitions; for that you'd want to SELECT from information_schema.PARTITIONS:
https://mariadb.com/docs/server/reference/system-tables/information-schema/information-schema-tables/information-schema-partitions-table
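
For example (a sketch; the parameter values are illustrative):

from sqlalchemy import text

# Non-NULL PARTITION_NAME rows mean the table exists and is partitioned
check_partitions = text("""
    SELECT PARTITION_NAME
    FROM information_schema.PARTITIONS
    WHERE TABLE_SCHEMA = :schema
      AND TABLE_NAME = :table
      AND PARTITION_NAME IS NOT NULL
""")
rows = conn.execute(
    check_partitions, {"schema": "test_db", "table": table_name}
).fetchall()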

        WeeklyPartitionsDefinition,
        MonthlyPartitionsDefinition,
    )):
        columns[col] = "DATE"


Make constants out of these to avoid typos.

    def _infer_sql_type(self, dtype) -> str:
        """Infer SQL type from pandas dtype."""
        dtype_str = str(dtype)
        if 'int' in dtype_str:


Int64 is case-sensitive and won't be caught by this (incorrect) heuristic. The correct way is to use introspection checks like is_integer_dtype, etc.:
https://pandas.pydata.org/docs/reference/arrays.html#data-type-introspection
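
A sketch of the introspection-based version (standalone here for brevity; the SQL type choices are illustrative):

from pandas.api.types import (
    is_datetime64_any_dtype,
    is_float_dtype,
    is_integer_dtype,
)

def _infer_sql_type(dtype) -> str:
    """Infer SQL type via pandas introspection; also catches nullable Int64."""
    if is_integer_dtype(dtype):
        return "BIGINT"
    if is_float_dtype(dtype):
        return "DOUBLE"
    if is_datetime64_any_dtype(dtype):
        return "DATETIME"
    return "TEXT"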

result = conn.execute(text(query), coerced_params or {})
rows = [tuple(r) for r in result.fetchall()]
try:
    result.close()


won't the `with` block do that?

@FooBarrior

Hi! I am a mariadb server developer. I came across your work through the hackathon.

Here are some of my notes; I'm not too familiar with dagster itself.
Overall I should say I really like the idea, and your deep approach to using native partitions. Utilizing ColumnStore is also great. Spider and Galera are nice to have, too, though I wonder how applicable dagster will be for those who use such tables.

        enabled: Whether to enable parallel execution
    """
    try:
        query = f"SET SESSION sql_log_bin=0; SET GLOBAL innodb_parallel_read_threads={64 if enabled else 1}; SET SESSION max_parallel_tables={10 if enabled else 1}"


there is no innodb_parallel_read_threads in MariaDB. There are some other tuning levers, but I doubt dagster is the right tool for configuring them. These variables are applied globally and will affect everyone using the already-running server. The right place for them is the server config.
