feat(persistence): Add JDBC persistence layer for Iceberg metrics reporting (#3337) #3385
polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsSessionTagsBuilder.java
persistence/relational-jdbc/src/main/resources/postgres/schema-v4.sql
runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java
...me/service/src/main/java/org/apache/polaris/service/reporting/PersistingMetricsReporter.java
.../main/java/org/apache/polaris/persistence/relational/jdbc/models/MetricsReportConverter.java
...me/service/src/main/java/org/apache/polaris/service/reporting/PersistingMetricsReporter.java
.../service/src/main/java/org/apache/polaris/service/reporting/MetricsReportCleanupService.java
Retest this please.
461cfde to 2fc2a20
830085b to fcf2716
@dimas-b thanks for the help in merging #3414. I have rebased this PR. I think it can be split into two:
- PR 1: Enable metrics event emission
- PR 2: Add metrics persistence (depends on PR 1)
If this helps, please let me know and I can split this PR into two.
dimas-b left a comment:
Preliminary comments, not an end-to-end review :)
...bc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
...me/service/src/main/java/org/apache/polaris/service/reporting/PersistingMetricsReporter.java
runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java
@obelix74: Re: splitting into two PRs: if it's not too much overhead, it might be preferable from my POV. I think the persistence implementation may be a bit more involved, as I commented above.
On it. Moving this PR to draft until the event PR is merged and this branch / PR is rebased against main.
35bbf1e to 1954427
@dimas-b I have further redone this PR into the following distinct commits. The feat/add-metrics-persistence-handler branch now contains 7 well-organized commits:
PR#1 is #3468. If need be, we can separate each of these self-contained commits into its own PR for review.
Address reviewer feedback from PR apache#3385 (comment r2700474938):
- Introduce MetricsPersistence interface in polaris-core for backend-agnostic metrics persistence
- Add MetricsContext immutable class to encapsulate metrics context information
- Add NoOpMetricsPersistence for backends that don't support metrics persistence
- Add JdbcMetricsPersistence implementation that wraps JdbcBasePersistenceImpl
- Add MetricsPersistenceProducer CDI producer to select implementation at runtime
- Refactor PersistenceMetricsProcessor to use MetricsPersistence instead of instanceof checks
This design supports the upcoming NoSQL persistence backend (apache#3396) by allowing each backend to provide its own MetricsPersistence implementation or use the no-op implementation that silently discards metrics.
This PR for metrics persistence is now ready for review.
- PR1 (this PR): Database schema v4 and persistence layer for metrics storage.
- PR2 (follow-up, not raised yet): Metrics processing framework that uses this persistence layer.
persistence/relational-jdbc/src/main/resources/postgres/schema-v4.sql
persistence/relational-jdbc/src/main/resources/postgres/schema-v4.sql
Metrics tables are now part of the main schema and use the primary datasource instead of a separate named 'metrics' datasource.
Changes:
- Deleted schema-metrics-v1.sql files (postgres and h2)
- Added metrics tables to main schema-v4.sql files
- Deleted MetricsSchemaBootstrapUtil.java
- Removed openMetricsSchemaResource from DatabaseType
- Simplified JdbcBasePersistenceImpl (removed metricsDatasourceOperations)
- Removed @Named('metrics') datasource injection from JdbcMetaStoreManagerFactory
- Removed metrics datasource examples from application.properties
- Updated CHANGELOG.md
- Updated tests to work with single datasource
// Run the set-up script to create the tables.
// Run the set-up script to create the tables (includes metrics tables).
Nit: do we need this comment?
Removed comment "(includes metrics tables)" from JdbcMetaStoreManagerFactory.java
...rc/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java
@Inject PolarisDiagnostics diagnostics;
@Inject PolarisStorageIntegrationProvider storageIntegrationProvider;
@Inject Instance<DataSource> dataSource;
 * @param conflictColumns The columns that form the conflict/key constraint.
 * @return INSERT query with value bindings that ignores conflicts.
 */
public static PreparedQuery generateIdempotentInsertQuery(
Why do we change the idempotent query in this PR?
The generateIdempotentInsertQuery method is needed for metrics persistence to handle potential duplicate metric reports gracefully. When a compute engine (like Spark) retries a request due to network issues, it may send the same metric report multiple times with the same report ID.
Using idempotent inserts (ON CONFLICT DO NOTHING for PostgreSQL, MERGE for H2) ensures:
- The first insert succeeds and persists the metric
- Subsequent duplicate inserts are silently ignored without errors
- No data corruption or duplicate entries in the metrics tables
I can remove it from this PR if you feel it is not necessary.
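For readers following the thread, here is a minimal sketch of what an idempotent PostgreSQL insert could look like. It is not the PR's generateIdempotentInsertQuery: the class name, method name, and the scan_metrics_report table name are hypothetical, and the statement only works if a unique constraint exists on the conflict columns.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper for illustration; not the PR's generateIdempotentInsertQuery. */
final class IdempotentInsertSketch {

  /** Builds a PostgreSQL INSERT that silently skips rows whose conflict columns already exist. */
  static String postgresInsertIgnoringConflicts(
      String table, List<String> columns, List<String> conflictColumns) {
    String columnList = String.join(", ", columns);
    // One "?" placeholder per column, to be bound later through a PreparedStatement.
    String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
    String conflictList = String.join(", ", conflictColumns);
    return "INSERT INTO " + table + " (" + columnList + ") VALUES (" + placeholders + ")"
        + " ON CONFLICT (" + conflictList + ") DO NOTHING";
  }

  public static void main(String[] args) {
    // Table name is an assumption made for this example only.
    System.out.println(
        postgresInsertIgnoringConflicts(
            "scan_metrics_report",
            List.of("realm_id", "report_id", "table_name"),
            List.of("realm_id", "report_id")));
  }
}
```

With this shape, a retried write that carries the same (realm_id, report_id) pair affects zero rows instead of raising a unique-constraint error.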
The current conflict key will never cause conflicts, as the report ID is a UUID:
private static final List<String> METRICS_CONFLICT_COLUMNS = List.of("realm_id", "report_id");
Furthermore, I don't think we can easily distinguish two duplicated scan reports. I'd suggest removing it until we figure out a way to distinguish duplicated reports.
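The reviewer's point can be illustrated with a small in-memory sketch. It assumes, as the comment above implies, that report_id is a freshly generated random UUID on every write; under that assumption a (realm_id, report_id) key never matches an earlier row, so retried reports are all stored. The class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Hypothetical illustration of a conflict key built from a random UUID. */
final class ConflictKeySketch {

  /** Returns how many rows end up stored when the same report is written several times. */
  static int rowsStoredWithFreshIds(String realmId, int writes) {
    Set<String> storedKeys = new HashSet<>();
    for (int i = 0; i < writes; i++) {
      // Assumption: each write gets a new random UUID, so the key is new each time.
      String reportId = UUID.randomUUID().toString();
      storedKeys.add(realmId + "/" + reportId);
    }
    return storedKeys.size();
  }

  /** Same loop, but the caller supplies one stable id that is reused across retries. */
  static int rowsStoredWithStableId(String realmId, String reportId, int writes) {
    Set<String> storedKeys = new HashSet<>();
    for (int i = 0; i < writes; i++) {
      storedKeys.add(realmId + "/" + reportId);
    }
    return storedKeys.size();
  }

  public static void main(String[] args) {
    System.out.println("fresh ids:  " + rowsStoredWithFreshIds("realm", 3));
    System.out.println("stable id:  " + rowsStoredWithStableId("realm", "r-1", 3));
  }
}
```

Deduplication only happens in the second case, which is why the key needs some stable property of the report before an idempotent insert has any effect.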
// BasePersistence doesn't implement MetricsPersistence
return MetricsPersistence.NOOP;
}
This is where I really hope any contributor reads through the code to understand how the metastore manager works. PolarisMetaStoreManager itself should be usable directly here to persist metrics, instead of grabbing the instance of JdbcBasePersistenceImpl, which violates the principle of abstraction. This layer shouldn't know any persistence-specific classes; it is coded against an interface rather than a concrete class. Otherwise, this part will break when a user switches to a different persistence.
Sorry, I did not understand that part of the code correctly. I followed the design of PolarisEventManager and created a PolarisMetricsManager, modified PolarisMetaStoreManager to implement PolarisMetricsManager, then injected PolarisMetaStoreManager and delegated persistence to it.
Re: question about generateIdempotentInsertQuery: using idempotent inserts (ON CONFLICT DO NOTHING for PostgreSQL, MERGE for H2) is different from regular entity persistence, where we want conflicts to fail so we can detect concurrent modifications.
- Remove unnecessary comments and blank lines in JdbcMetaStoreManagerFactory
- Remove extra blank line in runtime/admin application.properties
- Update CHANGELOG.md to clarify configuration requirement
- Fix abstraction violation: use MetricsPersistence interface instead of JdbcBasePersistenceImpl in PersistingMetricsReporter
- Add setMetricsRequestContext() to MetricsPersistence interface with default no-op implementation
- Update tests to mock interface instead of concrete class
Address PR review comment #2855743843 by using PolarisMetaStoreManager instead of directly accessing persistence, following the same pattern as PolarisEventManager.
Changes:
- Add PolarisMetricsManager interface with default methods that delegate to MetricsPersistence when supported
- Extend PolarisMetaStoreManager with PolarisMetricsManager
- Update PersistingMetricsReporter to inject PolarisMetaStoreManager and use its writeScanMetrics/writeCommitMetrics methods
- Update tests to verify calls to PolarisMetaStoreManager
The NOOP constant and NoOpMetricsPersistence class are no longer used after the refactoring to use PolarisMetricsManager, which silently ignores metrics when the underlying persistence doesn't support them.
    effectiveSchemaVersion,
    realm);
try {
  // Run the set-up script to create the tables.
Nit: we may still want to keep this comment.
this.callContext = callContext;
this.metaStoreManager = metaStoreManager;
this.polarisPrincipal = polarisPrincipal;
this.requestIdSupplier = requestIdSupplier;
Where do we inject the bean?
The PersistingMetricsReporter bean is injected via CDI in ServiceProducers.java (lines 434-439):
@Produces
@ApplicationScoped
public PolarisMetricsReporter metricsReporter(
    MetricsReportingConfiguration config, @Any Instance<PolarisMetricsReporter> reporters) {
  return reporters.select(Identifier.Literal.of(config.type())).get();
}
This producer method:
- Reads polaris.iceberg-metrics.reporting.type config (defaults to "default" via MetricsReportingConfiguration)
- Selects the PolarisMetricsReporter implementation that has the matching @Identifier annotation
- When type=persisting, it selects PersistingMetricsReporter (which has @Identifier("persisting"))
The reporter is then used by IcebergCatalogHandler.reportMetrics() to handle incoming metrics reports.
To enable persisting metrics, users would set:
polaris.iceberg-metrics.reporting.type=persisting
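The selection step described above can be sketched in plain Java. This is not CDI and not the Polaris producer: a Map stands in for the identifier-keyed bean lookup, and the interface and class names are hypothetical.

```java
import java.util.Map;

/** Hypothetical stand-in for identifier-based selection; a Map replaces the CDI lookup. */
final class ReporterSelectionSketch {

  /** Simplified reporter contract, used only for this example. */
  interface MetricsReporter {
    String name();
  }

  /** Picks the reporter registered under the configured type, failing fast if none matches. */
  static MetricsReporter select(Map<String, MetricsReporter> byIdentifier, String configuredType) {
    MetricsReporter reporter = byIdentifier.get(configuredType);
    if (reporter == null) {
      throw new IllegalArgumentException(
          "No metrics reporter registered for type: " + configuredType);
    }
    return reporter;
  }

  public static void main(String[] args) {
    Map<String, MetricsReporter> reporters =
        Map.of("default", () -> "default-reporter", "persisting", () -> "persisting-reporter");
    // Mirrors setting the reporting type to "persisting" in configuration.
    System.out.println(select(reporters, "persisting").name());
  }
}
```

Failing fast on an unknown type is a design choice of this sketch; how the real producer reacts to an unmatched identifier is not stated in the thread.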
I meant line 78: where do we inject the requestIdSupplier bean?
There was a producer in ServiceProducers that seems to have been deleted by accident. I've added a @RequestScoped producer for RequestIdSupplier in ServiceProducers.java. It retrieves the request ID that RequestIdFilter stores on each incoming request via ContainerRequestContext.setProperty().
 * @param callCtx the call context containing the persistence layer
 * @return true if metrics can be persisted, false otherwise
 */
default boolean supportsMetricsPersistence(@Nonnull PolarisCallContext callCtx) {
This is not needed; can we remove it?
...is-core/src/main/java/org/apache/polaris/core/persistence/metrics/PolarisMetricsManager.java
...nal-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/SpiModelConverter.java
...is-core/src/main/java/org/apache/polaris/core/persistence/metrics/PolarisMetricsManager.java
- Restore comment 'Run the set-up script to create the tables' in JdbcMetaStoreManagerFactory
- Remove unused supportsMetricsPersistence() method from PolarisMetricsManager
- Move SpiModelConverter methods to Model classes (ModelScanMetricsReport, ModelCommitMetricsReport)
- Delete SpiModelConverter.java - conversion logic now in Model classes following ModelEntity pattern
- Add RequestIdSupplier producer in ServiceProducers for request ID injection
- Update JdbcBasePersistenceImpl and tests to use new Model class methods
- Add principalName, requestId, otelTraceId, otelSpanId to MetricsRecordIdentity
- Remove setMetricsRequestContext() from MetricsPersistence SPI
- Simplify PolarisMetricsManager.write*() to take only (callCtx, record)
- Populate request context in PersistingMetricsReporter before calling manager
- Update MetricsRecordConverter builders with request context methods
- Simplify JdbcBasePersistenceImpl - no more member variables for context
Addresses reviewer feedback: single SPI method with complete record data
- BasePersistence now extends MetricsPersistence with default no-op methods
- Remove instanceof checks in PolarisMetricsManager
- Each implementation decides to override or use default no-op
Addresses review feedback: cleaner design without runtime type checking
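The "default no-op" design in this commit can be sketched as follows. The interface names echo the commit message, but the method signature (a single String-typed writeScanReport) and the two implementing classes are simplifications invented for this example, not the actual Polaris SPI.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified illustration of default no-op interface methods. */
final class DefaultNoOpSketch {

  /** Metrics SPI: by default, writing a report does nothing. */
  interface MetricsPersistence {
    default void writeScanReport(String report) {
      // Intentionally empty: backends without metrics support silently discard reports.
    }
  }

  /** The base persistence contract inherits the no-op default. */
  interface BasePersistence extends MetricsPersistence {}

  /** A backend that does not override anything, so reports are dropped. */
  static final class NonMetricsBackend implements BasePersistence {}

  /** A backend that opts in by overriding the default. */
  static final class StoringBackend implements BasePersistence {
    final List<String> stored = new ArrayList<>();

    @Override
    public void writeScanReport(String report) {
      stored.add(report);
    }
  }

  public static void main(String[] args) {
    BasePersistence dropping = new NonMetricsBackend();
    StoringBackend storing = new StoringBackend();
    // The caller needs no instanceof check: both backends accept the same call.
    dropping.writeScanReport("scan-1");
    storing.writeScanReport("scan-1");
    System.out.println("stored reports: " + storing.stored.size());
  }
}
```

The caller codes against the interface only, which is the property the review asked for: switching the backend changes behavior without changing the calling code.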
...bc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
...bc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
...bc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
runtime/admin/src/main/java/org/apache/polaris/admintool/config/AdminToolProducers.java
...st/java/org/apache/polaris/admintool/relational/jdbc/RelationalJdbcBootstrapCommandTest.java
...bc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
...bc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
...bc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
...bc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
00bb147 to 115921c
115921c to 7cfc20e
 *     .build();
 * }</pre>
 */
public final class MetricsRecordConverter {
I'd suggest moving this class to the persistence.metrics package. Not a blocker, though; we could refactor later.
// Test that bootstrap works without --include-metrics (default behavior)
public void testBootstrap(QuarkusMainLauncher launcher) {
  // Test that bootstrap command works successfully.
  // Metrics tables are now part of the base schema (v4) and bootstrapped automatically.
nit: I think line 51 isn't needed.
runtime/admin/build.gradle.kts
compileOnly("com.fasterxml.jackson.core:jackson-annotations")
compileOnly(project(":polaris-relational-jdbc"))
Can we remove this dependency?
…lational-jdbc
The admin module does not have any compile-time imports from polaris-relational-jdbc. The runtimeOnly dependency is sufficient since the module is resolved via CDI at runtime.
Checklist
- CHANGELOG.md (if needed)
- site/content/in-dev/unreleased (if needed)
Description
This PR introduces the JDBC persistence layer for storing Iceberg metrics reports (scan and commit operations) as part of the Compute Client Audit Reporting feature (#3337).
Overview
This change adds the foundational persistence infrastructure required to store Iceberg metrics data received via the REST catalog's /reportMetrics endpoint. The metrics data includes detailed information about scan operations (file counts, data volumes, filter expressions) and commit operations (added/removed files, snapshot details).
Components Introduced
Model Classes
• ModelScanMetricsReport - Immutable model representing scan metrics with 30+ fields
• ModelCommitMetricsReport - Immutable model representing commit metrics with 30+ fields
Converters
• MetricsReportConverter - Converts Iceberg ScanReport and CommitReport objects to persistence models
• ModelScanMetricsReportConverter - Maps JDBC ResultSet to ModelScanMetricsReport
• ModelCommitMetricsReportConverter - Maps JDBC ResultSet to ModelCommitMetricsReport
Persistence Methods (added to JdbcBasePersistenceImpl)
• writeScanMetricsReport() / writeCommitMetricsReport() - Insert metrics into the database
• queryScanMetricsReports() / queryCommitMetricsReports() - Query metrics by realm and catalog
• queryScanMetricsReportsByTraceId() / queryCommitMetricsReportsByTraceId() - Query metrics by OpenTelemetry trace ID
• deleteScanMetricsReportsOlderThan() / deleteCommitMetricsReportsOlderThan() - Time-based cleanup
• deleteAllMetricsReportsOlderThan() - Unified cleanup for both metric types
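As a rough illustration of the time-based cleanup listed above, the sketch below applies a cutoff to an in-memory list. It is not the JDBC implementation: the StoredReport class, its fields, and the deleteOlderThan helper are hypothetical, and the real methods would issue DELETE statements against the metrics tables.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory illustration of "delete reports older than a cutoff". */
final class MetricsRetentionSketch {

  /** Minimal stand-in for a persisted metrics row. */
  static final class StoredReport {
    final String reportId;
    final Instant timestamp;

    StoredReport(String reportId, Instant timestamp) {
      this.reportId = reportId;
      this.timestamp = timestamp;
    }
  }

  /** Removes reports strictly older than the cutoff and returns how many were removed. */
  static int deleteOlderThan(List<StoredReport> reports, Instant cutoff) {
    int before = reports.size();
    reports.removeIf(report -> report.timestamp.isBefore(cutoff));
    return before - reports.size();
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2026-01-31T00:00:00Z");
    List<StoredReport> reports = new ArrayList<>();
    reports.add(new StoredReport("old", now.minus(Duration.ofDays(40))));
    reports.add(new StoredReport("recent", now.minus(Duration.ofDays(5))));
    // Keep the last 30 days; the retention period here is an arbitrary example value.
    int deleted = deleteOlderThan(reports, now.minus(Duration.ofDays(30)));
    System.out.println(deleted + " deleted, " + reports.size() + " kept");
  }
}
```

Returning the number of removed rows mirrors what a JDBC executeUpdate call reports, which makes a cleanup job easy to log and test.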
Testing
Related Issues