@@ -2,19 +2,24 @@ package migration
22
33import (
44 "context"
5+ "errors"
56 "fmt"
7+ "strings"
68 "time"
79
810 "gofr.dev/pkg/gofr/container"
911)
1012
13+ var errMongoLockRefreshFailed = errors .New ("failed to refresh MongoDB lock: lock lost or stolen" )
14+
1115type mongoDS struct {
1216 container.Mongo
1317}
1418
1519type mongoMigrator struct {
1620 container.Mongo
1721 migrator
22+ testInterval time.Duration // Used for testing; if non-zero, overrides defaultRefresh
1823}
1924
2025// apply initializes mongoMigrator using the Mongo interface.
@@ -27,14 +32,30 @@ func (ds mongoDS) apply(m migrator) migrator {
2732
2833const (
2934 mongoMigrationCollection = "gofr_migrations"
35+ mongoLockCollection = "gofr_migration_locks"
36+ mongoLockDocumentID = "gofr_migrations_lock"
3037)
3138
39+ func isMongoCollectionExistsError (err error ) bool {
40+ if err == nil {
41+ return false
42+ }
43+
44+ msg := strings .ToLower (err .Error ())
45+
46+ return strings .Contains (msg , "already exists" ) || strings .Contains (msg , "namespaceexists" )
47+ }
48+
3249// checkAndCreateMigrationTable initializes a MongoDB collection if it doesn't exist.
3350func (mg mongoMigrator ) checkAndCreateMigrationTable (c * container.Container ) error {
3451 err := mg .Mongo .CreateCollection (context .Background (), mongoMigrationCollection )
35- if err != nil {
36- c .Debug ("Migration collection might already exist:" , err )
3752
53+ if err != nil && ! isMongoCollectionExistsError (err ) {
54+ return err
55+ }
56+
57+ err = mg .Mongo .CreateCollection (context .Background (), mongoLockCollection )
58+ if err != nil && ! isMongoCollectionExistsError (err ) {
3859 return err
3960 }
4061
@@ -98,11 +119,118 @@ func (mg mongoMigrator) rollback(c *container.Container, data transactionData) {
98119 c .Fatalf ("Migration %v failed." , data .MigrationNumber )
99120}
100121
122+ func (mg mongoMigrator ) startRefresh (ctx context.Context , cancel context.CancelFunc , c * container.Container , ownerID string ) {
123+ interval := defaultRefresh
124+ if mg .testInterval > 0 {
125+ interval = mg .testInterval
126+ }
127+
128+ ticker := time .NewTicker (interval )
129+ defer ticker .Stop ()
130+
131+ for {
132+ select {
133+ case <- ticker .C :
134+ now := time .Now ()
135+
136+ filter := map [string ]any {
137+ "_id" : mongoLockDocumentID ,
138+ "lockedBy" : ownerID ,
139+ }
140+
141+ update := map [string ]any {
142+ "$set" : map [string ]any {
143+ "lockedAt" : now ,
144+ "expiresAt" : now .Add (defaultLockTTL ),
145+ },
146+ }
147+
148+ modified , err := mg .Mongo .UpdateOne (ctx , mongoLockCollection , filter , update )
149+ if err != nil {
150+ c .Errorf ("failed to refresh mongo lock: %v" , err )
151+ cancel ()
152+
153+ return
154+ }
155+
156+ if modified == 0 {
157+ c .Errorf ("%v" , errMongoLockRefreshFailed )
158+ cancel ()
159+
160+ return
161+ }
162+
163+ c .Debug ("Mongo lock refreshed successfully" )
164+ case <- ctx .Done ():
165+ return
166+ }
167+ }
168+ }
169+
101170func (mg mongoMigrator ) lock (ctx context.Context , cancel context.CancelFunc , c * container.Container , ownerID string ) error {
102- return mg .migrator .lock (ctx , cancel , c , ownerID )
171+ for i := 0 ; ; i ++ {
172+ now := time .Now ()
173+
174+ staleFilter := map [string ]any {
175+ "_id" : mongoLockDocumentID ,
176+ "expiresAt" : map [string ]any {
177+ "$lte" : now ,
178+ },
179+ }
180+
181+ if _ , err := mg .Mongo .DeleteOne (ctx , mongoLockCollection , staleFilter ); err != nil {
182+ c .Errorf ("failed to cleanup stale MongoDB lock: %v" , err )
183+ }
184+
185+ lockDoc := map [string ]any {
186+ "_id" : mongoLockDocumentID ,
187+ "lockedAt" : now ,
188+ "lockedBy" : ownerID ,
189+ "expiresAt" : now .Add (defaultLockTTL ),
190+ }
191+
192+ _ , err := mg .Mongo .InsertOne (ctx , mongoLockCollection , lockDoc )
193+ if err == nil {
194+ c .Debug ("Mongo lock acquired successfully" )
195+
196+ go mg .startRefresh (ctx , cancel , c , ownerID )
197+
198+ return mg .migrator .lock (ctx , cancel , c , ownerID )
199+ }
200+
201+ if ! isDuplicateKeyError (err ) {
202+ c .Errorf ("error while acquiring mongodb lock: %v" , err )
203+
204+ return errLockAcquisitionFailed
205+ }
206+
207+ c .Debugf ("MongoDB lock already held, retrying in %v... (attempt %d)" , defaultRetry , i + 1 )
208+
209+ select {
210+ case <- time .After (defaultRetry ):
211+ case <- ctx .Done ():
212+ return ctx .Err ()
213+ }
214+ }
103215}
104216
105217func (mg mongoMigrator ) unlock (c * container.Container , ownerID string ) error {
218+ deleted , err := mg .Mongo .DeleteOne (context .Background (), mongoLockCollection , map [string ]any {
219+ "_id" : mongoLockDocumentID ,
220+ "lockedBy" : ownerID ,
221+ })
222+ if err != nil {
223+ c .Errorf ("unable to release MongoDB lock: %v" , err )
224+ return errLockReleaseFailed
225+ }
226+
227+ if deleted == 0 {
228+ c .Errorf ("failed to release MongoDB lock: lock already released or owned by another instance" )
229+ return errLockReleaseFailed
230+ }
231+
232+ c .Debug ("Mongo lock released successfully" )
233+
106234 return mg .migrator .unlock (c , ownerID )
107235}
108236
0 commit comments