
feat(ingest/fivetran): REST API mode and Managed Data Lake destination support#17217

Merged
treff7es merged 14 commits into master from feat/fivetran-managed-data-lake-iceberg-rest
May 6, 2026

Conversation


@treff7es treff7es commented Apr 28, 2026

Summary

The Fivetran connector previously required customers to set up a Fivetran Platform Connector that writes log data to a Snowflake / BigQuery / Databricks warehouse. That Platform Connector is a higher-tier Fivetran feature (Business / Enterprise plans) — customers on lower plans, or customers whose data lives only in a Managed Data Lake (no warehouse), couldn't ingest Fivetran metadata into DataHub at all.

This PR adds a new REST-API ingestion mode that reads connectors / schemas / users / sync history / destination routing directly via Fivetran's REST API (broadly available across Fivetran plans). No Platform Connector required, no warehouse credentials needed. It also extends destination handling to Fivetran's Managed Data Lake Service (MDL) across AWS S3, Google Cloud Storage, and Azure Data Lake Storage Gen2.

What's new

REST API mode (log_source: rest_api) — the headline feature

  • Works without any Fivetran Platform Connector setup.
  • Accessible to customers who don't have access to (or haven't enabled) the higher-tier Fivetran log-warehouse feature.
  • Reads everything via the public REST API: connectors, full table+column lineage, users, destination routing, Google Sheets connection details.
  • Per-table column-lineage fetch from /v1/connections/{id}/schemas/{schema}/tables/{table}/columns for full column coverage.
  • Parallel per-connector REST work with bounded worker pool (rest_api_max_workers, rest_api_per_connector_timeout_sec).
  • Per-destination cache shared between source and log reader; negative cache for failed destination discovery.
  • Per-destination warning dedup — one warning per destination (not per lineage edge) for URN-construction failures.
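The per-destination warning dedup in the last bullet can be pictured with a small sketch. This is not the connector's code: the class name `UrnWarningDeduper` and the `warn_once` method are hypothetical, and only the set name `_destinations_with_urn_warning` is borrowed from the commit notes later in this thread.

```python
from typing import List, Set


class UrnWarningDeduper:
    """Emit at most one URN-construction warning per destination (hypothetical helper)."""

    def __init__(self) -> None:
        # Destinations that have already produced a warning during this ingest.
        self._destinations_with_urn_warning: Set[str] = set()
        self.warnings: List[str] = []

    def warn_once(self, destination_id: str, reason: str) -> bool:
        """Record a warning the first time a destination fails; return True if recorded."""
        if destination_id in self._destinations_with_urn_warning:
            return False  # later edges for this destination are skipped silently
        self._destinations_with_urn_warning.add(destination_id)
        self.warnings.append(f"destination {destination_id!r}: {reason}")
        return True


deduper = UrnWarningDeduper()
for _edge in range(300):  # 300 lineage edges pointing at one misconfigured destination
    deduper.warn_once("dest_a", "glue routing needs a user-supplied database")
deduper.warn_once("dest_b", "unknown destination service")
```

With 300 failing edges on `dest_a` and one on `dest_b`, the sketch records two warnings in total rather than 301.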

DB mode (log_source: log_database) — existing, unchanged

The existing path that reads the Fivetran Platform Connector log from Snowflake / BigQuery / Databricks warehouses. Continues to work as before; nothing in this PR breaks existing recipes.

Hybrid mode (both blocks supplied)

When both credential blocks are configured, the connector runs in hybrid:

  • Default: DB-primary — DB owns lineage / users / sync history; REST owns destination routing + Google Sheets details.
  • Set log_source: rest_api explicitly for REST-primary hybrid — REST owns connectors / schemas / users / routing; the DB log fills in per-run DataProcessInstance events (REST has no sync-history endpoint) and higher-fidelity column lineage.

Log source is auto-inferred from which credential blocks are present; an explicit log_source only needs setting for the REST-primary-hybrid case.

Managed Data Lake destination support

Fivetran's REST API reports service: managed_data_lake for every MDL destination regardless of underlying cloud (AWS / GCS / ADLS) or catalog (Polaris / Iceberg REST / Glue). The connector discovers the destination via /v1/destinations/{id} and routes URNs based on (in precedence order):

  1. User override on destination_to_platform_instance.<id>.platform.
  2. glue if the user supplied database (only Glue uses database among MDL platforms — protects against silent drops).
  3. Auto-detect from MDL config toggles — glue when should_maintain_tables_in_glue: true is set on the destination.
  4. Fallback to iceberg (Polaris / Iceberg REST default).

| Pin in destination_to_platform_instance.<id>.platform | URN shape |
| --- | --- |
| iceberg (default, no pin needed) | iceberg.<schema>.<table> |
| glue (or auto-detected) | glue.<database>.<schema>.<table> (user supplies actual Glue DB name) |
| s3 | s3.<bucket>/<prefix>/<schema>/<table> (auto-discovered) |
| gcs | gcs.<bucket>/<prefix>/<schema>/<table> (auto-discovered) |
| abs | abs.<storage_account>/<container>/<prefix>/<schema>/<table> (auto-discovered) |

The Databricks-backed MDL variant (service: databricks_via_managed_data_lake) routes via the existing relational service map to databricks.
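The four-step precedence order for MDL platform resolution described above can be sketched as a single function. This is an illustration under assumptions, not the connector's implementation: the function name `resolve_mdl_platform` and its parameters are hypothetical, and the sketch assumes the `should_maintain_tables_in_glue` toggle arrives as a boolean in the destination's config dictionary.

```python
from typing import Any, Dict, Optional


def resolve_mdl_platform(
    pinned_platform: Optional[str],
    pinned_database: Optional[str],
    destination_config: Dict[str, Any],
) -> str:
    """Pick the DataHub platform for a managed_data_lake destination (hypothetical helper)."""
    if pinned_platform:
        return pinned_platform  # 1. explicit user override always wins
    if pinned_database:
        return "glue"  # 2. among MDL platforms, only Glue uses `database`
    if destination_config.get("should_maintain_tables_in_glue") is True:
        return "glue"  # 3. auto-detect from the MDL config toggle
    return "iceberg"  # 4. Polaris / Iceberg REST default
```

Step 2 is what keeps a user-supplied `database` from being dropped when the Glue toggle is not set on the destination.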

Other improvements

  • Google Sheets connector workaround extracted to a dedicated handler (google_sheets_handler.py).
  • DB-log fallback for column lineage in REST-primary hybrid mode (DB log carries explicit source_column_name / destination_column_name).
  • fivetran_log_config becomes optional when log_source: rest_api.

Recipe — minimal REST-only setup (no warehouse credentials)

The smallest viable recipe for a customer on a lower Fivetran tier with a Managed Data Lake destination:

source:
  type: fivetran
  config:
    log_source: rest_api
    api_config:
      api_key: "${FIVETRAN_API_KEY}"
      api_secret: "${FIVETRAN_API_SECRET}"
    # For Glue-backed MDL: pin the actual Glue database name (visible
    # in the AWS Glue console). `platform: glue` is auto-detected from
    # `should_maintain_tables_in_glue: true` on the destination response.
    destination_to_platform_instance:
      <destination_id>:
        database: <actual Glue database name>
        platform_instance: <glue_platform_instance>
        env: PROD

Test plan

  • 209 fivetran tests pass (unit + integration), including new integration tests for hybrid discovery, REST-only mode, and DB-vs-REST equivalence.
  • mypy clean for files modified by this branch (5 errors remain on the branch — all pre-existing on master from unrelated commits, not in scope).
  • Live-verified against a real customer Fivetran account in rest_api mode and DB-primary hybrid mode: 23 of 25 connectors with column lineage; 5,333 column-level lineage entries.
  • Comprehensive capability matrix and credential-coverage tables in fivetran_pre.md; per-platform URN-shape table for MDL routing; storage-source URN-alignment caveat documented.

Checklist

  • PR conforms to PR Title Format
  • Links related issue (CUS-7715)
  • Tests added/updated (209 passing)
  • Docs updated (metadata-ingestion/docs/sources/fivetran/fivetran_pre.md)
  • No breaking changes — all existing recipes run unchanged

@github-actions

Linear: ING-2475

@github-actions github-actions Bot added the ingestion PR or Issue related to the ingestion of metadata label Apr 28, 2026
@codecov

codecov Bot commented Apr 28, 2026

@datahub-connector-tests

datahub-connector-tests Bot commented Apr 28, 2026

Connector Tests Results

Connector tests failed for commit 8729960

View full test logs →

To skip connector tests, add the skip-connector-tests label (org members only).

Autogenerated by the connector-tests CI pipeline.

@maggiehays maggiehays added the needs-review Label for PRs that need review from a maintainer. label Apr 28, 2026
@treff7es treff7es force-pushed the feat/fivetran-managed-data-lake-iceberg-rest branch from 3768fce to 08e6f6e Compare April 29, 2026 17:27
@treff7es treff7es changed the title feat(ingest/fivetran): support Iceberg REST Catalog and Polaris for Managed Data Lake destination feat(ingest/fivetran): REST API mode and Managed Data Lake destination support Apr 29, 2026
@treff7es treff7es changed the base branch from feat/fivetran-managed-data-lake to master April 29, 2026 17:29
…n support

Adds a new REST-API ingestion mode to the Fivetran source connector and
extends destination handling to cover Fivetran's Managed Data Lake
Service (MDL) across AWS S3, Google Cloud Storage, and Azure Data Lake
Storage Gen2 — addressing customers (e.g., Angi, CUS-7715) whose Fivetran
deployment writes exclusively to a cloud lakehouse and never to Snowflake
/ BigQuery / Databricks.

== Modes ==

Two ways to read the Fivetran log:
  - `log_source: log_database` (existing) — read the Fivetran Platform
    Connector log from a Snowflake / BigQuery / Databricks warehouse.
  - `log_source: rest_api` (new) — read everything via Fivetran's REST
    API; no warehouse credentials required. Works for accounts whose
    log destination is Managed Data Lake (no SQL log to query) and for
    teams that prefer not to grant DataHub a warehouse role.

When both credential blocks are supplied the connector runs in hybrid
mode. Default is DB-primary (DB owns lineage / users / sync history;
REST fills in destination routing + Google Sheets details). Set
`log_source: rest_api` explicitly for REST-primary hybrid — REST owns
connectors / schemas / users / routing, DB log fills in per-run
DataProcessInstance events (REST has no sync-history endpoint) and
higher-fidelity column lineage.

== Managed Data Lake destinations ==

Fivetran's REST API reports `service: managed_data_lake` for every MDL
destination regardless of underlying cloud or catalog. The connector
discovers the destination via `/v1/destinations/{id}` and routes URNs
based on (in precedence order):

  1. User override on `destination_to_platform_instance.<id>.platform`.
  2. Auto-detection from MDL config toggles — currently `glue` when
     `should_maintain_tables_in_glue: true` is set on the destination.
  3. Fallback to `iceberg` (Polaris / Iceberg REST default).

Supported `platform` values for MDL destinations:
  - `iceberg` — `urn:li:dataset:(iceberg, <schema>.<table>, env)`,
    aligned with DataHub's Iceberg source over Polaris / Iceberg REST.
  - `glue`    — `urn:li:dataset:(glue, fivetran_<schema>.<table>, env)`,
    aligned with DataHub's Glue source. Auto-detected when Fivetran's
    `should_maintain_tables_in_glue` toggle is set; the `fivetran_`
    database prefix is configurable via `mdl_glue_database_prefix`.
  - `s3` / `gcs` / `abs` — path-style URNs (`<bucket-or-container>/<prefix>/<schema>/<table>`)
    aligned with DataHub's S3 / GCS / ABS sources. Bucket/prefix is
    auto-populated from the discovered destination response; user can
    override `database` to point at a different mirror layout. Note
    that DataHub's data-lake source must be configured to emit
    table-level URNs (per-folder, not per-Parquet-file) for the URNs
    to align — see the docs.

The Databricks-backed MDL variant (`service: databricks_via_managed_data_lake`)
is routed via the existing `_RELATIONAL_SERVICE_MAP` to `databricks`.

== Other improvements ==

  - Per-table column-lineage fetch from `/v1/connections/{id}/schemas/{schema}/tables/{table}/columns`
    so REST mode produces full column lineage (the bulk schemas-config
    endpoint only returns user-modified columns).
  - DB-log fallback for column lineage in REST-primary hybrid: REST
    reader prefers DB-log column-lineage rows when available (carries
    explicit `source_column_name` / `destination_column_name`) and
    falls through to per-table REST fetch otherwise.
  - Parallel per-connector REST work with bounded worker pool
    (`rest_api_max_workers`, `rest_api_per_connector_timeout_sec`),
    fail-fast pool shutdown on a single-connector hang.
  - Per-instance destination cache shared between source and log
    reader; negative cache for failed destination discovery to avoid
    re-issuing failing API calls.
  - Google Sheets connector workaround extracted to a dedicated handler.
  - REST-mode `fivetran_log_config` is optional; connector infers
    `log_source` from which credential blocks are present.
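
The destination cache with negative caching mentioned in the list above could look roughly like the following. The class `DestinationCache` and the `fake_fetch` stand-in are hypothetical; the real connector's cache layout is not shown in this thread.

```python
from typing import Callable, Dict, List, Optional


class DestinationCache:
    """Cache destination lookups, including failures (hypothetical sketch)."""

    def __init__(self, fetch: Callable[[str], dict]) -> None:
        self._fetch = fetch
        # A stored None is a negative entry: discovery failed, do not retry.
        self._cache: Dict[str, Optional[dict]] = {}

    def get(self, destination_id: str) -> Optional[dict]:
        if destination_id in self._cache:
            return self._cache[destination_id]
        try:
            details: Optional[dict] = self._fetch(destination_id)
        except Exception:
            details = None  # remember the failure instead of re-issuing the call
        self._cache[destination_id] = details
        return details


calls: List[str] = []


def fake_fetch(destination_id: str) -> dict:
    """Stand-in for a destination-details API call; fails for one ID."""
    calls.append(destination_id)
    if destination_id == "broken":
        raise RuntimeError("simulated API failure")
    return {"id": destination_id, "service": "managed_data_lake"}


cache = DestinationCache(fake_fetch)
for _ in range(3):
    cache.get("broken")
    cache.get("ok_dest")
```

After three rounds of lookups, `fake_fetch` has been called once per destination, including the failing one.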

== Tests + docs ==

  - 209 fivetran tests pass (unit + integration) across the new modes
    and routing combinations.
  - New integration tests: hybrid discovery, REST-only, DB-vs-REST
    equivalence.
  - Comprehensive capability matrix and credential-coverage tables in
    `fivetran_pre.md`; recipes for `log_database` and `rest_api`.
  - Per-platform URN-shape table for MDL routing (iceberg / glue / s3
    / gcs / abs); storage-source URN-alignment caveat documented.
…ig to per-destination

Drops the global `mdl_glue_database_prefix` config field (with its
unverified `fivetran_` default) and adds `glue_database_prefix` as an
optional field on `PlatformDetail`, settable per-destination via
`destination_to_platform_instance.<id>.glue_database_prefix`.

The previous design had two latent issues:
  1. The default value `fivetran_` was based on observed behaviour for
     one customer setup; Fivetran's REST API doesn't expose the actual
     Glue database name and the convention isn't documented in the
     OpenAPI spec, so the global default risked being silently wrong.
  2. The global prefix shadowed any per-destination `database` override
     because the prefix branch fired whenever `glue_database_prefix`
     was truthy — non-obvious interaction.

New behaviour for `platform: glue`:
  - With `glue_database_prefix` set on the PlatformDetail entry → emit
    `glue.<prefix><schema>.<table>` (the Glue source's two-part shape
    with Fivetran's prefix applied).
  - With no prefix → emit `glue.<schema>.<table>` directly. Correct
    when the customer's Glue catalog uses schema names verbatim as
    database names (no prefix).
  - The relational fallback (`glue.<database>.<schema>.<table>`) is
    removed — Glue is two-part, never three. The old test exercising
    that wrong shape is replaced with one pinning the new
    no-prefix → schema-verbatim behaviour.
…detect of platform=glue

The previous Glue routing assumed Fivetran prepends `fivetran_` to the
schema when creating Glue databases (so the URN was
`glue.fivetran_<schema>.<table>`). On closer inspection of Fivetran's
docs, that assumption doesn't hold: Fivetran shares one Glue database
per region across all destinations, the actual database name isn't
exposed via REST, and the Glue-table-naming convention isn't documented.
Shipping the speculative shape would have produced URNs that silently
fail to align with DataHub's Glue source for most customers.

New design (Option 2 from the design discussion):

  - `glue_database_prefix` config (per-destination) is removed entirely.
    No fictional URN composition.
  - Glue routing falls through to the relational URN path:
    `glue.<database>.<schema>.<table>`. The user MUST supply `database`
    on `destination_to_platform_instance.<id>` — set it to the actual
    Glue database name observed in their AWS Glue console.
  - The `<schema>.<table>` portion is taken verbatim from Fivetran's
    lineage record; customers verify against their Glue catalog that
    the convention matches. If it doesn't, they override per-destination.
  - Auto-detect of `platform: glue` from `should_maintain_tables_in_glue`
    is preserved — saves a config line for the common case. The user
    still has to provide `database`.

Also: per-destination dedup for "URN could not be constructed" warnings.
Without dedup, an auto-routed-to-glue destination missing a user-supplied
`database` would emit one warning per lineage edge (potentially hundreds).
With dedup (`_destinations_with_urn_warning`), exactly one warning per
destination per ingest, with subsequent edges silently skipped.
…gnal

Adds a new tier, second in precedence, to the MDL platform-resolution chain:

  1. user override on base.platform (always wins)
  2. NEW: glue if base.database is set (database only makes sense for
     Glue routing among MDL platforms — iceberg/s3/gcs/abs all silently
     drop it)
  3. auto-detect from MDL config toggles
     (should_maintain_tables_in_glue → glue)
  4. iceberg fallback

Closes a silent foot-gun: a user setting database (intending Glue
routing) on a destination whose should_maintain_tables_in_glue toggle
isn't set would previously fall through to iceberg and quietly drop
their database setting.
…sers to REST mode

The original description led with the catalog-linked database (CLD)
case for surfacing Managed Data Lake logs through Snowflake — that
specific use case is now better served by `log_source: rest_api`,
which reads the log directly via API and sidesteps Snowflake's
identifier-casing rules entirely.

Reframes the field as a general escape hatch for case-preserving
Snowflake schemas (quoted lowercase names, etc.) and adds an explicit
nudge for MDL users to prefer REST mode. No behaviour change.
  - _derive_path_prefix: explicitly annotate parts as Tuple[Optional[str], ...]
    so the s3/gcs and abs branches' differing arity is reconciled.
  - fivetran_rest_api.py: tighten params typing from Dict[str, object] to
    Dict[str, Union[str, int]], matching what we pass and what
    requests.Session.get accepts.
  - response_models.py FivetranListedConnection.schema: rename to schema_
    with a Pydantic Field(alias='schema') and populate_by_name=True so
    Fivetran's JSON contract still parses while the field name no longer
    shadows BaseModel.schema(). Update call sites and test fixtures.
  - Add type annotations on test helpers across the fivetran test suite.
  - Replace user_id=None (typed as str on Connector) with empty string in
    test fixtures; readers already treat falsy as missing.
  - Drop stale _connection_details_cache and _max_jobs_per_connector
    attributes from test fixtures; neither exists on the actual classes.

Five lint errors remain on the branch — all pre-existing on master from
unrelated commits. Not addressed here.
…laims

  - Remove the 'Migrating from managed_data_lake_destination_config'
    section: it documented a path away from a config block that this
    branch never shipped to begin with.
  - Fix the example recipe URN comment for glue (was the obsolete
    'glue.fivetran_<schema>.<table>' from the unverified prefix design;
    now 'glue.<database>.<schema>.<table>') and add the required
    'database:' field on the example.
  - Drop two stale claims that 'pinning platform skips REST destination
    discovery' — wrong after the always-on-discovery refactor; per-field
    merge already preserves user overrides while letting discovery fill
    in unpinned fields like database.
  - Update the capability matrix and credential-coverage table: REST
    mode now produces full column lineage via the per-table column
    fetch; the 'table-only by default' / 'future enhancement' framing
    is obsolete.
  - Remove a redundant REST-only example yaml inside the Hybrid mode
    section and fix the tradeoffs paragraph that misclaimed REST has a
    sync-history endpoint.
… 3.10

The per-connector timeout path catches the built-in TimeoutError, which
works on Python 3.11+ where concurrent.futures.TimeoutError is aliased
to the built-in. On Python 3.10 the two are separate classes:
`future.result(timeout=...)` raises `concurrent.futures.TimeoutError`,
which doesn't match `except TimeoutError` and bubbles out of the worker
loop.

CI failure on Py 3.10 testQuick:
TestPerConnectorTimeout::test_timeout_skips_connector_other_workers_continue
- concurrent.futures._base.TimeoutError

Catch both classes so the timeout path works across all supported
Python versions.
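
A minimal sketch of the cross-version timeout handling described in this commit, using only the standard library. The function `run_with_timeout` and the sleep-based jobs are hypothetical stand-ins for per-connector REST work; only the `except` clause mirrors the fix itself.

```python
import concurrent.futures
import time
from typing import Dict


def run_with_timeout(jobs: Dict[str, float], timeout_sec: float) -> Dict[str, str]:
    """Run one sleep job per connector; mark jobs that exceed the timeout."""
    results: Dict[str, str] = {}
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    futures = {name: pool.submit(time.sleep, delay) for name, delay in jobs.items()}
    for name, future in futures.items():
        try:
            future.result(timeout=timeout_sec)
            results[name] = "ok"
        except (TimeoutError, concurrent.futures.TimeoutError):
            # On Python 3.11+ these are the same class; on 3.10 and earlier
            # they are distinct, so both must be listed.
            results[name] = "timed_out"
    # Do not block on the still-running job; drop anything not yet started.
    pool.shutdown(wait=False, cancel_futures=True)
    return results


outcome = run_with_timeout({"fast": 0.0, "slow": 0.5}, timeout_sec=0.1)
```

Listing both classes in one tuple is harmless on newer interpreters, where the tuple simply contains the same class twice.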
…ivetran_log_config

This branch lifted the three max_*_per_connector fields
(max_jobs_per_connector, max_table_lineage_per_connector,
max_column_lineage_per_connector) from FivetranLogConfig to the
top-level FivetranSourceConfig — they govern the lineage payload size
regardless of which log source (DB / REST / hybrid) the connector reads
from.

Without a compat shim, existing recipes that set those fields nested
under fivetran_log_config now fail at config-load time with
'Extra inputs are not permitted'.

Adds a model_validator(mode='before') on FivetranSourceConfig that
hoists legacy nested values to the top level with a ConfigurationWarning,
mirroring the existing compat_sources_to_database pattern. Top-level
values win on conflict.

Also adds two unit tests covering the legacy-nested and dual-spec
(top-level beats nested) cases.
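
The hoisting rule described in this commit can be sketched on plain dictionaries, without Pydantic. The function `hoist_legacy_limits` is hypothetical, and it uses the standard `warnings` module in place of DataHub's ConfigurationWarning; in the actual change this logic lives in a `model_validator(mode='before')`.

```python
import warnings
from typing import Any, Dict

LEGACY_FIELDS = (
    "max_jobs_per_connector",
    "max_table_lineage_per_connector",
    "max_column_lineage_per_connector",
)


def hoist_legacy_limits(values: Dict[str, Any]) -> Dict[str, Any]:
    """Move legacy nested limits to the top level; top-level values win on conflict."""
    values = dict(values)  # avoid mutating the caller's recipe
    nested = values.get("fivetran_log_config")
    if not isinstance(nested, dict):
        return values
    nested = dict(nested)
    for field in LEGACY_FIELDS:
        if field not in nested:
            continue
        legacy_value = nested.pop(field)
        warnings.warn(
            f"{field} under fivetran_log_config is deprecated; set it at the top level",
            stacklevel=2,
        )
        values.setdefault(field, legacy_value)  # keeps an existing top-level value
    values["fivetran_log_config"] = nested
    return values


recipe = {
    "fivetran_log_config": {
        "max_jobs_per_connector": 100,
        "max_table_lineage_per_connector": 50,
    },
    "max_jobs_per_connector": 500,
}
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    hoisted = hoist_legacy_limits(recipe)
```

Here the top-level `max_jobs_per_connector: 500` survives, the nested `max_table_lineage_per_connector: 50` is hoisted, and one warning is raised per legacy field.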
This branch makes Fivetran's per-destination URN routing reflect each
destination's actual platform (via REST destination discovery) instead
of defaulting every destination to fivetran_log_config.destination_platform.

For single-destination accounts and recipes that pin every destination
explicitly, no change. For hybrid-mode multi-destination accounts the
URNs shift to correct values on next ingest, which may break dashboards
keyed on the old (conflated) URNs. Doc entry explains the change and
the opt-out path (declarative override via destination_to_platform_instance).
@treff7es
Contributor Author

Connector-tests CI status — explanation

The connector-tests / fivetran job is failing on this PR. After investigation, this is a deliberate, correctness-driven behaviour change, not a regression. The connector-tests golden file codifies a latent bug that this PR fixes.

Root cause

Master's behaviour (relevant code: fivetran/fivetran.py on master):

destination_details = self.config.destination_to_platform_instance.get(
    connector.destination_id, PlatformDetail()
)
if destination_details.platform is None:
    destination_details.platform = self.config.fivetran_log_config.destination_platform  # <-- always uses the LOG destination
if destination_details.database is None:
    destination_details.database = self.audit_log.fivetran_log_database

In hybrid mode (when both fivetran_log_config and api_config are configured), master never queries Fivetran's REST API to discover each destination's actual platform. It defaults every destination to the log warehouse's platform/database, regardless of where the connector's data physically goes.

This branch's behaviour: REST destination discovery is always-on when api_config is configured. The connector calls GET /v1/destinations/{id} per destination and uses the actual service returned by Fivetran. URNs match where the data actually lives.

Evidence from the connector-tests Fivetran account

Extracted from the failing CI run's diff output, the test Fivetran account has 4 distinct destinations:

| Platform | Database | Notes |
| --- | --- | --- |
| bigquery | harshal-playground-306419 | The log destination (matches fivetran_log_config.bigquery_destination_config) |
| bigquery | acryl-prod | A second BigQuery project, different from the log |
| snowflake | datahub_community | A Snowflake destination |
| snowflake | fivetran_smoke_test_db | Another Snowflake destination |

Master's fivetran_with_bigquery_dest_golden.json: 42,463 destination URN mentions, ALL bigquery / harshal-playground-306419. All four destinations conflated into one.

Master's fivetran_with_snowflake_dest_golden.json: 42,115 destination URN mentions, ALL snowflake / fivetran_smoke_test_db. Same Fivetran account, but because the log was switched to Snowflake, all destinations were mapped to that Snowflake URN instead.

The two goldens have mutually inconsistent URNs for the same logical connectors, even though the underlying Fivetran account is identical. That's the smoking gun: master's URN routing was driven by the log destination, not the actual destination. Lineage to URNs like bigquery.harshal-playground-306419.<snowflake-only-table> goes nowhere — the table doesn't exist in that BigQuery project.

This branch's output

REST discovery resolves each destination independently → all 4 platform/database pairs appear correctly in the emitted URNs. As a side effect, fivetran_with_bigquery_dest and fivetran_with_snowflake_dest should now produce identical lineage outputs, because the actual destinations don't depend on which log warehouse the connector reads from.

Impact + migration

Documented in docs/how/updating-datahub.md under Next → Breaking Changes:

(Ingestion / Fivetran — Lineage URN shift for multi-destination accounts). Per-destination URN routing now reflects each destination's actual platform instead of defaulting every destination to fivetran_log_config.destination_platform. … Single-destination accounts and recipes that already pin every destination explicitly are unaffected. To opt out and preserve the old behavior, pin each non-default destination explicitly via destination_to_platform_instance.<destination_id>.platform.

Action for the connector-tests repo

The test goldens need regeneration to capture the now-correct URNs:

pytest --update-golden-files smoke-test/integration/fivetran/test_fivetran_with_bigquery_dest.py
pytest --update-golden-files smoke-test/integration/fivetran/test_fivetran_with_snowflake_dest.py
pytest --update-golden-files smoke-test/integration/fivetran/test_fivetran_with_databricks_dest.py

The two goldens should converge to (nearly) the same content modulo the log-source-specific DPI metadata, since the actual destination set is identical between runs.

Happy to do the regeneration as a follow-up PR against acryldata/connector-tests if helpful, but I don't have write access to that repo from this branch.


@askumar27 askumar27 left a comment

issue (blocking): report_connectors_scanned() is called in two places for REST mode:

  1. fivetran_log_rest_reader.py:263 — inside get_allowed_connectors_list after f.result()
  2. fivetran.py:817 — inside _get_connector_workunits (the sole location in DB mode)

Result: the ingest report shows 2x the actual connector count for REST mode. On a 200-connector account, the report shows 400. This erodes trust in report metrics and could trigger false-positive threshold alerts.

issue (blocking): The two implementations handle the empty-string sentinel differently:

  • fivetran_log_rest_reader.py:134   if user_id is None:   ← passes "" through
  • fivetran_log_db_reader.py:365   if not user_id:   ← short-circuits on ""

Connectors with no connected_by user get "" assigned as a sentinel earlier in the REST reader. REST mode then traverses all groups via list_users for every such connector trying to look up "" in the cache — unnecessary API calls on every cache miss.
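
The difference between the two checks can be shown in a few lines. This sketch is not the reader code: `get_user_email`, `refill_cache`, and the in-memory cache are hypothetical, with `refill_cache` standing in for the per-group list_users walk.

```python
from typing import Callable, Dict, List, Optional


def get_user_email(
    user_id: Optional[str],
    cache: Dict[str, str],
    refill_cache: Callable[[], None],
) -> Optional[str]:
    """Return a cached email, refilling on a miss; skip the lookup for missing IDs."""
    if not user_id:  # catches both None and the "" sentinel
        return None
    if user_id not in cache:
        refill_cache()  # stands in for the expensive per-group list_users walk
    return cache.get(user_id)


refills: List[int] = []
email_cache = {"u1": "a@example.com"}
email_missing = get_user_email("", email_cache, lambda: refills.append(1))
email_known = get_user_email("u1", email_cache, lambda: refills.append(1))
```

With `if user_id is None:` in place of the falsy check, the `""` call would fall through to the cache-miss branch and trigger a refill on every such connector.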


Adding a REST API ingestion path to a mature connector while maintaining backward compatibility is non-trivial, and the implementation shows real care! There are some blocking issues that need to be addressed. None require architectural rethinking.

)
return DatasetUrn.create_from_ids(
    platform_id=destination_details.platform,
    table_name=f"{destination_details.database.lower()}.{table_name}",
Contributor

issue (blocking): Do we need to wrap this in a config option that controls the conversion to lowercase?

Contributor Author

Done in 8729960fa2. Added database_lowercase: bool = True to PlatformDetail plus a database_for_urn @property that centralises the rule. Default keeps the existing lower-case URN behaviour (master had database.lower() here too); users who need to align with another DataHub source whose URN preserves database casing can opt out per-destination via destination_to_platform_instance.<id>.database_lowercase: false. Same toggle covers the source-URN path (your other comment). Plain @property rather than @computed_field so it doesn't appear in model_dump() and pollute the customProperties aspect via _compose_custom_properties. Tests in TestDatabaseForUrn (incl. test_property_does_not_appear_in_model_dump regression) and test_glue_database_case_preserved_when_opted_out.
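
The toggle described in this reply can be approximated with a standard-library dataclass. This is a sketch, not the shipped model: `PlatformDetailSketch` is a hypothetical stand-in for the Pydantic PlatformDetail, and `dataclasses.asdict` plays the role of `model_dump()` to show that a plain property stays out of the serialized output.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class PlatformDetailSketch:
    """Hypothetical stand-in for PlatformDetail with the lowercase toggle."""

    database: Optional[str] = None
    database_lowercase: bool = True  # default keeps the lowercased URN behaviour

    @property
    def database_for_urn(self) -> Optional[str]:
        """Database name as it should appear in a URN."""
        if self.database is None:
            return None
        return self.database.lower() if self.database_lowercase else self.database


default_detail = PlatformDetailSketch(database="Analytics_DB")
opted_out = PlatformDetailSketch(database="Analytics_DB", database_lowercase=False)
```

`default_detail.database_for_urn` yields the lowercased name, `opted_out.database_for_urn` preserves the casing, and neither instance exposes `database_for_urn` as a serialized field.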

Contributor

issue (blocking): Maybe we need a config to control this for source_table URN construction as well.

Contributor Author

Same toggle covers the source side — see reply on the destination URN comment. The new PlatformDetail.database_lowercase field is honoured by the database_for_urn @property at both call sites, so a recipe can flip it once on sources_to_platform_instance.<connector_id> (or destination_to_platform_instance.<destination_id>) without affecting other entries.

f"destination {destination_id!r}: {payload.get('message')}"
)

details = FivetranDestinationDetails.model_validate(payload["data"])
Contributor

Direct key access

Contributor

Similar pattern in the file - needs to be reviewed

Contributor Author

Good catch, fixed in 8729960fa2. Added a private _extract_data(payload, *, context) helper that validates code == "Success" and returns payload.get("data") (raising a clear ValueError instead of KeyError on a malformed-but-200 envelope). All the new direct-access sites in this file route through it now (get_destination_details_by_id, list_groups, list_connections, get_connection_schemas, get_table_columns, list_users). get_connection_details was left alone; it predates this PR and was already defensive.

Comment on lines +476 to +477
connector_patterns.allowed(connector_name)
or connector_patterns.allowed(connector_id)
Contributor

issue (blocking): Previously DB mode matched connector_name only.

For an existing recipe with a deny pattern like deny: ["abc123"], this could now also deny connectors whose ID happens to contain that substring — silently dropping connectors that previously passed

Contributor Author

You're right, that was a silent backwards-compat regression. Reverted to master's name-only match in DB mode in 8729960fa2. REST mode still ORs connector_id with connector_name (which in REST is listed.schema_), but REST is new — no compat baseline to preserve there. Added a comment in fivetran_log_rest_reader.py pointing at the DB reader and explaining the asymmetry, so the next person doesn't try to "unify" them.

# correction (e.g., to align with another DataHub source).
return base.model_copy(
    update={
        "platform": base.platform or discovered.service,
Contributor

issue (blocking): If discovered.service is not in _RELATIONAL_SERVICE_MAP and the user hasn't pinned a platform, line 647 uses the raw service string as the URN's dataPlatform. This silently produces e.g. urn:li:dataPlatform:aurora_postgres_warehouse_v2 — junk that doesn't align with any DataHub source.

Contributor Author

Fixed in 8729960fa2. The unknown-service fallback no longer synthesizes a URN platform from discovered.service; we leave base.platform untouched (so user overrides still win) and let build_destination_urn raise. The existing per-destination dedup'd warning at _extend_lineage already tells the user to set destination_to_platform_instance.<id>.platform — same pattern used for missing-database-on-glue. We still mirror discovered.config.database so once the user pins a platform they don't have to also pin the database. Regression test in test_unknown_service_without_override_leaves_platform_none.
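
The merge rule described in this reply can be sketched as follows. Everything here is illustrative: `merge_discovered` is a hypothetical function, and `SERVICE_MAP_SKETCH` is a two-entry stand-in, since the contents of the real `_RELATIONAL_SERVICE_MAP` are not shown in this thread.

```python
from typing import Dict, Optional, Tuple

# Illustrative subset only; not the connector's actual service map.
SERVICE_MAP_SKETCH: Dict[str, str] = {
    "snowflake": "snowflake",
    "databricks_via_managed_data_lake": "databricks",
}


def merge_discovered(
    pinned_platform: Optional[str],
    pinned_database: Optional[str],
    discovered_service: str,
    discovered_database: Optional[str],
) -> Tuple[Optional[str], Optional[str]]:
    """Merge user pins with discovery; never invent a platform from a raw service."""
    # Unknown services map to None, so URN construction fails loudly downstream.
    platform = pinned_platform or SERVICE_MAP_SKETCH.get(discovered_service)
    # The discovered database is still mirrored so a later platform pin suffices.
    database = pinned_database or discovered_database
    return platform, database
```

For an unmapped service with no pin, the platform stays `None` while the discovered database is carried over, which matches the "pin the platform and you are done" behaviour the reply describes.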

Comment on lines +237 to +251
class FivetranSyncHistoryItem(BaseModel):
    model_config = ConfigDict(extra="ignore")

    sync_id: str
    started_at: datetime.datetime
    completed_at: Optional[datetime.datetime] = None
    status: str
    message: Optional[str] = None


class FivetranSyncHistoryResponse(BaseModel):
    model_config = ConfigDict(extra="ignore")

    items: List[FivetranSyncHistoryItem]
    next_cursor: Optional[str] = None
Contributor

Not used in code.

Contributor Author

Correct: FivetranSyncHistoryResponse and FivetranSyncHistoryItem were dead code. REST has no sync-history endpoint (called out in the PR description). Both classes were deleted in 8729960fa2.

@github-actions
Contributor

github-actions Bot commented May 5, 2026

Your PR has been assigned to anush.kumar for review (ING-2475).

@alwaysmeticulous

alwaysmeticulous Bot commented May 5, 2026

🔴 Meticulous spotted visual differences in 1 of 1480 screens tested.

Meticulous evaluated ~10 hours of user flows against your PR.

Last updated for commit 0da6b06 refactor(ingest/fivetran): move database_for_urn off PlatformDetail. This comment will update as new commits are pushed.

- Drop unused FivetranSyncHistoryResponse / FivetranSyncHistoryItem models
  (REST has no sync-history endpoint).
- Align REST get_user_email empty-string sentinel with DB reader's
  falsy check; previously every connector with no `connected_by` user
  triggered an N-group list_users walk on every cache miss.
- Count connectors_scanned only at the canonical site
  (_get_connector_workunits); REST mode was double-counting.
- Centralise Fivetran REST envelope validation in a single
  _extract_data(payload, *, context) helper; all 5 new direct
  payload["data"] accesses now route through it (consistent error
  messages, no KeyError on malformed-but-200 responses).
- Restore master's name-only connector_pattern matching in DB mode;
  the new id-OR-name match silently changed which connectors a
  pre-existing `deny: ["abc123"]` recipe would catch. REST mode
  keeps the OR-match (no compat baseline to preserve there).
- Stop synthesizing URN dataPlatform from the raw discovered.service
  string for unknown services. Previously emitted junk like
  urn:li:dataPlatform:aurora_postgres_warehouse_v2; now leaves
  platform=None so build_destination_urn raises and the existing
  per-destination dedup'd warning fires once. Regression test added.
- Add per-PlatformDetail database_lowercase: bool toggle (default True)
  with a database_for_urn @property used at both source-URN and
  destination-URN sites. Default preserves existing lowercase URN
  behaviour; users who need to align with another DataHub source whose
  URN preserves database casing can opt out per-destination. Plain
  @property (not @computed_field) so the derived value stays out of
  model_dump() and doesn't leak into _compose_custom_properties — the
  customProperties aspect intentionally surfaces the user-typed
  `database` verbatim. Regression test pins this.
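
The `database_lowercase` toggle and the derived `database_for_urn` value from the last bullet can be illustrated with a small sketch. To stay dependency-free it uses a standard-library dataclass as a stand-in for the Pydantic model, with `dataclasses.asdict` playing the role of `model_dump()`; the class name `PlatformDetailSketch` is hypothetical and the real model may differ.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class PlatformDetailSketch:
    """Stand-in for PlatformDetail; only the two relevant fields are modelled."""

    database: Optional[str] = None
    database_lowercase: bool = True  # default keeps the existing lowercase URNs

    @property
    def database_for_urn(self) -> Optional[str]:
        # Derived value: a plain property is not a field, so it is never
        # serialised alongside the user-typed `database`.
        if self.database is None:
            return None
        return self.database.lower() if self.database_lowercase else self.database


default_detail = PlatformDetailSketch(database="Analytics_DB")
opted_out = PlatformDetailSketch(database="Analytics_DB", database_lowercase=False)
```

Here `default_detail.database_for_urn` is lowercased while `opted_out.database_for_urn` keeps the original casing, and `asdict(default_detail)` contains only `database` and `database_lowercase`, so the user-typed value is what a serialised view would surface.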
@treff7es
Contributor Author

treff7es commented May 5, 2026

Thanks for the careful pass — both blockers were real. All 7 inline comments + both review-level blockers addressed in 8729960fa2 (squashed). Per-comment replies inline.

Quick summary of what changed in 8729960fa2:

  • report_connectors_scanned() now counted only at the canonical site in fivetran.py:817; the duplicate REST-side increment was removed.
  • REST get_user_email now uses if not user_id: to match the DB reader, so the empty-string sentinel short-circuits as expected (no more N-group list_users walk per anonymous connector).
  • connector_patterns matching restored to name-only in DB mode to preserve master's contract; REST keeps the OR-match (no compat baseline there). Comment in REST reader spells out the asymmetry.
  • Unknown-service fallback no longer synthesizes a URN platform from the raw service string. platform=None flows through; the existing per-destination dedup'd warning fires once. Regression test added.
  • All payload["data"] direct accesses on the new REST paths route through a new _extract_data(payload, *, context) helper.
  • Per-PlatformDetail database_lowercase: bool = True toggle (default preserves existing behaviour) addresses the lowercase concern; database_for_urn @property centralises the rule for both source-URN and destination-URN sites. Plain @property (not @computed_field) so it stays out of model_dump() and doesn't leak into _compose_custom_properties.
  • Dropped unused FivetranSyncHistoryResponse / FivetranSyncHistoryItem.

174 unit + 43 integration tests passing, mypy clean.
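
For readers unfamiliar with the envelope helper mentioned in the summary, here is a minimal sketch of what a function with the signature `_extract_data(payload, *, context)` might look like. Only the signature comes from this PR; the body, the exception type `MalformedResponseError`, and the error wording are assumptions for illustration.

```python
from typing import Any, Dict


class MalformedResponseError(ValueError):
    """Hypothetical error type for a 200 response with an unexpected shape."""


def extract_data(payload: Any, *, context: str) -> Dict[str, Any]:
    """Return payload["data"], raising a descriptive error instead of KeyError."""
    if not isinstance(payload, dict):
        raise MalformedResponseError(
            f"{context}: expected a JSON object, got {type(payload).__name__}"
        )
    data = payload.get("data")
    if not isinstance(data, dict):
        raise MalformedResponseError(
            f"{context}: response has no 'data' object "
            f"(keys present: {list(payload.keys())})"
        )
    return data
```

Routing every access through one helper means a malformed-but-200 response produces the same kind of message everywhere, with the `context` string identifying which call failed.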

@treff7es
Contributor Author

treff7es commented May 5, 2026

Re: Issue A — report_connectors_scanned() double-counted in REST mode

Confirmed and fixed in 8729960fa2. The increment at fivetran_log_rest_reader.py:263 (per-worker on successful build) and the one at fivetran.py:817 (start of _get_connector_workunits) were both running for every REST-mode connector. Kept the fivetran.py site as the single source of truth — it's the one DB mode uses too — and removed the REST-side call. The _report_lock is still used by the seven other report-write sites in that file so the lock itself stays.

@treff7es
Contributor Author

treff7es commented May 5, 2026

Re: Issue B — empty-string sentinel divergence in REST get_user_email

Fixed in 8729960fa2. fivetran_log_rest_reader.py:134 is now if not user_id:, matching the DB reader's check. The empty-string sentinel set by _build_connector (user_id=listed.connected_by or "") now short-circuits, so anonymous-connected connectors no longer trigger an N-group list_users walk on every cache miss. Added a comment explaining the sentinel for the next reader.
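
The effect of the falsy check can be shown with a small sketch. The `UserEmailResolver` class and its `lookup` callback are hypothetical stand-ins for the REST reader and its `list_users` walk; only the `if not user_id:` guard mirrors the fix described above.

```python
from typing import Callable, Dict, Optional


class UserEmailResolver:
    """Caches user-id to email lookups and skips the lookup for empty ids."""

    def __init__(self, lookup: Callable[[str], Optional[str]]) -> None:
        self._lookup = lookup  # stand-in for the expensive list_users walk
        self._cache: Dict[str, Optional[str]] = {}

    def get_user_email(self, user_id: Optional[str]) -> Optional[str]:
        # A falsy check catches both None and the "" sentinel; an
        # `is None` check would let "" fall through to the lookup.
        if not user_id:
            return None
        if user_id not in self._cache:
            self._cache[user_id] = self._lookup(user_id)
        return self._cache[user_id]


lookup_calls = []


def fake_lookup(user_id: str) -> Optional[str]:
    lookup_calls.append(user_id)
    return f"{user_id}@example.com"


resolver = UserEmailResolver(fake_lookup)
```

Calling `resolver.get_user_email("")` returns `None` without invoking `fake_lookup`, which is the short-circuit that avoids the repeated walk for connectors with no `connected_by` user.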

Contributor

@askumar27 askumar27 left a comment

LGTM! Left a minor comment. This change will require the connector-test golden files update.

# full lineage for that connector via the schemas+columns path.
if self._db_lineage_reader is not None:
    try:
        lineage_by_id = self._db_lineage_reader.fetch_lineage_for_connectors(
Contributor

In hybrid mode, self._db_lineage_reader.fetch_lineage_for_connectors([connector_id]) is called from inside a rest_api_max_workers-dispatched worker.
The DB method is designed to issue 2 SQL queries regardless of list size. Result: 2×N concurrent warehouse queries instead of 2 sequential ones. In addition, concurrent connections may not inherit the USE DATABASE session state set in _setup_snowflake_engine.
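
The query-count concern can be made concrete with a toy model. `CountingLineageReader` is a hypothetical stand-in that only counts queries, and the batched variant is one possible remedy assumed here for illustration, not something stated in the review or implemented in this PR.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


class CountingLineageReader:
    """Toy reader: every call costs 2 queries regardless of list size."""

    def __init__(self) -> None:
        self.queries = 0
        self._lock = threading.Lock()

    def fetch_lineage_for_connectors(
        self, connector_ids: List[str]
    ) -> Dict[str, List[str]]:
        with self._lock:
            self.queries += 2  # one table-lineage and one column-lineage query
        return {cid: [f"{cid}.table"] for cid in connector_ids}


def fetch_per_worker(
    reader: CountingLineageReader, connector_ids: List[str], max_workers: int = 4
) -> Dict[str, List[str]]:
    """Pattern flagged in the review: one single-id call per worker."""
    merged: Dict[str, List[str]] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for result in pool.map(
            lambda cid: reader.fetch_lineage_for_connectors([cid]), connector_ids
        ):
            merged.update(result)
    return merged


def fetch_batched(
    reader: CountingLineageReader, connector_ids: List[str]
) -> Dict[str, List[str]]:
    """Assumed alternative: one call with the full list before dispatching workers."""
    return reader.fetch_lineage_for_connectors(connector_ids)
```

For N connectors the per-worker pattern costs 2×N queries in this model, while the batched call costs 2 and returns the same mapping.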

@treff7es treff7es merged commit c9cb67b into master May 6, 2026
60 of 61 checks passed
@treff7es treff7es deleted the feat/fivetran-managed-data-lake-iceberg-rest branch May 6, 2026 14:09
david-leifker pushed a commit that referenced this pull request May 27, 2026
- feat(ingest/sigma): emit UpstreamLineage from Data Model elements to warehouse tables (#17296)
- feat(ingest/fivetran): REST API mode and Managed Data Lake destination support (#17217)

Labels

ingestion: PR or Issue related to the ingestion of metadata
needs-review: Label for PRs that need review from a maintainer.
