Commit a9c7b43

feature: postgresql adapter schema converter
1 parent 95348f5 commit a9c7b43

15 files changed

Lines changed: 1169 additions & 19 deletions

documentation/components/adapters/postgresql.md

Lines changed: 128 additions & 0 deletions
@@ -400,3 +400,131 @@ df()
 | `pgsql_insert_options(...)` | Configure insert behavior (conflicts, upsert) |
 | `pgsql_update_options($primaryKeys)` | Configure update behavior (primary key columns) |
 | `pgsql_delete_options($primaryKeys)` | Configure delete behavior (primary key columns) |
+
+## Schema Conversion
+
+Two helpers convert between a Flow `Schema` and a PostgreSQL table definition:
+
+- `to_pgsql_schema_table()` turns a Flow `Schema` into a `Flow\PostgreSql\Schema\Table`, which can emit `CREATE TABLE`
+  (and related index/constraint) SQL via `toSql()`.
+- `pgsql_table_to_flow_schema()` turns a `Flow\PostgreSql\Schema\Table` back into a Flow `Schema`.
+
+Column types are resolved through the shared `EntryTypesMap` (Flow type → PostgreSQL column type), and per-column
+details — primary keys, unique constraints, indexes, length, precision/scale, defaults, identity, generated columns,
+and explicit type overrides — are driven by `PostgreSqlMetadata` entries attached to each schema definition.
+
+### Creating a Table from a Flow Schema
+
+`to_pgsql_schema_table()` returns a table definition; call `toSql()` on it and execute each statement to create the
+table:
+
+```php
+use Flow\ETL\Adapter\PostgreSql\PostgreSqlMetadata;
+
+use function Flow\ETL\Adapter\PostgreSql\to_pgsql_schema_table;
+use function Flow\ETL\DSL\{bool_schema, int_schema, json_schema, schema, str_schema};
+
+$table = to_pgsql_schema_table(
+    schema(
+        int_schema('id', metadata: PostgreSqlMetadata::primaryKey('pk_users')),
+        str_schema('name', metadata: PostgreSqlMetadata::length(120)),
+        str_schema('email', metadata: PostgreSqlMetadata::indexUnique('uq_users_email')),
+        bool_schema('active', metadata: PostgreSqlMetadata::default(true)),
+        json_schema('payload'),
+    ),
+    'users',
+);
+
+foreach ($table->toSql() as $sql) {
+    $client->execute($sql);
+}
+```
+
+By default the table is created in the `public` schema; pass a third argument to target another one:
+
+```php
+$table = to_pgsql_schema_table($schema, 'users', 'analytics');
+```
+
+### Steering the Conversion with Metadata
+
+`PostgreSqlMetadata` factories return `Metadata` objects you attach to a definition via the `metadata:` argument of the
+schema DSL helpers. Combine multiple entries with `merge()`:
+
+```php
+use Flow\ETL\Adapter\PostgreSql\PostgreSqlMetadata;
+use Flow\PostgreSql\Schema\IdentityGeneration;
+
+use function Flow\ETL\DSL\{float_schema, int_schema, schema, str_schema};
+
+$schema = schema(
+    int_schema('id', metadata: PostgreSqlMetadata::identity(IdentityGeneration::BY_DEFAULT)),
+    str_schema('sku', metadata: PostgreSqlMetadata::type('citext')), // explicit type override
+    float_schema('amount', metadata: PostgreSqlMetadata::precision(10)->merge(PostgreSqlMetadata::scale(2))),
+    int_schema('total', metadata: PostgreSqlMetadata::generated('price * quantity')),
+);
+```
+
+| Metadata | Effect on the generated column |
+|-----------------------------------|------------------------------------------------------------|
+| `PostgreSqlMetadata::type($name)` | Force a specific PostgreSQL type, bypassing the type map |
+| `PostgreSqlMetadata::length($n)` | Emit `varchar($n)` |
+| `PostgreSqlMetadata::precision($p)` / `::scale($s)` | Emit `numeric($p, $s)` |
+| `PostgreSqlMetadata::default($v)` | Set a column `DEFAULT` |
+| `PostgreSqlMetadata::primaryKey($name)` | Include the column in the table primary key |
+| `PostgreSqlMetadata::indexUnique($name)` | Include the column in a named `UNIQUE` constraint |
+| `PostgreSqlMetadata::index($name)` | Include the column in a named index |
+| `PostgreSqlMetadata::identity($generation)` | Make the column an identity column |
+| `PostgreSqlMetadata::generated($expr)` | Make the column a generated column |
+
+Columns sharing the same primary key, unique constraint, or index name are grouped together, so composite keys are
+expressed by attaching the same name to several definitions.
+
+### Reading a Flow Schema back from a Table
+
+`pgsql_table_to_flow_schema()` takes a `Flow\PostgreSql\Schema\Table` and returns a Flow `Schema`. Combine it with the
+PostgreSQL library's catalog provider to derive a Flow schema from a live table:
+
+```php
+use function Flow\ETL\Adapter\PostgreSql\pgsql_table_to_flow_schema;
+use function Flow\PostgreSql\DSL\client_catalog_provider;
+
+$table = client_catalog_provider($client, ['public'])
+    ->get()
+    ->get('public')
+    ->table('users');
+
+$schema = pgsql_table_to_flow_schema($table);
+```
+
+> **Note:** The reverse conversion is intentionally lossy. Several Flow types collapse onto the same PostgreSQL type
+> (for example `json`, `list`, `map`, and `structure` all map to `jsonb`), so a column is mapped back to a single
+> canonical Flow type rather than its original one.
+
+### Customizing the Type Mapping
+
+Both helpers accept an optional `EntryTypesMap`. Its second constructor argument overrides the Flow type → PostgreSQL
+column type mapping (the first argument keeps overriding the value-binding types used by the loader):
+
+```php
+use Flow\ETL\Adapter\PostgreSql\EntryTypesMap;
+use Flow\PostgreSql\QueryBuilder\Schema\ColumnType;
+use Flow\Types\Type\Native\StringType;
+
+use function Flow\ETL\Adapter\PostgreSql\to_pgsql_schema_table;
+
+$table = to_pgsql_schema_table(
+    $schema,
+    'users',
+    typesMap: new EntryTypesMap([], [
+        StringType::class => ColumnType::varchar(255), // default strings to varchar(255) instead of text
+    ]),
+);
+```
+
+### Schema Conversion DSL Functions Reference
+
+| Function | Description |
+|-----------------------------------------------------------------------|--------------------------------------------------------|
+| `to_pgsql_schema_table($schema, $tableName, $databaseSchema, $typesMap)` | Convert a Flow `Schema` into a PostgreSQL `Table` |
+| `pgsql_table_to_flow_schema($table, $typesMap)` | Convert a PostgreSQL `Table` into a Flow `Schema` |
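
The documentation above says that columns sharing a primary key, unique constraint, or index name are grouped together. The grouping rule can be pictured with plain PHP arrays. This is a minimal sketch and not the converter's actual implementation, which is not shown in this part of the commit: the function `group_columns_by_key()`, the array-shaped metadata, and the column and constraint names are hypothetical; only the metadata keys `pgsql_column_primary` and `pgsql_column_index` come from the commit.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: groups column names by the constraint name stored under one metadata key.
 *
 * @param array<string, array<string, mixed>> $columns column name => metadata (key => value)
 *
 * @return array<string, list<string>> constraint name => column names, in definition order
 */
function group_columns_by_key(array $columns, string $metadataKey): array
{
    $groups = [];

    foreach ($columns as $column => $metadata) {
        if (!array_key_exists($metadataKey, $metadata)) {
            continue; // this column does not take part in this kind of constraint
        }

        // Columns carrying the same name end up in the same group.
        $groups[(string) $metadata[$metadataKey]][] = $column;
    }

    return $groups;
}

// Two columns share one primary key name, so together they form a composite key.
$columns = [
    'order_id' => ['pgsql_column_primary' => 'pk_order_items'],
    'line_no' => ['pgsql_column_primary' => 'pk_order_items'],
    'sku' => ['pgsql_column_index' => 'idx_order_items_sku'],
];

$primaryKeys = group_columns_by_key($columns, 'pgsql_column_primary');
$indexes = group_columns_by_key($columns, 'pgsql_column_index');
```

Here `$primaryKeys` holds a single group, `pk_order_items`, with `order_id` and `line_no` in definition order, which is the shape a composite primary key would need.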

src/adapter/etl-adapter-csv/tests/Flow/ETL/Adapter/CSV/Tests/Unit/CSVLineReaderTest.php

Lines changed: 0 additions & 1 deletion
@@ -7,7 +7,6 @@
 use Flow\ETL\Adapter\CSV\CSVLineReader;
 use Flow\ETL\Adapter\CSV\Tests\Double\LengthCapturingSourceStream;
 use Flow\Filesystem\Stream\MemorySourceStream;
-use PHPUnit\Framework\Attributes\TestWith;
 use PHPUnit\Framework\TestCase;

 use function Flow\Filesystem\DSL\path;

src/adapter/etl-adapter-postgresql/src/Flow/ETL/Adapter/PostgreSql/EntryTypesMap.php

Lines changed: 108 additions & 17 deletions
@@ -25,27 +25,41 @@
 use Flow\ETL\Row\Entry\XMLEntry;
 use Flow\PostgreSql\Client\TypedValue;
 use Flow\PostgreSql\Client\Types\ValueType;
+use Flow\PostgreSql\QueryBuilder\Schema\ColumnType;
+use Flow\Types\Type;
+use Flow\Types\Type\Logical\DateTimeType;
+use Flow\Types\Type\Logical\DateType;
+use Flow\Types\Type\Logical\HTMLElementType;
+use Flow\Types\Type\Logical\HTMLType;
+use Flow\Types\Type\Logical\JsonType;
+use Flow\Types\Type\Logical\ListType;
+use Flow\Types\Type\Logical\MapType;
+use Flow\Types\Type\Logical\StructureType;
+use Flow\Types\Type\Logical\TimeType;
+use Flow\Types\Type\Logical\UuidType;
+use Flow\Types\Type\Logical\XMLElementType as LogicalXMLElementType;
+use Flow\Types\Type\Logical\XMLType;
+use Flow\Types\Type\Native\BooleanType;
+use Flow\Types\Type\Native\EnumType;
+use Flow\Types\Type\Native\FloatType;
+use Flow\Types\Type\Native\IntegerType;
+use Flow\Types\Type\Native\StringType;

 use function array_key_exists;
 use function array_merge;
+use function Flow\Types\DSL\type_boolean;
+use function Flow\Types\DSL\type_date;
+use function Flow\Types\DSL\type_datetime;
+use function Flow\Types\DSL\type_float;
+use function Flow\Types\DSL\type_integer;
+use function Flow\Types\DSL\type_json;
+use function Flow\Types\DSL\type_string;
+use function Flow\Types\DSL\type_time;
+use function Flow\Types\DSL\type_uuid;
+use function Flow\Types\DSL\type_xml;

 /**
- * Maps ETL Entry types to PostgreSQL types.
- *
- * Users can customize the mapping by passing overrides to the constructor.
- * Entry classes not in the map will throw TypeMappingException.
- *
- * Example usage:
- * ```php
- * // Use defaults
- * $map = new EntryTypesMap();
- *
- * // Override specific types
- * $map = new EntryTypesMap([
- * IntegerEntry::class => ValueType::INT2,
- * ListEntry::class => ValueType::TEXT_ARRAY,
- * ]);
- * ```
+ * Maps between Flow ETL types and PostgreSQL types.
  */
 final readonly class EntryTypesMap
 {
@@ -79,12 +93,19 @@
      */
     private array $typeMap;

+    /**
+     * @var array<class-string<Type<mixed>>, ColumnType>
+     */
+    private array $columnTypeMap;
+
     /**
      * @param array<class-string<Entry<mixed>>, ValueType> $overrides Entry class to ValueType mappings that override defaults
+     * @param array<class-string<Type<mixed>>, ColumnType> $columnTypeOverrides Flow Type class to ColumnType mappings that override defaults
      */
-    public function __construct(array $overrides = [])
+    public function __construct(array $overrides = [], array $columnTypeOverrides = [])
     {
         $this->typeMap = array_merge(self::DEFAULT_TYPES, $overrides);
+        $this->columnTypeMap = array_merge(self::defaultColumnTypes(), $columnTypeOverrides);
     }

     /**
@@ -108,4 +129,74 @@ public function mapEntry(Entry $entry): ?TypedValue

         return new TypedValue($entry->value(), $this->typeMap[$entryClass]);
     }
+
+    /**
+     * Maps a Flow type to a PostgreSQL DDL column type.
+     *
+     * @param Type<mixed> $type
+     *
+     * @throws TypeMappingException when the Flow type cannot be mapped
+     */
+    public function toColumnType(Type $type): ColumnType
+    {
+        $typeClass = $type::class;
+
+        if (!array_key_exists($typeClass, $this->columnTypeMap)) {
+            throw TypeMappingException::unsupportedFlowType($typeClass);
+        }
+
+        return $this->columnTypeMap[$typeClass];
+    }
+
+    /**
+     * Maps a PostgreSQL DDL column type back to a canonical Flow type.
+     *
+     * @throws TypeMappingException when the PostgreSQL type cannot be mapped
+     *
+     * @return Type<mixed>
+     */
+    public function toFlowType(ColumnType $columnType): Type
+    {
+        $name = $columnType->normalize()['name'];
+
+        return match ($name) {
+            'int2', 'int4', 'int8', 'smallserial', 'serial', 'bigserial' => type_integer(),
+            'float4', 'float8', 'numeric' => type_float(),
+            'bool' => type_boolean(),
+            'text', 'varchar', 'bpchar', 'bytea' => type_string(),
+            'date' => type_date(),
+            'time' => type_time(),
+            'timestamp', 'timestamptz' => type_datetime(),
+            'uuid' => type_uuid(),
+            'json', 'jsonb' => type_json(),
+            'xml' => type_xml(),
+            default => throw TypeMappingException::unsupportedColumnType($name),
+        };
+    }
+
+    /**
+     * @return array<class-string<Type<mixed>>, ColumnType>
+     */
+    private static function defaultColumnTypes(): array
+    {
+        return [
+            IntegerType::class => ColumnType::bigint(),
+            StringType::class => ColumnType::text(),
+            FloatType::class => ColumnType::doublePrecision(),
+            BooleanType::class => ColumnType::boolean(),
+            DateType::class => ColumnType::date(),
+            TimeType::class => ColumnType::time(),
+            DateTimeType::class => ColumnType::timestamptz(),
+            UuidType::class => ColumnType::uuid(),
+            JsonType::class => ColumnType::jsonb(),
+            ListType::class => ColumnType::jsonb(),
+            MapType::class => ColumnType::jsonb(),
+            StructureType::class => ColumnType::jsonb(),
+            EnumType::class => ColumnType::text(),
+            XMLType::class => ColumnType::xml(),
+            LogicalXMLElementType::class => ColumnType::xml(),
+            HTMLType::class => ColumnType::text(),
+            HTMLElementType::class => ColumnType::text(),
+        ];
+    }
 }
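
Because `defaultColumnTypes()` sends several Flow types to the same column type while `toFlowType()` returns one canonical type per PostgreSQL name, a round trip cannot always recover the original Flow type. The sketch below reproduces that effect with plain strings instead of `Type` and `ColumnType` objects. It is an illustration under stated assumptions: the short type names (`list`, `enum`, and so on), the constant `FLOW_TO_PG`, and the function `to_canonical_flow_type()` are hypothetical stand-ins, and it assumes `ColumnType::jsonb()` and `ColumnType::text()` normalize to the names `jsonb` and `text`.

```php
<?php

declare(strict_types=1);

// Subset of the forward mapping (Flow type => PostgreSQL type), written as plain strings.
const FLOW_TO_PG = [
    'json' => 'jsonb',
    'list' => 'jsonb',
    'map' => 'jsonb',
    'structure' => 'jsonb',
    'string' => 'text',
    'enum' => 'text',
];

/**
 * Hypothetical stand-in for the reverse mapping: one canonical Flow type per PostgreSQL type name.
 */
function to_canonical_flow_type(string $pgType): string
{
    return match ($pgType) {
        'json', 'jsonb' => 'json',
        'text', 'varchar', 'bpchar', 'bytea' => 'string',
        default => throw new InvalidArgumentException(sprintf('Unsupported PostgreSQL type "%s"', $pgType)),
    };
}

// Send every Flow type through the forward map and back again.
$roundTrip = [];

foreach (FLOW_TO_PG as $flowType => $pgType) {
    $roundTrip[$flowType] = to_canonical_flow_type($pgType);
}

// Collect the Flow types that did not survive the round trip.
$lost = array_keys(array_filter(
    $roundTrip,
    static fn (string $canonical, string $original): bool => $canonical !== $original,
    ARRAY_FILTER_USE_BOTH,
));
```

In this sketch `$lost` contains `list`, `map`, `structure`, and `enum`, while `json` and `string` come back unchanged.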

src/adapter/etl-adapter-postgresql/src/Flow/ETL/Adapter/PostgreSql/Exception/TypeMappingException.php

Lines changed: 16 additions & 0 deletions
@@ -16,11 +16,27 @@ public static function ambiguousEntryType(string $entryClass): self
         ));
     }

+    public static function unsupportedColumnType(string $pgTypeName): self
+    {
+        return new self(sprintf(
+            'PostgreSQL type "%s" cannot be automatically mapped to a Flow type. Provide a column type override to map it explicitly.',
+            $pgTypeName,
+        ));
+    }
+
     public static function unsupportedEntryType(string $entryClass): self
     {
         return new self(sprintf(
             'Entry type "%s" is not supported for PostgreSQL mapping. Use withColumnType() to specify the target type explicitly or provide a custom EntryTypeMapper.',
             $entryClass,
         ));
     }
+
+    public static function unsupportedFlowType(string $flowTypeClass): self
+    {
+        return new self(sprintf(
+            'Flow type "%s" cannot be automatically mapped to a PostgreSQL column type. Provide a column type override to map it explicitly.',
+            $flowTypeClass,
+        ));
+    }
 }
Lines changed: 76 additions & 0 deletions
@@ -0,0 +1,76 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Flow\ETL\Adapter\PostgreSql;
+
+use Flow\ETL\Schema\Metadata;
+use Flow\PostgreSql\Schema\IdentityGeneration;
+
+/**
+ * Per-column metadata keys understood by {@see SchemaConverter} when converting a Flow Schema to a
+ * PostgreSQL table.
+ */
+enum PostgreSqlMetadata: string
+{
+    case DEFAULT = 'pgsql_column_default';
+    case GENERATED = 'pgsql_column_generated';
+    case IDENTITY = 'pgsql_column_identity';
+    case INDEX = 'pgsql_column_index';
+    case INDEX_UNIQUE = 'pgsql_column_index_unique';
+    case LENGTH = 'pgsql_column_length';
+    case PRECISION = 'pgsql_column_precision';
+    case PRIMARY_KEY = 'pgsql_column_primary';
+    case SCALE = 'pgsql_column_scale';
+    case TYPE = 'pgsql_column_type';
+
+    public static function default(bool|float|int|string $value): Metadata
+    {
+        return Metadata::with(self::DEFAULT->value, $value);
+    }
+
+    public static function generated(string $expression): Metadata
+    {
+        return Metadata::with(self::GENERATED->value, $expression);
+    }
+
+    public static function identity(IdentityGeneration $generation = IdentityGeneration::ALWAYS): Metadata
+    {
+        return Metadata::with(self::IDENTITY->value, $generation->value);
+    }
+
+    public static function index(string $name): Metadata
+    {
+        return Metadata::with(self::INDEX->value, $name);
+    }
+
+    public static function indexUnique(string $name): Metadata
+    {
+        return Metadata::with(self::INDEX_UNIQUE->value, $name);
+    }
+
+    public static function length(int $length): Metadata
+    {
+        return Metadata::with(self::LENGTH->value, $length);
+    }
+
+    public static function precision(int $precision): Metadata
+    {
+        return Metadata::with(self::PRECISION->value, $precision);
+    }
+
+    public static function primaryKey(string $name = ''): Metadata
+    {
+        return Metadata::with(self::PRIMARY_KEY->value, $name);
+    }
+
+    public static function scale(int $scale): Metadata
+    {
+        return Metadata::with(self::SCALE->value, $scale);
+    }
+
+    public static function type(string $type): Metadata
+    {
+        return Metadata::with(self::TYPE->value, $type);
+    }
+}
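
The enum above pairs each string-backed case with a static factory that returns a one-entry `Metadata`, and the documentation combines such entries with `merge()`. The standalone sketch below shows that pattern in isolation. `MetadataSketch` and `ColumnOptionSketch` are hypothetical stand-ins written for this example, and the way `merge()` combines entries here (via `array_merge`) is an assumption about behavior, not Flow's actual `Metadata` implementation; only the two key strings are taken from the commit.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for Flow's Metadata: an immutable key => value bag.
 */
final readonly class MetadataSketch
{
    /**
     * @param array<string, bool|float|int|string> $values
     */
    private function __construct(public array $values)
    {
    }

    public static function with(string $key, bool|float|int|string $value): self
    {
        return new self([$key => $value]);
    }

    public function merge(self $other): self
    {
        // Returns a new bag; entries from $other win when keys collide.
        return new self(array_merge($this->values, $other->values));
    }
}

/**
 * Hypothetical stand-in for PostgreSqlMetadata: cases hold the keys, factories build entries.
 */
enum ColumnOptionSketch: string
{
    case PRECISION = 'pgsql_column_precision';
    case SCALE = 'pgsql_column_scale';

    public static function precision(int $precision): MetadataSketch
    {
        return MetadataSketch::with(self::PRECISION->value, $precision);
    }

    public static function scale(int $scale): MetadataSketch
    {
        return MetadataSketch::with(self::SCALE->value, $scale);
    }
}

// Mirrors PostgreSqlMetadata::precision(10)->merge(PostgreSqlMetadata::scale(2)) from the docs.
$numeric = ColumnOptionSketch::precision(10)->merge(ColumnOptionSketch::scale(2));
```

Keeping the keys on enum cases gives a converter one place to look them up, while the factories keep call sites free of raw strings. The sketch needs PHP 8.2 or later because of `readonly class`.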
