128 changes: 128 additions & 0 deletions documentation/components/adapters/postgresql.md
Original file line number Diff line number Diff line change
@@ -400,3 +400,131 @@ df()
| `pgsql_insert_options(...)` | Configure insert behavior (conflicts, upsert) |
| `pgsql_update_options($primaryKeys)` | Configure update behavior (primary key columns) |
| `pgsql_delete_options($primaryKeys)` | Configure delete behavior (primary key columns) |

## Schema Conversion

Two helpers convert between a Flow `Schema` and a PostgreSQL table definition:

- `to_pgsql_schema_table()` turns a Flow `Schema` into a `Flow\PostgreSql\Schema\Table`, which can emit `CREATE TABLE`
(and related index/constraint) SQL via `toSql()`.
- `pgsql_table_to_flow_schema()` turns a `Flow\PostgreSql\Schema\Table` back into a Flow `Schema`.

Column types are resolved through the shared `EntryTypesMap` (Flow type → PostgreSQL column type), and per-column
details — primary keys, unique constraints, indexes, length, precision/scale, defaults, identity, generated columns,
and explicit type overrides — are driven by `PostgreSqlMetadata` entries attached to each schema definition.

### Creating a Table from a Flow Schema

`to_pgsql_schema_table()` returns a table definition; call `toSql()` on it and execute each statement to create the
table:

```php
use Flow\ETL\Adapter\PostgreSql\PostgreSqlMetadata;

use function Flow\ETL\Adapter\PostgreSql\to_pgsql_schema_table;
use function Flow\ETL\DSL\{bool_schema, int_schema, json_schema, schema, str_schema};

$table = to_pgsql_schema_table(
    schema(
        int_schema('id', metadata: PostgreSqlMetadata::primaryKey('pk_users')),
        str_schema('name', metadata: PostgreSqlMetadata::length(120)),
        str_schema('email', metadata: PostgreSqlMetadata::indexUnique('uq_users_email')),
        bool_schema('active', metadata: PostgreSqlMetadata::default(true)),
        json_schema('payload'),
    ),
    'users',
);

foreach ($table->toSql() as $sql) {
    $client->execute($sql);
}
```

By default the table is created in the `public` schema; pass a third argument to target another one:

```php
$table = to_pgsql_schema_table($schema, 'users', 'analytics');
```

### Steering the Conversion with Metadata

`PostgreSqlMetadata` factories return `Metadata` objects you attach to a definition via the `metadata:` argument of the
schema DSL helpers. Combine multiple entries with `merge()`:

```php
use Flow\ETL\Adapter\PostgreSql\PostgreSqlMetadata;
use Flow\PostgreSql\Schema\IdentityGeneration;

use function Flow\ETL\DSL\{float_schema, int_schema, schema, str_schema};

$schema = schema(
    int_schema('id', metadata: PostgreSqlMetadata::identity(IdentityGeneration::BY_DEFAULT)),
    str_schema('sku', metadata: PostgreSqlMetadata::type('citext')), // explicit type override
    float_schema('amount', metadata: PostgreSqlMetadata::precision(10)->merge(PostgreSqlMetadata::scale(2))),
    // the expression must reference columns that exist in the table; price and quantity are assumed here
    int_schema('total', metadata: PostgreSqlMetadata::generated('price * quantity')),
);
```

| Metadata                                            | Effect on the generated column                            |
|-----------------------------------------------------|-----------------------------------------------------------|
| `PostgreSqlMetadata::type($name)`                   | Force a specific PostgreSQL type, bypassing the type map  |
| `PostgreSqlMetadata::length($n)`                    | Emit `varchar($n)`                                        |
| `PostgreSqlMetadata::precision($p)` / `::scale($s)` | Emit `numeric($p, $s)`                                    |
| `PostgreSqlMetadata::default($v)`                   | Set a column `DEFAULT`                                    |
| `PostgreSqlMetadata::primaryKey($name)`             | Include the column in the table primary key               |
| `PostgreSqlMetadata::indexUnique($name)`            | Include the column in a named `UNIQUE` constraint         |
| `PostgreSqlMetadata::index($name)`                  | Include the column in a named index                       |
| `PostgreSqlMetadata::identity($generation)`         | Make the column an identity column (defaults to `ALWAYS`) |
| `PostgreSqlMetadata::generated($expr)`              | Make the column a generated column                        |

Columns sharing the same primary key, unique constraint, or index name are grouped together, so composite keys are
expressed by attaching the same name to several definitions.
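
The grouping rule can be pictured with a small, library-free sketch. This is not the adapter's converter code: the `$columns` array shape and the `group_by_constraint()` helper are hypothetical, and only the metadata key strings (`pgsql_column_primary`, `pgsql_column_index`) are taken from `PostgreSqlMetadata`.

```php
/**
 * Hypothetical helper: collect column names per constraint name for one metadata key.
 *
 * @param array<string, array<string, string>> $columns column name => metadata (key => constraint name)
 *
 * @return array<string, list<string>> constraint name => column names, in declaration order
 */
function group_by_constraint(array $columns, string $metadataKey): array
{
    $groups = [];

    foreach ($columns as $column => $metadata) {
        if (!array_key_exists($metadataKey, $metadata)) {
            continue; // this column does not take part in this kind of constraint
        }

        $groups[$metadata[$metadataKey]][] = $column;
    }

    return $groups;
}

// Two columns carry the same primary key name, so they end up in one composite key.
$columns = [
    'order_id' => ['pgsql_column_primary' => 'pk_order_items'],
    'line_no' => ['pgsql_column_primary' => 'pk_order_items'],
    'sku' => ['pgsql_column_index' => 'idx_order_items_sku'],
];

$primaryKeys = group_by_constraint($columns, 'pgsql_column_primary');
$indexes = group_by_constraint($columns, 'pgsql_column_index');
```

In this sketch the column order inside a group follows the order of the definitions, which is presumably what matters for a composite key; the real converter may order columns differently.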

### Reading a Flow Schema back from a Table

`pgsql_table_to_flow_schema()` takes a `Flow\PostgreSql\Schema\Table` and returns a Flow `Schema`. Combine it with the
PostgreSQL library's catalog provider to derive a Flow schema from a live table:

```php
use function Flow\ETL\Adapter\PostgreSql\pgsql_table_to_flow_schema;
use function Flow\PostgreSql\DSL\client_catalog_provider;

$table = client_catalog_provider($client, ['public'])
    ->get()
    ->get('public')
    ->table('users');

$schema = pgsql_table_to_flow_schema($table);
```

> **Note:** The reverse conversion is intentionally lossy. Several Flow types collapse onto the same PostgreSQL type
> (for example `json`, `list`, `map`, and `structure` all map to `jsonb`), so a column is mapped back to a single
> canonical Flow type rather than its original one.
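
A library-free sketch of why the round trip loses information. The two lookup tables are plain-string stand-ins for a subset of the adapter's default column types and its reverse mapping; the constant and function names are hypothetical and exist only for this illustration.

```php
// Forward direction: several Flow types share one PostgreSQL type (subset of the defaults).
const FLOW_TO_PGSQL = [
    'json' => 'jsonb',
    'list' => 'jsonb',
    'map' => 'jsonb',
    'structure' => 'jsonb',
    'string' => 'text',
    'enum' => 'text',
];

// Reverse direction: each PostgreSQL type maps to exactly one canonical Flow type.
const PGSQL_TO_FLOW = [
    'jsonb' => 'json',
    'text' => 'string',
];

// Convert a Flow type name to PostgreSQL and back again.
function round_trip(string $flowType): string
{
    return PGSQL_TO_FLOW[FLOW_TO_PGSQL[$flowType]];
}

// Flow types that do not survive the round trip unchanged.
$lossy = array_values(array_filter(
    array_keys(FLOW_TO_PGSQL),
    static fn (string $flowType): bool => round_trip($flowType) !== $flowType,
));
```

Only the canonical type of each group (`json`, `string`) comes back unchanged; every other member of the group is reported in `$lossy`.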

### Customizing the Type Mapping

Both helpers accept an optional `EntryTypesMap`. Its second constructor argument overrides the Flow type → PostgreSQL
column type mapping; the first argument still overrides the Entry class → `ValueType` mapping that the loader uses to
bind values:

```php
use Flow\ETL\Adapter\PostgreSql\EntryTypesMap;
use Flow\PostgreSql\QueryBuilder\Schema\ColumnType;
use Flow\Types\Type\Native\StringType;

use function Flow\ETL\Adapter\PostgreSql\to_pgsql_schema_table;

$table = to_pgsql_schema_table(
    $schema,
    'users',
    typesMap: new EntryTypesMap([], [
        StringType::class => ColumnType::varchar(255), // default strings to varchar(255) instead of text
    ]),
);
```
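
The `EntryTypesMap` constructor combines its defaults and the overrides with `array_merge()`. A minimal sketch of the resulting precedence, using plain strings as stand-ins for the type classes and `ColumnType` objects (the variable names are illustrative only):

```php
// Stand-in for a few of the adapter's defaults (Flow type => PostgreSQL column type).
$defaults = [
    'integer' => 'bigint',
    'string' => 'text',
    'datetime' => 'timestamptz',
];

// Caller-supplied overrides; only the listed types change.
$overrides = [
    'string' => 'varchar(255)',
];

// With string keys, array_merge() lets values from later arrays replace earlier ones.
$effective = array_merge($defaults, $overrides);
```

Because the real map is keyed by type class, an override for `StringType` leaves `EnumType`, `HTMLType`, and the other types that also default to `text` untouched; each needs its own entry if it should change.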

### Schema Conversion DSL Functions Reference

| Function                                                                 | Description                                       |
|--------------------------------------------------------------------------|---------------------------------------------------|
| `to_pgsql_schema_table($schema, $tableName, $databaseSchema, $typesMap)` | Convert a Flow `Schema` into a PostgreSQL `Table` |
| `pgsql_table_to_flow_schema($table, $typesMap)`                          | Convert a PostgreSQL `Table` into a Flow `Schema` |
Original file line number Diff line number Diff line change
@@ -7,7 +7,6 @@
use Flow\ETL\Adapter\CSV\CSVLineReader;
use Flow\ETL\Adapter\CSV\Tests\Double\LengthCapturingSourceStream;
use Flow\Filesystem\Stream\MemorySourceStream;
use PHPUnit\Framework\Attributes\TestWith;
use PHPUnit\Framework\TestCase;

use function Flow\Filesystem\DSL\path;
Original file line number Diff line number Diff line change
@@ -25,27 +25,41 @@
use Flow\ETL\Row\Entry\XMLEntry;
use Flow\PostgreSql\Client\TypedValue;
use Flow\PostgreSql\Client\Types\ValueType;
use Flow\PostgreSql\QueryBuilder\Schema\ColumnType;
use Flow\Types\Type;
use Flow\Types\Type\Logical\DateTimeType;
use Flow\Types\Type\Logical\DateType;
use Flow\Types\Type\Logical\HTMLElementType;
use Flow\Types\Type\Logical\HTMLType;
use Flow\Types\Type\Logical\JsonType;
use Flow\Types\Type\Logical\ListType;
use Flow\Types\Type\Logical\MapType;
use Flow\Types\Type\Logical\StructureType;
use Flow\Types\Type\Logical\TimeType;
use Flow\Types\Type\Logical\UuidType;
use Flow\Types\Type\Logical\XMLElementType as LogicalXMLElementType;
use Flow\Types\Type\Logical\XMLType;
use Flow\Types\Type\Native\BooleanType;
use Flow\Types\Type\Native\EnumType;
use Flow\Types\Type\Native\FloatType;
use Flow\Types\Type\Native\IntegerType;
use Flow\Types\Type\Native\StringType;

use function array_key_exists;
use function array_merge;
use function Flow\Types\DSL\type_boolean;
use function Flow\Types\DSL\type_date;
use function Flow\Types\DSL\type_datetime;
use function Flow\Types\DSL\type_float;
use function Flow\Types\DSL\type_integer;
use function Flow\Types\DSL\type_json;
use function Flow\Types\DSL\type_string;
use function Flow\Types\DSL\type_time;
use function Flow\Types\DSL\type_uuid;
use function Flow\Types\DSL\type_xml;

 /**
- * Maps ETL Entry types to PostgreSQL types.
- *
- * Users can customize the mapping by passing overrides to the constructor.
- * Entry classes not in the map will throw TypeMappingException.
- *
- * Example usage:
- * ```php
- * // Use defaults
- * $map = new EntryTypesMap();
- *
- * // Override specific types
- * $map = new EntryTypesMap([
- *     IntegerEntry::class => ValueType::INT2,
- *     ListEntry::class => ValueType::TEXT_ARRAY,
- * ]);
- * ```
+ * Maps between Flow ETL types and PostgreSQL types.
  */
final readonly class EntryTypesMap
{
@@ -79,12 +93,19 @@
      */
     private array $typeMap;

+    /**
+     * @var array<class-string<Type<mixed>>, ColumnType>
+     */
+    private array $columnTypeMap;
+
     /**
      * @param array<class-string<Entry<mixed>>, ValueType> $overrides Entry class to ValueType mappings that override defaults
+     * @param array<class-string<Type<mixed>>, ColumnType> $columnTypeOverrides Flow Type class to ColumnType mappings that override defaults
      */
-    public function __construct(array $overrides = [])
+    public function __construct(array $overrides = [], array $columnTypeOverrides = [])
     {
         $this->typeMap = array_merge(self::DEFAULT_TYPES, $overrides);
+        $this->columnTypeMap = array_merge(self::defaultColumnTypes(), $columnTypeOverrides);
     }

     /**
@@ -108,4 +129,74 @@ public function mapEntry(Entry $entry): ?TypedValue

        return new TypedValue($entry->value(), $this->typeMap[$entryClass]);
    }

    /**
     * Maps a Flow type to a PostgreSQL DDL column type.
     *
     * @param Type<mixed> $type
     *
     * @throws TypeMappingException when the Flow type cannot be mapped
     */
    public function toColumnType(Type $type): ColumnType
    {
        $typeClass = $type::class;

        if (!array_key_exists($typeClass, $this->columnTypeMap)) {
            throw TypeMappingException::unsupportedFlowType($typeClass);
        }

        return $this->columnTypeMap[$typeClass];
    }

    /**
     * Maps a PostgreSQL DDL column type back to a canonical Flow type.
     *
     * @throws TypeMappingException when the PostgreSQL type cannot be mapped
     *
     * @return Type<mixed>
     */
    public function toFlowType(ColumnType $columnType): Type
    {
        $name = $columnType->normalize()['name'];

        return match ($name) {
            'int2', 'int4', 'int8', 'smallserial', 'serial', 'bigserial' => type_integer(),
            'float4', 'float8', 'numeric' => type_float(),
            'bool' => type_boolean(),
            'text', 'varchar', 'bpchar', 'bytea' => type_string(),
            'date' => type_date(),
            'time' => type_time(),
            'timestamp', 'timestamptz' => type_datetime(),
            'uuid' => type_uuid(),
            'json', 'jsonb' => type_json(),
            'xml' => type_xml(),
            default => throw TypeMappingException::unsupportedColumnType($name),
        };
    }

    /**
     * @return array<class-string<Type<mixed>>, ColumnType>
     */
    private static function defaultColumnTypes(): array
    {
        return [
            IntegerType::class => ColumnType::bigint(),
            StringType::class => ColumnType::text(),
            FloatType::class => ColumnType::doublePrecision(),
            BooleanType::class => ColumnType::boolean(),
            DateType::class => ColumnType::date(),
            TimeType::class => ColumnType::time(),
            DateTimeType::class => ColumnType::timestamptz(),
            UuidType::class => ColumnType::uuid(),
            JsonType::class => ColumnType::jsonb(),
            ListType::class => ColumnType::jsonb(),
            MapType::class => ColumnType::jsonb(),
            StructureType::class => ColumnType::jsonb(),
            EnumType::class => ColumnType::text(),
            XMLType::class => ColumnType::xml(),
            LogicalXMLElementType::class => ColumnType::xml(),
            HTMLType::class => ColumnType::text(),
            HTMLElementType::class => ColumnType::text(),
        ];
    }
}
Original file line number Diff line number Diff line change
@@ -16,11 +16,27 @@ public static function ambiguousEntryType(string $entryClass): self
        ));
    }

    public static function unsupportedColumnType(string $pgTypeName): self
    {
        return new self(sprintf(
            'PostgreSQL type "%s" cannot be automatically mapped to a Flow type. Provide a column type override to map it explicitly.',
            $pgTypeName,
        ));
    }

    public static function unsupportedEntryType(string $entryClass): self
    {
        return new self(sprintf(
            'Entry type "%s" is not supported for PostgreSQL mapping. Use withColumnType() to specify the target type explicitly or provide a custom EntryTypeMapper.',
            $entryClass,
        ));
    }

    public static function unsupportedFlowType(string $flowTypeClass): self
    {
        return new self(sprintf(
            'Flow type "%s" cannot be automatically mapped to a PostgreSQL column type. Provide a column type override to map it explicitly.',
            $flowTypeClass,
        ));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\PostgreSql;

use Flow\ETL\Schema\Metadata;
use Flow\PostgreSql\Schema\IdentityGeneration;

/**
 * Per-column metadata keys understood by {@see SchemaConverter} when converting a Flow Schema to a
 * PostgreSQL table.
 */
enum PostgreSqlMetadata: string
{
    case DEFAULT = 'pgsql_column_default';
    case GENERATED = 'pgsql_column_generated';
    case IDENTITY = 'pgsql_column_identity';
    case INDEX = 'pgsql_column_index';
    case INDEX_UNIQUE = 'pgsql_column_index_unique';
    case LENGTH = 'pgsql_column_length';
    case PRECISION = 'pgsql_column_precision';
    case PRIMARY_KEY = 'pgsql_column_primary';
    case SCALE = 'pgsql_column_scale';
    case TYPE = 'pgsql_column_type';

    public static function default(bool|float|int|string $value): Metadata
    {
        return Metadata::with(self::DEFAULT->value, $value);
    }

    public static function generated(string $expression): Metadata
    {
        return Metadata::with(self::GENERATED->value, $expression);
    }

    public static function identity(IdentityGeneration $generation = IdentityGeneration::ALWAYS): Metadata
    {
        return Metadata::with(self::IDENTITY->value, $generation->value);
    }

    public static function index(string $name): Metadata
    {
        return Metadata::with(self::INDEX->value, $name);
    }

    public static function indexUnique(string $name): Metadata
    {
        return Metadata::with(self::INDEX_UNIQUE->value, $name);
    }

    public static function length(int $length): Metadata
    {
        return Metadata::with(self::LENGTH->value, $length);
    }

    public static function precision(int $precision): Metadata
    {
        return Metadata::with(self::PRECISION->value, $precision);
    }

    public static function primaryKey(string $name = ''): Metadata
    {
        return Metadata::with(self::PRIMARY_KEY->value, $name);
    }

    public static function scale(int $scale): Metadata
    {
        return Metadata::with(self::SCALE->value, $scale);
    }

    public static function type(string $type): Metadata
    {
        return Metadata::with(self::TYPE->value, $type);
    }
}