Skip to content

Commit 56a038a

Browse files
cong-zhu
authored and KevinYang21 committed
[AIRFLOW-4084] fix ElasticSearch log download (#5177)
1 parent 118492e commit 56a038a

File tree

4 files changed

+80
-14
lines changed

4 files changed

+80
-14
lines changed

airflow/utils/log/es_task_handler.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def _read(self, ti, try_number, metadata=None):
9797
offset = metadata['offset']
9898
log_id = self._render_log_id(ti, try_number)
9999

100-
logs = self.es_read(log_id, offset)
100+
logs = self.es_read(log_id, offset, metadata)
101101

102102
next_offset = offset if not logs else logs[-1].offset
103103

@@ -113,7 +113,8 @@ def _read(self, ti, try_number, metadata=None):
113113
# delay before Elasticsearch makes the log available.
114114
if 'last_log_timestamp' in metadata:
115115
last_log_ts = timezone.parse(metadata['last_log_timestamp'])
116-
if cur_ts.diff(last_log_ts).in_minutes() >= 5:
116+
if cur_ts.diff(last_log_ts).in_minutes() >= 5 or 'max_offset' in metadata \
117+
and offset >= metadata['max_offset']:
117118
metadata['end_of_log'] = True
118119

119120
if offset != next_offset or 'last_log_timestamp' not in metadata:
@@ -123,14 +124,16 @@ def _read(self, ti, try_number, metadata=None):
123124

124125
return message, metadata
125126

126-
def es_read(self, log_id, offset):
127+
def es_read(self, log_id, offset, metadata):
127128
"""
128129
Returns the logs matching log_id in Elasticsearch and next offset.
129130
Returns '' if no log is found or there was an error.
130131
:param log_id: the log_id of the log to read.
131132
:type log_id: str
132133
:param offset: the offset start to read log from.
133134
:type offset: str
135+
:param metadata: log metadata, used for streaming log download.
136+
:type metadata: dict
134137
"""
135138

136139
# Offset is the unique key for sorting logs given log_id.
@@ -139,9 +142,15 @@ def es_read(self, log_id, offset):
139142
.sort('offset')
140143

141144
s = s.filter('range', offset={'gt': offset})
145+
max_log_line = s.count()
146+
if 'download_logs' in metadata and metadata['download_logs'] and 'max_offset' not in metadata:
147+
try:
148+
metadata['max_offset'] = s[max_log_line - 1].execute()[-1].offset if max_log_line > 0 else 0
149+
except Exception:
150+
self.log.exception('Could not get current log size with log_id: {}'.format(log_id))
142151

143152
logs = []
144-
if s.count() != 0:
153+
if max_log_line != 0:
145154
try:
146155

147156
logs = s[self.MAX_LINE_PER_PAGE * self.PAGE:self.MAX_LINE_PER_PAGE] \

airflow/www/views.py

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
#
2020

2121
import copy
22-
from io import BytesIO
2322
import itertools
2423
import json
2524
import logging
@@ -36,7 +35,7 @@
3635
import sqlalchemy as sqla
3736
from flask import (
3837
redirect, request, Markup, Response, render_template,
39-
make_response, flash, jsonify, send_file, url_for)
38+
make_response, flash, jsonify, url_for)
4039
from flask_appbuilder import BaseView, ModelView, expose, has_access
4140
from flask_appbuilder.actions import action
4241
from flask_appbuilder.models.sqla.filters import BaseFilter
@@ -545,30 +544,49 @@ def get_logs_with_metadata(self, session=None):
545544
models.TaskInstance.dag_id == dag_id,
546545
models.TaskInstance.task_id == task_id,
547546
models.TaskInstance.execution_date == dttm).first()
548-
try:
547+
548+
def _get_logs_with_metadata(try_number, metadata):
549549
if ti is None:
550550
logs = ["*** Task instance did not exist in the DB\n"]
551551
metadata['end_of_log'] = True
552552
else:
553-
dag = dagbag.get_dag(dag_id)
554-
ti.task = dag.get_task(ti.task_id)
555553
logs, metadatas = handler.read(ti, try_number, metadata=metadata)
556554
metadata = metadatas[0]
555+
return logs, metadata
557556

557+
try:
558+
if ti is not None:
559+
dag = dagbag.get_dag(dag_id)
560+
ti.task = dag.get_task(ti.task_id)
558561
if response_format == 'json':
562+
logs, metadata = _get_logs_with_metadata(try_number, metadata)
559563
message = logs[0] if try_number is not None else logs
560564
return jsonify(message=message, metadata=metadata)
561565

562-
file_obj = BytesIO(b'\n'.join(
563-
log.encode('utf-8') for log in logs
564-
))
565566
filename_template = conf.get('core', 'LOG_FILENAME_TEMPLATE')
566567
attachment_filename = render_log_filename(
567568
ti=ti,
568569
try_number="all" if try_number is None else try_number,
569570
filename_template=filename_template)
570-
return send_file(file_obj, as_attachment=True,
571-
attachment_filename=attachment_filename)
571+
metadata['download_logs'] = True
572+
573+
def _generate_log_stream(try_number, metadata):
574+
if try_number is None and ti is not None:
575+
next_try = ti.next_try_number
576+
try_numbers = list(range(1, next_try))
577+
else:
578+
try_numbers = [try_number]
579+
for try_number in try_numbers:
580+
metadata.pop('end_of_log', None)
581+
metadata.pop('max_offset', None)
582+
metadata.pop('offset', None)
583+
while 'end_of_log' not in metadata or not metadata['end_of_log']:
584+
logs, metadata = _get_logs_with_metadata(try_number, metadata)
585+
yield "\n".join(logs) + "\n"
586+
return Response(_generate_log_stream(try_number, metadata),
587+
mimetype="text/plain",
588+
headers={"Content-Disposition": "attachment; filename={}".format(
589+
attachment_filename)})
572590
except AttributeError as e:
573591
error_message = ["Task log handler {} does not support read logs.\n{}\n"
574592
.format(task_log_reader, str(e))]

tests/utils/log/test_es_task_handler.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,22 @@ def test_read_timeout(self):
190190
self.assertEqual(0, metadatas[0]['offset'])
191191
self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) == ts)
192192

193+
def test_read_as_download_logs(self):
194+
ts = pendulum.now()
195+
logs, metadatas = self.es_task_handler.read(self.ti,
196+
1,
197+
{'offset': 0,
198+
'last_log_timestamp': str(ts),
199+
'download_logs': True,
200+
'end_of_log': False})
201+
self.assertEqual(1, len(logs))
202+
self.assertEqual(len(logs), len(metadatas))
203+
self.assertEqual(self.test_message, logs[0])
204+
self.assertFalse(metadatas[0]['end_of_log'])
205+
self.assertTrue(metadatas[0]['download_logs'])
206+
self.assertEqual(1, metadatas[0]['offset'])
207+
self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) > ts)
208+
193209
def test_read_raises(self):
194210
with mock.patch.object(self.es_task_handler.log, 'exception') as mock_exception:
195211
with mock.patch("elasticsearch_dsl.Search.execute") as mock_execute:

tests/www/test_views.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -741,6 +741,29 @@ def test_get_logs_with_metadata_as_download_file(self):
741741
self.assertEqual(200, response.status_code)
742742
self.assertIn('Log for testing.', response.data.decode('utf-8'))
743743

744+
def test_get_logs_with_metadata_as_download_large_file(self):
745+
with mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read") as read_mock:
746+
first_return = (['1st line'], [{}])
747+
second_return = (['2nd line'], [{'end_of_log': False}])
748+
third_return = (['3rd line'], [{'end_of_log': True}])
749+
fourth_return = (['should never be read'], [{'end_of_log': True}])
750+
read_mock.side_effect = [first_return, second_return, third_return, fourth_return]
751+
url_template = "get_logs_with_metadata?dag_id={}&" \
752+
"task_id={}&execution_date={}&" \
753+
"try_number={}&metadata={}&format=file"
754+
try_number = 1
755+
url = url_template.format(self.DAG_ID,
756+
self.TASK_ID,
757+
quote_plus(self.DEFAULT_DATE.isoformat()),
758+
try_number,
759+
json.dumps({}))
760+
response = self.client.get(url)
761+
762+
self.assertIn('1st line', response.data.decode('utf-8'))
763+
self.assertIn('2nd line', response.data.decode('utf-8'))
764+
self.assertIn('3rd line', response.data.decode('utf-8'))
765+
self.assertNotIn('should never be read', response.data.decode('utf-8'))
766+
744767
def test_get_logs_with_metadata(self):
745768
url_template = "get_logs_with_metadata?dag_id={}&" \
746769
"task_id={}&execution_date={}&" \

0 commit comments

Comments
 (0)