Skip to content

Commit 902fdc3

Browse files
committed
refactor: Improve code readability with consistent formatting
1 parent e36d994 commit 902fdc3

File tree

3 files changed

+46
-26
lines changed

3 files changed

+46
-26
lines changed

pyiceberg/table/__init__.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1799,16 +1799,20 @@ def projection(self) -> Schema:
17991799
return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
18001800

18011801
@abstractmethod
1802-
def plan_files(self) -> Iterable[ScanTask]: ...
1802+
def plan_files(self) -> Iterable[ScanTask]:
1803+
...
18031804

18041805
@abstractmethod
1805-
def to_arrow(self) -> pa.Table: ...
1806+
def to_arrow(self) -> pa.Table:
1807+
...
18061808

18071809
@abstractmethod
1808-
def to_pandas(self, **kwargs: Any) -> pd.DataFrame: ...
1810+
def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
1811+
...
18091812

18101813
@abstractmethod
1811-
def to_polars(self) -> pl.DataFrame: ...
1814+
def to_polars(self) -> pl.DataFrame:
1815+
...
18121816

18131817
def update(self: S, **overrides: Any) -> S:
18141818
"""Create a copy of this table scan with updated fields."""
@@ -1841,7 +1845,8 @@ def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:
18411845
return self.update(case_sensitive=case_sensitive)
18421846

18431847
@abstractmethod
1844-
def count(self) -> int: ...
1848+
def count(self) -> int:
1849+
...
18451850

18461851

18471852
class ScanTask:

pyiceberg/table/upsert_util.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,7 @@ def has_duplicate_rows(df: pyarrow_table, join_cols: list[str]) -> bool:
191191
return len(df.select(join_cols).group_by(join_cols).aggregate([([], "count_all")]).filter(pc.field("count_all") > 1)) > 0
192192

193193

194-
def _compare_columns_vectorized(
195-
source_col: pa.Array | pa.ChunkedArray, target_col: pa.Array | pa.ChunkedArray
196-
) -> pa.Array:
194+
def _compare_columns_vectorized(source_col: pa.Array | pa.ChunkedArray, target_col: pa.Array | pa.ChunkedArray) -> pa.Array:
197195
"""
198196
Vectorized comparison of two columns, returning a boolean array where True means values differ.
199197
@@ -223,7 +221,7 @@ def _compare_columns_vectorized(
223221

224222
# PyArrow cannot directly compare struct columns, so we recursively compare each field
225223
diff_masks = []
226-
for i, field in enumerate(col_type):
224+
for i, _field in enumerate(col_type):
227225
src_field = pc.struct_field(source_col, [i])
228226
tgt_field = pc.struct_field(target_col, [i])
229227
field_diff = _compare_columns_vectorized(src_field, tgt_field)
@@ -237,7 +235,12 @@ def _compare_columns_vectorized(
237235
field_diff = functools.reduce(pc.or_, diff_masks)
238236
return pc.or_(field_diff, struct_null_diff)
239237

240-
elif pa.types.is_list(col_type) or pa.types.is_large_list(col_type) or pa.types.is_fixed_size_list(col_type) or pa.types.is_map(col_type):
238+
elif (
239+
pa.types.is_list(col_type)
240+
or pa.types.is_large_list(col_type)
241+
or pa.types.is_fixed_size_list(col_type)
242+
or pa.types.is_map(col_type)
243+
):
241244
# For list/map types, fall back to Python comparison as PyArrow doesn't support vectorized comparison
242245
# This is still faster than the original row-by-row approach since we batch the conversion
243246
source_py = source_col.to_pylist()

tests/table/test_upsert.py

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,14 @@ def gen_target_iceberg_table(
100100
) -> Table:
101101
additional_columns = ", t.order_id + 1000 as order_line_id" if composite_key else ""
102102

103-
df = ctx.sql(f"""
103+
df = ctx.sql(
104+
f"""
104105
with t as (SELECT unnest(range({start_row},{end_row + 1})) as order_id)
105106
SELECT t.order_id {additional_columns}
106107
, date '2021-01-01' as order_date, 'A' as order_type
107108
from t
108-
""").to_arrow_table()
109+
"""
110+
).to_arrow_table()
109111

110112
table = catalog.create_table(identifier, df.schema)
111113

@@ -166,23 +168,27 @@ def test_merge_scenario_skip_upd_row(catalog: Catalog) -> None:
166168

167169
ctx = SessionContext()
168170

169-
df = ctx.sql("""
171+
df = ctx.sql(
172+
"""
170173
select 1 as order_id, date '2021-01-01' as order_date, 'A' as order_type
171174
union all
172175
select 2 as order_id, date '2021-01-01' as order_date, 'A' as order_type
173-
""").to_arrow_table()
176+
"""
177+
).to_arrow_table()
174178

175179
table = catalog.create_table(identifier, df.schema)
176180

177181
table.append(df)
178182

179-
source_df = ctx.sql("""
183+
source_df = ctx.sql(
184+
"""
180185
select 1 as order_id, date '2021-01-01' as order_date, 'A' as order_type
181186
union all
182187
select 2 as order_id, date '2021-01-01' as order_date, 'B' as order_type
183188
union all
184189
select 3 as order_id, date '2021-01-01' as order_date, 'A' as order_type
185-
""").to_arrow_table()
190+
"""
191+
).to_arrow_table()
186192

187193
res = table.upsert(df=source_df, join_cols=["order_id"])
188194

@@ -202,23 +208,27 @@ def test_merge_scenario_date_as_key(catalog: Catalog) -> None:
202208
identifier = "default.test_merge_scenario_date_as_key"
203209
_drop_table(catalog, identifier)
204210

205-
df = ctx.sql("""
211+
df = ctx.sql(
212+
"""
206213
select date '2021-01-01' as order_date, 'A' as order_type
207214
union all
208215
select date '2021-01-02' as order_date, 'A' as order_type
209-
""").to_arrow_table()
216+
"""
217+
).to_arrow_table()
210218

211219
table = catalog.create_table(identifier, df.schema)
212220

213221
table.append(df)
214222

215-
source_df = ctx.sql("""
223+
source_df = ctx.sql(
224+
"""
216225
select date '2021-01-01' as order_date, 'A' as order_type
217226
union all
218227
select date '2021-01-02' as order_date, 'B' as order_type
219228
union all
220229
select date '2021-01-03' as order_date, 'A' as order_type
221-
""").to_arrow_table()
230+
"""
231+
).to_arrow_table()
222232

223233
res = table.upsert(df=source_df, join_cols=["order_date"])
224234

@@ -238,23 +248,27 @@ def test_merge_scenario_string_as_key(catalog: Catalog) -> None:
238248

239249
ctx = SessionContext()
240250

241-
df = ctx.sql("""
251+
df = ctx.sql(
252+
"""
242253
select 'abc' as order_id, 'A' as order_type
243254
union all
244255
select 'def' as order_id, 'A' as order_type
245-
""").to_arrow_table()
256+
"""
257+
).to_arrow_table()
246258

247259
table = catalog.create_table(identifier, df.schema)
248260

249261
table.append(df)
250262

251-
source_df = ctx.sql("""
263+
source_df = ctx.sql(
264+
"""
252265
select 'abc' as order_id, 'A' as order_type
253266
union all
254267
select 'def' as order_id, 'B' as order_type
255268
union all
256269
select 'ghi' as order_id, 'A' as order_type
257-
""").to_arrow_table()
270+
"""
271+
).to_arrow_table()
258272

259273
res = table.upsert(df=source_df, join_cols=["order_id"])
260274

@@ -1563,5 +1577,3 @@ def test_coarse_match_filter_composite_key_mixed_types() -> None:
15631577
# numeric_id is sparse (density < 10%), so should use In()
15641578
# string_id is non-numeric, so should use In()
15651579
assert isinstance(result, And)
1566-
1567-

0 commit comments

Comments (0)