Added feature to Import connections from a file. #9907
Added conflict disposition feature Co-authored-by: [email protected]
from airflow.models import Connection
from airflow.utils import cli as cli_utils
from airflow.utils.session import create_session
from airflow.secrets.local_filesystem import load_connections
I think we should move these functions to another module so that the CLI does not depend on the airflow.secrets.local_filesystem package.
That makes sense. Can you suggest where I should move it?
msg = msg + status + " : \n\t"
for conn in conn_list:
    msg = msg + '\n\t`conn_id`={conn_id} : {uri}\n'
    msg = msg.format(conn_id=conn.conn_id,
This code looks very similar to the function connections_add. Can you extract the common part into a new function?
Created a new function, 'prep_msg', to prepare messages.
@mik-laj, I have created a new function to prepare a status message, after which I found a few test cases failing:
- TestCliAddConnections.test_cli_connection_add_0: '\tSuccessfully added conn_id=new0 : postgresql://airflow:airflow@host:5432/airflow' not found in '\n\tSuccessfully added conn_id=new0 : postgres://airflow:airflow@host:5432/airflow\n\n'
- TestCliAddConnections.test_cli_connection_add_1: '\tSuccessfully added conn_id=new1 : postgresql://airflow:airflow@host:5432/airflow' not found in '\n\tSuccessfully added conn_id=new1 : postgres://airflow:airflow@host:5432/airflow\n\n'
- TestCliAddConnections.test_cli_connection_add_4: '\tSuccessfully added conn_id=new4 : hive_metastore://airflow:******@host:9083/airflow' not found in '\n\tSuccessfully added conn_id=new4 : hive-metastore://airflow:airflow@host:9083/airflow\n\n'
- TestCliAddConnections.test_cli_connection_add_5: '\tSuccessfully added conn_id=new5 : google_cloud_platform://:@:' not found in '\n\tSuccessfully added conn_id=new5 : google-cloud-platform://\n\n'
The reason is that we previously printed the value of args.conn_uri. Since this function is now shared, and args does not hold the same value when it is called from connections_import(), I switched it to conn.get_uri(), where conn is a Connection object.
Is this okay? If so, should we update the test cases?
Updated the test cases
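The mismatch in the failing tests comes from the message now being rebuilt from the stored connection fields instead of echoing the command-line input. A minimal standalone sketch of one part of that effect, assuming only the scheme normalization that Connection.get_uri applies (lowercase the connection type, replace underscores with hyphens). The helper name scheme_prefix is hypothetical and is not part of Airflow:

```python
def scheme_prefix(conn_type: str) -> str:
    """Hypothetical helper mirroring how get_uri builds the scheme part of the URI."""
    # Lowercase the type and swap '_' for '-', so the printed scheme
    # can differ from what the user typed on the command line.
    return '{}://'.format(str(conn_type).lower().replace('_', '-'))


if __name__ == "__main__":
    print(scheme_prefix("hive_metastore"))
    print(scheme_prefix("google_cloud_platform"))
```

This is why an expected string written with hive_metastore:// no longer matches once the output is produced from the Connection object.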
conn_id = 'CONN_ID3'
with create_session() as session:
    current_conn = session.query(Connection).filter(Connection.conn_id == conn_id).first()
    self.assertEqual(expected_connection_uris[conn_id][1],
What do you think about comparing the results of the get_uri method?
Yup, makes sense. Updated in the PR.
self.assertIn("\tDid not find a connection with `conn_id`=fake", stdout)


class TestCliImportConnections(unittest.TestCase):
It would also be valuable to test failure cases, such as when an operation fails or the user passes an incorrect value as a parameter.
@mik-laj, I can think of a few scenarios which are already handled:
- Missing file scenario is handled by cli_parser.py
- Unsupported file format is handled in local_filesystem.py
- Parsing error is handled in local_filesystem.py
- Wrong parameter for --conflict-disposition is handled by cli_parser.py
Can you provide an example?
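One way to cover the last scenario in a test is to assert that the argument parser itself rejects an unknown disposition. The sketch below is standalone and uses plain argparse. The disposition strings, the default, and the helper names are assumptions for illustration; the real values behind DIS_RESTRICT, DIS_OVERWRITE and DIS_IGNORE may differ:

```python
import argparse

# Assumed values; stand-ins for DIS_RESTRICT, DIS_OVERWRITE, DIS_IGNORE.
DISPOSITIONS = ["restrict", "overwrite", "ignore"]


def build_parser() -> argparse.ArgumentParser:
    """Build a tiny parser with a choices-restricted option."""
    parser = argparse.ArgumentParser(prog="connections-import")
    parser.add_argument(
        "--conflict-disposition",
        choices=DISPOSITIONS,
        default="restrict",  # assumed default
    )
    return parser


def parse_or_exit_code(argv):
    """Return the parsed namespace, or the exit code argparse raised."""
    try:
        return build_parser().parse_args(argv)
    except SystemExit as exc:
        # argparse prints a usage error and exits with status 2 on bad input.
        return exc.code
```

A test can then check that a valid value is parsed and that an invalid one produces a non-zero exit code rather than reaching the import logic.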
Addressed PR comments.
We already have the
Updated test cases.
@mik-laj, I have made the changes, please review.
@mik-laj, two checks are failing, CI Build / Static checks: no pylint and CI Build / Static checks: pylint, but I have not changed the file /airflow/providers/google/cloud/example_dags/example_dlp.py. Can you please help with this?
We are working on it. See: #10635
Thanks for the update.
The fix has been merged. Can you do a rebase?
"--conn-schema=airflow",
],
"\tSuccessfully added `conn_id`=new4 : hive_metastore://airflow:******@host:9083/airflow",
"\tSuccessfully added `conn_id`=new4 : hive-metastore://airflow:airflow@host:9083/airflow",
This password should be masked.
Updated the test cases accordingly.
def _prep_msg(msg, conn):
    """Prepare status messages for connections"""

    msg = msg.format(conn_id=conn.conn_id,
This logic is incorrect. get_uri will always return a value, and the second block of code will never be executed. I think this block of code was created before the get_uri function was created. Its purpose was to generate a URI, but mask some fields. We can handle this case inside this function now.
From 6caf246fca72910f26d48628ed13000859555824 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kamil=20Bregu=C5=82a?= <[email protected]>
Date: Sat, 29 Aug 2020 04:37:38 +0200
Subject: Add sensitive data masking to Connection.get_uri
---
airflow/cli/commands/connection_command.py | 31 +++++++---------------
airflow/models/connection.py | 9 ++++---
2 files changed, 14 insertions(+), 26 deletions(-)
diff --git a/airflow/cli/commands/connection_command.py b/airflow/cli/commands/connection_command.py
index 618f0079c..2a76b55e3 100644
--- a/airflow/cli/commands/connection_command.py
+++ b/airflow/cli/commands/connection_command.py
@@ -20,7 +20,6 @@ import json
import os
import sys
from typing import List
-from urllib.parse import urlunparse
import pygments
import yaml
@@ -39,21 +38,6 @@ from airflow.utils.code_utils import get_terminal_formatter
from airflow.utils.session import create_session
-def _prep_msg(msg, conn):
- """Prepare status messages for connections"""
-
- msg = msg.format(conn_id=conn.conn_id,
- uri=conn.get_uri() or
- urlunparse((conn.conn_type,
- '{login}:{password}@{host}:{port}'
- .format(login=conn.conn_login or '',
- password='******' if conn.conn_password else '',
- host=conn.conn_host or '',
- port=conn.conn_port or ''),
- conn.conn_schema or '', '', '', '')))
- return msg
-
-
def _tabulate_connection(conns: List[Connection], tablefmt: str):
tabulate_data = [
{
@@ -224,12 +208,13 @@ def connections_add(args):
if not (session.query(Connection)
.filter(Connection.conn_id == new_conn.conn_id).first()):
session.add(new_conn)
- msg = '\n\tSuccessfully added `conn_id`={conn_id} : {uri}\n'
- msg = _prep_msg(msg, new_conn)
+ msg = '\n\tSuccessfully added `conn_id`={conn_id} : {uri}\n'.format(
+ conn_id=new_conn.conn_id,
+ uri=new_conn.get_uri(display_sensitive=False)
+ )
print(msg)
else:
- msg = '\n\tA connection with `conn_id`={conn_id} already exists\n'
- msg = msg.format(conn_id=new_conn.conn_id)
+ msg = f'\n\tA connection with `conn_id`={new_conn.conn_id} already exists\n'
print(msg)
@@ -278,8 +263,10 @@ def _prep_import_status_msgs(conn_status_map):
msg = msg + status + " : \n\t"
for conn in conn_list:
- msg = msg + '\n\t`conn_id`={conn_id} : {uri}\n'
- msg = _prep_msg(msg, conn)
+ msg = msg + '\n\t`conn_id`={conn_id} : {uri}\n'.format(
+ conn_id=conn.conn_id,
+ uri=conn.get_uri(display_sensitive=False)
+ )
return msg
diff --git a/airflow/models/connection.py b/airflow/models/connection.py
index 44e8e576a..efd17a02e 100644
--- a/airflow/models/connection.py
+++ b/airflow/models/connection.py
@@ -219,7 +219,7 @@ class Connection(Base, LoggingMixin):
if uri_parts.query:
self.extra = json.dumps(dict(parse_qsl(uri_parts.query, keep_blank_values=True)))
- def get_uri(self) -> str:
+ def get_uri(self, display_sensitive=True) -> str:
"""Return connection in URI format"""
uri = '{}://'.format(str(self.conn_type).lower().replace('_', '-'))
@@ -228,7 +228,7 @@ class Connection(Base, LoggingMixin):
authority_block += quote(self.login, safe='')
if self.password is not None:
- authority_block += ':' + quote(self.password, safe='')
+ authority_block += ':' + quote(self.password, safe='') if display_sensitive else ":******"
if authority_block > '':
authority_block += '@'
@@ -250,8 +250,9 @@ class Connection(Base, LoggingMixin):
uri += host_block
- if self.extra_dejson:
- uri += '?{}'.format(urlencode(self.extra_dejson))
+ extra_dejson = self.extra_dejson
+ if extra_dejson:
+ uri += '?{}'.format(urlencode(extra_dejson)) if display_sensitive else "?******"
return uri
--
2.28.0

To add to your branch, run
curl https://termbin.com/fwtv | git am
Added to the PR.
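For readers following the thread, here is a simplified, standalone stand-in for the masking behaviour in the patch above. It is not the Airflow implementation: build_uri is a hypothetical free function that takes the connection fields as arguments, and its host and port handling is reduced to the common case. It illustrates two points from the review: the URI is never empty (so an `or` fallback after it is dead code), and the password and extras can be masked with a display_sensitive flag.

```python
from urllib.parse import quote, urlencode


def build_uri(conn_type, login=None, password=None, host=None, port=None,
              schema=None, extra=None, display_sensitive=True):
    """Hypothetical, simplified version of a get_uri-style builder."""
    uri = '{}://'.format(str(conn_type).lower().replace('_', '-'))

    authority = ''
    if login is not None:
        authority += quote(login, safe='')
    if password is not None:
        # Show the real password only when explicitly allowed.
        authority += (':' + quote(password, safe='')) if display_sensitive else ':******'
    if authority:
        authority += '@'
    uri += authority

    if host:
        uri += quote(host, safe='')
    if port:
        uri += ':{}'.format(port)
    if schema:
        uri += '/{}'.format(quote(schema, safe=''))

    if extra:
        # Extras may hold secrets too, so they are masked as a whole.
        uri += ('?' + urlencode(extra)) if display_sensitive else '?******'
    return uri


if __name__ == "__main__":
    print(build_uri("hive_metastore", "airflow", "airflow", "host", 9083, "airflow",
                    display_sensitive=False))
```

Because the scheme prefix is always present, the result is always a non-empty string, which is why the urlunparse branch in the old _prep_msg could never run.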
DISPOSITIONS = [DIS_RESTRICT, DIS_OVERWRITE, DIS_IGNORE]


def _prep_import_status_msgs(var_status_map):
Can you add some assertions that will let you see what the output of this function is?
What is the intent of these assertions?
To check for valid status messages, e.g. assert len(msg) > 1?
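One possible reading of the request is to assert on the exact text the function produces, not only on its length. The sketch below is a standalone approximation: the inner formatting follows the lines visible in this PR, while the outer loop, the status names, and the FakeConn stand-in are assumptions made for illustration.

```python
from collections import namedtuple

# Stand-in for a Connection object; only the fields the message needs.
FakeConn = namedtuple("FakeConn", ["conn_id", "uri"])


def prepare_import_status_messages(conn_status_map):
    """Approximate the status-message builder for a {status: [connections]} map."""
    msg = "\n"
    for status, conn_list in conn_status_map.items():
        if not conn_list:
            continue  # assumed: statuses with no connections are skipped
        msg = msg + status + " : \n\t"
        for conn in conn_list:
            msg = msg + "\n\t`conn_id`={conn_id} : {uri}\n".format(
                conn_id=conn.conn_id, uri=conn.uri
            )
    return msg


if __name__ == "__main__":
    print(prepare_import_status_messages(
        {"Created": [FakeConn("new0", "postgres://host")], "Skipped": []}
    ))
```

Assertions such as `assertIn("`conn_id`=new0 : postgres://host", msg)` document the expected output and fail loudly if the format changes.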
DISPOSITIONS = [DIS_RESTRICT, DIS_OVERWRITE, DIS_IGNORE]


def _prep_import_status_msgs(var_status_map):
If possible, it is better to avoid abbreviations in function names.
Done - updated function name.
I haven't finished the review yet because I noticed one problem with LocalFileBackend. I will come back to this.
    conns_map = load_connections(args.file)
except AirflowException as e:
    print(e)
    return
Suggested change:
- return
+ sys.exit(1)
Done, updated the function.
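The point of the suggestion is that a bare return makes the command exit with status 0 even though the import failed, so shell scripts and CI cannot detect the error. A minimal standalone sketch of the pattern follows; LoadError stands in for AirflowException and load_or_exit is a hypothetical helper, not code from this PR:

```python
import sys


class LoadError(Exception):
    """Stand-in for AirflowException in this sketch."""


def load_or_exit(loader, path):
    """Call loader(path); on failure print the error and exit with status 1."""
    try:
        return loader(path)
    except LoadError as err:
        print(err)
        # Non-zero exit status signals the failure to the calling shell.
        sys.exit(1)
```

In a unit test, the failure path can be checked with `assertRaises(SystemExit)` and by inspecting the exception's code attribute.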
@utkarsharma2 Is it ready for re-review?
auvipy left a comment
could you please rebase?
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
Added the command airflow connections import --file-path /some/file/path to import connections.
Supported file types: .json and .env
Raising this PR for discussion around Variables import and test cases. The feature is not complete yet.
Issue: #9855
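To make the two supported file types concrete, here is a rough standalone sketch of dispatching on the file extension. It is not the PR's code, which relies on load_connections from airflow.secrets.local_filesystem. The assumed formats are a JSON object mapping conn_id to URI and a .env file with one CONN_ID=uri pair per line; parse_connections is a hypothetical name:

```python
import json
import os


def parse_connections(path, text):
    """Parse connection definitions from file content, chosen by extension."""
    ext = os.path.splitext(path)[1].lower()
    if ext == ".json":
        # Assumed shape: {"conn_id": "uri", ...}
        return dict(json.loads(text))
    if ext == ".env":
        result = {}
        for line in text.splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue  # skip blank lines and comments
            key, sep, value = line.partition("=")
            if not sep:
                raise ValueError("Invalid line, expected KEY=VALUE: " + line)
            result[key.strip()] = value.strip()
        return result
    raise ValueError("Unsupported file format: " + ext)
```

For example, `parse_connections("conns.env", "DB=postgres://host\n")` yields a one-entry mapping, and any other extension raises ValueError.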