@@ -42,19 +42,24 @@ def _schema_for(cls, klass):
4242 return ", " .join (fields )
4343
4444 @classmethod
45- def _filter_none_rows (cls , rows ):
45+ def _filter_none_rows (cls , rows , full_name ):
46+ if len (rows ) == 0 :
47+ return rows
48+
4649 results = []
47- nullable_fields = []
50+ nullable_fields = set ()
4851
4952 for field in dataclasses .fields (rows [0 ]):
5053 if field .default is None :
51- nullable_fields .append (field .name )
54+ nullable_fields .add (field .name )
5255
5356 for row in rows :
57+ if row is None :
58+ continue
5459 row_contains_none = False
5560 for column , value in dataclasses .asdict (row ).items ():
5661 if value is None and column not in nullable_fields :
57- logger .debug (f"Field { column } is None, filtering row" )
62+ logger .warning (f"[ { full_name } ] Field { column } is None, filtering row" )
5863 row_contains_none = True
5964 break
6065
@@ -81,7 +86,7 @@ def save_table(self, full_name: str, rows: list[any], mode="append"):
8186 if mode == "overwrite" :
8287 msg = "Overwrite mode is not yet supported"
8388 raise NotImplementedError (msg )
84-
89+ rows = self . _filter_none_rows ( rows , full_name )
8590 if len (rows ) == 0 :
8691 return
8792
@@ -136,7 +141,7 @@ def fetch(self, sql) -> Iterator[any]:
136141 return self ._spark .sql (sql ).collect ()
137142
138143 def save_table (self , full_name : str , rows : list [any ], mode : str = "append" ):
139- rows = self ._filter_none_rows (rows )
144+ rows = self ._filter_none_rows (rows , full_name )
140145
141146 if len (rows ) == 0 :
142147 return
0 commit comments