diff --git a/docker-compose.yml b/docker-compose.yml index 9d7f5431d..f2da3f9b9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -81,10 +81,18 @@ services: - database ports: - "9706:27017" + volumes: + - ./mongo-keyfile:/etc/mongo-keyfile:ro + - mongo-data:/data/db environment: MONGO_INITDB_DATABASE: utopia_testing MONGO_INITDB_ROOT_USERNAME: root MONGO_INITDB_ROOT_PASSWORD: password + command: mongod --replSet rs0 --bind_ip_all --keyFile /etc/mongo-keyfile + +# Manyally initate the replica set +# mongo users(!root) do not get created automatically!!! +#docker compose exec mongo mongosh admin -u root -p password --eval 'rs.initiate({_id: "rs0", members: [{_id: 0, host: "mongo:27017"}]})' mongo-express: image: mongo-express @@ -148,5 +156,7 @@ services: networks: - database +volumes: + mongo-data: networks: database: diff --git a/mongo-keyfile b/mongo-keyfile new file mode 100644 index 000000000..5585939eb --- /dev/null +++ b/mongo-keyfile @@ -0,0 +1,16 @@ +ydIuYSvU/9QLt7fkH32IdXbP2z2+w+fzSEoolW8Q1Z8nLhRyrZF0Zq7a0KzeNI7K +gPIl1ikI6ob6h0+RxYmGeOOUjjkcBlkvYrmABDKsRipTkTTp4z0fUBTIUJV0lVvs +N9+VpM0/pLLIhI8jb38aa7pmsoufBQ3uiNR68ZFykPqzZQ4d5VfMqfZk7z3dpFlh +DURPOOG0HAFe68MLXVFYdaHGW4yomuTPrpzWSiUhFAPFEBYg4elARQc4CaiinFds +SQi/SrUsYMGODPr+on9/lboia/SInaSP+dzDqpsbL29atvIVHtU29RlPJdZ2V1ub +Oe2O1xN9F59TtjNUgDiAtMGKTMS/0S1mbPC6Og5JAR7U4xZ7/6S5n3+p0RjYyTlH +fhssJ7pc/bveN6mShNrsIKK0Z50YYjablzm07EDJYhfEWMG5Wu1AvEVqEH68ioDl +JL5QO63A2bXvMN7dXS69+E0hHn6xaZYu+CnKedvgWdyhraCT1Q01ZyDyv2y7isGD +1BAlNLlt+cPMCitETcxZne+JHdkL/mDKffHUPM4Drtzchg4DbiG49uC9Ib7zTws+ +NcburXY+9B8j7WN7ZHXhiB7/OWJ/IHJCZTdKz70mEPH4AHoRFpZNM5eMnYxYdbQD +40MhAS7fuOYhtFIQiQ+SCeFMucE3KYvp1JpTVQwT4SNrIlHPqfPn5xFBcgDjhvwT +hHJCgXP4HrRuf47Ta6kHy2UFQ7r5JOqSZSOFwP+tUyfhjEB5ZWJ1qCUZxFagoc9A +//9SoyulZwCxEr2ijmes1Nzv56hSTjYb6pPjFWd92G87w+VZv4R/vF5nwcYUyuIS +iQWPs/kOzb4NeJW24lNzR2zH2BsJt3OI+BFY64cc8O0o6EtFWcoabwyJYKe6RXPX +0S4ngcnGzRP+tVa6LsrjAYrNpmZDrP9x93pXQHfByTS2oSaI1eGeAagFTu/HS2kC +uCJ0HfH99sRSgJ1Ab+2C8G8305meDAbtdCtvl/1anPnV6ISy diff --git a/src/Database/Adapter/Mongo.php b/src/Database/Adapter/Mongo.php index 4eb69b60c..7d80d7af0 100644 --- a/src/Database/Adapter/Mongo.php +++ b/src/Database/Adapter/Mongo.php @@ -44,6 +44,14 @@ class Mongo extends Adapter //protected ?int $timeout = null; + /** + * Transaction/session state for MongoDB transactions + */ + private ?object $sessionId = null; // Store raw BSON id object + private ?int $txnNumber = null; + protected int $inTransaction = 0; + private bool $firstOpInTransaction = false; + /** * Constructor. * @@ -74,19 +82,161 @@ public function clearTimeout(string $event): void $this->timeout = 0; } + /** + * @template T + * @param callable(): T $callback + * @return T + * @throws \Throwable + */ + public function withTransaction(callable $callback): mixed + { + // If the database is not a replica set, we can't use transactions + if (!$this->client->isReplicaSet()) { + $result = $callback(); + return $result; + } + + // Removed the attmpts to retry the transaction. + //Unlike pdo if we run theabortTransaction more then once (same transactioId), + // it will throw an error the there is no transaction in progress. + + try { + $this->startTransaction(); + $result = $callback(); + $this->commitTransaction(); + return $result; + } catch (\Throwable $action) { + try { + $this->rollbackTransaction(); + } catch (\Throwable $rollback) { + $this->inTransaction = 0; + // Throw the original exception, not the rollback one + // Since if it's a duplicate key error, the rollback will fail + //and we want to throw the original exception. + } + $this->inTransaction = 0; + throw $action; + } + } + + public function startTransaction(): bool { - return true; + try { + if ($this->inTransaction === 0) { + if (!$this->sessionId) { + $this->sessionId = $this->client->startSession(); // Store raw id object + } + $this->txnNumber = ($this->txnNumber ?? 0) + 1; + $this->firstOpInTransaction = true; + + // Initialize the transaction on MongoDB's side with a dummy find operation + // This ensures the transaction is active even if validation fails later. + $this->client->query([ + 'find' => 'system.version', + 'filter' => $this->client->toObject([]), + 'limit' => 1, + 'lsid' => ['id' => $this->sessionId], + 'txnNumber' => new \MongoDB\BSON\Int64($this->txnNumber), // Long type for txnNumber + 'autocommit' => false, + 'startTransaction' => true + ], 'admin'); + + $this->firstOpInTransaction = false; + } + $this->inTransaction++; + return true; + } catch (\Throwable $e) { + throw new DatabaseException('Failed to start transaction: ' . $e->getMessage(), $e->getCode(), $e); + } } public function commitTransaction(): bool { - return true; + try { + if ($this->inTransaction === 0) { + return false; + } + $this->inTransaction--; + if ($this->inTransaction === 0) { + if (!$this->sessionId) { + return false; + } + try { + $result = $this->client->commitTransaction( + ['id' => $this->sessionId], // Pass raw id object + $this->txnNumber, + false + ); + } catch (\Throwable $e) { + throw new DatabaseException($e->getMessage(), $e->getCode(), $e); + } + + // Session is now closed by the client using endSessions, state is reseted + // TODO do we want session per transaction or to manage it on the connection level? + $this->sessionId = null; + $this->txnNumber = null; + + return true; + } + return true; + } catch (\Throwable $e) { + throw new DatabaseException('Failed to commit transaction: ' . $e->getMessage(), $e->getCode(), $e); + } } public function rollbackTransaction(): bool { - return true; + + try { + if ($this->inTransaction === 0) { + return false; + } + $this->inTransaction--; + if ($this->inTransaction === 0) { + if (!$this->sessionId) { + return false; + } + + try { + $result = $this->client->abortTransaction( + ['id' => $this->sessionId], // Pass raw id object + $this->txnNumber, + false + ); + } catch (\Throwable $e) { + throw new DatabaseException($e->getMessage(), $e->getCode(), $e); + } + + // Session is now closed by the client using endSessions, reset our state + $this->sessionId = null; + $this->txnNumber = null; + + return true; + } + return true; + } catch (\Throwable $e) { + throw new DatabaseException('Failed to rollback transaction: ' . $e->getMessage(), $e->getCode(), $e); + } + } + + /** + * Helper to add transaction/session context to command options if in transaction + */ + private function addTransactionContext(array $options = []): array + { + if ($this->inTransaction) { + $options['lsid'] = ['id' => $this->sessionId]; + $options['txnNumber'] = new \MongoDB\BSON\Int64($this->txnNumber); + $options['autocommit'] = false; + + if ($this->firstOpInTransaction) { + // For MongoDB, the first operation in a transaction should include startTransaction + $options['startTransaction'] = true; + $this->firstOpInTransaction = false; + } + } + return $options; } /** @@ -761,12 +911,8 @@ public function getDocument(string $collection, string $id, array $queries = [], if (empty($result)) { return new Document([]); } - - $result = $this->replaceChars('_', '$', (array)$result[0]); - // if (array_key_exists('$permissions', $result) && empty($result['$permissions'])) { - // $result['$permissions'] = []; - // } + $result = $this->replaceChars('_', '$', (array)$result[0]); return new Document($result); } @@ -782,7 +928,7 @@ public function getDocument(string $collection, string $id, array $queries = [], */ public function createDocument(string $collection, Document $document): Document { - + $name = $this->getNamespace() . '_' . $this->filter($collection); $sequence = $document->getSequence(); @@ -799,9 +945,8 @@ public function createDocument(string $collection, Document $document): Document if (!empty($sequence)) { $record['_id'] = $sequence; } - - $result = $this->insertDocument($name, $this->removeNullKeys($record)); - + $options = $this->addTransactionContext([]); + $result = $this->insertDocument($name, $this->removeNullKeys($record), $options); $result = $this->replaceChars('_', '$', $result); return new Document($result); @@ -941,6 +1086,7 @@ public function createDocuments(string $collection, array $documents): array { $name = $this->getNamespace() . '_' . $this->filter($collection); + $options = $this->addTransactionContext([]); $records = []; $hasSequence = null; $documents = array_map(fn ($doc) => clone $doc, $documents); @@ -969,7 +1115,7 @@ public function createDocuments(string $collection, array $documents): array $records[] = $this->removeNullKeys($record); } - $documents = $this->client->insertMany($name, $records); + $documents = $this->client->insertMany($name, $records, $options); foreach ($documents as $index => $document) { $documents[$index] = $this->replaceChars('_', '$', $this->client->toArray($document)); @@ -987,12 +1133,11 @@ public function createDocuments(string $collection, array $documents): array * @return array * @throws Duplicate */ - private function insertDocument(string $name, array $document): array + private function insertDocument(string $name, array $document, array $options = []): array { try { - $this->client->insert($name, $document); - + $result = $this->client->insert($name, $document, $options); $filters = []; $filters['_uid'] = $document['_uid']; @@ -1000,12 +1145,18 @@ private function insertDocument(string $name, array $document): array $filters['_tenant'] = $this->getTenant(); } - $result = $this->client->find( + // in order to get the document we need to pass the transaction context to the find. + $this->client->find( $name, $filters, - ['limit' => 1] + array_merge($options, ['limit' => 1]) )->cursor->firstBatch[0]; - + + /** + * TODO Do we even need this find? + * We can just return the result from the insertDocument. + */ + return $this->client->toArray($result); } catch (MongoException $e) { throw new Duplicate($e->getMessage()); @@ -1031,7 +1182,7 @@ public function updateDocument(string $collection, string $id, Document $documen $record = $document->getArrayCopy(); $record = $this->replaceChars('$', '_', $record); - + $filters = []; $filters['_uid'] = $id; if ($this->sharedTables) { @@ -1040,7 +1191,8 @@ public function updateDocument(string $collection, string $id, Document $documen try { unset($record['_id']); // Don't update _id - $this->client->update($name, $filters, $record); + $options = $this->addTransactionContext([]); + $this->client->update($name, $filters, $record, $options); } catch (MongoException $e) { throw new Duplicate($e->getMessage()); } @@ -1066,6 +1218,7 @@ public function updateDocuments(string $collection, Document $updates, array $do ; $name = $this->getNamespace() . '_' . $this->filter($collection); + $options = $this->addTransactionContext([]); $queries = [ Query::equal('$sequence', \array_map(fn ($document) => $document->getSequence(), $documents)) ]; @@ -1085,7 +1238,7 @@ public function updateDocuments(string $collection, Document $updates, array $do ]; try { - $this->client->update($name, $filters, $updateQuery, multi: true); + $this->client->update($name, $filters, $updateQuery, multi: true, options: $options); } catch (MongoException $e) { throw new Duplicate($e->getMessage()); } @@ -1161,16 +1314,18 @@ public function createOrUpdateDocuments(string $collection, string $attribute, a ]; } + $options = $this->addTransactionContext([]); + $this->client->upsert( $name, $operations, - ["ordered" => false] // TODO Do we want to continue if an error is thrown? + options: $options ); } catch (MongoException $e) { throw $this->processException($e); } - + return \array_map(fn ($change) => $change->getNew(), $changes); } @@ -1262,6 +1417,7 @@ public function increaseDocumentAttribute(string $collection, string $id, string $filters[$attribute] = ['$gte' => $min]; } + $options = $this->addTransactionContext([]); $this->client->update( $this->getNamespace() . '_' . $this->filter($collection), $filters, @@ -1269,6 +1425,7 @@ public function increaseDocumentAttribute(string $collection, string $id, string '$inc' => [$attribute => $value], '$set' => ['_updatedAt' => $this->toMongoDatetime($updatedAt)], ], + options: $options ); return true; @@ -1293,7 +1450,8 @@ public function deleteDocument(string $collection, string $id): bool $filters['_tenant'] = $this->getTenant(); } - $result = $this->client->delete($name, $filters); + $options = $this->addTransactionContext([]); + $result = $this->client->delete($name, $filters, 1, [], $options); return (!!$result); } @@ -1324,8 +1482,9 @@ public function deleteDocuments(string $collection, array $sequences, array $per $count = $this->client->delete( collection: $name, filters: $filters, - options: $options, - limit: 0 + limit: 0, + deleteOptions: [], + options: $options ); } catch (MongoException $e) { $this->processException($e);