Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 170 additions & 71 deletions python/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion python/pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
io = load_file_io(self.properties, table.metadata_location)
metadata = table.metadata
manifest_lists_to_delete = set()
manifests_to_delete = []
manifests_to_delete: List[ManifestFile] = []
for snapshot in metadata.snapshots:
manifests_to_delete += snapshot.manifests(io)
if snapshot.manifest_list is not None:
Expand Down
8 changes: 4 additions & 4 deletions python/pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ class OAuthErrorResponse(IcebergBaseModel):
error: Literal[
"invalid_request", "invalid_client", "invalid_grant", "unauthorized_client", "unsupported_grant_type", "invalid_scope"
]
error_description: Optional[str]
error_uri: Optional[str]
error_description: Optional[str] = None
error_uri: Optional[str] = None


class RestCatalog(Catalog):
Expand Down Expand Up @@ -430,7 +430,7 @@ def create_table(
write_order=sort_order,
properties=properties,
)
serialized_json = request.json()
serialized_json = request.model_dump_json().encode("utf-8")
response = self._session.post(
self.url(Endpoints.create_table, namespace=namespace_and_table["namespace"]),
data=serialized_json,
Expand Down Expand Up @@ -507,7 +507,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
"""
response = self._session.post(
self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)),
data=table_request.json(),
data=table_request.model_dump_json().encode("utf-8"),
)
try:
response.raise_for_status()
Expand Down
10 changes: 7 additions & 3 deletions python/pyiceberg/cli/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,11 @@ class FauxTable(IcebergBaseModel):
metadata_location: str
metadata: TableMetadata

print(FauxTable(identifier=table.identifier, metadata=table.metadata, metadata_location=table.metadata_location).json())
print(
FauxTable(
identifier=table.identifier, metadata=table.metadata, metadata_location=table.metadata_location
).model_dump_json()
)

def describe_properties(self, properties: Properties) -> None:
self._out(properties)
Expand All @@ -209,13 +213,13 @@ def text(self, response: str) -> None:
print(json.dumps(response))

def schema(self, schema: Schema) -> None:
print(schema.json())
print(schema.model_dump_json())

def files(self, table: Table, history: bool) -> None:
pass

def spec(self, spec: PartitionSpec) -> None:
print(spec.json())
print(spec.model_dump_json())

def uuid(self, uuid: Optional[UUID]) -> None:
self._out({"uuid": str(uuid) if uuid else "missing"})
Expand Down
2 changes: 1 addition & 1 deletion python/pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ def _task_to_table(
schema_raw = metadata.get(ICEBERG_SCHEMA)
# TODO: if field_ids are not present, Name Mapping should be implemented to look them up in the table schema,
# see https://github.com/apache/iceberg/issues/7451
file_schema = Schema.parse_raw(schema_raw) if schema_raw is not None else pyarrow_to_schema(physical_schema)
file_schema = Schema.model_validate_json(schema_raw) if schema_raw is not None else pyarrow_to_schema(physical_schema)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: It uses "validate" instead of "parse"? That seems weird to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it just doesn't seem to describe what the method is doing. It's not a validation. It's producing a different object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, I would also prefer parse


pyarrow_filter = None
if bound_row_filter is not AlwaysTrue():
Expand Down
30 changes: 22 additions & 8 deletions python/pyiceberg/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from functools import cached_property
from typing import (
Any,
Expand All @@ -23,15 +25,21 @@
Tuple,
)

from pydantic import Field
from pydantic import (
BeforeValidator,
Field,
PlainSerializer,
WithJsonSchema,
)
from typing_extensions import Annotated

from pyiceberg.schema import Schema
from pyiceberg.transforms import Transform
from pyiceberg.transforms import Transform, parse_transform
from pyiceberg.typedef import IcebergBaseModel
from pyiceberg.types import NestedField, StructType

INITIAL_PARTITION_SPEC_ID = 0
_PARTITION_DATA_ID_START: int = 1000
PARTITION_FIELD_ID_START: int = 1000


class PartitionField(IcebergBaseModel):
Expand All @@ -46,7 +54,12 @@ class PartitionField(IcebergBaseModel):

source_id: int = Field(alias="source-id")
field_id: int = Field(alias="field-id")
transform: Transform[Any, Any] = Field()
transform: Annotated[ # type: ignore
Transform,
BeforeValidator(parse_transform),
PlainSerializer(lambda c: str(c), return_type=str), # pylint: disable=W0108
WithJsonSchema({"type": "string"}, mode="serialization"),
] = Field()
name: str = Field()

def __init__(
Expand All @@ -65,6 +78,7 @@ def __init__(
data["transform"] = transform
if name is not None:
data["name"] = name

super().__init__(**data)

def __str__(self) -> str:
Expand All @@ -82,7 +96,7 @@ class PartitionSpec(IcebergBaseModel):
"""

spec_id: int = Field(alias="spec-id", default=INITIAL_PARTITION_SPEC_ID)
fields: Tuple[PartitionField, ...] = Field(alias="fields", default_factory=tuple)
fields: Tuple[PartitionField, ...] = Field(default_factory=tuple)

def __init__(
self,
Expand Down Expand Up @@ -129,7 +143,7 @@ def is_unpartitioned(self) -> bool:
def last_assigned_field_id(self) -> int:
if self.fields:
return max(pf.field_id for pf in self.fields)
return _PARTITION_DATA_ID_START
return PARTITION_FIELD_ID_START

@cached_property
def source_id_to_fields_map(self) -> Dict[int, List[PartitionField]]:
Expand All @@ -143,7 +157,7 @@ def source_id_to_fields_map(self) -> Dict[int, List[PartitionField]]:
def fields_by_source_id(self, field_id: int) -> List[PartitionField]:
return self.source_id_to_fields_map.get(field_id, [])

def compatible_with(self, other: "PartitionSpec") -> bool:
def compatible_with(self, other: PartitionSpec) -> bool:
"""Produce a boolean to return True if two PartitionSpec are considered compatible."""
if self == other:
return True
Expand Down Expand Up @@ -196,7 +210,7 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre
PartitionField(
name=field.name,
source_id=fresh_field.field_id,
field_id=_PARTITION_DATA_ID_START + pos,
field_id=PARTITION_FIELD_ID_START + pos,
transform=field.transform,
)
)
Expand Down
2 changes: 1 addition & 1 deletion python/pyiceberg/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,6 @@ def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite:
overwrite (bool): Where to overwrite the file if it already exists. Defaults to `False`.
"""
with output_file.create(overwrite=overwrite) as output_stream:
json_bytes = metadata.json().encode("utf-8")
json_bytes = metadata.model_dump_json().encode("utf-8")
json_bytes = Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes)
output_stream.write(json_bytes)
6 changes: 3 additions & 3 deletions python/pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
Union,
)

from pydantic import Field
from pydantic import Field, SerializeAsAny
from sortedcontainers import SortedList

from pyiceberg.expressions import (
Expand Down Expand Up @@ -365,8 +365,8 @@ class AssertDefaultSortOrderId(TableRequirement):

class CommitTableRequest(IcebergBaseModel):
identifier: Identifier = Field()
requirements: List[TableRequirement] = Field(default_factory=list)
updates: List[TableUpdate] = Field(default_factory=list)
requirements: List[SerializeAsAny[TableRequirement]] = Field(default_factory=list)
updates: List[SerializeAsAny[TableUpdate]] = Field(default_factory=list)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you help me understand what's happening here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also had to learn this by raising an issue: pydantic/pydantic#6403 (comment)

It turns out that you need to wrap this when you're serializing an object, that might have subclasses. This way it gets correctly serialized.



class CommitTableResponse(IcebergBaseModel):
Expand Down
Loading