Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions airflow/contrib/operators/qubole_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

class QDSLink(BaseOperatorLink):

name = 'Go to QDS'

def get_link(self, operator, dttm):
return operator.get_hook().get_extra_links(operator, dttm)

Expand Down Expand Up @@ -155,9 +157,9 @@ class QuboleOperator(BaseOperator):
ui_fgcolor = '#fff'
qubole_hook_allowed_args_list = ['command_type', 'qubole_conn_id', 'fetch_logs']

operator_extra_link_dict = {
'Go to QDS': QDSLink(),
}
operator_extra_links = (
QDSLink(),
)

@apply_defaults
def __init__(self, qubole_conn_id="qubole_default", *args, **kwargs):
Expand Down
38 changes: 35 additions & 3 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# under the License.

from abc import ABCMeta, abstractmethod
from cached_property import cached_property
import copy
import functools
import logging
Expand Down Expand Up @@ -232,7 +233,7 @@ class derived from this one results in the creation of a task object,
shallow_copy_attrs = () # type: Iterable[str]

# Defines the operator level extra links
operator_extra_link_dict = {} # type: Dict[str, BaseOperatorLink]
operator_extra_links = () # type: Iterable[BaseOperatorLink]

@apply_defaults
def __init__(
Expand Down Expand Up @@ -564,6 +565,15 @@ def priority_weight_total(self):
self.get_flat_relative_ids(upstream=upstream))
)

@cached_property
def operator_extra_link_dict(self):
return {link.name: link for link in self.operator_extra_links}

@cached_property
def global_operator_extra_link_dict(self):
from airflow.plugins_manager import global_operator_extra_links
return {link.name: link for link in global_operator_extra_links}

@prepare_lineage
def pre_execute(self, context):
"""
Expand Down Expand Up @@ -946,10 +956,11 @@ def xcom_pull(
dag_id=dag_id,
include_prior_dates=include_prior_dates)

@property
@cached_property
def extra_links(self):
# type: () -> Iterable[str]
return list(self.operator_extra_link_dict.keys())
return list(set(self.operator_extra_link_dict.keys())
.union(self.global_operator_extra_link_dict.keys()))

def get_extra_links(self, dttm, link_name):
"""
Expand All @@ -964,6 +975,8 @@ def get_extra_links(self, dttm, link_name):
"""
if link_name in self.operator_extra_link_dict:
return self.operator_extra_link_dict[link_name].get_link(self, dttm)
elif link_name in self.global_operator_extra_link_dict:
return self.global_operator_extra_link_dict[link_name].get_link(self, dttm)


class BaseOperatorLink:
Expand All @@ -973,6 +986,25 @@ class BaseOperatorLink:

__metaclass__ = ABCMeta

@property
@abstractmethod
def name(self):
# type: () -> str
"""
Name of the link. This will be the button name on the task UI.

:return: link name
"""
pass

@abstractmethod
def get_link(self, operator, dttm):
# type: (BaseOperator, datetime) -> str
"""
Link to external system.

:param operator: airflow operator
:param dttm: datetime
:return: link to external system
"""
pass
6 changes: 5 additions & 1 deletion airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from typing import List, Any

from airflow import settings
from airflow.models.baseoperator import BaseOperatorLink
from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log
Expand Down Expand Up @@ -55,7 +56,8 @@ class AirflowPlugin(object):
#
# The function should have the following signature:
# def func_name(stat_name: str) -> str:
stat_name_handler = None # type:Any
stat_name_handler = None # type: Any
global_operator_extra_links = [] # type: List[BaseOperatorLink]

@classmethod
def validate(cls):
Expand Down Expand Up @@ -176,6 +178,7 @@ def make_module(name, objects):
flask_appbuilder_views = [] # type: List[Any]
flask_appbuilder_menu_links = [] # type: List[Any]
stat_name_handler = None # type: Any
global_operator_extra_links = [] # type: List[Any]

stat_name_handlers = []
for p in plugins:
Expand All @@ -199,6 +202,7 @@ def make_module(name, objects):
} for bp in p.flask_blueprints])
if p.stat_name_handler:
stat_name_handlers.append(p.stat_name_handler)
global_operator_extra_links.extend(p.global_operator_extra_links)

if len(stat_name_handlers) > 1:
raise AirflowPluginException(
Expand Down
5 changes: 5 additions & 0 deletions docs/howto/define_extra_link.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,8 @@ The following code shows how to add extra links to an operator:

def execute(self, context):
self.log.info("Hello World!")

You can also add a global operator extra link that will be available to
all the operators through airflow plugin. Learn more about it in the
:ref:`plugin example <plugin-example>`.

24 changes: 24 additions & 0 deletions docs/plugins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ looks like:
# ... perform Plugin boot actions
pass

# A list of global operator extra links that can redirect users to
# external systems. These extra links will be available on the
# task page in the form of buttons.
#
# Note: the global operator extra link can be overridden at each
# operator level.
global_operator_extra_links = []



Expand All @@ -131,6 +138,8 @@ For example,
Make sure you restart the webserver and scheduler after making changes to plugins so that they take effect.


.. _plugin-example:

Example
-------

Expand All @@ -148,6 +157,7 @@ definitions in Airflow.
# Importing base classes that we need to derive
from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.models.baseoperator import BaseOperatorLink
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.executors.base_executor import BaseExecutor

Expand Down Expand Up @@ -202,6 +212,19 @@ definitions in Airflow.
def stat_name_dummy_handler(stat_name):
return stat_name

# A global operator extra link that redirect you to
# task logs stored in S3
class S3LogLink(BaseOperatorLink):
name = 'S3'

def get_link(self, operator, dttm):
return 'https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}'.format(
dag_id=operator.dag_id,
task_id=operator.task_id,
execution_date=dttm,
)


# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
name = "test_plugin"
Expand All @@ -214,6 +237,7 @@ definitions in Airflow.
appbuilder_views = [v_appbuilder_package]
appbuilder_menu_items = [appbuilder_mitem]
stat_name_handler = staticmethod(stat_name_dummy_handler)
global_operator_extra_links = [S3LogLink(),]


Note on role based views
Expand Down
20 changes: 19 additions & 1 deletion tests/plugins/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

# Importing base classes that we need to derive
from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.models.baseoperator import BaseOperatorLink, BaseOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.executors.base_executor import BaseExecutor

Expand Down Expand Up @@ -88,6 +88,20 @@ def stat_name_dummy_handler(stat_name):
return stat_name


class AirflowLink(BaseOperatorLink):
name = 'airflow'

def get_link(self, operator, dttm):
return 'should_be_overridden'


class GithubLink(BaseOperatorLink):
name = 'github'

def get_link(self, operator, dttm):
return 'https://github.com/apache/airflow'


# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
name = "test_plugin"
Expand All @@ -100,6 +114,10 @@ class AirflowTestPlugin(AirflowPlugin):
appbuilder_views = [v_appbuilder_package]
appbuilder_menu_items = [appbuilder_mitem]
stat_name_handler = staticmethod(stat_name_dummy_handler)
global_operator_extra_links = [
AirflowLink(),
GithubLink(),
]


class MockPluginA(AirflowPlugin):
Expand Down
56 changes: 51 additions & 5 deletions tests/www/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1671,28 +1671,38 @@ def setUp(self):
self.DEFAULT_DATE = datetime(2017, 1, 1)

class RaiseErrorLink(BaseOperatorLink):
name = 'raise_error'

def get_link(self, operator, dttm):
raise ValueError('This is an error')

class NoResponseLink(BaseOperatorLink):
name = 'no_response'

def get_link(self, operator, dttm):
return None

class FooBarLink(BaseOperatorLink):
name = 'foo-bar'

def get_link(self, operator, dttm):
return 'http://www.example.com/{0}/{1}/{2}'.format(
operator.task_id, 'foo-bar', dttm)

class AirflowLink(BaseOperatorLink):
name = 'airflow'

def get_link(self, operator, dttm):
return 'https://airflow.apache.org'

class DummyTestOperator(BaseOperator):

operator_extra_link_dict = {
'raise_error': RaiseErrorLink(),
'no_response': NoResponseLink(),
'foo-bar': FooBarLink()
}
operator_extra_links = (
RaiseErrorLink(),
NoResponseLink(),
FooBarLink(),
AirflowLink(),
)

self.dag = DAG('dag', start_date=self.DEFAULT_DATE)
self.task = DummyTestOperator(task_id="some_dummy_task", dag=self.dag)
Expand All @@ -1719,6 +1729,42 @@ def test_extra_links_works(self, get_dag_function):
'error': None
})

@mock.patch('airflow.www.views.dagbag.get_dag')
def test_global_extra_links_works(self, get_dag_function):
get_dag_function.return_value = self.dag

response = self.client.get(
"{0}?dag_id={1}&task_id={2}&execution_date={3}&link_name=github"
.format(self.ENDPOINT, self.dag.dag_id, self.task.task_id, self.DEFAULT_DATE),
follow_redirects=True)

self.assertEqual(response.status_code, 200)
response_str = response.data
if isinstance(response.data, bytes):
response_str = response_str.decode()
self.assertEqual(json.loads(response_str), {
'url': 'https://github.com/apache/airflow',
'error': None
})

@mock.patch('airflow.www.views.dagbag.get_dag')
def test_operator_extra_link_override_global_extra_link(self, get_dag_function):
get_dag_function.return_value = self.dag

response = self.client.get(
"{0}?dag_id={1}&task_id={2}&execution_date={3}&link_name=airflow".format(
self.ENDPOINT, self.dag.dag_id, self.task.task_id, self.DEFAULT_DATE),
follow_redirects=True)

self.assertEqual(response.status_code, 200)
response_str = response.data
if isinstance(response.data, bytes):
response_str = response_str.decode()
self.assertEqual(json.loads(response_str), {
'url': 'https://airflow.apache.org',
'error': None
})

@mock.patch('airflow.www.views.dagbag.get_dag')
def test_extra_links_error_raised(self, get_dag_function):
get_dag_function.return_value = self.dag
Expand Down