Skip to content

Commit 42fb9ea

Browse files
committed
Dbal KeySet Extractor
1 parent 4aec545 commit 42fb9ea

7 files changed

Lines changed: 657 additions & 3 deletions

File tree

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Adapter\Doctrine;
6+
7+
use function Flow\ETL\DSL\array_to_rows;
8+
use Doctrine\DBAL\Connection;
9+
use Doctrine\DBAL\Query\QueryBuilder;
10+
use Flow\ETL\{Adapter\Doctrine\Pagination\KeySet, Extractor, FlowContext, Schema};
11+
use Flow\ETL\Exception\{InvalidArgumentException, RuntimeException};
12+
13+
/**
14+
* Extractor implementing keyset pagination for Doctrine DBAL queries.
15+
*
16+
* This extractor fetches rows page by page using keyset pagination, which is more efficient
17+
* than limit/offset for large datasets. It requires a KeySet object defining the columns
18+
* and sort orders for pagination. The key columns must be non-null and provide a unique
19+
* ordering to ensure correct pagination.
20+
*/
21+
final class DbalKeySetExtractor implements Extractor
22+
{
23+
private ?int $maximum = null;
24+
25+
private int $pageSize = 1000;
26+
27+
private ?Schema $schema = null;
28+
29+
public function __construct(
30+
private readonly Connection $connection,
31+
private readonly QueryBuilder $queryBuilder,
32+
private readonly KeySet $keySet,
33+
) {
34+
$qb = clone $this->queryBuilder;
35+
36+
if ($qb->resetOrderBy()->getSQL() !== $this->queryBuilder->getSQL()) {
37+
throw new InvalidArgumentException('Keyset pagination cannot be used with an ORDER BY clause, please remove OrderBy from Query Builder');
38+
}
39+
40+
if (empty($this->keySet->keys)) {
41+
throw new InvalidArgumentException('KeySet must contain at least one key for pagination');
42+
}
43+
}
44+
45+
public function extract(FlowContext $context) : \Generator
46+
{
47+
$totalFetched = 0;
48+
$lastRow = null;
49+
50+
while (true) {
51+
$qb = clone $this->queryBuilder;
52+
$qb->setMaxResults($this->pageSize);
53+
54+
foreach ($this->keySet->keys as $key) {
55+
$qb->addOrderBy($key->column, $key->order->value);
56+
}
57+
58+
if ($lastRow !== null) {
59+
$conditions = [];
60+
$parameters = [];
61+
$parameterTypes = [];
62+
63+
foreach ($this->keySet->keys as $index => $key) {
64+
if (!\array_key_exists($key->column, $lastRow)) {
65+
throw new RuntimeException(sprintf('Column "%s" not found in last row for keyset pagination', $key->column));
66+
}
67+
68+
$lastValue = $lastRow[$key->column];
69+
70+
if ($lastValue === null) {
71+
throw new RuntimeException(sprintf('NULL value found in column "%s" for keyset pagination; key columns must be non-null', $key->column));
72+
}
73+
74+
$paramName = $key->column . '_previous';
75+
$parameters[$paramName] = $lastValue;
76+
$parameterTypes[$paramName] = $key->type;
77+
78+
$subConditions = [];
79+
80+
for ($i = 0; $i < $index; $i++) {
81+
$prevKey = $this->keySet->keys[$i];
82+
$subConditions[] = $qb->expr()->eq($prevKey->column, ':' . $prevKey->column . '_previous');
83+
}
84+
$operator = $key->order->value === 'DESC' ? 'lt' : 'gt';
85+
$subConditions[] = $qb->expr()->{$operator}($key->column, ':' . $paramName);
86+
87+
$conditions[] = $qb->expr()->and(...$subConditions);
88+
}
89+
90+
if ($conditions) {
91+
$qb->andWhere($qb->expr()->or(...$conditions));
92+
93+
foreach ($parameters as $param => $value) {
94+
/** @phpstan-ignore-next-line */
95+
$qb->setParameter($param, $value, $parameterTypes[$param]);
96+
}
97+
}
98+
}
99+
100+
$stmt = $this->connection->executeQuery(
101+
$qb->getSQL(),
102+
$qb->getParameters(),
103+
$qb->getParameterTypes()
104+
);
105+
106+
$hasRows = false;
107+
108+
while ($row = $stmt->fetchAssociative()) {
109+
$hasRows = true;
110+
$lastRow = $row;
111+
112+
$signal = yield array_to_rows($row, $context->entryFactory(), [], $this->schema);
113+
114+
if ($signal === Extractor\Signal::STOP) {
115+
return;
116+
}
117+
118+
$totalFetched++;
119+
120+
if (null !== $this->maximum && $totalFetched >= $this->maximum) {
121+
return;
122+
}
123+
}
124+
125+
if (!$hasRows) {
126+
break;
127+
}
128+
}
129+
}
130+
131+
/**
132+
* Sets the maximum number of rows to fetch.
133+
*
134+
* @param int $maximum the maximum number of rows (must be > 0)
135+
*
136+
* @throws InvalidArgumentException if maximum is <= 0
137+
*
138+
* @return $this
139+
*/
140+
public function withMaximum(int $maximum) : self
141+
{
142+
if ($maximum <= 0) {
143+
throw new InvalidArgumentException('Maximum must be greater than 0, got ' . $maximum);
144+
}
145+
146+
$this->maximum = $maximum;
147+
148+
return $this;
149+
}
150+
151+
/**
152+
* Sets the number of rows per page.
153+
*
154+
* @param int $pageSize the page size (must be > 0)
155+
*
156+
* @throws InvalidArgumentException if page size is <= 0
157+
*
158+
* @return $this
159+
*/
160+
public function withPageSize(int $pageSize) : self
161+
{
162+
if ($pageSize <= 0) {
163+
throw new InvalidArgumentException('Page size must be greater than 0, got ' . $pageSize);
164+
}
165+
166+
$this->pageSize = $pageSize;
167+
168+
return $this;
169+
}
170+
171+
/**
172+
* Sets the schema for the extracted rows.
173+
*
174+
* @param Schema $schema the schema to apply to rows
175+
*
176+
* @return $this
177+
*/
178+
public function withSchema(Schema $schema) : self
179+
{
180+
$this->schema = $schema;
181+
182+
return $this;
183+
}
184+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Adapter\Doctrine\Pagination;
6+
7+
use Doctrine\DBAL\ParameterType;
8+
use Doctrine\DBAL\Types\Type;
9+
10+
final readonly class Key
11+
{
12+
public function __construct(
13+
public string $column,
14+
public Order $order,
15+
public string|int|ParameterType|Type $type = ParameterType::STRING,
16+
) {
17+
}
18+
19+
public static function asc(string $column, string|int|ParameterType|Type $type = ParameterType::STRING) : self
20+
{
21+
return new self($column, Order::ASC, $type);
22+
}
23+
24+
public static function desc(string $column, string|int|ParameterType|Type $type = ParameterType::STRING) : self
25+
{
26+
return new self($column, Order::DESC, $type);
27+
}
28+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Adapter\Doctrine\Pagination;
6+
7+
final readonly class KeySet
8+
{
9+
/**
10+
* @var array<Key>
11+
*/
12+
public array $keys;
13+
14+
public function __construct(Key ...$keys)
15+
{
16+
$this->keys = \array_reverse($keys);
17+
}
18+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Adapter\Doctrine\Pagination;
6+
7+
enum Order : string
8+
{
9+
case ASC = 'ASC';
10+
case DESC = 'DESC';
11+
}

src/adapter/etl-adapter-doctrine/src/Flow/ETL/Adapter/Doctrine/functions.php

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@
44

55
namespace Flow\ETL\Adapter\Doctrine;
66

7-
use Doctrine\DBAL\{ArrayParameterType as DbalArrayType, Connection, ParameterType as DbalParameterType};
7+
use Doctrine\DBAL\{
8+
ArrayParameterType as DbalArrayType,
9+
Connection,
10+
ParameterType,
11+
ParameterType as DbalParameterType,
12+
Types\Type
13+
};
814
use Doctrine\DBAL\Query\QueryBuilder;
915
use Doctrine\DBAL\Types\Type as DbalType;
1016
use Flow\Doctrine\Bulk\{Dialect\MySQLInsertOptions,
@@ -13,7 +19,10 @@
1319
Dialect\SqliteInsertOptions,
1420
InsertOptions,
1521
UpdateOptions};
16-
use Flow\ETL\{Attribute\DocumentationDSL,
22+
use Flow\ETL\{Adapter\Doctrine\Pagination\Key,
23+
Adapter\Doctrine\Pagination\KeySet,
24+
Adapter\Doctrine\Pagination\Order,
25+
Attribute\DocumentationDSL,
1726
Attribute\DocumentationExample,
1827
Attribute\Module,
1928
Attribute\Type as DSLType,
@@ -93,6 +102,15 @@ function from_dbal_limit_offset_qb(
93102
return $loader;
94103
}
95104

105+
#[DocumentationDSL(module: Module::DOCTRINE, type: DSLType::EXTRACTOR)]
106+
function from_dbal_key_set_qb(
107+
Connection $connection,
108+
QueryBuilder $queryBuilder,
109+
KeySet $key_set,
110+
) : DbalKeySetExtractor {
111+
return new DbalKeySetExtractor($connection, $queryBuilder, $key_set);
112+
}
113+
96114
/**
97115
* @param null|ParametersSet $parameters_set - each one parameters array will be evaluated as new query
98116
* @param array<int|string, DbalArrayType|DbalParameterType|DbalType|int|string> $types
@@ -265,3 +283,21 @@ function postgresql_update_options(
265283
$update_columns,
266284
);
267285
}
286+
287+
#[DocumentationDSL(module: Module::DOCTRINE, type: DSLType::HELPER)]
288+
function pagination_key_asc(string $column, string|int|ParameterType|Type $type = ParameterType::STRING) : Key
289+
{
290+
return new Key($column, Order::ASC, $type);
291+
}
292+
293+
#[DocumentationDSL(module: Module::DOCTRINE, type: DSLType::HELPER)]
294+
function pagination_key_desc(string $column, string|int|ParameterType|Type $type = ParameterType::STRING) : Key
295+
{
296+
return new Key($column, Order::DESC, $type);
297+
}
298+
299+
#[DocumentationDSL(module: Module::DOCTRINE, type: DSLType::HELPER)]
300+
function pagination_key_set(Key ...$keys) : KeySet
301+
{
302+
return new KeySet(...$keys);
303+
}

0 commit comments

Comments
 (0)