
Conversation

@turbaszek (Member) commented Jun 30, 2020

Closes #8903

This operator works in the following way:

  • it calculates a unique hash of the job using the job's configuration, or a uuid if force_rerun is True
  • creates a job_id in the form of
    [provide_job_id | airflow_{dag_id}_{task_id}_{exec_date}]_{uniqueness_suffix}
  • submits a BigQuery job using the job_id
  • if a job with the given id already exists, it tries to reattach to that job, provided the job is not done and its
    state is in reattach_states. If the job is done, the operator raises an AirflowException.

Using force_rerun will submit a new job every time without attaching to already existing ones.
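
For illustration, a rough sketch of the id-generation logic described above; the helper name generate_job_id and the exact sanitization are assumptions, not necessarily the merged implementation:

import hashlib
import json
import re
import uuid

def generate_job_id(configuration, dag_id, task_id, exec_date, job_id=None, force_rerun=False):
    # deterministic hash of the configuration, or a random uuid when force_rerun is requested
    hash_base = str(uuid.uuid4()) if force_rerun else json.dumps(configuration, sort_keys=True)
    uniqueness_suffix = hashlib.md5(hash_base.encode()).hexdigest()
    job_id = job_id or f"airflow_{dag_id}_{task_id}_{exec_date}"
    # BigQuery job ids may only contain letters, digits, dashes and underscores
    sanitized = re.sub(r"[^0-9a-zA-Z_-]", "_", job_id)
    return f"{sanitized}_{uniqueness_suffix}"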


Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Unit tests coverage for changes (not needed for documentation changes)
  • Target Github ISSUE in description if exists
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@boring-cyborg boring-cyborg bot added the provider:google Google (including GCP) related issues label Jun 30, 2020
@potiuk potiuk marked this pull request as ready for review June 30, 2020 14:51
Member Author

Suggested change
job_id = f"{self.job_id}_{int(time())}"
job_id = f"{job_id}_{int(time())}"

Contributor

I see - yet you won't be able to re-poll for this job since it uses the current time, which is not reproducible on an "eventual" next run.
Though it's much better than what is there now.

@turbaszek (Member Author)

@albertocalderari @edejong what do you think?

@albertocalderari (Contributor) Jul 1, 2020

f"airflow_{self.dag_id}{self.task_id}{exec_date}" the job won't be clearable this way which might result in unexpected behavours

Contributor

I tried this before you LOL

Member Author

This works in the case where we want to reattach to a running job or a job that succeeded. The case of rerunning a failed task is handled in L1727.

Contributor

What does this do to the behavior of re-running a DAG for an execution date?
Would this not, by default, reattach to the originally succeeded job (rather than the expected behavior of re-running the job)? Should this be configurable?

Member Author

My question is: if a job succeeded, why should we rerun it instead of using the existing result?

Contributor

In short, the underlying table / partition could have changed between the original run and the new run and in many cases the user will want to ensure that the result of this job is based on a fresh run of the query.

Airflow cannot know if the underlying table(s) have changed since the original run.

To give an example use case:
Imagine a streaming job in charge of inserting records into an hourly partitioned fact table, and a DAG responsible for running some hourly analytics query (joins to dimension tables / aggregation). Once the processing-time watermark passes the end of the partition interval, the originally scheduled DAG run executes (the BigQueryInsertJobOperator task runs a query on the data available at this processing time). Then late data (event time << processing time) arrives that should still go to the old partition (because partitioning is on event time). When the streaming job detects this, it could use some mechanism (e.g. the REST API, or an alert to a human who manually re-runs the DAG for an execution date) to tell the DAG (and this BigQueryInsertJobOperator) to re-run, because the original run for this interval / partition was actually against incomplete data.

Collaborator

@turbaszek I tend to agree with @jaketf. We frequently might want to rerun a DAG for a prior, successful execution date at some point in the future that would overwrite data in a partitioned BigQuery table. Typically this might occur because of a change in the business logic.

So, in my opinion, the job_id needs to be more unique than just airflow_{self.dag_id}_{self.task_id}_{exec_date}_ in order to account for this.

If I clear the status of a task in a DAG, I expect that task to be fully rerun.

@mik-laj (Member) Jul 29, 2020

I think we will not be able to create universal behavior that keeps idempotency. Some users want to always execute the SQL query, others want to use predetermined results, and others want to re-query only when it is a backfill. All of these cases sound correct, and they cannot be combined into a single operator without additional parameters.

Collaborator

Then there should be additional parameters. IMHO, I don't think it is controversial to suggest that, when the status of a BQ task is cleared, the default behaviour should be that the query is fully rerun. Other cases sound like less common scenarios (in my experience at least).

Member Author

the query is fully rerun.

That was the original behaviour, as the job_id was always generated by the discovery API.

Collaborator

Indeed. I'd say that should be the default going forward too.

Contributor

I think we will not be able to create universal behavior that keeps idempotency. Some users want to always execute the SQL query, others want to use predetermined results, and others want to re-query only when it is a backfill. All of these cases sound correct, and they cannot be combined into a single operator without additional parameters.

This is an interesting point. Perhaps we should consider bringing back the deprecated BigQueryOperator (contract: always run a new query) and then have this BigQueryInsertJobOperator (contract: create the job only if a succeeded one does not exist).

IMHO we can have both behaviors in a single operator with a force_rerun=True parameter that controls how the job id is generated.

import hashlib, json, uuid  # imports this fragment relies on

# random suffix when force_rerun is set, otherwise a deterministic hash of the serialized job config
hash_base = str(uuid.uuid4()) if self.force_rerun else json.dumps(job_config, sort_keys=True)
uniqueness_hash = hashlib.md5(hash_base.encode()).hexdigest()

job_id = f"{dag_id}{task_id}{exec_date}{uniqueness_hash}"

@turbaszek (Member Author)

Related #10014

@turbaszek (Member Author)

How about using a hash of job config to create job_id? {dag_id}{task_id}{exec_date}{hash(job_config)}
This will not solve everything but we will be able to handle:

  • creation of new job
  • reattach to the job which is running (with same config)
  • create new job in case of rerun / backfill with new changed config

We can also consider using try_number somewhere. Until this change, we were using random job_id, so the second point was not possible.

The uniqueness of job_id enforces a lot of limitations, and I think users should be aware of that; I'm not sure we will be able to handle 100% of the cases.
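
For illustration only, one way the proposed config hash (optionally combined with try_number, as mentioned above) could be computed; the variable values are made up, and hashlib is used instead of the built-in hash(), which is salted per interpreter run and therefore not reproducible:

import hashlib
import json

def config_hash(configuration: dict) -> str:
    # serialize deterministically so identical configs always produce the same hash
    return hashlib.md5(json.dumps(configuration, sort_keys=True).encode()).hexdigest()

# example values; in the operator these would come from the task instance context
dag_id, task_id, exec_date, try_number = "my_dag", "my_task", "2020-07-29T00-00-00", 1
configuration = {"query": {"query": "SELECT 1", "useLegacySql": False}}

job_id = f"{dag_id}_{task_id}_{exec_date}_{try_number}_{config_hash(configuration)}"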

@nathadfield (Collaborator)

How about using a hash of job config to create job_id? {dag_id}{task_id}{exec_date}{hash(job_config)}

I can think of a scenario we could easily face where a task was successful but there was missing data in the source at the time the job was run. Rerunning in this context would mean that the job_config does not change if we are talking about a SQL query.

I'd argue that the job_id of each try should be unique so including try_number sounds like a good idea.

@jaketf (Contributor) commented Jul 29, 2020

If the job config is the same AND the underlying data hasn't changed, then in most cases it should not be expensive to run the job again, as it will be served from cache by the BigQuery service (unless, as a corner case, the query uses a non-deterministic function like CURRENT_DATE or CURRENT_USER).

I can think of a scenario we could easily face where a task was successful but there was missing data in the source at the time the job was run. Rerunning in this context would mean that the job_config does not change if we are talking about a SQL query.

I totally agree these scenarios exist where the job config is exactly the same but the referenced tables have changed due to some other process (e.g. a partition received late streaming inserts so we're re-running the task for that partition, or an external table on GCS references a wildcard where new files are present).

Note that idempotency formally means same inputs => same outputs. @nathadfield makes the case that the inputs are the job config AND all data referenced in the query.

I feel like the default should remain "always run a new query". For those concerned about firing extra API calls against the query cache, or about pods dying during job execution, force_rerun=False can be specified on a per-use-case basis, as I believe this is the exception, not the rule.

However, maybe we can validate this assumption with more data points / opinions than just me and @nathadfield.
I suggest we ping the dev / user lists and the Airflow GCP Slack to send around some sort of poll, so we can get in touch with more users of the BigQuery integration.

@jaketf (Contributor) commented Jul 29, 2020

Just read the original issue, which seems focused on not wasting resources when a pod dies in the middle of task execution.
For this reason I think we can satisfy everyone if the contract is this:

Use @turbaszek's proposed {dag_id}{task_id}{exec_date}{hash(job_config)} BUT only reattach if the job is in a PENDING or RUNNING state. If it is SUCCEEDED or FAILED, we generate a new ID for uniqueness and resubmit.

This way, if the original query succeeded and no data has changed, the second query will be served from cache.
If data has changed, the query will run again.
If the query is in flight, we will reattach.
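
A minimal sketch of that contract, assuming the refactored BigQueryHook from #8903 exposes insert_job/get_job and that the client raises Conflict for a duplicate job id; the helper name submit_or_reattach is made up:

import uuid
from google.api_core.exceptions import Conflict

def submit_or_reattach(hook, configuration, job_id, project_id):
    try:
        return hook.insert_job(configuration=configuration, job_id=job_id, project_id=project_id)
    except Conflict:
        job = hook.get_job(job_id=job_id, project_id=project_id)
        if job.state in ("PENDING", "RUNNING"):
            # the original job is still in flight: reattach and wait for it
            job.result()
            return job
        # the job already finished (successfully or not): resubmit under a fresh id
        new_job_id = f"{job_id}_{uuid.uuid4().hex[:8]}"
        return hook.insert_job(configuration=configuration, job_id=new_job_id, project_id=project_id)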

@edejong (Contributor) commented Jul 30, 2020

Although it's a pretty cool solution, the difficulty I have with "reattach when the job has not finished, otherwise start a new one" is that it's quite a bit of implicit logic to follow, which may or may not be what people expect.

I would argue it's better to just keep things as predictable as possible, which means that by default it's probably best to just launch a new job. This will work fine and predictably for most use cases, and doesn't always lead to extra costs since the BigQuery cache might sometimes be used. In other words, add the try number to the job id.

Then, for the exceptions, we can think about adding an option to the operator to either reattach to the previous job, or cancel it if it's still running. This is up to the user's specific use case. So here, don't use the try number in the job id; then, if a new job is needed, make some small modification to the previous one.

That way we can start out in a predictable, simple way, and more complicated use case scenarios can be added iteratively in future versions as needed. Also, because whatever happens to a previous job is explicitly set by the user, it won't lead to many support questions, which is what I fear otherwise. Use case scenarios vary a lot and there's really no one-size-fits-all.

@jaketf (Contributor) commented Jul 30, 2020

Good point on predictability.
Especially for those migrating from the old BQ operator contract (always submit a new job), it will be most predictable to have the same contract by default.
I think parameterizing a reattach_states: Set[str], which defaults to an empty set, feels really flexible: this way the user can say reattach to {"PENDING", "RUNNING"} or {"PENDING", "RUNNING", "DONE"}.
Unfortunately, I think we'd need yet another parameter, retry_on_reattach_to_error: bool, defaulting to True (but it won't have any effect unless "DONE" in self.reattach_states).

For context, I believe this is the enum of job states.

Notably, this does not distinguish between SUCCEEDED and ERROR; that requires the client code to check the errorResult in JobStatus: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus
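
A sketch of how such a reattach_states parameter might gate the decision (the states follow the JobState enum mentioned above; distinguishing success from error on DONE would still need the errorResult check noted):

from typing import Optional, Set

def should_reattach(job_state: str, reattach_states: Optional[Set[str]]) -> bool:
    # an empty or None set means "never reattach", i.e. always submit a new job
    return bool(reattach_states) and job_state in reattach_states

should_reattach("RUNNING", {"PENDING", "RUNNING"})  # True: wait for the in-flight job
should_reattach("DONE", {"PENDING", "RUNNING"})     # False: submit a new job instead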

@edejong (Contributor) commented Jul 31, 2020

I really like the idea of having a set of reattach_states; it's so explicit. I'd have to draw up some state diagrams to make sure this fulfils common use case scenarios, but intuitively this sounds like the way to go to me.

@potiuk (Member) commented Aug 2, 2020

Very interesting discussion :).

I really like @jaketf's proposal and its explicitness (all these options are merely optimisations at the expense of predictability), but I wanted to modify it slightly. I believe there is no need to limit ourselves to the states defined in https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/JobState. With 4 states, we can easily define our own:

  • PENDING
  • RUNNING
  • SUCCESS
  • FAILURE

This will be far less confusing than an additional retry_on_reattach_to_error parameter.

@turbaszek turbaszek force-pushed the improve-idempotency-bq-insert-job-op branch from c644a5c to b8183cb Compare August 3, 2020 13:01
@turbaszek (Member Author) commented Aug 3, 2020

@edejong @jaketf @potiuk @nathadfield I added the changes: force_rerun, reattach_states, and a job_id derived from the configuration hash. The operator now works in the following way:

  • it calculates a unique hash of the job using the job's configuration, or a uuid if force_rerun is True
  • creates a job_id in the form of
    [provide_job_id | airflow_{dag_id}_{task_id}_{exec_date}]_{uniqueness_suffix}
  • submits a BigQuery job using the job_id
  • if a job with the given id already exists, it tries to reattach to that job, provided the job is not done and its
    state is in reattach_states. If the job is done, the operator raises an AirflowException.

Using force_rerun will submit a new job every time without attaching to already existing ones.
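
A usage sketch of the resulting interface (the table and task names are made up, and the parameter values shown are illustrative rather than necessarily the defaults):

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": "SELECT COUNT(*) FROM `my-project.my_dataset.my_table`",
            "useLegacySql": False,
        }
    },
    # reattach only when an identically-configured job is still pending or running
    reattach_states={"PENDING", "RUNNING"},
    # set force_rerun=True to always submit a brand new job regardless of prior runs
    force_rerun=False,
)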

@turbaszek turbaszek force-pushed the improve-idempotency-bq-insert-job-op branch from b8183cb to f704528 Compare August 3, 2020 13:11
@jaketf (Contributor) left a comment

@turbaszek I appreciate all the work put into this to make something flexible for many use cases!
One last point: by default (force_rerun=True and reattach_states=None), if we are in a scenario where there is a matching job id, we throw an exception saying "consider setting force_rerun = True". This seems less than ideal.

@turbaszek (Member Author)

One last point: by default (force_rerun=True and reattach_states=None), if we are in a scenario where there is a matching job id, we throw an exception saying "consider setting force_rerun = True". This seems less than ideal.

Correct me if I'm wrong, but using force_rerun will generate a unique job_id, so there should be no matching job_id in BigQuery?

@turbaszek turbaszek requested a review from mik-laj August 10, 2020 09:52
@turbaszek (Member Author)

@potiuk @mik-laj @olchas would you mind taking a look before merge?

@potiuk (Member) left a comment

LGTM

@olchas (Contributor) left a comment

LGTM as well.

@albertocalderari (Contributor)

Wooohooooooo!


Labels

provider:google Google (including GCP) related issues

Development

Successfully merging this pull request may close these issues.

BigQueryHook refactor + deterministic BQ Job ID

8 participants