Skip to content

Commit f3f025a

Browse files
authored
Dbal KeySet Extractor (#1603)
* Dbal KeySet Extractor * Build docs * Fixed support for older doctrine versions
1 parent 4aec545 commit f3f025a

7 files changed

Lines changed: 660 additions & 3 deletions

File tree

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Adapter\Doctrine;
6+
7+
use function Flow\ETL\DSL\array_to_rows;
8+
use Doctrine\DBAL\Connection;
9+
use Doctrine\DBAL\Query\QueryBuilder;
10+
use Flow\ETL\{Adapter\Doctrine\Pagination\KeySet, Extractor, FlowContext, Schema};
11+
use Flow\ETL\Exception\{InvalidArgumentException, RuntimeException};
12+
13+
/**
14+
* Extractor implementing keyset pagination for Doctrine DBAL queries.
15+
*
16+
* This extractor fetches rows page by page using keyset pagination, which is more efficient
17+
* than limit/offset for large datasets. It requires a KeySet object defining the columns
18+
* and sort orders for pagination. The key columns must be non-null and provide a unique
19+
* ordering to ensure correct pagination.
20+
*/
21+
final class DbalKeySetExtractor implements Extractor
22+
{
23+
private ?int $maximum = null;
24+
25+
private int $pageSize = 1000;
26+
27+
private ?Schema $schema = null;
28+
29+
public function __construct(
30+
private readonly Connection $connection,
31+
private readonly QueryBuilder $queryBuilder,
32+
private readonly KeySet $keySet,
33+
) {
34+
$qb = clone $this->queryBuilder;
35+
36+
/** @phpstan-ignore-next-line */
37+
$cleanQuery = \method_exists($qb, 'resetOrderBy') ? (clone $this->queryBuilder)->resetOrderBy() : (clone $qb)->resetQueryPart('orderBy');
38+
39+
if ($cleanQuery->getSQL() !== $this->queryBuilder->getSQL()) {
40+
throw new InvalidArgumentException('Keyset pagination cannot be used with an ORDER BY clause, please remove OrderBy from Query Builder');
41+
}
42+
43+
if (empty($this->keySet->keys)) {
44+
throw new InvalidArgumentException('KeySet must contain at least one key for pagination');
45+
}
46+
}
47+
48+
public function extract(FlowContext $context) : \Generator
49+
{
50+
$totalFetched = 0;
51+
$lastRow = null;
52+
53+
while (true) {
54+
$qb = clone $this->queryBuilder;
55+
$qb->setMaxResults($this->pageSize);
56+
57+
foreach ($this->keySet->keys as $key) {
58+
$qb->addOrderBy($key->column, $key->order->value);
59+
}
60+
61+
if ($lastRow !== null) {
62+
$conditions = [];
63+
$parameters = [];
64+
$parameterTypes = [];
65+
66+
foreach ($this->keySet->keys as $index => $key) {
67+
if (!\array_key_exists($key->column, $lastRow)) {
68+
throw new RuntimeException(sprintf('Column "%s" not found in last row for keyset pagination', $key->column));
69+
}
70+
71+
$lastValue = $lastRow[$key->column];
72+
73+
if ($lastValue === null) {
74+
throw new RuntimeException(sprintf('NULL value found in column "%s" for keyset pagination; key columns must be non-null', $key->column));
75+
}
76+
77+
$paramName = $key->column . '_previous';
78+
$parameters[$paramName] = $lastValue;
79+
$parameterTypes[$paramName] = $key->type;
80+
81+
$subConditions = [];
82+
83+
for ($i = 0; $i < $index; $i++) {
84+
$prevKey = $this->keySet->keys[$i];
85+
$subConditions[] = $qb->expr()->eq($prevKey->column, ':' . $prevKey->column . '_previous');
86+
}
87+
$operator = $key->order->value === 'DESC' ? 'lt' : 'gt';
88+
$subConditions[] = $qb->expr()->{$operator}($key->column, ':' . $paramName);
89+
90+
$conditions[] = $qb->expr()->and(...$subConditions);
91+
}
92+
93+
if ($conditions) {
94+
$qb->andWhere($qb->expr()->or(...$conditions));
95+
96+
foreach ($parameters as $param => $value) {
97+
/** @phpstan-ignore-next-line */
98+
$qb->setParameter($param, $value, $parameterTypes[$param]);
99+
}
100+
}
101+
}
102+
103+
$stmt = $this->connection->executeQuery(
104+
$qb->getSQL(),
105+
$qb->getParameters(),
106+
$qb->getParameterTypes()
107+
);
108+
109+
$hasRows = false;
110+
111+
while ($row = $stmt->fetchAssociative()) {
112+
$hasRows = true;
113+
$lastRow = $row;
114+
115+
$signal = yield array_to_rows($row, $context->entryFactory(), [], $this->schema);
116+
117+
if ($signal === Extractor\Signal::STOP) {
118+
return;
119+
}
120+
121+
$totalFetched++;
122+
123+
if (null !== $this->maximum && $totalFetched >= $this->maximum) {
124+
return;
125+
}
126+
}
127+
128+
if (!$hasRows) {
129+
break;
130+
}
131+
}
132+
}
133+
134+
/**
135+
* Sets the maximum number of rows to fetch.
136+
*
137+
* @param int $maximum the maximum number of rows (must be > 0)
138+
*
139+
* @throws InvalidArgumentException if maximum is <= 0
140+
*
141+
* @return $this
142+
*/
143+
public function withMaximum(int $maximum) : self
144+
{
145+
if ($maximum <= 0) {
146+
throw new InvalidArgumentException('Maximum must be greater than 0, got ' . $maximum);
147+
}
148+
149+
$this->maximum = $maximum;
150+
151+
return $this;
152+
}
153+
154+
/**
155+
* Sets the number of rows per page.
156+
*
157+
* @param int $pageSize the page size (must be > 0)
158+
*
159+
* @throws InvalidArgumentException if page size is <= 0
160+
*
161+
* @return $this
162+
*/
163+
public function withPageSize(int $pageSize) : self
164+
{
165+
if ($pageSize <= 0) {
166+
throw new InvalidArgumentException('Page size must be greater than 0, got ' . $pageSize);
167+
}
168+
169+
$this->pageSize = $pageSize;
170+
171+
return $this;
172+
}
173+
174+
/**
175+
* Sets the schema for the extracted rows.
176+
*
177+
* @param Schema $schema the schema to apply to rows
178+
*
179+
* @return $this
180+
*/
181+
public function withSchema(Schema $schema) : self
182+
{
183+
$this->schema = $schema;
184+
185+
return $this;
186+
}
187+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Adapter\Doctrine\Pagination;
6+
7+
use Doctrine\DBAL\ParameterType;
8+
use Doctrine\DBAL\Types\Type;
9+
10+
final readonly class Key
11+
{
12+
public function __construct(
13+
public string $column,
14+
public Order $order,
15+
public string|int|ParameterType|Type $type = ParameterType::STRING,
16+
) {
17+
}
18+
19+
public static function asc(string $column, string|int|ParameterType|Type $type = ParameterType::STRING) : self
20+
{
21+
return new self($column, Order::ASC, $type);
22+
}
23+
24+
public static function desc(string $column, string|int|ParameterType|Type $type = ParameterType::STRING) : self
25+
{
26+
return new self($column, Order::DESC, $type);
27+
}
28+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Adapter\Doctrine\Pagination;
6+
7+
final readonly class KeySet
8+
{
9+
/**
10+
* @var array<Key>
11+
*/
12+
public array $keys;
13+
14+
public function __construct(Key ...$keys)
15+
{
16+
$this->keys = \array_reverse($keys);
17+
}
18+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Adapter\Doctrine\Pagination;
6+
7+
enum Order : string
8+
{
9+
case ASC = 'ASC';
10+
case DESC = 'DESC';
11+
}

src/adapter/etl-adapter-doctrine/src/Flow/ETL/Adapter/Doctrine/functions.php

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@
44

55
namespace Flow\ETL\Adapter\Doctrine;
66

7-
use Doctrine\DBAL\{ArrayParameterType as DbalArrayType, Connection, ParameterType as DbalParameterType};
7+
use Doctrine\DBAL\{
8+
ArrayParameterType as DbalArrayType,
9+
Connection,
10+
ParameterType,
11+
ParameterType as DbalParameterType,
12+
Types\Type
13+
};
814
use Doctrine\DBAL\Query\QueryBuilder;
915
use Doctrine\DBAL\Types\Type as DbalType;
1016
use Flow\Doctrine\Bulk\{Dialect\MySQLInsertOptions,
@@ -13,7 +19,10 @@
1319
Dialect\SqliteInsertOptions,
1420
InsertOptions,
1521
UpdateOptions};
16-
use Flow\ETL\{Attribute\DocumentationDSL,
22+
use Flow\ETL\{Adapter\Doctrine\Pagination\Key,
23+
Adapter\Doctrine\Pagination\KeySet,
24+
Adapter\Doctrine\Pagination\Order,
25+
Attribute\DocumentationDSL,
1726
Attribute\DocumentationExample,
1827
Attribute\Module,
1928
Attribute\Type as DSLType,
@@ -93,6 +102,15 @@ function from_dbal_limit_offset_qb(
93102
return $loader;
94103
}
95104

105+
#[DocumentationDSL(module: Module::DOCTRINE, type: DSLType::EXTRACTOR)]
106+
function from_dbal_key_set_qb(
107+
Connection $connection,
108+
QueryBuilder $queryBuilder,
109+
KeySet $key_set,
110+
) : DbalKeySetExtractor {
111+
return new DbalKeySetExtractor($connection, $queryBuilder, $key_set);
112+
}
113+
96114
/**
97115
* @param null|ParametersSet $parameters_set - each one parameters array will be evaluated as new query
98116
* @param array<int|string, DbalArrayType|DbalParameterType|DbalType|int|string> $types
@@ -265,3 +283,21 @@ function postgresql_update_options(
265283
$update_columns,
266284
);
267285
}
286+
287+
#[DocumentationDSL(module: Module::DOCTRINE, type: DSLType::HELPER)]
288+
function pagination_key_asc(string $column, string|int|ParameterType|Type $type = ParameterType::STRING) : Key
289+
{
290+
return new Key($column, Order::ASC, $type);
291+
}
292+
293+
#[DocumentationDSL(module: Module::DOCTRINE, type: DSLType::HELPER)]
294+
function pagination_key_desc(string $column, string|int|ParameterType|Type $type = ParameterType::STRING) : Key
295+
{
296+
return new Key($column, Order::DESC, $type);
297+
}
298+
299+
#[DocumentationDSL(module: Module::DOCTRINE, type: DSLType::HELPER)]
300+
function pagination_key_set(Key ...$keys) : KeySet
301+
{
302+
return new KeySet(...$keys);
303+
}

0 commit comments

Comments
 (0)