-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Description
Apache Airflow version: 2.0.0
Kubernetes version:
Client Version: version.Info{Major:"1", Minor:"15", GitVersion:"v1.15.3", GitCommit:"2d3c76f9091b6bec110a5e63777c332469e0cba2", GitTreeState:"clean", BuildDate:"2019-08-19T11:13:54Z", GoVersion:"go1.12.9", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"16", GitVersion:"v1.16.13", GitCommit:"37c06f456fdb4d25e402b5fbcb72cd6a77a021a9", GitTreeState:"clean", BuildDate:"2020-09-18T21:59:14Z", GoVersion:"go1.13.9", Compiler:"gc", Platform:"linux/amd64"}
Environment:
- Cloud provider or hardware configuration: Azure Kubernetes Service
- Image : apache/airflow/2.0.0-python3.6
- Config Variables:
AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
AIRFLOW__CORE__DONOT_PICKLE=false
AIRFLOW__CORE__ENABLE_XCOM_PICKLING=false
AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
AIRFLOW__CORE__FERNET_KEY=*****
AIRFLOW__CORE__LOAD_EXAMPLES=false
AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD=bash -c 'eval "$DATABASE_SQLALCHEMY_CMD"'
AIRFLOW__ELASTICSEARCH__WRITE_STDOUT=True
AIRFLOW__KUBERNETES__ENV_FROM_CONFIGMAP_REF=my-name-env
AIRFLOW__KUBERNETES__NAMESPACE=airflow
AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE=/home/airflow/scripts/pod-template.yaml
AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME=my-name
AIRFLOW__LOGGING__BASE_LOG_FOLDER=/opt/airflow/logs
AIRFLOW__LOGGING__DAG_PROCESSOR_MANAGER_LOG_LOCATION=/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=wasb://airflow-logs@******.blob.core.windows.net
AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY=/opt/airflow/logs/scheduler
AIRFLOW__WEBSERVER__BASE_URL=http://****/my-name
AIRFLOW__WEBSERVER__WEB_SERVER_PORT=8080What happened:
After installing airflow in AKS via helm charts, webserver and scheduler start up as expected. After some time (with activity or while sitting idly) scheduler spits out the following:
scheduler error messages
[2021-01-26 16:22:08,620] {kubernetes_executor.py:111} ERROR - Unknown error in KubernetesJobWatcher. Failing
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/contrib/pyopenssl.py", line 313, in recv_into
return self.connection.recv_into(*args, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/OpenSSL/SSL.py", line 1840, in recv_into
self._raise_ssl_error(self._ssl, result)
File "/home/airflow/.local/lib/python3.6/site-packages/OpenSSL/SSL.py", line 1663, in _raise_ssl_error
raise SysCallError(errno, errorcode.get(errno))
OpenSSL.SSL.SysCallError: (104, 'ECONNRESET')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 436, in _error_catcher
yield
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 763, in read_chunked
self._update_chunk_length()
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 693, in _update_chunk_length
line = self._fp.fp.readline()
File "/usr/local/lib/python3.6/socket.py", line 586, in readinto
return self._sock.recv_into(b)
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/contrib/pyopenssl.py", line 318, in recv_into
raise SocketError(str(e))
OSError: (104, 'ECONNRESET')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 103, in run
kube_client, self.resource_version, self.scheduler_job_id, self.kube_config
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 145, in _run
for event in list_worker_pods():
File "/home/airflow/.local/lib/python3.6/site-packages/kubernetes/watch/watch.py", line 144, in stream
for line in iter_resp_lines(resp):
File "/home/airflow/.local/lib/python3.6/site-packages/kubernetes/watch/watch.py", line 46, in iter_resp_lines
for seg in resp.read_chunked(decode_content=False):
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 792, in read_chunked
self._original_response.close()
File "/usr/local/lib/python3.6/contextlib.py", line 99, in __exit__
self.gen.throw(type, value, traceback)
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 454, in _error_catcher
raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ('Connection broken: OSError("(104, \'ECONNRESET\')",)', OSError("(104, 'ECONNRESET')",))
Process KubernetesJobWatcher-3:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/contrib/pyopenssl.py", line 313, in recv_into
return self.connection.recv_into(*args, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/OpenSSL/SSL.py", line 1840, in recv_into
self._raise_ssl_error(self._ssl, result)
File "/home/airflow/.local/lib/python3.6/site-packages/OpenSSL/SSL.py", line 1663, in _raise_ssl_error
raise SysCallError(errno, errorcode.get(errno))
OpenSSL.SSL.SysCallError: (104, 'ECONNRESET')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 436, in _error_catcher
yield
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 763, in read_chunked
self._update_chunk_length()
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 693, in _update_chunk_length
line = self._fp.fp.readline()
File "/usr/local/lib/python3.6/socket.py", line 586, in readinto
return self._sock.recv_into(b)
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/contrib/pyopenssl.py", line 318, in recv_into
raise SocketError(str(e))
OSError: (104, 'ECONNRESET')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 103, in run
kube_client, self.resource_version, self.scheduler_job_id, self.kube_config
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 145, in _run
for event in list_worker_pods():
File "/home/airflow/.local/lib/python3.6/site-packages/kubernetes/watch/watch.py", line 144, in stream
for line in iter_resp_lines(resp):
File "/home/airflow/.local/lib/python3.6/site-packages/kubernetes/watch/watch.py", line 46, in iter_resp_lines
for seg in resp.read_chunked(decode_content=False):
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 792, in read_chunked
self._original_response.close()
File "/usr/local/lib/python3.6/contextlib.py", line 99, in __exit__
self.gen.throw(type, value, traceback)
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 454, in _error_catcher
raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ('Connection broken: OSError("(104, \'ECONNRESET\')",)', OSError("(104, 'ECONNRESET')",))
[2021-01-26 16:22:10,177] {kubernetes_executor.py:266} ERROR - Error while health checking kube watcher process. Process died for unknown reasons
[2021-01-26 16:22:10,189] {kubernetes_executor.py:126} INFO - Event: and now my watch begins starting at resource_version: 0
[2021-01-26 16:23:00,720] {scheduler_job.py:1751} INFO - Resetting orphaned tasks for active dag runs
What you expected to happen:
Scheduler should run (or sit idly) without error
How to reproduce it:
Unknown
Anything else we need to know:
Steps I've taken to debug:
Based on the location of the errors in the stack trace, I assumed the error was related to the KubernetesExecutor making an api request for a list of pods. To debug this I execed into the pod and ran
KUBE_TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
curl -sSk -H "Authorization: Bearer $KUBE_TOKEN" https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_PORT_443_TCP_PORT/api/v1/pods/which initially gave me a 403 forbidden error. I then created the following ClusterRoleBinding:
rbac-read.yaml
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: system:serviceaccount:airflow:my-name:read-pods
namespace: kube-system
subjects:
- kind: ServiceAccount
name: my-name
namespace: airflow
roleRef:
kind: ClusterRole
name: cluster-admin
apiGroup: rbac.authorization.k8s.ioAfterward the above bash commands successfully returned a list of pods in the cluster. I then opened a python shell (still within the scheduler pod) and successfully ran
>>> from kubernetes import client, config
>>> config.load_incluster_config()
>>> v1 = client.CoreV1Api()
>>> pods = v1.list_pod_for_all_namespaces(watch=False)
>>> airflow_pods = v1.list_namespaced_pod("airflow")Given that this ran successfully, I'm at a loss as to why I'm still getting the ECONNRESET error.