
Exception when executing execute_helper #9689

@mik-laj

Description

Apache Airflow version:
v2.0.0 - c58d6d586d20e9342fc520117518e26a37d5c229

Kubernetes version (if you are using kubernetes) (use kubectl version):

Environment:

Breeze (macOS)

What happened:

I see an exception in the console.

[2020-07-06 11:42:08,727] {scheduler_job.py:1588} ERROR - Exception when executing execute_helper
Traceback (most recent call last):
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 1569, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 1642, in _run_scheduler_loop
    if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 1679, in _validate_and_run_task_instances
    self._process_executor_events(simple_dag_bag)
  File "/opt/airflow/airflow/utils/session.py", line 61, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 1514, in _process_executor_events
    state, info = event_buffer.pop(key)
KeyError: ('example_bash_operator', 'also_run_this', datetime.datetime(2020, 7, 4, 0, 0, tzinfo=Timezone('UTC')), 2)
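
For reference, the failing line pops the task-instance key from the executor's event buffer without a fallback, and the key it looks up carries try_number 2, while the full log below shows the executor reporting the event for try_number 1. A minimal, self-contained sketch of that pattern and of a guarded variant (the buffer shape and key layout here are assumptions modeled on the traceback, not the real scheduler code):

# Hypothetical illustration of the failing pattern from the traceback above;
# this is not the actual scheduler_job.py code. The event buffer is assumed
# to be a plain dict keyed by (dag_id, task_id, execution_date, try_number).
from datetime import datetime, timezone

event_buffer = {
    # The executor only buffered an event for try_number 1 (see the log below).
    ("example_bash_operator", "also_run_this",
     datetime(2020, 7, 4, tzinfo=timezone.utc), 1): ("success", None),
}

# The scheduler looks the event up with try_number 2, so a plain
# event_buffer.pop(key) raises KeyError, exactly as in the traceback.
key = ("example_bash_operator", "also_run_this",
       datetime(2020, 7, 4, tzinfo=timezone.utc), 2)

# One possible defensive variant falls back to a default instead of crashing:
state, info = event_buffer.pop(key, (None, None))
if state is None:
    print(f"No buffered event for key {key}")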

What you expected to happen:

No exception

How to reproduce it:

airflow db reset
airflow dags unpause example_bash_operator
airflow scheduler --num-runs 4

Anything else we need to know:

This exception may be related to #9488, but this needs to be confirmed.

CC: @potiuk @mik-laj (PR reviewers)
CC: @turbaszek (PR owner)

Full config
[core]
dags_folder = /files/dags
hostname_callable = socket.getfqdn
default_timezone = utc
executor = SequentialExecutor
sql_alchemy_conn = postgresql+psycopg2://postgres:airflow@postgres/airflow
sql_engine_encoding = utf-8
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
sql_alchemy_schema =
parallelism = 32
dag_concurrency = 16
dags_are_paused_at_creation = True
max_active_runs_per_dag = 16
load_examples = True
load_default_connections = True
plugins_folder = /root/airflow/plugins
fernet_key = E7FewLWrVfNN11SSgGd-ER-3wOZ3HwcrvKJfDKOpdAQ=
donot_pickle = True
dagbag_import_timeout = 30
dag_file_processor_timeout = 50
task_runner = StandardTaskRunner
default_impersonation =
security =
unit_test_mode = False
enable_xcom_pickling = True
killed_task_cleanup_time = 60
dag_run_conf_overrides_params = False
worker_precheck = False
dag_discovery_safe_mode = True
default_task_retries = 0
store_serialized_dags = True
min_serialized_dag_update_interval = 30
max_num_rendered_ti_fields_per_task = 30
check_slas = True
xcom_backend = airflow.models.xcom.BaseXCom

[logging]
base_log_folder = /root/airflow/logs
remote_logging = False
remote_log_conn_id =
stackdriver_key_path =
remote_base_log_folder =
encrypt_s3_logs = False
logging_level = INFO
fab_logging_level = WARN
logging_config_class =
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
task_log_prefix_template =
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /root/airflow/logs/dag_processor_manager/dag_processor_manager.log
task_log_reader = task

[secrets]
backend =
backend_kwargs =

[cli]
api_client = airflow.api.client.local_client
endpoint_url = http://localhost:8080

[debug]
fail_fast = False

[api]
auth_backend = airflow.api.auth.backend.deny_all
maximum_page_limit = 100
fallback_page_limit = 100

[lineage]
backend =

[atlas]
sasl_enabled = False
host =
port = 21000
username =
password =

[operators]
default_owner = airflow
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
allow_illegal_arguments = False

[hive]
default_hive_mapred_queue =

[webserver]
base_url = http://localhost:8080
default_ui_timezone = UTC
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_ssl_cert =
web_server_ssl_key =
web_server_master_timeout = 120
web_server_worker_timeout = 120
worker_refresh_batch_size = 1
worker_refresh_interval = 30
reload_on_plugin_change = False
secret_key = YywunKHMYsZPWSez5IdgOA==
workers = 4
worker_class = sync
access_logfile = -
error_logfile = -
expose_config = False
expose_hostname = True
expose_stacktrace = True
dag_default_view = tree
dag_orientation = LR
demo_mode = False
log_fetch_timeout_sec = 5
log_fetch_delay_sec = 2
log_auto_tailing_offset = 30
log_animation_speed = 1000
hide_paused_dags_by_default = False
page_size = 100
navbar_color = #007A87
default_dag_run_display_number = 25
enable_proxy_fix = False
proxy_fix_x_for = 1
proxy_fix_x_proto = 1
proxy_fix_x_host = 1
proxy_fix_x_port = 1
proxy_fix_x_prefix = 1
cookie_secure = False
cookie_samesite =
default_wrap = False
x_frame_enabled = True
show_recent_stats_for_completed_runs = True
update_fab_perms = True
force_log_out_after = 0
session_lifetime_days = 30

[email]
email_backend = airflow.utils.email.send_email_smtp
default_email_on_retry = True
default_email_on_failure = True

[smtp]
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
smtp_port = 25
smtp_mail_from = [email protected]

[sentry]
sentry_dsn =

[celery]
celery_app_name = airflow.executors.celery_executor
worker_concurrency = 8
worker_log_server_port = 8793
worker_umask = 0o077
broker_url = redis://redis:6379/0
result_backend = db+postgresql://postgres:airflow@postgres/airflow
flower_host = 0.0.0.0
flower_url_prefix =
flower_port = 5555
flower_basic_auth =
default_queue = default
sync_parallelism = 0
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
pool = prefork
operation_timeout = 2

[celery_broker_transport_options]

[dask]
cluster_address = 127.0.0.1:8786
tls_ca =
tls_cert =
tls_key =

[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
num_runs = -1
processor_poll_interval = 1
min_file_process_interval = 0
dag_dir_list_interval = 300
print_stats_interval = 30
dag_cleanup_interval = 300
scheduler_health_check_threshold = 30
child_process_log_directory = /root/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
catchup_by_default = True
max_tis_per_query = 512
statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
statsd_allow_list =
stat_name_handler =
statsd_datadog_enabled = False
statsd_datadog_tags =
max_threads = 2
use_job_schedule = True
allow_trigger_in_future = False

[ldap]
uri =
user_filter = objectClass=*
user_name_attr = uid
group_member_attr = memberOf
superuser_filter =
data_profiler_filter =
bind_user = cn=Manager,dc=example,dc=com
bind_password = insecure
basedn = dc=example,dc=com
cacert = /etc/ca/ldap_ca.crt
search_scope = LEVEL
ignore_malformed_schema = False

[kerberos]
ccache = /tmp/airflow_krb5_ccache
principal = airflow
reinit_frequency = 3600
kinit_path = kinit
keytab = airflow.keytab

[github_enterprise]
api_rev = v3

[admin]
hide_sensitive_variable_fields = True
sensitive_variable_fields =

[elasticsearch]
host =
log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
end_of_log_mark = end_of_log
frontend =
write_stdout = False
json_format = False
json_fields = asctime, filename, lineno, levelname, message

[elasticsearch_configs]
use_ssl = False
verify_certs = True

[kubernetes]
worker_container_repository =
pod_template_file =
worker_container_tag =
worker_container_image_pull_policy = IfNotPresent
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
namespace = default
airflow_configmap =
airflow_local_settings_configmap =
dags_in_image = False
dags_volume_subpath =
dags_volume_mount_point =
dags_volume_claim =
logs_volume_subpath =
logs_volume_claim =
dags_volume_host =
logs_volume_host =
env_from_configmap_ref =
env_from_secret_ref =
git_repo =
git_branch =
git_sync_depth = 1
git_subpath =
git_sync_rev =
git_user =
git_password =
git_sync_root = /git
git_sync_dest = repo
git_dags_folder_mount_point =
git_ssh_key_secret_name =
git_ssh_known_hosts_configmap_name =
git_sync_credentials_secret =
git_sync_container_repository = k8s.gcr.io/git-sync
git_sync_container_tag = v3.1.1
git_sync_init_container_name = git-sync-clone
git_sync_run_as_user = 65533
worker_service_account_name =
image_pull_secrets =
in_cluster = True
affinity =
tolerations =
kube_client_request_args =
delete_option_kwargs =
run_as_user =
fs_group =
worker_annotations =
worker_resources =

[kubernetes_node_selectors]

[kubernetes_environment_variables]

[kubernetes_secrets]

[kubernetes_labels]

Full log
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-07-06 11:41:32,224] {scheduler_job.py:1533} INFO - Starting the scheduler
[2020-07-06 11:41:32,224] {scheduler_job.py:1538} INFO - Processing each file at most 4 times
[2020-07-06 11:41:32,226] {scheduler_job.py:1559} INFO - Resetting orphaned tasks for active dag runs
[2020-07-06 11:41:32,244] {dag_processing.py:374} INFO - Launched DagFileProcessorManager with pid: 1493
[2020-07-06 11:41:32,261] {settings.py:52} INFO - Configured default timezone Timezone('UTC')
[2020-07-06 11:41:33,321] {scheduler_job.py:1195} INFO - 4 tasks up for execution:
	<TaskInstance: example_bash_operator.runme_2 2020-07-04 00:00:00+00:00 [scheduled]>
	<TaskInstance: example_bash_operator.runme_1 2020-07-04 00:00:00+00:00 [scheduled]>
	<TaskInstance: example_bash_operator.runme_0 2020-07-04 00:00:00+00:00 [scheduled]>
	<TaskInstance: example_bash_operator.also_run_this 2020-07-04 00:00:00+00:00 [scheduled]>
[2020-07-06 11:41:33,338] {scheduler_job.py:1231} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 4 task instances ready to be queued
[2020-07-06 11:41:33,339] {scheduler_job.py:1259} INFO - DAG example_bash_operator has 0/16 running and queued tasks
[2020-07-06 11:41:33,340] {scheduler_job.py:1259} INFO - DAG example_bash_operator has 1/16 running and queued tasks
[2020-07-06 11:41:33,340] {scheduler_job.py:1259} INFO - DAG example_bash_operator has 2/16 running and queued tasks
[2020-07-06 11:41:33,341] {scheduler_job.py:1259} INFO - DAG example_bash_operator has 3/16 running and queued tasks
[2020-07-06 11:41:33,341] {scheduler_job.py:1313} INFO - Setting the following tasks to queued state:
	<TaskInstance: example_bash_operator.runme_2 2020-07-04 00:00:00+00:00 [scheduled]>
	<TaskInstance: example_bash_operator.runme_1 2020-07-04 00:00:00+00:00 [scheduled]>
	<TaskInstance: example_bash_operator.runme_0 2020-07-04 00:00:00+00:00 [scheduled]>
	<TaskInstance: example_bash_operator.also_run_this 2020-07-04 00:00:00+00:00 [scheduled]>
[2020-07-06 11:41:33,369] {scheduler_job.py:1366} INFO - Setting the following 4 tasks to queued state:
	<TaskInstance: example_bash_operator.runme_2 2020-07-04 00:00:00+00:00 [scheduled]>
	<TaskInstance: example_bash_operator.runme_1 2020-07-04 00:00:00+00:00 [scheduled]>
	<TaskInstance: example_bash_operator.runme_0 2020-07-04 00:00:00+00:00 [scheduled]>
	<TaskInstance: example_bash_operator.also_run_this 2020-07-04 00:00:00+00:00 [scheduled]>
[2020-07-06 11:41:33,371] {scheduler_job.py:1402} INFO - Sending ('example_bash_operator', 'runme_2', datetime.datetime(2020, 7, 4, 0, 0, tzinfo=Timezone('UTC')), 1) to executor with priority 3 and queue default
[2020-07-06 11:41:33,372] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_2', '2020-07-04T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/airflow/example_dags/example_bash_operator.py']
[2020-07-06 11:41:33,374] {scheduler_job.py:1402} INFO - Sending ('example_bash_operator', 'runme_1', datetime.datetime(2020, 7, 4, 0, 0, tzinfo=Timezone('UTC')), 1) to executor with priority 3 and queue default
[2020-07-06 11:41:33,376] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_1', '2020-07-04T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/airflow/example_dags/example_bash_operator.py']
[2020-07-06 11:41:33,378] {scheduler_job.py:1402} INFO - Sending ('example_bash_operator', 'runme_0', datetime.datetime(2020, 7, 4, 0, 0, tzinfo=Timezone('UTC')), 1) to executor with priority 3 and queue default
[2020-07-06 11:41:33,382] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_0', '2020-07-04T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/airflow/example_dags/example_bash_operator.py']
[2020-07-06 11:41:33,383] {scheduler_job.py:1402} INFO - Sending ('example_bash_operator', 'also_run_this', datetime.datetime(2020, 7, 4, 0, 0, tzinfo=Timezone('UTC')), 1) to executor with priority 2 and queue default
[2020-07-06 11:41:33,384] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this', '2020-07-04T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/airflow/example_dags/example_bash_operator.py']
[2020-07-06 11:41:33,386] {sequential_executor.py:60} INFO - Executing command: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_2', '2020-07-04T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/airflow/example_dags/example_bash_operator.py']
[2020-07-06 11:41:37,653] {dagbag.py:365} INFO - Filling up the DagBag from /opt/airflow/airflow/example_dags/example_bash_operator.py
Running <TaskInstance: example_bash_operator.runme_2 2020-07-04T00:00:00+00:00 [None]> on host 9c12dabf63de
[2020-07-06 11:41:43,029] {sequential_executor.py:60} INFO - Executing command: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_1', '2020-07-04T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/airflow/example_dags/example_bash_operator.py']
[2020-07-06 11:41:46,282] {dagbag.py:365} INFO - Filling up the DagBag from /opt/airflow/airflow/example_dags/example_bash_operator.py
Running <TaskInstance: example_bash_operator.runme_1 2020-07-04T00:00:00+00:00 [None]> on host 9c12dabf63de
[2020-07-06 11:41:51,603] {sequential_executor.py:60} INFO - Executing command: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_0', '2020-07-04T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/airflow/example_dags/example_bash_operator.py']
[2020-07-06 11:41:54,722] {dagbag.py:365} INFO - Filling up the DagBag from /opt/airflow/airflow/example_dags/example_bash_operator.py
Running <TaskInstance: example_bash_operator.runme_0 2020-07-04T00:00:00+00:00 [None]> on host 9c12dabf63de
[2020-07-06 11:42:00,075] {sequential_executor.py:60} INFO - Executing command: ['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this', '2020-07-04T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/airflow/example_dags/example_bash_operator.py']
[2020-07-06 11:42:03,378] {dagbag.py:365} INFO - Filling up the DagBag from /opt/airflow/airflow/example_dags/example_bash_operator.py
Running <TaskInstance: example_bash_operator.also_run_this 2020-07-04T00:00:00+00:00 [None]> on host 9c12dabf63de
[2020-07-06 11:42:08,712] {scheduler_job.py:1499} INFO - Executor reports execution of example_bash_operator.runme_2 execution_date=2020-07-04 00:00:00+00:00 exited with status success for try_number 1
[2020-07-06 11:42:08,713] {scheduler_job.py:1499} INFO - Executor reports execution of example_bash_operator.runme_1 execution_date=2020-07-04 00:00:00+00:00 exited with status success for try_number 1
[2020-07-06 11:42:08,713] {scheduler_job.py:1499} INFO - Executor reports execution of example_bash_operator.runme_0 execution_date=2020-07-04 00:00:00+00:00 exited with status success for try_number 1
[2020-07-06 11:42:08,714] {scheduler_job.py:1499} INFO - Executor reports execution of example_bash_operator.also_run_this execution_date=2020-07-04 00:00:00+00:00 exited with status success for try_number 1
[2020-07-06 11:42:08,727] {scheduler_job.py:1588} ERROR - Exception when executing execute_helper
Traceback (most recent call last):
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 1569, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 1642, in _run_scheduler_loop
    if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 1679, in _validate_and_run_task_instances
    self._process_executor_events(simple_dag_bag)
  File "/opt/airflow/airflow/utils/session.py", line 61, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 1514, in _process_executor_events
    state, info = event_buffer.pop(key)
KeyError: ('example_bash_operator', 'also_run_this', datetime.datetime(2020, 7, 4, 0, 0, tzinfo=Timezone('UTC')), 2)
[2020-07-06 11:42:08,747] {process_utils.py:99} INFO - Sending Signals.SIGTERM to GPID 1493
[2020-07-06 11:42:08,747] {process_utils.py:65} INFO - Process psutil.Process(pid=1493, status='terminated') (1493) terminated with exit code 0
[2020-07-06 11:42:08,748] {scheduler_job.py:1591} INFO - Exited execute loop

Labels: area:Scheduler (including HA (high availability) scheduler), kind:bug