|
4 | 4 |
|
5 | 5 | import databricks.sdk.core |
6 | 6 | import pytest |
7 | | -from databricks.sdk import AccountClient, WorkspaceClient |
| 7 | +from databricks.sdk import AccountClient |
8 | 8 | from databricks.sdk.core import Config |
9 | 9 |
|
10 | 10 | from databricks.labs.ucx.mixins.fixtures import * # noqa: F403 |
11 | | -from databricks.labs.ucx.mixins.sql import StatementExecutionExt |
12 | 11 |
|
13 | 12 | logging.getLogger("tests").setLevel("DEBUG") |
14 | 13 | logging.getLogger("databricks.labs.ucx").setLevel("DEBUG") |
@@ -54,112 +53,13 @@ def account_host(cfg: Config) -> str: |
54 | 53 |
|
55 | 54 |
|
56 | 55 | @pytest.fixture |
57 | | -def sql_exec(ws: WorkspaceClient, env_or_skip): |
58 | | - warehouse_id = env_or_skip("TEST_DEFAULT_WAREHOUSE_ID") |
59 | | - statement_execution = StatementExecutionExt(ws.api_client) |
60 | | - return partial(statement_execution.execute, warehouse_id) |
| 56 | +def sql_exec(sql_backend): |
| 57 | + return partial(sql_backend.execute) |
61 | 58 |
|
62 | 59 |
|
63 | 60 | @pytest.fixture |
64 | | -def sql_fetch_all(ws: WorkspaceClient, env_or_skip): |
65 | | - warehouse_id = env_or_skip("TEST_DEFAULT_WAREHOUSE_ID") |
66 | | - statement_execution = StatementExecutionExt(ws.api_client) |
67 | | - return partial(statement_execution.execute_fetch_all, warehouse_id) |
68 | | - |
69 | | - |
70 | | -@pytest.fixture |
71 | | -def make_catalog(sql_exec, make_random): |
72 | | - def create(): |
73 | | - name = f"ucx_C{make_random(4)}".lower() |
74 | | - sql_exec(f"CREATE CATALOG {name}") |
75 | | - return name |
76 | | - |
77 | | - yield from factory("catalog", create, lambda name: sql_exec(f"DROP CATALOG IF EXISTS {name} CASCADE")) # noqa: F405 |
78 | | - |
79 | | - |
80 | | -def test_catalog_fixture(make_catalog): |
81 | | - logger.info(f"Created new catalog: {make_catalog()}") |
82 | | - logger.info(f"Created new catalog: {make_catalog()}") |
83 | | - |
84 | | - |
85 | | -@pytest.fixture |
86 | | -def make_schema(sql_exec, make_random): |
87 | | - def create(*, catalog: str = "hive_metastore", schema: str | None = None): |
88 | | - if schema is None: |
89 | | - schema = f"ucx_S{make_random(4)}" |
90 | | - schema = f"{catalog}.{schema}".lower() |
91 | | - sql_exec(f"CREATE SCHEMA {schema}") |
92 | | - return schema |
93 | | - |
94 | | - yield from factory( # noqa: F405 |
95 | | - "schema", create, lambda schema: sql_exec(f"DROP SCHEMA IF EXISTS {schema} CASCADE") |
96 | | - ) |
97 | | - |
98 | | - |
99 | | -def test_schema_fixture(make_schema): |
100 | | - logger.info(f"Created new schema: {make_schema()}") |
101 | | - logger.info(f"Created new schema: {make_schema()}") |
102 | | - |
103 | | - |
104 | | -@pytest.fixture |
105 | | -def make_table(sql_exec, make_schema, make_random): |
106 | | - def create( |
107 | | - *, |
108 | | - catalog="hive_metastore", |
109 | | - name: str | None = None, |
110 | | - schema: str | None = None, |
111 | | - ctas: str | None = None, |
112 | | - non_delta: bool = False, |
113 | | - external: bool = False, |
114 | | - view: bool = False, |
115 | | - tbl_properties: dict[str, str] | None = None, |
116 | | - ): |
117 | | - if schema is None: |
118 | | - schema = make_schema(catalog=catalog) |
119 | | - if name is None: |
120 | | - name = f"{schema}.ucx_T{make_random(4)}".lower() |
121 | | - ddl = f'CREATE {"VIEW" if view else "TABLE"} {name}' |
122 | | - if ctas is not None: |
123 | | - # temporary (if not view) |
124 | | - ddl = f"{ddl} AS {ctas}" |
125 | | - elif non_delta: |
126 | | - location = "dbfs:/databricks-datasets/iot-stream/data-device" |
127 | | - ddl = f"{ddl} USING json LOCATION '{location}'" |
128 | | - elif external: |
129 | | - # external table |
130 | | - url = "s3a://databricks-datasets-oregon/delta-sharing/share/open-datasets.share" |
131 | | - share = f"{url}#delta_sharing.default.lending_club" |
132 | | - ddl = f"{ddl} USING deltaSharing LOCATION '{share}'" |
133 | | - else: |
134 | | - # managed table |
135 | | - ddl = f"{ddl} (id INT, value STRING)" |
136 | | - if tbl_properties: |
137 | | - tbl_properties = ",".join([f" '{k}' = '{v}' " for k, v in tbl_properties.items()]) |
138 | | - ddl = f"{ddl} TBLPROPERTIES ({tbl_properties})" |
139 | | - |
140 | | - sql_exec(ddl) |
141 | | - return name |
142 | | - |
143 | | - def remove(name): |
144 | | - try: |
145 | | - sql_exec(f"DROP TABLE IF EXISTS {name}") |
146 | | - except RuntimeError as e: |
147 | | - if "Cannot drop a view" in str(e): |
148 | | - sql_exec(f"DROP VIEW IF EXISTS {name}") |
149 | | - else: |
150 | | - raise e |
151 | | - |
152 | | - yield from factory("table", create, remove) # noqa: F405 |
153 | | - |
154 | | - |
155 | | -def test_table_fixture(make_table): |
156 | | - logger.info(f"Created new managed table in new schema: {make_table()}") |
157 | | - logger.info(f'Created new managed table in default schema: {make_table(schema="default")}') |
158 | | - logger.info(f"Created new external table in new schema: {make_table(external=True)}") |
159 | | - logger.info(f"Created new external JSON table in new schema: {make_table(non_delta=True)}") |
160 | | - logger.info(f'Created new tmp table in new schema: {make_table(ctas="SELECT 2+2 AS four")}') |
161 | | - logger.info(f'Created new view in new schema: {make_table(view=True, ctas="SELECT 2+2 AS four")}') |
162 | | - logger.info(f'Created table with properties: {make_table(tbl_properties={"test": "tableproperty"})}') |
| 61 | +def sql_fetch_all(sql_backend): |
| 62 | + return partial(sql_backend.fetch) |
163 | 63 |
|
164 | 64 |
|
165 | 65 | @pytest.fixture |
|
0 commit comments