|
1 | 1 | import copy |
2 | | -from http.client import CONFLICT, HTTPException |
3 | 2 | import itertools |
4 | | -from typing import Any, Optional, Tuple, Union |
5 | | -from urllib.error import HTTPError |
| 3 | +from typing import Optional, Tuple, Union |
6 | 4 | from uuid import UUID |
7 | 5 |
|
8 | 6 | from azure.identity import DefaultAzureCredential |
|
11 | 9 | from pyapacheatlas.core import (AtlasEntity, AtlasProcess, |
12 | 10 | PurviewClient) |
13 | 11 | from pyapacheatlas.core.typedef import (AtlasAttributeDef,Cardinality,EntityTypeDef) |
14 | | -from pyapacheatlas.core.util import GuidTracker |
| 12 | +from pyapacheatlas.core.util import GuidTracker, AtlasException |
15 | 13 | from pyhocon import ConfigFactory |
16 | 14 |
|
17 | 15 | from registry.interface import Registry |
|
23 | 21 | TYPEDEF_ARRAY_ANCHOR=f"array<feathr_anchor_v1>" |
24 | 22 | TYPEDEF_ARRAY_DERIVED_FEATURE=f"array<feathr_derived_feature_v1>" |
25 | 23 | TYPEDEF_ARRAY_ANCHOR_FEATURE=f"array<feathr_anchor_feature_v1>" |
| 24 | + |
| 25 | +class ConflictError(Exception): |
| 26 | + pass |
| 27 | + |
26 | 28 | class PurviewRegistry(Registry): |
27 | 29 | def __init__(self,azure_purview_name: str, registry_delimiter: str = "__", credential=None,register_types = True): |
28 | 30 | self.registry_delimiter = registry_delimiter |
@@ -568,18 +570,22 @@ def _register_feathr_feature_types(self): |
568 | 570 | def _upload_entity_batch(self, entity_batch:list[AtlasEntity]): |
569 | 571 | # we only support entity creation, update is not supported. |
570 | 572 | # setting lastModifiedTS ==0 will ensure this, if another entity with ts>=1 exist |
571 | | - # upload funtion will fail with 412 Precondition fail. |
| 573 | + # upload function will fail with 412 Precondition fail. |
572 | 574 | for entity in entity_batch: |
573 | 575 | entity.lastModifiedTS="0" |
574 | | - results = self.purview_client.upload_entities( |
575 | | - batch=entity) |
576 | | - if results: |
577 | | - dict = {x.guid: x for x in entity_batch} |
578 | | - for k, v in results['guidAssignments'].items(): |
579 | | - dict[k].guid = v |
580 | | - else: |
581 | | - raise RuntimeError("Feature registration failed.", results) |
582 | | - |
| 576 | + try: |
| 577 | + results = self.purview_client.upload_entities( |
| 578 | + batch=entity) |
| 579 | + if results: |
| 580 | + dict = {x.guid: x for x in entity_batch} |
| 581 | + for k, v in results['guidAssignments'].items(): |
| 582 | + dict[k].guid = v |
| 583 | + else: |
| 584 | + raise RuntimeError("Feature registration failed.", results) |
| 585 | + except AtlasException as e: |
| 586 | + if "PreConditionCheckFailed" in e.args[0]: |
| 587 | + raise ConflictError(f"Entity {entity.guid}, {entity.typeName} -- {entity.qualifiedName} already exists in Purview. Please use a new name.") |
| 588 | + |
583 | 589 | def _generate_fully_qualified_name(self, segments): |
584 | 590 | return self.registry_delimiter.join(segments) |
585 | 591 |
|
|
0 commit comments