
[BEAM-10990] Elasticsearch response filtering and [BEAM-5172] Tries to reduce ES UTest flakiness #15381

Merged
echauchot merged 2 commits into apache:master from egalpin:BEAM-10990-elasticsearch-response-filtering on Oct 20, 2021

@egalpin
Member

@egalpin egalpin commented Aug 24, 2021

Adds the ability to prevent infinite retries on non-transient Elasticsearch write failures by providing a user-configurable setting called withThrowWriteFailures (for now; I'm not certain the name is perfect), which is true by default to maintain backward compatibility.

If withThrowWriteFailures is set to false, the response from the Elasticsearch Bulk API will be used to capture the result of persisting (or deleting) each document. The items in a Bulk API response are guaranteed to be in the same order as the actions in the Bulk API request [1], so we can stitch together the write result for any given input document.

This PR introduces a new class, WriteSummary. It collects the context of a document as it moves through the transform: the raw input document (i.e. an element of the PCollection sent to ElasticsearchIO.Write/ElasticsearchIO.DocToBulk), the Bulk Directive produced from the transform settings and the input document (e.g. a delete directive, upsert, scripted upsert, etc.), and whether the document resulted in an error from ES. By maintaining the full context/ancestry of the document, users can get at, for example, the input document and its resulting error. Without the ancestry, we could only return the Bulk Directive, which would be less helpful in many cases.

[1] https://discuss.elastic.co/t/ordering-of-responses-in-the-bulk-api/13264
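
The stitching described above can be sketched in plain Java, with no Beam or Elasticsearch dependency. This is only an illustration under stated assumptions: `WriteSummarySketch`, its field names, and `BulkResponseStitcher` are hypothetical stand-ins, not the classes in this PR, and the directive and error strings are placeholders. The one thing it relies on from the description is that response items arrive in request order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BulkResponseStitcher {

  /** Hypothetical stand-in for the PR's WriteSummary; the field names are assumptions. */
  static final class WriteSummarySketch {
    final String inputDoc;
    final String bulkDirective;
    final String errorMessage; // null when Elasticsearch reported success for this item

    WriteSummarySketch(String inputDoc, String bulkDirective, String errorMessage) {
      this.inputDoc = inputDoc;
      this.bulkDirective = bulkDirective;
      this.errorMessage = errorMessage;
    }

    boolean hasError() {
      return errorMessage != null;
    }
  }

  /** Pairs the i-th request entry with the i-th response item, relying on ordering only. */
  static List<WriteSummarySketch> stitch(
      List<String> inputDocs, List<String> bulkDirectives, List<String> itemErrors) {
    if (inputDocs.size() != bulkDirectives.size() || inputDocs.size() != itemErrors.size()) {
      throw new IllegalArgumentException("request and response must have the same length");
    }
    List<WriteSummarySketch> summaries = new ArrayList<>();
    for (int i = 0; i < inputDocs.size(); i++) {
      summaries.add(
          new WriteSummarySketch(inputDocs.get(i), bulkDirectives.get(i), itemErrors.get(i)));
    }
    return summaries;
  }

  public static void main(String[] args) {
    // Placeholder data: two documents, the second rejected with a placeholder error string.
    List<WriteSummarySketch> summaries =
        stitch(
            Arrays.asList("{\"id\":1}", "{\"id\":2}"),
            Arrays.asList("index", "index"),
            Arrays.asList(null, "mapper_parsing_exception"));
    for (WriteSummarySketch s : summaries) {
      if (s.hasError()) {
        System.out.println("failed: " + s.inputDoc + " -> " + s.errorMessage);
      }
    }
  }
}
```

Because the pairing is purely positional, it also works when Elasticsearch autogenerates document ids and the input carries no id to join on.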



@egalpin
Member Author

egalpin commented Aug 24, 2021

@egalpin
Member Author

egalpin commented Aug 25, 2021

Run Java PreCommit

@egalpin
Member Author

egalpin commented Sep 1, 2021

Friendly bump. Anyone able to review?

@timrobertson100
Contributor

timrobertson100 commented Sep 1, 2021

I'm sorry I haven't got time to complete a review, but I have read through the changes once, and it all looks well structured and has good attention to detail on comments, tests etc.

The area I can't comment on immediately is the adapter and windowing behavior - if someone could confirm that section looks reasonable I think it looks good to merge.

@egalpin
Member Author

egalpin commented Sep 1, 2021

Ya I agree that maintaining a record of timestamps and using the context adapter is a lot of complexity. The concept was borrowed heavily from this RedisIO PR: #5841

I'm interested in investigating how much work would be involved in completing BEAM-1287 to allow FinishBundle methods to accept OutputReceiver/MultiOutputReceiver as that would make this type of pattern (which seems to be used in at least a few places) much easier to work with.

@timrobertson100
Contributor

Thanks @egalpin

@echauchot - if you have time, could you please take a look at the section of changes in ElasticsearchIO from lines 2346 onwards? I'm just not familiar enough with the windowing to verify. The rest of the changes seem very reasonable.

@echauchot
Contributor

@timrobertson100 sure ! thanks for taking a look !
@egalpin thanks for your work !

@egalpin
Member Author

egalpin commented Sep 2, 2021

Thanks @timrobertson100 for having a look!

@echauchot I’m definitely keen to understand if the strategy in the PR will work and its windowing implications, and would be happy to learn of alternative approaches!

@echauchot
Contributor

@egalpin sorry for the late review, taking a look now

@egalpin
Member Author

egalpin commented Sep 16, 2021

Thanks for having a look @echauchot! And no problem, I’ve been swamped with work and unable to address the flaky tests so I completely understand!

@aaltay aaltay requested a review from echauchot September 23, 2021 18:58
@echauchot
Contributor

@aaltay @egalpin sorry guys I had very reduced availability these days because of the Apachecon. Resuming review

@egalpin
Member Author

egalpin commented Sep 27, 2021

Oh I had also forgotten about an alternative solution that would have less complexity, but I’m not sure how it might or might not adhere to best practices as a sink in the system.

We could instead combine inputs globally and forget about maintaining windows altogether. I believe this is what the BQ Write method does, and I think that not maintaining windows in a sink is generally sane 🤷‍♂️

Thoughts?

Contributor

@echauchot echauchot left a comment

Hi Evan, thanks a lot for your work !
I have some minor change requests around wording, but a major concern about the complexity of having to deal with windows. That should be transparent to an IO. I think you can merge input docs and responses globally and not deal with windows at all.

@egalpin
Member Author

egalpin commented Sep 29, 2021

Thanks for the feedback @echauchot! I wasn't sure if re-windowing within an IO was acceptable but it sounds like that's the path to go. It's way less complex, and I definitely like that. I'll make those changes

@echauchot
Contributor

echauchot commented Sep 30, 2021

@egalpin my pleasure !

I mean: you should not change the windows of the elements. In fact, you should not deal with the windows at all. Your problem is to join the input elements with the status (json and error) of the write. You could do:

  • join by doc id, but that would not be possible when the id is not provided in the input doc (autogeneration). So it is not the correct way to go.
  • join by position: you maintain the same order between input docs and WriteSummary objects, so you could simply join index 1 with index 1, index 2 with index 2, etc.

I don't get why you bothered with windows in the first place, but maybe there is something I missed.

@egalpin
Member Author

egalpin commented Sep 30, 2021

Thanks for clarifying, that makes sense 👍 My original intent was to leave windows alone entirely and just output without modification. It’s worth noting that the window each element belongs to should be left unmodified, and that’s why the complexity is present.

The challenge arose from the use of FinishBundle, where the output method requires explicit use of a BoundedWindow. So all of the window collection/multimap/adapter complexity exists so that we can output any buffered elements when FinishBundle is called. If FinishBundle could accept a MultiOutputReceiver or OutputReceiver, I believe this would be solved neatly.

I just didn’t want to gate this change on FinishBundle changes, but maybe that’s the right path?

@echauchot
Contributor

@egalpin I don't get it. Neither BulkIOBaseFn#finishBundle() nor ElasticsearchIO#flushbatch() expect windowing information.

@egalpin
Member Author

egalpin commented Oct 1, 2021

@echauchot It's highly possible that I have a fundamental misunderstanding about how to handle window data. I'm going to try to outline my goals and challenges, and take the proposed implementation out of focus.

Goals:

  • Change BulkIO/Write to output PCollectionTuple rather than PDone, in order to support reporting the status of indexing each input document
  • Leave windows/timestamps/etc of input data entirely unaltered

Challenges:

  • BulkIOBaseFn relies on buffering inputs, either using bundles or Stateful specs
  • BulkIOBaseFn#finishBundle() must be called to ensure that any buffered inputs are sent to ES, and as of this PR, output to the PCollectionTuple
  • DoFn.FinishBundle methods can accept a FinishBundleContext in order to output elements
  • FinishBundleContext output methods all require explicit specification of a BoundedWindow instance

I got a bit stuck on that last point. My impression was that in order to ensure buffered docs' results were output, and in order to leave those elements' windows unaltered, I needed to keep track of the windows to which those elements belong so that they could be explicitly passed to FinishBundleContext#output.
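
The bookkeeping described above can be sketched without Beam. This is a minimal illustration, not the code in this PR: `WindowedBufferSketch` is a hypothetical name, a `String` label stands in for a `BoundedWindow`, and the `BiConsumer` stands in for an output call that needs the window passed explicitly.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical buffer that remembers which window each pending result belongs to. */
final class WindowedBufferSketch {

  // Window label (a stand-in for BoundedWindow) -> results buffered for that window.
  private final Map<String, List<String>> resultsByWindow = new LinkedHashMap<>();

  /** Called per element: record the result together with the element's own window. */
  void add(String window, String result) {
    resultsByWindow.computeIfAbsent(window, w -> new ArrayList<>()).add(result);
  }

  /**
   * Called at the end of a bundle: emit every buffered result into the window it
   * arrived in, then clear the buffer so nothing is emitted twice.
   */
  void flush(BiConsumer<String, String> output) {
    for (Map.Entry<String, List<String>> entry : resultsByWindow.entrySet()) {
      for (String result : entry.getValue()) {
        output.accept(entry.getKey(), result);
      }
    }
    resultsByWindow.clear();
  }

  public static void main(String[] args) {
    WindowedBufferSketch buffer = new WindowedBufferSketch();
    buffer.add("window-A", "doc-1 ok");
    buffer.add("window-B", "doc-2 failed");
    buffer.add("window-A", "doc-3 ok");
    buffer.flush((window, result) -> System.out.println(window + " <- " + result));
  }
}
```

Keying the buffer by window is what keeps each result in its original window; remembering only the most recently seen window would misattribute earlier elements.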

I'd definitely be keen to learn more about how to handle windows and challenge my assumptions here. Thanks for your time @echauchot in reviewing and teaching.

@echauchot
Contributor

echauchot commented Oct 8, 2021

Just to write here what we discussed privately yesterday (Apache way: what did not happen publicly did not happen at all):

  • My bad, I did not know about the DoFn#finishBundle() signature change. It now forces specifying a window in the output. I first thought that dealing with windows was not needed and brought unnecessary complexity, but it seems it is mandatory 😄 I hope it is only temporary, until OutputReceiver is fleshed out.
  • I took a look at the existing IOs that output data to PCollections:
    • FhirIO: outputs to the last seen window. Seems incorrect.
    • HadoopFormatIO and BigQueryIO store elements in a map keyed by window, to then output per window, similarly to what you do.
      => So I guess the general window maintenance looks good. I need to look at the code in more detail to give LGTM.

Contributor

@echauchot echauchot left a comment

Overall windowing management and flush* methods look good to me, please address the minor comments and we will be good to merge

@egalpin
Member Author

egalpin commented Oct 18, 2021

Note that e01efcd aims to also address BEAM-5172

@egalpin egalpin requested a review from echauchot October 18, 2021 18:36
@echauchot
Contributor

BEAM-5172

Nice !
But please, rename commit with [BEAM-5172] header as it refers to a different ticket, I guess the utest docCount decrease commit refers to the same ticket as well. And also please squash the [BEAM-10990] commits together and the [BEAM-5172] commits together.

Contributor

@echauchot echauchot left a comment

Thanks for your work Evan ! Almost LGTM. One final thing about adding a test that covers the case that you indicated about PDone. Please also squash as explained, that way we will avoid a round trip and I could directly merge when the above UTest is done.

@egalpin egalpin force-pushed the BEAM-10990-elasticsearch-response-filtering branch from bea212b to f7691e6 Compare October 19, 2021 15:07
@egalpin
Member Author

egalpin commented Oct 19, 2021

The pre-commit failure seems to indicate that a missing cache entry was the cause 🤔

12:03:32 * What went wrong:
12:03:32 Execution failed for task ':sdks:java:testing:load-tests:compileJava'.
12:03:32 > Failed to load cache entry for task ':sdks:java:testing:load-tests:compileJava'

I'll try re-running and then will dig in further if persistent.

@egalpin
Member Author

egalpin commented Oct 19, 2021

Run Java PreCommit

Contributor

@echauchot echauchot left a comment

LGTM ! Perfect work and fluid communication as always, thanks Evan !
Merging

@echauchot echauchot changed the title from "[BEAM-10990] Elasticsearch response filtering" to "[BEAM-10990] Elasticsearch response filtering and [BEAM-5172] Tries to reduce ES UTest flakiness" Oct 20, 2021
@echauchot echauchot merged commit a05aa45 into apache:master Oct 20, 2021
@egalpin
Member Author

egalpin commented Oct 20, 2021

Thanks for your review efforts and insights @echauchot, it was a pleasure as always 🎉

@lukecwik
Member

lukecwik commented Mar 7, 2022

I believe the flushing logic has a bug where we are outputting to the wrong window: https://issues.apache.org/jira/browse/BEAM-14064

@echauchot
Contributor

Should be fixed by this PR
