@@ -33,8 +33,8 @@ type DataReader struct {
3333// It initializes a new DataReader with a given database, a logger, and sets transaction options to be read-only with Repeatable Read isolation level.
3434func NewDataReader (database * db.Postgres ) * DataReader {
3535 return & DataReader {
36- database : database , // Set the database to the passed in PostgreSQL instance
37- txOptions : pgx.TxOptions {IsoLevel : pgx .ReadCommitted , AccessMode : pgx .ReadOnly }, // Set the transaction options
36+ database : database , // Set the database to the passed in PostgreSQL instance
37+ txOptions : pgx.TxOptions {IsoLevel : pgx .RepeatableRead , AccessMode : pgx .ReadOnly }, // Set the transaction options
3838 }
3939}
4040
@@ -57,7 +57,7 @@ func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, fi
5757 var args []interface {}
5858 builder := r .database .Builder .Select ("entity_type, entity_id, relation, subject_type, subject_id, subject_relation" ).From (RelationTuplesTable ).Where (squirrel.Eq {"tenant_id" : tenantID })
5959 builder = utils .TuplesFilterQueryForSelectBuilder (builder , filter )
60- builder = utils .SnapshotQuery (builder , st .(snapshot.Token ).Value .Uint )
60+ builder = utils .SnapshotQuery (builder , st .(snapshot.Token ).Value .Uint , st .(snapshot. Token ). Snapshot )
6161
6262 if pagination .Cursor () != "" {
6363 var t database.ContinuousToken
@@ -132,7 +132,7 @@ func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, fil
132132 // Build the relationships query based on the provided filter, snapshot value, and pagination settings.
133133 builder := r .database .Builder .Select ("id, entity_type, entity_id, relation, subject_type, subject_id, subject_relation" ).From (RelationTuplesTable ).Where (squirrel.Eq {"tenant_id" : tenantID })
134134 builder = utils .TuplesFilterQueryForSelectBuilder (builder , filter )
135- builder = utils .SnapshotQuery (builder , st .(snapshot.Token ).Value .Uint )
135+ builder = utils .SnapshotQuery (builder , st .(snapshot.Token ).Value .Uint , st .(snapshot. Token ). Snapshot )
136136
137137 // Apply the pagination token and limit to the query.
138138 if pagination .Token () != "" {
@@ -219,7 +219,7 @@ func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string,
219219 var args []interface {}
220220 builder := r .database .Builder .Select ("entity_type, entity_id, attribute, value" ).From (AttributesTable ).Where (squirrel.Eq {"tenant_id" : tenantID })
221221 builder = utils .AttributesFilterQueryForSelectBuilder (builder , filter )
222- builder = utils .SnapshotQuery (builder , st .(snapshot.Token ).Value .Uint )
222+ builder = utils .SnapshotQuery (builder , st .(snapshot.Token ).Value .Uint , st .(snapshot. Token ). Snapshot )
223223
224224 // Generate the SQL query and arguments.
225225 var query string
@@ -278,7 +278,7 @@ func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filte
278278 var args []interface {}
279279 builder := r .database .Builder .Select ("entity_type, entity_id, attribute, value" ).From (AttributesTable ).Where (squirrel.Eq {"tenant_id" : tenantID })
280280 builder = utils .AttributesFilterQueryForSelectBuilder (builder , filter )
281- builder = utils .SnapshotQuery (builder , st .(snapshot.Token ).Value .Uint )
281+ builder = utils .SnapshotQuery (builder , st .(snapshot.Token ).Value .Uint , st .(snapshot. Token ). Snapshot )
282282
283283 if pagination .Cursor () != "" {
284284 var t database.ContinuousToken
@@ -366,7 +366,7 @@ func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter
366366 // Build the relationships query based on the provided filter, snapshot value, and pagination settings.
367367 builder := r .database .Builder .Select ("id, entity_type, entity_id, attribute, value" ).From (AttributesTable ).Where (squirrel.Eq {"tenant_id" : tenantID })
368368 builder = utils .AttributesFilterQueryForSelectBuilder (builder , filter )
369- builder = utils .SnapshotQuery (builder , st .(snapshot.Token ).Value .Uint )
369+ builder = utils .SnapshotQuery (builder , st .(snapshot.Token ).Value .Uint , st .(snapshot. Token ). Snapshot )
370370
371371 // Apply the pagination token and limit to the query.
372372 if pagination .Token () != "" {
@@ -479,7 +479,7 @@ func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID
479479 })
480480
481481 // Apply snapshot filter
482- builder = utils .SnapshotQuery (builder , st .(snapshot.Token ).Value .Uint )
482+ builder = utils .SnapshotQuery (builder , st .(snapshot.Token ).Value .Uint , st .(snapshot. Token ). Snapshot )
483483
484484 // Apply exclusion if the list is not empty
485485 if len (excluded ) > 0 {
@@ -556,11 +556,12 @@ func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.S
556556 defer span .End ()
557557 // Log snapshot operation
558558 slog .DebugContext (ctx , "getting head snapshot for tenant_id" , slog .String ("tenant_id" , tenantID ))
559- // Declare transaction ID variable
559+ // Declare transaction ID and snapshot variables
560560 var xid db.XID8
561+ var snapshotValue string
561562
562- // Build the query to find the highest transaction ID associated with the tenant.
563- builder := r .database .Builder .Select ("id" ).From (TransactionsTable ).Where (squirrel.Eq {"tenant_id" : tenantID }).OrderBy ("id DESC" ).Limit (1 )
563+ // Build the query to find the highest transaction ID and snapshot associated with the tenant.
564+ builder := r .database .Builder .Select ("id" , "snapshot" ).From (TransactionsTable ).Where (squirrel.Eq {"tenant_id" : tenantID }).OrderBy ("id DESC" ).Limit (1 )
564565 query , args , err := builder .ToSql ()
565566 if err != nil {
566567 return nil , utils .HandleError (ctx , span , err , base .ErrorCode_ERROR_CODE_SQL_BUILDER )
@@ -569,18 +570,18 @@ func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.S
569570 // TODO: To optimize this query, create the following index concurrently to avoid table locks:
570571 // CREATE INDEX CONCURRENTLY idx_transactions_tenant_id_id ON transactions(tenant_id, id DESC);
571572
572- // Execute the query and retrieve the highest transaction ID.
573- err = r .database .ReadPool .QueryRow (ctx , query , args ... ).Scan (& xid )
573+ // Execute the query and retrieve the highest transaction ID and snapshot .
574+ err = r .database .ReadPool .QueryRow (ctx , query , args ... ).Scan (& xid , & snapshotValue )
574575 if err != nil {
575576 // If no rows are found, return a snapshot token with a value of 0.
576577 if errors .Is (err , pgx .ErrNoRows ) {
577- return snapshot.Token {Value : db.XID8 {Uint : 0 }}, nil
578+ return snapshot.Token {Value : db.XID8 {Uint : 0 }, Snapshot : "" }, nil
578579 }
579580 return nil , utils .HandleError (ctx , span , err , base .ErrorCode_ERROR_CODE_SCAN )
580581 }
581582
582583 slog .DebugContext (ctx , "successfully retrieved latest snapshot token" )
583584 // Return snapshot token
584585 // Return the latest snapshot token associated with the tenant.
585- return snapshot.Token {Value : xid }, nil
586+ return snapshot.Token {Value : xid , Snapshot : snapshotValue }, nil
586587}
0 commit comments