Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,33 @@ To connect to an external server (e.g., running in Docker), use the ``postgresql

By default, it connects to ``127.0.0.1:5432``.

Chaining fixtures
-----------------

You can chain multiple ``postgresql_noproc`` fixtures to layer your data pre-population. Each fixture in the chain will create its own template database based on the previous one.

.. code-block:: python

from pytest_postgresql import factories

# 1. Start with a process or a no-process base
base_proc = factories.postgresql_proc(load=[load_schema])

# 2. Add a layer with some data
seeded_noproc = factories.postgresql_noproc(depends_on="base_proc", load=[load_data])

# 3. Add another layer with more data
more_seeded_noproc = factories.postgresql_noproc(depends_on="seeded_noproc", load=[load_more_data])

# 4. Use the final layer in your test
client = factories.postgresql("more_seeded_noproc")



.. image:: https://raw.githubusercontent.com/dbfixtures/pytest-postgresql/main/docs/images/architecture_chaining.svg
:alt: Fixture Chaining Diagram
:align: center

Configuration
=============

Expand Down
22 changes: 22 additions & 0 deletions docs/architecture_chaining.mmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
sequenceDiagram
participant Test as Test
participant ProcF as base_proc Fixture
participant NoProc1 as seeded_noproc Fixture
participant NoProc2 as more_seeded_noproc Fixture
participant DB as PostgreSQL DB

Test->>ProcF: request base_proc
ProcF->>DB: init database & run load_schema
ProcF-->>Test: return PostgreSQLExecutor

Test->>NoProc1: request seeded_noproc (depends_on=base_proc)
NoProc1->>ProcF: read connection/template info
NoProc1->>DB: create layered DB / run load_data
NoProc1-->>Test: return NoopExecutor

Test->>NoProc2: request more_seeded_noproc (depends_on=seeded_noproc)
NoProc2->>NoProc1: read connection/template info
NoProc2->>DB: run load_more_data on layered DB
NoProc2-->>Test: return NoopExecutor

Test->>Test: validate tables and data across layers
5 changes: 5 additions & 0 deletions newsfragments/890.break.1.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bump the minimum supported pytest version to 8.2.

The previous minimum was about two years old, and older pytest versions
can be flaky with fixture chaining that relies on `getfixturevalue` on
Python 3.12-3.13 when used alongside xdist.
1 change: 1 addition & 0 deletions newsfragments/890.break.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor ``DatabaseJanitor`` to use explicit template management. This includes a new ``as_template`` flag and making ``dbname`` a required parameter.
1 change: 1 addition & 0 deletions newsfragments/890.docs.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a Mermaid sequence diagram to the documentation to illustrate fixture chaining and hierarchical cloning.
1 change: 1 addition & 0 deletions newsfragments/890.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ``depends_on`` parameter to ``postgresql_noproc`` factory to allow hierarchical cloning and chaining of process fixtures.
3 changes: 1 addition & 2 deletions oldest/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pytest == 7.4; python_version >= "3.14"
pytest == 7.2; python_version < "3.14"
pytest == 8.2
port-for == 0.7.3
mirakuru == 2.6.0
psycopg == 3.0.0
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ classifiers = [
"Framework :: Pytest",
]
dependencies = [
"pytest >= 7.2",
"pytest >= 8.2",
"port-for >= 0.7.3",
"mirakuru >= 2.6.0",
"packaging",
Expand Down
39 changes: 31 additions & 8 deletions pytest_postgresql/factories/noprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def postgresql_noproc(
dbname: str | None = None,
options: str = "",
load: list[Callable | str | Path] | None = None,
depends_on: str | None = None,
) -> Callable[[FixtureRequest], Iterator[NoopExecutor]]:
"""Postgresql noprocess factory.

Expand All @@ -55,6 +56,7 @@ def postgresql_noproc(
:param dbname: postgresql database name
:param options: Postgresql connection options
:param load: List of functions used to initialize database's template.
:param depends_on: Optional name of the fixture to depend on.
:returns: function which makes a postgresql process
"""

Expand All @@ -66,32 +68,53 @@ def postgresql_noproc_fixture(request: FixtureRequest) -> Iterator[NoopExecutor]
:returns: tcp executor-like object
"""
config = get_config(request)
pg_host = host or config.host
pg_port = port or config.port or 5432
pg_user = user or config.user
pg_password = password or config.password

if depends_on:
base = request.getfixturevalue(depends_on)
pg_host = host or base.host
pg_port = port or base.port
pg_user = user or base.user
pg_password = password or base.password
pg_options = options or base.options
base_template_dbname = base.template_dbname
else:
pg_host = host or config.host
pg_port = port or config.port or 5432
pg_user = user or config.user
pg_password = password or config.password
pg_options = options or config.options
base_template_dbname = None

pg_dbname = xdistify_dbname(dbname or config.dbname)
pg_options = options or config.options
pg_load = load or config.load
drop_test_database = config.drop_test_database

# In this case there's a risk that both seeded and depends_on fixture
# might end up with the same configured dbname.
if depends_on and not dbname:
noop_exec_dbname = f"{pg_dbname}_{depends_on}"
else:
noop_exec_dbname = pg_dbname

noop_exec = NoopExecutor(
host=pg_host,
port=pg_port,
user=pg_user,
password=pg_password,
dbname=pg_dbname,
dbname=noop_exec_dbname,
options=pg_options,
)
janitor = DatabaseJanitor(
user=noop_exec.user,
host=noop_exec.host,
port=noop_exec.port,
template_dbname=noop_exec.template_dbname,
dbname=noop_exec.template_dbname,
template_dbname=base_template_dbname,
as_template=True,
version=noop_exec.version,
password=noop_exec.password,
)
if drop_test_database is True:
if drop_test_database:
janitor.drop()
with janitor:
for load_element in pg_load:
Expand Down
3 changes: 2 additions & 1 deletion pytest_postgresql/factories/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ def postgresql_proc_fixture(
user=postgresql_executor.user,
host=postgresql_executor.host,
port=postgresql_executor.port,
template_dbname=postgresql_executor.template_dbname,
dbname=postgresql_executor.template_dbname,
as_template=True,
version=postgresql_executor.version,
password=postgresql_executor.password,
)
Expand Down
41 changes: 21 additions & 20 deletions pytest_postgresql/janitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ def __init__(
host: str,
port: str | int,
version: str | float | Version, # type: ignore[valid-type]
dbname: str | None = None,
dbname: str,
template_dbname: str | None = None,
as_template: bool = False,
password: str | None = None,
isolation_level: "psycopg.IsolationLevel | None" = None,
connection_timeout: int = 60,
Expand All @@ -40,7 +41,8 @@ def __init__(
:param host: postgresql host
:param port: postgresql port
:param dbname: database name
:param dbname: template database name
:param template_dbname: template database name to clone from
:param as_template: whether to mark the database as a template
:param version: postgresql version number
:param password: optional postgresql password
:param isolation_level: optional postgresql isolation level
Expand All @@ -52,10 +54,9 @@ def __init__(
self.password = password
self.host = host
self.port = port
# At least one of the dbname or template_dbname has to be filled.
assert any([dbname, template_dbname])
self.dbname = dbname
self.template_dbname = template_dbname
self.as_template = as_template
self._connection_timeout = connection_timeout
self.isolation_level = isolation_level
if not isinstance(version, Version):
Expand All @@ -66,32 +67,33 @@ def __init__(
def init(self) -> None:
"""Create database in postgresql."""
with self.cursor() as cur:
if self.is_template():
cur.execute(f'CREATE DATABASE "{self.template_dbname}" WITH is_template = true;')
elif self.template_dbname is None:
cur.execute(f'CREATE DATABASE "{self.dbname}";')
else:
if self.template_dbname:
# And make sure no-one is left connected to the template database.
# Otherwise, Creating database from template will fail
self._terminate_connection(cur, self.template_dbname)
cur.execute(f'CREATE DATABASE "{self.dbname}" TEMPLATE "{self.template_dbname}";')
query = f'CREATE DATABASE "{self.dbname}" TEMPLATE "{self.template_dbname}"'
else:
query = f'CREATE DATABASE "{self.dbname}"'

if self.as_template:
query += " IS_TEMPLATE = true"

cur.execute(f"{query};")

def is_template(self) -> bool:
"""Determine whether the DatabaseJanitor maintains template or database."""
return self.dbname is None
return self.as_template

def drop(self) -> None:
"""Drop database in postgresql."""
# We cannot drop the database while there are connections to it, so we
# terminate all connections first while not allowing new connections.
db_to_drop = self.template_dbname if self.is_template() else self.dbname
assert db_to_drop
with self.cursor() as cur:
self._dont_datallowconn(cur, db_to_drop)
self._terminate_connection(cur, db_to_drop)
if self.is_template():
cur.execute(f'ALTER DATABASE "{db_to_drop}" with is_template false;')
cur.execute(f'DROP DATABASE IF EXISTS "{db_to_drop}";')
self._dont_datallowconn(cur, self.dbname)
self._terminate_connection(cur, self.dbname)
if self.as_template:
cur.execute(f'ALTER DATABASE "{self.dbname}" with is_template false;')
cur.execute(f'DROP DATABASE IF EXISTS "{self.dbname}";')

@staticmethod
def _dont_datallowconn(cur: Cursor, dbname: str) -> None:
Expand All @@ -116,13 +118,12 @@ def load(self, load: Callable | str | Path) -> None:
* a callable that expects: host, port, user, dbname and password arguments.

"""
db_to_load = self.template_dbname if self.is_template() else self.dbname
_loader = build_loader(load)
_loader(
host=self.host,
port=self.port,
user=self.user,
dbname=db_to_load,
dbname=self.dbname,
password=self.password,
)

Expand Down
101 changes: 101 additions & 0 deletions tests/test_chaining.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Chaining noprocess fixtures tests for pytest-postgresql."""

import psycopg

from pytest_postgresql import factories
from pytest_postgresql.executor import PostgreSQLExecutor
from pytest_postgresql.executor_noop import NoopExecutor


def load_schema(host: str, port: int, user: str, dbname: str, password: str | None) -> None:
"""Load schema into the database."""
with psycopg.connect(host=host, port=port, user=user, dbname=dbname, password=password) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE schema_table (id serial PRIMARY KEY, name varchar);")
conn.commit()


def load_data(host: str, port: int, user: str, dbname: str, password: str | None) -> None:
"""Load the first layer of data into the database."""
with psycopg.connect(host=host, port=port, user=user, dbname=dbname, password=password) as conn:
with conn.cursor() as cur:
cur.execute("INSERT INTO schema_table (name) VALUES ('data_layer');")
cur.execute("CREATE TABLE data_table (id serial PRIMARY KEY, val varchar);")
conn.commit()


def load_more_data(host: str, port: int, user: str, dbname: str, password: str | None) -> None:
"""Load the second layer of data into the database."""
with psycopg.connect(host=host, port=port, user=user, dbname=dbname, password=password) as conn:
with conn.cursor() as cur:
cur.execute("INSERT INTO schema_table (name) VALUES ('more_data_layer');")
cur.execute("CREATE TABLE more_data_table (id serial PRIMARY KEY, extra varchar);")
conn.commit()


# Chaining: proc -> noproc -> client
base_proc = factories.postgresql_proc(load=[load_schema])
seeded_noproc = factories.postgresql_noproc(depends_on="base_proc", load=[load_data])
client_layered = factories.postgresql("seeded_noproc")

# Deeper chaining: proc -> noproc -> noproc -> client
more_seeded_noproc = factories.postgresql_noproc(depends_on="seeded_noproc", load=[load_more_data])
client_deep_layered = factories.postgresql("more_seeded_noproc")


def test_chaining_two_layers(client_layered: psycopg.Connection) -> None:
"""Test that data from both proc and noproc layers is present."""
with client_layered.cursor() as cur:
# From base_proc (load_schema)
cur.execute("SELECT count(*) FROM information_schema.tables WHERE table_name = 'schema_table';")
res = cur.fetchone()
assert res
assert res[0] == 1

# From seeded_noproc (load_data)
cur.execute("SELECT count(*) FROM information_schema.tables WHERE table_name = 'data_table';")
res = cur.fetchone()
assert res
assert res[0] == 1

# Data inserted in seeded_noproc
cur.execute("SELECT name FROM schema_table;")
res = cur.fetchone()
assert res
assert res[0] == "data_layer"


def test_chaining_three_layers(client_deep_layered: psycopg.Connection) -> None:
"""Test that data from all three layers is present."""
with client_deep_layered.cursor() as cur:
# From base_proc
cur.execute("SELECT count(*) FROM information_schema.tables WHERE table_name = 'schema_table';")
res = cur.fetchone()
assert res
assert res[0] == 1

# From seeded_noproc
cur.execute("SELECT count(*) FROM information_schema.tables WHERE table_name = 'data_table';")
res = cur.fetchone()
assert res
assert res[0] == 1

# From more_seeded_noproc
cur.execute("SELECT count(*) FROM information_schema.tables WHERE table_name = 'more_data_table';")
res = cur.fetchone()
assert res
assert res[0] == 1

# Data from multiple layers
cur.execute("SELECT name FROM schema_table ORDER BY id;")
results = cur.fetchall()
assert results[0][0] == "data_layer"
assert results[1][0] == "more_data_layer"


def test_inheritance(base_proc: PostgreSQLExecutor, seeded_noproc: NoopExecutor) -> None:
"""Verify that connection parameters are inherited from the base fixture."""
assert seeded_noproc.host == base_proc.host
assert seeded_noproc.port == base_proc.port
assert seeded_noproc.user == base_proc.user
assert seeded_noproc.password == base_proc.password
Loading
Loading