Skip to content

Commit cc19fd6

Browse files
committed
[SPARK-23889][SQL] DataSourceV2: Add interfaces to request distribution and ordering
1 parent 1554977 commit cc19fd6

File tree

14 files changed

+626
-10
lines changed

14 files changed

+626
-10
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.distributions;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
import org.apache.spark.sql.connector.expressions.Expression;
22+
23+
/**
24+
* A distribution where tuples that share the same values for clustering expressions are co-located
25+
* in the same partition.
26+
*
27+
* @since 3.2.0
28+
*/
29+
@Experimental
30+
public interface ClusteredDistribution extends Distribution {
31+
/**
32+
* Returns clustering expressions.
33+
*/
34+
Expression[] clustering();
35+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.distributions;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
22+
/**
23+
* An interface that defines how data is distributed across partitions.
24+
*
25+
* @since 3.2.0
26+
*/
27+
@Experimental
28+
public interface Distribution {}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.distributions;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
import org.apache.spark.sql.connector.expressions.Expression;
22+
import org.apache.spark.sql.connector.expressions.SortOrder;
23+
24+
/**
25+
* Helper methods to create distributions to pass into Spark.
26+
*
27+
* @since 3.2.0
28+
*/
29+
@Experimental
30+
public class Distributions {
31+
private Distributions() {
32+
}
33+
34+
/**
35+
* Creates a distribution where no promises are made about co-location of data.
36+
*/
37+
public static UnspecifiedDistribution unspecified() {
38+
return LogicalDistributions.unspecified();
39+
}
40+
41+
/**
42+
* Creates a distribution where tuples that share the same values for clustering expressions are
43+
* co-located in the same partition.
44+
*/
45+
public static ClusteredDistribution clustered(Expression[] clustering) {
46+
return LogicalDistributions.clustered(clustering);
47+
}
48+
49+
/**
50+
* Creates a distribution where tuples have been ordered across partitions according
51+
* to ordering expressions, but not necessarily within a given partition.
52+
*/
53+
public static OrderedDistribution ordered(SortOrder[] ordering) {
54+
return LogicalDistributions.ordered(ordering);
55+
}
56+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.distributions;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
import org.apache.spark.sql.connector.expressions.SortOrder;
22+
23+
/**
24+
* A distribution where tuples have been ordered across partitions according
25+
* to ordering expressions, but not necessarily within a given partition.
26+
*
27+
* @since 3.2.0
28+
*/
29+
@Experimental
30+
public interface OrderedDistribution extends Distribution {
31+
/**
32+
* Returns ordering expressions.
33+
*/
34+
SortOrder[] ordering();
35+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.distributions;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
22+
/**
23+
* A distribution where no promises are made about co-location of data.
24+
*
25+
* @since 3.2.0
26+
*/
27+
@Experimental
28+
public interface UnspecifiedDistribution extends Distribution {}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Expressions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,15 @@ public static Transform hours(String column) {
164164
return LogicalExpressions.hours(Expressions.column(column));
165165
}
166166

167+
/**
168+
* Create a sort expression.
169+
*
170+
* @param expr an expression to produce values to sort
171+
* @param direction direction of the sort
172+
* @param nullOrder null order of the sort
173+
* @return a SortOrder
174+
*/
175+
public static SortOrder sort(Expression expr, SortDirection direction, NullOrdering nullOrder) {
176+
return LogicalExpressions.sort(expr, direction, nullOrder);
177+
}
167178
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.expressions;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
22+
/**
23+
* A null order used in sorting expressions.
24+
*
25+
* @since 3.2.0
26+
*/
27+
@Experimental
28+
public enum NullOrdering {
  NULLS_FIRST, NULLS_LAST;

  /**
   * Returns the text form of this null ordering.
   *
   * @return {@code "NULLS FIRST"} or {@code "NULLS LAST"}
   * @throws IllegalArgumentException if this constant has no text form defined
   */
  @Override
  public String toString() {
    switch (this) {
      case NULLS_FIRST:
        return "NULLS FIRST";
      case NULLS_LAST:
        return "NULLS LAST";
      default:
        // Use name() rather than `this`: concatenating `this` would call toString() again and
        // recurse until the stack overflows instead of raising this exception.
        throw new IllegalArgumentException("Unexpected null order: " + name());
    }
  }
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.expressions;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
22+
/**
23+
* A sort direction used in sorting expressions.
24+
*
25+
* @since 3.2.0
26+
*/
27+
@Experimental
28+
public enum SortDirection {
  ASCENDING, DESCENDING;

  /**
   * Returns the text form of this sort direction.
   *
   * @return {@code "ASC"} or {@code "DESC"}
   * @throws IllegalArgumentException if this constant has no text form defined
   */
  @Override
  public String toString() {
    switch (this) {
      case ASCENDING:
        return "ASC";
      case DESCENDING:
        return "DESC";
      default:
        // Use name() rather than `this`: concatenating `this` would call toString() again and
        // recurse until the stack overflows instead of raising this exception.
        throw new IllegalArgumentException("Unexpected sort direction: " + name());
    }
  }
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.expressions;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
22+
/**
23+
* Represents a sort order in the public expression API.
24+
*
25+
* @since 3.2.0
26+
*/
27+
@Experimental
28+
public interface SortOrder extends Expression {
29+
/**
30+
* Returns the sort expression.
31+
*/
32+
Expression expression();
33+
34+
/**
35+
* Returns the sort direction.
36+
*/
37+
SortDirection direction();
38+
39+
/**
40+
* Returns the null ordering.
41+
*/
42+
NullOrdering nullOrdering();
43+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.write;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
import org.apache.spark.sql.connector.distributions.Distribution;
22+
import org.apache.spark.sql.connector.distributions.UnspecifiedDistribution;
23+
import org.apache.spark.sql.connector.expressions.SortOrder;
24+
25+
/**
26+
* A write that requires a specific distribution and ordering of data.
27+
*
28+
* @since 3.2.0
29+
*/
30+
@Experimental
31+
public interface RequiresDistributionAndOrdering extends Write {
32+
/**
33+
* Returns the distribution required by this write.
34+
* <p>
35+
* Spark will distribute incoming records across partitions to satisfy the required distribution
36+
* before passing the records to the data source table on write.
37+
* <p>
38+
* Implementations may return {@link UnspecifiedDistribution} if they don't require any specific
39+
* distribution of data on write.
40+
*
41+
* @return the required distribution
42+
*/
43+
Distribution requiredDistribution();
44+
45+
/**
46+
* Returns the ordering required by this write.
47+
* <p>
48+
* Spark will order incoming records within partitions to satisfy the required ordering
49+
* before passing those records to the data source table on write.
50+
* <p>
51+
* Implementations may return an empty array if they don't require any specific ordering of data
52+
* on write.
53+
*
54+
* @return the required ordering
55+
*/
56+
SortOrder[] requiredOrdering();
57+
}

0 commit comments

Comments
 (0)