-
Notifications
You must be signed in to change notification settings - Fork 16
feat: Add DataFrame support in dy.Collection
#335
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
51ced99
98975b5
16f20bf
2a05ffb
a9ef78e
c40cedb
e86a8fd
3692e13
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |
|
|
||
| from dataframely._filter import Filter | ||
| from dataframely._polars import FrameType | ||
| from dataframely._typing import DataFrame as TypedDataFrame | ||
| from dataframely._typing import LazyFrame as TypedLazyFrame | ||
| from dataframely.exc import AnnotationImplementationError, ImplementationError | ||
| from dataframely.schema import Schema | ||
|
|
@@ -92,6 +93,8 @@ class MemberInfo(CollectionMember): | |
| schema: type[Schema] | ||
| #: Whether the member is optional. | ||
| is_optional: bool | ||
| #: Whether the member is a lazy frame. | ||
| is_lazy: bool = True | ||
|
|
||
|
|
||
| @dataclass | ||
|
|
@@ -241,39 +244,46 @@ def _derive_member_info( | |
| attr, annotation_args[0], annotation_args[1] | ||
| ) | ||
| elif origin == typing.Union: | ||
| # Happy path: optional member | ||
| # Happy path: optional member (e.g. dy.LazyFrame[Schema] | None) | ||
| union_args = get_args(type_annotation) | ||
| if len(union_args) != 2: | ||
| raise AnnotationImplementationError(attr, type_annotation) | ||
| if not any(get_origin(arg) is None for arg in union_args): | ||
| # Check that exactly one arg is None (type(None) is NoneType) | ||
| if not any(arg is type(None) for arg in union_args): | ||
| raise AnnotationImplementationError(attr, type_annotation) | ||
|
|
||
| not_none_args = [arg for arg in union_args if get_origin(arg) is not None] | ||
| if len(not_none_args) == 0 or not issubclass( | ||
| get_origin(not_none_args[0]), TypedLazyFrame | ||
| ): | ||
| # Get the non-None type (exactly one exists given prior checks) | ||
| not_none_arg = next(arg for arg in union_args if arg is not type(None)) | ||
|
|
||
| frame_origin = get_origin(not_none_arg) | ||
| if frame_origin is None: | ||
| raise AnnotationImplementationError(attr, type_annotation) | ||
|
|
||
| return MemberInfo( | ||
| schema=get_args(not_none_args[0])[0], | ||
| is_optional=True, | ||
| ignored_in_filters=collection_member.ignored_in_filters, | ||
| inline_for_sampling=collection_member.inline_for_sampling, | ||
| propagate_row_failures=collection_member.propagate_row_failures, | ||
| ) | ||
| elif issubclass(origin, TypedLazyFrame): | ||
| # Happy path: required member | ||
| return MemberInfo( | ||
| schema=get_args(type_annotation)[0], | ||
| is_optional=False, | ||
| ignored_in_filters=collection_member.ignored_in_filters, | ||
| inline_for_sampling=collection_member.inline_for_sampling, | ||
| propagate_row_failures=collection_member.propagate_row_failures, | ||
| ) | ||
| schema = get_args(not_none_arg)[0] | ||
| is_optional = True | ||
| elif issubclass(origin, (TypedLazyFrame, TypedDataFrame)): | ||
| frame_origin = origin | ||
| schema = get_args(type_annotation)[0] | ||
| is_optional = False | ||
| else: | ||
| raise AnnotationImplementationError(attr, type_annotation) | ||
|
|
||
| if issubclass(frame_origin, TypedLazyFrame): | ||
| is_lazy = True | ||
| elif issubclass(frame_origin, TypedDataFrame): | ||
| is_lazy = False | ||
| else: | ||
| # Some other unknown annotation | ||
| raise AnnotationImplementationError(attr, type_annotation) | ||
|
|
||
| return MemberInfo( | ||
| schema=schema, | ||
| is_optional=is_optional, | ||
| is_lazy=is_lazy, | ||
| ignored_in_filters=collection_member.ignored_in_filters, | ||
| inline_for_sampling=collection_member.inline_for_sampling, | ||
| propagate_row_failures=collection_member.propagate_row_failures, | ||
| ) | ||
|
|
||
| def __repr__(cls) -> str: | ||
| parts = [f'[Collection "{cls.__class__.__name__}"]'] | ||
| parts.append(textwrap.indent("Members:", prefix=" " * 2)) | ||
|
|
@@ -344,6 +354,16 @@ def non_ignored_members(cls) -> set[str]: | |
| if not member.ignored_in_filters | ||
| } | ||
|
|
||
| @classmethod | ||
| def lazy_members(cls) -> set[str]: | ||
| """The names of all members annotated as lazy frames.""" | ||
| return {name for name, member in cls.members().items() if member.is_lazy} | ||
|
|
||
| @classmethod | ||
| def eager_members(cls) -> set[str]: | ||
| """The names of all members annotated as data frames (eager).""" | ||
| return {name for name, member in cls.members().items() if not member.is_lazy} | ||
|
|
||
| @classmethod | ||
| def _failure_propagating_members(cls) -> set[str]: | ||
| """The names of all members of the collection that propagate individual row | ||
|
|
@@ -371,20 +391,40 @@ def common_primary_key(cls) -> list[str]: | |
| def _filters(cls) -> dict[str, Filter[Self]]: | ||
| return getattr(cls, _FILTER_ATTR) | ||
|
|
||
| def to_dict(self) -> dict[str, pl.LazyFrame]: | ||
| """Return a dictionary representation of this collection.""" | ||
| def to_dict(self) -> dict[str, FrameType]: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This signature forces the user to always specify the return type. I would keep |
||
| """Return a dictionary representation of this collection. | ||
|
|
||
| Returns: | ||
| A dictionary mapping member names to their frames. | ||
| Members annotated with :class:`~dataframely.DataFrame` return DataFrames, | ||
| while members annotated with :class:`~dataframely.LazyFrame` return LazyFrames. | ||
| """ | ||
| return { | ||
| member: getattr(self, member) | ||
| for member in self.member_schemas() | ||
| if getattr(self, member) is not None | ||
| } | ||
|
|
||
| def _to_lazy_dict(self) -> dict[str, pl.LazyFrame]: | ||
| """Return a dictionary with all members as lazy frames (internal use).""" | ||
| return { | ||
| member: getattr(self, member).lazy() | ||
| for member in self.member_schemas() | ||
| if getattr(self, member) is not None | ||
| } | ||
|
|
||
| @classmethod | ||
| def _init(cls, data: Mapping[str, FrameType], /) -> Self: | ||
| out = cls() | ||
| for member_name, member in cls.members().items(): | ||
| if member.is_optional and member_name not in data: | ||
| setattr(out, member_name, None) | ||
| else: | ||
| elif member.is_lazy: | ||
| setattr(out, member_name, data[member_name].lazy()) | ||
| else: | ||
| frame = data[member_name] | ||
| if isinstance(frame, pl.LazyFrame): | ||
| setattr(out, member_name, frame.collect()) | ||
| else: | ||
| setattr(out, member_name, frame) | ||
|
Comment on lines
+425
to
+429
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can just do |
||
| return out | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,7 +32,7 @@ | |
| from dataframely._storage.constants import COLLECTION_METADATA_KEY | ||
| from dataframely._storage.delta import DeltaStorageBackend | ||
| from dataframely._storage.parquet import ParquetStorageBackend | ||
| from dataframely._typing import LazyFrame, Validation | ||
| from dataframely._typing import DataFrame, LazyFrame, Validation | ||
| from dataframely.exc import ( | ||
| DeserializationError, | ||
| ValidationError, | ||
|
|
@@ -68,13 +68,13 @@ class Collection(BaseCollection, ABC): | |
| to 1-N relationships that are managed in separate data frames. | ||
|
|
||
| A collection must only have type annotations for :class:`~dataframely.LazyFrame` | ||
| with known schema: | ||
| or :class:`~dataframely.DataFrame` with known schema: | ||
|
|
||
| .. code:: python | ||
|
|
||
| class MyCollection(dy.Collection): | ||
| first_member: dy.LazyFrame[MyFirstSchema] | ||
| second_member: dy.LazyFrame[MySecondSchema] | ||
| second_member: dy.DataFrame[MySecondSchema] | ||
|
|
||
| Besides, it may define *filters* (c.f. :meth:`~dataframely.filter`) and arbitrary | ||
| methods. | ||
|
|
@@ -735,7 +735,7 @@ def join( | |
| how=how, | ||
| maintain_order=maintain_order, | ||
| ) | ||
| for key, lf in self.to_dict().items() | ||
| for key, lf in self._to_lazy_dict().items() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might not be necessary with the comment above |
||
| } | ||
| ) | ||
|
|
||
|
|
@@ -788,16 +788,15 @@ def collect_all(self) -> Self: | |
| particularly useful when :meth:`filter` is called with lazy frame inputs. | ||
|
|
||
| Returns: | ||
| The same collection with all members collected once. | ||
|
|
||
| Note: | ||
| As all collection members are required to be lazy frames, the returned | ||
| collection's members are still "lazy". However, they are "shallow-lazy", | ||
| meaning they are obtained by calling `.collect().lazy()`. | ||
| The same collection with all members collected once. Members annotated | ||
| with :class:`~dataframely.DataFrame` are returned as DataFrames, while | ||
| members annotated with :class:`~dataframely.LazyFrame` are returned as | ||
| "shallow-lazy" frames (obtained by calling ``.collect().lazy()``). | ||
| """ | ||
| dfs = pl.collect_all(self.to_dict().values()) | ||
| lazy_dict = self._to_lazy_dict() | ||
| dfs = pl.collect_all(lazy_dict.values()) | ||
| return self._init( | ||
| {key: dfs[i].lazy() for i, key in enumerate(self.to_dict().keys())} | ||
| {key: dfs[i].lazy() for i, key in enumerate(lazy_dict.keys())} | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the |
||
| ) | ||
|
|
||
| # --------------------------------- SERIALIZATION -------------------------------- # | ||
|
|
@@ -842,6 +841,7 @@ def serialize(cls) -> str: | |
| name: { | ||
| "schema": info.schema._as_dict(), | ||
| "is_optional": info.is_optional, | ||
| "is_lazy": info.is_lazy, | ||
| "ignored_in_filters": info.ignored_in_filters, | ||
| "inline_for_sampling": info.inline_for_sampling, | ||
| } | ||
|
|
@@ -1172,7 +1172,7 @@ def _write(self, backend: StorageBackend, **kwargs: Any) -> None: | |
| # Utility method encapsulating the interaction with the StorageBackend | ||
|
|
||
| backend.write_collection( | ||
| self.to_dict(), | ||
| self._to_lazy_dict(), | ||
| serialized_collection=self.serialize(), | ||
| serialized_schemas={ | ||
| key: schema.serialize() for key, schema in self.member_schemas().items() | ||
|
|
@@ -1184,7 +1184,7 @@ def _sink(self, backend: StorageBackend, **kwargs: Any) -> None: | |
| # Utility method encapsulating the interaction with the StorageBackend | ||
|
|
||
| backend.sink_collection( | ||
| self.to_dict(), | ||
| self._to_lazy_dict(), | ||
| serialized_collection=self.serialize(), | ||
|
gab23r marked this conversation as resolved.
|
||
| serialized_schemas={ | ||
| key: schema.serialize() for key, schema in self.member_schemas().items() | ||
|
|
@@ -1330,11 +1330,14 @@ def deserialize_collection(data: str, strict: bool = True) -> type[Collection] | | |
|
|
||
| annotations: dict[str, Any] = {} | ||
| for name, info in decoded["members"].items(): | ||
| lf_type = LazyFrame[_schema_from_dict(info["schema"])] # type: ignore | ||
| schema = _schema_from_dict(info["schema"]) | ||
| # Default to lazy for backwards compatibility with old serialized data | ||
| is_lazy = info.get("is_lazy", True) | ||
| frame_type = LazyFrame[schema] if is_lazy else DataFrame[schema] # type: ignore | ||
| if info["is_optional"]: | ||
| lf_type = lf_type | None # type: ignore | ||
| frame_type = frame_type | None # type: ignore | ||
| annotations[name] = Annotated[ | ||
| lf_type, | ||
| frame_type, | ||
| CollectionMember( | ||
| ignored_in_filters=info["ignored_in_filters"], | ||
| inline_for_sampling=info["inline_for_sampling"], | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did these need to change? It feels like it's the same thing rewritten?