-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Add extra links endpoint #9475
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add extra links endpoint #9475
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| execution_date=quote(dttm.isoformat()), | |
| execution_date=quote_plus(dttm.isoformat()), |
I am thinking that quote_plus will be better here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2020-01-01T00:00:00+00:00
It doesn't make a difference here because we don't have spaces.
f2cef7f to
6e556bd
Compare
| dagbag: DagBag = current_app.dag_bag | ||
| dag: DAG = dagbag.get_dag(dag_id) | ||
| if not dag: | ||
| raise NotFound("DAG not found") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| raise NotFound("DAG not found") | |
| raise NotFound(detail=f"DAG with id: '{dag_id}' not found") |
The errors are a bit more descriptive this way, what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should change the title too. Otherwise, it will have a default value - "Object not found". After that it's a good idea.
| @parameterized.expand( | ||
| [ | ||
| ( | ||
| "/api/v1/dags/INVALID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_SINGLE_QUERY/links", | ||
| 'DAG not found' | ||
| ), | ||
| ( | ||
| "/api/v1/dags/TEST_DAG_ID/dagRuns/INVALID/taskInstances/TEST_SINGLE_QUERY/links", | ||
| "DAG Run not found" | ||
| ), | ||
| ( | ||
| "/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/INVALID/links", | ||
| "Task not found" | ||
| ), | ||
| ] | ||
| ) | ||
| def test_should_response_404_on_invalid_task_id(self, url, expected_title): | ||
| response = self.client.get(url) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| @parameterized.expand( | |
| [ | |
| ( | |
| "/api/v1/dags/INVALID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_SINGLE_QUERY/links", | |
| 'DAG not found' | |
| ), | |
| ( | |
| "/api/v1/dags/TEST_DAG_ID/dagRuns/INVALID/taskInstances/TEST_SINGLE_QUERY/links", | |
| "DAG Run not found" | |
| ), | |
| ( | |
| "/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/INVALID/links", | |
| "Task not found" | |
| ), | |
| ] | |
| ) | |
| def test_should_response_404_on_invalid_task_id(self, url, expected_title): | |
| response = self.client.get(url) | |
| @parameterized.expand( | |
| [ | |
| ( | |
| "invalid dag_id", | |
| "/api/v1/dags/INVALID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_SINGLE_QUERY/links", | |
| 'DAG not found' | |
| ), | |
| ( | |
| "invalid run_id", | |
| "/api/v1/dags/TEST_DAG_ID/dagRuns/INVALID/taskInstances/TEST_SINGLE_QUERY/links", | |
| "DAG Run not found" | |
| ), | |
| ( | |
| "invalid task_id", | |
| "/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/INVALID/links", | |
| "Task not found" | |
| ), | |
| ] | |
| ) | |
| def test_should_response_404_on(self, name, url, expected_title): | |
| del name | |
| response = self.client.get(url) |
These tests are dealing with multiple invalid id(s), this is IMO more descriptive, what do you think ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In many places, we don't use names. We already have one name that allows you to identify a specific test, then the number allows you to specify a specific variant. Many names will limit the number of tests. What I love in parameterized is the ability to generate many cases in a simple way. It is possible that some cases may overlap, but this may change in the future.
| # TODO(mik-laj): We have to implement it. | ||
| # Do you want to help? Please look at: https://github.com/apache/airflow/issues/8140 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| # TODO(mik-laj): We have to implement it. | |
| # Do you want to help? Please look at: https://github.com/apache/airflow/issues/8140 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed this note.
| "/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_SINGLE_QUERY/links" | ||
| ) | ||
|
|
||
| self.assertEqual(200, response.status_code, response.data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| self.assertEqual(200, response.status_code, response.data) | |
| self.assertEqual(200, response.status_code) |
I have never used it but I guess response.data returns the response.json in bytes literal?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
3 argument is just a message for the developer. It can be in the form of bytes if it is safer and more readable.
| raise NotFound("Task not found") | ||
|
|
||
| execution_date = ( | ||
| session.query(DR.execution_date).filter(DR.dag_id == dag_id).filter(DR.run_id == dag_run_id).scalar() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| session.query(DR.execution_date).filter(DR.dag_id == dag_id).filter(DR.run_id == dag_run_id).scalar() | |
| session.query(DR.execution_date).filter(DR.dag_id == dag_id, DR.run_id == dag_run_id).scalar() |
I'm wrong about using one. scalar() or first() is better
|
|
||
| @provide_session | ||
| def setUp(self, session) -> None: | ||
| self.now = datetime(2020, 1, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| self.now = datetime(2020, 1, 1) | |
| self.default_time = datetime(2020, 1, 1) |
I suggest we use default_time instead of now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Thanks.
|
|
||
| import pytest | ||
| from parameterized import parameterized | ||
| from test_utils.mock_plugins import mock_plugin_manager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| from test_utils.mock_plugins import mock_plugin_manager | |
| from tests.test_utils.mock_plugins import mock_plugin_manager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Thanks.
| BigQueryExecuteQueryOperator(task_id="TEST_SINGLE_QUERY", sql="SELECT 1") | ||
| BigQueryExecuteQueryOperator(task_id="TEST_MULTIPLE_QUERY", sql=["SELECT 1", "SELECT 2"]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| BigQueryExecuteQueryOperator(task_id="TEST_SINGLE_QUERY", sql="SELECT 1") | |
| BigQueryExecuteQueryOperator(task_id="TEST_MULTIPLE_QUERY", sql=["SELECT 1", "SELECT 2"]) | |
| BigQueryInsertJobOperator(task_id="TEST_SINGLE_QUERY", sql="SELECT 1") | |
| BigQueryInsertJobOperator(task_id="TEST_MULTIPLE_QUERY", sql=["SELECT 1", "SELECT 2"]) |
This operator BigQueryExecuteQueryOperator is deprecated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BigQueryInsertJobOperator doesn't have extra links.
|
@turbaszek @michalslowikowski00 Can you look at it? |
| dagbag: DagBag = current_app.dag_bag | ||
| dag: DAG = dagbag.get_dag(dag_id) | ||
| if not dag: | ||
| raise NotFound("DAG not found", detail=f'DAG with ID = "{dag_id}" not found') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we create DagNotFound and TaskNotFound exceptions to unify this between endpoints?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the future, this is possible when we focus on improving exceptions to add additional information (type field).
Close: #8140
Make sure to mark the boxes below before creating PR: [x]
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.