Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/api-reference/apidocs.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"info": {
"title": "Permify API",
"description": "Permify is an open source authorization service for creating fine-grained and scalable authorization systems.",
"version": "v1.4.7",
"version": "v1.4.8",
"contact": {
"name": "API Support",
"url": "https://github.com/Permify/permify/issues",
Expand Down
2 changes: 1 addition & 1 deletion docs/api-reference/openapiv2/apidocs.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"info": {
"title": "Permify API",
"description": "Permify is an open source authorization service for creating fine-grained and scalable authorization systems.",
"version": "v1.4.7",
"version": "v1.4.8",
"contact": {
"name": "API Support",
"url": "https://github.com/Permify/permify/issues",
Expand Down
2 changes: 1 addition & 1 deletion internal/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var Identifier = ""
*/
const (
// Version is the last release of the Permify (e.g. v0.1.0)
Version = "v1.4.7"
Version = "v1.4.8"
)

// Function to create a single line of the ASCII art with centered content and color
Expand Down
4 changes: 2 additions & 2 deletions internal/storage/postgres/data_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,13 +575,13 @@ func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.S
if err != nil {
// If no rows are found, return a snapshot token with a value of 0.
if errors.Is(err, pgx.ErrNoRows) {
return snapshot.Token{Value: db.XID8{Uint: 0}, Snapshot: ""}, nil
return snapshot.NewToken(db.XID8{Uint: 0}, ""), nil
}
return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
}

slog.DebugContext(ctx, "successfully retrieved latest snapshot token")
// Return snapshot token
// Return the latest snapshot token associated with the tenant.
return snapshot.Token{Value: xid, Snapshot: snapshotValue}, nil
return snapshot.NewToken(xid, snapshotValue), nil
}
95 changes: 92 additions & 3 deletions internal/storage/postgres/snapshot/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/base64"
"encoding/binary"
"fmt"
"slices"
"strconv"
"strings"

Expand All @@ -25,12 +26,15 @@ type (
}
)

// NewToken - Creates a new snapshot token
// NewToken creates a new snapshot token with proper MVCC visibility.
// It automatically processes the snapshot to ensure each transaction has a unique snapshot.
func NewToken(value postgres.XID8, snapshot string) token.SnapToken {
return Token{
t := Token{
Value: value,
Snapshot: snapshot,
}
t.Snapshot = t.createFinalSnapshot()
return t
}
Comment thread
tolgaozen marked this conversation as resolved.

// Encode - Encodes the token to a string
Expand Down Expand Up @@ -88,7 +92,7 @@ func (t EncodedToken) Decode() (token.SnapToken, error) {
Uint: binary.LittleEndian.Uint64(b),
Status: pgtype.Present,
},
Snapshot: "", // Empty for backward compatibility
Snapshot: "",
}, nil
}

Expand Down Expand Up @@ -122,3 +126,88 @@ func (t EncodedToken) Decode() (token.SnapToken, error) {
func (t EncodedToken) String() string {
return t.Value
}

// createFinalSnapshot creates a final snapshot by adding the current transaction ID to the XIP list.
// This ensures each concurrent transaction gets a unique snapshot for proper MVCC visibility.
func (t Token) createFinalSnapshot() string {
if t.Snapshot == "" {
return ""
}

parts := strings.SplitN(strings.TrimSpace(t.Snapshot), ":", 3)
if len(parts) < 2 {
return t.Snapshot
}

xmin, err := strconv.ParseUint(parts[0], 10, 64)
if err != nil {
return t.Snapshot
}
xmax, err := strconv.ParseUint(parts[1], 10, 64)
if err != nil {
return t.Snapshot
}

txid := t.Value.Uint

// Parse existing XIPs
xipList := []uint64{}
if len(parts) == 3 && parts[2] != "" {
for _, xipStr := range strings.Split(parts[2], ",") {
xipStr = strings.TrimSpace(xipStr)
if xipStr == "" {
continue
}
xip, err := strconv.ParseUint(xipStr, 10, 64)
if err != nil {
return t.Snapshot
}
xipList = append(xipList, xip)
}
}

// Sort XIPs to ensure deterministic snapshot encoding and maintain PostgreSQL invariants
// Per PostgreSQL semantics, xip[] must contain only XIDs with xmin ≤ xip < xmax
slices.Sort(xipList)

// Add current txid to make snapshot unique for this transaction
// Insert in sorted order to maintain consistency
inserted := false
for i, xip := range xipList {
if xip == txid {
// Already in list, don't add again
inserted = true
break
}
if xip > txid {
// Insert at position i
xipList = append(xipList[:i], append([]uint64{txid}, xipList[i:]...)...)
inserted = true
break
}
}
if !inserted {
// Append to end
xipList = append(xipList, txid)
}

// Adjust xmax if necessary
newXmax := xmax
if txid >= newXmax {
newXmax = txid + 1
}

// Adjust xmin if current txid is smaller
newXmin := xmin
if txid < newXmin {
newXmin = txid
}

// Build the result snapshot string efficiently using strings.Builder
var xipStrs []string
for _, xip := range xipList {
xipStrs = append(xipStrs, fmt.Sprintf("%d", xip))
}

return fmt.Sprintf("%d:%d:%s", newXmin, newXmax, strings.Join(xipStrs, ","))
}
20 changes: 10 additions & 10 deletions internal/storage/postgres/snapshot/token_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ var _ = Describe("token", func() {
target token.SnapToken
expected string
}{
{NewToken(postgres.XID8{Uint: 4, Status: pgtype.Present}, "100:100:"), "NDoxMDA6MTAwOg=="},
{NewToken(postgres.XID8{Uint: 12, Status: pgtype.Present}, "200:200:150"), "MTI6MjAwOjIwMDoxNTA="},
{NewToken(postgres.XID8{Uint: 43242, Status: pgtype.Present}, "300:300:250,280"), "NDMyNDI6MzAwOjMwMDoyNTAsMjgw"},
{NewToken(postgres.XID8{Uint: 4, Status: pgtype.Present}, "100:100:"), "NDo0OjEwMDo0"},
{NewToken(postgres.XID8{Uint: 12, Status: pgtype.Present}, "200:200:150"), "MTI6MTI6MjAwOjEyLDE1MA=="},
{NewToken(postgres.XID8{Uint: 43242, Status: pgtype.Present}, "300:300:250,280"), "NDMyNDI6MzAwOjQzMjQzOjI1MCwyODAsNDMyNDI="},
}

for _, tt := range tests {
Expand Down Expand Up @@ -135,18 +135,18 @@ var _ = Describe("token", func() {

It("Case 1.1: Success - New format (with snapshot)", func() {
tests := []struct {
target token.EncodedSnapToken
expected token.SnapToken
original token.SnapToken
}{
{EncodedToken{Value: "NDoxMDA6MTAwOg=="}, NewToken(postgres.XID8{Uint: 4, Status: pgtype.Present}, "100:100:")},
{EncodedToken{Value: "MTI6MjAwOjIwMDoxNTA="}, NewToken(postgres.XID8{Uint: 12, Status: pgtype.Present}, "200:200:150")},
{EncodedToken{Value: "NDMyNDI6MzAwOjMwMDoyNTAsMjgw"}, NewToken(postgres.XID8{Uint: 43242, Status: pgtype.Present}, "300:300:250,280")},
{NewToken(postgres.XID8{Uint: 4, Status: pgtype.Present}, "100:100:")},
{NewToken(postgres.XID8{Uint: 12, Status: pgtype.Present}, "200:200:150")},
{NewToken(postgres.XID8{Uint: 43242, Status: pgtype.Present}, "300:300:250,280")},
}

for _, tt := range tests {
t, err := tt.target.Decode()
encoded := tt.original.Encode()
decoded, err := encoded.Decode()
Expect(err).ShouldNot(HaveOccurred())
Expect(t).Should(Equal(tt.expected))
Expect(decoded.(Token).Value.Uint).Should(Equal(tt.original.(Token).Value.Uint))
}
})

Expand Down
82 changes: 3 additions & 79 deletions internal/storage/postgres/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,83 +36,10 @@ const (
earliestPostgresVersion = 130008 // The earliest supported version of PostgreSQL is 13.8
)

// createFinalSnapshot creates a final snapshot string for proper transaction visibility.
// If xmax != xid, it adds xid to the xip_list to make the snapshot unique.
func createFinalSnapshot(snapshotValue string, xid uint64) string {
// Parse snapshot: "xmin:xmax:xip_list"
parts := strings.SplitN(strings.TrimSpace(snapshotValue), ":", 3)
if len(parts) < 2 {
return snapshotValue
}

xminStr, xmaxStr := parts[0], parts[1]

// Parse xmin and xmax for range validation
xmin, err := strconv.ParseUint(xminStr, 10, 64)
if err != nil {
return snapshotValue
}
xmax, err := strconv.ParseUint(xmaxStr, 10, 64)
if err != nil {
return snapshotValue
}

// If xmax == xid, no need to modify snapshot
if xmax == xid {
return snapshotValue
}

// Validate xid is in valid range [xmin, xmax)
if xid < xmin || xid >= xmax {
return snapshotValue
}

// Parse existing xip_list
var xips []uint64
if len(parts) == 3 && parts[2] != "" {
for _, xipStr := range strings.Split(parts[2], ",") {
xipStr = strings.TrimSpace(xipStr)
if xipStr == "" {
continue
}
xip, err := strconv.ParseUint(xipStr, 10, 64)
if err != nil {
return snapshotValue
}
// Check if xid is already in xip_list
if xip == xid {
return snapshotValue
}
xips = append(xips, xip)
}
}

// Add xid to the list and sort it
xips = append(xips, xid)
sortXips(xips)

// Rebuild xip_list string
var xipStrs []string
for _, xip := range xips {
xipStrs = append(xipStrs, fmt.Sprintf("%d", xip))
}
return fmt.Sprintf("%s:%s:%s", xminStr, xmaxStr, strings.Join(xipStrs, ","))
}

// sortXips sorts a slice of xip values in ascending order
func sortXips(xips []uint64) {
for i := 0; i < len(xips)-1; i++ {
for j := i + 1; j < len(xips); j++ {
if xips[i] > xips[j] {
xips[i], xips[j] = xips[j], xips[i]
}
}
}
}

// SnapshotQuery adds conditions to a SELECT query for checking transaction visibility based on created and expired transaction IDs.
// Optimized version with parameterized queries for security.
func SnapshotQuery(sl squirrel.SelectBuilder, value uint64, snapshotValue string) squirrel.SelectBuilder {
slog.Info("SnapshotQuery called", slog.Uint64("xid", value), slog.String("snapshot", snapshotValue))
// Backward compatibility: if snapshot is empty, use old method
if snapshotValue == "" {
// Create a subquery for the snapshot associated with the provided value.
Expand All @@ -137,19 +64,16 @@ func SnapshotQuery(sl squirrel.SelectBuilder, value uint64, snapshotValue string
return sl.Where(createdWhere).Where(expiredWhere)
}

// Create final snapshot with proper visibility
finalSnapshot := createFinalSnapshot(snapshotValue, value)

// Records that were created and are visible in the snapshot
createdWhere := squirrel.Or{
squirrel.Expr("pg_visible_in_snapshot(created_tx_id, ?) = true", finalSnapshot),
squirrel.Expr("pg_visible_in_snapshot(created_tx_id, ?) = true", snapshotValue),
squirrel.Expr("created_tx_id = ?::xid8", value), // Include current transaction
}

// Records that are still active (not expired) at snapshot time
expiredWhere := squirrel.And{
squirrel.Or{
squirrel.Expr("pg_visible_in_snapshot(expired_tx_id, ?) = false", finalSnapshot),
squirrel.Expr("pg_visible_in_snapshot(expired_tx_id, ?) = false", snapshotValue),
squirrel.Expr("expired_tx_id = ?::xid8", ActiveRecordTxnID), // Never expired
},
squirrel.Expr("expired_tx_id <> ?::xid8", value), // Not expired by current transaction
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/postgres/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (w *Watch) getChanges(ctx context.Context, value db.XID8, tenantID string)
defer arows.Close()

// Set the snapshot token for the changes.
changes.SnapToken = snapshot.Token{Value: value}.Encode().String()
changes.SnapToken = snapshot.NewToken(value, "").Encode().String()

// Iterate through the result rows.
for trows.Next() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pb/base/v1/openapi.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion proto/base/v1/openapi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = {
info: {
title: "Permify API";
description: "Permify is an open source authorization service for creating fine-grained and scalable authorization systems.";
version: "v1.4.7";
version: "v1.4.8";
contact: {
name: "API Support";
url: "https://github.com/Permify/permify/issues";
Expand Down