-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Add endpoints for task instances #9597
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
jhtimmins
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @mik-laj, apologies that I only noticed the WIP flag after I added some comments. I'll leave them in case they're helpful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure this should live inside API endpoint code. Fetching a task instance is pretty generic functionality. If we don't already support code that does this, we should probably add it to TaskInstance.py or similar files.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inside Airflow, we use execution_date as the primary identifier. However, we voted to use dag_run_id in the API
https://lists.apache.org/thread.html/rd4be3829627dcef8b40314c62c041f460992786f3bfcc634d25a6664%40%3Cdev.airflow.apache.org%3E
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah sorry for being unclear. I was suggesting get_task_instance be moved to a different file outside the API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
)Can be a follow up PR (and likely candidate would be to TaskIstance.get_by_run_id())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This filter seems somewhat ambiguous. I think the name could better express the purpose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not know if I understand correctly. Can you say more?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mmmm actually nevermind. I think it makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function accepts 16 parameters. If we really need to pass in this many configuration details, is there a data structure that would better encapsulate system state than so many standalone params?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use connexion, which fills these parameters based on the API specification.
https://connexion.readthedocs.io/en/latest/request.html#automatic-parameter-handling
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| Get list of a task instances | |
| Get list of task instances |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the preceding two lines are duplicates
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I will remove it
927896f to
201b31c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we have an n+1 problem. I think we can fetch it much more efficiently if we use the more advanced features of SQLAlchemy.
https://docs.sqlalchemy.org/en/13/orm/loading_relationships.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @mik-laj we are not using relationships in the model so I was planning to use explicit joins to overcome this n + 1 problem the only downside of that it is returns a tuple of object (<task instance>,<sla miss>) so I was planning to overwrite get_attribute. Let me know if this sounds like a good idea. I will be doing something similar here
class TaskInstanceReferenceSchema(Schema):
"""Schema for the task instance reference schema"""
task_id = fields.Str()
dag_run_id = fields.Method('get_run_id')
dag_id = fields.Str()
execution_date = fields.DateTime()
@staticmethod
def get_run_id(obj: TaskInstance):
with create_session() as session:
return session.query(DagRun).filter(DagRun.dag_id==obj.dag_id, DagRun.execution_date==obj.execution_date).one_or_none().run_id
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you need it, we can start using relationships.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll try to look at it tomorrow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, in the meantime I will prepare this solution
9b76ceb to
a51527e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mik-laj I have dealt with the n +1 problem, but I am unsure if this query is the best way to go forward.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mik-laj is there a better approach that I should work on or is this fine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
|
I have a problem with this solution. I added one test that tests this endpoint with task instance and SLAMiss on this branch. Can you take a look at it? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mik-laj Passing multiple columns to distinct works perfectly with postgres but raises error in case of sqlite how should we handle this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that the count query doesn't need join, so we can simplify everything and improve SQL compatibility.
|
Everything works. I am merging all commits into one commit on my branch and doing rebase now |
d9d2fdd to
f56ae3a
Compare
f56ae3a to
3bc9b7e
Compare
|
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*. |
3bc9b7e to
8a6c35e
Compare
|
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*. |
|
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*. |
586a0b1 to
6399bfc
Compare
| .join(DR, and_(TI.dag_id == DR.dag_id, TI.execution_date == DR.execution_date)) | ||
| .filter(DR.run_id == dag_run_id) | ||
| .filter(TI.task_id == task_id) | ||
| .join( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| .join( | |
| .outerjoin( |
if we want to do an outerjoin instead of isouter=True
| ('can_read', 'DagRun'), | ||
| ("can_read", "Dag"), | ||
| ("can_read", "DagRun"), | ||
| ('can_read', 'Task'), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we change single to double quotes here too for consistency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. :-)
kaxil
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left small comments
|
@turbaszek @kaxil @jhtimmins Can I ask for a review? |
kaxil
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1 minor suggestion left
Co-authored-by: Kaxil Naik <[email protected]>
Make sure to mark the boxes below before creating PR: [x]
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.