@hankehly hankehly commented Oct 16, 2022

Closes: #25952
Related: #26003

Summary

This PR adds the following operators to the amazon provider package.

  1. RdsStartDbOperator - Starts an RDS instance or cluster.
  2. RdsStopDbOperator - Stops an RDS instance or cluster, optionally creating a snapshot.

Todo

  • Add RdsStopDbOperator
  • Add RdsStartDbOperator
  • Update docs
  • Add unit tests
  • Add system tests
  • Run system tests

        self.wait_for_completion = wait_for_completion

    def execute(self, context: Context) -> str:
        self.db_type = RdsDbType(self.db_type)
Contributor Author

@hankehly hankehly Oct 16, 2022

db_type is a templated field. Casting to RdsDbType in the constructor will raise an exception when users pass template strings, so doing it here instead.

Contributor

Can you describe the reason for making db_type a templated field?
It could be either instance or cluster so do we need to make it more complex?

Contributor Author

Other RDS settings are templated, so including db_type seemed natural. A user could generate a list of RDS instances/clusters to switch on/off at particular times of the day. Templating could be helpful in that case. Given it's a single-line change, the tradeoff between added flexibility and increased complexity seems appropriate to me here.
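
A minimal sketch of why the cast has to wait until execute, assuming a stand-in enum and a toy operator. `DbType`, `SketchOperator`, and its `render` method are hypothetical names for illustration; they are not Airflow's `BaseOperator` or its Jinja rendering.

```python
from enum import Enum


class DbType(Enum):
    """Hypothetical stand-in for the provider's RdsDbType enum."""

    INSTANCE = "instance"
    CLUSTER = "cluster"


class SketchOperator:
    """Toy operator (assumption: not Airflow's BaseOperator)."""

    template_fields = ("db_type",)

    def __init__(self, db_type="instance"):
        # Store the raw value. At construction time it may still be an
        # unrendered template string, which the enum cannot parse.
        self.db_type = db_type

    def render(self, context):
        # Crude placeholder substitution standing in for template rendering.
        for key, value in context.items():
            self.db_type = self.db_type.replace("{{ " + key + " }}", value)

    def execute(self):
        # Cast only after rendering, mirroring the cast in execute() above.
        self.db_type = DbType(self.db_type)
        return self.db_type


op = SketchOperator(db_type="{{ db_type }}")
op.render({"db_type": "cluster"})
result = op.execute()
```

Calling `DbType("{{ db_type }}")` directly, as a constructor-time cast would, raises `ValueError` because the unrendered string is not a valid enum value.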


.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_rds_instance.py
    :language: python
    :dedent: 4
Contributor Author

[Image: Screen Shot 2022-10-16 at 15 18 34]

        elif item_type == 'event_subscription':
            subscriptions = self.hook.conn.describe_event_subscriptions(SubscriptionName=item_name)
            return subscriptions['EventSubscriptionsList']
        elif item_type == "db_instance":
Contributor Author

These are the same changes as those made to RdsBaseSensor in #26003.
Related: #25952 (comment)

self.hook.conn.get_waiter("db_cluster_available").wait(DBClusterIdentifier=self.db_identifier)


class RdsStopDbOperator(RdsBaseOperator):
Contributor Author

Stop an RDS instance/cluster. Optionally create a snapshot.

return json.dumps(delete_db_instance, default=str)


class RdsStartDbOperator(RdsBaseOperator):
Contributor Author

Start an RDS instance/cluster.

@hankehly hankehly marked this pull request as ready for review October 16, 2022 08:32
@hankehly hankehly requested a review from eladkal as a code owner October 16, 2022 08:32
@hankehly hankehly mentioned this pull request Oct 16, 2022
2 tasks
@hankehly
Contributor Author

@potiuk @ferruzzi @kazanzhy @o-nikolas @vincbeck
Please review this PR at your earliest convenience.

Contributor

@o-nikolas o-nikolas left a comment

Just a couple small nits but otherwise a very clean PR!

Also note: there are some static check and doc build failures that need to be addressed (mostly mypy type issues in the static checks). You can enable pre-commit to run static checks on your code before submitting a PR (see the readme), and you can also build the docs locally (breeze build-docs --package-filter apache-airflow-providers-amazon).

instance_snapshots = snapshot_result.get("DBSnapshots")
assert instance_snapshots
assert len(instance_snapshots) == 1

Contributor

It might be worth adding a test for stopping the DB cluster with a snapshot, using the caplog fixture (there are examples in other Airflow tests) to ensure your warning message is logged.

Contributor Author

Nice idea. Please see test_stop_db_cluster_create_snapshot_logs_warning_message in the same file.
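
For readers unfamiliar with caplog, a rough sketch of the pattern suggested above. It assumes the warning fires when a snapshot is requested while stopping a cluster; `stop_db`, the logger name, and the message wording are hypothetical and do not reproduce the operator's real code or the test named in this thread.

```python
import logging

log = logging.getLogger("rds_sketch")


def stop_db(db_type, snapshot_id=None):
    """Hypothetical stand-in for the operator's stop logic."""
    if db_type == "cluster" and snapshot_id is not None:
        # Hypothetical message; the operator's real wording may differ.
        log.warning("Snapshots are not created when stopping a cluster; ignoring %r", snapshot_id)
    return {"db_type": db_type, "status": "stopping"}


def test_stop_cluster_with_snapshot_logs_warning(caplog):
    # caplog is pytest's built-in fixture for capturing log records.
    with caplog.at_level(logging.WARNING, logger="rds_sketch"):
        stop_db("cluster", snapshot_id="my-snapshot")
    assert "ignoring 'my-snapshot'" in caplog.text
```

Run under pytest, the fixture is injected automatically; no import of pytest is needed in the test module for this to work.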

    stop_db_instance = RdsStopDbOperator(
        task_id="stop_db_instance",
        db_identifier=rds_db_identifier,
        wait_for_completion=True,
Contributor

wait_for_completion=True is the default value, so you can drop it from this call (similar to how RdsCreateDbInstanceOperator does above).

        response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
        return response

    def _wait_until_db_available(self):
Contributor

Maybe we could use the already-written _await_status method here.
It would reduce the code and unify all operators within this file.
WDYT?

Contributor Author

@kazanzhy
Thanks for your review. I agree, that would be more consistent. What concerns me is that when using _await_status we need to specify all possible wait_statuses. I observed the following states when creating an RDS instance, but depending on the settings, there may be more. (And they might change in the future?)

  • Creating
  • Backing-up
  • Configuring-enhanced-monitoring

Rather than listing every possible state or risk "missing one," I think using the builtin "waiter" implementation is more appropriate here. We only need to check for the "available" state.

@o-nikolas
I noticed you work at Amazon, is there anything you'd like to add to this discussion?

Contributor

I agree, we should use waiters as much as possible. They come with nice features such as retry with exponential backoff, ...

Contributor

Our preference for AWS code is to use the boto Waiters wherever possible. I don't have the context to know whether the existing _await_status method could be updated to use waiters or not, but that would be my preference rather than the other direction. But I also think that's out of scope for this PR. So how you've got it now is fine, IMHO.
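
As an illustration of the waiter approach discussed in this thread, a minimal sketch, assuming `client` is a boto3 RDS client. The helper name `wait_until_db_available` and its default values are hypothetical; the waiter names and the `WaiterConfig` keys are boto3's. Only the target "available" state is named, so intermediate states never need to be listed.

```python
def wait_until_db_available(client, db_identifier, db_type="instance", delay=30, max_attempts=60):
    """Block until the database reports the "available" state.

    Assumption: `client` is a boto3 RDS client, e.g. boto3.client("rds").
    """
    if db_type == "instance":
        waiter = client.get_waiter("db_instance_available")
        kwargs = {"DBInstanceIdentifier": db_identifier}
    else:
        waiter = client.get_waiter("db_cluster_available")
        kwargs = {"DBClusterIdentifier": db_identifier}
    # WaiterConfig sets the seconds between polls and the maximum number of polls.
    waiter.wait(WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts}, **kwargs)
```

If the state is not reached within the configured attempts, the waiter raises an error rather than returning, so callers do not need their own timeout loop.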

Contributor

@eladkal eladkal Oct 17, 2022

I'm confused about why _await_status lives at the operator level rather than the hook level.

"What concerns me is that when using _await_status we need to specify all possible wait_statuses"

Why? We only need to wait for the Available status, don't we?

Contributor

Yupp, will do 👍

Contributor

Just dogpiling on the "we should be using waiters everywhere" train. There are a few other places (The EKS operators I wrote before I knew about the boto waiters, for example) in the package where we have custom wait methods but the official boto ones would be cleaner and easier alternatives. It might make a nice task/project for new contributors to go through and see where they can be swapped out?

Contributor

@kazanzhy kazanzhy Oct 17, 2022

@eladkal I remember that I implemented _await_status so that a single method could serve different operators. It was the best solution in my opinion at that time. I knew about waiters, but I guess not all the waiters I needed were available.

I agree with @ferruzzi, let's use waiters in this PR and then refactor the module and add waiters where possible. WDYT?

Contributor Author

Thank you all for the feedback. Consensus:

  • Leave current changes as-is
  • Refactor waiting logic in #27096

        response = self.hook.conn.stop_db_cluster(DBClusterIdentifier=self.db_identifier)
        return response

    def _wait_until_db_stopped(self):
Contributor

Same suggestion: let's use the _await_status method here.

Contributor Author

It's used around lines 802-808. (Could there be a misunderstanding?)

@hankehly hankehly requested review from ferruzzi and removed request for eladkal October 18, 2022 02:02
@hankehly hankehly requested review from ferruzzi, kazanzhy and o-nikolas and removed request for ferruzzi, kazanzhy and o-nikolas October 18, 2022 02:02
@hankehly
Contributor Author

@o-nikolas @ferruzzi @eladkal @kazanzhy
Please see/approve the latest feedback changes at your earliest convenience.

@eladkal eladkal requested review from eladkal and removed request for kazanzhy October 18, 2022 16:55
Contributor

@eladkal eladkal left a comment

LGTM
