Conversation

@diogoalexandrefranco (Contributor) commented Mar 18, 2018

Refactors DagBag to prepare for fetching DAGs remotely by creating a DagFetcher abstraction and a factory function that instantiates the right DagFetcher based on the dags_folder URI prefix.

Implements the default FileSystemDagFetcher (using the code that was part of DagBag).

Adds the possibility of adding DagFetchers as plugins.

JIRA

  • My PR addresses AIRFLOW-2221 and references it in the PR title.

Description

  • This PR refactors DagBag to prepare the implementation of multiple DagFetchers. It is not supposed to bring new functionality, but it is also not supposed to break anything :)
    Because the file-system-related code from DagBag was moved to FileSystemDagFetcher, the diff is bigger than the actual changes. It is basically:
  • A DagFetcher abstract base class.
  • A FileSystemDagFetcher, where the code is what we already had scattered in the DagBag class.
  • A get_dag_fetcher factory method that instantiates the right fetcher based on the dags_folder setting scheme (the uri prefix).
  • DagBag instances initialize and hold their own dag_fetcher (but always use the default for the example_dags).
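To make the refactor concrete, here is a minimal sketch of the shape described above. The class and function names (BaseDagFetcher, FileSystemDagFetcher, get_dag_fetcher) come from the PR; the bodies are illustrative assumptions, not the actual diff.

```python
from abc import ABC, abstractmethod
from urllib.parse import urlparse


class BaseDagFetcher(ABC):
    """Common interface every DAG fetcher implements."""

    def __init__(self, uri):
        self.uri = uri

    @abstractmethod
    def fetch(self):
        """Make the DAG files available locally and return their path."""


class FileSystemDagFetcher(BaseDagFetcher):
    def fetch(self):
        # Local folders need no transfer; the path itself is the result.
        return urlparse(self.uri).path or self.uri


def get_dag_fetcher(dags_folder):
    """Pick a fetcher from the URI scheme of the dags_folder setting."""
    scheme = urlparse(dags_folder).scheme
    # A plain path has no scheme, so it maps to the filesystem fetcher.
    registry = {"": FileSystemDagFetcher, "file": FileSystemDagFetcher}
    try:
        return registry[scheme](dags_folder)
    except KeyError:
        raise ValueError("No DagFetcher registered for scheme %r" % scheme)
```

A plain dags_folder like `/usr/local/airflow/dags` has an empty scheme and falls through to FileSystemDagFetcher, which keeps the existing behavior as the default.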

Tests

  • My PR adds the following unit tests:
    Added test_get_dag_fetcher in tests.models:DagBagTest
    Removed test_get_dag_without_refresh as I didn't find an easy way to implement this test under the new architecture.
    Added test_fetchers in tests.plugins_manager:PluginsTest

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":

    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"
  • Passes git diff upstream/master -u -- "*.py" | flake8 --diff

@codecov-io commented Mar 18, 2018

Codecov Report

Merging #3138 into master will decrease coverage by 3.5%.
The diff coverage is 87.57%.


@@            Coverage Diff             @@
##           master    #3138      +/-   ##
==========================================
- Coverage   76.67%   73.16%   -3.51%     
==========================================
  Files         199      187      -12     
  Lines       16186    12716    -3470     
==========================================
- Hits        12410     9304    -3106     
+ Misses       3776     3412     -364
Impacted Files Coverage Δ
airflow/plugins_manager.py 92.85% <100%> (+0.9%) ⬆️
airflow/models.py 87.56% <100%> (-4.49%) ⬇️
airflow/dag/fetchers/s3.py 100% <100%> (ø)
airflow/dag/fetchers/git.py 100% <100%> (ø)
airflow/__init__.py 81.25% <100%> (+6.96%) ⬆️
airflow/dag/fetchers/hdfs.py 100% <100%> (ø)
airflow/dag/fetchers/__init__.py 100% <100%> (ø)
airflow/dag/fetchers/gcs.py 100% <100%> (ø)
airflow/dag/fetchers/filesystem.py 82.9% <82.9%> (ø)
airflow/dag/fetchers/base.py 85.71% <85.71%> (ø)
... and 213 more

Continue to review full report at Codecov.

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update e703d6b...6979f0d. Read the comment docs.

@bolkedebruin (Contributor)

Great that this is coming! I would prefer a high-level design before doing this, as it is quite core. A couple of things that should be present:

  1. Poll vs push: DAGs should only be fetched if a newer one is present; what will we use for identification?
  2. Storage location: since we now need a place DAGs are pulled to, let's make sure we start using the LSB location for this: /var/lib/airflow/dags is probably best
  3. Security: is any worker allowed to pull any dag? I think a proper security model needs to be in place
  4. API integration: can we also PUSH dags to the remote location?

Obviously, not everything needs to be in place right away, but having a general idea of where we are moving towards seems smart. Security should not be an afterthought :-).
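On point 1 (identification), one option discussed nowhere else in this thread but sketched here as an assumption is content fingerprinting: hash the DAG artifact and fetch only when the remote hash differs from the cached one. The function names below are hypothetical, not from this PR.

```python
import hashlib
from typing import Optional


def fingerprint(data: bytes) -> str:
    """Identify a DAG artifact by its content hash (hypothetical helper)."""
    return hashlib.sha256(data).hexdigest()


def should_fetch(remote_fp: str, local_fp: Optional[str]) -> bool:
    """Pull only when the remote artifact differs from the cached copy
    (local_fp is None when nothing has been cached yet)."""
    return local_fp != remote_fp
```

A content hash sidesteps clock-skew problems that timestamp comparisons have, at the cost of needing the remote side to expose (or the fetcher to compute) the hash.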

Review comment (Contributor):

please do not put this in the root dir but in something like airflow/dag/fetcher/filesystem.py

Review comment:

nit: rename to BaseDagFetcher

@jgao54 commented Mar 19, 2018

Woot! That was fast :)

Similar to what @bolkedebruin mentioned, please break dag_fetcher.py up into separate modules:

  • __init__.py (where you can implement get_dag_fetcher)
  • BaseDagFetcher
  • FileSystemDagFetcher
    etc.

Also, it would be useful to add dag fetchers to the plugin manager so custom fetchers can be written for internal use. It's consistent with operators/hooks/executors/etc. (see plugins_manager.py)
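As a hedged illustration of what such a plugin extension point might look like, mirroring how operators/hooks/executors are declared as class attributes on a plugin: every class and attribute name below is hypothetical, and the real plugins_manager machinery is stubbed out as a plain function.

```python
class AirflowPluginSketch:
    """Stand-in for airflow.plugins_manager.AirflowPlugin."""
    name = None
    dag_fetchers = []  # the new extension point this PR proposes


class MyCompanyPlugin(AirflowPluginSketch):
    """Hypothetical user plugin shipping a custom fetcher."""
    name = "my_company_fetchers"

    class InternalApiDagFetcher:  # hypothetical custom fetcher
        scheme = "internalapi"

    dag_fetchers = [InternalApiDagFetcher]


def collect_dag_fetchers(plugins):
    """What plugins_manager would do: flatten fetchers from all plugins."""
    return [f for plugin in plugins for f in plugin.dag_fetchers]
```

The appeal of this route is that internal fetchers stay out of the Airflow core while still being discoverable the same way other plugin components are.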

Review comment:

self.stats should be called in process_file; otherwise, if the uri is a single file, fetch won't be able to generate stats for it (I'm aware this is just copy/paste from old code, but just pointing it out).

Review comment:

This is a bit rigid; it doesn't handle the scenario where multiple dag fetchers are written for a specific service.

Review comment:

I think it may be worth adding the dag fetcher as a default config.

@diogoalexandrefranco diogoalexandrefranco force-pushed the master branch 2 times, most recently from b37cc10 to 081f7ee Compare March 20, 2018 00:36
Refactors DagBag to prepare the possibility of fetching
dags remotely, by creating the DagFetcher abstraction.
Defaults to FileSystemDagFetcher, maintaining backwards
compatibility.
@diogoalexandrefranco (Contributor, Author)

Thank you both for your help! Updated the PR:

  • Renamed the base class to BaseDagFetcher
  • Separated each fetcher into its own module in airflow.dag.fetchers
  • get_dag_fetcher defined in __init__.py
  • Added dag_fetchers to plugins_manager.py, along with the facilities to integrate those plugins, tests, etc.

Other topics:

  • @jgao54's suggestion of adding a default config for the fetcher to use makes sense to me, so as not to tie a service/scheme to only one fetcher. Maybe we don't look at the URI prefix of the dags_folder setting at all?
  • @bolkedebruin's second point hints at the possibility of always fetching dags to /var/lib/airflow/dags. I like the idea of enforcing this, because implementing a DagFetcher would then only be a matter of downloading the files, and all the logic this PR currently has in FileSystemDagFetcher could move to the base class. For example, the fetch method of FileSystemDagFetcher would just copy from <settings.dags_folder> to /var/lib/airflow/dags.
  • Regarding a proper security model, I'd be happy to hear suggestions.

I felt like starting the implementation would help to get things moving, but I'm happy to create a design doc.

@mistercrunch (Member) commented Mar 21, 2018

A few points to consider:

  • URIs: it'd be great to have a URI scheme for DAG locations, as in s3://mybucket/mydag.tar.gz, hdfs://dags_folders/mydag.1032910.zip, or local:///val/dags/mydag.py. There has to be a Python lib that already works for fetching files from all sorts of places. If such a library exists, maybe we can just have a single RemoteDagFetcher class to handle many of the storage backends. The git one is slightly different, perhaps, and should have support for a git ref in there, like some SHA: git://repolocation/{SOME_GIT_REF_WHICH_COULD_BE_MASTER_OR_SOME_SHA}/myfolder/mydag.py
  • serialization: there's support for .zip files now somehow, and I can see how it would be nice to have this extensible. Proper modeling would call for yet another abstraction for that, though my thought is that the base dag fetcher class can handle it in a method. We'll need some support for a fetch, decompress-in-temp, keep-as-cached, cleanup flow.
  • versioning: there has to be versioning semantics in the API here
  • dags manifest: currently the DAGs list is inferred by crawling through the DAGS_FOLDER (yuck! my bad...); we need alternate ways of defining that. Providing a list of URIs would be a reasonable alternative. Perhaps there's a way to generate that list of URIs given a DAGS_FOLDER. airflow_settings could have a DAG_LIST configuration that defaults to the current behavior: DAG_LIST = lambda x: DagBag.crawl_folder_for_dags(conf.get('DAGS_FOLDER')). It has to support a callable as it may change over time, and the scheduler should know to re-evaluate the callable periodically. We'd probably expect people to move towards maintaining a list manually.
  • assuming version semantics in URIs, perhaps pinning the URI in the DagRun instance. This would allow for version consistency within a DagRun

I think the URI is really key here. Assume the URI carries a provenance (s3, hive, hdfs, git), can contain a version number (people are free to version their artifacts, create symlinks to latest, use git refs, ...), and that an extension like .zip or .tar.gz can call for some unpacking handling.
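A rough sketch of how such URIs could be decomposed with the standard library, so that provenance and unpacking hints fall out of the URI itself (`parse_dag_uri` is a hypothetical helper, not part of this PR):

```python
from urllib.parse import urlparse


def parse_dag_uri(uri):
    """Split a DAG URI into provenance, location, path, and unpacking hint."""
    parts = urlparse(uri)
    # Check the longer suffix first so .tar.gz is not mistaken for plain .gz.
    if parts.path.endswith(".tar.gz"):
        packing = "tar.gz"
    elif parts.path.endswith(".zip"):
        packing = "zip"
    else:
        packing = None  # a bare .py file needs no unpacking
    return {
        "scheme": parts.scheme,   # provenance: s3, hdfs, git, local, ...
        "location": parts.netloc,
        "path": parts.path,
        "packing": packing,
    }
```

A dispatcher could then pick a fetcher from `scheme` and an unpacking step from `packing`, which keeps both decisions in one well-defined place.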

A few more things:

  • caching: hopefully we don't fetch-unpack-interpret when we don't need to; it has to be deterministic somehow for different processes on the same box to reuse it. Not sure where the caching logic should live (DagBag? DagFetcher?)
  • it would be great to move towards a more stateless web server; I'm not sure where that fits in, but it's probably not a great idea to handle the DAG fetching within the web request's scope

Sorry for the large brain dump here. This is a bit of a hairy issue.

@jgao54 commented Mar 21, 2018

@diogoalexandrefranco a design doc would be the ideal next step. This PR is a really awesome initiative, and it would be good to nail down the design before merging, in case folks start adopting the current design and we end up making backwards-incompatible changes later.

@mistercrunch Using the URI scheme to determine the DAG fetching mechanism is fine with me as well, but I think giving the fetcher flexibility over how the URI is parsed will be useful. A bit far-fetched (no pun intended) example, which relates to manually maintaining a list of dags: someone might decide to use a text file on a local machine to maintain their DAG list, and then it would be useful to have a custom text-file fetcher that fetches the DAGs listed in that file.
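That text-file manifest idea could be as simple as the hypothetical parser below; the one-URI-per-line format with `#` comments is an illustrative assumption, not anything this PR defines.

```python
def read_dag_manifest(text):
    """Parse a plain-text DAG manifest: one URI per line, '#' starts a
    comment, blank lines are ignored (hypothetical format)."""
    uris = []
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#"):
            uris.append(line)
    return uris
```

A custom fetcher would read such a file and then delegate each URI to whichever concrete fetcher handles its scheme.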

I'm also excited that the dag fetcher should be able to get rid of the workaround from https://issues.apache.org/jira/browse/AIRFLOW-97

@mistercrunch (Member)

Yeah, AIRFLOW-97 is ugly... I was just scared after we triggered unexpected code that was in module scope (no if __name__ == '__main__' guard...) somewhere in the DAGS_FOLDER.

About the DAGS_MANIFEST idea: if it can receive a callable, people can do whatever they want to manage their dictionary of dag_ids -> URIs. If they want a hard-coded list, that's fine; if they want to parse a yaml file, they can do that; using the current crawler-logic equivalent is OK; and if they want to put it in a models.Variable (yuck), that works too :)

In retrospect, the DagBag being a dynamic thing parsing through a folder has drawbacks, and I'd rather move to enforce something more explicit. Turns out parsing a folder full of Python code to find objects of a certain type is a bad idea. Something as simple as sys.exit() or time.sleep(10**10) in any module could create major problems (though I think we've built up quite an immune system around those at this point).

@diogoalexandrefranco (Contributor, Author)

A lot to unpack here :) Thank you for your help! With so much on the table, I'm thinking that it makes sense to explicitly divide this into phases. The base re-architecture seems like the low hanging fruit we could merge first, allowing independent development of many of these features afterwards. Something like:

Base re-arch:

  • Dag Fetcher abstraction with BaseDagFetcher
  • Storage location definition (Max called it caching).
  • Backwards compatible FileSystemDagFetcher with its fetching logic from dags_folder to the cache.
  • Dispatching based on the URI scheme prefix

It seems like the above could be done in such a way where things would be strictly improved, even before further work.

Independent features:

  • Dag Fetchers for all the things
  • Proper unpacking based on suffix (separate .zip logic as well as new .tar.gz)
  • Version semantics
  • Security model for dag fetching
  • SETTINGS.DAG_FETCHER, to support multiple DagFetchers for the same URI scheme
  • SETTINGS.DAG_LIST callable, to support explicit manifest of which dags to fetch
  • Tackle version consistency within a DagRun instance
  • Webserver performance when fetching remotely

How should we go about starting the design doc? The Airflow space in Confluence seems like the right place, but I don't have permission to create a new page.

@mistercrunch (Member)

https://github.com/RaRe-Technologies/smart_open may help, not sure if it's a perfect fit though.

You should be able to get a Confluence account as you get an Apache JIRA account; in the meantime a public gdoc or gist or anything that doesn't require creds works fine by me.

@jgao54 commented May 31, 2018

@diogoalexandrefranco just wanted to check in with you to make sure you are not blocked by the Confluence permission issue, and to ask how the design doc is going :)

@astahlman (Contributor)

@diogoalexandrefranco Friendly ping - are you still planning on abstracting out the BaseDagFetcher, or is that work up for grabs?

@mistercrunch (Member)

@astahlman looks like it's up for grabs, happy to help

@feng-tao (Member)

@astahlman , let me know how I can help. This should be very interesting.

@diogoalexandrefranco (Contributor, Author) commented Aug 14, 2018 via email

@feng-tao (Member)

Andrew @astahlman, FYI, Airflow has started using AIPs (Airflow Improvement Proposals). With this kind of change, I think we need a wiki page to document the initial design (https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals).

@idavison commented Oct 16, 2018

@astahlman Are you still working on this PR? My coworker and I are interested in helping out with this, including drafting up an AIP.

@astahlman (Contributor)

@idavison I am not working on it - all yours!

@idavison

Hey guys, I've drafted up a proposal for the dag fetcher. I've tried to keep in mind everything people have discussed here previously, but I'm sure I've missed some stuff. I'd appreciate people's thoughts on this, and I have some questions about the different design decisions we'll need to make.

https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-5+DagFetcher

Thanks!

@stale stale bot commented Dec 20, 2018

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Dec 20, 2018
@stale stale bot closed this Dec 27, 2018
@dthauvin

Hello guys, this is an important feature for large-scale Airflow setups. What's the news?

@ismailsimsek (Contributor)

Looking forward to this feature. It might be a good idea to have a REST endpoint to trigger the fetch operation, e.g.:

  1. dags uploaded to s3
  2. rest endpoint called
  3. new dags fetched from remote location
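The three steps above could be sketched as a handler like this; everything here is hypothetical (no such endpoint exists in Airflow), with `list_remote` standing in for whatever call lists the uploaded artifacts and their fingerprints.

```python
def handle_fetch_trigger(list_remote, cache):
    """Sketch of what a POST /api/dags/fetch handler could do: pull the
    current remote listing and report which DAG files are new or changed
    relative to the local cache."""
    remote = list_remote()  # hypothetical: {"dag_a.py": "hash1", ...}
    changed = [name for name, fp in remote.items() if cache.get(name) != fp]
    cache.update(remote)    # step 3: record the freshly fetched state
    return {"status": "ok", "changed": sorted(changed)}
```

Making the endpoint idempotent like this (nothing changes, nothing fetched) keeps it safe to wire into an upload pipeline that fires on every S3 push.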

@asaf400 commented Dec 22, 2019

For some reason this PR and the accompanying AIP have been either forgotten or abandoned.

I am posting this comment in hopes that someone can push this forward.

@neodino commented Mar 12, 2020

Any news?

