Skip to content

Commit ff5e144

Browse files
authored
Merge pull request #53 from kraken-tech/fix-pk-sequence-value-update-negative-2
Fix pk sequence value update negative 2
2 parents c92adab + 652f9e0 commit ff5e144

5 files changed

Lines changed: 183 additions & 17 deletions

File tree

src/psycopack/_commands.py

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,18 @@ def drop_sequence_if_exists(self, *, seq: str) -> None:
7474
.as_string(self.conn)
7575
)
7676

77-
def create_sequence(self, *, seq: str, bigint: bool) -> None:
77+
def create_sequence(self, *, seq: str, bigint: bool, minvalue: int) -> None:
7878
if bigint:
79-
sql = "CREATE SEQUENCE {schema}.{seq} AS BIGINT;"
79+
sql = "CREATE SEQUENCE {schema}.{seq} AS BIGINT MINVALUE {minvalue};"
8080
else:
81-
sql = "CREATE SEQUENCE {schema}.{seq};"
81+
sql = "CREATE SEQUENCE {schema}.{seq} MINVALUE {minvalue};"
8282

8383
self.cur.execute(
8484
psycopg.sql.SQL(sql)
8585
.format(
8686
seq=psycopg.sql.Identifier(seq),
8787
schema=psycopg.sql.Identifier(self.schema),
88+
minvalue=psycopg.sql.Literal(minvalue),
8889
)
8990
.as_string(self.conn)
9091
)
@@ -475,20 +476,11 @@ def swap_pk_sequence_name(self, *, first_table: str, second_table: str) -> None:
475476
self.rename_sequence(seq_from=second_seq, seq_to=first_seq)
476477
self.rename_sequence(seq_from=temp_seq, seq_to=second_seq)
477478

478-
def transfer_pk_sequence_value(
479-
self, *, source_table: str, dest_table: str, convert_pk_to_bigint: bool
480-
) -> None:
479+
def transfer_pk_sequence_value(self, *, source_table: str, dest_table: str) -> None:
481480
source_seq = self.introspector.get_pk_sequence_name(table=source_table)
482481
dest_seq = self.introspector.get_pk_sequence_name(table=dest_table)
483482
value = self.introspector.get_pk_sequence_value(seq=source_seq)
484483

485-
if convert_pk_to_bigint and value < 0:
486-
# special case handling where negative PK values were used before bigint conversion
487-
value = 2**31 # reset to positive, specifically the first bigint value
488-
489-
# TODO: try to correctly restore a negative PK sequence value if we revert swap
490-
# while doing a bigint conversion
491-
492484
self.cur.execute(
493485
psycopg.sql.SQL("SELECT setval('{schema}.{sequence}', {value});")
494486
.format(
@@ -499,6 +491,27 @@ def transfer_pk_sequence_value(
499491
.as_string(self.conn)
500492
)
501493

494+
def update_pk_sequence_value(self, *, table: str) -> None:
    """
    Update the sequence value if it was negative (for use in bigint conversions).

    The PK sequence of ``table`` is looked up through the introspector. When
    its current value is zero or positive nothing is written; when it is
    negative the sequence is reset to ``2**31`` with ``setval``.
    """
    seq = self.introspector.get_pk_sequence_name(table=table)
    value = self.introspector.get_pk_sequence_value(seq=seq)

    if value >= 0:
        # Only negative values need fixing; leave the sequence untouched
        # instead of issuing a setval that re-writes the value it already has.
        return

    # special case handling where negative PK values were used before bigint
    # conversion: reset to positive, specifically the first bigint value
    # (one past the largest 32-bit signed integer).
    first_bigint_value = 2**31

    self.cur.execute(
        psycopg.sql.SQL("SELECT setval('{schema}.{sequence}', {value});")
        .format(
            schema=psycopg.sql.Identifier(self.schema),
            sequence=psycopg.sql.Identifier(seq),
            # Literal lets psycopg render the integer, matching how
            # create_sequence passes its MINVALUE.
            value=psycopg.sql.Literal(first_bigint_value),
        )
        .as_string(self.conn)
    )
514+
502515
def acquire_access_exclusive_lock(self, *, table: str) -> None:
503516
self.cur.execute(
504517
psycopg.sql.SQL("LOCK TABLE {schema}.{table} IN ACCESS EXCLUSIVE MODE;")

src/psycopack/_introspect.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,26 @@ def get_pk_sequence_value(self, *, seq: str) -> int:
628628
assert isinstance(value, int)
629629
return value
630630

631+
def get_pk_sequence_min_value(self, *, seq: str) -> int:
    """
    Return the ``min_value`` that ``pg_sequences`` reports for ``seq``.

    The lookup is restricted to sequences in ``self.schema``. The row must
    exist and its ``min_value`` must be an ``int``; both are asserted.
    """
    template = psycopg.sql.SQL(
        dedent("""
            SELECT min_value FROM pg_sequences
            WHERE schemaname = {schema} AND sequencename = {sequence};
        """)
    )
    query = template.format(
        schema=psycopg.sql.Literal(self.schema),
        sequence=psycopg.sql.Literal(seq),
    ).as_string(self.conn)
    self.cur.execute(query)

    row = self.cur.fetchone()
    assert row is not None
    min_value = row[0]
    assert isinstance(min_value, int)
    return min_value
650+
631651
def get_backfill_batch(self, *, table: str) -> BackfillBatch | None:
632652
self.cur.execute(
633653
psycopg.sql.SQL(

src/psycopack/_repack.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,6 @@ def swap(self) -> None:
403403
self.command.transfer_pk_sequence_value(
404404
source_table=self.table,
405405
dest_table=self.copy_table,
406-
convert_pk_to_bigint=self.convert_pk_to_bigint,
407406
)
408407
self.command.rename_table(
409408
table_from=self.table, table_to=self.repacked_name
@@ -461,7 +460,6 @@ def revert_swap(self) -> None:
461460
self.command.transfer_pk_sequence_value(
462461
source_table=self.table,
463462
dest_table=self.repacked_name,
464-
convert_pk_to_bigint=self.convert_pk_to_bigint,
465463
)
466464

467465
self.command.rename_table(table_from=self.table, table_to=self.copy_table)
@@ -507,6 +505,11 @@ def clean_up(self) -> None:
507505
)
508506
self.command.drop_function_if_exists(function=self.repacked_function)
509507

508+
if self.convert_pk_to_bigint and self.introspector.get_pk_sequence_name(
509+
table=self.table
510+
):
511+
self.command.update_pk_sequence_value(table=self.table)
512+
510513
for idx_sql in indexes:
511514
for index_data in indexes[idx_sql]:
512515
self.command.rename_index(
@@ -599,7 +602,7 @@ def _create_copy_table(self) -> None:
599602
always=(pk_info.identity_type == "a"),
600603
pk_column=self.pk_column,
601604
)
602-
elif self.introspector.get_pk_sequence_name(table=self.table):
605+
elif seq := self.introspector.get_pk_sequence_name(table=self.table):
603606
# Create a new sequence for the copied table's id column so that it
604607
# does not depend on the original's one. Otherwise, we wouldn't be
605608
# able to delete the original table after the repack process is
@@ -608,6 +611,7 @@ def _create_copy_table(self) -> None:
608611
self.command.create_sequence(
609612
seq=self.id_seq,
610613
bigint=("big" in pk_info.data_types[0].lower()),
614+
minvalue=self.introspector.get_pk_sequence_min_value(seq=seq),
611615
)
612616
self.command.set_table_id_seq(
613617
table=self.copy_table,

tests/factories.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ def create_table_for_repacking(
7070
);
7171
""")
7272
)
73+
if "serial" in pk_type.lower():
74+
seq = f"{table_name}_{pk_name}_seq"
75+
cur.execute(
76+
f"ALTER SEQUENCE {schema}.{seq} MINVALUE {pk_start} RESTART WITH {pk_start};"
77+
)
78+
7379
cur.execute(f"CREATE INDEX btree_idx ON {schema}.{table_name} (var_with_btree);")
7480
cur.execute(
7581
f"CREATE INDEX pattern_ops_idx ON {schema}.{table_name} (var_with_pattern_ops varchar_pattern_ops);"

tests/test_repack.py

Lines changed: 124 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import dataclasses
22
from textwrap import dedent
3+
from typing import Tuple, Union
34
from unittest import mock
45

56
import pytest
@@ -137,7 +138,7 @@ def _assert_repack(
137138
if table_before.pk_seq_val is None or table_before.pk_seq_val > 0:
138139
assert table_before.pk_seq_val == table_after.pk_seq_val
139140
else:
140-
assert table_after.pk_seq_val == 2**31
141+
assert table_after.pk_seq_val is None or table_after.pk_seq_val >= 2**31
141142

142143
# All functions and triggers are removed.
143144
trigger_info = _get_trigger_info(repack, cur)
@@ -166,6 +167,78 @@ def _assert_reset(repack: Psycopack, cur: _cur.Cursor) -> None:
166167
assert repack.introspector.get_table_oid(table=repack.tracker.tracker_table) is None
167168

168169

170+
def _do_writes(
    table: str,
    cur: _cur.Cursor,
    schema: str = "public",
    check_table: str | None = None,
) -> None:
    """
    Run one insert / update / delete cycle against ``table``.

    A single row is inserted with random values, updated, and then deleted.
    When ``check_table`` is given, the row with the same id is read from both
    tables after each write and the two results are asserted to be equal
    (and both absent after the delete).
    """
    insert_sql = dedent(f"""
        INSERT INTO {schema}.{table} (
        var_with_btree,
        var_with_pattern_ops,
        int_with_check,
        int_with_not_valid_check,
        int_with_long_index_name,
        var_with_unique_idx,
        var_with_unique_const,
        valid_fk,
        not_valid_fk,
        {table},
        var_maybe_with_exclusion,
        var_with_multiple_idx
        )
        VALUES (
        substring(md5(random()::text), 1, 10),
        substring(md5(random()::text), 1, 10),
        (floor(random() * 10) + 1)::int,
        (floor(random() * 10) + 1)::int,
        (floor(random() * 10) + 1)::int,
        substring(md5(random()::text), 1, 10),
        substring(md5(random()::text), 1, 10),
        (floor(random() * 10) + 1)::int,
        (floor(random() * 10) + 1)::int,
        (floor(random() * 10) + 1)::int,
        substring(md5(random()::text), 1, 10),
        substring(md5(random()::text), 1, 10)
        )
        RETURNING id;
    """)
    cur.execute(insert_sql)
    inserted = cur.fetchone()
    assert inserted is not None
    row_id = inserted[0]

    def _row_in(name: str) -> Tuple[Union[int, str], ...] | None:
        # Read the row under test from the given table.
        return _query_row(table=name, id_=row_id, cur=cur, schema=schema)

    # Insert: the new row must look the same in both tables.
    if check_table is not None:
        assert _row_in(table) == _row_in(check_table)

    # Update: the change must be reflected in both tables.
    update_sql = (
        f"UPDATE {schema}.{table} SET var_with_btree = 'foo' WHERE id = {row_id};"
    )
    cur.execute(update_sql)
    if check_table is not None:
        assert _row_in(table) == _row_in(check_table)

    # Delete: the row must be gone from both tables.
    delete_sql = f"DELETE FROM {schema}.{table} WHERE id = {row_id};"
    cur.execute(delete_sql)
    assert _row_in(table) is None
    if check_table is not None:
        assert _row_in(check_table) is None
230+
231+
232+
def _query_row(
    table: str,
    id_: int,
    cur: _cur.Cursor,
    schema: str = "public",
) -> Tuple[Union[int, str], ...] | None:
    """
    Select the row of ``schema.table`` whose ``id`` equals ``id_``.

    Returns whatever ``cur.fetchone()`` yields for that query.
    """
    lookup = f"SELECT * FROM {schema}.{table} WHERE id = {id_};"
    cur.execute(lookup)
    row = cur.fetchone()
    return row
240+
241+
169242
@pytest.mark.parametrize(
170243
"pk_type",
171244
("bigint", "bigserial", "integer", "serial", "smallint", "smallserial"),
@@ -1324,6 +1397,56 @@ def test_when_table_has_negative_pk_values(
13241397
)
13251398

13261399

1400+
@pytest.mark.parametrize(
    "initial_pk_type",
    ("integer", "serial", "smallint", "smallserial"),
)
def test_with_writes_when_table_has_negative_pk_values(
    connection: _psycopg.Connection, initial_pk_type: str
) -> None:
    """
    Repack a table whose PKs start at a negative value while converting the
    PK to bigint, doing writes between the individual repack stages.
    """
    table_name = "to_repack"

    with _cur.get_cursor(connection, logged=True) as cur:
        factories.create_table_for_repacking(
            connection=connection,
            cur=cur,
            table_name=table_name,
            rows=100,
            pk_type=initial_pk_type,
            pk_start=-200,
        )
        table_before = _collect_table_info(table=table_name, connection=connection)

        repack = Psycopack(
            table=table_name,
            batch_size=1,
            conn=connection,
            cur=cur,
            convert_pk_to_bigint=True,
        )

        # Stages before the swap: writes are checked against the copy table.
        repack.pre_validate()
        repack.setup_repacking()
        repack.backfill()
        _do_writes(table=table_name, cur=cur, check_table=repack.copy_table)
        repack.sync_schemas()
        _do_writes(table=table_name, cur=cur, check_table=repack.copy_table)

        # After the swap: writes are checked against the repacked table.
        repack.swap()
        _do_writes(table=table_name, cur=cur, check_table=repack.repacked_name)

        # After clean-up there is no second table left to compare against.
        repack.clean_up()
        _do_writes(table=table_name, cur=cur)

        table_after = _collect_table_info(table=table_name, connection=connection)
        _assert_repack(
            table_before=table_before,
            table_after=table_after,
            repack=repack,
            cur=cur,
        )
1448+
1449+
13271450
def test_when_table_has_large_value_being_inserted(
13281451
connection: _psycopg.Connection,
13291452
) -> None:

0 commit comments

Comments
 (0)