-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Add RdsStopDbOperator and RdsStartDbOperator #27076
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| self.wait_for_completion = wait_for_completion | ||
|
|
||
| def execute(self, context: Context) -> str: | ||
| self.db_type = RdsDbType(self.db_type) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
db_type is a templated field. Casting to RdsDbType in the constructor will raise an exception when users pass template strings, so doing it here instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you describe the reason for making db_type a templated field?
It could be either instance or cluster so do we need to make it more complex?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other RDS settings are templated, so including db_type seemed natural. A user could generate a list of RDS instances/clusters to switch on/off at particular times of the day. Templating could be helpful in that case. Given it's a single-line change, the tradeoff between added flexibility and increased complexity seems appropriate to me here.
|
|
||
| .. exampleinclude:: /../../tests/system/providers/amazon/aws/example_rds_instance.py | ||
| :language: python | ||
| :dedent: 4 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| elif item_type == 'event_subscription': | ||
| subscriptions = self.hook.conn.describe_event_subscriptions(SubscriptionName=item_name) | ||
| return subscriptions['EventSubscriptionsList'] | ||
| elif item_type == "db_instance": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are the same changes as those made to RdsBaseSensor in #26003.
Related: #25952 (comment)
| self.hook.conn.get_waiter("db_cluster_available").wait(DBClusterIdentifier=self.db_identifier) | ||
|
|
||
|
|
||
| class RdsStopDbOperator(RdsBaseOperator): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stop an RDS instance/cluster. Optionally create a snapshot.
| return json.dumps(delete_db_instance, default=str) | ||
|
|
||
|
|
||
| class RdsStartDbOperator(RdsBaseOperator): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Start an RDS instance/cluster.
|
@potiuk @ferruzzi @kazanzhy @o-nikolas @vincbeck |
o-nikolas
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a couple small nits but otherwise a very clean PR!
Also note: There are some static check failures and doc build failures that needs to be addressed (mostly mypy type related stuff for static checks). You can enable pre-commit to run static checks on your code before submitting a PR (readme) and you can also build docs locally as well (breeze build-docs --package-filter apache-airflow-providers-amazon)
| instance_snapshots = snapshot_result.get("DBSnapshots") | ||
| assert instance_snapshots | ||
| assert len(instance_snapshots) == 1 | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be worth adding a test for stopping the DB cluster with a snapshot and use the caplog fixture (there are examples in other airflow tests) to ensure your warning message is logged.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice idea. Please see test_stop_db_cluster_create_snapshot_logs_warning_message in the same file.
| stop_db_instance = RdsStopDbOperator( | ||
| task_id="stop_db_instance", | ||
| db_identifier=rds_db_identifier, | ||
| wait_for_completion=True, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wait_for_completion=True is a default value, you can drop it from this call (similar to how RdsCreateDbInstanceOperator` does above.
| response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier) | ||
| return response | ||
|
|
||
| def _wait_until_db_available(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could use here already written _await_status method.
It will reduce the code and unify all operators within this file.
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kazanzhy
Thanks for your review. I agree, that would be more consistent. What concerns me is that when using _await_status we need to specify all possible wait_statuses. I observed the following states when creating an RDS instance, but depending on the settings, there may be more. (And they might change in the future?)
- Creating
- Backing-up
- Configuring-enhanced-monitoring
Rather than listing every possible state or risk "missing one," I think using the builtin "waiter" implementation is more appropriate here. We only need to check for the "available" state.
@o-nikolas
I noticed you work at Amazon, is there anything you'd like to add to this discussion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, we should use waiters as much as possible. They come with nice features such as retry with exponential backoff, ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Our preference for AWS code is to use the boto Waiters wherever possible. I don't have the context to know whether the existing _await_status method could be updated to use waiters or not, but that would be my preference rather than the other direction. But I also think that's out of scope for this PR. So how you've got it now is fine, IMHO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused on why the _await_status is on operator level rather than on Hook level.
What concerns me is that when using _await_status we need to specify all possible wait_statuses
Why? We need to wait for Available status only aren't we?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yupp, will do 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just dogpiling on the "we should be using waiters everywhere" train. There are a few other places (The EKS operators I wrote before I knew about the boto waiters, for example) in the package where we have custom wait methods but the official boto ones would be cleaner and easier alternatives. It might make a nice task/project for new contributors to go through and see where they can be swapped out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eladkal I remember that I implemented _await_status to use the single method for different operators. I was the best solution in my opinion at that time. I knew about waiters but I guess there weren't all waiters that I needed.
I agree with @ferruzzi, let's use _await_status in this PR and then refactor the module and add waiters where it's possible. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you all for the feedback. Consensus:
- Leave current changes as-is
- Refactor waiting logic in #27096
| response = self.hook.conn.stop_db_cluster(DBClusterIdentifier=self.db_identifier) | ||
| return response | ||
|
|
||
| def _wait_until_db_stopped(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same suggestion. Let's use _await_status method here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's used around lines 802-808. (Could there be a misunderstanding?)
| response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier) | ||
| return response | ||
|
|
||
| def _wait_until_db_available(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, we should use waiters as much as possible. They come with nice features such as retry with exponential backoff, ...
|
@o-nikolas @ferruzzi @eladkal @kazanzhy |
Co-authored-by: Vincent <[email protected]>
Co-authored-by: Vincent <[email protected]>
eladkal
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM

Closes: #25952
Related: #26003
Summary
This PR adds the following operators to the amazon provider package.
Todo
RdsStopDbOperatorRdsStartDbOperator