Skip to content

Commit 96915b9

Browse files
authored
Merge pull request #60 from kraken-tech/improve-coverage
Improve coverage
2 parents 64e9256 + 8791b40 commit 96915b9

5 files changed

Lines changed: 171 additions & 41 deletions

File tree

src/psycopack/_commands.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,8 @@ def _get_first_partition_start_date(
157157
elif strategy.partition_by == _partition.PartitionInterval.MONTH:
158158
# Align to start of month
159159
return min_value.replace(day=1)
160-
else:
161-
raise ValueError(f"Unsupported partition_by: {strategy.partition_by}")
160+
else: # pragma: no cover
161+
raise NotImplementedError
162162

163163
def _get_last_partition_end_date(
164164
self,
@@ -186,8 +186,8 @@ def _get_last_partition_end_date(
186186
for _ in range(1 + num_of_extra_partitions):
187187
temp_date = (temp_date + datetime.timedelta(days=32)).replace(day=1)
188188
return temp_date
189-
else:
190-
raise ValueError(f"Unsupported partition_by: {strategy.partition_by}")
189+
else: # pragma: no cover
190+
raise NotImplementedError
191191

192192
def _get_partition_end_boundary(
193193
self,
@@ -207,8 +207,8 @@ def _get_partition_end_boundary(
207207
return (current_partition_start + datetime.timedelta(days=32)).replace(
208208
day=1
209209
)
210-
else:
211-
raise ValueError(f"Unsupported partition_by: {strategy.partition_by}")
210+
else: # pragma: no cover
211+
raise NotImplementedError
212212

213213
def _get_partition_suffix(
214214
self,
@@ -227,8 +227,8 @@ def _get_partition_suffix(
227227
elif strategy.partition_by == _partition.PartitionInterval.MONTH:
228228
# Format: p202501 (YYYYMM)
229229
return f"p{current_partition_start.strftime('%Y%m')}"
230-
else:
231-
raise ValueError(f"Unsupported partition_by: {strategy.partition_by}")
230+
else: # pragma: no cover
231+
raise NotImplementedError
232232

233233
def _create_datetime_partition(
234234
self,

src/psycopack/_introspect.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -765,14 +765,6 @@ def is_table_owner(self, *, table: str, schema: str) -> bool:
765765
assert result is not None
766766
return bool(result[0])
767767

768-
def get_current_date(self) -> datetime.date:
769-
self.cur.execute("SELECT CURRENT_DATE;")
770-
result = self.cur.fetchone()
771-
assert result is not None
772-
current_date = result[0]
773-
assert isinstance(current_date, datetime.date)
774-
return current_date
775-
776768
def get_min_partition_date_value(self, *, table: str, column: str) -> datetime.date:
777769
"""
778770
Get the minimum value of the partition column from the table.

src/psycopack/_repack.py

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -180,10 +180,10 @@ def __init__(
180180
# after checking the table exists so we can safely introspect it.
181181
# The original table always has a single-column PK; partitioning
182182
# only affects the copy table until swap() is called.
183-
self._pk_column = ""
183+
self.pk_column = ""
184184
pk_info = self.introspector.get_primary_key_info(table=self.table)
185185
if pk_info and len(pk_info.columns) == 1:
186-
self._pk_column = pk_info.columns[0]
186+
self.pk_column = pk_info.columns[0]
187187

188188
if not skip_permissions_check:
189189
self._check_user_permissions()
@@ -240,26 +240,6 @@ def __init__(
240240
schema=schema,
241241
)
242242

243-
@property
244-
def pk_column(self) -> str:
245-
"""
246-
Method to cache the name of the pk column in the instance as to avoid
247-
calling introspection queries multiple times.
248-
249-
When partitioning is enabled, the table's PK becomes composite after
250-
setup_repacking(). In that case, we return the first column which is
251-
always the original single-column PK.
252-
"""
253-
if self._pk_column:
254-
return self._pk_column
255-
pk_info = self.introspector.get_primary_key_info(table=self.table)
256-
assert pk_info is not None
257-
# For partitioned tables, PK becomes composite (original PK + partition column)
258-
# We always want the first column, which is the original PK
259-
assert len(pk_info.columns) >= 1
260-
self._pk_column = pk_info.columns[0]
261-
return self._pk_column
262-
263243
def full(self) -> None:
264244
"""
265245
Process a full table repack from beginning to end.
@@ -483,7 +463,7 @@ def post_sync_update(self) -> None:
483463
return
484464
elif self.sync_strategy == _sync_strategy.SyncStrategy.CHANGE_LOG:
485465
return self._post_sync_update_for_change_log()
486-
else:
466+
else: # pragma: no cover
487467
raise NotImplementedError
488468

489469
def _post_sync_update_for_change_log(self) -> None:

src/psycopack/_tracker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ def _validate_stage_dependencies(self, *, stage: Stage) -> None:
201201
Stage.SWAP: [self.table, self.copy_table],
202202
Stage.CLEAN_UP: [self.repacked_name, self.table],
203203
}
204-
else:
204+
else: # pragma: no cover
205205
raise NotImplementedError
206206

207207
if stage in table_dependencies:
@@ -223,7 +223,7 @@ def _validate_stage_dependencies(self, *, stage: Stage) -> None:
223223
Stage.SWAP: [self.trigger],
224224
Stage.CLEAN_UP: [self.repacked_trigger],
225225
}
226-
else:
226+
else: # pragma: no cover
227227
raise NotImplementedError
228228

229229
if stage in trigger_dependencies:

tests/test_repack.py

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3080,3 +3080,161 @@ def test_partition_calling_stages_individually(
30803080
""")
30813081
partitions = cur.fetchall()
30823082
assert len(partitions) > 0, "Table should have partitions"
3083+
3084+
3085+
def test_pk_column_property_accessed_before_setup(
    connection: _psycopg.Connection,
) -> None:
    """
    Test that pk_column is available before setup_repacking is called.

    pk_column is expected to hold the name of the table's single-column
    primary key as soon as the Psycopack instance has been constructed,
    without any repacking stage having run.
    """
    with _cur.get_cursor(connection, logged=True) as cur:
        try:
            # Table setup lives inside the try block so that a failure while
            # creating or seeding the table still reaches the clean-up below.
            cur.execute(
                """
                CREATE TABLE test_pk_early_access (
                    id bigint PRIMARY KEY,
                    data text
                );
                """
            )
            cur.execute(
                "INSERT INTO test_pk_early_access (id, data) VALUES (1, 'test');"
            )
            connection.commit()

            repack = Psycopack(
                table="test_pk_early_access",
                batch_size=1,
                conn=connection,
                cur=cur,
            )
            # Read pk_column BEFORE calling setup_repacking.
            assert repack.pk_column == "id"
            # Reading it a second time must yield the same value.
            assert repack.pk_column == "id"
        finally:
            # Discard any open (possibly failed) transaction before dropping.
            connection.rollback()
            cur.execute("DROP TABLE IF EXISTS test_pk_early_access CASCADE;")
            connection.commit()
3121+
3122+
3123+
def test_pk_column_property_for_partitioned_table(
    connection: _psycopg.Connection,
) -> None:
    """
    Test that pk_column names the original PK once partitioning is set up.

    After pre_validate() and setup_repacking() have run with a partition
    config, pk_column must still be "id" and the copy table must have been
    created as a partitioned table.
    """
    with _cur.get_cursor(connection, logged=True) as cur:
        try:
            # Table setup lives inside the try block so that a failure while
            # creating or seeding the table still reaches the clean-up below.
            cur.execute(
                """
                CREATE TABLE test_pk_column (
                    id bigint PRIMARY KEY,
                    datetime_field date NOT NULL,
                    data text
                );
                """
            )
            cur.execute(
                "INSERT INTO test_pk_column (id, datetime_field, data) "
                "VALUES (1, '2025-01-01', 'test');"
            )
            connection.commit()

            # Create Psycopack instance with daily date-range partitioning.
            repack = Psycopack(
                table="test_pk_column",
                batch_size=1,
                conn=connection,
                cur=cur,
                partition_config=_partition.PartitionConfig(
                    column="datetime_field",
                    num_of_extra_partitions_ahead=1,
                    strategy=_partition.DateRangeStrategy(
                        partition_by=_partition.PartitionInterval.DAY
                    ),
                ),
                sync_strategy=SyncStrategy.CHANGE_LOG,
            )

            # Validate and setup to create the partitioned copy table.
            repack.pre_validate()
            repack.setup_repacking()

            # pk_column must still name the original single-column PK, even
            # though a partitioned copy table now exists.
            assert repack.pk_column == "id"

            # Verify the copy table was created as partitioned.
            cur.execute(
                f"""
                SELECT relkind FROM pg_class
                WHERE relname = '{repack.copy_table}'
                AND relnamespace = 'public'::regnamespace;
                """
            )
            result = cur.fetchone()
            assert result is not None
            assert result[0] == "p"  # 'p' means partitioned table
        finally:
            # Discard any open (possibly failed) transaction before dropping.
            connection.rollback()
            # NOTE(review): only the source table is dropped here; confirm
            # that objects created by setup_repacking() go away with it.
            cur.execute("DROP TABLE IF EXISTS test_pk_column CASCADE;")
            connection.commit()
3191+
3192+
3193+
def test_full_method_resume_from_swap_stage(
    connection: _psycopg.Connection,
) -> None:
    """
    Test the full() method when resuming from the SWAP stage.

    Every stage up to and including post_sync_update() is run by hand, so
    that the tracker reports SWAP as the current stage. full() is then
    expected to finish the remaining work, after which the tracker reports
    PRE_VALIDATION again.
    """
    with _cur.get_cursor(connection, logged=True) as cur:
        table_name = "test_resume_swap"
        try:
            # Table setup lives inside the try block so that a failure while
            # creating or seeding the table still reaches the clean-up below.
            cur.execute(
                f"CREATE TABLE {table_name} (id bigint PRIMARY KEY, data text);"
            )
            cur.execute(f"INSERT INTO {table_name} (id, data) VALUES (1, 'test');")
            connection.commit()

            repack = Psycopack(
                table=table_name,
                batch_size=100,
                conn=connection,
                cur=cur,
            )

            # Run through to POST_SYNC_UPDATE stage (but not SWAP).
            repack.pre_validate()
            repack.setup_repacking()
            repack.backfill()
            repack.sync_schemas()
            repack.post_sync_update()

            # Verify we're at SWAP stage (next to be completed).
            assert repack.tracker.get_current_stage() == _tracker.Stage.SWAP

            # Now call full() - it should run swap and clean_up.
            repack.full()

            # Verify we've completed everything - after clean_up, the tracker
            # row is deleted, so we're back at PRE_VALIDATION stage.
            assert (
                repack.tracker.get_current_stage() == _tracker.Stage.PRE_VALIDATION
            )
        finally:
            # Discard any open (possibly failed) transaction before dropping.
            connection.rollback()
            # Clean up any remaining tables.
            cur.execute(f"DROP TABLE IF EXISTS {table_name} CASCADE;")
            cur.execute(
                f"DROP TABLE IF EXISTS {table_name}_psycopack_repacked CASCADE;"
            )
            connection.commit()

0 commit comments

Comments
 (0)