@utkarsharma2
Contributor

Added airflow connections import --file-path /some/file/path to import connections from a file.
Supported file types: .json and .env

Raising this PR for discussion around Variables import and Test Cases. The feature is not complete yet.

Issue - #9855

Added CLI option to import connections from a file #9855
@utkarsharma2 utkarsharma2 changed the title Added feature to Import connections from a file. Added feature to Import connections from a file. issue - #9855 Jul 21, 2020
@utkarsharma2 utkarsharma2 changed the title Added feature to Import connections from a file. issue - #9855 Added feature to Import connections from a file. Jul 21, 2020
usharma and others added 2 commits July 22, 2020 03:34
@turbaszek turbaszek requested a review from mik-laj July 31, 2020 09:50
from airflow.models import Connection
from airflow.utils import cli as cli_utils
from airflow.utils.session import create_session
from airflow.secrets.local_filesystem import load_connections
Member

I think we should move these functions to another module in order not to create dependencies between CLI and airflow.secrets.local_filesystem package.

Contributor Author

That makes sense. Can you suggest where I should move it?

        msg = msg + status + " : \n\t"
        for conn in conn_list:
            msg = msg + '\n\t`conn_id`={conn_id} : {uri}\n'
            msg = msg.format(conn_id=conn.conn_id,
Member

This code looks very similar to the function connections_add. Can you extract the common part into a new function?

Contributor Author

Created a new function, 'prep_msg', to prepare messages.

Contributor Author

@mik-laj, I have created a new function to prepare a status message, after which I found a few test cases failing:

  • TestCliAddConnections.test_cli_connection_add_0 : '\tSuccessfully added conn_id=new0 : postgresql://airflow:airflow@host:5432/airflow' not found in '\n\tSuccessfully added conn_id=new0 : postgres://airflow:airflow@host:5432/airflow\n\n'

  • TestCliAddConnections.test_cli_connection_add_1 : '\tSuccessfully added conn_id=new1 : postgresql://airflow:airflow@host:5432/airflow' not found in '\n\tSuccessfully added conn_id=new1 : postgres://airflow:airflow@host:5432/airflow\n\n'

  • TestCliAddConnections.test_cli_connection_add_4 : '\tSuccessfully added conn_id=new4 : hive_metastore://airflow:******@host:9083/airflow' not found in '\n\tSuccessfully added conn_id=new4 : hive-metastore://airflow:airflow@host:9083/airflow\n\n'

  • TestCliAddConnections.test_cli_connection_add_5 : '\tSuccessfully added conn_id=new5 : google_cloud_platform://:@:' not found in '\n\tSuccessfully added conn_id=new5 : google-cloud-platform://\n\n'

The reason is that we previously used the value of args.conn_uri. Since this method is now shared, and args does not carry the same value when it is called from connections_import(), I have switched it to conn.get_uri(), where conn is a Connection object.

Is this okay? If so, should we update the test cases?
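
The scheme differences in the failures above can be reproduced outside Airflow. The following is only a sketch: normalize_scheme is a hypothetical helper, and its two rules (postgresql is stored as postgres, and underscores are written back as hyphens) are assumptions inferred from the expected strings above and from the get_uri code touched in this PR.

```python
def normalize_scheme(uri: str) -> str:
    """Hypothetical helper: show how a URI scheme changes after a parse/get_uri round trip."""
    scheme, sep, rest = uri.partition("://")
    conn_type = scheme.lower()
    # Assumption: parsing stores "postgresql" as the "postgres" connection type.
    if conn_type == "postgresql":
        conn_type = "postgres"
    # Assumption: connection types use underscores internally.
    conn_type = conn_type.replace("-", "_")
    # get_uri() writes the connection type back with hyphens instead of underscores.
    return conn_type.replace("_", "-") + sep + rest


print(normalize_scheme("postgresql://airflow:airflow@host:5432/airflow"))
print(normalize_scheme("hive_metastore://airflow:airflow@host:9083/airflow"))
```

Under these assumptions the text printed by the CLI no longer matches the raw --conn-uri argument, which would explain why the old expected strings stopped matching.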

Contributor Author

Updated the test cases

conn_id = 'CONN_ID3'
with create_session() as session:
    current_conn = session.query(Connection).filter(Connection.conn_id == conn_id).first()
    self.assertEqual(expected_connection_uris[conn_id][1],
Member

What do you think about comparing the results of the get_uri method?

Contributor Author

Yup, makes sense. Updated in the PR.

self.assertIn("\tDid not find a connection with `conn_id`=fake", stdout)


class TestCliImportConnections(unittest.TestCase):
Member

It would also be valuable to test failure scenarios, such as when an operation fails or the user passes an incorrect value as a parameter.

Contributor Author

@mik-laj, I can think of a few scenarios which are already handled:

  1. The missing-file scenario is handled by cli_parser.py
  2. An unsupported file format is handled in local_filesystem.py
  3. A parsing error is handled in local_filesystem.py
  4. A wrong value for --conflict-disposition is handled by cli_parser.py

Can you provide an example?
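
Scenario 4 in the list above could be exercised with a test along these lines. This is a sketch, assuming the option is declared with argparse choices; the parser below and the disposition values 'restrict', 'overwrite' and 'ignore' are illustrative stand-ins, not the actual definitions in cli_parser.py.

```python
import argparse
import contextlib
import io

# Assumed values; the real DIS_RESTRICT, DIS_OVERWRITE and DIS_IGNORE constants may differ.
DISPOSITIONS = ["restrict", "overwrite", "ignore"]


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical stand-in for the `connections import` argument parser."""
    parser = argparse.ArgumentParser(prog="connections-import")
    parser.add_argument("--file-path", required=True)
    parser.add_argument("--conflict-disposition", choices=DISPOSITIONS, default="restrict")
    return parser


def parse_exit_code(argv):
    """Return 0 if argv parses cleanly, otherwise the exit code argparse raised."""
    try:
        # argparse prints its usage error to stderr; silence it for the test.
        with contextlib.redirect_stderr(io.StringIO()):
            build_parser().parse_args(argv)
    except SystemExit as exc:
        return exc.code
    return 0


# argparse exits with status 2 on an invalid choice or a missing required option.
assert parse_exit_code(["--file-path", "conns.json", "--conflict-disposition", "bogus"]) == 2
assert parse_exit_code(["--file-path", "conns.json", "--conflict-disposition", "ignore"]) == 0
```

A test of this shape checks the user-visible behaviour (a non-zero exit status) rather than which module raised the error.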

Addressed PR comments.
@mik-laj
Member

mik-laj commented Aug 9, 2020

We already have the airflow connections export command. Your command will be a great complement to it.
https://airflow.readthedocs.io/en/latest/howto/connection/index.html#exporting-connections-from-the-cli

@utkarsharma2
Contributor Author

@mik-laj, I have made the changes, please review.

@utkarsharma2
Contributor Author

@mik-laj, two checks are failing: CI Build / Static checks: no pylint and CI Build / Static checks: pylint. However, I have not made any changes to the file /airflow/providers/google/cloud/example_dags/example_dlp.py. Can you please help with this?

@mik-laj
Member

mik-laj commented Aug 28, 2020

We are working on it. See: #10635

@utkarsharma2
Contributor Author

We are working on it. See: #10635

Thanks for the update.

@mik-laj
Member

mik-laj commented Aug 28, 2020

Fix has been merged. Can you do a rebase?

"--conn-schema=airflow",
],
"\tSuccessfully added `conn_id`=new4 : hive_metastore://airflow:******@host:9083/airflow",
"\tSuccessfully added `conn_id`=new4 : hive-metastore://airflow:airflow@host:9083/airflow",
Member

This password should be masked.

Contributor Author

Updated the test cases accordingly.

def _prep_msg(msg, conn):
    """Prepare status messages for connections"""

    msg = msg.format(conn_id=conn.conn_id,
Member

This logic is incorrect. get_uri will always return a value, and the second block of code will never be executed. I think this block of code was created before the get_uri function was created. Its purpose was to generate a URI, but mask some fields. We can handle this case inside this function now.

From 6caf246fca72910f26d48628ed13000859555824 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kamil=20Bregu=C5=82a?= <[email protected]>
Date: Sat, 29 Aug 2020 04:37:38 +0200
Subject: Add sensitive data masking to Connection.get_uri

---
 airflow/cli/commands/connection_command.py | 31 +++++++---------------
 airflow/models/connection.py               |  9 ++++---
 2 files changed, 14 insertions(+), 26 deletions(-)

diff --git a/airflow/cli/commands/connection_command.py b/airflow/cli/commands/connection_command.py
index 618f0079c..2a76b55e3 100644
--- a/airflow/cli/commands/connection_command.py
+++ b/airflow/cli/commands/connection_command.py
@@ -20,7 +20,6 @@ import json
 import os
 import sys
 from typing import List
-from urllib.parse import urlunparse
 
 import pygments
 import yaml
@@ -39,21 +38,6 @@ from airflow.utils.code_utils import get_terminal_formatter
 from airflow.utils.session import create_session
 
 
-def _prep_msg(msg, conn):
-    """Prepare status messages for connections"""
-
-    msg = msg.format(conn_id=conn.conn_id,
-                     uri=conn.get_uri() or
-                     urlunparse((conn.conn_type,
-                                '{login}:{password}@{host}:{port}'
-                                 .format(login=conn.conn_login or '',
-                                         password='******' if conn.conn_password else '',
-                                         host=conn.conn_host or '',
-                                         port=conn.conn_port or ''),
-                                 conn.conn_schema or '', '', '', '')))
-    return msg
-
-
 def _tabulate_connection(conns: List[Connection], tablefmt: str):
     tabulate_data = [
         {
@@ -224,12 +208,13 @@ def connections_add(args):
         if not (session.query(Connection)
                 .filter(Connection.conn_id == new_conn.conn_id).first()):
             session.add(new_conn)
-            msg = '\n\tSuccessfully added `conn_id`={conn_id} : {uri}\n'
-            msg = _prep_msg(msg, new_conn)
+            msg = '\n\tSuccessfully added `conn_id`={conn_id} : {uri}\n'.format(
+                conn_id=new_conn.conn_id,
+                uri=new_conn.get_uri(display_sensitive=False)
+            )
             print(msg)
         else:
-            msg = '\n\tA connection with `conn_id`={conn_id} already exists\n'
-            msg = msg.format(conn_id=new_conn.conn_id)
+            msg = f'\n\tA connection with `conn_id`={new_conn.conn_id} already exists\n'
             print(msg)
 
 
@@ -278,8 +263,10 @@ def _prep_import_status_msgs(conn_status_map):
 
         msg = msg + status + " : \n\t"
         for conn in conn_list:
-            msg = msg + '\n\t`conn_id`={conn_id} : {uri}\n'
-            msg = _prep_msg(msg, conn)
+            msg = msg + '\n\t`conn_id`={conn_id} : {uri}\n'.format(
+                conn_id=conn.conn_id,
+                uri=conn.get_uri(display_sensitive=False)
+            )
     return msg
 
 
diff --git a/airflow/models/connection.py b/airflow/models/connection.py
index 44e8e576a..efd17a02e 100644
--- a/airflow/models/connection.py
+++ b/airflow/models/connection.py
@@ -219,7 +219,7 @@ class Connection(Base, LoggingMixin):
         if uri_parts.query:
             self.extra = json.dumps(dict(parse_qsl(uri_parts.query, keep_blank_values=True)))
 
-    def get_uri(self) -> str:
+    def get_uri(self, display_sensitive=True) -> str:
         """Return connection in URI format"""
         uri = '{}://'.format(str(self.conn_type).lower().replace('_', '-'))
 
@@ -228,7 +228,7 @@ class Connection(Base, LoggingMixin):
             authority_block += quote(self.login, safe='')
 
         if self.password is not None:
-            authority_block += ':' + quote(self.password, safe='')
+            authority_block += ':' + quote(self.password, safe='') if display_sensitive else ":******"
 
         if authority_block > '':
             authority_block += '@'
@@ -250,8 +250,9 @@ class Connection(Base, LoggingMixin):
 
         uri += host_block
 
-        if self.extra_dejson:
-            uri += '?{}'.format(urlencode(self.extra_dejson))
+        extra_dejson = self.extra_dejson
+        if extra_dejson:
+            uri += '?{}'.format(urlencode(extra_dejson)) if display_sensitive else "?******"
 
         return uri
 
-- 
2.28.0

To add to your branch, run

curl https://termbin.com/fwtv | git am
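
The effect of the display_sensitive flag in the patch above can be tried outside Airflow. The following is a minimal sketch, assuming a simplified stand-in for Connection.get_uri that handles only login, password, host, port and schema (no extras); build_uri and its parameters are hypothetical names used for illustration.

```python
from typing import Optional
from urllib.parse import quote


def build_uri(conn_type: str, login: Optional[str] = None, password: Optional[str] = None,
              host: Optional[str] = None, port: Optional[int] = None,
              schema: Optional[str] = None, display_sensitive: bool = True) -> str:
    """Hypothetical, simplified stand-in for Connection.get_uri."""
    uri = "{}://".format(conn_type.lower().replace("_", "-"))

    authority = ""
    if login is not None:
        authority += quote(login, safe="")
    if password is not None:
        # Mask the password unless the caller explicitly asks to display it.
        authority += ":" + (quote(password, safe="") if display_sensitive else "******")
    if authority:
        authority += "@"
    uri += authority

    host_block = ""
    if host:
        host_block += quote(host, safe="")
    if port:
        host_block += ":{}".format(port)
    if schema:
        host_block += "/{}".format(quote(schema, safe=""))
    return uri + host_block


masked = build_uri("hive_metastore", "airflow", "airflow", "host", 9083, "airflow",
                   display_sensitive=False)
assert masked == "hive-metastore://airflow:******@host:9083/airflow"

# Even with no fields set, the result is a non-empty (truthy) string,
# so an expression like `get_uri() or fallback` never evaluates the fallback.
assert build_uri("postgres") == "postgres://"
```

The second assertion illustrates the review point above: because the scheme prefix is always present, the old urlunparse fallback was unreachable.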

Contributor Author

Added to the PR.

DISPOSITIONS = [DIS_RESTRICT, DIS_OVERWRITE, DIS_IGNORE]


def _prep_import_status_msgs(var_status_map):
Member

Can you add some assertions that will let you see what the output of this function is?

Contributor Author

What's the intent of these assertions?
To check for valid status messages, e.g. assert len(msg) > 1?
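
One possible reading of the request is to assert on the exact text the function returns, rather than only on its length. The sketch below rests on several assumptions: the function is a simplified reconstruction from the fragment shown in this PR, connections are reduced to (conn_id, uri) pairs, and the status labels "Created" and "Skipped" are invented for the example.

```python
from typing import Dict, List, Tuple


def prepare_import_status_messages(status_map: Dict[str, List[Tuple[str, str]]]) -> str:
    """Hypothetical reconstruction: build one message listing connections per status."""
    msg = "\n"
    for status, conn_list in status_map.items():
        if not conn_list:
            continue  # skip statuses that have nothing to report
        msg = msg + status + " : \n\t"
        for conn_id, uri in conn_list:
            msg = msg + "\n\t`conn_id`={conn_id} : {uri}\n".format(conn_id=conn_id, uri=uri)
    return msg


output = prepare_import_status_messages(
    {"Created": [("new1", "postgres://airflow:******@host:5432/airflow")], "Skipped": []}
)
# Assert on content, so a formatting regression or an unmasked password is caught.
assert "Created : " in output
assert "`conn_id`=new1 : postgres://airflow:******@host:5432/airflow" in output
assert "Skipped" not in output
```

Assertions like these document the expected output format and would fail if a password were printed unmasked.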

DISPOSITIONS = [DIS_RESTRICT, DIS_OVERWRITE, DIS_IGNORE]


def _prep_import_status_msgs(var_status_map):
Member

If possible, it is better to avoid abbreviations in function names.

Contributor Author

Done - updated function name.

@mik-laj
Member

mik-laj commented Aug 29, 2020

I haven't finished the review yet because I noticed one problem with LocalFileBackend. I will come back to this.
https://github.com/apache/airflow/pull/10638/files

    conns_map = load_connections(args.file)
except AirflowException as e:
    print(e)
    return
Member

Suggested change
- return
+ sys.exit(1)

Contributor Author

Done, updated the function.
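
The suggested change matters because a bare return leaves the process exit status at 0, so shell scripts and CI jobs cannot tell that the import failed. A sketch of the pattern follows; LoadError and _load_connections_stub are stand-ins invented for this example, since the real AirflowException and load_connections are not imported here.

```python
import sys


class LoadError(Exception):
    """Stand-in for AirflowException in this sketch."""


def _load_connections_stub(path: str) -> dict:
    """Stand-in loader that only accepts the two supported extensions."""
    if not path.endswith((".json", ".env")):
        raise LoadError("Unsupported file format: {}".format(path))
    return {}


def connections_import(path: str) -> None:
    try:
        conns_map = _load_connections_stub(path)
    except LoadError as e:
        print(e)
        sys.exit(1)  # non-zero status signals the failure to the calling shell
    print("Loaded {} connection(s)".format(len(conns_map)))


# sys.exit raises SystemExit, which a test can catch to check the exit code.
try:
    connections_import("conns.yaml")
except SystemExit as exc:
    assert exc.code == 1
```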

@mik-laj mik-laj self-requested a review October 29, 2020 10:27
@mik-laj
Member

mik-laj commented Oct 29, 2020

@utkarsharma2 Is it ready for re-review?

Contributor

@auvipy auvipy left a comment

Could you please rebase?

@github-actions

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Mar 17, 2021
@github-actions github-actions bot closed this Mar 22, 2021
Labels

area:CLI stale Stale PRs per the .github/workflows/stale.yml policy file
