Skip to content

Commit d65e2ec

Browse files
committed
Add one more test
1 parent 1b9dad3 commit d65e2ec

File tree

2 files changed

+123
-6
lines changed

2 files changed

+123
-6
lines changed

src/Storages/ObjectStorage/DataLakes/DeltaLake/PartitionPruner.cpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
#include "PartitionPruner.h"
22

33
#if USE_DELTA_KERNEL_RS
4+
#include <DataTypes/DataTypeNullable.h>
5+
46
#include <Interpreters/ActionsDAG.h>
57
#include <Interpreters/Context_fwd.h>
6-
#include <Storages/MergeTree/KeyCondition.h>
7-
#include <Storages/KeyDescription.h>
8+
89
#include <Parsers/ASTFunction.h>
910
#include <Parsers/ASTIdentifier.h>
10-
#include <DataTypes/DataTypeNullable.h>
11+
12+
#include <Storages/MergeTree/KeyCondition.h>
13+
#include <Storages/KeyDescription.h>
14+
1115

1216
namespace DB::ErrorCodes
1317
{
@@ -21,6 +25,9 @@ namespace
2125
{
2226
DB::ASTPtr createPartitionKeyAST(const DB::Names & partition_columns)
2327
{
28+
/// DeltaLake supports only plain partition keys,
29+
/// e.g. by column names without any functions.
30+
2431
std::shared_ptr<DB::ASTFunction> partition_key_ast = std::make_shared<DB::ASTFunction>();
2532
partition_key_ast->name = "tuple";
2633
partition_key_ast->arguments = std::make_shared<DB::ASTExpressionList>();

tests/integration/test_storage_delta/test.py

Lines changed: 113 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -668,17 +668,22 @@ def test_partition_columns(started_cluster, use_delta_kernel):
668668
f"""SELECT count()
669669
FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', '{minio_secret_key}', SETTINGS allow_experimental_delta_kernel_rs={use_delta_kernel})
670670
WHERE c == toDateTime('2000/01/05')
671-
""", query_id=query_id
671+
""",
672+
query_id=query_id,
672673
)
673674
)
674675
assert result == 1
675676

676677
if use_delta_kernel == 1:
677678
instance.query("SYSTEM FLUSH LOGS")
678-
assert num_rows - 1 == int(instance.query(f"""
679+
assert num_rows - 1 == int(
680+
instance.query(
681+
f"""
679682
SELECT ProfileEvents['DeltaLakePartitionPrunedFiles']
680683
FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'
681-
"""))
684+
"""
685+
)
686+
)
682687

683688
instance.query(
684689
f"""
@@ -1109,3 +1114,108 @@ def test_session_token(started_cluster):
11091114
"""
11101115
)
11111116
)
1117+
1118+
1119+
@pytest.mark.parametrize("use_delta_kernel", ["1"])
1120+
def test_partition_columns_2(started_cluster, use_delta_kernel):
1121+
node = started_cluster.instances["node1"]
1122+
table_name = randomize_table_name("test_partition_columns_2")
1123+
1124+
schema = pa.schema(
1125+
[
1126+
("a", pa.int32()),
1127+
("b", pa.int32()),
1128+
("c", pa.int32()),
1129+
("d", pa.string()),
1130+
("e", pa.string()),
1131+
]
1132+
)
1133+
data = [
1134+
pa.array([1, 2, 3, 4, 5], type=pa.int32()),
1135+
pa.array([4, 5, 6, 7, 8], type=pa.int32()),
1136+
pa.array([7, 7, 8, 9, 10], type=pa.int32()),
1137+
pa.array(["aa", "bb", "cc", "aa", "bb"], type=pa.string()),
1138+
pa.array(["aa", "bb", "cc", "aa", "cc"], type=pa.string()),
1139+
]
1140+
1141+
storage_options = {
1142+
"AWS_ENDPOINT_URL": f"http://{started_cluster.minio_ip}:{started_cluster.minio_port}",
1143+
"AWS_ACCESS_KEY_ID": minio_access_key,
1144+
"AWS_SECRET_ACCESS_KEY": minio_secret_key,
1145+
"AWS_ALLOW_HTTP": "true",
1146+
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
1147+
}
1148+
path = f"s3://root/{table_name}"
1149+
table = pa.Table.from_arrays(data, schema=schema)
1150+
1151+
write_deltalake(
1152+
path, table, storage_options=storage_options, partition_by=["c", "d"]
1153+
)
1154+
1155+
delta_function = f"""
1156+
deltaLake(
1157+
'http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}' ,
1158+
'{minio_access_key}',
1159+
'{minio_secret_key}',
1160+
SETTINGS allow_experimental_delta_kernel_rs={use_delta_kernel})
1161+
"""
1162+
1163+
num_files = int(node.query(f"SELECT uniqExact(_path) FROM {delta_function}"))
1164+
assert num_files == 5
1165+
1166+
new_data = [
1167+
pa.array([2], type=pa.int32()),
1168+
pa.array([3], type=pa.int32()),
1169+
pa.array([7], type=pa.int32()),
1170+
pa.array(["aa"], type=pa.string()),
1171+
pa.array(["cc"], type=pa.string()),
1172+
]
1173+
new_table_data = pa.Table.from_arrays(new_data, schema=schema)
1174+
1175+
write_deltalake(
1176+
path, new_table_data, storage_options=storage_options, mode="append"
1177+
)
1178+
1179+
assert (
1180+
"a\tNullable(Int32)\t\t\t\t\t\n"
1181+
"b\tNullable(Int32)\t\t\t\t\t\n"
1182+
"c\tNullable(Int32)\t\t\t\t\t\n"
1183+
"d\tNullable(String)\t\t\t\t\t\n"
1184+
"e\tNullable(String)" == node.query(f"DESCRIBE TABLE {delta_function}").strip()
1185+
)
1186+
1187+
num_files = int(node.query(f"SELECT uniqExact(_path) FROM {delta_function}"))
1188+
assert num_files == 6
1189+
1190+
query_id = f"{table_name}-{uuid.uuid4()}"
1191+
assert (
1192+
"1"
1193+
in node.query(
1194+
f" SELECT a FROM {delta_function} WHERE c = 7 and d = 'aa'",
1195+
query_id=query_id,
1196+
).strip()
1197+
)
1198+
1199+
def check_pruned(count, query_id):
1200+
node.query("SYSTEM FLUSH LOGS")
1201+
assert count == int(
1202+
node.query(
1203+
f"""
1204+
SELECT ProfileEvents['DeltaLakePartitionPrunedFiles']
1205+
FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'
1206+
"""
1207+
)
1208+
)
1209+
1210+
check_pruned(num_files - 2, query_id)
1211+
1212+
query_id = f"{table_name}-{uuid.uuid4()}"
1213+
assert (
1214+
"2"
1215+
in node.query(
1216+
f"SELECT a FROM {delta_function} WHERE c = 7 and d = 'bb'",
1217+
query_id=query_id,
1218+
).strip()
1219+
)
1220+
1221+
check_pruned(num_files - 1, query_id)

0 commit comments

Comments
 (0)