Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\Doctrine;

use function Flow\ETL\DSL\array_to_rows;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Query\QueryBuilder;
use Flow\ETL\{Adapter\Doctrine\Pagination\KeySet, Extractor, FlowContext, Schema};
use Flow\ETL\Exception\{InvalidArgumentException, RuntimeException};

/**
 * Extractor implementing keyset pagination for Doctrine DBAL queries.
 *
 * Rows are fetched page by page. Every page after the first is restricted to rows that sort
 * strictly after the last row already yielded, which is more efficient than limit/offset for
 * large datasets. The KeySet defines the columns and sort directions used for paging.
 *
 * Key columns must be present in the result set under the same name, must be non-null and must
 * together provide a unique ordering, otherwise pagination cannot be performed correctly.
 */
final class DbalKeySetExtractor implements Extractor
{
    private ?int $maximum = null;

    private int $pageSize = 1000;

    private ?Schema $schema = null;

    /**
     * @param Connection $connection connection used to execute every page query
     * @param QueryBuilder $queryBuilder base query; it is never modified, each page works on a clone
     * @param KeySet $keySet columns and directions that define the pagination order
     *
     * @throws InvalidArgumentException when the query already has an ORDER BY clause or the key set is empty
     */
    public function __construct(
        private readonly Connection $connection,
        private readonly QueryBuilder $queryBuilder,
        private readonly KeySet $keySet,
    ) {
        // Work on a single clone so the caller's builder is left untouched.
        $base = clone $this->queryBuilder;

        // resetOrderBy() is used when the installed QueryBuilder provides it, resetQueryPart() otherwise.
        /** @phpstan-ignore-next-line */
        $withoutOrderBy = \method_exists($base, 'resetOrderBy') ? $base->resetOrderBy() : $base->resetQueryPart('orderBy');

        // If dropping ORDER BY changes the SQL, the caller defined an ordering that would conflict with the key set.
        if ($withoutOrderBy->getSQL() !== $this->queryBuilder->getSQL()) {
            throw new InvalidArgumentException('Keyset pagination cannot be used with an ORDER BY clause, please remove OrderBy from Query Builder');
        }

        if ([] === $this->keySet->keys) {
            throw new InvalidArgumentException('KeySet must contain at least one key for pagination');
        }
    }

    /**
     * Yields one Rows object per fetched database row until the query is exhausted,
     * the configured maximum is reached or the consumer sends Extractor\Signal::STOP.
     *
     * @throws RuntimeException when a key column is missing from, or NULL in, the last fetched row
     */
    public function extract(FlowContext $context) : \Generator
    {
        $totalFetched = 0;
        $lastRow = null;

        while (true) {
            // Never ask the database for more rows than can still be yielded.
            $limit = null === $this->maximum
                ? $this->pageSize
                : \min($this->pageSize, $this->maximum - $totalFetched);

            $qb = clone $this->queryBuilder;
            $qb->setMaxResults($limit);

            foreach ($this->keySet->keys as $key) {
                $qb->addOrderBy($key->column, $key->order->value);
            }

            if (null !== $lastRow) {
                $this->applyKeySetCondition($qb, $lastRow);
            }

            $result = $this->connection->executeQuery(
                $qb->getSQL(),
                $qb->getParameters(),
                $qb->getParameterTypes()
            );

            $pageRows = 0;

            try {
                while (false !== ($row = $result->fetchAssociative())) {
                    $pageRows++;
                    $lastRow = $row;

                    $signal = yield array_to_rows($row, $context->entryFactory(), [], $this->schema);

                    if ($signal === Extractor\Signal::STOP) {
                        return;
                    }

                    $totalFetched++;

                    if (null !== $this->maximum && $totalFetched >= $this->maximum) {
                        return;
                    }
                }
            } finally {
                // Release the cursor even when the consumer stops early or abandons the generator.
                $result->free();
            }

            // A page shorter than requested means the query is exhausted; skip the extra empty query.
            if ($pageRows < $limit) {
                break;
            }
        }
    }

    /**
     * Sets the maximum number of rows to fetch.
     *
     * @param int $maximum the maximum number of rows (must be > 0)
     *
     * @throws InvalidArgumentException if maximum is <= 0
     *
     * @return $this
     */
    public function withMaximum(int $maximum) : self
    {
        if ($maximum <= 0) {
            throw new InvalidArgumentException('Maximum must be greater than 0, got ' . $maximum);
        }

        $this->maximum = $maximum;

        return $this;
    }

    /**
     * Sets the number of rows per page.
     *
     * @param int $pageSize the page size (must be > 0)
     *
     * @throws InvalidArgumentException if page size is <= 0
     *
     * @return $this
     */
    public function withPageSize(int $pageSize) : self
    {
        if ($pageSize <= 0) {
            throw new InvalidArgumentException('Page size must be greater than 0, got ' . $pageSize);
        }

        $this->pageSize = $pageSize;

        return $this;
    }

    /**
     * Sets the schema for the extracted rows.
     *
     * @param Schema $schema the schema to apply to rows
     *
     * @return $this
     */
    public function withSchema(Schema $schema) : self
    {
        $this->schema = $schema;

        return $this;
    }

    /**
     * Restricts the page query to rows that sort strictly after the given row.
     *
     * For keys k1..kn the added predicate is
     * (k1 > :k1) OR (k1 = :k1 AND k2 > :k2) OR ... with "<" in place of ">" for DESC keys.
     *
     * @param QueryBuilder $qb page query to modify in place
     * @param array<string, mixed> $lastRow last row yielded from the previous page
     *
     * @throws RuntimeException when a key column is missing from, or NULL in, $lastRow
     */
    private function applyKeySetCondition(QueryBuilder $qb, array $lastRow) : void
    {
        $conditions = [];
        // Equality checks on the keys already visited; they prefix the comparison of each following key.
        $equalities = [];

        foreach ($this->keySet->keys as $key) {
            if (!\array_key_exists($key->column, $lastRow)) {
                throw new RuntimeException(sprintf('Column "%s" not found in last row for keyset pagination', $key->column));
            }

            $lastValue = $lastRow[$key->column];

            if ($lastValue === null) {
                throw new RuntimeException(sprintf('NULL value found in column "%s" for keyset pagination; key columns must be non-null', $key->column));
            }

            $paramName = $key->column . '_previous';
            $placeholder = ':' . $paramName;

            /** @phpstan-ignore-next-line */
            $qb->setParameter($paramName, $lastValue, $key->type);

            $comparison = 'DESC' === $key->order->value
                ? $qb->expr()->lt($key->column, $placeholder)
                : $qb->expr()->gt($key->column, $placeholder);

            $parts = $equalities;
            $parts[] = $comparison;

            $conditions[] = $qb->expr()->and(...$parts);
            $equalities[] = $qb->expr()->eq($key->column, $placeholder);
        }

        // The constructor guarantees at least one key, so $conditions is never empty here.
        $qb->andWhere($qb->expr()->or(...$conditions));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\Doctrine\Pagination;

use Doctrine\DBAL\ParameterType;
use Doctrine\DBAL\Types\Type;

/**
 * Single column of a keyset pagination key: the column name, its sort direction and the
 * DBAL parameter type used when the column's last seen value is bound to the next page query.
 */
final readonly class Key
{
    /**
     * @param string $column column name as used in ORDER BY and as it appears in fetched rows
     * @param Order $order sort direction for this column
     * @param int|ParameterType|string|Type $type parameter type used when binding the column value
     */
    public function __construct(
        public string $column,
        public Order $order,
        public string|int|ParameterType|Type $type = ParameterType::STRING,
    ) {
    }

    /**
     * Creates a key sorted in ascending order.
     */
    public static function asc(string $column, string|int|ParameterType|Type $type = ParameterType::STRING) : self
    {
        return new self($column, Order::ASC, $type);
    }

    /**
     * Creates a key sorted in descending order.
     */
    public static function desc(string $column, string|int|ParameterType|Type $type = ParameterType::STRING) : self
    {
        return new self($column, Order::DESC, $type);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\Doctrine\Pagination;

/**
 * Immutable list of pagination keys.
 *
 * NOTE(review): the keys are stored in the reverse of the order in which they were passed to
 * the constructor; consumers iterating $keys therefore see the last argument first.
 * Confirm that this is the intended precedence.
 */
final readonly class KeySet
{
    /**
     * @var array<Key>
     */
    public array $keys;

    /**
     * @param Key ...$keys keys that make up the pagination order
     */
    public function __construct(Key ...$keys)
    {
        $reversed = \array_reverse($keys);

        $this->keys = $reversed;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\Doctrine\Pagination;

/**
 * Sort direction of a pagination key.
 *
 * The backing string value is what the keyset extractor passes as the direction
 * to QueryBuilder::addOrderBy().
 */
enum Order : string
{
case ASC = 'ASC';
case DESC = 'DESC';
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@

namespace Flow\ETL\Adapter\Doctrine;

use Doctrine\DBAL\{ArrayParameterType as DbalArrayType, Connection, ParameterType as DbalParameterType};
use Doctrine\DBAL\{
ArrayParameterType as DbalArrayType,
Connection,
ParameterType,
ParameterType as DbalParameterType,
Types\Type
};
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Types\Type as DbalType;
use Flow\Doctrine\Bulk\{Dialect\MySQLInsertOptions,
Expand All @@ -13,7 +19,10 @@
Dialect\SqliteInsertOptions,
InsertOptions,
UpdateOptions};
use Flow\ETL\{Attribute\DocumentationDSL,
use Flow\ETL\{Adapter\Doctrine\Pagination\Key,
Adapter\Doctrine\Pagination\KeySet,
Adapter\Doctrine\Pagination\Order,
Attribute\DocumentationDSL,
Attribute\DocumentationExample,
Attribute\Module,
Attribute\Type as DSLType,
Expand Down Expand Up @@ -93,6 +102,15 @@ function from_dbal_limit_offset_qb(
return $loader;
}

/**
 * Creates an extractor that reads the given query page by page using keyset pagination.
 *
 * @param Connection $connection connection the page queries are executed on
 * @param QueryBuilder $queryBuilder base query; the extractor rejects builders that already define ORDER BY
 * @param KeySet $key_set keys defining the pagination order; must contain at least one key
 */
#[DocumentationDSL(module: Module::DOCTRINE, type: DSLType::EXTRACTOR)]
function from_dbal_key_set_qb(
    Connection $connection,
    QueryBuilder $queryBuilder,
    KeySet $key_set,
) : DbalKeySetExtractor {
    return new DbalKeySetExtractor(
        connection: $connection,
        queryBuilder: $queryBuilder,
        keySet: $key_set,
    );
}

/**
* @param null|ParametersSet $parameters_set - each one parameters array will be evaluated as new query
* @param array<int|string, DbalArrayType|DbalParameterType|DbalType|int|string> $types
Expand Down Expand Up @@ -265,3 +283,21 @@ function postgresql_update_options(
$update_columns,
);
}

/**
 * Creates a pagination key for the given column sorted in ascending order.
 *
 * @param string $column column the key refers to
 * @param int|ParameterType|string|Type $type type stored on the key, string by default
 */
#[DocumentationDSL(module: Module::DOCTRINE, type: DSLType::HELPER)]
function pagination_key_asc(string $column, string|int|ParameterType|Type $type = ParameterType::STRING) : Key
{
    return new Key(
        column: $column,
        order: Order::ASC,
        type: $type,
    );
}

/**
 * Creates a pagination key for the given column sorted in descending order.
 *
 * @param string $column column the key refers to
 * @param int|ParameterType|string|Type $type type stored on the key, string by default
 */
#[DocumentationDSL(module: Module::DOCTRINE, type: DSLType::HELPER)]
function pagination_key_desc(string $column, string|int|ParameterType|Type $type = ParameterType::STRING) : Key
{
    return new Key(
        column: $column,
        order: Order::DESC,
        type: $type,
    );
}

/**
 * Builds a KeySet from the given pagination keys.
 *
 * NOTE(review): KeySet stores the keys in reverse argument order — confirm the
 * expected precedence before relying on the order in which keys are passed here.
 *
 * @param Key ...$keys keys forwarded unchanged to the KeySet constructor
 */
#[DocumentationDSL(module: Module::DOCTRINE, type: DSLType::HELPER)]
function pagination_key_set(Key ...$keys) : KeySet
{
return new KeySet(...$keys);
}
Loading