@natanweinberger
Contributor

The CLI command airflow connections import <filepath> cannot overwrite existing connections. Conflicts are skipped with the message Could not import connection <connection_id>: connection already exists.

This PR adds an --overwrite=true option that overwrites an existing connection when there is a conflict.

The existing behavior (no overwrites) remains the default; the user must set --overwrite=true to enable overwrites.

Original idea for this: #15177 (comment)

@natanweinberger force-pushed the add-connection-cli-overwrite-flag branch from e5a8b2a to aa00851, January 4, 2023 21:09
@natanweinberger force-pushed the add-connection-cli-overwrite-flag branch from aa00851 to daeae0f, January 4, 2023 21:51
@natanweinberger changed the title from "Add connection cli overwrite flag" to "Add --overwrite option to connections import CLI", Jan 5, 2023
@natanweinberger changed the title from "Add --overwrite option to connections import CLI" to "Add --overwrite option to connections import CLI command", Jan 5, 2023
Comment on lines 321 to 325
Member

In the overwrite=True case I wonder if it’s a good idea to update the existing row instead of a delete-insert combination. Maybe even upsert on supported database backends?
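
For readers comparing the two approaches, here is a minimal sketch of an upsert next to a delete-insert. It uses the standard library's sqlite3 module rather than Airflow's SQLAlchemy models, and the table layout (id, conn_id, host) is a simplified assumption, not Airflow's real schema. The ON CONFLICT clause needs SQLite 3.24 or newer. One observable difference is that the upsert keeps the row's primary key, while delete-insert assigns a new one.

```python
import sqlite3

# Hypothetical, simplified stand-in for the connection table (not Airflow's schema).
db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE connection ("
    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " conn_id TEXT NOT NULL UNIQUE,"
    " host TEXT)"
)
db.execute("INSERT INTO connection (conn_id, host) VALUES ('my_db', 'old-host')")
original_id = db.execute(
    "SELECT id FROM connection WHERE conn_id = 'my_db'"
).fetchone()[0]

# Upsert: insert, or update the existing row in place when conn_id collides.
db.execute(
    "INSERT INTO connection (conn_id, host) VALUES (?, ?) "
    "ON CONFLICT(conn_id) DO UPDATE SET host = excluded.host",
    ("my_db", "new-host"),
)
upsert_row = db.execute(
    "SELECT id, host FROM connection WHERE conn_id = 'my_db'"
).fetchone()

# Delete-insert: the old row is gone, so the replacement gets a new primary key.
db.execute("DELETE FROM connection WHERE conn_id = 'my_db'")
db.execute("INSERT INTO connection (conn_id, host) VALUES ('my_db', 'newer-host')")
reinsert_row = db.execute(
    "SELECT id, host FROM connection WHERE conn_id = 'my_db'"
).fetchone()

print("upsert kept id:", upsert_row[0] == original_id)
print("delete-insert kept id:", reinsert_row[0] == original_id)
```

Whether a changed primary key matters depends on what else refers to the row; the sketch only shows that the two strategies are not equivalent at the database level.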

Contributor Author

That's fair. I tried to get updates working, but ran into IntegrityErrors a bunch due to how flushes work. I fell back on delete/flush/create, but I'll take another look today to see if I can get update working.

Thanks for the review!

Contributor Author

In case this helps any future readers (or me 😁), the IntegrityErrors were due to the fact that I thought conn_id was the PK for connections. When I merged a new Connection into the session to overwrite an existing one with the same conn_id, the session wouldn't reconcile the existing and new entries.

While there is a unique constraint on conn_id, the PK is id. The two entries having different PKs with the same conn_id is what caused the IntegrityError.

I fixed that by setting conn.id = existing_conn.id and have upserts working.
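
The failure mode described above can be reproduced outside Airflow. The sketch below is an illustration with the standard library's sqlite3 module, not the PR's code: the save helper is hypothetical and only imitates the idea of reconciling by primary key (update when the id names an existing row, insert otherwise). The table layout is likewise an assumption.

```python
import sqlite3

db = sqlite3.connect(":memory:")
# id is the primary key; conn_id only carries a unique constraint.
db.execute(
    "CREATE TABLE connection ("
    " id INTEGER PRIMARY KEY,"
    " conn_id TEXT NOT NULL UNIQUE,"
    " host TEXT)"
)
db.execute("INSERT INTO connection (id, conn_id, host) VALUES (1, 'my_db', 'old-host')")


def save(row_id, conn_id, host):
    """Hypothetical helper: reconcile by primary key, loosely like a merge."""
    found = row_id is not None and db.execute(
        "SELECT 1 FROM connection WHERE id = ?", (row_id,)
    ).fetchone()
    if found:
        db.execute(
            "UPDATE connection SET conn_id = ?, host = ? WHERE id = ?",
            (conn_id, host, row_id),
        )
    else:
        db.execute(
            "INSERT INTO connection (id, conn_id, host) VALUES (?, ?, ?)",
            (row_id, conn_id, host),
        )


# Without a primary key the row is treated as new and collides on conn_id.
try:
    save(None, "my_db", "new-host")
    collided = False
except sqlite3.IntegrityError:
    collided = True

# Copying the existing primary key first turns the write into an update.
existing_id = db.execute(
    "SELECT id FROM connection WHERE conn_id = ?", ("my_db",)
).fetchone()[0]
save(existing_id, "my_db", "new-host")
rows = db.execute("SELECT id, conn_id, host FROM connection").fetchall()
print(collided, rows)
```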

@natanweinberger force-pushed the add-connection-cli-overwrite-flag branch from 40b14fa to a635962, January 5, 2023 21:22
Comment on lines 325 to 327
Member

The overwrite part is technically unnecessary; if there is an existing_conn, overwrite is always True.

Member

Maybe we could even refactor this entire block into

existing_id = session.query(Connection.id).filter(Connection.conn_id == conn_id).first()
if existing_id is not None:
    if not overwrite:
        continue
    conn.id = existing_id
session.merge(conn)

(I omitted some code not related to the flow control.)

Contributor Author

You're right, I'll refactor that.

@natanweinberger force-pushed the add-connection-cli-overwrite-flag branch from a635962 to 965c239, January 6, 2023 04:15
@natanweinberger force-pushed the add-connection-cli-overwrite-flag branch from 965c239 to ab0ae2a, January 6, 2023 04:32
if session.query(Connection).filter(Connection.conn_id == conn_id).first():
    print(f"Could not import connection {conn_id}: connection already exists.")
    continue
existing_conn_id = session.query(Connection.id).filter(Connection.conn_id == conn_id).scalar()
Contributor Author

@natanweinberger Jan 6, 2023

Incorporated the feedback above with one tweak:

Using .first() yields a tuple representing the select clause, like (1,).

Using .scalar() gets the first row and returns the first item in the tuple (or None, if no rows are returned).
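
A similar tuple-versus-value distinction exists in the standard library's sqlite3 driver, which makes it easy to sketch the resulting flow without Airflow installed. Everything below is an assumption for illustration: the scalar helper, the import_connections function, and the simplified table are hypothetical and are not the PR's code. fetchone() plays the role of .first() here, returning a one-element tuple.

```python
import sqlite3


def scalar(cursor):
    """Return the first column of the first row, or None when there are no rows."""
    row = cursor.fetchone()
    return None if row is None else row[0]


def import_connections(db, connections, overwrite=False):
    """Hypothetical import loop; returns (imported, skipped) lists of conn_ids."""
    imported, skipped = [], []
    for conn_id, host in connections.items():
        existing_id = scalar(
            db.execute("SELECT id FROM connection WHERE conn_id = ?", (conn_id,))
        )
        if existing_id is not None:
            if not overwrite:
                # Default behavior: leave the existing connection untouched.
                skipped.append(conn_id)
                continue
            # Reuse the existing primary key so the row is updated in place.
            db.execute(
                "UPDATE connection SET host = ? WHERE id = ?", (host, existing_id)
            )
        else:
            db.execute(
                "INSERT INTO connection (conn_id, host) VALUES (?, ?)",
                (conn_id, host),
            )
        imported.append(conn_id)
    return imported, skipped


db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE connection ("
    " id INTEGER PRIMARY KEY,"
    " conn_id TEXT NOT NULL UNIQUE,"
    " host TEXT)"
)
db.execute("INSERT INTO connection (conn_id, host) VALUES ('my_db', 'old-host')")

# fetchone() hands back the whole row as a tuple, not the bare id.
raw_row = db.execute("SELECT id FROM connection WHERE conn_id = 'my_db'").fetchone()

new = {"my_db": "new-host", "other_db": "other-host"}
default_result = import_connections(db, new)
overwrite_result = import_connections(db, new, overwrite=True)
final = db.execute("SELECT conn_id, host FROM connection ORDER BY id").fetchall()
print(raw_row, default_result, overwrite_result, final)
```

Assigning the raw tuple to an integer primary key column is the kind of mistake the .scalar() change avoids.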

Member

@potiuk left a comment

Nice and helpful for important use cases. Good idea, @natanweinberger!

@potiuk merged commit 04edef3 into apache:main, Jan 6, 2023
@pierrejeambrun added this to the Airflow 2.6.0 milestone, Jan 9, 2023
@pierrejeambrun added the type:new-feature Changelog: New Features label, Jan 9, 2023

Labels

area:CLI type:new-feature Changelog: New Features
