@Acehaidrey
Contributor

    This emits the true scheduling delay stat, which is defined as the time when the first
    task in the DAG starts minus the expected DAG run datetime. The method is called from the update_state method
    when the state of the DagRun is updated to a completed status (either success or failure).
    It finds the first started task within the DAG, calculates the expected DagRun start time
    (based on dag.execution_date & dag.schedule_interval), and subtracts the expected start from the task start to get the delay.

    The emitted data may contain outliers (e.g. when the first task was cleared, so the second task's start_date
    will be used), but we can get rid of the outliers on the stats side through the dashboards.

    Note, the stat will only be emitted if the DagRun is a scheduler triggered one (i.e. external_trigger is False).

This is useful when you want to measure executor delays or weigh adding more capacity, more scheduling processes, etc. Measuring the delay between the DagRun's start time and its expected time is only one measurement; measuring from the DAG run to its first actually running job is important as well.
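
A minimal sketch of the calculation described above, using only the standard library. The function name `first_task_scheduling_delay`, its parameters, and the fixed `timedelta` interval are assumptions for illustration; the actual change works on Airflow's `DagRun` and DAG schedule objects, which are not reproduced here.

```python
from datetime import datetime, timedelta
from typing import Iterable, Optional


def first_task_scheduling_delay(
    execution_date: datetime,
    schedule_interval: timedelta,
    task_start_dates: Iterable[Optional[datetime]],
    external_trigger: bool = False,
) -> Optional[timedelta]:
    """Return (first task start) - (expected run start), or None if not applicable."""
    # The stat is only meant for scheduler-triggered runs.
    if external_trigger:
        return None
    # Tasks that never started have no start date and are ignored.
    started = [d for d in task_start_dates if d is not None]
    if not started:
        return None
    # Assumption: the run for `execution_date` is expected to start once
    # one full schedule interval has elapsed.
    expected_start = execution_date + schedule_interval
    return min(started) - expected_start
```

If the first task was cleared and rerun, `min(started)` picks up a later start date, which is the kind of outlier the description mentions.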


Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Unit tests coverage for changes (not needed for documentation changes)
  • Target Github ISSUE in description if exists
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@Acehaidrey Acehaidrey force-pushed the tsd2 branch 5 times, most recently from 199a5fa to ffaacb3 Compare June 29, 2020 01:50
@Acehaidrey
Contributor Author

@jhtimmins @ashb @turbaszek mind taking a look when you get a chance?

@turbaszek turbaszek requested a review from mik-laj June 30, 2020 06:59
@Acehaidrey
Contributor Author

@mik-laj can you please take a look at this when you get a chance.
Also @ashb if you have time too

@Acehaidrey
Contributor Author

@mik-laj please when you get a chance

@Acehaidrey
Contributor Author

sorry to keep pinging @ashb , if you get a chance

@mik-laj
Member

mik-laj commented Jul 8, 2020

I started looking at this change and I have three questions.

  1. What is your average difference in value between dagrun.schedule_delay.<dag_id> and dagrun.<dag_id>.first_task_scheduling_delay?
  2. From what I see, you fetch one task with one query. Have you tried to avoid it? It seems to me that you can have this data in your memory. Please look at: airflow/models/dagrun.py:295 (update_state method)
  3. Do you know why this change does not affect jobs.test_scheduler_job.TestDagFileProcessorQueriesCount? It seems to me that there should be a visible problem with too many queries, but for some reason this is not visible.

@mik-laj
Member

mik-laj commented Jul 8, 2020

I have the answer to the 3rd question. We only test cases where the DAG Runs are still running.
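
To illustrate what a query-count test guards against, here is a minimal sketch of a counting context manager. It uses the standard library's `sqlite3` trace callback rather than Airflow's own test helper, whose implementation is not shown in this thread; the name `count_statements` and the table layout are hypothetical.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def count_statements(conn: sqlite3.Connection, expected: int):
    """Fail if the block runs a different number of SQL statements than expected."""
    seen = []
    # The callback receives the SQL text of each statement the connection runs.
    conn.set_trace_callback(seen.append)
    try:
        yield seen
    finally:
        conn.set_trace_callback(None)
    assert len(seen) == expected, f"expected {expected} statements, got {len(seen)}: {seen}"


# Autocommit mode, so the driver adds no implicit BEGIN statements to the count.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE task_instance (task_id TEXT, start_date TEXT)")
conn.execute("INSERT INTO task_instance VALUES ('a', '2020-07-08T00:00:30')")

# An extra lookup per finished run shows up as one more counted statement.
with count_statements(conn, expected=1):
    conn.execute("SELECT MIN(start_date) FROM task_instance").fetchone()
```

A test that only exercises still-running DAG Runs never reaches a code path like the `SELECT` above, so its count stays unchanged.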

@mik-laj
Member

mik-laj commented Jul 9, 2020

Can you add this test case to avoid regression in number of queries?

    @provide_session
    def test_process_dags_queries_count_after_finish_dag_run(self, session):
        with mock.patch.dict("os.environ", {
            "PERF_DAGS_COUNT": "3",
            "PERF_TASKS_COUNT": "20",
            "PERF_START_AGO": "1d",
            "PERF_SCHEDULE_INTERVAL": "16h",
            "PERF_SHAPE": "grid",
        }), conf_vars({
            ('scheduler', 'use_job_schedule'): 'True',
        }):
            dagbag = DagBag(dag_folder=ELASTIC_DAG_FILE, include_examples=False)
            processor = DagFileProcessor([], mock.MagicMock())

            # Create new DAG Runs
            with assert_queries_count(28):
                processor._process_dags(dagbag.dags.values())

            self.assertEqual(session.query(DagRun).count(), 3)
            self.assertEqual(session.query(DagRun).filter(DagRun.state == State.RUNNING).count(), 3)

            # No new DAG Run
            with assert_queries_count(19):
                processor._process_dags(dagbag.dags.values())

            self.assertEqual(session.query(DagRun).count(), 3)
            self.assertEqual(session.query(DagRun).filter(DagRun.state == State.RUNNING).count(), 3)

            session.query(TaskInstance).update({
                "state": State.SUCCESS,
                "start_date": timezone.utcnow(),
                "end_date": timezone.utcnow(),
                "duration": 0,
            })

            # Finish Dag Runs
            with assert_queries_count(19):
                processor._process_dags(dagbag.dags.values())

            self.assertEqual(session.query(DagRun).count(), 3)
            self.assertEqual(session.query(DagRun).filter(DagRun.state == State.RUNNING).count(), 0)

            # No new DAG Runs
            with assert_queries_count(7):
                processor._process_dags(dagbag.dags.values())

            self.assertEqual(session.query(DagRun).count(), 3)
            self.assertEqual(session.query(DagRun).filter(DagRun.state == State.RUNNING).count(), 0)

@mik-laj
Member

mik-laj commented Jul 22, 2020

@Acehaidrey Is everything okay? Do you need any help?

@Acehaidrey
Contributor Author

@mik-laj thank you for all the help. I will get on this next week. To be honest, I had a family loss due to covid and have been away for 3 weeks. I want to close this out.

Just to be sure - adding the tests you have provided me, making sure the filter just gets the single value and does not return the full record. Those are the 2 action items - is there anything else?

@potiuk
Member

potiuk commented Aug 6, 2020

Sincere condolences @Acehaidrey

@stale

stale bot commented Sep 20, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Sep 20, 2020
@stale stale bot closed this Sep 27, 2020
@mik-laj
Member

mik-laj commented Sep 29, 2020

Just to be sure - adding the tests you have provided me, making sure the filter just gets the single value and does not return the full record. Those are the 2 action items - is there anything else?

It seems to me that you do not need to fetch new data from the database at all. You can get it from the function one level higher. Can you check it? If so, you only need to pass this data to your function. You will also not need to add new tests, as the number of queries will not change.
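
The suggestion above, passing already-loaded data down instead of querying again, might look like the following sketch. `TaskRecord`, `earliest_started`, and `on_run_finished` are hypothetical names for illustration, not Airflow's API.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class TaskRecord:
    """Hypothetical stand-in for a task instance the caller has already loaded."""
    task_id: str
    start_date: Optional[datetime]


def earliest_started(tasks: List[TaskRecord]) -> Optional[TaskRecord]:
    """Pick the first started task from an in-memory list; no database access."""
    started = [t for t in tasks if t.start_date is not None]
    return min(started, key=lambda t: t.start_date, default=None)


def on_run_finished(tasks: List[TaskRecord]) -> Optional[str]:
    """A caller that already holds the task list simply hands it down."""
    first = earliest_started(tasks)
    return first.task_id if first else None
```

Because the helper only reads the list it is given, the number of database queries stays the same as before the change.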

@mik-laj mik-laj reopened this Sep 29, 2020
@stale stale bot removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Sep 29, 2020
@Acehaidrey
Contributor Author

Hi all - apologies. I am back now, and will complete these updates this weekend to get it in.

@turbaszek
Member

@Acehaidrey is there anything we can do to help you with finishing this PR?

@potiuk
Member

potiuk commented Oct 9, 2020

Yeah, I'd love to merge this one. I had a ~1hr discussion today on similar metrics, so it is needed :)

@Acehaidrey Acehaidrey force-pushed the tsd2 branch 2 times, most recently from f56de9e to 1017041 Compare November 13, 2020 07:57
@Acehaidrey
Contributor Author

@ryw @potiuk @turbaszek (mik-laj I know you're busy so feel free to ignore, and thanks for all your help). If you all get a chance, please review this once more. I finally wrapped it up and overhauled it.

I have a few other PRs I want to run by you all too after this!

@potiuk
Member

potiuk commented Nov 13, 2020

Cool! Some static checks are failing :). I recommend pre-commit installation :)

@github-actions

The PR needs to run all tests because it modifies core of Airflow! Please rebase it to latest master or ask committer to re-run it!

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Nov 13, 2020
@Acehaidrey
Contributor Author

Just updated again. Thank you for the comments. Got the fixes in.

Contributor Author

@Acehaidrey Acehaidrey left a comment

Cool! Some static checks are failing :). I recommend pre-commit installation :)

I did that after :D The breeze environment is awesome for doing all this checking, btw.

Contributor Author

Need to make changes here to support the change just committed with @mik-laj.

@potiuk potiuk merged commit aac3877 into apache:master Nov 13, 2020
@potiuk potiuk added this to the Airflow 1.10.13 milestone Nov 19, 2020
@potiuk
Member

potiuk commented Nov 19, 2020

@kaxil -> I think this one would be really nice to cherry-pick to 1.10.13. It's not big and I can cherry-pick it - WDYT ?

potiuk pushed a commit that referenced this pull request Nov 30, 2020
…rt time (#9544)

Co-authored-by: Ace Haidrey <[email protected]>
(cherry picked from commit aac3877)
@potiuk potiuk added the type:improvement Changelog: Improvements label Nov 30, 2020
potiuk pushed a commit that referenced this pull request Dec 1, 2020
…rt time (#9544)

Co-authored-by: Ace Haidrey <[email protected]>
(cherry picked from commit aac3877)
kaxil added a commit that referenced this pull request Dec 2, 2020
kaxil pushed a commit that referenced this pull request Dec 3, 2020
…rt time (#9544)

Co-authored-by: Ace Haidrey <[email protected]>
(cherry picked from commit aac3877)
potiuk added a commit that referenced this pull request Dec 3, 2020
ashb pushed a commit that referenced this pull request Dec 3, 2020
…rt time (#9544)

Co-authored-by: Ace Haidrey <[email protected]>
(cherry picked from commit aac3877)
AntonyRileyAtVerto pushed a commit to vertoanalytics/incubator-airflow that referenced this pull request Feb 2, 2021
- BugFix: Tasks with ``depends_on_past`` or ``task_concurrency`` are stuck (apache#12663)
- Fix issue with empty Resources in executor_config (apache#12633)
- Fix: Deprecated config ``force_log_out_after`` was not used (apache#12661)
- Fix empty asctime field in JSON formatted logs (apache#10515)
- [AIRFLOW-2809] Fix security issue regarding Flask SECRET_KEY (apache#3651)
- [AIRFLOW-2884] Fix Flask SECRET_KEY security issue in www_rbac (apache#3729)
- [AIRFLOW-2886] Generate random Flask SECRET_KEY in default config (apache#3738)
- Add missing comma in setup.py (apache#12790)
- Bugfix: Unable to import Airflow plugins on Python 3.8 (apache#12859)
- Fix setup.py missing comma in ``setup_requires`` (apache#12880)
- Don't emit first_task_scheduling_delay metric for only-once dags (apache#12835)

- Update setup.py to get non-conflicting set of dependencies (apache#12636)
- Rename ``[scheduler] max_threads`` to ``[scheduler] parsing_processes`` (apache#12605)
- Add metric for scheduling delay between first run task & expected start time (apache#9544)
- Add new-style 2.0 command names for Airflow 1.10.x (apache#12725)
- Add Kubernetes cleanup-pods CLI command for Helm Chart (apache#11802)
- Don't let webserver run with dangerous config (apache#12747)
- Replace pkg_resources with importlib.metadata to avoid VersionConflict errors (apache#12694)

- Clarified information about supported Databases
cfei18 pushed a commit to cfei18/incubator-airflow that referenced this pull request Mar 5, 2021
…rt time (apache#9544)

Co-authored-by: Ace Haidrey <[email protected]>
(cherry picked from commit aac3877)
Labels

area:metrics full tests needed We need to run full set of tests for this PR to merge type:improvement Changelog: Improvements

7 participants