Skip to content

Commit 5b46bb6

Browse files
committed
Abstract upsert to sql level
1 parent 9faf41f commit 5b46bb6

5 files changed

Lines changed: 1311 additions & 1316 deletions

File tree

src/Database/Adapter.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,14 @@ protected function trigger(string $event, mixed $query): mixed
419419
return $query;
420420
}
421421

422+
/**
423+
* Quote a string
424+
*
425+
* @param string $string
426+
* @return string
427+
*/
428+
abstract protected function quote(string $string): string;
429+
422430
/**
423431
* Ping Database
424432
*

src/Database/Adapter/MariaDB.php

Lines changed: 0 additions & 277 deletions
Original file line numberDiff line numberDiff line change
@@ -1099,51 +1099,6 @@ public function createDocuments(string $collection, array $documents): array
10991099
return $documents;
11001100
}
11011101

1102-
/**
1103-
* Get internal IDs for the given documents
1104-
*
1105-
* @param string $collection
1106-
* @param array<string> $documentIds
1107-
* @param array<?int> $documentTenants
1108-
* @return array<string>
1109-
* @throws DatabaseException
1110-
*/
1111-
private function getInternalIds(string $collection, array $documentIds, array $documentTenants = []): array
1112-
{
1113-
$internalIds = [];
1114-
1115-
/**
1116-
* UID, _tenant bottleneck is ~ 5000 rows since we use _uid IN query
1117-
*/
1118-
foreach (\array_chunk($documentIds, 1000) as $documentIdsChunk) {
1119-
$sql = "
1120-
SELECT _uid, _id
1121-
FROM {$this->getSQLTable($collection)}
1122-
WHERE _uid IN (" . implode(',', array_map(fn ($index) => ":_key_{$index}", array_keys($documentIdsChunk))) . ")
1123-
{$this->getTenantQuery($collection, tenantCount: \count($documentIdsChunk))}
1124-
";
1125-
1126-
$stmt = $this->getPDO()->prepare($sql);
1127-
1128-
foreach ($documentIdsChunk as $index => $id) {
1129-
$stmt->bindValue(":_key_{$index}", $id);
1130-
}
1131-
1132-
if ($this->sharedTables) {
1133-
foreach ($documentIdsChunk as $index => $id) {
1134-
$stmt->bindValue(":_tenant_{$index}", \array_shift($documentTenants));
1135-
}
1136-
}
1137-
1138-
$stmt->execute();
1139-
$results = $stmt->fetchAll(PDO::FETCH_KEY_PAIR); // Fetch as [documentId => internalId]
1140-
$stmt->closeCursor();
1141-
1142-
$internalIds = [...$internalIds, ...$results];
1143-
}
1144-
1145-
return $internalIds;
1146-
}
11471102
/**
11481103
* Update Document
11491104
*
@@ -1375,238 +1330,6 @@ public function updateDocument(string $collection, string $id, Document $documen
13751330
return $document;
13761331
}
13771332

1378-
1379-
/**
1380-
* @param string $collection
1381-
* @param string $attribute
1382-
* @param array<Document> $documents
1383-
* @return array<Document>
1384-
* @throws DatabaseException
1385-
*/
1386-
public function createOrUpdateDocuments(
1387-
string $collection,
1388-
string $attribute,
1389-
array $documents
1390-
): array {
1391-
if (empty($documents)) {
1392-
return $documents;
1393-
}
1394-
1395-
try {
1396-
$name = $this->filter($collection);
1397-
$attribute = $this->filter($attribute);
1398-
1399-
$attributes = [];
1400-
$bindIndex = 0;
1401-
$batchKeys = [];
1402-
$bindValues = [];
1403-
$documentIds = [];
1404-
$documentTenants = [];
1405-
1406-
foreach ($documents as $document) {
1407-
$attributes = $document->getAttributes();
1408-
$attributes['_uid'] = $document->getId();
1409-
$attributes['_createdAt'] = $document->getCreatedAt();
1410-
$attributes['_updatedAt'] = $document->getUpdatedAt();
1411-
$attributes['_permissions'] = \json_encode($document->getPermissions());
1412-
1413-
if (!empty($document->getInternalId())) {
1414-
$attributes['_id'] = $document->getInternalId();
1415-
} else {
1416-
$documentIds[] = $document->getId();
1417-
}
1418-
1419-
if ($this->sharedTables) {
1420-
$attributes['_tenant']
1421-
= $documentTenants[]
1422-
= $document->getTenant();
1423-
}
1424-
1425-
$columns = [];
1426-
foreach (\array_keys($attributes) as $key => $attr) {
1427-
$columns[$key] = "`{$this->filter($attr)}`";
1428-
}
1429-
$columns = '(' . \implode(', ', $columns) . ')';
1430-
1431-
$bindKeys = [];
1432-
1433-
foreach ($attributes as $attrValue) {
1434-
if (\is_array($attrValue)) {
1435-
$attrValue = \json_encode($attrValue);
1436-
}
1437-
$attrValue = (\is_bool($attrValue)) ? (int)$attrValue : $attrValue;
1438-
$bindKey = 'key_' . $bindIndex;
1439-
$bindKeys[] = ':' . $bindKey;
1440-
$bindValues[$bindKey] = $attrValue;
1441-
$bindIndex++;
1442-
}
1443-
1444-
$batchKeys[] = '(' . \implode(', ', $bindKeys) . ')';
1445-
}
1446-
1447-
$getUpdateClause = function (string $attribute, bool $increment = false): string {
1448-
if ($increment) {
1449-
$new = "`{$attribute}` + VALUES(`{$attribute}`)";
1450-
} else {
1451-
$new = "VALUES(`{$attribute}`)";
1452-
}
1453-
1454-
if ($this->sharedTables) {
1455-
return "`{$attribute}` = IF(`_tenant` = VALUES(`_tenant`), {$new}, `{$attribute}`)";
1456-
}
1457-
1458-
return "`{$attribute}` = {$new}";
1459-
};
1460-
1461-
if (!empty($attribute)) {
1462-
// Increment specific column by its new value in place
1463-
$updateColumns = [
1464-
$getUpdateClause($attribute, increment: true),
1465-
$getUpdateClause('_updatedAt'),
1466-
];
1467-
} else {
1468-
// Update all columns
1469-
$updateColumns = [];
1470-
foreach (\array_keys($attributes) as $attr) {
1471-
$updateColumns[] = $getUpdateClause($this->filter($attr));
1472-
}
1473-
}
1474-
1475-
$stmt = $this->getPDO()->prepare(
1476-
"
1477-
INSERT INTO {$this->getSQLTable($name)} {$columns}
1478-
VALUES " . \implode(', ', $batchKeys) . "
1479-
ON DUPLICATE KEY UPDATE
1480-
" . \implode(', ', $updateColumns)
1481-
);
1482-
1483-
foreach ($bindValues as $key => $binding) {
1484-
$stmt->bindValue($key, $binding, $this->getPDOType($binding));
1485-
}
1486-
1487-
$stmt->execute();
1488-
1489-
// Fetch existing permissions in bulk after data updates
1490-
$sql = "
1491-
SELECT _document, _type, _permission
1492-
FROM {$this->getSQLTable($name . '_perms')}
1493-
WHERE _document IN (" . \implode(',', \array_map(fn ($index) => ":_key_{$index}", \array_keys($documents))) . ")
1494-
{$this->getTenantQuery($collection, tenantCount: \count($documentTenants))}
1495-
";
1496-
1497-
$stmt = $this->getPDO()->prepare($sql);
1498-
1499-
foreach ($documents as $index => $document) {
1500-
$stmt->bindValue(":_key_{$index}", $document->getId());
1501-
}
1502-
1503-
if ($this->sharedTables) {
1504-
foreach ($documentTenants as $index => $tenant) {
1505-
$stmt->bindValue(":_tenant_{$index}", $tenant);
1506-
}
1507-
}
1508-
1509-
$stmt->execute();
1510-
$existing = $stmt->fetchAll();
1511-
$stmt->closeCursor();
1512-
1513-
// Group permissions by document
1514-
$permissionsByDocument = [];
1515-
foreach ($existing as $row) {
1516-
$permissionsByDocument[$row['_document']][$row['_type']][] = $row['_permission'];
1517-
}
1518-
1519-
foreach ($documentIds as $id) {
1520-
foreach (Database::PERMISSIONS as $type) {
1521-
$permissionsByDocument[$id][$type] = $permissionsByDocument[$id][$type] ?? [];
1522-
}
1523-
}
1524-
1525-
$removeQueries = [];
1526-
$removeBindValues = [];
1527-
$addQueries = [];
1528-
$addBindValues = [];
1529-
1530-
foreach ($documents as $index => $document) {
1531-
$currentPermissions = $permissionsByDocument[$document->getId()] ?? [];
1532-
1533-
// Calculate removals
1534-
foreach (Database::PERMISSIONS as $type) {
1535-
$toRemove = \array_diff($currentPermissions[$type] ?? [], $document->getPermissionsByType($type));
1536-
if (!empty($toRemove)) {
1537-
$removeQueries[] = "(
1538-
_document = :_uid_{$index}
1539-
{$this->getTenantQuery($collection, tenantCount: \count($toRemove))}
1540-
AND _type = '{$type}'
1541-
AND _permission IN (" . \implode(',', \array_map(fn ($i) => ":remove_{$type}_{$index}_{$i}", \array_keys($toRemove))) . ")
1542-
)";
1543-
$removeBindValues[":_uid_{$index}"] = $document->getId();
1544-
$removeBindValues[":_tenant_{$index}"] = $document->getTenant();
1545-
foreach ($toRemove as $i => $perm) {
1546-
$removeBindValues[":remove_{$type}_{$index}_{$i}"] = $perm;
1547-
}
1548-
}
1549-
}
1550-
1551-
// Calculate additions
1552-
foreach (Database::PERMISSIONS as $type) {
1553-
$toAdd = \array_diff($document->getPermissionsByType($type), $currentPermissions[$type] ?? []);
1554-
foreach ($toAdd as $i => $permission) {
1555-
$addQuery = "(:_uid_{$index}, '{$type}', :add_{$type}_{$index}_{$i}";
1556-
1557-
if ($this->sharedTables) {
1558-
$addQuery .= ", :_tenant_{$index}";
1559-
}
1560-
1561-
$addQuery .= ")";
1562-
$addQueries[] = $addQuery;
1563-
$addBindValues[":_uid_{$index}"] = $document->getId();
1564-
$addBindValues[":add_{$type}_{$index}_{$i}"] = $permission;
1565-
1566-
if ($this->sharedTables) {
1567-
$addBindValues[":_tenant_{$index}"] = $document->getTenant();
1568-
}
1569-
}
1570-
}
1571-
}
1572-
1573-
// Execute permission removals
1574-
if (!empty($removeQueries)) {
1575-
$removeQuery = \implode(' OR ', $removeQueries);
1576-
$stmtRemovePermissions = $this->getPDO()->prepare("DELETE FROM {$this->getSQLTable($name . '_perms')} WHERE {$removeQuery}");
1577-
foreach ($removeBindValues as $key => $value) {
1578-
$stmtRemovePermissions->bindValue($key, $value, $this->getPDOType($value));
1579-
}
1580-
$stmtRemovePermissions->execute();
1581-
}
1582-
1583-
// Execute permission additions
1584-
if (!empty($addQueries)) {
1585-
$sqlAddPermissions = "INSERT INTO {$this->getSQLTable($name . '_perms')} (_document, _type, _permission";
1586-
if ($this->sharedTables) {
1587-
$sqlAddPermissions .= ", _tenant";
1588-
}
1589-
$sqlAddPermissions .= ") VALUES " . \implode(', ', $addQueries);
1590-
$stmtAddPermissions = $this->getPDO()->prepare($sqlAddPermissions);
1591-
foreach ($addBindValues as $key => $value) {
1592-
$stmtAddPermissions->bindValue($key, $value, $this->getPDOType($value));
1593-
}
1594-
$stmtAddPermissions->execute();
1595-
}
1596-
1597-
$internalIds = $this->getInternalIds($collection, $documentIds, $documentTenants);
1598-
foreach ($documents as $document) {
1599-
if (isset($internalIds[$document->getId()])) {
1600-
$document['$internalId'] = $internalIds[$document->getId()];
1601-
}
1602-
}
1603-
} catch (PDOException $e) {
1604-
throw $this->processException($e);
1605-
}
1606-
1607-
return $documents;
1608-
}
1609-
16101333
/**
16111334
* Increase or decrease an attribute value
16121335
*

src/Database/Adapter/Pool.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ public function rollbackTransaction(): bool
9191
return $this->delegate(__FUNCTION__, \func_get_args());
9292
}
9393

94+
protected function quote(string $string): string
95+
{
96+
return $this->delegate(__FUNCTION__, \func_get_args());
97+
}
98+
9499
public function ping(): bool
95100
{
96101
return $this->delegate(__FUNCTION__, \func_get_args());

src/Database/Adapter/Postgres.php

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1377,17 +1377,6 @@ public function updateDocument(string $collection, string $id, Document $documen
13771377
return $document;
13781378
}
13791379

1380-
/**
1381-
* @param string $collection
1382-
* @param string $attribute
1383-
* @param array<Document> $documents
1384-
* @return array<Document>
1385-
*/
1386-
public function createOrUpdateDocuments(string $collection, string $attribute, array $documents): array
1387-
{
1388-
return $documents;
1389-
}
1390-
13911380
/**
13921381
* Increase or decrease an attribute value
13931382
*
@@ -1835,6 +1824,15 @@ public function sum(string $collection, string $attribute, array $queries = [],
18351824
return $result['sum'] ?? 0;
18361825
}
18371826

1827+
/**
1828+
* @return string
1829+
*/
1830+
public function getConnectionId(): string
1831+
{
1832+
$stmt = $this->getPDO()->query("SELECT pg_backend_pid();");
1833+
return $stmt->fetchColumn();
1834+
}
1835+
18381836
/**
18391837
* Get SQL Condition
18401838
*
@@ -2110,8 +2108,6 @@ public function getLikeOperator(): string
21102108
return 'ILIKE';
21112109
}
21122110

2113-
2114-
21152111
protected function processException(PDOException $e): \Exception
21162112
{
21172113
// Timeout
@@ -2143,14 +2139,9 @@ protected function processException(PDOException $e): \Exception
21432139
}
21442140

21452141
/**
2142+
* @param string $string
21462143
* @return string
21472144
*/
2148-
public function getConnectionId(): string
2149-
{
2150-
$stmt = $this->getPDO()->query("SELECT pg_backend_pid();");
2151-
return $stmt->fetchColumn();
2152-
}
2153-
21542145
protected function quote(string $string): string
21552146
{
21562147
return "\"{$string}\"";

0 commit comments

Comments
 (0)