Skip to content

Commit 327860f

Browse files
Fokko and kaxil
authored and committed
[AIRFLOW-3515] Remove the run_duration option (#4320)
1 parent 76d755d commit 327860f

6 files changed

Lines changed: 10 additions & 53 deletions

File tree

UPDATING.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ assists users migrating to a new version.
2424

2525
## Airflow Master
2626

27+
#### Remove run_duration
28+
29+
We should not use the `run_duration` option anymore. This used to be for restarting the scheduler from time to time, but right now the scheduler is getting more stable and therefore using this setting is considered bad and might cause an inconsistent state.
30+
2731
### Modification to config file discovery
2832

2933
If the `AIRFLOW_CONFIG` environment variable was not set and the
@@ -33,7 +37,7 @@ will discover its config file using the `$AIRFLOW_CONFIG` and `$AIRFLOW_HOME`
3337
environment variables rather than checking for the presence of a file.
3438

3539
### Modification to `ts_nodash` macro
36-
`ts_nodash` previously contained TimeZone information along with execution date. For Example: `20150101T000000+0000`. This is not user-friendly for file or folder names which was a popular use case for `ts_nodash`. Hence this behavior has been changed and using `ts_nodash` will no longer contain TimeZone information, restoring the pre-1.10 behavior of this macro. And a new macro `ts_nodash_with_tz` has been added which can be used to get a string with execution date and timezone info without dashes.
40+
`ts_nodash` previously contained TimeZone information along with execution date. For Example: `20150101T000000+0000`. This is not user-friendly for file or folder names which was a popular use case for `ts_nodash`. Hence this behavior has been changed and using `ts_nodash` will no longer contain TimeZone information, restoring the pre-1.10 behavior of this macro. And a new macro `ts_nodash_with_tz` has been added which can be used to get a string with execution date and timezone info without dashes.
3741

3842
Examples:
3943
* `ts_nodash`: `20150101T000000`
@@ -206,7 +210,7 @@ There are five roles created for Airflow by default: Admin, User, Op, Viewer, an
206210
- All ModelViews in Flask-AppBuilder follow a different pattern from Flask-Admin. The `/admin` part of the URL path will no longer exist. For example: `/admin/connection` becomes `/connection/list`, `/admin/connection/new` becomes `/connection/add`, `/admin/connection/edit` becomes `/connection/edit`, etc.
207211
- Due to security concerns, the new webserver will no longer support the features in the `Data Profiling` menu of old UI, including `Ad Hoc Query`, `Charts`, and `Known Events`.
208212
- HiveServer2Hook.get_results() always returns a list of tuples, even when a single column is queried, as per Python API 2.
209-
- **UTC is now the default timezone**: Either reconfigure your workflows scheduling in UTC or set `default_timezone` as explained in https://airflow.apache.org/timezone.html#default-time-zone
213+
- **UTC is now the default timezone**: Either reconfigure your workflows scheduling in UTC or set `default_timezone` as explained in https://airflow.apache.org/timezone.html#default-time-zone
210214

211215
### airflow.contrib.sensors.hdfs_sensors renamed to airflow.contrib.sensors.hdfs_sensor
212216

airflow/bin/cli.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,6 @@ def scheduler(args):
970970
job = jobs.SchedulerJob(
971971
dag_id=args.dag_id,
972972
subdir=process_subdir(args.subdir),
973-
run_duration=args.run_duration,
974973
num_runs=args.num_runs,
975974
do_pickle=args.do_pickle)
976975

@@ -1768,10 +1767,6 @@ class CLIFactory(object):
17681767
"stderr."),
17691768
# scheduler
17701769
'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"),
1771-
'run_duration': Arg(
1772-
("-r", "--run-duration"),
1773-
default=None, type=int,
1774-
help="Set number of seconds to execute before exiting"),
17751770
'num_runs': Arg(
17761771
("-n", "--num_runs"),
17771772
default=-1, type=int,
@@ -2057,7 +2052,7 @@ class CLIFactory(object):
20572052
}, {
20582053
'func': scheduler,
20592054
'help': "Start a scheduler instance",
2060-
'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs',
2055+
'args': ('dag_id_opt', 'subdir', 'num_runs',
20612056
'do_pickle', 'pid', 'daemon', 'stdout', 'stderr',
20622057
'log_file'),
20632058
}, {

airflow/config_templates/default_airflow.cfg

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -456,10 +456,6 @@ job_heartbeat_sec = 5
456456
# how often the scheduler should run (in seconds).
457457
scheduler_heartbeat_sec = 5
458458

459-
# after how much time should the scheduler terminate in seconds
460-
# -1 indicates to run continuously (see also num_runs)
461-
run_duration = -1
462-
463459
# after how much time (seconds) new DAGs should be picked up from the filesystem
464460
min_file_process_interval = 0
465461

airflow/jobs.py

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,6 @@ def __init__(
545545
subdir=settings.DAGS_FOLDER,
546546
num_runs=-1,
547547
processor_poll_interval=1.0,
548-
run_duration=None,
549548
do_pickle=False,
550549
log=None,
551550
*args, **kwargs):
@@ -563,8 +562,6 @@ def __init__(
563562
:param processor_poll_interval: The number of seconds to wait between
564563
polls of running processors
565564
:type processor_poll_interval: int
566-
:param run_duration: how long to run (in seconds) before exiting
567-
:type run_duration: int
568565
:param do_pickle: once a DAG object is obtained by executing the Python
569566
file, whether to serialize the DAG object to the DB
570567
:type do_pickle: bool
@@ -578,7 +575,6 @@ def __init__(
578575
self.subdir = subdir
579576

580577
self.num_runs = num_runs
581-
self.run_duration = run_duration
582578
self._processor_poll_interval = processor_poll_interval
583579

584580
self.do_pickle = do_pickle
@@ -595,10 +591,6 @@ def __init__(
595591
self.using_sqlite = True
596592

597593
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
598-
if run_duration is None:
599-
self.run_duration = conf.getint('scheduler',
600-
'run_duration')
601-
602594
self.processor_agent = None
603595
self._last_loop = False
604596

@@ -1499,7 +1491,6 @@ def _execute(self):
14991491
(executors.LocalExecutor, executors.SequentialExecutor):
15001492
pickle_dags = True
15011493

1502-
self.log.info("Running execute loop for %s seconds", self.run_duration)
15031494
self.log.info("Processing each file at most %s times", self.num_runs)
15041495

15051496
# Build up a list of Python files that could contain DAGs
@@ -1562,8 +1553,7 @@ def _execute_helper(self):
15621553
last_self_heartbeat_time = timezone.utcnow()
15631554

15641555
# For the execute duration, parse and schedule DAGs
1565-
while (timezone.utcnow() - execute_start_time).total_seconds() < \
1566-
self.run_duration or self.run_duration < 0:
1556+
while True:
15671557
self.log.debug("Starting Loop...")
15681558
loop_start_time = time.time()
15691559

scripts/ci/kubernetes/kube/templates/configmaps.template.yaml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,6 @@ data:
4545
# how often the scheduler should run (in seconds).
4646
scheduler_heartbeat_sec = 5
4747
48-
# after how much time should the scheduler terminate in seconds
49-
# -1 indicates to run continuously (see also num_runs)
50-
run_duration = -1
51-
5248
# after how much time new DAGs should be picked up from the filesystem
5349
min_file_process_interval = 0
5450

tests/test_jobs.py

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1988,7 +1988,7 @@ def test_change_state_for_tis_without_dagrun(self):
19881988
session.commit()
19891989

19901990
dagbag = self._make_simple_dag_bag([dag1, dag2, dag3])
1991-
scheduler = SchedulerJob(num_runs=0, run_duration=0)
1991+
scheduler = SchedulerJob(num_runs=0)
19921992
scheduler._change_state_for_tis_without_dagrun(
19931993
simple_dag_bag=dagbag,
19941994
old_states=[State.SCHEDULED, State.QUEUED],
@@ -2110,7 +2110,7 @@ def test_execute_helper_reset_orphaned_tasks(self):
21102110

21112111
processor = mock.MagicMock()
21122112

2113-
scheduler = SchedulerJob(num_runs=0, run_duration=0)
2113+
scheduler = SchedulerJob(num_runs=0)
21142114
executor = TestExecutor()
21152115
scheduler.executor = executor
21162116
scheduler.processor_agent = processor
@@ -3059,30 +3059,6 @@ def test_retry_handling_job(self):
30593059
self.assertEqual(ti.try_number, 2)
30603060
self.assertEqual(ti.state, State.UP_FOR_RETRY)
30613061

3062-
def test_scheduler_run_duration(self):
3063-
"""
3064-
Verifies that the scheduler run duration limit is followed.
3065-
"""
3066-
dag_id = 'test_start_date_scheduling'
3067-
dag = self.dagbag.get_dag(dag_id)
3068-
dag.clear()
3069-
self.assertTrue(dag.start_date > DEFAULT_DATE)
3070-
3071-
expected_run_duration = 5
3072-
start_time = timezone.utcnow()
3073-
scheduler = SchedulerJob(dag_id,
3074-
run_duration=expected_run_duration)
3075-
scheduler.run()
3076-
end_time = timezone.utcnow()
3077-
3078-
run_duration = (end_time - start_time).total_seconds()
3079-
logging.info("Test ran in %.2fs, expected %.2fs",
3080-
run_duration,
3081-
expected_run_duration)
3082-
# 5s to wait for child process to exit, 1s dummy sleep
3083-
# in scheduler loop to prevent excessive logs and 1s for last loop to finish.
3084-
self.assertLess(run_duration - expected_run_duration, 6.0)
3085-
30863062
def test_dag_with_system_exit(self):
30873063
"""
30883064
Test to check that a DAG with a system.exit() doesn't break the scheduler.

0 commit comments

Comments
 (0)