
Conversation

@turbaszek
Member

@turbaszek turbaszek commented Oct 19, 2019

Make sure you have checked all steps below.

Jira

Description

  • Here are some details about my PR, including screenshots of any UI changes:
    This PR introduces refactored Dataproc operators that use the Python client for Dataproc instead of the discovery API. Job operators like DataProcHadoopOperator are going to be deprecated, and users should instead use the generic DataprocSubmitJobOperator. In general, the new operators accept only request objects such as Cluster or Job. I've added helper methods that will help users generate those objects from their old operators (a usage sketch follows below).

This PR also decouples the hook, the operators, and the additional Dataproc helpers.
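
For illustration, a minimal usage sketch of the new generic operator, assuming the provider import path from this PR; PROJECT_ID, REGION, CLUSTER_NAME, and the bucket paths are placeholders, and the job dict follows the Dataproc Job proto:

    # Minimal sketch (not from this PR's diff): submitting a Hadoop job via the
    # generic DataprocSubmitJobOperator instead of DataProcHadoopOperator.
    from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

    hadoop_job = {
        "reference": {"project_id": "PROJECT_ID"},
        "placement": {"cluster_name": "CLUSTER_NAME"},
        "hadoop_job": {
            "main_jar_file_uri": "gs://BUCKET/hadoop-job.jar",
            "args": ["wordcount", "gs://BUCKET/input/", "gs://BUCKET/output/"],
        },
    }

    submit_job = DataprocSubmitJobOperator(
        task_id="hadoop_task",
        project_id="PROJECT_ID",
        location="REGION",  # parameter name may differ across provider versions
        job=hadoop_job,
    )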

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and classes in the PR contain docstrings that explain what they do
    • If you implement backwards-incompatible changes, please leave a note in Updating.md so we can assign it to an appropriate release

Member

Have you tested this change with the original DAG?

Member Author

Yes, that was how I checked backwards compatibility. Changing the example DAG was the last thing I did. If you wish, I can change the DAG in a separate PR.

Member

Suggested change:

    -     DeprecationWarning,
    +     DeprecationWarning,
    +     stacklevel=2
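
For context, stacklevel=2 makes the warnings module attribute the DeprecationWarning to the caller of the deprecated code rather than to the warn() call itself. A generic sketch (old_function and new_function are hypothetical names):

    import warnings

    def old_function():
        # stacklevel=2 points the warning at the line that called
        # old_function(), which is where the user needs to act.
        warnings.warn(
            "old_function is deprecated; use new_function instead",
            DeprecationWarning,
            stacklevel=2,
        )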

@turbaszek
Member Author

@mik-laj, @potiuk can you take a look at the Travis build? I am not sure whether this failure is flaky or not.
https://api.travis-ci.org/v3/job/599993904/log.txt

@mik-laj
Member

mik-laj commented Oct 19, 2019

It looks like some dependency has exploded again.

@mik-laj
Member

mik-laj commented Oct 19, 2019

I restarted the failed job.

@codecov-io

codecov-io commented Oct 21, 2019

Codecov Report

Merging #6371 into master will increase coverage by 0.19%.
The diff coverage is 86.91%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master   #6371      +/-   ##
=========================================
+ Coverage    83.3%   83.5%   +0.19%     
=========================================
  Files         645     645              
  Lines       37291   37356      +65     
=========================================
+ Hits        31067   31194     +127     
+ Misses       6224    6162      -62
Impacted Files                                          Coverage Δ
...ders/google/cloud/example_dags/example_dataproc.py   0% <0%> (ø)
airflow/contrib/hooks/gcp_dataproc_hook.py              100% <100%> (ø) ⬆️
airflow/contrib/operators/dataproc_operator.py          100% <100%> (ø) ⬆️
...rflow/providers/google/cloud/operators/dataproc.py   88.27% <93.91%> (ø)
airflow/providers/google/cloud/hooks/dataproc.py        98.6% <98.6%> (ø)
...rflow/gcp/example_dags/example_gcp_dlp_operator.py
airflow/gcp/example_dags/example_dlp_operator.py        100% <0%> (ø)

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 3a3730e...ec33459. Read the comment docs.

@turbaszek turbaszek force-pushed the gcp-dataproc-retro branch 3 times, most recently from 4f56a2f to 6661b9a Compare October 30, 2019 10:24
@turbaszek turbaszek force-pushed the gcp-dataproc-retro branch 2 times, most recently from 6e90966 to 4c1e2e9 Compare November 5, 2019 11:25
@turbaszek turbaszek requested a review from mik-laj November 5, 2019 14:09
Member

@potiuk potiuk left a comment


Just one change -> providers/google/cloud

@turbaszek turbaszek requested a review from potiuk November 13, 2019 14:51
@potiuk potiuk merged commit d633d3a into apache:master Nov 13, 2019
@turbaszek turbaszek deleted the gcp-dataproc-retro branch December 4, 2019 11:19
@dossett
Contributor

dossett commented Feb 3, 2020

This seems to have made some substantive changes to the behavior of the operators. Was that intended? For example, the functionality to detect and reattach to a running job is gone. See the code starting here for functionality that wasn't ported (a rough sketch of that behavior follows below): d633d3a#diff-0be5a6bccaef6a25d9ff5d63a92a12f0L64-L65

@potiuk @mik-laj @nuclearpinguin
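
For readers unfamiliar with that behavior: the removed code let a rescheduled task attach to a Dataproc job it had already started instead of submitting a duplicate. A loose sketch of the idea against the v1 GAPIC client (find_running_job is a hypothetical helper, not the original implementation):

    from google.cloud import dataproc_v1

    def find_running_job(client, project_id, region, task_id):
        # List active jobs and match the task-id prefix client-side,
        # roughly mirroring what the removed hook code did.
        for job in client.list_jobs(project_id, region, filter_="status.state = ACTIVE"):
            if job.reference.job_id.startswith(task_id):
                return job  # reattach to this job instead of resubmitting
        return None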

@dossett
Contributor

dossett commented Feb 3, 2020

That link may not expand well. It points to line 64 of airflow/gcp/hooks/dataproc.py, which was renamed as part of this change.

@dossett
Contributor

dossett commented Feb 4, 2020

The functionality that was apparently lost is AIRFLOW-3211

@ghost

ghost commented Feb 14, 2020

@dossett, the functionality added in AIRFLOW-3211 actually broke the behavior of the dataproc hook and made a few 1.10.x releases unusable for dataproc users. The problem is that the hook only uses the task ID part of the dataproc job ID when looking for previous invocations of the job, so if dataproc history still has jobs corresponding to any of the previous dag runs, the dataproc hook doesn't execute the job. A proper way to implement this would be to associate dataproc jobs with particular dag runs by e.g. embedding a dag run id hash in the dataproc job id. In any case the functionality added in AIRFLOW-3211 has to be optional. In our experience, users expect dataproc jobs to be re-executed when they re-execute the task, and this new behavior creates a lot of confusion.
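
A hypothetical sketch of the fix suggested above: embed a hash of the dag run id in the Dataproc job id so each run gets its own job, while retries within one run still share an id (all names here are illustrative):

    import hashlib

    def build_dataproc_job_id(task_id: str, run_id: str) -> str:
        # Retries of the same dag run share run_id, hence the same job id;
        # a new dag run produces a fresh id, so the job is re-executed.
        run_hash = hashlib.md5(run_id.encode("utf-8")).hexdigest()[:8]
        return f"{task_id}_{run_hash}"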

@dossett
Contributor

dossett commented Feb 19, 2020

@digger I take your points about the original change. Do you know if reverting that functionality was an intentional part of this PR?

@ghost

ghost commented Feb 20, 2020

@dossett, I don't know if that was intentional. I just shared my feedback on the changes made in AIRFLOW-3211. At the company I work for, we use Airflow, and we had to patch Airflow 1.10.7 and 1.10.9, reverting the changes from AIRFLOW-3211, in order to make the dataproc functionality work.

@turbaszek
Member Author

@digger is right, Dataproc was not working (tested using the example DAG). I've tried my best to preserve as much backward compatibility as I could. The present implementation is much clearer and uses Google's Python client library. Without the operator-hook coupling, it should be easier to add any needed changes.

I am happy to review PR that adds missing functionality @dossett :)

@dossett
Contributor

dossett commented Feb 24, 2020

@nuclearpinguin It looks like the functionality added by AIRFLOW-3149 (to more gracefully handle the creation of clusters in an ERROR state) was also reverted; was that also intentional? Was there a JIRA or mailing-list discussion of the functional changes that accompanied this API migration? (I'm seriously asking; I searched but did not find anything.)

@turbaszek
Member Author

There's a JIRA: https://issues.apache.org/jira/browse/AIRFLOW-5691

No change was intentional. Also, this change was not backported to 1.10.x.

So, from what I understand, the submit job operator should:

  • check if a job with the provided id already exists
  • if yes, use the existing job
  • if no, create a new job

Doesn't request_id work like that? (See the sketch at the end of this comment.)

If the server receives two SubmitJobRequest requests with the same id, then the second request will be ignored and the first Job created and stored in the backend is returned.

https://googleapis.dev/python/dataproc/latest/gapic/v1/api.html#google.cloud.dataproc_v1.JobControllerClient.submit_job

@dossett @digger
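
A sketch of the idempotency behavior quoted above, using the GAPIC client from the linked docs (the positional project_id/region signature matches the pre-2.0 google-cloud-dataproc library; all ids and paths are placeholders):

    from google.cloud import dataproc_v1

    client = dataproc_v1.JobControllerClient()
    job = {
        "placement": {"cluster_name": "CLUSTER_NAME"},
        "hadoop_job": {"main_jar_file_uri": "gs://BUCKET/job.jar"},
    }

    # Submitting twice with the same request_id: the server ignores the
    # second request and returns the Job stored for the first one.
    first = client.submit_job("PROJECT_ID", "REGION", job, request_id="run-42")
    second = client.submit_job("PROJECT_ID", "REGION", job, request_id="run-42")
    assert first.reference.job_id == second.reference.job_id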

@dossett
Contributor

dossett commented Mar 20, 2020

@turbaszek I hope to return to this topic once world events stabilize and my kids return to school. Cheers! -Aaron
