
Commit 4f6ed64

Dbal bulk data optimization (#1776)
* Convert bulk values to DBAL values using Types instead of custom BulkData logic
* Create a mechanism for overriding or passing target table column types to Bulk
* Static analysis
* Updated documentation
* Updated dependencies
  Removed a problematic column from the Dbal Bulk insert integration tests, due to differences in how different versions of DBAL handle jsonb columns.
* Removed more fragile tests
1 parent 2343318 commit 4f6ed64

28 files changed

Lines changed: 1787 additions & 251 deletions


composer.lock

Lines changed: 11 additions & 11 deletions

documentation/components/adapters/doctrine.md

Lines changed: 122 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -29,20 +29,115 @@ Adapter for [ETL](https://github.com/flow-php/etl) using bulk operations from [D
2929
## Loader - DbalLoader
3030

3131
```php
32+
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
33+
34+
data_frame()
35+
->read(from_())
36+
->write(to_dbal_table_insert(['url' => \getenv('PGSQL_DATABASE_URL')], 'your-table-name'))
37+
->run();
38+
```
39+
40+
All supported DbalLoader operations via DSL functions:
41+
42+
- `to_dbal_table_insert(array|Connection $connection, string $table, ?InsertOptions $options = null)` - Insert new rows
43+
- `to_dbal_table_update(array|Connection $connection, string $table, ?UpdateOptions $options = null)` - Update existing rows
44+
- `to_dbal_table_delete(array|Connection $connection, string $table)` - Delete rows
45+
46+
You can also configure bulk operations with platform-specific options:
47+
48+
```php
49+
use function Flow\ETL\Adapter\Doctrine\{to_dbal_table_insert, postgresql_insert_options};
50+
51+
data_frame()
52+
->read(from_())
53+
->write(to_dbal_table_insert(
54+
$connection,
55+
'users',
56+
postgresql_insert_options(conflict_columns: ['id'])
57+
))
58+
->run();
59+
```
60+
61+
### Type Detection and Optimization
62+
63+
`DbalLoader` now provides advanced type detection capabilities to optimize database operations:
64+
65+
#### Automatic Type Detection from Flow Schema
66+
67+
By default, `DbalLoader` automatically detects column types from the Flow Schema of your data:
68+
69+
```php
70+
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
71+
72+
data_frame()
73+
->read(from_())
74+
->write(to_dbal_table_insert($connection, 'users'))
75+
->run();
76+
// Types are automatically detected from the Flow Schema
77+
```
78+
79+
#### Manual Type Override
80+
81+
You can override specific column types for fine-grained control:
82+
83+
```php
84+
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
85+
use Doctrine\DBAL\Types\Type;
86+
use Doctrine\DBAL\Types\Types;
87+
88+
data_frame()
89+
->read(from_())
90+
->write(to_dbal_table_insert($connection, 'users')
91+
->withColumnTypes([
92+
'id' => Type::getType(Types::INTEGER),
93+
'email' => Type::getType(Types::STRING),
94+
'created_at' => Type::getType(Types::DATETIME_IMMUTABLE),
95+
]))
96+
->run();
97+
```
98+
99+
#### Custom Type Detector
100+
101+
For advanced scenarios, you can provide a custom type detector with your own type mapping:
102+
103+
```php
104+
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
105+
use Flow\ETL\Adapter\Doctrine\{DbalTypesDetector, TypesMap};
106+
use Flow\Types\Type\Native\StringType;
107+
use Doctrine\DBAL\Types\TextType;
108+
109+
$customTypesMap = new TypesMap([
110+
StringType::class => TextType::class, // Map Flow strings to DBAL text type
111+
]);
112+
32113
data_frame()
33114
->read(from_())
34-
->write(new DbalLoader('your-table-name', $bulkSize = 100, ['url' => \getenv('PGSQL_DATABASE_URL')], ['skip_conflicts' => true]))
115+
->write(to_dbal_table_insert($connection, 'users')
116+
->withTypesDetector(new DbalTypesDetector($customTypesMap)))
35117
->run();
36118
```
37119

38-
All supported types of `DbalBulkLoader` loading:
120+
#### Data Normalization
39121

40-
- `::insert(Connection $connection, string $table, QueryFactory $queryFactory = null) : self`
41-
- `::insertOrSkipOnConflict(Connection $connection, string $table, QueryFactory $queryFactory = null) : self`
42-
- `::insertOrUpdateOnConstraintConflict(Connection $connection, string $table, string $constraint, QueryFactory $queryFactory = null) : self`
122+
`DbalLoader` automatically handles data normalization for database compatibility:
43123

44-
The `bulkSize` means how many rows you want to push to a database in a single `INSERT` query. Each extracted rows set
45-
is going to be split before inserting data into the database.
124+
- **XML Entries**: `XMLEntry` and `XMLElementEntry` objects are automatically converted to their string representation
125+
- **Complex Types**: Lists, Maps, and Structures are serialized as JSON
126+
- **Type Safety**: All data is normalized while preserving type information for optimal database performance
127+
128+
```php
129+
use function Flow\ETL\DSL\{data_frame, from_array, xml_entry};
130+
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
131+
132+
data_frame()
133+
->read(from_array([
134+
['id' => 1, 'data' => xml_entry('data', $domDocument)],
135+
['id' => 2, 'data' => xml_entry('data', $domElement)],
136+
]))
137+
->write(to_dbal_table_insert($connection, 'xml_table'))
138+
->run();
139+
// XML entries are automatically converted to strings before database insertion
140+
```
46141

47142
## Extractor - DbalQuery
48143

@@ -168,7 +263,7 @@ new Table(
168263
When types map is not provided, the default one will be used:
169264

170265
```php
171-
public const DEFAULT_TYPES = [
266+
public const FLOW_TYPES = [
172267
StringType::class => \Doctrine\DBAL\Types\StringType::class,
173268
IntegerType::class => \Doctrine\DBAL\Types\IntegerType::class,
174269
FloatType::class => \Doctrine\DBAL\Types\FloatType::class,
@@ -184,4 +279,23 @@ public const DEFAULT_TYPES = [
184279
MapType::class => \Doctrine\DBAL\Types\JsonType::class,
185280
StructureType::class => \Doctrine\DBAL\Types\JsonType::class,
186281
];
282+
```
283+
284+
The `TypesMap` class provides bidirectional mapping between Flow types and Doctrine DBAL types, allowing for flexible type conversion in both directions:
285+
286+
```php
287+
use Flow\ETL\Adapter\Doctrine\TypesMap;
288+
use Flow\Types\Type\Native\StringType;
289+
use Doctrine\DBAL\Types\TextType;
290+
291+
// Create custom type mapping
292+
$customMap = new TypesMap([
293+
StringType::class => TextType::class,
294+
]);
295+
296+
// Convert Flow type to DBAL type
297+
$dbalType = $customMap->toDbalType(StringType::class);
298+
299+
// Convert DBAL type to Flow type instance
300+
$flowType = $customMap->toFlowType(TextType::class);
187301
```
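
The bidirectional lookup described above can be sketched in plain PHP. This is only an illustration under stated assumptions: `SketchTypesMap`, its `toDbal()`/`toFlow()` methods, and the string type names are hypothetical stand-ins, not Flow's `TypesMap`. Because the default map sends both `MapType` and `StructureType` to the JSON type, the reverse direction is ambiguous; the sketch assumes the first Flow type registered for a DBAL type wins, which may differ from what the library does.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for illustration only; this is not Flow's TypesMap.
// Plain strings stand in for the Flow and DBAL class names.
final class SketchTypesMap
{
    /** @var array<string, string> Flow type => DBAL type */
    private array $flowToDbal;

    /** @var array<string, string> DBAL type => Flow type */
    private array $dbalToFlow = [];

    /**
     * @param array<string, string> $flowToDbal
     */
    public function __construct(array $flowToDbal)
    {
        $this->flowToDbal = $flowToDbal;

        // Build the reverse index once. Several Flow types can share one DBAL
        // type, so the first Flow type seen for a DBAL type is kept (an assumption).
        foreach ($flowToDbal as $flowType => $dbalType) {
            $this->dbalToFlow[$dbalType] ??= $flowType;
        }
    }

    public function toDbal(string $flowType): string
    {
        return $this->flowToDbal[$flowType]
            ?? throw new InvalidArgumentException("No DBAL type mapped for {$flowType}");
    }

    public function toFlow(string $dbalType): string
    {
        return $this->dbalToFlow[$dbalType]
            ?? throw new InvalidArgumentException("No Flow type mapped for {$dbalType}");
    }
}

$map = new SketchTypesMap([
    'StringType' => 'DbalStringType',
    'MapType' => 'DbalJsonType',
    'StructureType' => 'DbalJsonType',
]);

echo $map->toDbal('StructureType'), PHP_EOL; // prints "DbalJsonType"
echo $map->toFlow('DbalJsonType'), PHP_EOL;  // prints "MapType" (first registration wins)
```

Building the reverse index eagerly keeps both lookups constant-time and makes the tie-breaking rule explicit in one place.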

documentation/components/libs/doctrine-dbal-bulk.md

Lines changed: 33 additions & 0 deletions
@@ -37,6 +37,39 @@ $bulk->insert(
3737

3838
```
3939

40+
### Type Optimization with BulkData
41+
42+
`BulkData` can accept optional type information to optimize database operations:
43+
44+
```php
45+
use Doctrine\DBAL\Types\Type;
46+
use Doctrine\DBAL\Types\Types;
47+
48+
$bulk = Bulk::create();
49+
$bulk->insert(
50+
$dbalConnection,
51+
'your-table-name',
52+
new BulkData(
53+
[
54+
['id' => 1, 'name' => 'Name One', 'created_at' => new \DateTimeImmutable()],
55+
['id' => 2, 'name' => 'Name Two', 'created_at' => new \DateTimeImmutable()],
56+
],
57+
[
58+
'id' => Type::getType(Types::INTEGER),
59+
'name' => Type::getType(Types::STRING),
60+
'created_at' => Type::getType(Types::DATETIME_IMMUTABLE),
61+
]
62+
)
63+
);
64+
```
65+
66+
**Type Detection Behavior:**
67+
68+
- **With Types**: When column types are provided, `BulkData` uses them directly for optimal performance
69+
- **Without Types**: When no types are provided, `BulkData` automatically queries the database to understand the table structure and determine appropriate types
70+
71+
This automatic type detection ensures data consistency but comes with a performance cost due to the additional database query. For optimal performance in high-throughput scenarios, explicitly providing column types is recommended.
72+
4073
Update:
4174

4275
```php

src/adapter/etl-adapter-doctrine/src/Flow/ETL/Adapter/Doctrine/DbalLoader.php

Lines changed: 56 additions & 2 deletions
@@ -5,18 +5,28 @@
55
namespace Flow\ETL\Adapter\Doctrine;
66

77
use Doctrine\DBAL\{Connection, DriverManager};
8+
use Doctrine\DBAL\Types\Type;
89
use Flow\Doctrine\Bulk\{Bulk, BulkData, InsertOptions, UpdateOptions};
910
use Flow\ETL\Exception\InvalidArgumentException;
1011
use Flow\ETL\{FlowContext, Loader, Rows};
1112

1213
final class DbalLoader implements Loader
1314
{
15+
private ?Bulk $bulk = null;
16+
17+
/**
18+
* @var null|array<string, Type>
19+
*/
20+
private ?array $columnTypes = null;
21+
1422
private ?Connection $connection = null;
1523

1624
private string $operation = 'insert';
1725

1826
private InsertOptions|UpdateOptions|null $operationOptions = null;
1927

28+
private ?DbalTypesDetector $typesDetector = null;
29+
2030
/**
2131
* @param array<string, mixed> $connectionParams
2232
*/
@@ -55,14 +65,28 @@ public static function fromConnection(
5565

5666
public function load(Rows $rows, FlowContext $context) : void
5767
{
58-
Bulk::create()->{$this->operation}(
68+
$normalizedData = (new RowsNormalizer())->normalize($rows->sortEntries());
69+
70+
$this->bulk()->{$this->operation}(
5971
$this->connection(),
6072
$this->tableName,
61-
new BulkData($rows->sortEntries()->toArray()),
73+
new BulkData($normalizedData, $this->typesDetector()->convert($rows->schema(), $this->columnTypes ?? [])),
6274
$this->operationOptions
6375
);
6476
}
6577

78+
/**
79+
* Override types taken from Flow Schema with explicitly provided DBAL types.
80+
*
81+
* @param array<string, Type> $types Column name => DBAL Type instance
82+
*/
83+
public function withColumnTypes(array $types) : self
84+
{
85+
$this->columnTypes = $types;
86+
87+
return $this;
88+
}
89+
6690
/**
6791
* @throws InvalidArgumentException
6892
*/
@@ -84,6 +108,25 @@ public function withOperationOptions(InsertOptions|UpdateOptions|null $operation
84108
return $this;
85109
}
86110

111+
/**
112+
* Set a custom DbalTypesDetector, for example one built with a custom TypesMap.
113+
*/
114+
public function withTypesDetector(DbalTypesDetector $detector) : self
115+
{
116+
$this->typesDetector = $detector;
117+
118+
return $this;
119+
}
120+
121+
private function bulk() : Bulk
122+
{
123+
if ($this->bulk === null) {
124+
$this->bulk = Bulk::create();
125+
}
126+
127+
return $this->bulk;
128+
}
129+
87130
private function connection() : Connection
88131
{
89132
if ($this->connection === null) {
@@ -93,4 +136,15 @@ private function connection() : Connection
93136

94137
return $this->connection;
95138
}
139+
140+
private function typesDetector() : DbalTypesDetector
141+
{
142+
if ($this->typesDetector !== null) {
143+
return $this->typesDetector;
144+
}
145+
146+
$this->typesDetector = new DbalTypesDetector();
147+
148+
return $this->typesDetector;
149+
}
96150
}
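
In `load()`, the loader now passes `$this->columnTypes ?? []` to the detector together with the rows' schema. The sketch below illustrates the override semantics stated in the `withColumnTypes()` docblock, assuming that an explicitly provided type simply replaces the schema-detected one for the same column. `resolve_column_types()` and the string type names are hypothetical; the body of `DbalTypesDetector::convert()` is not part of this diff, so its real behaviour (for example, how it treats overrides for columns missing from the schema) may differ.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of Flow: merge detected column types with overrides.
 *
 * @param array<string, string> $detected  column name => type name detected from the schema
 * @param array<string, string> $overrides column name => explicitly provided type name
 *
 * @return array<string, string>
 */
function resolve_column_types(array $detected, array $overrides): array
{
    // array_replace keeps every detected column in its original position and
    // lets an override win wherever both arrays define the same column.
    return array_replace($detected, $overrides);
}

$detected = ['id' => 'integer', 'email' => 'string', 'payload' => 'json'];
$overrides = ['email' => 'text'];

$resolved = resolve_column_types($detected, $overrides);

foreach ($resolved as $column => $type) {
    echo $column, ': ', $type, PHP_EOL;
}
// prints:
// id: integer
// email: text
// payload: json
```

Passing an empty override array leaves the detected types untouched, which matches the `?? []` fallback used when `withColumnTypes()` was never called.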
