@@ -668,17 +668,22 @@ def test_partition_columns(started_cluster, use_delta_kernel):
668668 f"""SELECT count()
669669 FROM deltaLake('http://{ started_cluster .minio_ip } :{ started_cluster .minio_port } /{ bucket } /{ result_file } /', 'minio', '{ minio_secret_key } ', SETTINGS allow_experimental_delta_kernel_rs={ use_delta_kernel } )
670670 WHERE c == toDateTime('2000/01/05')
671- """ , query_id = query_id
671+ """ ,
672+ query_id = query_id ,
672673 )
673674 )
674675 assert result == 1
675676
676677 if use_delta_kernel == 1 :
677678 instance .query ("SYSTEM FLUSH LOGS" )
678- assert num_rows - 1 == int (instance .query (f"""
679+ assert num_rows - 1 == int (
680+ instance .query (
681+ f"""
679682 SELECT ProfileEvents['DeltaLakePartitionPrunedFiles']
680683 FROM system.query_log WHERE query_id = '{ query_id } ' AND type = 'QueryFinish'
681- """ ))
684+ """
685+ )
686+ )
682687
683688 instance .query (
684689 f"""
@@ -1109,3 +1114,108 @@ def test_session_token(started_cluster):
11091114 """
11101115 )
11111116 )
1117+
1118+
@pytest.mark.parametrize("use_delta_kernel", ["1"])
def test_partition_columns_2(started_cluster, use_delta_kernel):
    """Check DeltaLakePartitionPrunedFiles accounting for a table partitioned by (c, d).

    Writes five rows whose (c, d) pairs are all distinct (so one data file per
    partition), appends a sixth row into one existing partition, then runs
    point queries on both partition columns and verifies that every
    non-matching file is reported as pruned in the query log.
    """
    node = started_cluster.instances["node1"]
    table_name = randomize_table_name("test_partition_columns_2")

    schema = pa.schema(
        [
            ("a", pa.int32()),
            ("b", pa.int32()),
            ("c", pa.int32()),
            ("d", pa.string()),
            ("e", pa.string()),
        ]
    )
    # Five distinct (c, d) combinations -> five partitions -> five files.
    columns = [
        pa.array([1, 2, 3, 4, 5], type=pa.int32()),
        pa.array([4, 5, 6, 7, 8], type=pa.int32()),
        pa.array([7, 7, 8, 9, 10], type=pa.int32()),
        pa.array(["aa", "bb", "cc", "aa", "bb"], type=pa.string()),
        pa.array(["aa", "bb", "cc", "aa", "cc"], type=pa.string()),
    ]

    storage_options = {
        "AWS_ENDPOINT_URL": f"http://{started_cluster.minio_ip}:{started_cluster.minio_port}",
        "AWS_ACCESS_KEY_ID": minio_access_key,
        "AWS_SECRET_ACCESS_KEY": minio_secret_key,
        "AWS_ALLOW_HTTP": "true",
        "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    }
    path = f"s3://root/{table_name}"

    write_deltalake(
        path,
        pa.Table.from_arrays(columns, schema=schema),
        storage_options=storage_options,
        partition_by=["c", "d"],
    )

    delta_function = f"""
        deltaLake(
            'http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}',
            '{minio_access_key}',
            '{minio_secret_key}',
            SETTINGS allow_experimental_delta_kernel_rs={use_delta_kernel})
    """

    num_files = int(node.query(f"SELECT uniqExact(_path) FROM {delta_function}"))
    assert num_files == 5

    # Append one row into the already-existing partition (c=7, d='aa'):
    # adds a sixth file without creating a new partition directory.
    new_columns = [
        pa.array([2], type=pa.int32()),
        pa.array([3], type=pa.int32()),
        pa.array([7], type=pa.int32()),
        pa.array(["aa"], type=pa.string()),
        pa.array(["cc"], type=pa.string()),
    ]
    write_deltalake(
        path,
        pa.Table.from_arrays(new_columns, schema=schema),
        storage_options=storage_options,
        mode="append",
    )

    # Partition columns must still show up in the inferred schema.
    expected_structure = (
        "a\tNullable(Int32)\t\t\t\t\t\n"
        "b\tNullable(Int32)\t\t\t\t\t\n"
        "c\tNullable(Int32)\t\t\t\t\t\n"
        "d\tNullable(String)\t\t\t\t\t\n"
        "e\tNullable(String)"
    )
    assert expected_structure == node.query(f"DESCRIBE TABLE {delta_function}").strip()

    num_files = int(node.query(f"SELECT uniqExact(_path) FROM {delta_function}"))
    assert num_files == 6

    def check_pruned(expected_count, query_id):
        # Profile events land in system.query_log only after a flush.
        node.query("SYSTEM FLUSH LOGS")
        pruned = int(
            node.query(
                f"""
                SELECT ProfileEvents['DeltaLakePartitionPrunedFiles']
                FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'
                """
            )
        )
        assert expected_count == pruned

    # (c=7, d='aa') matches two files (original + appended row) -> prunes the rest.
    query_id = f"{table_name}-{uuid.uuid4()}"
    result = node.query(
        f" SELECT a FROM {delta_function} WHERE c = 7 and d = 'aa'",
        query_id=query_id,
    ).strip()
    assert "1" in result
    check_pruned(num_files - 2, query_id)

    # (c=7, d='bb') matches exactly one file -> all but one pruned.
    query_id = f"{table_name}-{uuid.uuid4()}"
    result = node.query(
        f"SELECT a FROM {delta_function} WHERE c = 7 and d = 'bb'",
        query_id=query_id,
    ).strip()
    assert "2" in result
    check_pruned(num_files - 1, query_id)
0 commit comments