Add metric for scheduling delay between first run task & expected start time #9544
Conversation
@jhtimmins @ashb @turbaszek mind taking a look when you get a chance?
@mik-laj please take a look when you get a chance.
Sorry to keep pinging @ashb, if you get a chance.
I started looking at this change and I have three questions.
I have the answer to the 3rd question. We only test cases where DAG Runs are still running.
Can you add this test case to avoid regression in the number of queries?

    @provide_session
    def test_process_dags_queries_count_after_finish_dag_run(self, session):
        with mock.patch.dict("os.environ", {
            "PERF_DAGS_COUNT": "3",
            "PERF_TASKS_COUNT": "20",
            "PERF_START_AGO": "1d",
            "PERF_SCHEDULE_INTERVAL": "16h",
            "PERF_SHAPE": "grid",
        }), conf_vars({
            ('scheduler', 'use_job_schedule'): 'True',
        }):
            dagbag = DagBag(dag_folder=ELASTIC_DAG_FILE, include_examples=False)
            processor = DagFileProcessor([], mock.MagicMock())
            # Create new DAG Runs
            with assert_queries_count(28):
                processor._process_dags(dagbag.dags.values())
            self.assertEqual(session.query(DagRun).count(), 3)
            self.assertEqual(session.query(DagRun).filter(DagRun.state == State.RUNNING).count(), 3)
            # No new DAG Runs
            with assert_queries_count(19):
                processor._process_dags(dagbag.dags.values())
            self.assertEqual(session.query(DagRun).count(), 3)
            self.assertEqual(session.query(DagRun).filter(DagRun.state == State.RUNNING).count(), 3)
            session.query(TaskInstance).update({
                "state": State.SUCCESS,
                "start_date": timezone.utcnow(),
                "end_date": timezone.utcnow(),
                "duration": 0,
            })
            # Finish DAG Runs
            with assert_queries_count(19):
                processor._process_dags(dagbag.dags.values())
            self.assertEqual(session.query(DagRun).count(), 3)
            self.assertEqual(session.query(DagRun).filter(DagRun.state == State.RUNNING).count(), 0)
            # No new DAG Runs
            with assert_queries_count(7):
                processor._process_dags(dagbag.dags.values())
            self.assertEqual(session.query(DagRun).count(), 3)
            self.assertEqual(session.query(DagRun).filter(DagRun.state == State.RUNNING).count(), 0)
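The test above pins the number of SQL queries each scheduler pass may issue, so a refactor that silently adds a round trip fails CI. The real helper hooks into SQLAlchemy events; a minimal sketch of the idea, using a hypothetical `CountingDB` stand-in for the data layer:

```python
from contextlib import contextmanager


class CountingDB:
    """Hypothetical stand-in for a data layer that counts issued queries."""

    def __init__(self):
        self.count = 0

    def execute(self, sql):
        # In the real helper, an event listener increments the counter on
        # every statement the session emits.
        self.count += 1


@contextmanager
def assert_queries_count(expected, db):
    """Toy version of an assert_queries_count helper: raise if the `with`
    block issues a different number of queries than expected."""
    before = db.count
    yield
    issued = db.count - before
    if issued != expected:
        raise AssertionError(f"expected {expected} queries, got {issued}")
```

Used like the helper in the test: `with assert_queries_count(2, db): ...` passes only if exactly two statements are executed inside the block.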
@Acehaidrey Is everything okay? Do you need any help?
@mik-laj thank you for all the help. I will get on this next week. To be honest, I had some family loss due to COVID and have been away for 3 weeks. I want to close this out. Just to be sure, the action items are: adding the tests you provided me, and making sure the filter gets just the single value rather than returning the full record. Is there anything else?
Sincere condolences @Acehaidrey
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. |
It seems to me that you do not need to fetch new data from the database at all. You can get it from the function one level higher. Can you check? If so, you only need to pass this data into your function. You will also not need to add new tests, as the number of queries will not change.
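The refactor being suggested is to pass already-loaded rows down instead of re-querying inside the helper. A minimal sketch of the two shapes, using a hypothetical `FakeSession` and dict-based task instances (not Airflow's API):

```python
from datetime import datetime, timedelta


class FakeSession:
    """Hypothetical in-memory session that counts the queries it serves."""

    def __init__(self, task_instances):
        self._tis = task_instances
        self.query_count = 0

    def get_task_instances(self, run_id):
        self.query_count += 1
        return [ti for ti in self._tis if ti["run_id"] == run_id]


def delay_with_requery(session, run_id, expected_start):
    # Pattern under discussion: the helper re-queries rows the caller
    # (one level higher) has usually already loaded.
    tis = session.get_task_instances(run_id)
    return min(ti["start_date"] for ti in tis) - expected_start


def delay_from_caller(task_instances, expected_start):
    # Same result, zero additional queries: the caller passes its rows down.
    return min(ti["start_date"] for ti in task_instances) - expected_start
```

Both functions compute the same delay; only the second keeps the query count flat, which is why no new query-count tests would be needed.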
Hi all - apologies. I am back now, and will complete these updates this weekend to get it in. |
@Acehaidrey is there anything we can do to help you with finishing this PR?
Yeah, I'd love to merge this one. I had a ~1hr discussion today on similar metrics, so it is needed :)
@ryw @potiuk @turbaszek (mik-laj I know you're busy, so feel free to ignore, and thanks for all your help). If you all get a chance, please review this once more. Finally wrapped it up and overhauled it. Have a few other PRs I want to run by you all too after this!
Cool! Some static checks are failing :). I recommend pre-commit installation :)
The PR needs to run all tests because it modifies the core of Airflow! Please rebase it to the latest master or ask a committer to re-run it!
Just updated again. Thank you for the comments. Got the fixes in.
Acehaidrey left a comment
Cool! Some static checks are failing :). I recommend pre-commit installation :)
I did that after :D The breeze environment is awesome for doing all this checking, btw.
airflow/models/dagrun.py (outdated)
Need to make changes here to support the change just committed with @mik-laj.
@kaxil -> I think this one would be really nice to cherry-pick to 1.10.13. It's not big and I can cherry-pick it - WDYT?
…rt time (#9544) Co-authored-by: Ace Haidrey <[email protected]> (cherry picked from commit aac3877)
- BugFix: Tasks with ``depends_on_past`` or ``task_concurrency`` are stuck (apache#12663)
- Fix issue with empty Resources in executor_config (apache#12633)
- Fix: Deprecated config ``force_log_out_after`` was not used (apache#12661)
- Fix empty asctime field in JSON formatted logs (apache#10515)
- [AIRFLOW-2809] Fix security issue regarding Flask SECRET_KEY (apache#3651)
- [AIRFLOW-2884] Fix Flask SECRET_KEY security issue in www_rbac (apache#3729)
- [AIRFLOW-2886] Generate random Flask SECRET_KEY in default config (apache#3738)
- Add missing comma in setup.py (apache#12790)
- Bugfix: Unable to import Airflow plugins on Python 3.8 (apache#12859)
- Fix setup.py missing comma in ``setup_requires`` (apache#12880)
- Don't emit first_task_scheduling_delay metric for only-once dags (apache#12835)
- Update setup.py to get non-conflicting set of dependencies (apache#12636)
- Rename ``[scheduler] max_threads`` to ``[scheduler] parsing_processes`` (apache#12605)
- Add metric for scheduling delay between first run task & expected start time (apache#9544)
- Add new-style 2.0 command names for Airflow 1.10.x (apache#12725)
- Add Kubernetes cleanup-pods CLI command for Helm Chart (apache#11802)
- Don't let webserver run with dangerous config (apache#12747)
- Replace pkg_resources with importlib.metadata to avoid VersionConflict errors (apache#12694)
- Clarified information about supported Databases
This is all useful when you want to measure executor delays, or weigh adding more capacity or more scheduler processes, etc. Measuring the delay of the dag run from its start time to its expected time is just one of the measurements; measuring from the dag run to its first actually running task is important as well.
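The quantity the PR tracks can be sketched in plain Python. This is an illustration of the computation only; the function name and signature are hypothetical, not Airflow's API, and it assumes a schedule-triggered run whose eligible time is execution_date plus one schedule interval:

```python
from datetime import datetime, timedelta


def first_task_scheduling_delay(execution_date, schedule_interval,
                                first_ti_start_date):
    """Gap between when the DAG run became eligible to run
    (execution_date + schedule_interval for a schedule-triggered run)
    and when its first task instance actually started."""
    expected_start = execution_date + schedule_interval
    return first_ti_start_date - expected_start


# The 09:00 interval of an hourly DAG becomes eligible at 10:00; its
# first task started at 10:00:42, so the reported delay is 42 seconds.
delay = first_task_scheduling_delay(
    execution_date=datetime(2020, 12, 1, 9, 0),
    schedule_interval=timedelta(hours=1),
    first_ti_start_date=datetime(2020, 12, 1, 10, 0, 42),
)
```

A consistently growing value of this delay suggests the scheduler or executor is falling behind, which is exactly the capacity signal described above.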
Make sure to mark the boxes below before creating PR: [x]
- In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
- In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
- In case of backwards incompatible changes please leave a note in UPDATING.md.

Read the Pull Request Guidelines for more information.