Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7113fe9
Introduce node-level reduction (instead of the coordinator level one)
astefan Mar 8, 2024
b1dcb3e
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Mar 19, 2024
7470709
Change the method access
astefan Mar 19, 2024
d36fd4c
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Mar 20, 2024
a91aff0
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Mar 20, 2024
2b7ab47
Add transport version for fragment reducer serialization
astefan Mar 20, 2024
654199b
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Mar 20, 2024
f860c6d
Simple mistake fixed
astefan Mar 20, 2024
de3540c
Use a bogus estimatedRowSize
astefan Mar 20, 2024
dd51d15
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Mar 20, 2024
414d82a
Merge branch 'main' into reduction_on_data_node
elasticmachine Mar 21, 2024
8109f86
Up the estimatedRowSize value
astefan Mar 21, 2024
d8d2ef3
Merge branch 'reduction_on_data_node' of https://github.com/astefan/e…
astefan Mar 21, 2024
af8b8e8
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Mar 21, 2024
65027d4
Mute HeapAttackIT.testSortByManyLongsSuccess
astefan Mar 21, 2024
a7fbdcc
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Mar 21, 2024
96796e9
Update docs/changelog/106516.yaml
astefan Mar 21, 2024
69a3469
Add pragma and tests
astefan Mar 25, 2024
aa34b0a
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Mar 28, 2024
4a8d879
Merge branch 'reduction_on_data_node' of https://github.com/astefan/e…
astefan Mar 28, 2024
785d54c
doh! fix
astefan Mar 28, 2024
ef057a6
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Mar 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/106516.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 106516
summary: "ESQL: perform a reduction on the data node"
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ static TransportVersion def(int id) {
public static final TransportVersion SEARCH_NODE_LOAD_AUTOSCALING = def(8_617_00_0);
public static final TransportVersion ESQL_ES_SOURCE_OPTIONS = def(8_618_00_0);
public static final TransportVersion ADD_PERSISTENT_TASK_EXCEPTIONS = def(8_619_00_0);
public static final TransportVersion ESQL_REDUCER_NODE_FRAGMENT = def(8_620_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,9 @@ static Settings randomPragmas() {
if (randomBoolean()) {
settings.put("enrich_max_workers", between(1, 5));
}
if (randomBoolean()) {
settings.put("node_level_reduction", randomBoolean());
}
return settings.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ eth2 |epsilon |[fe81::cae2:65ff:fece:feb9, fe82::cae2:65ff:fece
lessThan
required_feature: esql.mv_warn

from hosts | sort host, card | where ip0 < ip1 | keep card, host, ip0, ip1;
warning:Line 1:38: evaluation of [ip0 < ip1] failed, treating result as null. Only first 20 failures recorded.
warning:Line 1:38: java.lang.IllegalArgumentException: single-value function encountered multi-value
from hosts | sort host, card, ip1 | where ip0 < ip1 | keep card, host, ip0, ip1;
warning:Line 1:43: evaluation of [ip0 < ip1] failed, treating result as null. Only first 20 failures recorded.
warning:Line 1:43: java.lang.IllegalArgumentException: single-value function encountered multi-value

card:keyword |host:keyword |ip0:ip |ip1:ip
eth1 |beta |127.0.0.1 |127.0.0.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ protected static QueryPragmas randomPragmas() {
if (randomBoolean()) {
settings.put("max_concurrent_shards_per_node", randomIntBetween(1, 10));
}
if (randomBoolean()) {
settings.put("node_level_reduction", randomBoolean());
}
}
return new QueryPragmas(settings.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
private String READ_DESCRIPTION;
private String MERGE_DESCRIPTION;
private String REDUCE_DESCRIPTION;
private boolean nodeLevelReduction;

@Before
public void setup() {
Expand All @@ -94,6 +95,7 @@ public void setup() {
REDUCE_DESCRIPTION = """
\\_ExchangeSourceOperator[]
\\_ExchangeSinkOperator""";
nodeLevelReduction = randomBoolean();
}

public void testTaskContents() throws Exception {
Expand Down Expand Up @@ -209,22 +211,31 @@ public void testCancelEsqlTask() throws Exception {
}

/**
 * Starts the default ES|QL query used by most of the task tests; delegates to
 * {@link #startEsql(String)} with a fixed aggregation query.
 */
private ActionFuture<EsqlQueryResponse> startEsql() {
    return startEsql("from test | stats sum(pause_me)");
}

/**
 * Runs {@code query} asynchronously with pragmas tuned so the tests can observe task status:
 * shard data partitioning, a small page size, and a status report after every action.
 * The {@code node_level_reduction} pragma follows {@link #nodeLevelReduction}; when it is
 * {@code false} the pragma is only set explicitly half the time, so both the explicit-default
 * and unset paths are exercised.
 */
private ActionFuture<EsqlQueryResponse> startEsql(String query) {
    scriptPermits.drainPermits();
    scriptPermits.release(between(1, 5));
    var settingsBuilder = Settings.builder()
        // Force shard partitioning because that's all the tests know how to match. It is easier to reason about too.
        .put("data_partitioning", "shard")
        // Limit the page size to something small so we do more than one page worth of work, so we get more status updates.
        .put("page_size", pageSize())
        // Report the status after every action
        .put("status_interval", "0ms");

    if (nodeLevelReduction == false) {
        // explicitly set the default (false) or don't
        if (randomBoolean()) {
            settingsBuilder.put("node_level_reduction", nodeLevelReduction);
        }
    } else {
        settingsBuilder.put("node_level_reduction", nodeLevelReduction);
    }

    var pragmas = new QueryPragmas(settingsBuilder.build());
    return EsqlQueryRequestBuilder.newSyncEsqlQueryRequestBuilder(client()).query(query).pragmas(pragmas).execute();
}

private void cancelTask(TaskId taskId) {
Expand Down Expand Up @@ -407,6 +418,67 @@ protected void doRun() throws Exception {
}
}

/**
 * Verifies the per-driver operator descriptions for a TopN (sort) query, including the
 * data-node reduction: when {@link #nodeLevelReduction} is enabled the reduce driver is
 * expected to contain an intermediate TopNOperator between the exchange source and sink,
 * otherwise just the exchange pair.
 */
public void testTaskContentsForTopNQuery() throws Exception {
    // Expected description of the data-node read driver (Lucene TopN source).
    READ_DESCRIPTION = ("\\_LuceneTopNSourceOperator[dataPartitioning = SHARD, maxPageSize = pageSize(), limit = 1000, "
        + "sorts = [{\"pause_me\":{\"order\":\"asc\",\"missing\":\"_last\",\"unmapped_type\":\"long\"}}]]\n"
        + "\\_ValuesSourceReaderOperator[fields = [pause_me]]\n"
        + "\\_ProjectOperator[projection = [1]]\n"
        + "\\_ExchangeSinkOperator").replace("pageSize()", Integer.toString(pageSize()));
    // Expected description of the coordinator-side merge driver.
    MERGE_DESCRIPTION = "\\_ExchangeSourceOperator[]\n"
        + "\\_TopNOperator[count=1000, elementTypes=[LONG], encoders=[DefaultSortable], "
        + "sortOrders=[SortOrder[channel=0, asc=true, nullsFirst=false]]]\n"
        + "\\_ProjectOperator[projection = [0]]\n"
        + "\\_OutputOperator[columns = [pause_me]]";
    // The reduce driver only contains a TopN operator when node-level reduction is on.
    REDUCE_DESCRIPTION = "\\_ExchangeSourceOperator[]\n"
        + (nodeLevelReduction
            ? "\\_TopNOperator[count=1000, elementTypes=[LONG], encoders=[DefaultSortable], "
                + "sortOrders=[SortOrder[channel=0, asc=true, nullsFirst=false]]]\n"
            : "")
        + "\\_ExchangeSinkOperator";

    ActionFuture<EsqlQueryResponse> response = startEsql("from test | sort pause_me | keep pause_me");
    try {
        getTasksStarting();
        scriptPermits.release(pageSize());
        getTasksRunning();
    } finally {
        // each scripted field "emit" is called by LuceneTopNSourceOperator and by ValuesSourceReaderOperator
        scriptPermits.release(2 * numberOfDocs());
        try (EsqlQueryResponse esqlResponse = response.get()) {
            assertThat(Iterators.flatMap(esqlResponse.values(), i -> i).next(), equalTo(1L));
        }
    }
}

/**
 * Verifies the per-driver operator descriptions for a LIMIT query, including the
 * data-node reduction: when {@link #nodeLevelReduction} is enabled the reduce driver is
 * expected to contain an intermediate LimitOperator between the exchange source and sink,
 * otherwise just the exchange pair.
 */
public void testTaskContentsForLimitQuery() throws Exception {
    // Limit is larger than one page so the query does more than one page worth of work.
    String limit = Integer.toString(randomIntBetween(pageSize() + 1, 2 * numberOfDocs()));
    // Expected description of the data-node read driver.
    READ_DESCRIPTION = """
        \\_LuceneSourceOperator[dataPartitioning = SHARD, maxPageSize = pageSize(), limit = limit()]
        \\_ValuesSourceReaderOperator[fields = [pause_me]]
        \\_ProjectOperator[projection = [1]]
        \\_ExchangeSinkOperator""".replace("pageSize()", Integer.toString(pageSize())).replace("limit()", limit);
    // Expected description of the coordinator-side merge driver.
    MERGE_DESCRIPTION = """
        \\_ExchangeSourceOperator[]
        \\_LimitOperator[limit = limit()]
        \\_ProjectOperator[projection = [0]]
        \\_OutputOperator[columns = [pause_me]]""".replace("limit()", limit);
    // The reduce driver only contains a Limit operator when node-level reduction is on.
    REDUCE_DESCRIPTION = ("\\_ExchangeSourceOperator[]\n"
        + (nodeLevelReduction ? "\\_LimitOperator[limit = limit()]\n" : "")
        + "\\_ExchangeSinkOperator").replace("limit()", limit);

    ActionFuture<EsqlQueryResponse> response = startEsql("from test | keep pause_me | limit " + limit);
    try {
        getTasksStarting();
        scriptPermits.release(pageSize());
        getTasksRunning();
    } finally {
        scriptPermits.release(numberOfDocs());
        try (EsqlQueryResponse esqlResponse = response.get()) {
            assertThat(Iterators.flatMap(esqlResponse.values(), i -> i).next(), equalTo(1L));
        }
    }
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,8 @@ static FragmentExec readFragmentExec(PlanStreamInput in) throws IOException {
in.readSource(),
in.readLogicalPlanNode(),
in.readOptionalNamedWriteable(QueryBuilder.class),
in.readOptionalVInt()
in.readOptionalVInt(),
in.getTransportVersion().onOrAfter(TransportVersions.ESQL_REDUCER_NODE_FRAGMENT) ? in.readOptionalPhysicalPlanNode() : null
);
}

Expand All @@ -638,6 +639,9 @@ static void writeFragmentExec(PlanStreamOutput out, FragmentExec fragmentExec) t
out.writeLogicalPlanNode(fragmentExec.fragment());
out.writeOptionalNamedWriteable(fragmentExec.esFilter());
out.writeOptionalVInt(fragmentExec.estimatedRowSize());
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_REDUCER_NODE_FRAGMENT)) {
out.writeOptionalPhysicalPlanNode(fragmentExec.reducer());
}
}

static GrokExec readGrokExec(PlanStreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ public PhysicalPlan readPhysicalPlanNode() throws IOException {
return readNamed(PhysicalPlan.class);
}

/**
 * Reads a nullable {@link PhysicalPlan} previously written with
 * {@code PlanStreamOutput#writeOptionalPhysicalPlanNode}; returns {@code null}
 * when the plan was absent on the wire.
 */
public PhysicalPlan readOptionalPhysicalPlanNode() throws IOException {
    return readOptionalNamed(PhysicalPlan.class);
}

public Source readSource() throws IOException {
boolean hasSource = readBoolean();
return hasSource ? readSourceWithText(this, configuration.query()) : Source.EMPTY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ public void writePhysicalPlanNode(PhysicalPlan physicalPlan) throws IOException
writeNamed(PhysicalPlan.class, physicalPlan);
}

/**
 * Writes a nullable {@link PhysicalPlan}: a presence flag followed by the plan itself
 * when non-null. Counterpart of {@code PlanStreamInput#readOptionalPhysicalPlanNode}.
 */
public void writeOptionalPhysicalPlanNode(PhysicalPlan physicalPlan) throws IOException {
    final boolean present = physicalPlan != null;
    writeBoolean(present);
    if (present) {
        writePhysicalPlanNode(physicalPlan);
    }
}

public void writeSource(Source source) throws IOException {
writeBoolean(true);
writeSourceNoText(this, source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ public PhysicalPlan apply(PhysicalPlan plan) {
Source.EMPTY,
new Project(logicalFragment.source(), logicalFragment, output),
fragmentExec.esFilter(),
fragmentExec.estimatedRowSize()
fragmentExec.estimatedRowSize(),
fragmentExec.reducer()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {

private final LogicalPlan fragment;
private final QueryBuilder esFilter;
private final PhysicalPlan reducer; // datanode-level physical plan node that performs an intermediate (not partial) reduce

/**
* Estimate of the number of bytes that'll be loaded per position before
Expand All @@ -28,14 +29,15 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
private final int estimatedRowSize;

/**
 * Builds a fragment with no pushed-down filter, a zero estimated row size and no
 * data-node reducer.
 */
public FragmentExec(LogicalPlan fragment) {
    this(fragment.source(), fragment, null, 0, null);
}

/**
 * @param source           source of this node
 * @param fragment         logical plan executed on the data nodes
 * @param esFilter         filter pushed down to Elasticsearch, or {@code null}
 * @param estimatedRowSize estimate of the number of bytes loaded per position
 * @param reducer          data-node-level physical plan performing an intermediate
 *                         (not partial) reduce, or {@code null} when there is none
 */
public FragmentExec(Source source, LogicalPlan fragment, QueryBuilder esFilter, int estimatedRowSize, PhysicalPlan reducer) {
    super(source);
    this.fragment = fragment;
    this.esFilter = esFilter;
    this.estimatedRowSize = estimatedRowSize;
    this.reducer = reducer;
}

public LogicalPlan fragment() {
Expand All @@ -50,9 +52,13 @@ public Integer estimatedRowSize() {
return estimatedRowSize;
}

/**
 * The data-node-level physical plan that performs an intermediate (not partial)
 * reduce, or {@code null} when no node-level reduction was planned.
 */
public PhysicalPlan reducer() {
    return reducer;
}

@Override
protected NodeInfo<FragmentExec> info() {
    // The reducer is part of the node's identity, alongside fragment, filter and row size.
    return NodeInfo.create(this, FragmentExec::new, fragment, esFilter, estimatedRowSize, reducer);
}

@Override
Expand All @@ -65,12 +71,20 @@ public PhysicalPlan estimateRowSize(State state) {
int estimatedRowSize = state.consumeAllFields(false);
return Objects.equals(estimatedRowSize, this.estimatedRowSize)
? this
: new FragmentExec(source(), fragment, esFilter, estimatedRowSize);
: new FragmentExec(source(), fragment, esFilter, estimatedRowSize, reducer);
}

/**
 * Returns a copy of this fragment with {@code filter} as the pushed-down ES filter,
 * or {@code this} when the filter is unchanged.
 */
public FragmentExec withFilter(QueryBuilder filter) {
    if (Objects.equals(filter, this.esFilter)) {
        return this;
    }
    return new FragmentExec(source(), fragment, filter, estimatedRowSize, reducer);
}

/**
 * Returns a copy of this fragment with {@code reducer} as the data-node reduction plan,
 * or {@code this} when the reducer is unchanged.
 */
public FragmentExec withReducer(PhysicalPlan reducer) {
    if (Objects.equals(reducer, this.reducer)) {
        return this;
    }
    return new FragmentExec(source(), fragment, esFilter, estimatedRowSize, reducer);
}

@Override
public int hashCode() {
    // Keep in sync with equals(): the reducer participates in the node's identity.
    return Objects.hash(fragment, esFilter, estimatedRowSize, reducer);
}

@Override
Expand All @@ -86,7 +100,8 @@ public boolean equals(Object obj) {
FragmentExec other = (FragmentExec) obj;
return Objects.equals(fragment, other.fragment)
&& Objects.equals(esFilter, other.esFilter)
&& Objects.equals(estimatedRowSize, other.estimatedRowSize);
&& Objects.equals(estimatedRowSize, other.estimatedRowSize)
&& Objects.equals(reducer, other.reducer);
}

@Override
Expand All @@ -97,7 +112,9 @@ public String nodeString() {
sb.append(esFilter);
sb.append(", estimatedRowSize=");
sb.append(estimatedRowSize);
sb.append(", fragment=[<>\n");
sb.append(", reducer=[");
sb.append(reducer == null ? "" : reducer.toString());
sb.append("], fragment=[<>\n");
sb.append(fragment.toString());
sb.append("<>]]");
return sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public LocalExecutionPlanner(
/**
* turn the given plan into a list of drivers to execute
*/
public LocalExecutionPlan plan(PhysicalPlan node) {
public LocalExecutionPlan plan(PhysicalPlan localPhysicalPlan) {
var context = new LocalExecutionPlannerContext(
new ArrayList<>(),
new Holder<>(DriverParallelism.SINGLE),
Expand All @@ -160,11 +160,11 @@ public LocalExecutionPlan plan(PhysicalPlan node) {
);

// workaround for https://github.com/elastic/elasticsearch/issues/99782
node = node.transformUp(
localPhysicalPlan = localPhysicalPlan.transformUp(
AggregateExec.class,
a -> a.getMode() == AggregateExec.Mode.FINAL ? new ProjectExec(a.source(), a, Expressions.asAttributes(a.aggregates())) : a
);
PhysicalOperation physicalOperation = plan(node, context);
PhysicalOperation physicalOperation = plan(localPhysicalPlan, context);

final TimeValue statusInterval = configuration.pragmas().statusInterval();
context.addDriverFactory(
Expand All @@ -181,7 +181,7 @@ private PhysicalOperation plan(PhysicalPlan node, LocalExecutionPlannerContext c
if (node instanceof AggregateExec aggregate) {
return planAggregation(aggregate, context);
} else if (node instanceof FieldExtractExec fieldExtractExec) {
return planFieldExtractNode(context, fieldExtractExec);
return planFieldExtractNode(fieldExtractExec, context);
} else if (node instanceof ExchangeExec exchangeExec) {
return planExchange(exchangeExec, context);
} else if (node instanceof TopNExec topNExec) {
Expand Down Expand Up @@ -259,7 +259,7 @@ private PhysicalOperation planEsStats(EsStatsQueryExec statsQuery, LocalExecutio
return PhysicalOperation.fromSource(luceneFactory, layout.build());
}

private PhysicalOperation planFieldExtractNode(LocalExecutionPlannerContext context, FieldExtractExec fieldExtractExec) {
private PhysicalOperation planFieldExtractNode(FieldExtractExec fieldExtractExec, LocalExecutionPlannerContext context) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return physicalOperationProviders.fieldExtractPhysicalOperation(fieldExtractExec, plan(fieldExtractExec.child(), context));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public PhysicalPlan map(LogicalPlan p) {
throw new EsqlIllegalArgumentException("unsupported logical plan node [" + p.nodeName() + "]");
}

private static boolean isPipelineBreaker(LogicalPlan p) {
static boolean isPipelineBreaker(LogicalPlan p) {
return p instanceof Aggregate || p instanceof TopN || p instanceof Limit || p instanceof OrderBy;
}

Expand Down
Loading