Improve idempotency of BigQueryInsertJobOperator #9590
Conversation
job_id = f"{self.job_id}_{int(time())}"
job_id = f"{job_id}_{int(time())}"
I see - yet you won't be able to re-poll for this job since it uses the current time, which is not reproducible on an "eventual" next run. Though it is much better than what is there now.
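To make the trade-off concrete, here is a minimal sketch (hypothetical helper names, not the operator's actual code) contrasting a timestamp-based suffix, which is unique on every call and therefore cannot be re-derived by a retry, with a suffix derived from the job configuration, which can:

```python
import hashlib
import json
import time


def timestamp_job_id(base_job_id: str) -> str:
    # Unique on every call, so a retried task cannot reconstruct the id
    # of the job started by a previous attempt.
    return f"{base_job_id}_{int(time.time())}"


def deterministic_job_id(base_job_id: str, configuration: dict) -> str:
    # The same configuration always yields the same id, so a retry can
    # look up (re-poll) the job submitted by an earlier attempt.
    suffix = hashlib.md5(json.dumps(configuration, sort_keys=True).encode()).hexdigest()
    return f"{base_job_id}_{suffix}"
```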
@albertocalderari @edejong what do you think?
f"airflow_{self.dag_id}{self.task_id}{exec_date}" the job won't be clearable this way which might result in unexpected behavours
I tried this before you LOL
This works in case we want to reattach to a running job or a job that succeeded. The case of rerunning a failed task is handled in L1727.
What does this do to the behavior for re-running a DAG for an execution date?
Would this not by default reattach to the originally succeeded job (rather than the expected behavior of re-running the job)? Should this be configurable?
My question is: if the job succeeded, why should we rerun it instead of using the existing result?
In short, the underlying table / partition could have changed between the original run and the new run, and in many cases the user will want to ensure that the result of this job is based on a fresh run of the query.
Airflow cannot know if the underlying table(s) have changed since the original run.
To give an example use case:
Imagine a streaming job in charge of inserting records into an hourly partitioned fact table, and a DAG responsible for running some hourly analytics query (joins to dimension tables / aggregation). Once the processing-time watermark passes the end of the partition interval, the original scheduled DAG run executes (the BigQueryInsertJobOperator task runs a query on the data as of this processing time). Then late data (event time << processing time) arrives that should still go to the old partition (because partitioning is on event time). When the streaming job detects this, it could use some mechanism (e.g. the REST API, or an alert to a human who manually re-runs the DAG for an execution date) to tell the DAG (and this BigQueryInsertJobOperator) to re-run, because the original run for this interval / partition was actually against incomplete data.
@turbaszek I tend to agree with @jaketf. We might frequently want to rerun a DAG for a prior, successful execution date at some point in the future, which would overwrite data in a partitioned BigQuery table. Typically this might occur because of a change in the business logic.
So, in my opinion, the job_id needs to be more unique than just airflow_{self.dag_id}_{self.task_id}_{exec_date}_ in order to account for this.
If I clear the status of a task in a DAG, I expect that task to be fully rerun.
I think we will not be able to create universal behavior that will keep idempotency. Some users want to always execute the SQL query, others want to use the existing results, and others want to re-query only when it is a backfill. All these cases sound correct, and they cannot always be combined into a single operator without additional parameters.
Then there should be additional parameters. IMHO, I don't think it is controversial to suggest that, when the status of a BQ task is cleared, the default behaviour should be that the query is fully rerun. The other cases sound like less common scenarios (in my experience at least).
"the query is fully rerun"
That was the original behaviour, as the job_id was always generated by the discovery API.
Indeed. I'd say that should be the default going forward too.
I think we will not be able to create universal behavior that will keep idempotency. Some users want to always execute the SQL query, others want to use the existing results, and others want to re-query only when it is a backfill. All these cases sound correct, and they cannot always be combined into a single operator without additional parameters.

This is an interesting point. Perhaps we should consider bringing back the deprecated BigQueryOperator (contract: always runs a new query) and then have this BigQueryInsertJobOperator (contract: creates the job only if a succeeded one does not exist).
IMHO we can have both behaviors in a single operator with a force_rerun parameter that controls how the job id is generated, e.g.:
# random uuid for a forced rerun, hash of the job config otherwise
# (job_config would need to be hashable, e.g. serialized to JSON)
uniqueness_hash = hash(uuid.uuid4()) if self.force_rerun else hash(job_config)
job_id = f"{dag_id}_{task_id}_{exec_date}_{uniqueness_hash}"
Related: #10014
How about using a hash of the job config to create the job_id?
We can also consider using … The uniqueness of …
I can think of a scenario we could easily face where a task was successful but there was missing data in the source at the time the job was run. Rerunning in this context would mean that the job would not actually be re-executed. I'd argue that the job_id of each try should be unique, so including the try number makes sense.
If the job config is the same AND the underlying data hasn't changed, then in most cases it should not be expensive to run the job again, as it will be served from cache by the BigQuery service (unless, as a corner case, the query uses a non-deterministic function).
I totally agree these scenarios exist where the job config is exactly the same but the referenced tables have changed due to some other process (e.g. a partition received late streaming inserts so we're re-running the task for that partition, or an external table on GCS references a wildcard where new files are present).

Note that idempotency formally means same inputs => same outputs. @nathadfield makes the case that the inputs are the job config AND all data referenced in the query.

I feel like the default should remain "always run a new query"; those concerned about firing extra API calls that just hit the query cache, or about pods dying during job execution, can opt out of that behaviour.

However, maybe we can validate this assumption with more data points / opinions than just me and @nathadfield.
Just read the original issue, which seems focused on not wasting resources when a pod dies in the middle of task execution. Use @turbaszek's proposed {dag_id}_{task_id}_{exec_date}_{hash(job_config)} BUT only reattach if the job is in a PENDING or RUNNING state. If it is SUCCEEDED or failed, we generate a new ID for uniqueness and resubmit. This way, if the original query succeeded and no data has changed, the second query will be served from cache.
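A rough sketch of that decision logic with the google-cloud-bigquery client (the helper name and the `_retry` suffix are just illustrative):

```python
from google.api_core.exceptions import NotFound
from google.cloud import bigquery


def submit_or_reattach(client: bigquery.Client, job_id: str, sql: str, location: str = "US"):
    """Reattach to a pending/running job with this id, otherwise submit a new query job."""
    try:
        job = client.get_job(job_id, location=location)
    except NotFound:
        job = None

    if job is not None and job.state in ("PENDING", "RUNNING"):
        # A previous attempt already started this job - wait on it rather
        # than paying for a duplicate run.
        return job

    # Job is DONE (succeeded or failed) or does not exist: submit a fresh job
    # under a new id. If the earlier run succeeded and nothing changed, the
    # BigQuery result cache should make the re-run cheap.
    return client.query(sql, job_id=f"{job_id}_retry", location=location)
```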
Although a pretty cool solution, the difficulty I have with "reattach when the job has not finished, otherwise start a new one" is that it's quite a bit of implicit logic to follow, which may or may not be what people expect. I would argue it's better to keep things as predictable as possible, which means that by default it's probably best to just launch a new job. This will work fine and predictably for most use cases, and doesn't always lead to extra costs as the BigQuery cache might sometimes be used. In other words, add the try number to the job id.

Then for the exceptions we can think about adding an option to the operator to either reattach to the previous job, or cancel it if it's still running, depending on the user's specific use case. In that case don't use the try number in the job id; if a new job is needed, make some small modification to the previous one. That way we can start out in a predictable, simple way, and more complicated use case scenarios can be added iteratively in future versions as needed.

Also, because whatever happens to a previous job is explicitly set by the user, it won't lead to many support questions, which is what I fear otherwise. Use case scenarios vary a lot and there's really no one-size-fits-all.
Good point on predictability. For context, I believe the enum of job states is PENDING, RUNNING, DONE.
This notably does not distinguish between SUCCEEDED and ERROR; that requires the client code to check the errorResult in JobStatus: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus
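For reference, distinguishing success from failure on a DONE job with the Python client comes down to inspecting `error_result`, roughly like this (the job id and location are placeholders):

```python
from google.cloud import bigquery

client = bigquery.Client()
job = client.get_job("some_job_id", location="US")  # placeholder id

if job.state == "DONE":
    if job.error_result:
        # DONE but failed: error_result carries the reason and message.
        print("job failed:", job.error_result)
    else:
        print("job succeeded")
else:
    print("job still", job.state)  # PENDING or RUNNING
```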
I really like the idea of having a set of reattach_states, it's so explicit. I'd have to draw up some state diagrams to make sure this fulfils common use case scenarios, but intuitively this sounds like the way to go to me.
Very interesting discussion :). I really like the proposal of @jaketf - with the explicitness of it (all these options are merely optimisations at the expense of predictability), but I wanted to modify it slightly. I believe there is no need to limit ourselves to the states defined in https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/JobState. With 4 states, we can easily define our own states:
This will be far less confusing than additional …
Force-pushed from c644a5c to b8183cb
@edejong @jaketf @potiuk @nathadfield I added the changes to the operator. Using force_rerun will submit a new job every time without attaching to already existing ones.
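For anyone following along, usage with the new parameters would look roughly like this (the query and ids are placeholders, and defaults may differ in the released provider):

```python
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

insert_job = BigQueryInsertJobOperator(
    task_id="run_hourly_aggregation",
    configuration={
        "query": {
            "query": "SELECT COUNT(*) FROM `my_project.my_dataset.fact_table`",  # placeholder
            "useLegacySql": False,
        }
    },
    location="US",
    # Always submit a fresh job instead of reusing a previous result.
    force_rerun=True,
    # Only reattach to a job from an earlier attempt if it is still in flight.
    reattach_states={"PENDING", "RUNNING"},
)
```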
Force-pushed from b8183cb to f704528
jaketf left a comment
@turbaszek appreciate all the work put into this to make something flexible for many use cases!
One last point: by default (force_rerun=True and reattach_states=None), if we are in a scenario where there is a matching job id, we throw an exception saying "consider setting force_rerun=True". This seems less than ideal.
Correct me if I'm wrong, but using force_rerun will generate a unique job_id, so there should be no matching job_id in BigQuery?
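For context, the guard being discussed could look roughly like the sketch below (the helper name, message wording and example values are assumptions, not the merged diff):

```python
from airflow.exceptions import AirflowException


def check_reattach(job_state: str, job_id: str, reattach_states: set) -> None:
    """Raise if an existing job is not in a state we are willing to reattach to."""
    if job_state not in reattach_states:
        raise AirflowException(
            f"Job with id {job_id} already exists and is in {job_state} state. "
            "If you want to force a rerun, consider setting force_rerun=True."
        )


try:
    # A DONE job is not reattachable when only in-flight states are allowed.
    check_reattach("DONE", "airflow_my_dag_my_task_2020_07_01", {"PENDING", "RUNNING"})
except AirflowException as err:
    print(err)
```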
Co-authored-by: Jacob Ferriero <[email protected]>
potiuk left a comment
LGTM
olchas left a comment
LGTM as well.
Wooohooooooo!
Closes #8903
This operator works in the following way:
- when force_rerun is True, the operator creates a unique job_id in the form of [provide_job_id | airflow_{dag_id}_{task_id}_{exec_date}]_{uniqueness_suffix}
- the operator reattaches to an existing job if that job's state is in reattach_states; if the job is done the operator will raise AirflowException
- using force_rerun will submit a new job every time without attaching to already existing ones

Make sure to mark the boxes below before creating PR:
- In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
- In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
- In case of backwards incompatible changes please leave a note in UPDATING.md.
- Read the Pull Request Guidelines for more information.