Skip to content

Commit d3be572

Browse files
authored
Make HMS & UC fixtures return CatalogInfo, SchemaInfo, and TableInfo (#409)
1 parent a147023 commit d3be572

File tree

14 files changed

+226
-245
lines changed

14 files changed

+226
-245
lines changed

src/databricks/labs/ucx/hive_metastore/tables.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ def migrate_tables(self):
195195
_, errors = Threads.gather("migrate tables", tasks)
196196
if len(errors) > 0:
197197
# TODO: https://github.com/databrickslabs/ucx/issues/406
198-
logger.error(f"Detected {len(errors)} while migrating tables")
198+
logger.error(f"Detected {len(errors)} errors while migrating tables")
199199

200200
def _migrate_table(self, target_catalog, table):
201201
sql = table.uc_create_sql(target_catalog)

src/databricks/labs/ucx/mixins/fixtures.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
from databricks.sdk import AccountClient, WorkspaceClient
1616
from databricks.sdk.core import DatabricksError
1717
from databricks.sdk.service import compute, iam, jobs, pipelines, workspace
18+
from databricks.sdk.service.catalog import CatalogInfo, SchemaInfo, TableInfo
1819
from databricks.sdk.service.sql import CreateWarehouseRequestWarehouseType
1920
from databricks.sdk.service.workspace import ImportFormat
2021

22+
from databricks.labs.ucx.framework.crawlers import StatementExecutionBackend
23+
2124
_LOG = logging.getLogger(__name__)
2225

2326

@@ -668,3 +671,99 @@ def inner(var: str) -> str:
668671
return debug_env[var]
669672

670673
return inner
674+
675+
676+
@pytest.fixture
677+
def sql_backend(ws, env_or_skip):
678+
warehouse_id = env_or_skip("TEST_DEFAULT_WAREHOUSE_ID")
679+
return StatementExecutionBackend(ws, warehouse_id)
680+
681+
682+
@pytest.fixture
683+
def inventory_schema(make_schema):
684+
return make_schema(catalog_name="hive_metastore").name
685+
686+
687+
@pytest.fixture
688+
def make_catalog(ws, sql_backend, make_random):
689+
def create() -> CatalogInfo:
690+
name = f"ucx_C{make_random(4)}".lower()
691+
sql_backend.execute(f"CREATE CATALOG {name}")
692+
catalog_info = ws.catalogs.get(name)
693+
return catalog_info
694+
695+
yield from factory(
696+
"catalog",
697+
create,
698+
lambda catalog_info: ws.catalogs.delete(catalog_info.full_name, force=True),
699+
)
700+
701+
702+
@pytest.fixture
703+
def make_schema(sql_backend, make_random):
704+
def create(*, catalog_name: str = "hive_metastore", name: str | None = None) -> SchemaInfo:
705+
if name is None:
706+
name = f"ucx_S{make_random(4)}"
707+
full_name = f"{catalog_name}.{name}".lower()
708+
sql_backend.execute(f"CREATE SCHEMA {full_name}")
709+
return SchemaInfo(catalog_name=catalog_name, name=name, full_name=full_name)
710+
711+
yield from factory(
712+
"schema",
713+
create,
714+
lambda schema_info: sql_backend.execute(f"DROP SCHEMA IF EXISTS {schema_info.full_name} CASCADE"),
715+
)
716+
717+
718+
@pytest.fixture
719+
def make_table(sql_backend, make_schema, make_random):
720+
def create(
721+
*,
722+
catalog_name="hive_metastore",
723+
schema_name: str | None = None,
724+
name: str | None = None,
725+
ctas: str | None = None,
726+
non_delta: bool = False,
727+
external: bool = False,
728+
view: bool = False,
729+
tbl_properties: dict[str, str] | None = None,
730+
) -> TableInfo:
731+
if schema_name is None:
732+
schema = make_schema(catalog_name=catalog_name)
733+
catalog_name = schema.catalog_name
734+
schema_name = schema.name
735+
if name is None:
736+
name = f"ucx_T{make_random(4)}"
737+
full_name = f"{catalog_name}.{schema_name}.{name}".lower()
738+
ddl = f'CREATE {"VIEW" if view else "TABLE"} {full_name}'
739+
if ctas is not None:
740+
# temporary (if not view)
741+
ddl = f"{ddl} AS {ctas}"
742+
elif non_delta:
743+
location = "dbfs:/databricks-datasets/iot-stream/data-device"
744+
ddl = f"{ddl} USING json LOCATION '{location}'"
745+
elif external:
746+
# external table
747+
url = "s3a://databricks-datasets-oregon/delta-sharing/share/open-datasets.share"
748+
share = f"{url}#delta_sharing.default.lending_club"
749+
ddl = f"{ddl} USING deltaSharing LOCATION '{share}'"
750+
else:
751+
# managed table
752+
ddl = f"{ddl} (id INT, value STRING)"
753+
if tbl_properties:
754+
tbl_properties = ",".join([f" '{k}' = '{v}' " for k, v in tbl_properties.items()])
755+
ddl = f"{ddl} TBLPROPERTIES ({tbl_properties})"
756+
757+
sql_backend.execute(ddl)
758+
return TableInfo(catalog_name=catalog_name, schema_name=schema_name, name=name, full_name=full_name)
759+
760+
def remove(table_info: TableInfo):
761+
try:
762+
sql_backend.execute(f"DROP TABLE IF EXISTS {table_info.full_name}")
763+
except RuntimeError as e:
764+
if "Cannot drop a view" in str(e):
765+
sql_backend.execute(f"DROP VIEW IF EXISTS {table_info.full_name}")
766+
else:
767+
raise e
768+
769+
yield from factory("table", create, remove)

tests/integration/conftest.py

Lines changed: 5 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@
44

55
import databricks.sdk.core
66
import pytest
7-
from databricks.sdk import AccountClient, WorkspaceClient
7+
from databricks.sdk import AccountClient
88
from databricks.sdk.core import Config
99

1010
from databricks.labs.ucx.mixins.fixtures import * # noqa: F403
11-
from databricks.labs.ucx.mixins.sql import StatementExecutionExt
1211

1312
logging.getLogger("tests").setLevel("DEBUG")
1413
logging.getLogger("databricks.labs.ucx").setLevel("DEBUG")
@@ -54,112 +53,13 @@ def account_host(cfg: Config) -> str:
5453

5554

5655
@pytest.fixture
57-
def sql_exec(ws: WorkspaceClient, env_or_skip):
58-
warehouse_id = env_or_skip("TEST_DEFAULT_WAREHOUSE_ID")
59-
statement_execution = StatementExecutionExt(ws.api_client)
60-
return partial(statement_execution.execute, warehouse_id)
56+
def sql_exec(sql_backend):
57+
return partial(sql_backend.execute)
6158

6259

6360
@pytest.fixture
64-
def sql_fetch_all(ws: WorkspaceClient, env_or_skip):
65-
warehouse_id = env_or_skip("TEST_DEFAULT_WAREHOUSE_ID")
66-
statement_execution = StatementExecutionExt(ws.api_client)
67-
return partial(statement_execution.execute_fetch_all, warehouse_id)
68-
69-
70-
@pytest.fixture
71-
def make_catalog(sql_exec, make_random):
72-
def create():
73-
name = f"ucx_C{make_random(4)}".lower()
74-
sql_exec(f"CREATE CATALOG {name}")
75-
return name
76-
77-
yield from factory("catalog", create, lambda name: sql_exec(f"DROP CATALOG IF EXISTS {name} CASCADE")) # noqa: F405
78-
79-
80-
def test_catalog_fixture(make_catalog):
81-
logger.info(f"Created new catalog: {make_catalog()}")
82-
logger.info(f"Created new catalog: {make_catalog()}")
83-
84-
85-
@pytest.fixture
86-
def make_schema(sql_exec, make_random):
87-
def create(*, catalog: str = "hive_metastore", schema: str | None = None):
88-
if schema is None:
89-
schema = f"ucx_S{make_random(4)}"
90-
schema = f"{catalog}.{schema}".lower()
91-
sql_exec(f"CREATE SCHEMA {schema}")
92-
return schema
93-
94-
yield from factory( # noqa: F405
95-
"schema", create, lambda schema: sql_exec(f"DROP SCHEMA IF EXISTS {schema} CASCADE")
96-
)
97-
98-
99-
def test_schema_fixture(make_schema):
100-
logger.info(f"Created new schema: {make_schema()}")
101-
logger.info(f"Created new schema: {make_schema()}")
102-
103-
104-
@pytest.fixture
105-
def make_table(sql_exec, make_schema, make_random):
106-
def create(
107-
*,
108-
catalog="hive_metastore",
109-
name: str | None = None,
110-
schema: str | None = None,
111-
ctas: str | None = None,
112-
non_delta: bool = False,
113-
external: bool = False,
114-
view: bool = False,
115-
tbl_properties: dict[str, str] | None = None,
116-
):
117-
if schema is None:
118-
schema = make_schema(catalog=catalog)
119-
if name is None:
120-
name = f"{schema}.ucx_T{make_random(4)}".lower()
121-
ddl = f'CREATE {"VIEW" if view else "TABLE"} {name}'
122-
if ctas is not None:
123-
# temporary (if not view)
124-
ddl = f"{ddl} AS {ctas}"
125-
elif non_delta:
126-
location = "dbfs:/databricks-datasets/iot-stream/data-device"
127-
ddl = f"{ddl} USING json LOCATION '{location}'"
128-
elif external:
129-
# external table
130-
url = "s3a://databricks-datasets-oregon/delta-sharing/share/open-datasets.share"
131-
share = f"{url}#delta_sharing.default.lending_club"
132-
ddl = f"{ddl} USING deltaSharing LOCATION '{share}'"
133-
else:
134-
# managed table
135-
ddl = f"{ddl} (id INT, value STRING)"
136-
if tbl_properties:
137-
tbl_properties = ",".join([f" '{k}' = '{v}' " for k, v in tbl_properties.items()])
138-
ddl = f"{ddl} TBLPROPERTIES ({tbl_properties})"
139-
140-
sql_exec(ddl)
141-
return name
142-
143-
def remove(name):
144-
try:
145-
sql_exec(f"DROP TABLE IF EXISTS {name}")
146-
except RuntimeError as e:
147-
if "Cannot drop a view" in str(e):
148-
sql_exec(f"DROP VIEW IF EXISTS {name}")
149-
else:
150-
raise e
151-
152-
yield from factory("table", create, remove) # noqa: F405
153-
154-
155-
def test_table_fixture(make_table):
156-
logger.info(f"Created new managed table in new schema: {make_table()}")
157-
logger.info(f'Created new managed table in default schema: {make_table(schema="default")}')
158-
logger.info(f"Created new external table in new schema: {make_table(external=True)}")
159-
logger.info(f"Created new external JSON table in new schema: {make_table(non_delta=True)}")
160-
logger.info(f'Created new tmp table in new schema: {make_table(ctas="SELECT 2+2 AS four")}')
161-
logger.info(f'Created new view in new schema: {make_table(view=True, ctas="SELECT 2+2 AS four")}')
162-
logger.info(f'Created table with properties: {make_table(tbl_properties={"test": "tableproperty"})}')
61+
def sql_fetch_all(sql_backend):
62+
return partial(sql_backend.fetch)
16363

16464

16565
@pytest.fixture

tests/integration/framework/test_dashboards.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
)
1212

1313

14-
def test_creating_widgets(ws: WorkspaceClient, make_warehouse, make_schema, env_or_skip):
15-
pytest.skip()
14+
@pytest.mark.skip("not working")
15+
def test_creating_widgets(ws: WorkspaceClient, make_warehouse, env_or_skip):
1616
dashboard_widgets_api = DashboardWidgetsAPI(ws.api_client)
1717
query_visualizations_api = QueryVisualizationsExt(ws.api_client)
1818

tests/integration/framework/test_fixtures.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,23 @@ def test_sql_backend_works(ws, wsfs_wheel):
105105
def test_env_or_skip(env_or_skip):
106106
with pytest.raises((Skipped, Failed)):
107107
env_or_skip("NO_ENV_VAR_HERE")
108+
109+
110+
def test_catalog_fixture(make_catalog):
111+
logger.info(f"Created new catalog: {make_catalog()}")
112+
logger.info(f"Created new catalog: {make_catalog()}")
113+
114+
115+
def test_schema_fixture(make_schema):
116+
logger.info(f"Created new schema: {make_schema()}")
117+
logger.info(f"Created new schema: {make_schema()}")
118+
119+
120+
def test_table_fixture(make_table):
121+
logger.info(f"Created new managed table in new schema: {make_table()}")
122+
logger.info(f'Created new managed table in default schema: {make_table(schema_name="default")}')
123+
logger.info(f"Created new external table in new schema: {make_table(external=True)}")
124+
logger.info(f"Created new external JSON table in new schema: {make_table(non_delta=True)}")
125+
logger.info(f'Created new tmp table in new schema: {make_table(ctas="SELECT 2+2 AS four")}')
126+
logger.info(f'Created new view in new schema: {make_table(view=True, ctas="SELECT 2+2 AS four")}')
127+
logger.info(f'Created table with properties: {make_table(tbl_properties={"test": "tableproperty"})}')
Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,23 @@
11
import logging
22

3-
from databricks.labs.ucx.framework.crawlers import StatementExecutionBackend
43
from databricks.labs.ucx.hive_metastore.data_objects import ExternalLocationCrawler
54
from databricks.labs.ucx.hive_metastore.mounts import Mount
65
from databricks.labs.ucx.hive_metastore.tables import Table
76

87
logger = logging.getLogger(__name__)
98

109

11-
def test_external_locations(ws, make_warehouse, make_schema, env_or_skip):
12-
warehouse_id = env_or_skip("TEST_DEFAULT_WAREHOUSE_ID")
13-
10+
def test_external_locations(ws, sql_backend, inventory_schema, env_or_skip):
1411
logger.info("setting up fixtures")
15-
sbe = StatementExecutionBackend(ws, warehouse_id)
1612
tables = [
1713
Table("hive_metastore", "foo", "bar", "MANAGED", "delta", location="s3://test_location/test1/table1"),
1814
Table("hive_metastore", "foo", "bar", "EXTERNAL", "delta", location="s3://test_location/test2/table2"),
1915
Table("hive_metastore", "foo", "bar", "EXTERNAL", "delta", location="dbfs:/mnt/foo/test3/table3"),
2016
]
21-
schema = make_schema()
22-
sbe.save_table(f"{schema}.tables", tables, Table)
23-
sbe.save_table(f"{schema}.mounts", [Mount("/mnt/foo", "s3://bar")], Mount)
17+
sql_backend.save_table(f"{inventory_schema}.tables", tables, Table)
18+
sql_backend.save_table(f"{inventory_schema}.mounts", [Mount("/mnt/foo", "s3://bar")], Mount)
2419

25-
crawler = ExternalLocationCrawler(ws, sbe, schema.split(".")[1])
20+
crawler = ExternalLocationCrawler(ws, sql_backend, inventory_schema)
2621
results = crawler.snapshot()
2722
assert len(results) == 2
2823
assert results[1].location == "s3://bar/test3/"

tests/integration/hive_metastore/test_grants.py

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,28 @@
22

33
from databricks.sdk import WorkspaceClient
44

5-
from databricks.labs.ucx.framework.crawlers import StatementExecutionBackend
65
from databricks.labs.ucx.hive_metastore import GrantsCrawler, TablesCrawler
76

87
logger = logging.getLogger(__name__)
98

109

1110
def test_all_grants_in_databases(
12-
ws: WorkspaceClient, sql_exec, make_catalog, make_schema, make_table, make_group, env_or_skip
11+
ws: WorkspaceClient, sql_backend, inventory_schema, make_schema, make_table, make_group, env_or_skip
1312
):
14-
warehouse_id = env_or_skip("TEST_DEFAULT_WAREHOUSE_ID")
15-
1613
group_a = make_group()
1714
group_b = make_group()
1815
schema_a = make_schema()
1916
schema_b = make_schema()
20-
table_a = make_table(schema=schema_a)
21-
table_b = make_table(schema=schema_b)
22-
23-
sql_exec(f"GRANT USAGE ON SCHEMA default TO `{group_a.display_name}`")
24-
sql_exec(f"GRANT USAGE ON SCHEMA default TO `{group_b.display_name}`")
25-
sql_exec(f"GRANT SELECT ON TABLE {table_a} TO `{group_a.display_name}`")
26-
sql_exec(f"GRANT SELECT ON TABLE {table_b} TO `{group_b.display_name}`")
27-
sql_exec(f"GRANT MODIFY ON SCHEMA {schema_b} TO `{group_b.display_name}`")
17+
table_a = make_table(schema_name=schema_a.name)
18+
table_b = make_table(schema_name=schema_b.name)
2819

29-
inventory_schema = make_schema(catalog="hive_metastore")
30-
_, inventory_schema = inventory_schema.split(".")
20+
sql_backend.execute(f"GRANT USAGE ON SCHEMA default TO `{group_a.display_name}`")
21+
sql_backend.execute(f"GRANT USAGE ON SCHEMA default TO `{group_b.display_name}`")
22+
sql_backend.execute(f"GRANT SELECT ON TABLE {table_a.full_name} TO `{group_a.display_name}`")
23+
sql_backend.execute(f"GRANT SELECT ON TABLE {table_b.full_name} TO `{group_b.display_name}`")
24+
sql_backend.execute(f"GRANT MODIFY ON SCHEMA {schema_b.full_name} TO `{group_b.display_name}`")
3125

32-
backend = StatementExecutionBackend(ws, warehouse_id)
33-
tables = TablesCrawler(backend, inventory_schema)
26+
tables = TablesCrawler(sql_backend, inventory_schema)
3427
grants = GrantsCrawler(tables)
3528

3629
all_grants = {}
@@ -39,6 +32,6 @@ def test_all_grants_in_databases(
3932
all_grants[f"{grant.principal}.{grant.object_key}"] = grant.action_type
4033

4134
assert len(all_grants) >= 3, "must have at least three grants"
42-
assert all_grants[f"{group_a.display_name}.{table_a}"] == "SELECT"
43-
assert all_grants[f"{group_b.display_name}.{table_b}"] == "SELECT"
44-
assert all_grants[f"{group_b.display_name}.{schema_b}"] == "MODIFY"
35+
assert all_grants[f"{group_a.display_name}.{table_a.full_name}"] == "SELECT"
36+
assert all_grants[f"{group_b.display_name}.{table_b.full_name}"] == "SELECT"
37+
assert all_grants[f"{group_b.display_name}.{schema_b.full_name}"] == "MODIFY"

0 commit comments

Comments (0)