
Conversation

@KevinYang21 (Member) commented Sep 10, 2018

Jira

Description

  • Here are some details about my PR, including screenshots of any UI changes:

Before this change, the scheduler loop is responsible for spinning up new DAG file processors after it harvests the results of DAG parsing, so the speed at which new DAG file processors get spun up is bounded by the speed of the scheduler loop. Like this:
(diagram: before this change)

After the change, the scheduler loop is no longer responsible for spinning up new DAG file processors; the DAGFileProcessorManager is responsible for that instead. DAG parsing becomes much faster and is no longer bounded by the scheduler loop speed, so the scheduler is able to schedule more tasks. Like this:
(diagram: after the change)

Summary of big behavior changes:

  1. Scheduler loop and DAG parsing loop are decoupled and run in two different processes (see the sketch after this list).
  2. DAG parsing manager logging will go into a separate file.
  3. Zombie tasks will be detected by the DAG parsing manager and sent to the DAG parsing processors to kill. This is to reduce DB CPU load (zombie detection was identified as producing ~80% of the CPU load during the stress test; CPU usage went down from 80%+ to ~40% after this change).
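A rough sketch of the decoupled layout described above, in plain Python with illustrative names (this is not the actual Airflow agent/manager code, just the shape of the idea): the scheduler only talks to an agent object, while a separate manager process spawns DAG file processors on its own cadence.

import time
from multiprocessing import Pipe, Process

def dag_parsing_manager_loop(conn):
    # Runs in its own process; spinning up DAG file processors is no longer
    # driven by the scheduler loop.
    while True:
        if conn.poll() and conn.recv() == 'terminate':
            break
        # ...spawn / harvest DAG file processor subprocesses here...
        conn.send('parsing results')
        time.sleep(1)

class DagParsingAgent(object):
    # Lives in the scheduler process and owns the manager subprocess.
    def start(self):
        self.conn, child_conn = Pipe()
        self.manager = Process(target=dag_parsing_manager_loop,
                               args=(child_conn,))
        self.manager.start()

    def harvest_parsing_results(self):
        # The scheduler loop only drains whatever is ready; it no longer
        # controls when new DAG file processors get spun up.
        results = []
        while self.conn.poll():
            results.append(self.conn.recv())
        return results

    def terminate(self):
        self.conn.send('terminate')
        self.manager.join()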

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
    test_dag_processing.py:TestDagFileProcessorAgent
    jobs.py:SchedulerJobTest.test_no_orphan_process_will_be_left
    plan to add a zombie-detection test in test_dag_processing.py
    The change is mostly about doing the same thing in a different way, so it didn't add a lot of unit tests. I'm still thinking about proper unit tests to add, and suggestions on what unit tests to add would be greatly appreciated.

Tested in our stress-testing cluster with 4k files and 30k running tasks: the scheduling delay has been kept within 5 mins (except when we need to schedule a huge amount of tasks at the same time, which is going to be dealt with later) for 1+ month, and our production cluster has been running with the change for 1+ week.

Before (the number of running tasks is lower because the DB was stressed out and refusing connections, which caused tasks to fail):

The scheduling delay is generated by a monitoring task running on the cluster at a 5m interval. It compares the current timestamp against the expected scheduling timestamp (execution_date + 5m) and sends the time diff in minutes as one data point on the metric graph; e.g. a monitoring task with execution_date 2018-01-01T00:00:00 that started at 2018-01-01T00:06:00 puts a 1m scheduling-delay data point onto the metric graph.
(screenshot: scheduling delay graph before the change, 2018-07-31)
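For reference, a small sketch of the delay computation described above (the actual monitoring DAG is not part of this PR; names here are illustrative):

from datetime import datetime, timedelta

def scheduling_delay_minutes(execution_date, now):
    # Expected start = execution_date + the 5 minute schedule interval.
    expected_start = execution_date + timedelta(minutes=5)
    return (now - expected_start).total_seconds() / 60.0

# execution_date 2018-01-01T00:00:00, started at 00:06:00 -> 1.0 minute delay
print(scheduling_delay_minutes(datetime(2018, 1, 1, 0, 0),
                               datetime(2018, 1, 1, 0, 6)))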

After:
(screenshot: scheduling delay graph after the change, 2018-07-30)

Commits

  • My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added.

Code Quality

  • Passes git diff upstream/master -u -- "*.py" | flake8 --diff

@KevinYang21 KevinYang21 force-pushed the kevin_yang_decouple_dag_parsing branch 2 times, most recently from ebf4b09 to 4055e43 Compare September 10, 2018 07:16
@KevinYang21 KevinYang21 changed the title [WIP][Airflow-2760] Decouple DAG parsing loop from scheduler loop [Airflow-2760] Decouple DAG parsing loop from scheduler loop Sep 10, 2018

@KevinYang21 KevinYang21 force-pushed the kevin_yang_decouple_dag_parsing branch from 4055e43 to 2cdb90d Compare September 10, 2018 08:49
@ashb (Member) commented Sep 10, 2018

First thought: this sounds like a good thing, but could you put some of the description of this PR into the docs too? https://airflow.apache.org/scheduler.html seems like a likely place to put it.

(I haven't looked at the code for this PR yet)

@KevinYang21 (Member Author):

@ashb We can definitely do that. I was not aware of that page before. Would you guide me on how I can update that page, please? Thank you.

@kaxil (Member) commented Sep 10, 2018

@KevinYang21 (Member Author):

@kaxil Thank you!

@KevinYang21 KevinYang21 force-pushed the kevin_yang_decouple_dag_parsing branch from 2cdb90d to a3e23dd Compare September 10, 2018 19:42
@feng-tao (Member):

will take a look later this week.

@KevinYang21 KevinYang21 force-pushed the kevin_yang_decouple_dag_parsing branch from a3e23dd to 61d9411 Compare September 15, 2018 02:22
@KevinYang21 (Member Author):

A friendly reminder for reviews :D

@KevinYang21 KevinYang21 force-pushed the kevin_yang_decouple_dag_parsing branch 4 times, most recently from da03da2 to 932bf4f Compare September 17, 2018 18:57
@feng-tao (Member):

Sorry for the delay, I haven't fully reviewed yet. One thing worth mentioning is that the community has been discussing DagFetcher (#3138) for some time, and I wonder whether you have considered your change's compatibility with DagFetcher.

@feng-tao (Member) left a comment

haven't finished, a quick pass

UPDATING.md Outdated
Member:

could you move it before L8? I think the file should be sorted based on the feature added.

UPDATING.md Outdated
Member:

I am not sure, but I thought the list is sorted in descending chronological order?

Member Author:

Oh sorry, I didn't know that; if that is the case I'll update the order.

Member:

n00b qq: are these mode options defined in RotatingFileHandler?

Member Author:

Sorry, I didn't get your question... Are you asking what these options are for? That is the file opening mode, set to 'a' so we keep appending to the file (so we don't overwrite the log if we restart the scheduler or so). 'a' is indeed the default mode, but I wanted to keep it here to make it clearer.
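For context, this is roughly what such a handler configuration amounts to (the path and sizes are illustrative, not the actual values in this PR):

from logging.handlers import RotatingFileHandler

# mode='a' (append) is the default, but spelling it out makes the intent clear:
# a scheduler restart keeps appending instead of truncating the existing log.
handler = RotatingFileHandler('/tmp/dag_processor_manager.log', mode='a',
                              maxBytes=100 * 1024 * 1024, backupCount=5)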

Member:

nit: use a constant for 777, and qq: why set 777 permissions?

@KevinYang21 (Member Author), Sep 17, 2018:

My bad here, I misread the comment from here and thought I needed 777 for the mkdirs to work. Will update it to 755.

Member:

Rather than checking if the path already exists, it's better to try to create it and catch the FileExists error - we had a race condition elsewhere (in the utils.file_handler or something) that was fixed by doing this.

It's less likely to apply here, but better safe than sorry.
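A minimal sketch of the suggested pattern, assuming Python 2 still needs to be supported (on Python 3 alone, os.makedirs(path, exist_ok=True) covers it); the helper name is hypothetical:

import errno
import os

def ensure_directory(path, mode=0o755):
    try:
        os.makedirs(path, mode)
    except OSError as e:
        # Another process may have created it in the meantime - that's fine.
        if e.errno != errno.EEXIST:
            raise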

Member Author:

You are perfectly correct. Did that before but forgot about it entirely :P

Member Author:

Actually our mkdirs util method already covers this... I will just call mkdirs(directory, 0o755).

Member:

Maybe add a comment here - and how do we handle this kind of version-dependent change in other places in Airflow?

Member Author:

Definitely, I'll put a comment in the next iteration. We have a couple of other places also branching based on sys.version_info (mostly because of the str-to-unicode mess).

Member Author:

I saw this change (#3298) and will try to use that more decent approach.

Member:

same, why not namedTuple?

Member Author:

This is definitely a better candidate for namedtuple, will try to update. Ty.

@feng-tao (Member):

Given this change is correlated with DagFetcher (#3138), it would be good to get some input from Max @mistercrunch.

@feng-tao (Member):

Great work BTW!

@KevinYang21 (Member Author) commented Sep 17, 2018

@feng-tao Thank you for bringing up the DagFetcher discussion, I was not aware of that PR before (quite an interesting PR, though; I'll keep an eye on it and provide help if needed). I took a quick look at it and I believe it will work perfectly with this change, as this change mostly decouples some existing logic without changing foundations, and DAG fetching is an independent step that happens before parsing.

@dlamblin (Contributor), Sep 21, 2018:

This is either some kind of immutable singleton, or:

from collections import namedtuple

DagParsingSignal = namedtuple(
    'DagParsingSignal',
    'AGENT_HEARTBEAT,   MANAGER_DONE,   TERMINATE_MANAGER,   END_MANAGER')(
    'agent_heartbeat', 'manager_done', 'terminate_manager', 'end_manager')
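For what it's worth, the instance built above then behaves like a small bag of constants:

# quick check of the suggested singleton's behaviour
assert DagParsingSignal.AGENT_HEARTBEAT == 'agent_heartbeat'
assert DagParsingSignal.TERMINATE_MANAGER == 'terminate_manager'
# being a namedtuple instance, its fields cannot be reassigned:
# DagParsingSignal.AGENT_HEARTBEAT = 'x'  # would raise AttributeError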


Member Author:

👍 @feng-tao had the same comment and I will try to use namedtuple. Ty!

@XD-DENG (Member), Sep 21, 2018:

Hi @yrqls21, this may be an invalid question: could you advise what this environment variable CONFIG_PROCESSOR_MANAGER_LOGGER is meant to be? Is it going to be specified by the user, or is it environment-specific?

It's not documented anywhere else.

@XD-DENG (Member), Sep 21, 2018:

Turns out I DID ask an invalid question.... I have noticed the relevant chunk in airflow/utils/dag_processing.py.....

So this environment variable is some sort of "flag" to help determine whether the few following lines here in airflow/config_templates/airflow_local_settings.py should be run when we load/reload airflow_local_settings, if I'm not mistaken?

Member:

This feels like it should be a config option somewhere in the config, and then the normal AIRFLOW__${SECT}__${OPT} environment variables could be used.

Also, there is zero documentation or mention anywhere else of what this block is doing. Adding it to the default config (with comments explaining its use) kills two birds with one stone.

Member Author:

@XD-DENG you are perfectly correct. The purpose is to avoid the RotatingFileHandler being initialized in multiple processes, which would cause exceptions. I thought I had some comments around it but obviously I was wrong :(

@ashb This is more like an internal flag controlling the behavior of different processes, a similar idea to this line https://github.com/apache/incubator-airflow/blob/master/airflow/bin/cli.py#L868. I'll definitely add comments here.
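To make the mechanism concrete, a hedged sketch of what such a guard in airflow_local_settings amounts to - the handler name, logger name, path and sizes below are illustrative, not the PR's actual config:

import logging.config
import os

# Only the process that exports this flag configures the rotating file handler,
# so other processes importing the settings never open the manager log file.
if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
    logging.config.dictConfig({
        'version': 1,
        'disable_existing_loggers': False,
        'handlers': {
            'processor_manager': {
                'class': 'logging.handlers.RotatingFileHandler',
                'filename': '/tmp/dag_processor_manager.log',  # illustrative path
                'mode': 'a',
                'maxBytes': 100 * 1024 * 1024,
                'backupCount': 5,
            },
        },
        'loggers': {
            'airflow.processor_manager': {
                'handlers': ['processor_manager'],
                'level': 'INFO',
                'propagate': False,
            },
        },
    })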

airflow/jobs.py Outdated
Member:

Nit: "Exiting gracefully after receiving signal {}" please

airflow/jobs.py Outdated
Member:

nit: following


Member:

According to the docs:

If timeout is None then an infinite timeout is used.

So I think we can do self._parent_signal_conn.poll(None) and then the kernel/OS will deal with waking us when there's something to read, and we wouldn't need a sleep here either.

I think.

Alternatively, an even simpler approach: Just call recv() - that will block until there is something to read. No poll or sleep needed that way.
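A tiny illustration of the second suggestion (generic names, not the PR's actual attributes): recv() blocks until the other end of the Pipe sends something, so no poll()/sleep loop is needed.

from multiprocessing import Pipe, Process

def manager(conn):
    conn.send('manager_done')

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    Process(target=manager, args=(child_conn,)).start()
    print(parent_conn.recv())  # blocks here until the child sends a message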

Member Author:

You are perfectly correct. I totally forgot that recv() is blocking. Will update to just use recv(). Ty!

Member:

This reload is done solely to reconfigure the logger, right? Wouldn't it be more immediate and less "hacky" (using reload is a bit of a kludge) to reconfigure the logger instead?

Member:

Or even better: could we use a different logger name and have it already configured with normal logging, but only used by this class/logger?

Member Author:

Without reloading airflow_local_settings, the custom logic here would not be evaluated, and thus we'd end up reconfiguring with the same logging config again...

Also, as mentioned in the comment, right now the major purposes of reloading settings are to reload the logger and also the connection pool.

Though I do agree reload is a bit of a kludge - I would avoid it if I had another solution :(

Member:

Can you do something like this?

logging.config.dictConfig({
    # ...new handler definition here...
    'disable_existing_loggers': False,
})

Member Author:

Nice trick here 👍 But moving the definition of the handler and logger here would prevent people from using their custom handler and logger for DAG parsing, right? (e.g. a logger with both processor_manager and console handlers) Also, we've recently encountered a rare case where the same logger on multiple processes would create a lock issue if we do not reload, and thus it feels safer to just reload all loggers.

Member:

Minor nit: since Python 2 is going away in less than a year, can we reverse this:

i.e. in py2 re-define range = xrange and then just use range here.
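Something along these lines (a sketch of the suggested shim; rebind the name once, then the rest of the code just calls range()):

import sys

if sys.version_info[0] == 2:
    range = xrange  # noqa: F821  # py2 only: make range lazy, like py3's range

for i in range(3):  # works unchanged on both py2 and py3
    print(i)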

Member Author:

👍

Member:

Nit: "Sending termination message" - signal could be confused with OS level signals, but this is a message/request.

Member:

Does this timeout need to be configurable?

Member Author:

I'll be adding a TODO comment here to make it aligned with the existing code. I might later on start another discussion around this magic number.

Member:

Nit: "upon receiving signal" (the same in both cases of this please)

Member:

Is this another magic number?

Member Author:

Yes it is. Will put a similar TODO here and revisit it together with termination timeout magic number later.

@KevinYang21 (Member Author):

@Fokko Thanks for reviewing. I actually created a ticket for further refactoring in the code base to use with blocks, as there are a lot of such cases and I feel it is beyond the scope of this PR to change all of them (I updated all the use cases that I touched). A sketch of the pattern is at the end of this comment.

Could you actually elaborate a bit more on the "weird stuff in the number of connections", please? It might be something we didn't catch.

Also, this reminds me of one thing: with the new logic, it is more likely that the scheduler will leave orphan processes in extreme cases, e.g. if for some reason the agent process crashes (or gets a SIGTERM) or cannot gracefully exit, the DAG parsing processes will be left behind. Should we put a note somewhere about this? Actually we already have a similar problem, e.g. when the scheduler process gets a SIGTERM. Do we just expect people to know this and clean up the leftover processes?
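A sketch of the "with block" session pattern referred to at the top of this comment (the helper name is illustrative of the idea, not the exact utility in the code base):

from contextlib import contextmanager

from airflow import settings

@contextmanager
def create_session():
    session = settings.Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

# usage:
# with create_session() as session:
#     session.query(...)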

@kbl commented Oct 12, 2018

What @Fokko mentioned about session leakage could be true. I deployed this a while ago as a patch on top of 1.10. We're running Airflow with 2 airworkers x 12 celery processors on each node, and we've seen an increase in the number of open connections to our database after the upgrade from 1.9 to 1.10+patch.

@KevinYang21 KevinYang21 force-pushed the kevin_yang_decouple_dag_parsing branch from 69e9311 to 14cf345 Compare October 15, 2018 05:28
@KevinYang21 (Member Author):

Is there more I can do to have this PR merged?

@Fokko (Contributor) commented Oct 20, 2018

@feng-tao @ashb @XD-DENG Any further comments? Otherwise I'll merge this one.

@feng-tao (Member) commented Oct 20, 2018

Sorry @KevinYang21, I was on vacation for the last month and didn't go through the whole PR. But I assume @Fokko has gone through it, so I am OK from my side.

@ashb (Member) commented Oct 22, 2018

@Fokko Go for it - I didn't have the time to give this PR the attention it deserved.

@ashb ashb changed the title [Airflow-2760] Decouple DAG parsing loop from scheduler loop [AIRFLOW-2760] Decouple DAG parsing loop from scheduler loop Oct 22, 2018
@XD-DENG (Member) commented Oct 25, 2018

Hi @Fokko, I don't have further comments. Thanks

@XD-DENG (Member) commented Oct 25, 2018

A side point: We may want to update the doc "Scheduler Basic" on Confluence https://cwiki.apache.org/confluence/display/AIRFLOW/Scheduler+Basics accordingly after this PR is merged/released.

@kaxil (Member) commented Oct 26, 2018

I think it's a unanimous decision to merge this one now :)

Thanks everyone, and especially @KevinYang21!

@kaxil kaxil merged commit 75e2288 into apache:master Oct 26, 2018
wyndhblb added a commit to asappinc/incubator-airflow that referenced this pull request Nov 9, 2018
…pache#3873)"

This reverts commit 75e2288.

# Conflicts:
#	airflow/utils/dag_processing.py
aliceabe pushed a commit to aliceabe/incubator-airflow that referenced this pull request Jan 3, 2019
ashb pushed a commit to ashb/airflow that referenced this pull request Jan 10, 2019
cfei18 pushed a commit to cfei18/incubator-airflow that referenced this pull request Jan 23, 2019
wmorris75 pushed a commit to modmed-external/incubator-airflow that referenced this pull request Jul 29, 2019