Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Uses _all to follow alias/datastreams when estimating index size
Fixes #24117
  • Loading branch information
egalpin committed Nov 15, 2022
commit a014637106970a0a0e9eb7944aa5caf79fa5fd37
Original file line number Diff line number Diff line change
Expand Up @@ -873,9 +873,8 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
return estimatedByteSize;
}
final ConnectionConfiguration connectionConfiguration = spec.getConnectionConfiguration();
JsonNode statsJson = getStats(connectionConfiguration, false);
JsonNode indexStats =
statsJson.path("indices").path(connectionConfiguration.getIndex()).path("primaries");
JsonNode statsJson = getStats(connectionConfiguration);
JsonNode indexStats = statsJson.path("_all").path("primaries");
long indexSize = indexStats.path("store").path("size_in_bytes").asLong();
LOG.debug("estimate source byte size: total index size {}", indexSize);

Expand Down Expand Up @@ -927,9 +926,8 @@ static long estimateIndexSize(ConnectionConfiguration connectionConfiguration)
// NB: Elasticsearch 5.x+ now provides the slice API.
// (https://www.elastic.co/guide/en/elasticsearch/reference/5.0/search-request-scroll.html
// #sliced-scroll)
JsonNode statsJson = getStats(connectionConfiguration, false);
JsonNode indexStats =
statsJson.path("indices").path(connectionConfiguration.getIndex()).path("primaries");
JsonNode statsJson = getStats(connectionConfiguration);
JsonNode indexStats = statsJson.path("_all").path("primaries");
JsonNode store = indexStats.path("store");
return store.path("size_in_bytes").asLong();
}
Expand All @@ -956,12 +954,9 @@ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}

private static JsonNode getStats(
ConnectionConfiguration connectionConfiguration, boolean shardLevel) throws IOException {
private static JsonNode getStats(ConnectionConfiguration connectionConfiguration)
throws IOException {
HashMap<String, String> params = new HashMap<>();
if (shardLevel) {
params.put("level", "shards");
}
String endpoint = String.format("/%s/_stats", connectionConfiguration.getIndex());
try (RestClient restClient = connectionConfiguration.createClient()) {
Request request = new Request("GET", endpoint);
Expand Down