@@ -60,7 +60,7 @@ class Database
6060 public const VAR_LINESTRING = 'linestring ' ;
6161 public const VAR_POLYGON = 'polygon ' ;
6262
63- public const SPATIAL_TYPES = [self ::VAR_POINT ,self ::VAR_LINESTRING , self ::VAR_POLYGON ];
63+ public const SPATIAL_TYPES = [self ::VAR_POINT , self ::VAR_LINESTRING , self ::VAR_POLYGON ];
6464
6565 // Index Types
6666 public const INDEX_KEY = 'key ' ;
@@ -3826,7 +3826,8 @@ public function createDocument(string $collection, Document $document): Document
38263826 * @param string $collection
38273827 * @param array<Document> $documents
38283828 * @param int $batchSize
3829- * @param callable|null $onNext
3829+ * @param (callable(Document): void)|null $onNext
3830+ * @param (callable(Throwable): void)|null $onError
38303831 * @return int
38313832 * @throws AuthorizationException
38323833 * @throws StructureException
@@ -3838,6 +3839,7 @@ public function createDocuments(
38383839 array $ documents ,
38393840 int $ batchSize = self ::INSERT_BATCH_SIZE ,
38403841 ?callable $ onNext = null ,
3842+ ?callable $ onError = null ,
38413843 ): int {
38423844 if (!$ this ->adapter ->getSharedTables () && $ this ->adapter ->getTenantPerDocument ()) {
38433845 throw new DatabaseException ('Shared tables must be enabled if tenant per document is enabled. ' );
@@ -3914,7 +3916,13 @@ public function createDocuments(
39143916
39153917 $ document = $ this ->casting ($ collection , $ document );
39163918 $ document = $ this ->decode ($ collection , $ document );
3917- $ onNext && $ onNext ($ document );
3919+
3920+ try {
3921+ $ onNext && $ onNext ($ document );
3922+ } catch (\Throwable $ e ) {
3923+ $ onError ? $ onError ($ e ) : throw $ e ;
3924+ }
3925+
39183926 $ modified ++;
39193927 }
39203928 }
@@ -4283,7 +4291,7 @@ public function updateDocument(string $collection, string $id, Document $documen
42834291
42844292 if ($ document ->offsetExists ('$permissions ' )) {
42854293 $ originalPermissions = $ old ->getPermissions ();
4286- $ currentPermissions = $ document ->getPermissions ();
4294+ $ currentPermissions = $ document ->getPermissions ();
42874295
42884296 sort ($ originalPermissions );
42894297 sort ($ currentPermissions );
@@ -4473,8 +4481,8 @@ public function updateDocument(string $collection, string $id, Document $documen
44734481 * @param Document $updates
44744482 * @param array<Query> $queries
44754483 * @param int $batchSize
4476- * @param callable|null $onNext
4477- * @param callable|null $onError
4484+ * @param ( callable(Document $updated, Document $old): void) |null $onNext
4485+ * @param ( callable(Throwable): void) |null $onError
44784486 * @return int
44794487 * @throws AuthorizationException
44804488 * @throws ConflictException
@@ -4597,12 +4605,12 @@ public function updateDocuments(
45974605 break ;
45984606 }
45994607
4600- $ currentPermissions = $ updates ->getPermissions ();
4608+ $ old = array_map (fn ($ doc ) => clone $ doc , $ batch );
4609+ $ currentPermissions = $ updates ->getPermissions ();
46014610 sort ($ currentPermissions );
46024611
46034612 $ this ->withTransaction (function () use ($ collection , $ updates , &$ batch , $ currentPermissions ) {
46044613 foreach ($ batch as $ index => $ document ) {
4605-
46064614 $ skipPermissionsUpdate = true ;
46074615
46084616 if ($ updates ->offsetExists ('$permissions ' )) {
@@ -4647,13 +4655,12 @@ public function updateDocuments(
46474655 );
46484656 });
46494657
4650- foreach ($ batch as $ doc ) {
4658+ foreach ($ batch as $ index => $ doc ) {
46514659 $ doc ->removeAttribute ('$skipPermissionsUpdate ' );
4652-
46534660 $ this ->purgeCachedDocument ($ collection ->getId (), $ doc ->getId ());
46544661 $ doc = $ this ->decode ($ collection , $ doc );
46554662 try {
4656- $ onNext && $ onNext ($ doc );
4663+ $ onNext && $ onNext ($ doc, $ old [ $ index ] );
46574664 } catch (Throwable $ th ) {
46584665 $ onError ? $ onError ($ th ) : throw $ th ;
46594666 }
@@ -5069,28 +5076,62 @@ private function getJunctionCollection(Document $collection, Document $relatedCo
50695076 : '_ ' . $ relatedCollection ->getSequence () . '_ ' . $ collection ->getSequence ();
50705077 }
50715078
5079+ /**
5080+ * Create or update a document.
5081+ *
5082+ * @param string $collection
5083+ * @param Document $document
5084+ * @return Document
5085+ * @throws StructureException
5086+ * @throws Throwable
5087+ */
5088+ public function upsertDocument (
5089+ string $ collection ,
5090+ Document $ document ,
5091+ ): Document {
5092+ $ result = null ;
5093+
5094+ $ this ->upsertDocumentsWithIncrease (
5095+ $ collection ,
5096+ '' ,
5097+ [$ document ],
5098+ function (Document $ doc , ?Document $ _old = null ) use (&$ result ) {
5099+ $ result = $ doc ;
5100+ }
5101+ );
5102+
5103+ if ($ result === null ) {
5104+ // No-op (unchanged): return the current persisted doc
5105+ $ result = $ this ->getDocument ($ collection , $ document ->getId ());
5106+ }
5107+ return $ result ;
5108+ }
5109+
50725110 /**
50735111 * Create or update documents.
50745112 *
50755113 * @param string $collection
50765114 * @param array<Document> $documents
50775115 * @param int $batchSize
5078- * @param callable|null $onNext
5116+ * @param (callable(Document, ?Document): void)|null $onNext
5117+ * @param (callable(Throwable): void)|null $onError
50795118 * @return int
50805119 * @throws StructureException
50815120 * @throws \Throwable
50825121 */
5083- public function createOrUpdateDocuments (
5122+ public function upsertDocuments (
50845123 string $ collection ,
50855124 array $ documents ,
50865125 int $ batchSize = self ::INSERT_BATCH_SIZE ,
50875126 ?callable $ onNext = null ,
5127+ ?callable $ onError = null
50885128 ): int {
5089- return $ this ->createOrUpdateDocumentsWithIncrease (
5129+ return $ this ->upsertDocumentsWithIncrease (
50905130 $ collection ,
50915131 '' ,
50925132 $ documents ,
50935133 $ onNext ,
5134+ $ onError ,
50945135 $ batchSize
50955136 );
50965137 }
@@ -5101,18 +5142,20 @@ public function createOrUpdateDocuments(
51015142 * @param string $collection
51025143 * @param string $attribute
51035144 * @param array<Document> $documents
5104- * @param callable|null $onNext
5145+ * @param (callable(Document, ?Document): void)|null $onNext
5146+ * @param (callable(Throwable): void)|null $onError
51055147 * @param int $batchSize
51065148 * @return int
51075149 * @throws StructureException
51085150 * @throws \Throwable
51095151 * @throws Exception
51105152 */
5111- public function createOrUpdateDocumentsWithIncrease (
5153+ public function upsertDocumentsWithIncrease (
51125154 string $ collection ,
51135155 string $ attribute ,
51145156 array $ documents ,
51155157 ?callable $ onNext = null ,
5158+ ?callable $ onError = null ,
51165159 int $ batchSize = self ::INSERT_BATCH_SIZE
51175160 ): int {
51185161 if (empty ($ documents )) {
@@ -5144,7 +5187,7 @@ public function createOrUpdateDocumentsWithIncrease(
51445187
51455188 if ($ document ->offsetExists ('$permissions ' )) {
51465189 $ originalPermissions = $ old ->getPermissions ();
5147- $ currentPermissions = $ document ->getPermissions ();
5190+ $ currentPermissions = $ document ->getPermissions ();
51485191
51495192 sort ($ originalPermissions );
51505193 sort ($ currentPermissions );
@@ -5274,7 +5317,7 @@ public function createOrUpdateDocumentsWithIncrease(
52745317 /**
52755318 * @var array<Change> $chunk
52765319 */
5277- $ batch = $ this ->withTransaction (fn () => Authorization::skip (fn () => $ this ->adapter ->createOrUpdateDocuments (
5320+ $ batch = $ this ->withTransaction (fn () => Authorization::skip (fn () => $ this ->adapter ->upsertDocuments (
52785321 $ collection ,
52795322 $ attribute ,
52805323 $ chunk
@@ -5290,7 +5333,7 @@ public function createOrUpdateDocumentsWithIncrease(
52905333 }
52915334 }
52925335
5293- foreach ($ batch as $ doc ) {
5336+ foreach ($ batch as $ index => $ doc ) {
52945337 if ($ this ->resolveRelationships ) {
52955338 $ doc = $ this ->silent (fn () => $ this ->populateDocumentRelationships ($ collection , $ doc ));
52965339 }
@@ -5305,7 +5348,13 @@ public function createOrUpdateDocumentsWithIncrease(
53055348 $ this ->purgeCachedDocument ($ collection ->getId (), $ doc ->getId ());
53065349 }
53075350
5308- $ onNext && $ onNext ($ doc );
5351+ $ old = $ chunk [$ index ]->getOld ();
5352+
5353+ try {
5354+ $ onNext && $ onNext ($ doc , $ old ->isEmpty () ? null : $ old );
5355+ } catch (\Throwable $ th ) {
5356+ $ onError ? $ onError ($ th ) : throw $ th ;
5357+ }
53095358 }
53105359 }
53115360
@@ -5972,8 +6021,8 @@ private function deleteCascade(Document $collection, Document $relatedCollection
59726021 * @param string $collection
59736022 * @param array<Query> $queries
59746023 * @param int $batchSize
5975- * @param callable|null $onNext
5976- * @param callable|null $onError
6024+ * @param ( callable(Document, Document): void) |null $onNext
6025+ * @param ( callable(Throwable): void) |null $onError
59776026 * @return int
59786027 * @throws AuthorizationException
59796028 * @throws DatabaseException
@@ -6065,6 +6114,7 @@ public function deleteDocuments(
60656114 break ;
60666115 }
60676116
6117+ $ old = array_map (fn ($ doc ) => clone $ doc , $ batch );
60686118 $ sequences = [];
60696119 $ permissionIds = [];
60706120
@@ -6101,7 +6151,7 @@ public function deleteDocuments(
61016151 );
61026152 });
61036153
6104- foreach ($ batch as $ document ) {
6154+ foreach ($ batch as $ index => $ document ) {
61056155 if ($ this ->getSharedTables () && $ this ->getTenantPerDocument ()) {
61066156 $ this ->withTenant ($ document ->getTenant (), function () use ($ collection , $ document ) {
61076157 $ this ->purgeCachedDocument ($ collection ->getId (), $ document ->getId ());
@@ -6110,7 +6160,7 @@ public function deleteDocuments(
61106160 $ this ->purgeCachedDocument ($ collection ->getId (), $ document ->getId ());
61116161 }
61126162 try {
6113- $ onNext && $ onNext ($ document );
6163+ $ onNext && $ onNext ($ document, $ old [ $ index ] );
61146164 } catch (Throwable $ th ) {
61156165 $ onError ? $ onError ($ th ) : throw $ th ;
61166166 }
@@ -6520,7 +6570,7 @@ public static function addFilter(string $name, callable $encode, callable $decod
65206570 public function encode (Document $ collection , Document $ document ): Document
65216571 {
65226572 $ attributes = $ collection ->getAttribute ('attributes ' , []);
6523- $ internalDateAttributes = ['$createdAt ' ,'$updatedAt ' ];
6573+ $ internalDateAttributes = ['$createdAt ' , '$updatedAt ' ];
65246574 foreach ($ this ->getInternalAttributes () as $ attribute ) {
65256575 $ attributes [] = $ attribute ;
65266576 }
@@ -6937,7 +6987,7 @@ public static function convertQuery(Document $collection, Query $query): Query
69376987 }
69386988 }
69396989
6940- if (! $ attribute ->isEmpty ()) {
6990+ if (!$ attribute ->isEmpty ()) {
69416991 $ query ->setOnArray ($ attribute ->getAttribute ('array ' , false ));
69426992
69436993 if ($ attribute ->getAttribute ('type ' ) == Database::VAR_DATETIME ) {
@@ -7195,7 +7245,7 @@ public function decodeSpatialData(string $wkt): array
71957245 // POINT(x y)
71967246 if (str_starts_with ($ upper , 'POINT( ' )) {
71977247 $ start = strpos ($ wkt , '( ' ) + 1 ;
7198- $ end = strrpos ($ wkt , ') ' );
7248+ $ end = strrpos ($ wkt , ') ' );
71997249 $ inside = substr ($ wkt , $ start , $ end - $ start );
72007250
72017251 $ coords = explode (' ' , trim ($ inside ));
@@ -7205,7 +7255,7 @@ public function decodeSpatialData(string $wkt): array
72057255 // LINESTRING(x1 y1, x2 y2, ...)
72067256 if (str_starts_with ($ upper , 'LINESTRING( ' )) {
72077257 $ start = strpos ($ wkt , '( ' ) + 1 ;
7208- $ end = strrpos ($ wkt , ') ' );
7258+ $ end = strrpos ($ wkt , ') ' );
72097259 $ inside = substr ($ wkt , $ start , $ end - $ start );
72107260
72117261 $ points = explode (', ' , $ inside );
@@ -7218,7 +7268,7 @@ public function decodeSpatialData(string $wkt): array
72187268 // POLYGON((x1,y1),(x2,y2))
72197269 if (str_starts_with ($ upper , 'POLYGON(( ' )) {
72207270 $ start = strpos ($ wkt , '(( ' ) + 2 ;
7221- $ end = strrpos ($ wkt , ')) ' );
7271+ $ end = strrpos ($ wkt , ')) ' );
72227272 $ inside = substr ($ wkt , $ start , $ end - $ start );
72237273
72247274 $ rings = explode ('),( ' , $ inside );
0 commit comments