In the process of investigating the issue reported here:
https://stackoverflow.com/questions/74390325/how-to-enable-elasticsearchio-parallel-reads-in-apache-beam
it appears that the method the ElasticsearchIO connector uses to estimate the size of the data in the response does not account for cases where the configured index is an alias, a data stream, or an index pattern, any of which can point to multiple indexes.
In the original issue, a pipeline reading a query that returns over 100 million documents was unable to scale and processed only 40 documents per second.
As discussed in the Stack Overflow thread, the getEstimatedSizeBytes(PipelineOptions options) method in beam/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java (line 871 in c7f2cab) does not properly account for a number of scenarios where the index name returned by Elasticsearch differs from connectionConfiguration.getIndex().
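To illustrate the mismatch, here is a minimal, dependency-free sketch that models a parsed _stats response as nested java.util.Map objects (an assumption for illustration only; the connector itself works with parsed JSON). The class ConfiguredIndexLookup, the method sizeForConfiguredIndex, and the index names my-alias, logs-2022.11.01 and logs-2022.11.02 are all hypothetical. Because Elasticsearch keys the indices object by concrete index name rather than by the alias or pattern used in the request, a lookup keyed on the configured name finds nothing. This sketch treats a missing entry as 0 bytes; if the connector's estimate similarly collapses to 0 for an alias, that would plausibly leave the read with a single split, consistent with the poor scaling described above.

```java
import java.util.Map;

/** Hypothetical sketch: looks up store size for the configured name only (the problematic pattern). */
public class ConfiguredIndexLookup {

  /** Walks a path of keys through nested maps; returns null as soon as a key is missing. */
  static Object path(Object node, String... keys) {
    for (String key : keys) {
      if (!(node instanceof Map)) {
        return null;
      }
      node = ((Map<?, ?>) node).get(key);
    }
    return node;
  }

  /** Reads indices.<configuredIndex>.primaries.store.size_in_bytes, treating a missing entry as 0. */
  static long sizeForConfiguredIndex(Map<String, Object> stats, String configuredIndex) {
    Object size = path(stats, "indices", configuredIndex, "primaries", "store", "size_in_bytes");
    return size instanceof Number ? ((Number) size).longValue() : 0L;
  }

  public static void main(String[] args) {
    // Hypothetical _stats response for an alias "my-alias" that resolves to two concrete indices.
    Map<String, Object> stats = Map.of(
        "indices", Map.of(
            "logs-2022.11.01", Map.of("primaries", Map.of("store", Map.of("size_in_bytes", 100L))),
            "logs-2022.11.02", Map.of("primaries", Map.of("store", Map.of("size_in_bytes", 200L)))));
    System.out.println(sizeForConfiguredIndex(stats, "my-alias"));        // 0: the alias is not a key
    System.out.println(sizeForConfiguredIndex(stats, "logs-2022.11.01")); // 100: exact concrete name
  }
}
```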
Elasticsearch should be relied upon to return the proper indexes for a given stats query, so the _all object should be used instead of the top-level indices object. If there are cases where the _all object isn't appropriate, the code should instead iterate through all of the indices returned under the indices field and sum their total store size, rather than simply trying to match the configured index.
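A minimal sketch of the proposed estimate, assuming the parsed _stats response is available as nested java.util.Map objects so the example needs no JSON library (the connector itself works with parsed JSON, so real code would navigate that tree instead). The class StatsSizeEstimate and the method estimateStoreBytes are hypothetical names. The sketch reads _all first and only falls back to summing every entry under indices. The section argument selects primaries (primary shards only) or total (primaries plus replicas); which one the estimate should use is left open here.

```java
import java.util.Map;

/** Hypothetical sketch: estimate store size by trusting Elasticsearch's own index resolution. */
public class StatsSizeEstimate {

  /** Walks a path of keys through nested maps; returns null as soon as a key is missing. */
  static Object path(Object node, String... keys) {
    for (String key : keys) {
      if (!(node instanceof Map)) {
        return null;
      }
      node = ((Map<?, ?>) node).get(key);
    }
    return node;
  }

  /** Converts a numeric JSON value to long, treating anything else as 0. */
  static long asLong(Object value) {
    return value instanceof Number ? ((Number) value).longValue() : 0L;
  }

  /** Estimates store bytes from a parsed _stats response; section is "primaries" or "total". */
  static long estimateStoreBytes(Map<String, Object> stats, String section) {
    // Preferred: _all already aggregates every index Elasticsearch resolved for the request.
    Object all = path(stats, "_all", section, "store", "size_in_bytes");
    if (all instanceof Number) {
      return ((Number) all).longValue();
    }
    // Fallback: sum every entry under indices instead of matching the configured name.
    long sum = 0L;
    Object indices = stats.get("indices");
    if (indices instanceof Map) {
      for (Object indexStats : ((Map<?, ?>) indices).values()) {
        sum += asLong(path(indexStats, section, "store", "size_in_bytes"));
      }
    }
    return sum;
  }

  public static void main(String[] args) {
    Map<String, Object> withoutAll = Map.of(
        "indices", Map.of(
            "logs-2022.11.01", Map.of("primaries", Map.of("store", Map.of("size_in_bytes", 100L))),
            "logs-2022.11.02", Map.of("primaries", Map.of("store", Map.of("size_in_bytes", 200L)))));
    Map<String, Object> withAll = Map.of(
        "_all", Map.of("primaries", Map.of("store", Map.of("size_in_bytes", 300L))));
    System.out.println(estimateStoreBytes(withoutAll, "primaries")); // 300, summed over indices
    System.out.println(estimateStoreBytes(withAll, "primaries"));    // 300, read from _all
  }
}
```

In a real _stats response both _all and indices are present, so the fallback only matters if _all is missing; the first map in main omits _all purely to exercise that branch.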
Issue Priority
Priority: 2
Issue Component
Component: io-java-elasticsearch