[AIRFLOW-160] Parse DAG files through child processes #1636
Conversation
Force-pushed d2ba98e to c9b5fa2
airflow/jobs.py (outdated)
kill
Will remove.
Cool! I'm trying to have it tested over the weekend or early next week. Great work. Still wondering about the different states and whether it stays coherent. Would you mind updating the scheduler wiki for it?
Force-pushed 336e71d to 7093520
airflow/jobs.py (outdated)
Is it really required to do this here? I personally prefer a more centralized way of configuring logging, maybe not even writing to a file at all but sending it to a centralized logging facility.
For debugging, we often need to look at how a DAG file was scheduled and the associated logging messages. We found this approach convenient because otherwise, with many processes running in parallel, the main log file becomes a jumble.
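For illustration, here is a minimal, hypothetical sketch (not the code in this PR; `configure_per_file_logging` and the log directory are made up) of how a child process could redirect its log output to a file named after the DAG file it is parsing:

```python
import logging
import os


def configure_per_file_logging(dag_file_path, log_dir="/tmp/dag_processor_logs"):
    """Attach a FileHandler so this child process logs to a per-DAG-file log,
    instead of interleaving its messages into the shared scheduler log."""
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, os.path.basename(dag_file_path) + ".log")

    handler = logging.FileHandler(log_file)
    handler.setFormatter(
        logging.Formatter("[%(asctime)s] %(process)d %(levelname)s - %(message)s")
    )

    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(handler)
    return log_file
```

This keeps the per-file history easy to inspect when a single DAG misbehaves, at the cost of more log files to manage.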
Is it your intention to have QUEUED set by the executor and not by the scheduler? Right now the scheduler seems to set two different exit states for TaskInstances (actually three with UP_FOR_RETRY).
I was in favor of changing the state in the scheduler instead of the executor, so that state changes are more centralized.
Let me update the wiki once we finalize these changes...
Force-pushed eeedd0b to e03d12d
airflow/jobs.py (outdated)
I think you meant len(dag_runs) == 0 here :)
Yep, good catch.
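For context, the fix amounts to guarding on an empty query result. A trivial, hypothetical illustration of the corrected condition (`should_create_dag_run` is made up, not the PR's code):

```python
def should_create_dag_run(active_dag_runs):
    """Hypothetical helper: only create a new DagRun when the list of
    existing runs for the DAG is empty."""
    return len(active_dag_runs) == 0


assert should_create_dag_run([]) is True
assert should_create_dag_run(["run_2016_08_01"]) is False
```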
Force-pushed 0696f16 to e347b68
airflow/jobs.py (outdated)
Use .format to stay in line with the rest of Airflow (and below)?
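A small illustration of the style being requested, with made-up values:

```python
dag_id = "example_dag"
num_tasks = 3

# Old-style interpolation, which the reviewer is asking to avoid here:
msg_old = "DAG %s has %d tasks" % (dag_id, num_tasks)

# str.format, in line with the rest of the Airflow code base:
msg_new = "DAG {} has {} tasks".format(dag_id, num_tasks)

assert msg_old == msg_new
```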
Force-pushed 726edf5 to a9ef6ff
Force-pushed 036edb4 to dd97758
Instead of parsing the DAG definition files in the same process as the scheduler, this change parses the files in a child process. This helps to isolate the scheduler from bad user code.
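A simplified, hypothetical sketch of that idea using the standard multiprocessing module (not the PR's actual implementation; the function names and timeout are illustrative):

```python
import multiprocessing


def parse_dag_file(file_path, result_queue):
    """Runs in a child process: execute the DAG file and report what was found."""
    try:
        namespace = {}
        with open(file_path) as f:
            exec(compile(f.read(), file_path, "exec"), namespace)
        dag_names = [name for name, obj in namespace.items()
                     if obj.__class__.__name__ == "DAG"]
        result_queue.put((file_path, dag_names, None))
    except Exception as e:
        # Bad user code only brings down this child, never the scheduler itself.
        result_queue.put((file_path, [], str(e)))


def process_file_safely(file_path, timeout=30):
    """Parent (scheduler) side: spawn a child, wait with a timeout, kill if stuck."""
    queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=parse_dag_file, args=(file_path, queue))
    child.start()
    child.join(timeout)
    if child.is_alive():
        child.terminate()  # e.g. the DAG file contains an infinite loop
        return (file_path, [], "timed out")
    return queue.get() if not queue.empty() else (file_path, [], "no result")
```

A syntax error, an unhandled exception, or an infinite loop in a user's DAG file then affects only that child process, and the scheduler can move on to the next file.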
Force-pushed a31a20e to a4d9afc
Force-pushed a4d9afc to 92ac293
@bolkedebruin - checking up, since it's been a while.
Hey @plypaul, sorry for the delay; something to do with summer and not being behind a computer. On the delay from queued to running, this was just a hunch (sorry if I put you on the wrong path); it just seemed to take longer than I expected, purely subjectively. Would you mind explaining the reasoning for choosing queued vs scheduled?
When the DAG is parsed and the scheduler figures out that a task instance needs to run, it's placed into the QUEUED state.
Is the queued state set by the executor or by the scheduler? (Reading from phone)
It's set by the scheduler.
OK, LGTM. I do think some bits need to be ironed out, but we can figure that out once it is in.
@plypaul just a note: this might not be true anymore if the backfill dagrun PR gets merged, due to "lineage" and "auditing" protection; then this can happen in its first iteration, as we are not using the real primary key of the dag runs.
```python
# Keeping existing logic, but not entirely sure why this is here.
if not pickle_id and dag:
    if dag.full_filepath != dag.filepath:
        path = "DAGS_FOLDER/{}".format(dag.filepath)
```
I'm late to the party, but I think there should be a path = None initialization above?
You are right, I merged a PR to fix this issue yesterday.
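For reference, the follow-up fix amounts to initializing the variable before the conditional assignment. A simplified sketch (not the exact merged patch; `resolve_dag_path` is a made-up wrapper):

```python
def resolve_dag_path(pickle_id, dag):
    """Sketch of the fix: initialize path up front so it is defined even when
    none of the branches below assigns it."""
    path = None
    if not pickle_id and dag:
        if dag.full_filepath != dag.filepath:
            path = "DAGS_FOLDER/{}".format(dag.filepath)
    return path
```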
@plypaul can you please update the documentation on this? I would like to add some architectural fixes, but I'm having some trouble finding my way around the code.
@plypaul Pull the latest code and unpause (or enable) 7+ examples. You will see it work for a short while and then nothing gets scheduled anymore. It works fine with LocalExecutor (and I suspect Celery, since you folks are running that).
@bolkedebruin Yep, just recently got wiki access. Let me get to that on Monday. @r39132 Since there are multiple processes, and each process schedules one file, scheduling should be distributed and fast. It's strange that the SequentialExecutor is slow. During testing, I used the SequentialExecutor and did not run into any issues. I'll try it again this week.
@bolkedebruin Posted an updated description at https://cwiki.apache.org/confluence/display/AIRFLOW/Scheduler+Basics Unfortunately, I have a bit of a backlog as I am taking care of tasks before heading offline from 8/26 - 9/3. I'm not entirely sure I'll be able to get to the
@plypaul Thanks. Unfortunately there seem to be more issues. There is quantitatively a large delay in tasks getting "run". I can reproduce the behavior quite easily, and we have several reports (mailing list, Gitter) where people think tasks are not getting scheduled at all. Hopefully we can spend a little bit of time on this before your holiday, so either we fix it before then or I can help fix it (by knowing where to look). (Tested on LocalExecutor, btw.) Also: @r39132 @jlowin @mistercrunch
@bolkedebruin So I pulled the latest code from master and tried it out with a test DAG using the LocalExecutor, and it seems to be running without unexpected delays for me. I'm wondering if the issue you're seeing is related to the minimum DAG file processing interval that was introduced in this PR. The default value is 3 minutes, which in retrospect seems long. I put out a PR (#1761) to change the default so that it runs as fast as possible.
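The setting in question is the scheduler's minimum file processing interval in airflow.cfg. The key name below reflects my understanding and may differ between versions; lowering it makes DAG files get re-parsed (and new runs scheduled) sooner:

```ini
[scheduler]
# Minimum number of seconds to wait before re-parsing the same DAG file.
# A smaller value means changes are picked up and runs are created faster,
# at the cost of re-parsing DAG files more often.
min_file_process_interval = 0
```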
@plypaul It might be; however, I am observing it in between scheduler loops as well, and I hope that is not affected by this setting. I'll test it ASAP.
Instead of parsing the DAG definition files in the same process as the scheduler, this change parses the files in a child process. This helps to isolate the scheduler from bad user code. Closes apache#1636 from plypaul/plypaul_schedule_by_file_rebase_master
This is a follow-up to #1559 - we've tested #1559 in production for several weeks and now we're looking to get it into master.
Now that the SCHEDULED state has been introduced, tasks start out in the SCHEDULED state, and once the task is sent to the executor, it is set to the QUEUED state. Tagging @bolkedebruin for thoughts.
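To summarize the hand-off being described, a rough sketch of the transitions (illustrative only; the class and method names below are made up, not Airflow's actual ones):

```python
from enum import Enum


class State(Enum):
    SCHEDULED = "scheduled"        # set by the scheduler when a task instance is due to run
    QUEUED = "queued"              # set by the scheduler when it hands the task to the executor
    RUNNING = "running"            # set once the executor actually starts the task
    UP_FOR_RETRY = "up_for_retry"  # set when a failed attempt is eligible for retry


def send_to_executor(task_instance, executor):
    """Illustrative only: the scheduler, not the executor, owns the transition
    from SCHEDULED to QUEUED, keeping state changes centralized."""
    assert task_instance.state == State.SCHEDULED
    executor.queue_task(task_instance)   # hand off to the executor
    task_instance.state = State.QUEUED   # scheduler marks it as queued
```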