
Conversation

@plypaul (Contributor) commented Aug 25, 2016

Dear Airflow Maintainers,

Please accept this PR that addresses the following issues:

One of the scheduler's default values can make it look like the scheduler is not scheduling as fast as it should, so this change updates the default so that DAG definition files are processed as fast as possible.
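For reference, the setting involved appears later in this thread as min_file_process_interval under the [scheduler] section of airflow.cfg. A minimal sketch of the proposed default follows; the key name and value here are taken from this discussion and are not verified against the shipped config:

[scheduler]
# Minimum number of seconds to wait before re-processing the same DAG
# definition file. 0 means files are re-processed as soon as possible.
min_file_process_interval = 0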

Testing Done:

  • Unit tests are required. If you do not include new unit tests, please
    specify why you think they are not required. We like to improve our
    coverage, so a missing test is an even better reason to include one.

Reminders for contributors (REQUIRED!):

  • Your PR's title must reference an issue on Airflow's JIRA.
    For example, a PR called "[AIRFLOW-1] My Amazing PR" would close JIRA
    issue #1. Please open a new issue if required!
  • Please squash your commits when possible and follow "How to write a good
    git commit message", summarized as follows:
    1. Separate subject from body with a blank line
    2. Limit the subject line to 50 characters
    3. Do not end the subject line with a period
    4. Use the imperative mood in the subject line (add, not adding)
    5. Wrap the body at 72 characters
    6. Use the body to explain what and why vs. how

@r39132 (Contributor) commented Aug 25, 2016

Thanks for the documentation and for the change. Just waiting for Travis and Landscape to pass. Can you run these in your fork and send me a link if they pass? I will +1 once they do, either in your fork or here.

@bolkedebruin (Contributor) commented

I'll test it, hopefully today.

@bolkedebruin (Contributor) commented Aug 25, 2016

OK, I have just tested it. It has improved, but I also see that the processing delay is applied between the scheduling of task instances. This means a first instance is launched and the scheduler then waits ~3 min before scheduling the next task (with the old config values). I don't think that is the idea behind these config items. At the very least, tasks from a running DAG should be scheduled as soon as they are available to run; no extra delay should be added by the scheduler.

What is the reasoning behind this configuration and delay by the way?

In addition, I observe that pools are often underused (only 6 of 8 available slots are used while task instances remain queued), and slots_used is not recorded in the UI view.

@plypaul (Contributor, Author) commented Aug 25, 2016

The idea behind the delay was that there wasn't a need to process DAGs so quickly - with higher thread counts you could be processing a given DAG every few seconds. To reduce the log volume and potential load on the DB, the configurable delay was added.
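As a rough, back-of-the-envelope illustration of that reprocessing rate (the file count and per-file parse time are taken from the processing-stats tables later in this thread; the number of parser processes is a hypothetical value):

# Rough estimate of how often one DAG file is reparsed with no minimum
# interval enforced. Numbers are illustrative, not measurements.
num_dag_files = 6             # DAG files in the dags folder (from the stats tables)
parse_seconds_per_file = 1.0  # approximate parse time per file (from the stats tables)
parser_processes = 2          # hypothetical number of parallel parser processes

reparse_period = num_dag_files * parse_seconds_per_file / parser_processes
print("each file reparsed roughly every %.0f seconds" % reparse_period)  # -> ~3 seconds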

For the case you're seeing, could you enable debug logging and send me the log contents?

@bolkedebruin (Contributor) commented Aug 25, 2016

OK, will do when I have the chance. I also noticed that the scheduler now quits by default after a fixed amount of time. I think this should not be the default, and in addition the documentation and upgrade notes should be updated for this (I didn't check, but I assume they weren't). I was really caught by surprise by this.

@bolkedebruin (Contributor) commented Aug 25, 2016

This should not be the default, in my opinion. In addition, the comment is ambiguous: I read it as if the scheduler would restart itself, but that is not the case.

@bolkedebruin (Contributor) commented Sep 3, 2016

I am really not happy with this, as it is tied to the CeleryExecutor not behaving as it should. We now have "num_runs" and "max_run_duration", both of which influence how long the scheduler will run. The old num_runs behavior has changed (not documented), and max_run_duration always applies, so all the startup scripts (systemd/upstart) are currently invalid. That is a bit too much regression for my taste.

I really don't understand the need for both, and I think we should probably revert this particular bit. I still really appreciate the work, but it needs some extra love and care. @plypaul
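For context, the kind of init configuration those startup scripts represent might look like the sketch below. This is an illustrative systemd unit, not the one shipped with Airflow; the paths, user, and restart policy are assumptions. The point is that such a unit relies on restarting the scheduler whenever it exits, which is exactly the behavior that changes when the num_runs / run-duration defaults change underneath it.

# Illustrative systemd unit only; not Airflow's shipped scripts.
[Unit]
Description=Airflow scheduler
After=network.target

[Service]
User=airflow
# If the scheduler exits after num_runs or its maximum run duration,
# the unit must restart it to keep scheduling continuously.
ExecStart=/usr/local/bin/airflow scheduler
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target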

@bolkedebruin (Contributor) commented

OK, I executed this DAG for integration tests. It exhibits the issue with slow scheduling when min_file_process_interval = 180, on the LocalExecutor:

from datetime import timedelta, datetime
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator, DummyOperator
from airflow.executors import SequentialExecutor
import airflow


# -----------------------------------------------------------------\
# DEFINE THE POOLS
# -----------------------------------------------------------------/
session = airflow.settings.Session()
for p in ['test_pool_1', 'test_pool_2', 'test_pool_3']:
    pool = (
        session.query(Pool)
        .filter(Pool.pool == p)
        .first())
    if not pool:
        session.add(Pool(pool=p, slots=8))
        session.commit()


# -----------------------------------------------------------------\
# DEFINE THE DAG
# -----------------------------------------------------------------/

# Define the Dag Name. This must be unique.
dag_name = 'hanging_subdags_n16_sqe'

# Default args are passed to each task
default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 4, 10),
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval='0 0 * * *'
          )

# -----------------------------------------------------------------\
# DEFINE THE TASKS
# -----------------------------------------------------------------/


def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='{sd_id}_step_1'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 1',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='{sd_id}_step_two'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 2',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=0,
        retry_delay=timedelta(seconds=5),
        dag=dag,
        depends_on_past=False,
        executor=SequentialExecutor()
    )

    return sdo

start_task = DummyOperator(
    task_id='start',
    dag=dag
)

for n in range(1, 17):
    sd_i = get_subdag(dag=dag, sd_id='level_1_{n}'.format(n=n), pool='test_pool_1')
    sd_ii = get_subdag(dag=dag, sd_id='level_2_{n}'.format(n=n), pool='test_pool_2')
    sd_iii = get_subdag(dag=dag, sd_id='level_3_{n}'.format(n=n), pool='test_pool_3')

    sd_i.set_upstream(start_task)
    sd_ii.set_upstream(sd_i)
    sd_iii.set_upstream(sd_ii)

I turned on the DAG at 17:24

Logs

[2016-09-05 17:26:54,149] {dag_processing.py:591} INFO - Started a process (PID: 23223) to generate tasks for /root/airflow/dags/dagrun-update.py - logging into /tmp/airflow/scheduler/logs/2016-09-05/dagrun-update.py.log
[2016-09-05 17:26:54,150] {jobs.py:1382} INFO - Heartbeating the executor
[2016-09-05 17:26:54,151] {jobs.py:1392} INFO - Heartbeating the scheduler
[2016-09-05 17:26:54,164] {jobs.py:1179} INFO -
================================================================================
DAG File Processing Stats

File Path                              PID  Runtime    Last Runtime    Last Run
-----------------------------------  -----  ---------  --------------  -------------------
/root/airflow/dags/example_xcom.py                     1.00s           2016-09-05T17:23:56
/root/airflow/dags/test_subdag.py                      1.00s           2016-09-05T17:23:54
/root/airflow/dags/jlowin_dag.py                       1.00s           2016-09-05T17:23:55
/root/airflow/dags/dagrun-update.py  23223  0.01s      1.01s           2016-09-05T17:23:53
/root/airflow/dags/dag-xcom.py                         1.01s           2016-09-05T17:23:55
/root/airflow/dags/dag-queue.py                        2.02s           2016-09-05T17:23:54
================================================================================
[2016-09-05 17:26:55,169] {jobs.py:1345} INFO - Heartbeating the process manager
[2016-09-05 17:26:55,170] {dag_processing.py:523} INFO - Processor for /root/airflow/dags/dagrun-update.py finished
[2016-09-05 17:26:55,174] {dag_processing.py:591} INFO - Started a process (PID: 23225) to generate tasks for /root/airflow/dags/dag-queue.py - logging into /tmp/airflow/scheduler/logs/2016-09-05/dag-queue.py.log
[2016-09-05 17:26:55,178] {dag_processing.py:591} INFO - Started a process (PID: 23226) to generate tasks for /root/airflow/dags/test_subdag.py - logging into /tmp/airflow/scheduler/logs/2016-09-05/test_subdag.py.log
[2016-09-05 17:26:55,179] {jobs.py:1382} INFO - Heartbeating the executor
[2016-09-05 17:26:56,180] {jobs.py:1345} INFO - Heartbeating the process manager
[2016-09-05 17:26:56,180] {dag_processing.py:523} INFO - Processor for /root/airflow/dags/test_subdag.py finished
[2016-09-05 17:26:56,184] {dag_processing.py:591} INFO - Started a process (PID: 23228) to generate tasks for /root/airflow/dags/dag-xcom.py - logging into /tmp/airflow/scheduler/logs/2016-09-05/dag-xcom.py.log
[2016-09-05 17:26:56,185] {jobs.py:1382} INFO - Heartbeating the executor
[2016-09-05 17:26:57,187] {jobs.py:1345} INFO - Heartbeating the process manager
[2016-09-05 17:26:57,188] {dag_processing.py:523} INFO - Processor for /root/airflow/dags/dag-xcom.py finished
[2016-09-05 17:26:57,188] {dag_processing.py:523} INFO - Processor for /root/airflow/dags/dag-queue.py finished
[2016-09-05 17:26:57,193] {dag_processing.py:591} INFO - Started a process (PID: 23231) to generate tasks for /root/airflow/dags/jlowin_dag.py - logging into /tmp/airflow/scheduler/logs/2016-09-05/jlowin_dag.py.log
[2016-09-05 17:26:57,218] {jobs.py:936} INFO - Tasks up for execution:
    <TaskInstance: hanging_subdags_n16_sqe.start 2016-04-10 00:00:00 [scheduled]>
[2016-09-05 17:26:57,221] {jobs.py:959} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 1 task instances in queue
[2016-09-05 17:26:57,228] {jobs.py:1002} INFO - DAG hanging_subdags_n16_sqe has 0/16 running tasks
[2016-09-05 17:26:57,228] {jobs.py:1028} INFO - Sending to executor ('hanging_subdags_n16_sqe', 'start', datetime.datetime(2016, 4, 10, 0, 0)) with priority 49 and queue default
[2016-09-05 17:26:57,228] {jobs.py:1032} INFO - Setting state of ('hanging_subdags_n16_sqe', 'start', datetime.datetime(2016, 4, 10, 0, 0)) to queued
[2016-09-05 17:26:57,237] {base_executor.py:50} INFO - Adding to queue: airflow run hanging_subdags_n16_sqe start 2016-04-10T00:00:00 --local -sd /root/airflow/dags/dag-queue.py
[2016-09-05 17:26:57,237] {jobs.py:1382} INFO - Heartbeating the executor
[2016-09-05 17:26:57,242] {local_executor.py:45} INFO - LocalWorker running airflow run hanging_subdags_n16_sqe start 2016-04-10T00:00:00 --local -sd /root/airflow/dags/dag-queue.py
[2016-09-05 17:26:57,710] {__init__.py:50} INFO - Using executor LocalExecutor
[2016-09-05 17:26:57,860] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2016-09-05 17:26:57,923] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/helpers.py:259: DeprecationWarning: Importing BashOperator directly from <module 'airflow.operators' from '/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/__init__.pyc'> has been deprecated. Please import from '<module 'airflow.operators' from '/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/__init__.pyc'>.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
  DeprecationWarning)
/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/helpers.py:259: DeprecationWarning: Importing SubDagOperator directly from <module 'airflow.operators' from '/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/__init__.pyc'> has been deprecated. Please import from '<module 'airflow.operators' from '/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/__init__.pyc'>.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
  DeprecationWarning)
/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/helpers.py:259: DeprecationWarning: Importing DummyOperator directly from <module 'airflow.operators' from '/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/__init__.pyc'> has been deprecated. Please import from '<module 'airflow.operators' from '/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/__init__.pyc'>.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
  DeprecationWarning)
[2016-09-05 17:26:58,242] {jobs.py:1345} INFO - Heartbeating the process manager
[2016-09-05 17:26:58,243] {dag_processing.py:523} INFO - Processor for /root/airflow/dags/jlowin_dag.py finished
[2016-09-05 17:26:58,246] {dag_processing.py:591} INFO - Started a process (PID: 23239) to generate tasks for /root/airflow/dags/example_xcom.py - logging into /tmp/airflow/scheduler/logs/2016-09-05/example_xcom.py.log
[2016-09-05 17:26:58,247] {jobs.py:1382} INFO - Heartbeating the executor
Logging into: /root/airflow/logs/hanging_subdags_n16_sqe/start/2016-04-10T00:00:00
[2016-09-05 17:26:58,896] {__init__.py:50} INFO - Using executor LocalExecutor
[2016-09-05 17:26:59,014] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2016-09-05 17:26:59,048] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2016-09-05 17:26:59,249] {jobs.py:1345} INFO - Heartbeating the process manager
[2016-09-05 17:26:59,249] {dag_processing.py:523} INFO - Processor for /root/airflow/dags/example_xcom.py finished
[2016-09-05 17:26:59,250] {jobs.py:1382} INFO - Heartbeating the executor
[2016-09-05 17:26:59,250] {jobs.py:1392} INFO - Heartbeating the scheduler
/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/helpers.py:259: DeprecationWarning: Importing BashOperator directly from <module 'airflow.operators' from '/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/__init__.pyc'> has been deprecated. Please import from '<module 'airflow.operators' from '/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/__init__.pyc'>.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
  DeprecationWarning)
/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/helpers.py:259: DeprecationWarning: Importing SubDagOperator directly from <module 'airflow.operators' from '/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/__init__.pyc'> has been deprecated. Please import from '<module 'airflow.operators' from '/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/__init__.pyc'>.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
  DeprecationWarning)
/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/helpers.py:259: DeprecationWarning: Importing DummyOperator directly from <module 'airflow.operators' from '/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/__init__.pyc'> has been deprecated. Please import from '<module 'airflow.operators' from '/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/__init__.pyc'>.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
  DeprecationWarning)
/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/bin/cli.py:422: DeprecationWarning: The S3_LOG_FOLDER conf key has been replaced by REMOTE_BASE_LOG_FOLDER. Your conf still works but please update airflow.cfg to ensure future compatibility.
  DeprecationWarning)
[2016-09-05 17:27:00,261] {jobs.py:1345} INFO - Heartbeating the process manager
[2016-09-05 17:27:00,261] {jobs.py:1382} INFO - Heartbeating the executor

Then there is a gap before the next tasks are scheduled (note the ~3-minute jump in the timestamps):

[2016-09-05 17:29:55,487] {dag_processing.py:591} INFO - Started a process (PID: 23248) to generate tasks for /root/airflow/dags/dagrun-update.py - logging into /tmp/airflow/scheduler/logs/2016-09-05/dagrun-update.py.log
[2016-09-05 17:29:55,488] {jobs.py:1382} INFO - Heartbeating the executor
[2016-09-05 17:29:55,489] {jobs.py:1392} INFO - Heartbeating the scheduler
[2016-09-05 17:29:55,506] {jobs.py:1179} INFO -
================================================================================
DAG File Processing Stats

File Path                              PID  Runtime    Last Runtime    Last Run
-----------------------------------  -----  ---------  --------------  -------------------
/root/airflow/dags/test_subdag.py                      1.00s           2016-09-05T17:26:56
/root/airflow/dags/example_xcom.py                     1.00s           2016-09-05T17:26:59
/root/airflow/dags/dag-xcom.py                         1.00s           2016-09-05T17:26:57
/root/airflow/dags/dagrun-update.py  23248  0.02s      1.02s           2016-09-05T17:26:55
/root/airflow/dags/jlowin_dag.py                       1.05s           2016-09-05T17:26:58
/root/airflow/dags/dag-queue.py                        2.02s           2016-09-05T17:26:57
================================================================================
[2016-09-05 17:29:56,508] {jobs.py:1345} INFO - Heartbeating the process manager
[2016-09-05 17:29:56,508] {dag_processing.py:523} INFO - Processor for /root/airflow/dags/dagrun-update.py finished
[2016-09-05 17:29:56,514] {dag_processing.py:591} INFO - Started a process (PID: 23250) to generate tasks for /root/airflow/dags/test_subdag.py - logging into /tmp/airflow/scheduler/logs/2016-09-05/test_subdag.py.log
[2016-09-05 17:29:56,515] {jobs.py:1382} INFO - Heartbeating the executor
[2016-09-05 17:29:57,517] {jobs.py:1345} INFO - Heartbeating the process manager
[2016-09-05 17:29:57,518] {dag_processing.py:523} INFO - Processor for /root/airflow/dags/test_subdag.py finished
[2016-09-05 17:29:57,523] {dag_processing.py:591} INFO - Started a process (PID: 23252) to generate tasks for /root/airflow/dags/dag-xcom.py - logging into /tmp/airflow/scheduler/logs/2016-09-05/dag-xcom.py.log
[2016-09-05 17:29:57,530] {dag_processing.py:591} INFO - Started a process (PID: 23253) to generate tasks for /root/airflow/dags/dag-queue.py - logging into /tmp/airflow/scheduler/logs/2016-09-05/dag-queue.py.log
[2016-09-05 17:29:57,531] {jobs.py:1382} INFO - Heartbeating the executor
[2016-09-05 17:29:58,533] {jobs.py:1345} INFO - Heartbeating the process manager
[2016-09-05 17:29:58,534] {dag_processing.py:523} INFO - Processor for /root/airflow/dags/dag-xcom.py finished
[2016-09-05 17:29:58,538] {dag_processing.py:591} INFO - Started a process (PID: 23255) to generate tasks for /root/airflow/dags/jlowin_dag.py - logging into /tmp/airflow/scheduler/logs/2016-09-05/jlowin_dag.py.log
[2016-09-05 17:29:58,539] {jobs.py:1382} INFO - Heartbeating the executor
[2016-09-05 17:29:59,540] {jobs.py:1345} INFO - Heartbeating the process manager
[2016-09-05 17:29:59,540] {dag_processing.py:523} INFO - Processor for /root/airflow/dags/jlowin_dag.py finished
[2016-09-05 17:29:59,544] {dag_processing.py:591} INFO - Started a process (PID: 23257) to generate tasks for /root/airflow/dags/example_xcom.py - logging into /tmp/airflow/scheduler/logs/2016-09-05/example_xcom.py.log
[2016-09-05 17:29:59,545] {jobs.py:1382} INFO - Heartbeating the executor
[2016-09-05 17:30:00,547] {jobs.py:1345} INFO - Heartbeating the process manager
[2016-09-05 17:30:00,547] {dag_processing.py:523} INFO - Processor for /root/airflow/dags/dag-queue.py finished
[2016-09-05 17:30:00,548] {dag_processing.py:523} INFO - Processor for /root/airflow/dags/example_xcom.py finished
[2016-09-05 17:30:00,595] {jobs.py:936} INFO - Tasks up for execution:
    <TaskInstance: hanging_subdags_n16_sqe.level_1_1 2016-04-10 00:00:00 [scheduled]>
    <TaskInstance: hanging_subdags_n16_sqe.level_1_10 2016-04-10 00:00:00 [scheduled]>
    <TaskInstance: hanging_subdags_n16_sqe.level_1_11 2016-04-10 00:00:00 [scheduled]>
    <TaskInstance: hanging_subdags_n16_sqe.level_1_12 2016-04-10 00:00:00 [scheduled]>
    <TaskInstance: hanging_subdags_n16_sqe.level_1_13 2016-04-10 00:00:00 [scheduled]>
    <TaskInstance: hanging_subdags_n16_sqe.level_1_14 2016-04-10 00:00:00 [scheduled]>
    <TaskInstance: hanging_subdags_n16_sqe.level_1_15 2016-04-10 00:00:00 [scheduled]>
    <TaskInstance: hanging_subdags_n16_sqe.level_1_16 2016-04-10 00:00:00 [scheduled]>
    <TaskInstance: hanging_subdags_n16_sqe.level_1_2 2016-04-10 00:00:00 [scheduled]>
    <TaskInstance: hanging_subdags_n16_sqe.level_1_3 2016-04-10 00:00:00 [scheduled]>
    <TaskInstance: hanging_subdags_n16_sqe.level_1_4 2016-04-10 00:00:00 [scheduled]>
    <TaskInstance: hanging_subdags_n16_sqe.level_1_5 2016-04-10 00:00:00 [scheduled]>
    <TaskInstance: hanging_subdags_n16_sqe.level_1_6 2016-04-10 00:00:00 [scheduled]>
    <TaskInstance: hanging_subdags_n16_sqe.level_1_7 2016-04-10 00:00:00 [scheduled]>
    <TaskInstance: hanging_subdags_n16_sqe.level_1_8 2016-04-10 00:00:00 [scheduled]>
    <TaskInstance: hanging_subdags_n16_sqe.level_1_9 2016-04-10 00:00:00 [scheduled]>
    <TaskInstance: hanging_subdags_n16_sqe.start 2016-04-11 00:00:00 [scheduled]>
[2016-09-05 17:30:00,597] {jobs.py:959} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 17 task instances in queue
[2016-09-05 17:30:00,604] {jobs.py:1002} INFO - DAG hanging_subdags_n16_sqe has 0/16 running tasks
[2016-09-05 17:30:00,604] {jobs.py:1028} INFO - Sending to executor ('hanging_subdags_n16_sqe', 'start', datetime.datetime(2016, 4, 11, 0, 0)) with priority 49 and queue default
[2016-09-05 17:30:00,604] {jobs.py:1032} INFO - Setting state of ('hanging_subdags_n16_sqe', 'start', datetime.datetime(2016, 4, 11, 0, 0)) to queued
[2016-09-05 17:30:00,610] {base_executor.py:50} INFO - Adding to queue: airflow run hanging_subdags_n16_sqe start 2016-04-11T00:00:00 --local -sd /root/airflow/dags/dag-queue.py
[2016-09-05 17:30:00,613] {jobs.py:1002} INFO - DAG hanging_subdags_n16_sqe has 0/16 running tasks
[2016-09-05 17:30:00,613] {jobs.py:1028} INFO - Sending to executor ('hanging_subdags_n16_sqe', 'level_1_1', datetime.datetime(2016, 4, 10, 0, 0)) with priority 3 and queue default
[2016-09-05 17:30:00,614] {jobs.py:1032} INFO - Setting state of ('hanging_subdags_n16_sqe', 'level_1_1', datetime.datetime(2016, 4, 10, 0, 0)) to queued
[2016-09-05 17:30:00,620] {base_executor.py:50} INFO - Adding to queue: airflow run hanging_subdags_n16_sqe level_1_1 2016-04-10T00:00:00 --local -sd /root/airflow/dags/dag-queue.py
[2016-09-05 17:30:00,623] {jobs.py:1002} INFO - DAG hanging_subdags_n16_sqe has 0/16 running tasks
[2016-09-05 17:30:00,624] {jobs.py:1028} INFO - Sending to executor ('hanging_subdags_n16_sqe', 'level_1_10', datetime.datetime(2016, 4, 10, 0, 0)) with priority 3 and queue default
[2016-09-05 17:30:00,624] {jobs.py:1032} INFO - Setting state of ('hanging_subdags_n16_sqe', 'level_1_10', datetime.datetime(2016, 4, 10, 0, 0)) to queued
[2016-09-05 17:30:00,629] {base_executor.py:50} INFO - Adding to queue: airflow run hanging_subdags_n16_sqe level_1_10 2016-04-10T00:00:00 --local -sd /root/airflow/dags/dag-queue.py
[2016-09-05 17:30:00,632] {jobs.py:1002} INFO - DAG hanging_subdags_n16_sqe has 0/16 running tasks
[2016-09-05 17:30:00,632] {jobs.py:1028} INFO - Sending to executor ('hanging_subdags_n16_sqe', 'level_1_11', datetime.datetime(2016, 4, 10, 0, 0)) with priority 3 and queue default
[2016-09-05 17:30:00,632] {jobs.py:1032} INFO - Setting state of ('hanging_subdags_n16_sqe', 'level_1_11', datetime.datetime(2016, 4, 10, 0, 0)) to queued
[2016-09-05 17:30:00,639] {base_executor.py:50} INFO - Adding to queue: airflow run hanging_subdags_n16_sqe level_1_11 2016-04-10T00:00:00 --local -sd /root/airflow/dags/dag-queue.py
[2016-09-05 17:30:00,641] {jobs.py:1002} INFO - DAG hanging_subdags_n16_sqe has 0/16 running tasks
[2016-09-05 17:30:00,641] {jobs.py:1028} INFO - Sending to executor ('hanging_subdags_n16_sqe', 'level_1_12', datetime.datetime(2016, 4, 10, 0, 0)) with priority 3 and queue default
[2016-09-05 17:30:00,642] {jobs.py:1032} INFO - Setting state of ('hanging_subdags_n16_sqe', 'level_1_12', datetime.datetime(2016, 4, 10, 0, 0)) to queued
[2016-09-05 17:30:00,647] {base_executor.py:50} INFO - Adding to queue: airflow run hanging_subdags_n16_sqe level_1_12 2016-04-10T00:00:00 --local -sd /root/airflow/dags/dag-queue.py
[2016-09-05 17:30:00,650] {jobs.py:1002} INFO - DAG hanging_subdags_n16_sqe has 0/16 running tasks
[2016-09-05 17:30:00,650] {jobs.py:1028} INFO - Sending to executor ('hanging_subdags_n16_sqe', 'level_1_13', datetime.datetime(2016, 4, 10, 0, 0)) with priority 3 and queue default
[2016-09-05 17:30:00,650] {jobs.py:1032} INFO - Setting state of ('hanging_subdags_n16_sqe', 'level_1_13', datetime.datetime(2016, 4, 10, 0, 0)) to queued
[2016-09-05 17:30:00,657] {base_executor.py:50} INFO - Adding to queue: airflow run hanging_subdags_n16_sqe level_1_13 2016-04-10T00:00:00 --local -sd /root/airflow/dags/dag-queue.py
[2016-09-05 17:30:00,660] {jobs.py:1002} INFO - DAG hanging_subdags_n16_sqe has 0/16 running tasks
[2016-09-05 17:30:00,660] {jobs.py:1028} INFO - Sending to executor ('hanging_subdags_n16_sqe', 'level_1_14', datetime.datetime(2016, 4, 10, 0, 0)) with priority 3 and queue default
[2016-09-05 17:30:00,660] {jobs.py:1032} INFO - Setting state of ('hanging_subdags_n16_sqe', 'level_1_14', datetime.datetime(2016, 4, 10, 0, 0)) to queued
[2016-09-05 17:30:00,675] {base_executor.py:50} INFO - Adding to queue: airflow run hanging_subdags_n16_sqe level_1_14 2016-04-10T00:00:00 --local -sd /root/airflow/dags/dag-queue.py
[2016-09-05 17:30:00,678] {jobs.py:1002} INFO - DAG hanging_subdags_n16_sqe has 0/16 running tasks
[2016-09-05 17:30:00,679] {jobs.py:1028} INFO - Sending to executor ('hanging_subdags_n16_sqe', 'level_1_15', datetime.datetime(2016, 4, 10, 0, 0)) with priority 3 and queue default
[2016-09-05 17:30:00,679] {jobs.py:1032} INFO - Setting state of ('hanging_subdags_n16_sqe', 'level_1_15', datetime.datetime(2016, 4, 10, 0, 0)) to queued
[2016-09-05 17:30:00,684] {base_executor.py:50} INFO - Adding to queue: airflow run hanging_subdags_n16_sqe level_1_15 2016-04-10T00:00:00 --local -sd /root/airflow/dags/dag-queue.py
[2016-09-05 17:30:00,687] {jobs.py:1002} INFO - DAG hanging_subdags_n16_sqe has 0/16 running tasks
[2016-09-05 17:30:00,687] {jobs.py:1028} INFO - Sending to executor ('hanging_subdags_n16_sqe', 'level_1_16', datetime.datetime(2016, 4, 10, 0, 0)) with priority 3 and queue default
[2016-09-05 17:30:00,687] {jobs.py:1032} INFO - Setting state of ('hanging_subdags_n16_sqe', 'level_1_16', datetime.datetime(2016, 4, 10, 0, 0)) to queued
[2016-09-05 17:30:00,693] {base_executor.py:50} INFO - Adding to queue: airflow run hanging_subdags_n16_sqe level_1_16 2016-04-10T00:00:00 --local -sd /root/airflow/dags/dag-queue.py
[2016-09-05 17:30:00,695] {jobs.py:1002} INFO - DAG hanging_subdags_n16_sqe has 0/16 running tasks
[2016-09-05 17:30:00,695] {jobs.py:1028} INFO - Sending to executor ('hanging_subdags_n16_sqe', 'level_1_2', datetime.datetime(2016, 4, 10, 0, 0)) with priority 3 and queue default

@bolkedebruin (Contributor) commented Sep 7, 2016

Further observations: the DAG run state is not being updated (so max runs gets reached) and DAGs are completely skipped by processing:

================================================================================
DAG File Processing Stats

File Path                              PID  Runtime      Last Runtime    Last Run
-----------------------------------  -----  ---------  --------------  ----------
/root/airflow/dags/test_subdag.py
/root/airflow/dags/dag-queue.py      10289  0.01s
/root/airflow/dags/jlowin_dag.py
/root/airflow/dags/example_xcom.py
/root/airflow/dags/dag-xcom.py
/root/airflow/dags/dagrun-update.py  10290  0.00s
================================================================================
[2016-09-07 18:06:31,260] {jobs.py:1077} DagFileProcessor162 INFO - Not processing DAG hanging_subdags_n16_sqe since its max runs has been reached

Probably process_task_instances needs to be called before create_dagrun. Are you not encountering this issue?

@ldct (Contributor) commented Sep 8, 2016

@bolkedebruin (Contributor) commented Sep 8, 2016

I confirmed a fix for the "not processing" part by moving self._process_task_instances to the beginning of the function so that it is always executed.
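For illustration only, a sketch of that reordering; the method and helper names below only approximate the scheduler's per-DAG loop in jobs.py and are not the exact code:

# Illustrative sketch: schedule task instances for existing DAG runs before
# deciding whether a new DAG run can be created, so a DAG at its maximum
# number of runs still has its running tasks processed. Names approximate.
def _process_dags(self, dags, task_instances_out):
    for dag in dags:
        # Always process task instances of existing runs first.
        self._process_task_instances(dag, task_instances_out)

        # Only the creation of a *new* DAG run is skipped at the cap.
        if self._dag_at_max_runs(dag):  # hypothetical helper
            self.logger.info("Not creating a new run for %s: max runs reached", dag.dag_id)
            continue
        self.create_dag_run(dag)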

I now see that this issue was introduced in #1716. Several PRs try to address the issue introduced in #1716, but no solution has been settled on yet.

plypaul force-pushed the plypaul_set_default_file_refresh_rate branch from 0a66695 to ba03268 on October 6, 2016 04:40

Add additional comments for various scheduler configuration options.

plypaul force-pushed the plypaul_set_default_file_refresh_rate branch from ba03268 to 6ef2b22 on October 6, 2016 04:42
@codecov-io commented Oct 6, 2016

Current coverage is 66.12% (diff: 100%)

Merging #1761 into master will not change coverage

@@             master      #1761   diff @@
==========================================
  Files           127        127          
  Lines          9750       9750          
  Methods           0          0          
  Messages          0          0          
  Branches          0          0          
==========================================
  Hits           6447       6447          
  Misses         3303       3303          
  Partials          0          0          

Powered by Codecov. Last update 4405940...6ef2b22

@r39132 (Contributor) commented Oct 28, 2016

@plypaul any updates here? Can you have someone review and merge this at the earliest? Otherwise, I will need to close this for inactivity.

@r39132 (Contributor) commented Nov 15, 2016

Closing for inactivity. Please reopen when ready!

asfgit closed this in 818dda3 on Nov 15, 2016

alekstorm pushed a commit to alekstorm/incubator-airflow that referenced this pull request on Jun 1, 2017