From 37f428c4e80ec6255aef057828add6cd7ac1b17f Mon Sep 17 00:00:00 2001 From: KoalaGeo Date: Tue, 19 May 2026 14:56:07 +0100 Subject: [PATCH 01/11] Enhance SQL Alchemy engine with connection pool options Added connection pool options for SQL Alchemy engine. --- pygeoapi/provider/sql.py | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/pygeoapi/provider/sql.py b/pygeoapi/provider/sql.py index 868ee4f88..c38c34f2b 100644 --- a/pygeoapi/provider/sql.py +++ b/pygeoapi/provider/sql.py @@ -134,6 +134,7 @@ def __init__( self.db_user, self._db_password, self.db_conn, + self.db_pool_options, **self.db_options ) self.table_model = get_table_model( @@ -615,6 +616,24 @@ def store_db_parameters( connection_data.get('search_path') or options.pop('search_path', ['public']) ) + # Connection-pool tuning. These are popped out of ``options`` so they + # are NOT passed to the DBAPI as connect_args, and are coerced to a + # hashable form so get_engine() can stay functools.cache()-able. + # Defaults keep SQLAlchemy's QueuePool sizing but, unlike SQLAlchemy's + # default of -1, recycle connections after an hour so that pooled + # connections cannot sit IDLE on the server indefinitely. + pool_defaults = { + 'pool_size': 5, + 'max_overflow': 10, + 'pool_recycle': 3600, + 'pool_timeout': 30, + 'pool_pre_ping': True, + } + self.db_pool_options = tuple(sorted( + (key, type(default)(options.pop(key, default))) + for key, default in pool_defaults.items() + )) + self.db_options = { k: v for k, v in options.items() @@ -631,6 +650,7 @@ def get_engine( user: str, password: str, conn_str: Optional[str] = None, + pool_options: tuple[tuple[str, Any], ...] = (), **connect_args ) -> Engine: """ @@ -643,6 +663,11 @@ def get_engine( :param user: database user :param password: database password :param conn_str: optional connection URL + :param pool_options: hashable tuple of (key, value) pairs controlling + the connection pool (pool_size, max_overflow, + pool_recycle, pool_timeout, pool_pre_ping). Passed + as a tuple rather than a dict so this function can + remain functools.cache()-able. :param connect_args: custom connection arguments to pass to create_engine() :returns: SQL Alchemy engine @@ -658,10 +683,16 @@ def get_engine( ) engine = create_engine( - conn_str, connect_args=connect_args, pool_pre_ping=True + conn_str, + connect_args=connect_args, + **dict(pool_options) + ) + + LOGGER.debug( + f'Created engine for {repr(engine.url)} ' + f'with pool options {dict(pool_options)}.' ) - LOGGER.debug(f'Created engine for {repr(engine.url)}.') return engine From 5841ed9de034438826bb5438ff03671976a1d234 Mon Sep 17 00:00:00 2001 From: KoalaGeo Date: Tue, 19 May 2026 14:56:48 +0100 Subject: [PATCH 02/11] Add db_pool_options to PostgreSQL connection --- pygeoapi/process/manager/postgresql.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pygeoapi/process/manager/postgresql.py b/pygeoapi/process/manager/postgresql.py index 7a2adc559..5f1c9eeeb 100644 --- a/pygeoapi/process/manager/postgresql.py +++ b/pygeoapi/process/manager/postgresql.py @@ -95,6 +95,7 @@ def __init__(self, manager_def: dict): self.db_user, self._db_password, self.db_conn, + self.db_pool_options, **self.db_options ) self.table_output = self.output_dir is None From bc68af4209dff101a2b7b35d09e67718206a6c47 Mon Sep 17 00:00:00 2001 From: KoalaGeo Date: Tue, 19 May 2026 15:08:23 +0100 Subject: [PATCH 03/11] Update pool_recycle to SQLAlchemy default value Change pool_recycle to -1 to preserve current behavior. --- pygeoapi/provider/sql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pygeoapi/provider/sql.py b/pygeoapi/provider/sql.py index c38c34f2b..1b970a03e 100644 --- a/pygeoapi/provider/sql.py +++ b/pygeoapi/provider/sql.py @@ -625,7 +625,7 @@ def store_db_parameters( pool_defaults = { 'pool_size': 5, 'max_overflow': 10, - 'pool_recycle': 3600, + 'pool_recycle': -1, # SQLAlchemy default; preserves current behaviour 'pool_timeout': 30, 'pool_pre_ping': True, } From cd9c836e074456da925b17d7280d9cc583d3d817 Mon Sep 17 00:00:00 2001 From: KoalaGeo Date: Tue, 19 May 2026 15:12:28 +0100 Subject: [PATCH 04/11] Enhance SQLAlchemy connection pooling settings Added SQLAlchemy connection-pool tuning options to configuration. --- docs/source/publishing/ogcapi-features.rst | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/docs/source/publishing/ogcapi-features.rst b/docs/source/publishing/ogcapi-features.rst index 162f1921f..b95158917 100644 --- a/docs/source/publishing/ogcapi-features.rst +++ b/docs/source/publishing/ogcapi-features.rst @@ -703,11 +703,27 @@ These are optional and if not specified, the default from the engine will be use # Number of seconds after which a TCP keepalive message that is not # acknowledged by the server should be retransmitted. keepalives_interval: 1 + # SQLAlchemy connection-pool tuning (optional). Defaults match + # SQLAlchemy's QueuePool and preserve previous behaviour. + # Persistent connections held open per worker process. + pool_size: 5 + # Extra short-lived connections allowed above pool_size. + max_overflow: 10 + # Recreate connections older than this many seconds. -1 (the + # default) never recycles; set a finite value (e.g. 300) so + # pooled connections cannot sit IDLE on the server indefinitely. + pool_recycle: -1 + # Seconds to wait for a connection from the pool before erroring. + pool_timeout: 30 + # Test connections with a lightweight ping before use. + pool_pre_ping: true id_field: osm_id table: hotosm_bdi_waterways geom_field: foo_geom count: true # Optional; Default true; Enable/disable count for improved performance. +`get_engine()` is cached per worker process, so providers that share the same database connection should use identical pool options to keep sharing a single engine; differing pool options intentionally create separate engines. + The PostgreSQL provider is also able to connect to Cloud SQL databases. .. code-block:: yaml From 9838aa97756cf42d29de74ec1d42827715084e39 Mon Sep 17 00:00:00 2001 From: KoalaGeo Date: Tue, 19 May 2026 15:14:22 +0100 Subject: [PATCH 05/11] Add files via upload test_sql_pool_options.py exercises `store_db_parameters()` directly, requires no database, and runs in standard CI. It asserts the zero-behaviour-change defaults, override + typing, no DBAPI leakage, the existing dict-filtering, hashable/deterministic cache keys, and coexistence with search_path. --- tests/provider/test_sql_pool_options.py | 99 +++++++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 tests/provider/test_sql_pool_options.py diff --git a/tests/provider/test_sql_pool_options.py b/tests/provider/test_sql_pool_options.py new file mode 100644 index 000000000..ec2c6f5ee --- /dev/null +++ b/tests/provider/test_sql_pool_options.py @@ -0,0 +1,99 @@ +# ================================================================= +# Tests for configurable SQLAlchemy connection-pool options on the +# SQL provider. These exercise store_db_parameters() directly and do +# not require a live database, so they run in standard CI. +# ================================================================= + +import pytest + +from pygeoapi.provider.sql import store_db_parameters + + +class _Dummy: + """Minimal stand-in for a provider/manager instance.""" + default_port = 5432 + + +CONN = {'host': 'h', 'dbname': 'd', 'user': 'u', 'password': 'p'} + + +def test_pool_options_defaults_preserve_current_behaviour(): + obj = _Dummy() + store_db_parameters(obj, dict(CONN), {}) + pool = dict(obj.db_pool_options) + # Defaults must match pre-existing effective behaviour: + # pool_pre_ping was hardcoded True; pool_recycle was unset (-1). + assert pool['pool_size'] == 5 + assert pool['max_overflow'] == 10 + assert pool['pool_timeout'] == 30 + assert pool['pool_pre_ping'] is True + assert pool['pool_recycle'] == -1 + + +def test_pool_options_are_overridable_and_typed(): + obj = _Dummy() + store_db_parameters( + obj, dict(CONN), + {'pool_size': 2, 'max_overflow': 3, 'pool_recycle': 300}, + ) + pool = dict(obj.db_pool_options) + assert pool['pool_size'] == 2 and isinstance(pool['pool_size'], int) + assert pool['max_overflow'] == 3 + assert pool['pool_recycle'] == 300 + # untouched keys keep defaults + assert pool['pool_timeout'] == 30 + assert pool['pool_pre_ping'] is True + + +def test_pool_options_not_leaked_to_dbapi_connect_args(): + obj = _Dummy() + store_db_parameters( + obj, dict(CONN), + {'connect_timeout': 10, 'pool_size': 2, 'pool_recycle': 300}, + ) + for k in ('pool_size', 'max_overflow', 'pool_recycle', + 'pool_timeout', 'pool_pre_ping'): + assert k not in obj.db_options + # genuine DBAPI connect args still pass through + assert obj.db_options['connect_timeout'] == 10 + + +def test_dict_valued_options_still_filtered(): + obj = _Dummy() + store_db_parameters( + obj, dict(CONN), + {'pool_size': 2, 'zoom': {'min': 0, 'max': 22}}, + ) + assert 'zoom' not in obj.db_options + assert dict(obj.db_pool_options)['pool_size'] == 2 + + +def test_pool_options_hashable_and_deterministic(): + a, b = _Dummy(), _Dummy() + store_db_parameters(a, dict(CONN), {'pool_size': 2}) + store_db_parameters(b, dict(CONN), {'pool_size': 2}) + # identical config -> identical key -> shared engine via functools.cache + assert a.db_pool_options == b.db_pool_options + assert hash(a.db_pool_options) == hash(b.db_pool_options) + + c = _Dummy() + store_db_parameters(c, dict(CONN), {'pool_size': 9}) + # differing pool config -> distinct key (separate engine, by design) + assert c.db_pool_options != a.db_pool_options + + +def test_pool_options_coexist_with_search_path(): + obj = _Dummy() + store_db_parameters( + obj, dict(CONN), + {'search_path': ['published', 'public'], 'pool_size': 4}, + ) + assert obj.db_search_path == ('published', 'public') + assert dict(obj.db_pool_options)['pool_size'] == 4 + + +@pytest.mark.parametrize('bad', [{'pool_size': 'two'}]) +def test_non_integer_pool_value_raises(bad): + # type coercion surfaces bad config loudly rather than silently + with pytest.raises(ValueError): + store_db_parameters(_Dummy(), dict(CONN), bad) From 82ea64bc456bfbab08bd16bc1ca04c63a931c245 Mon Sep 17 00:00:00 2001 From: KoalaGeo Date: Mon, 8 Jun 2026 22:44:44 +0100 Subject: [PATCH 06/11] Consolidate SQLAlchemy connection pool option handling Pool configuration parameters are now passed directly within the database 'options' block and parsed by get_engine() rather than store_db_parameters(). This streamlines internal processing and formalizes connection pool settings within the pygeoapi configuration schema. Updated tests reflect this shift in responsibility. --- pygeoapi/process/manager/postgresql.py | 3 +- pygeoapi/provider/sql.py | 67 ++++++----- .../schemas/config/pygeoapi-config-0.x.yml | 17 ++- tests/provider/test_sql_pool_options.py | 112 ++++-------------- 4 files changed, 76 insertions(+), 123 deletions(-) diff --git a/pygeoapi/process/manager/postgresql.py b/pygeoapi/process/manager/postgresql.py index 5f1c9eeeb..c71ec69e2 100644 --- a/pygeoapi/process/manager/postgresql.py +++ b/pygeoapi/process/manager/postgresql.py @@ -95,7 +95,6 @@ def __init__(self, manager_def: dict): self.db_user, self._db_password, self.db_conn, - self.db_pool_options, **self.db_options ) self.table_output = self.output_dir is None @@ -340,4 +339,4 @@ def get_table_model( Base.metadata.create_all(engine, tables=[Jobs], checkfirst=True) - return Jobs + return Jobs \ No newline at end of file diff --git a/pygeoapi/provider/sql.py b/pygeoapi/provider/sql.py index 1b970a03e..b5b773ffb 100644 --- a/pygeoapi/provider/sql.py +++ b/pygeoapi/provider/sql.py @@ -134,7 +134,6 @@ def __init__( self.db_user, self._db_password, self.db_conn, - self.db_pool_options, **self.db_options ) self.table_model = get_table_model( @@ -616,24 +615,11 @@ def store_db_parameters( connection_data.get('search_path') or options.pop('search_path', ['public']) ) - # Connection-pool tuning. These are popped out of ``options`` so they - # are NOT passed to the DBAPI as connect_args, and are coerced to a - # hashable form so get_engine() can stay functools.cache()-able. - # Defaults keep SQLAlchemy's QueuePool sizing but, unlike SQLAlchemy's - # default of -1, recycle connections after an hour so that pooled - # connections cannot sit IDLE on the server indefinitely. - pool_defaults = { - 'pool_size': 5, - 'max_overflow': 10, - 'pool_recycle': -1, # SQLAlchemy default; preserves current behaviour - 'pool_timeout': 30, - 'pool_pre_ping': True, - } - self.db_pool_options = tuple(sorted( - (key, type(default)(options.pop(key, default))) - for key, default in pool_defaults.items() - )) - + # Connection-pool tuning keys (pool_size, max_overflow, pool_recycle, + # pool_timeout, pool_pre_ping) are intentionally left in ``options`` and + # flow through ``db_options`` to get_engine(), which separates them from + # the DBAPI connect_args. Their types are validated by the config JSON + # Schema, so no coercion is performed here. self.db_options = { k: v for k, v in options.items() @@ -641,6 +627,20 @@ def store_db_parameters( } +#: Connection-pool tuning keys recognised by get_engine(). These configure +#: SQLAlchemy's QueuePool rather than the DBAPI, so get_engine() separates +#: them from connect_args. The defaults reproduce pygeoapi's previous +#: behaviour exactly: SQLAlchemy's own QueuePool defaults, except for +#: pool_pre_ping, which was previously hardcoded to True. +POOL_OPTION_DEFAULTS = { + 'pool_size': 5, + 'max_overflow': 10, + 'pool_recycle': -1, # SQLAlchemy default; never recycles connections + 'pool_timeout': 30, + 'pool_pre_ping': True, +} + + @functools.cache def get_engine( driver_name: str, @@ -650,7 +650,6 @@ def get_engine( user: str, password: str, conn_str: Optional[str] = None, - pool_options: tuple[tuple[str, Any], ...] = (), **connect_args ) -> Engine: """ @@ -663,12 +662,11 @@ def get_engine( :param user: database user :param password: database password :param conn_str: optional connection URL - :param pool_options: hashable tuple of (key, value) pairs controlling - the connection pool (pool_size, max_overflow, - pool_recycle, pool_timeout, pool_pre_ping). Passed - as a tuple rather than a dict so this function can - remain functools.cache()-able. - :param connect_args: custom connection arguments to pass to create_engine() + :param connect_args: keyword arguments forwarded from the provider's + ``options`` block. Connection-pool tuning keys (see + POOL_OPTION_DEFAULTS) are extracted and applied to + the engine's pool; any remaining keys are passed to + the DBAPI as connect_args. :returns: SQL Alchemy engine """ @@ -682,15 +680,26 @@ def get_engine( database=database ) + # Separate connection-pool tuning from DBAPI connect args. Pool keys are + # applied to create_engine() directly; everything left in connect_args is + # forwarded to the DBAPI. get_engine() stays functools.cache()-able + # because connect_args values are hashable scalars, so engine sharing per + # process is preserved; providers with differing pool config (or any + # other option) correctly get distinct engines. + pool_options = { + key: connect_args.pop(key, default) + for key, default in POOL_OPTION_DEFAULTS.items() + } + engine = create_engine( conn_str, connect_args=connect_args, - **dict(pool_options) + **pool_options ) LOGGER.debug( f'Created engine for {repr(engine.url)} ' - f'with pool options {dict(pool_options)}.' + f'with pool options {pool_options}.' ) return engine @@ -927,4 +936,4 @@ def get(self, identifier, crs_transform_spec=None, **kwargs): else feature_id ) - return feature + return feature \ No newline at end of file diff --git a/pygeoapi/resources/schemas/config/pygeoapi-config-0.x.yml b/pygeoapi/resources/schemas/config/pygeoapi-config-0.x.yml index 4d0b77dd8..73190d22c 100644 --- a/pygeoapi/resources/schemas/config/pygeoapi-config-0.x.yml +++ b/pygeoapi/resources/schemas/config/pygeoapi-config-0.x.yml @@ -734,8 +734,23 @@ definitions: type: integer keepalives_interval: type: integer + pool_size: + type: integer + description: persistent connections held open per worker process + max_overflow: + type: integer + description: extra short-lived connections allowed above pool_size + pool_recycle: + type: integer + description: recreate connections older than this many seconds (-1 never recycles) + pool_timeout: + type: integer + description: seconds to wait for a connection from the pool before erroring + pool_pre_ping: + type: boolean + description: test connections with a lightweight ping before use required: - server - logging - metadata - - resources + - resources \ No newline at end of file diff --git a/tests/provider/test_sql_pool_options.py b/tests/provider/test_sql_pool_options.py index ec2c6f5ee..70f2817c6 100644 --- a/tests/provider/test_sql_pool_options.py +++ b/tests/provider/test_sql_pool_options.py @@ -1,99 +1,29 @@ # ================================================================= -# Tests for configurable SQLAlchemy connection-pool options on the -# SQL provider. These exercise store_db_parameters() directly and do -# not require a live database, so they run in standard CI. +# Test that get_engine() separates SQLAlchemy connection-pool tuning +# options from DBAPI connect_args. This is the contract introduced by +# the configurable-pool change; it needs no live database. # ================================================================= -import pytest +from unittest import mock -from pygeoapi.provider.sql import store_db_parameters +from pygeoapi.provider import sql -class _Dummy: - """Minimal stand-in for a provider/manager instance.""" - default_port = 5432 - - -CONN = {'host': 'h', 'dbname': 'd', 'user': 'u', 'password': 'p'} - - -def test_pool_options_defaults_preserve_current_behaviour(): - obj = _Dummy() - store_db_parameters(obj, dict(CONN), {}) - pool = dict(obj.db_pool_options) - # Defaults must match pre-existing effective behaviour: - # pool_pre_ping was hardcoded True; pool_recycle was unset (-1). - assert pool['pool_size'] == 5 - assert pool['max_overflow'] == 10 - assert pool['pool_timeout'] == 30 - assert pool['pool_pre_ping'] is True - assert pool['pool_recycle'] == -1 - - -def test_pool_options_are_overridable_and_typed(): - obj = _Dummy() - store_db_parameters( - obj, dict(CONN), - {'pool_size': 2, 'max_overflow': 3, 'pool_recycle': 300}, +@mock.patch.object(sql, 'create_engine') +def test_get_engine_separates_pool_options_from_connect_args(mock_create): + sql.get_engine.cache_clear() + sql.get_engine( + 'postgresql+psycopg2', 'h', 5432, 'd', 'u', 'p', None, + pool_size=2, pool_recycle=300, connect_timeout=10, ) - pool = dict(obj.db_pool_options) - assert pool['pool_size'] == 2 and isinstance(pool['pool_size'], int) - assert pool['max_overflow'] == 3 - assert pool['pool_recycle'] == 300 - # untouched keys keep defaults - assert pool['pool_timeout'] == 30 - assert pool['pool_pre_ping'] is True - - -def test_pool_options_not_leaked_to_dbapi_connect_args(): - obj = _Dummy() - store_db_parameters( - obj, dict(CONN), - {'connect_timeout': 10, 'pool_size': 2, 'pool_recycle': 300}, - ) - for k in ('pool_size', 'max_overflow', 'pool_recycle', - 'pool_timeout', 'pool_pre_ping'): - assert k not in obj.db_options - # genuine DBAPI connect args still pass through - assert obj.db_options['connect_timeout'] == 10 - - -def test_dict_valued_options_still_filtered(): - obj = _Dummy() - store_db_parameters( - obj, dict(CONN), - {'pool_size': 2, 'zoom': {'min': 0, 'max': 22}}, - ) - assert 'zoom' not in obj.db_options - assert dict(obj.db_pool_options)['pool_size'] == 2 - - -def test_pool_options_hashable_and_deterministic(): - a, b = _Dummy(), _Dummy() - store_db_parameters(a, dict(CONN), {'pool_size': 2}) - store_db_parameters(b, dict(CONN), {'pool_size': 2}) - # identical config -> identical key -> shared engine via functools.cache - assert a.db_pool_options == b.db_pool_options - assert hash(a.db_pool_options) == hash(b.db_pool_options) - - c = _Dummy() - store_db_parameters(c, dict(CONN), {'pool_size': 9}) - # differing pool config -> distinct key (separate engine, by design) - assert c.db_pool_options != a.db_pool_options - - -def test_pool_options_coexist_with_search_path(): - obj = _Dummy() - store_db_parameters( - obj, dict(CONN), - {'search_path': ['published', 'public'], 'pool_size': 4}, - ) - assert obj.db_search_path == ('published', 'public') - assert dict(obj.db_pool_options)['pool_size'] == 4 - -@pytest.mark.parametrize('bad', [{'pool_size': 'two'}]) -def test_non_integer_pool_value_raises(bad): - # type coercion surfaces bad config loudly rather than silently - with pytest.raises(ValueError): - store_db_parameters(_Dummy(), dict(CONN), bad) + _, kwargs = mock_create.call_args + # pool keys are applied to the engine (QueuePool), with overrides + # honoured and unset pool keys falling back to the documented defaults + assert kwargs['pool_size'] == 2 + assert kwargs['pool_recycle'] == 300 + assert kwargs['max_overflow'] == 10 + assert kwargs['pool_timeout'] == 30 + assert kwargs['pool_pre_ping'] is True + # genuine DBAPI args are forwarded via connect_args; pool keys are not + assert kwargs['connect_args'] == {'connect_timeout': 10} \ No newline at end of file From 9be0bf7e9d7f129cad96e58c85c84093139884f3 Mon Sep 17 00:00:00 2001 From: KoalaGeo Date: Mon, 8 Jun 2026 22:52:48 +0100 Subject: [PATCH 07/11] Ruff format --- pygeoapi/process/manager/postgresql.py | 116 +++++---- pygeoapi/provider/sql.py | 298 ++++++++++-------------- tests/provider/test_sql_pool_options.py | 26 ++- 3 files changed, 198 insertions(+), 242 deletions(-) diff --git a/pygeoapi/process/manager/postgresql.py b/pygeoapi/process/manager/postgresql.py index c71ec69e2..f203a924c 100644 --- a/pygeoapi/process/manager/postgresql.py +++ b/pygeoapi/process/manager/postgresql.py @@ -45,7 +45,7 @@ String, Table, text, - update + update, ) from sqlalchemy.engine import Engine from sqlalchemy.orm import declarative_base, Session @@ -54,7 +54,7 @@ from pygeoapi.process.base import ( JobNotFoundError, JobResultNotFoundError, - ProcessorGenericError + ProcessorGenericError, ) from pygeoapi.process.manager.base import BaseManager from pygeoapi.provider.sql import get_engine, store_db_parameters @@ -81,21 +81,21 @@ def __init__(self, manager_def: dict): super().__init__(manager_def) self.is_async = True - self.id_field = 'identifier' + self.id_field = "identifier" self.supports_subscribing = True - self.connection = manager_def['connection'] + self.connection = manager_def["connection"] - options = manager_def.get('options', {}) - self._store_db_parameters(manager_def['connection'], options) + options = manager_def.get("options", {}) + self._store_db_parameters(manager_def["connection"], options) self._engine = get_engine( - 'postgresql+psycopg2', + "postgresql+psycopg2", self.db_host, self.db_port, self.db_name, self.db_user, self._db_password, self.db_conn, - **self.db_options + **self.db_options, ) self.table_output = self.output_dir is None @@ -104,16 +104,14 @@ def __init__(self, manager_def: dict): ) self.c = self.table_model.c try: - LOGGER.debug('Getting table model') + LOGGER.debug("Getting table model") except Exception as err: - msg = 'Table model fetch failed' - LOGGER.error(f'{msg}: {err}') + msg = "Table model fetch failed" + LOGGER.error(f"{msg}: {err}") raise ProcessorGenericError(msg) - def get_jobs( - self, status: JobStatus = None, limit=None, offset=None - ) -> dict: + def get_jobs(self, status: JobStatus = None, limit=None, offset=None) -> dict: """ Get jobs @@ -126,7 +124,7 @@ def get_jobs( and numberMatched """ - LOGGER.debug('Querying for jobs') + LOGGER.debug("Querying for jobs") with Session(self._engine) as session: results = session.query(self.table_model) @@ -134,7 +132,7 @@ def get_jobs( results = results.filter(self.c.status == status.value) jobs = [r._asdict() for r in results.all()] - return {'jobs': jobs, 'numberMatched': len(jobs)} + return {"jobs": jobs, "numberMatched": len(jobs)} def add_job(self, job_metadata: dict) -> str: """ @@ -145,20 +143,18 @@ def add_job(self, job_metadata: dict) -> str: :returns: identifier of added job """ - LOGGER.debug('Adding job') + LOGGER.debug("Adding job") with Session(self._engine) as session: try: - session.execute( - insert(self.table_model).values(**job_metadata) - ) + session.execute(insert(self.table_model).values(**job_metadata)) session.commit() except Exception as err: session.rollback() - msg = 'Insert failed' - LOGGER.error(f'{msg}: {err}') + msg = "Insert failed" + LOGGER.error(f"{msg}: {err}") raise ProcessorGenericError(msg) - return job_metadata['identifier'] + return job_metadata["identifier"] def update_job(self, job_id: str, update_dict: dict) -> bool: """ @@ -172,7 +168,7 @@ def update_job(self, job_id: str, update_dict: dict) -> bool: rowcount = 0 - LOGGER.debug('Updating job') + LOGGER.debug("Updating job") with Session(self._engine) as session: try: stmt = ( @@ -185,8 +181,8 @@ def update_job(self, job_id: str, update_dict: dict) -> bool: rowcount = result.rowcount except Exception as err: session.rollback() - msg = 'Update failed' - LOGGER.error(f'{msg}: {err}') + msg = "Update failed" + LOGGER.error(f"{msg}: {err}") raise ProcessorGenericError(msg) return rowcount == 1 @@ -202,7 +198,7 @@ def get_job(self, job_id: str) -> dict: :returns: `dict` # `pygeoapi.process.manager.Job` """ - LOGGER.debug('Querying for job') + LOGGER.debug("Querying for job") with Session(self._engine) as session: results = session.query(self.table_model).filter( self.c.identifier == job_id @@ -230,21 +226,19 @@ def delete_job(self, job_id: str) -> bool: # get result file if present for deletion job_result = self.get_job(job_id) - location = job_result.get('location') + location = job_result.get("location") - LOGGER.debug('Deleting job') + LOGGER.debug("Deleting job") with Session(self._engine) as session: try: - stmt = delete(self.table_model).where( - self.c.identifier == job_id - ) + stmt = delete(self.table_model).where(self.c.identifier == job_id) result = session.execute(stmt) session.commit() rowcount = result.rowcount except Exception as err: session.rollback() - msg = 'Delete failed' - LOGGER.error(f'{msg}: {err}') + msg = "Delete failed" + LOGGER.error(f"{msg}: {err}") raise ProcessorGenericError(msg) # delete result file if present @@ -270,28 +264,24 @@ def get_job_result(self, job_id: str) -> Tuple[str, Any]: """ job_result = self.get_job(job_id) - location = job_result.get('location') - mimetype = job_result.get('mimetype') - job_status = JobStatus[job_result['status']] + location = job_result.get("location") + mimetype = job_result.get("mimetype") + job_status = JobStatus[job_result["status"]] if job_status != JobStatus.successful: # Job is incomplete return (None,) if not location: - LOGGER.warning(f'job {job_id!r} - unknown result location') + LOGGER.warning(f"job {job_id!r} - unknown result location") raise JobResultNotFoundError() else: try: location = Path(location) - if mimetype in ( - None, - FORMAT_TYPES[F_JSON], - FORMAT_TYPES[F_JSONLD] - ): - with location.open('r', encoding='utf-8') as fh: + if mimetype in (None, FORMAT_TYPES[F_JSON], FORMAT_TYPES[F_JSONLD]): + with location.open("r", encoding="utf-8") as fh: result = json.load(fh) else: - with location.open('rb') as fh: + with location.open("rb") as fh: result = fh.read() except (TypeError, FileNotFoundError, json.JSONDecodeError): raise JobResultNotFoundError() @@ -299,7 +289,7 @@ def get_job_result(self, job_id: str) -> Tuple[str, Any]: return mimetype, result def __repr__(self): - return f' {self.name}' + return f" {self.name}" @functools.cache @@ -312,31 +302,31 @@ def get_table_model( schema = db_search_path[0] Jobs = Table( - 'jobs', + "jobs", Base.metadata, - Column('identifier', String, primary_key=True, nullable=False), + Column("identifier", String, primary_key=True, nullable=False), Column( - 'type', + "type", String, nullable=False, - server_default=text("'process'::character varying") + server_default=text("'process'::character varying"), ), - Column('process_id', String, nullable=False), - Column('created', DateTime), - Column('started', DateTime), - Column('finished', DateTime), - Column('updated', DateTime), - Column('status', String, nullable=False), - Column('location', String), - Column('mimetype', String), - Column('message', String), - Column('progress', Integer, nullable=False), - schema=schema + Column("process_id", String, nullable=False), + Column("created", DateTime), + Column("started", DateTime), + Column("finished", DateTime), + Column("updated", DateTime), + Column("status", String, nullable=False), + Column("location", String), + Column("mimetype", String), + Column("message", String), + Column("progress", Integer, nullable=False), + schema=schema, ) if table_output: - Jobs.append_column(Column('output', LargeBinary)) + Jobs.append_column(Column("output", LargeBinary)) Base.metadata.create_all(engine, tables=[Jobs], checkfirst=True) - return Jobs \ No newline at end of file + return Jobs diff --git a/pygeoapi/provider/sql.py b/pygeoapi/provider/sql.py index b5b773ffb..f0a960526 100644 --- a/pygeoapi/provider/sql.py +++ b/pygeoapi/provider/sql.py @@ -52,20 +52,13 @@ from pygeofilter.backends.sqlalchemy.evaluate import to_filter import shapely from sqlalchemy.sql import func -from sqlalchemy import ( - create_engine, - MetaData, - PrimaryKeyConstraint, - asc, - desc, - delete -) +from sqlalchemy import create_engine, MetaData, PrimaryKeyConstraint, asc, desc, delete from sqlalchemy.engine import URL, Engine from sqlalchemy.exc import ( ConstraintColumnNotFoundError, InvalidRequestError, OperationalError, - SQLAlchemyError + SQLAlchemyError, ) from sqlalchemy.ext.automap import automap_base from sqlalchemy.orm import Session, load_only @@ -78,7 +71,7 @@ ProviderConnectionError, ProviderInvalidDataError, ProviderQueryError, - ProviderItemNotFoundError + ProviderItemNotFoundError, ) LOGGER = logging.getLogger(__name__) @@ -91,10 +84,7 @@ class GenericSQLProvider(BaseProvider): """ def __init__( - self, - provider_def: dict, - driver_name: str, - extra_conn_args: Optional[dict] = {} + self, provider_def: dict, driver_name: str, extra_conn_args: Optional[dict] = {} ): """ GenericSQLProvider Class constructor @@ -109,23 +99,23 @@ def __init__( :returns: pygeoapi.provider.GenericSQLProvider """ - LOGGER.debug('Initialising GenericSQL provider.') + LOGGER.debug("Initialising GenericSQL provider.") super().__init__(provider_def) - self.table = provider_def['table'] - self.id_field = provider_def['id_field'] - self.geom = provider_def.get('geom_field', 'geom') + self.table = provider_def["table"] + self.id_field = provider_def["id_field"] + self.geom = provider_def.get("geom_field", "geom") self.driver_name = driver_name - LOGGER.debug(f'Name: {self.name}') - LOGGER.debug(f'Table: {self.table}') - LOGGER.debug(f'ID field: {self.id_field}') - LOGGER.debug(f'Geometry field: {self.geom}') - LOGGER.debug(f'Configured Storage CRS: {self.storage_crs}') + LOGGER.debug(f"Name: {self.name}") + LOGGER.debug(f"Table: {self.table}") + LOGGER.debug(f"ID field: {self.id_field}") + LOGGER.debug(f"Geometry field: {self.geom}") + LOGGER.debug(f"Configured Storage CRS: {self.storage_crs}") # Read table information from database - options = provider_def.get('options', {}) | extra_conn_args - store_db_parameters(self, provider_def['data'], options) + options = provider_def.get("options", {}) | extra_conn_args + store_db_parameters(self, provider_def["data"], options) self._engine = get_engine( driver_name, self.db_host, @@ -134,7 +124,7 @@ def __init__( self.db_user, self._db_password, self.db_conn, - **self.db_options + **self.db_options, ) self.table_model = get_table_model( self.table, self.id_field, self.db_search_path, self._engine @@ -146,7 +136,7 @@ def query( self, offset=0, limit=10, - resulttype='results', + resulttype="results", bbox=[], datetime_=None, properties=[], @@ -156,7 +146,7 @@ def query( q=None, filterq=None, crs_transform_spec=None, - **kwargs + **kwargs, ): """ Query sql database for all the content. @@ -179,7 +169,7 @@ def query( :returns: GeoJSON FeatureCollection """ - LOGGER.debug('Preparing filters') + LOGGER.debug("Preparing filters") property_filters = self._get_property_filters(properties) cql_filters = self._get_cql_filters(filterq) bbox_filter = self._get_bbox_filter(bbox) @@ -189,7 +179,7 @@ def query( select_properties, skip_geometry ) - LOGGER.debug('Querying Database') + LOGGER.debug("Querying Database") # Execute query within self-closing database Session context with Session(self._engine) as session: results = ( @@ -201,33 +191,32 @@ def query( .options(selected_properties) ) - LOGGER.debug('Preparing response') + LOGGER.debug("Preparing response") response = { - 'type': 'FeatureCollection', - 'features': [], - 'numberReturned': 0 + "type": "FeatureCollection", + "features": [], + "numberReturned": 0, } - if self.count or resulttype == 'hits': + if self.count or resulttype == "hits": matched = results.count() - response['numberMatched'] = matched - LOGGER.debug(f'Found {matched} result(s)') + response["numberMatched"] = matched + LOGGER.debug(f"Found {matched} result(s)") else: - LOGGER.debug('Count disabled') + LOGGER.debug("Count disabled") - if resulttype == 'hits' or not results: + if resulttype == "hits" or not results: return response crs_transform_out = get_transform_from_spec(crs_transform_spec) - response['numberReturned'] = 0 - for item in ( - results.order_by(*order_by_clauses).offset(offset).limit(limit) - ): - response['numberReturned'] += 1 - response['features'].append( - self._sqlalchemy_to_feature(item, crs_transform_out, - select_properties) + response["numberReturned"] = 0 + for item in results.order_by(*order_by_clauses).offset(offset).limit(limit): + response["numberReturned"] += 1 + response["features"].append( + self._sqlalchemy_to_feature( + item, crs_transform_out, select_properties + ) ) return response @@ -239,41 +228,41 @@ def get_fields(self): :returns: dict of fields """ - LOGGER.debug('Get available fields/properties') + LOGGER.debug("Get available fields/properties") # sql-schema only allows these types, so we need to map from sqlalchemy # string, number, integer, object, array, boolean, null, # https://json-schema.org/understanding-json-schema/reference/type.html column_type_map = { - bool: 'boolean', - datetime: 'string', - Decimal: 'number', - dict: 'object', - float: 'number', - int: 'integer', - str: 'string' + bool: "boolean", + datetime: "string", + Decimal: "number", + dict: "object", + float: "number", + int: "integer", + str: "string", } - default_type = 'string' + default_type = "string" # https://json-schema.org/understanding-json-schema/reference/string#built-in-formats # noqa column_format_map = { - 'date': 'date', - 'interval': 'duration', - 'time': 'time', - 'timestamp': 'date-time' + "date": "date", + "interval": "duration", + "time": "time", + "timestamp": "date-time", } def _column_type_to_json_schema_type(column_type): try: python_type = column_type.python_type except NotImplementedError: - LOGGER.warning(f'Unsupported column type {column_type}') + LOGGER.warning(f"Unsupported column type {column_type}") return default_type else: try: return column_type_map[python_type] except KeyError: - LOGGER.warning(f'Unsupported column type {column_type}') + LOGGER.warning(f"Unsupported column type {column_type}") return default_type def _column_format_to_json_schema_format(column_type): @@ -281,20 +270,18 @@ def _column_format_to_json_schema_format(column_type): ct = str(column_type).lower() return column_format_map[ct] except KeyError: - LOGGER.debug('No string format detected') + LOGGER.debug("No string format detected") return None if not self._fields: for column in self.table_model.__table__.columns: - LOGGER.debug(f'Testing {column.name}') + LOGGER.debug(f"Testing {column.name}") if column.name == self.geom: continue self._fields[str(column.name)] = { - 'type': _column_type_to_json_schema_type(column.type), - 'format': _column_format_to_json_schema_format( - column.type - ) + "type": _column_type_to_json_schema_type(column.type), + "format": _column_format_to_json_schema_format(column.type), } return self._fields @@ -309,7 +296,7 @@ def get(self, identifier, crs_transform_spec=None, **kwargs): :returns: GeoJSON FeatureCollection """ - LOGGER.debug(f'Get item by ID: {identifier}') + LOGGER.debug(f"Get item by ID: {identifier}") # Execute query within self-closing database Session context with Session(self._engine) as session: @@ -320,14 +307,14 @@ def get(self, identifier, crs_transform_spec=None, **kwargs): assert item is not None except (AssertionError, SQLAlchemyError) as e: LOGGER.debug(e, exc_info=True) - msg = f'No such item: {self.id_field}={identifier}.' + msg = f"No such item: {self.id_field}={identifier}." raise ProviderItemNotFoundError(msg) crs_transform_out = get_transform_from_spec(crs_transform_spec) feature = self._sqlalchemy_to_feature(item, crs_transform_out) # Drop non-defined properties if self.properties: - props = feature['properties'] + props = feature["properties"] dropping_keys = deepcopy(props).keys() for item in dropping_keys: if item not in self.properties: @@ -347,12 +334,12 @@ def get(self, identifier, crs_transform_spec=None, **kwargs): .filter(id_field > identifier) .first() ) - feature['prev'] = ( + feature["prev"] = ( getattr(prev_item, self.id_field) if prev_item is not None else identifier ) - feature['next'] = ( + feature["next"] = ( getattr(next_item, self.id_field) if next_item is not None else identifier @@ -392,9 +379,7 @@ def update(self, identifier, item): :returns: `bool` of update result """ - identifier, json_data = self._load_and_prepare_item( - item, raise_if_exists=False - ) + identifier, json_data = self._load_and_prepare_item(item, raise_if_exists=False) new_instance = self._feature_to_sqlalchemy(json_data, identifier) with Session(self._engine) as session: @@ -420,8 +405,9 @@ def delete(self, identifier): return result.rowcount > 0 - def _sqlalchemy_to_feature(self, item, crs_transform_out=None, - select_properties=[]): + def _sqlalchemy_to_feature( + self, item, crs_transform_out=None, select_properties=[] + ): """ Helper function to transform an SQLAlchemy result to a GeoJSON feature. @@ -433,12 +419,12 @@ def _sqlalchemy_to_feature(self, item, crs_transform_out=None, :returns: `dict` of GeoJSON feature """ - feature = {'type': 'Feature', 'properties': {}} + feature = {"type": "Feature", "properties": {}} item_dict = item.__dict__ # set feature id - feature['id'] = item_dict[self.id_field] + feature["id"] = item_dict[self.id_field] # Convert geometry to GeoJSON style if item_dict.get(self.geom) is not None: @@ -450,40 +436,40 @@ def _sqlalchemy_to_feature(self, item, crs_transform_out=None, if crs_transform_out is not None: shapely_geom = crs_transform_out(shapely_geom) geojson_geom = shapely.geometry.mapping(shapely_geom) - feature['geometry'] = geojson_geom + feature["geometry"] = geojson_geom else: - feature['geometry'] = None + feature["geometry"] = None keys = select_properties or self.fields.keys() for key in keys: if key in item_dict: - feature['properties'][key] = item_dict[key] + feature["properties"][key] = item_dict[key] return feature def _feature_to_sqlalchemy(self, json_data, identifier=None): - attributes = {**json_data['properties']} + attributes = {**json_data["properties"]} # 'identifier' key maybe be present in geojson properties, but might # not be a valid db field - attributes.pop('identifier', None) + attributes.pop("identifier", None) attributes[self.geom] = from_shape( - shapely.geometry.shape(json_data['geometry']), - srid=get_srid(self.storage_crs) + shapely.geometry.shape(json_data["geometry"]), + srid=get_srid(self.storage_crs), ) attributes[self.id_field] = identifier try: return self.table_model(**attributes) except Exception as e: - LOGGER.exception('Failed to create db model') + LOGGER.exception("Failed to create db model") raise ProviderInvalidDataError(str(e)) def _get_order_by_clauses(self, sort_by, table_model): # Build sort_by clauses if provided clauses = [] for sort_by_dict in sort_by: - model_column = getattr(table_model, sort_by_dict['property']) - order_function = asc if sort_by_dict['order'] == '+' else desc + model_column = getattr(table_model, sort_by_dict["property"]) + order_function = asc if sort_by_dict["order"] == "+" else desc clauses.append(order_function(model_column)) # Otherwise sort by primary key (to ensure reproducible output) @@ -528,21 +514,21 @@ def _get_bbox_filter(self, bbox: list[float]): raise NotImplementedError def _get_datetime_filter(self, datetime_): - if datetime_ in (None, '../..'): + if datetime_ in (None, "../.."): return True else: if self.time_field is None: - LOGGER.error('time_field not enabled for collection') + LOGGER.error("time_field not enabled for collection") raise ProviderQueryError() time_column = getattr(self.table_model, self.time_field) - if '/' in datetime_: # envelope - LOGGER.debug('detected time range') - time_begin, time_end = datetime_.split('/') - if time_begin == '..': + if "/" in datetime_: # envelope + LOGGER.debug("detected time range") + time_begin, time_end = datetime_.split("/") + if time_begin == "..": datetime_filter = time_column <= time_end - elif time_end == '..': + elif time_end == "..": datetime_filter = time_column >= time_begin else: datetime_filter = time_column.between(time_begin, time_end) @@ -553,8 +539,7 @@ def _get_datetime_filter(self, datetime_): def _select_properties_clause(self, select_properties, skip_geometry): # List the column names that we want if select_properties: - column_names = sorted(set(select_properties), - key=select_properties.index) + column_names = sorted(set(select_properties), key=select_properties.index) else: # get_fields() doesn't include geometry column column_names = self.fields.keys() @@ -583,7 +568,7 @@ def _select_properties_clause(self, select_properties, skip_geometry): def store_db_parameters( self: GenericSQLProvider | Any, connection_data: str | dict[str], - options: dict[str, str] + options: dict[str, str], ) -> None: """ Store database connection parameters @@ -600,31 +585,24 @@ def store_db_parameters( else: self.db_conn = None # OR - self.db_user = connection_data.get('user') - self.db_host = connection_data.get('host') - self.db_port = connection_data.get('port', self.default_port) - self.db_name = ( - connection_data.get('dbname') or connection_data.get('database') - ) - self.db_query = connection_data.get('query') - self._db_password = connection_data.get('password') + self.db_user = connection_data.get("user") + self.db_host = connection_data.get("host") + self.db_port = connection_data.get("port", self.default_port) + self.db_name = connection_data.get("dbname") or connection_data.get("database") + self.db_query = connection_data.get("query") + self._db_password = connection_data.get("password") # db_search_path gets converted to a tuple here in order to ensure it # is hashable - which allows us to use functools.cache() when # reflecting the table definition from the DB self.db_search_path = tuple( - connection_data.get('search_path') or - options.pop('search_path', ['public']) + connection_data.get("search_path") or options.pop("search_path", ["public"]) ) # Connection-pool tuning keys (pool_size, max_overflow, pool_recycle, # pool_timeout, pool_pre_ping) are intentionally left in ``options`` and # flow through ``db_options`` to get_engine(), which separates them from # the DBAPI connect_args. Their types are validated by the config JSON # Schema, so no coercion is performed here. - self.db_options = { - k: v - for k, v in options.items() - if not isinstance(v, dict) - } + self.db_options = {k: v for k, v in options.items() if not isinstance(v, dict)} #: Connection-pool tuning keys recognised by get_engine(). These configure @@ -633,11 +611,11 @@ def store_db_parameters( #: behaviour exactly: SQLAlchemy's own QueuePool defaults, except for #: pool_pre_ping, which was previously hardcoded to True. POOL_OPTION_DEFAULTS = { - 'pool_size': 5, - 'max_overflow': 10, - 'pool_recycle': -1, # SQLAlchemy default; never recycles connections - 'pool_timeout': 30, - 'pool_pre_ping': True, + "pool_size": 5, + "max_overflow": 10, + "pool_recycle": -1, # SQLAlchemy default; never recycles connections + "pool_timeout": 30, + "pool_pre_ping": True, } @@ -650,7 +628,7 @@ def get_engine( user: str, password: str, conn_str: Optional[str] = None, - **connect_args + **connect_args, ) -> Engine: """ Get SQL Alchemy engine. @@ -677,7 +655,7 @@ def get_engine( password=password, host=host, port=int(port), - database=database + database=database, ) # Separate connection-pool tuning from DBAPI connect args. Pool keys are @@ -691,15 +669,10 @@ def get_engine( for key, default in POOL_OPTION_DEFAULTS.items() } - engine = create_engine( - conn_str, - connect_args=connect_args, - **pool_options - ) + engine = create_engine(conn_str, connect_args=connect_args, **pool_options) LOGGER.debug( - f'Created engine for {repr(engine.url)} ' - f'with pool options {pool_options}.' + f"Created engine for {repr(engine.url)} with pool options {pool_options}." ) return engine @@ -707,10 +680,7 @@ def get_engine( @functools.cache def get_table_model( - table_name: str, - id_field: str, - db_search_path: tuple[str], - engine: Engine + table_name: str, id_field: str, db_search_path: tuple[str], engine: Engine ) -> Table: """ Reflect table using SQLAlchemy Automap. @@ -722,35 +692,28 @@ def get_table_model( :returns: SQLAlchemy model of the reflected table """ - LOGGER.debug('Reflecting table definition from database') + LOGGER.debug("Reflecting table definition from database") metadata = MetaData() # Look for table in the first schema in the search path schema = db_search_path[0] try: - LOGGER.debug(f'Looking for table {table_name} in schema {schema}') - metadata.reflect( - bind=engine, schema=schema, only=[table_name], views=True - ) + LOGGER.debug(f"Looking for table {table_name} in schema {schema}") + metadata.reflect(bind=engine, schema=schema, only=[table_name], views=True) except OperationalError: raise ProviderConnectionError( - f'Could not connect to {repr(engine.url)} (password hidden).' + f"Could not connect to {repr(engine.url)} (password hidden)." ) except InvalidRequestError: msg = ( f"Table '{table_name}' not found in schema '{schema}' " - f'on {repr(engine.url)}.' + f"on {repr(engine.url)}." ) LOGGER.error(msg) if len(db_search_path) > 1: # If the table is not found in the first schema, try the next one - return get_table_model( - table_name, - id_field, - db_search_path[1:], - engine - ) + return get_table_model(table_name, id_field, db_search_path[1:], engine) else: # If the table is not found in any schema, raise an error raise ProviderQueryError(msg) @@ -759,12 +722,12 @@ def get_table_model( # It is necessary to add the primary key constraint because SQLAlchemy # requires it to reflect the table, but a view in a PostgreSQL database # does not have a primary key defined. - sqlalchemy_table_def = metadata.tables[f'{schema}.{table_name}'] + sqlalchemy_table_def = metadata.tables[f"{schema}.{table_name}"] try: sqlalchemy_table_def.append_constraint(PrimaryKeyConstraint(id_field)) except (ConstraintColumnNotFoundError, KeyError): raise ProviderQueryError( - f'No such id_field column ({id_field}) on {schema}.{table_name}.' + f"No such id_field column ({id_field}) on {schema}.{table_name}." ) _Base = automap_base(metadata=metadata) @@ -781,10 +744,10 @@ def _name_for_scalar_relationship(base, local_cls, referred_cls, constraint): name = referred_cls.__name__.lower() local_table = local_cls.__table__ if name in local_table.columns: - newname = name + '_' + newname = name + "_" LOGGER.debug( - f'Already detected column name {name!r} in table ' - f'{local_table!r}. Using {newname!r} for relationship name.' + f"Already detected column name {name!r} in table " + f"{local_table!r}. Using {newname!r} for relationship name." ) return newname return name @@ -794,6 +757,7 @@ class PostgreSQLProvider(GenericSQLProvider): """ A provider for querying a PostgreSQL database """ + default_port = 5432 def __init__(self, provider_def: dict): @@ -807,11 +771,8 @@ def __init__(self, provider_def: dict): :returns: pygeoapi.provider.sql.PostgreSQLProvider """ - driver_name = 'postgresql+psycopg2' - extra_conn_args = { - 'client_encoding': 'utf8', - 'application_name': 'pygeoapi' - } + driver_name = "postgresql+psycopg2" + extra_conn_args = {"client_encoding": "utf8", "application_name": "pygeoapi"} super().__init__(provider_def, driver_name, extra_conn_args) def _get_bbox_filter(self, bbox: list[float]): @@ -834,6 +795,7 @@ class MySQLProvider(GenericSQLProvider): """ A provider for a MySQL database """ + default_port = 3306 def __init__(self, provider_def: dict): @@ -847,10 +809,8 @@ def __init__(self, provider_def: dict): :returns: pygeoapi.provider.sql.MySQLProvider """ - driver_name = 'mysql+pymysql' - extra_conn_args = { - 'charset': 'utf8mb4' - } + driver_name = "mysql+pymysql" + extra_conn_args = {"charset": "utf8mb4"} super().__init__(provider_def, driver_name, extra_conn_args) def _get_bbox_filter(self, bbox: list[float]): @@ -866,12 +826,10 @@ def _get_bbox_filter(self, bbox: list[float]): # Create WKT POLYGON from bbox: (minx, miny, maxx, maxy) minx, miny, maxx, maxy = bbox - polygon_wkt = f'POLYGON(({minx} {miny}, {maxx} {miny}, {maxx} {maxy}, {minx} {maxy}, {minx} {miny}))' # noqa + polygon_wkt = f"POLYGON(({minx} {miny}, {maxx} {miny}, {maxx} {maxy}, {minx} {maxy}, {minx} {miny}))" # noqa geom_column = getattr(self.table_model, self.geom) # Use MySQL MBRContains for index-accelerated bounding box checks - bbox_filter = func.MBRContains( - func.ST_GeomFromText(polygon_wkt), geom_column - ) + bbox_filter = func.MBRContains(func.ST_GeomFromText(polygon_wkt), geom_column) return bbox_filter def get(self, identifier, crs_transform_spec=None, **kwargs): @@ -884,7 +842,7 @@ def get(self, identifier, crs_transform_spec=None, **kwargs): :returns: GeoJSON FeatureCollection """ - LOGGER.debug(f'Get item by ID: {identifier}') + LOGGER.debug(f"Get item by ID: {identifier}") # Execute query within self-closing database Session context with Session(self._engine) as session: @@ -898,14 +856,14 @@ def get(self, identifier, crs_transform_spec=None, **kwargs): assert str(feature_id) == identifier except (AssertionError, SQLAlchemyError) as e: LOGGER.debug(e, exc_info=True) - msg = f'No such item: {self.id_field}={identifier}.' + msg = f"No such item: {self.id_field}={identifier}." raise ProviderItemNotFoundError(msg) crs_transform_out = get_transform_from_spec(crs_transform_spec) feature = self._sqlalchemy_to_feature(item, crs_transform_out) # Drop non-defined properties if self.properties: - props = feature['properties'] + props = feature["properties"] dropping_keys = deepcopy(props).keys() for item in dropping_keys: if item not in self.properties: @@ -925,15 +883,15 @@ def get(self, identifier, crs_transform_spec=None, **kwargs): .filter(id_field > feature_id) .first() ) - feature['prev'] = ( + feature["prev"] = ( getattr(prev_item, self.id_field) if prev_item is not None else feature_id ) - feature['next'] = ( + feature["next"] = ( getattr(next_item, self.id_field) if next_item is not None else feature_id ) - return feature \ No newline at end of file + return feature diff --git a/tests/provider/test_sql_pool_options.py b/tests/provider/test_sql_pool_options.py index 70f2817c6..06bac8846 100644 --- a/tests/provider/test_sql_pool_options.py +++ b/tests/provider/test_sql_pool_options.py @@ -9,21 +9,29 @@ from pygeoapi.provider import sql -@mock.patch.object(sql, 'create_engine') +@mock.patch.object(sql, "create_engine") def test_get_engine_separates_pool_options_from_connect_args(mock_create): sql.get_engine.cache_clear() sql.get_engine( - 'postgresql+psycopg2', 'h', 5432, 'd', 'u', 'p', None, - pool_size=2, pool_recycle=300, connect_timeout=10, + "postgresql+psycopg2", + "h", + 5432, + "d", + "u", + "p", + None, + pool_size=2, + pool_recycle=300, + connect_timeout=10, ) _, kwargs = mock_create.call_args # pool keys are applied to the engine (QueuePool), with overrides # honoured and unset pool keys falling back to the documented defaults - assert kwargs['pool_size'] == 2 - assert kwargs['pool_recycle'] == 300 - assert kwargs['max_overflow'] == 10 - assert kwargs['pool_timeout'] == 30 - assert kwargs['pool_pre_ping'] is True + assert kwargs["pool_size"] == 2 + assert kwargs["pool_recycle"] == 300 + assert kwargs["max_overflow"] == 10 + assert kwargs["pool_timeout"] == 30 + assert kwargs["pool_pre_ping"] is True # genuine DBAPI args are forwarded via connect_args; pool keys are not - assert kwargs['connect_args'] == {'connect_timeout': 10} \ No newline at end of file + assert kwargs["connect_args"] == {"connect_timeout": 10} From 04c0099655f40eb9a4b3547c510bd71a9423d3ea Mon Sep 17 00:00:00 2001 From: KoalaGeo Date: Mon, 8 Jun 2026 23:02:06 +0100 Subject: [PATCH 08/11] Revert "Ruff format" This reverts commit 9be0bf7e9d7f129cad96e58c85c84093139884f3, ruff used line length 88 instead of 79 --- pygeoapi/process/manager/postgresql.py | 116 ++++----- pygeoapi/provider/sql.py | 298 ++++++++++++++---------- tests/provider/test_sql_pool_options.py | 26 +-- 3 files changed, 242 insertions(+), 198 deletions(-) diff --git a/pygeoapi/process/manager/postgresql.py b/pygeoapi/process/manager/postgresql.py index f203a924c..c71ec69e2 100644 --- a/pygeoapi/process/manager/postgresql.py +++ b/pygeoapi/process/manager/postgresql.py @@ -45,7 +45,7 @@ String, Table, text, - update, + update ) from sqlalchemy.engine import Engine from sqlalchemy.orm import declarative_base, Session @@ -54,7 +54,7 @@ from pygeoapi.process.base import ( JobNotFoundError, JobResultNotFoundError, - ProcessorGenericError, + ProcessorGenericError ) from pygeoapi.process.manager.base import BaseManager from pygeoapi.provider.sql import get_engine, store_db_parameters @@ -81,21 +81,21 @@ def __init__(self, manager_def: dict): super().__init__(manager_def) self.is_async = True - self.id_field = "identifier" + self.id_field = 'identifier' self.supports_subscribing = True - self.connection = manager_def["connection"] + self.connection = manager_def['connection'] - options = manager_def.get("options", {}) - self._store_db_parameters(manager_def["connection"], options) + options = manager_def.get('options', {}) + self._store_db_parameters(manager_def['connection'], options) self._engine = get_engine( - "postgresql+psycopg2", + 'postgresql+psycopg2', self.db_host, self.db_port, self.db_name, self.db_user, self._db_password, self.db_conn, - **self.db_options, + **self.db_options ) self.table_output = self.output_dir is None @@ -104,14 +104,16 @@ def __init__(self, manager_def: dict): ) self.c = self.table_model.c try: - LOGGER.debug("Getting table model") + LOGGER.debug('Getting table model') except Exception as err: - msg = "Table model fetch failed" - LOGGER.error(f"{msg}: {err}") + msg = 'Table model fetch failed' + LOGGER.error(f'{msg}: {err}') raise ProcessorGenericError(msg) - def get_jobs(self, status: JobStatus = None, limit=None, offset=None) -> dict: + def get_jobs( + self, status: JobStatus = None, limit=None, offset=None + ) -> dict: """ Get jobs @@ -124,7 +126,7 @@ def get_jobs(self, status: JobStatus = None, limit=None, offset=None) -> dict: and numberMatched """ - LOGGER.debug("Querying for jobs") + LOGGER.debug('Querying for jobs') with Session(self._engine) as session: results = session.query(self.table_model) @@ -132,7 +134,7 @@ def get_jobs(self, status: JobStatus = None, limit=None, offset=None) -> dict: results = results.filter(self.c.status == status.value) jobs = [r._asdict() for r in results.all()] - return {"jobs": jobs, "numberMatched": len(jobs)} + return {'jobs': jobs, 'numberMatched': len(jobs)} def add_job(self, job_metadata: dict) -> str: """ @@ -143,18 +145,20 @@ def add_job(self, job_metadata: dict) -> str: :returns: identifier of added job """ - LOGGER.debug("Adding job") + LOGGER.debug('Adding job') with Session(self._engine) as session: try: - session.execute(insert(self.table_model).values(**job_metadata)) + session.execute( + insert(self.table_model).values(**job_metadata) + ) session.commit() except Exception as err: session.rollback() - msg = "Insert failed" - LOGGER.error(f"{msg}: {err}") + msg = 'Insert failed' + LOGGER.error(f'{msg}: {err}') raise ProcessorGenericError(msg) - return job_metadata["identifier"] + return job_metadata['identifier'] def update_job(self, job_id: str, update_dict: dict) -> bool: """ @@ -168,7 +172,7 @@ def update_job(self, job_id: str, update_dict: dict) -> bool: rowcount = 0 - LOGGER.debug("Updating job") + LOGGER.debug('Updating job') with Session(self._engine) as session: try: stmt = ( @@ -181,8 +185,8 @@ def update_job(self, job_id: str, update_dict: dict) -> bool: rowcount = result.rowcount except Exception as err: session.rollback() - msg = "Update failed" - LOGGER.error(f"{msg}: {err}") + msg = 'Update failed' + LOGGER.error(f'{msg}: {err}') raise ProcessorGenericError(msg) return rowcount == 1 @@ -198,7 +202,7 @@ def get_job(self, job_id: str) -> dict: :returns: `dict` # `pygeoapi.process.manager.Job` """ - LOGGER.debug("Querying for job") + LOGGER.debug('Querying for job') with Session(self._engine) as session: results = session.query(self.table_model).filter( self.c.identifier == job_id @@ -226,19 +230,21 @@ def delete_job(self, job_id: str) -> bool: # get result file if present for deletion job_result = self.get_job(job_id) - location = job_result.get("location") + location = job_result.get('location') - LOGGER.debug("Deleting job") + LOGGER.debug('Deleting job') with Session(self._engine) as session: try: - stmt = delete(self.table_model).where(self.c.identifier == job_id) + stmt = delete(self.table_model).where( + self.c.identifier == job_id + ) result = session.execute(stmt) session.commit() rowcount = result.rowcount except Exception as err: session.rollback() - msg = "Delete failed" - LOGGER.error(f"{msg}: {err}") + msg = 'Delete failed' + LOGGER.error(f'{msg}: {err}') raise ProcessorGenericError(msg) # delete result file if present @@ -264,24 +270,28 @@ def get_job_result(self, job_id: str) -> Tuple[str, Any]: """ job_result = self.get_job(job_id) - location = job_result.get("location") - mimetype = job_result.get("mimetype") - job_status = JobStatus[job_result["status"]] + location = job_result.get('location') + mimetype = job_result.get('mimetype') + job_status = JobStatus[job_result['status']] if job_status != JobStatus.successful: # Job is incomplete return (None,) if not location: - LOGGER.warning(f"job {job_id!r} - unknown result location") + LOGGER.warning(f'job {job_id!r} - unknown result location') raise JobResultNotFoundError() else: try: location = Path(location) - if mimetype in (None, FORMAT_TYPES[F_JSON], FORMAT_TYPES[F_JSONLD]): - with location.open("r", encoding="utf-8") as fh: + if mimetype in ( + None, + FORMAT_TYPES[F_JSON], + FORMAT_TYPES[F_JSONLD] + ): + with location.open('r', encoding='utf-8') as fh: result = json.load(fh) else: - with location.open("rb") as fh: + with location.open('rb') as fh: result = fh.read() except (TypeError, FileNotFoundError, json.JSONDecodeError): raise JobResultNotFoundError() @@ -289,7 +299,7 @@ def get_job_result(self, job_id: str) -> Tuple[str, Any]: return mimetype, result def __repr__(self): - return f" {self.name}" + return f' {self.name}' @functools.cache @@ -302,31 +312,31 @@ def get_table_model( schema = db_search_path[0] Jobs = Table( - "jobs", + 'jobs', Base.metadata, - Column("identifier", String, primary_key=True, nullable=False), + Column('identifier', String, primary_key=True, nullable=False), Column( - "type", + 'type', String, nullable=False, - server_default=text("'process'::character varying"), + server_default=text("'process'::character varying") ), - Column("process_id", String, nullable=False), - Column("created", DateTime), - Column("started", DateTime), - Column("finished", DateTime), - Column("updated", DateTime), - Column("status", String, nullable=False), - Column("location", String), - Column("mimetype", String), - Column("message", String), - Column("progress", Integer, nullable=False), - schema=schema, + Column('process_id', String, nullable=False), + Column('created', DateTime), + Column('started', DateTime), + Column('finished', DateTime), + Column('updated', DateTime), + Column('status', String, nullable=False), + Column('location', String), + Column('mimetype', String), + Column('message', String), + Column('progress', Integer, nullable=False), + schema=schema ) if table_output: - Jobs.append_column(Column("output", LargeBinary)) + Jobs.append_column(Column('output', LargeBinary)) Base.metadata.create_all(engine, tables=[Jobs], checkfirst=True) - return Jobs + return Jobs \ No newline at end of file diff --git a/pygeoapi/provider/sql.py b/pygeoapi/provider/sql.py index f0a960526..b5b773ffb 100644 --- a/pygeoapi/provider/sql.py +++ b/pygeoapi/provider/sql.py @@ -52,13 +52,20 @@ from pygeofilter.backends.sqlalchemy.evaluate import to_filter import shapely from sqlalchemy.sql import func -from sqlalchemy import create_engine, MetaData, PrimaryKeyConstraint, asc, desc, delete +from sqlalchemy import ( + create_engine, + MetaData, + PrimaryKeyConstraint, + asc, + desc, + delete +) from sqlalchemy.engine import URL, Engine from sqlalchemy.exc import ( ConstraintColumnNotFoundError, InvalidRequestError, OperationalError, - SQLAlchemyError, + SQLAlchemyError ) from sqlalchemy.ext.automap import automap_base from sqlalchemy.orm import Session, load_only @@ -71,7 +78,7 @@ ProviderConnectionError, ProviderInvalidDataError, ProviderQueryError, - ProviderItemNotFoundError, + ProviderItemNotFoundError ) LOGGER = logging.getLogger(__name__) @@ -84,7 +91,10 @@ class GenericSQLProvider(BaseProvider): """ def __init__( - self, provider_def: dict, driver_name: str, extra_conn_args: Optional[dict] = {} + self, + provider_def: dict, + driver_name: str, + extra_conn_args: Optional[dict] = {} ): """ GenericSQLProvider Class constructor @@ -99,23 +109,23 @@ def __init__( :returns: pygeoapi.provider.GenericSQLProvider """ - LOGGER.debug("Initialising GenericSQL provider.") + LOGGER.debug('Initialising GenericSQL provider.') super().__init__(provider_def) - self.table = provider_def["table"] - self.id_field = provider_def["id_field"] - self.geom = provider_def.get("geom_field", "geom") + self.table = provider_def['table'] + self.id_field = provider_def['id_field'] + self.geom = provider_def.get('geom_field', 'geom') self.driver_name = driver_name - LOGGER.debug(f"Name: {self.name}") - LOGGER.debug(f"Table: {self.table}") - LOGGER.debug(f"ID field: {self.id_field}") - LOGGER.debug(f"Geometry field: {self.geom}") - LOGGER.debug(f"Configured Storage CRS: {self.storage_crs}") + LOGGER.debug(f'Name: {self.name}') + LOGGER.debug(f'Table: {self.table}') + LOGGER.debug(f'ID field: {self.id_field}') + LOGGER.debug(f'Geometry field: {self.geom}') + LOGGER.debug(f'Configured Storage CRS: {self.storage_crs}') # Read table information from database - options = provider_def.get("options", {}) | extra_conn_args - store_db_parameters(self, provider_def["data"], options) + options = provider_def.get('options', {}) | extra_conn_args + store_db_parameters(self, provider_def['data'], options) self._engine = get_engine( driver_name, self.db_host, @@ -124,7 +134,7 @@ def __init__( self.db_user, self._db_password, self.db_conn, - **self.db_options, + **self.db_options ) self.table_model = get_table_model( self.table, self.id_field, self.db_search_path, self._engine @@ -136,7 +146,7 @@ def query( self, offset=0, limit=10, - resulttype="results", + resulttype='results', bbox=[], datetime_=None, properties=[], @@ -146,7 +156,7 @@ def query( q=None, filterq=None, crs_transform_spec=None, - **kwargs, + **kwargs ): """ Query sql database for all the content. @@ -169,7 +179,7 @@ def query( :returns: GeoJSON FeatureCollection """ - LOGGER.debug("Preparing filters") + LOGGER.debug('Preparing filters') property_filters = self._get_property_filters(properties) cql_filters = self._get_cql_filters(filterq) bbox_filter = self._get_bbox_filter(bbox) @@ -179,7 +189,7 @@ def query( select_properties, skip_geometry ) - LOGGER.debug("Querying Database") + LOGGER.debug('Querying Database') # Execute query within self-closing database Session context with Session(self._engine) as session: results = ( @@ -191,32 +201,33 @@ def query( .options(selected_properties) ) - LOGGER.debug("Preparing response") + LOGGER.debug('Preparing response') response = { - "type": "FeatureCollection", - "features": [], - "numberReturned": 0, + 'type': 'FeatureCollection', + 'features': [], + 'numberReturned': 0 } - if self.count or resulttype == "hits": + if self.count or resulttype == 'hits': matched = results.count() - response["numberMatched"] = matched - LOGGER.debug(f"Found {matched} result(s)") + response['numberMatched'] = matched + LOGGER.debug(f'Found {matched} result(s)') else: - LOGGER.debug("Count disabled") + LOGGER.debug('Count disabled') - if resulttype == "hits" or not results: + if resulttype == 'hits' or not results: return response crs_transform_out = get_transform_from_spec(crs_transform_spec) - response["numberReturned"] = 0 - for item in results.order_by(*order_by_clauses).offset(offset).limit(limit): - response["numberReturned"] += 1 - response["features"].append( - self._sqlalchemy_to_feature( - item, crs_transform_out, select_properties - ) + response['numberReturned'] = 0 + for item in ( + results.order_by(*order_by_clauses).offset(offset).limit(limit) + ): + response['numberReturned'] += 1 + response['features'].append( + self._sqlalchemy_to_feature(item, crs_transform_out, + select_properties) ) return response @@ -228,41 +239,41 @@ def get_fields(self): :returns: dict of fields """ - LOGGER.debug("Get available fields/properties") + LOGGER.debug('Get available fields/properties') # sql-schema only allows these types, so we need to map from sqlalchemy # string, number, integer, object, array, boolean, null, # https://json-schema.org/understanding-json-schema/reference/type.html column_type_map = { - bool: "boolean", - datetime: "string", - Decimal: "number", - dict: "object", - float: "number", - int: "integer", - str: "string", + bool: 'boolean', + datetime: 'string', + Decimal: 'number', + dict: 'object', + float: 'number', + int: 'integer', + str: 'string' } - default_type = "string" + default_type = 'string' # https://json-schema.org/understanding-json-schema/reference/string#built-in-formats # noqa column_format_map = { - "date": "date", - "interval": "duration", - "time": "time", - "timestamp": "date-time", + 'date': 'date', + 'interval': 'duration', + 'time': 'time', + 'timestamp': 'date-time' } def _column_type_to_json_schema_type(column_type): try: python_type = column_type.python_type except NotImplementedError: - LOGGER.warning(f"Unsupported column type {column_type}") + LOGGER.warning(f'Unsupported column type {column_type}') return default_type else: try: return column_type_map[python_type] except KeyError: - LOGGER.warning(f"Unsupported column type {column_type}") + LOGGER.warning(f'Unsupported column type {column_type}') return default_type def _column_format_to_json_schema_format(column_type): @@ -270,18 +281,20 @@ def _column_format_to_json_schema_format(column_type): ct = str(column_type).lower() return column_format_map[ct] except KeyError: - LOGGER.debug("No string format detected") + LOGGER.debug('No string format detected') return None if not self._fields: for column in self.table_model.__table__.columns: - LOGGER.debug(f"Testing {column.name}") + LOGGER.debug(f'Testing {column.name}') if column.name == self.geom: continue self._fields[str(column.name)] = { - "type": _column_type_to_json_schema_type(column.type), - "format": _column_format_to_json_schema_format(column.type), + 'type': _column_type_to_json_schema_type(column.type), + 'format': _column_format_to_json_schema_format( + column.type + ) } return self._fields @@ -296,7 +309,7 @@ def get(self, identifier, crs_transform_spec=None, **kwargs): :returns: GeoJSON FeatureCollection """ - LOGGER.debug(f"Get item by ID: {identifier}") + LOGGER.debug(f'Get item by ID: {identifier}') # Execute query within self-closing database Session context with Session(self._engine) as session: @@ -307,14 +320,14 @@ def get(self, identifier, crs_transform_spec=None, **kwargs): assert item is not None except (AssertionError, SQLAlchemyError) as e: LOGGER.debug(e, exc_info=True) - msg = f"No such item: {self.id_field}={identifier}." + msg = f'No such item: {self.id_field}={identifier}.' raise ProviderItemNotFoundError(msg) crs_transform_out = get_transform_from_spec(crs_transform_spec) feature = self._sqlalchemy_to_feature(item, crs_transform_out) # Drop non-defined properties if self.properties: - props = feature["properties"] + props = feature['properties'] dropping_keys = deepcopy(props).keys() for item in dropping_keys: if item not in self.properties: @@ -334,12 +347,12 @@ def get(self, identifier, crs_transform_spec=None, **kwargs): .filter(id_field > identifier) .first() ) - feature["prev"] = ( + feature['prev'] = ( getattr(prev_item, self.id_field) if prev_item is not None else identifier ) - feature["next"] = ( + feature['next'] = ( getattr(next_item, self.id_field) if next_item is not None else identifier @@ -379,7 +392,9 @@ def update(self, identifier, item): :returns: `bool` of update result """ - identifier, json_data = self._load_and_prepare_item(item, raise_if_exists=False) + identifier, json_data = self._load_and_prepare_item( + item, raise_if_exists=False + ) new_instance = self._feature_to_sqlalchemy(json_data, identifier) with Session(self._engine) as session: @@ -405,9 +420,8 @@ def delete(self, identifier): return result.rowcount > 0 - def _sqlalchemy_to_feature( - self, item, crs_transform_out=None, select_properties=[] - ): + def _sqlalchemy_to_feature(self, item, crs_transform_out=None, + select_properties=[]): """ Helper function to transform an SQLAlchemy result to a GeoJSON feature. @@ -419,12 +433,12 @@ def _sqlalchemy_to_feature( :returns: `dict` of GeoJSON feature """ - feature = {"type": "Feature", "properties": {}} + feature = {'type': 'Feature', 'properties': {}} item_dict = item.__dict__ # set feature id - feature["id"] = item_dict[self.id_field] + feature['id'] = item_dict[self.id_field] # Convert geometry to GeoJSON style if item_dict.get(self.geom) is not None: @@ -436,40 +450,40 @@ def _sqlalchemy_to_feature( if crs_transform_out is not None: shapely_geom = crs_transform_out(shapely_geom) geojson_geom = shapely.geometry.mapping(shapely_geom) - feature["geometry"] = geojson_geom + feature['geometry'] = geojson_geom else: - feature["geometry"] = None + feature['geometry'] = None keys = select_properties or self.fields.keys() for key in keys: if key in item_dict: - feature["properties"][key] = item_dict[key] + feature['properties'][key] = item_dict[key] return feature def _feature_to_sqlalchemy(self, json_data, identifier=None): - attributes = {**json_data["properties"]} + attributes = {**json_data['properties']} # 'identifier' key maybe be present in geojson properties, but might # not be a valid db field - attributes.pop("identifier", None) + attributes.pop('identifier', None) attributes[self.geom] = from_shape( - shapely.geometry.shape(json_data["geometry"]), - srid=get_srid(self.storage_crs), + shapely.geometry.shape(json_data['geometry']), + srid=get_srid(self.storage_crs) ) attributes[self.id_field] = identifier try: return self.table_model(**attributes) except Exception as e: - LOGGER.exception("Failed to create db model") + LOGGER.exception('Failed to create db model') raise ProviderInvalidDataError(str(e)) def _get_order_by_clauses(self, sort_by, table_model): # Build sort_by clauses if provided clauses = [] for sort_by_dict in sort_by: - model_column = getattr(table_model, sort_by_dict["property"]) - order_function = asc if sort_by_dict["order"] == "+" else desc + model_column = getattr(table_model, sort_by_dict['property']) + order_function = asc if sort_by_dict['order'] == '+' else desc clauses.append(order_function(model_column)) # Otherwise sort by primary key (to ensure reproducible output) @@ -514,21 +528,21 @@ def _get_bbox_filter(self, bbox: list[float]): raise NotImplementedError def _get_datetime_filter(self, datetime_): - if datetime_ in (None, "../.."): + if datetime_ in (None, '../..'): return True else: if self.time_field is None: - LOGGER.error("time_field not enabled for collection") + LOGGER.error('time_field not enabled for collection') raise ProviderQueryError() time_column = getattr(self.table_model, self.time_field) - if "/" in datetime_: # envelope - LOGGER.debug("detected time range") - time_begin, time_end = datetime_.split("/") - if time_begin == "..": + if '/' in datetime_: # envelope + LOGGER.debug('detected time range') + time_begin, time_end = datetime_.split('/') + if time_begin == '..': datetime_filter = time_column <= time_end - elif time_end == "..": + elif time_end == '..': datetime_filter = time_column >= time_begin else: datetime_filter = time_column.between(time_begin, time_end) @@ -539,7 +553,8 @@ def _get_datetime_filter(self, datetime_): def _select_properties_clause(self, select_properties, skip_geometry): # List the column names that we want if select_properties: - column_names = sorted(set(select_properties), key=select_properties.index) + column_names = sorted(set(select_properties), + key=select_properties.index) else: # get_fields() doesn't include geometry column column_names = self.fields.keys() @@ -568,7 +583,7 @@ def _select_properties_clause(self, select_properties, skip_geometry): def store_db_parameters( self: GenericSQLProvider | Any, connection_data: str | dict[str], - options: dict[str, str], + options: dict[str, str] ) -> None: """ Store database connection parameters @@ -585,24 +600,31 @@ def store_db_parameters( else: self.db_conn = None # OR - self.db_user = connection_data.get("user") - self.db_host = connection_data.get("host") - self.db_port = connection_data.get("port", self.default_port) - self.db_name = connection_data.get("dbname") or connection_data.get("database") - self.db_query = connection_data.get("query") - self._db_password = connection_data.get("password") + self.db_user = connection_data.get('user') + self.db_host = connection_data.get('host') + self.db_port = connection_data.get('port', self.default_port) + self.db_name = ( + connection_data.get('dbname') or connection_data.get('database') + ) + self.db_query = connection_data.get('query') + self._db_password = connection_data.get('password') # db_search_path gets converted to a tuple here in order to ensure it # is hashable - which allows us to use functools.cache() when # reflecting the table definition from the DB self.db_search_path = tuple( - connection_data.get("search_path") or options.pop("search_path", ["public"]) + connection_data.get('search_path') or + options.pop('search_path', ['public']) ) # Connection-pool tuning keys (pool_size, max_overflow, pool_recycle, # pool_timeout, pool_pre_ping) are intentionally left in ``options`` and # flow through ``db_options`` to get_engine(), which separates them from # the DBAPI connect_args. Their types are validated by the config JSON # Schema, so no coercion is performed here. - self.db_options = {k: v for k, v in options.items() if not isinstance(v, dict)} + self.db_options = { + k: v + for k, v in options.items() + if not isinstance(v, dict) + } #: Connection-pool tuning keys recognised by get_engine(). These configure @@ -611,11 +633,11 @@ def store_db_parameters( #: behaviour exactly: SQLAlchemy's own QueuePool defaults, except for #: pool_pre_ping, which was previously hardcoded to True. POOL_OPTION_DEFAULTS = { - "pool_size": 5, - "max_overflow": 10, - "pool_recycle": -1, # SQLAlchemy default; never recycles connections - "pool_timeout": 30, - "pool_pre_ping": True, + 'pool_size': 5, + 'max_overflow': 10, + 'pool_recycle': -1, # SQLAlchemy default; never recycles connections + 'pool_timeout': 30, + 'pool_pre_ping': True, } @@ -628,7 +650,7 @@ def get_engine( user: str, password: str, conn_str: Optional[str] = None, - **connect_args, + **connect_args ) -> Engine: """ Get SQL Alchemy engine. @@ -655,7 +677,7 @@ def get_engine( password=password, host=host, port=int(port), - database=database, + database=database ) # Separate connection-pool tuning from DBAPI connect args. Pool keys are @@ -669,10 +691,15 @@ def get_engine( for key, default in POOL_OPTION_DEFAULTS.items() } - engine = create_engine(conn_str, connect_args=connect_args, **pool_options) + engine = create_engine( + conn_str, + connect_args=connect_args, + **pool_options + ) LOGGER.debug( - f"Created engine for {repr(engine.url)} with pool options {pool_options}." + f'Created engine for {repr(engine.url)} ' + f'with pool options {pool_options}.' ) return engine @@ -680,7 +707,10 @@ def get_engine( @functools.cache def get_table_model( - table_name: str, id_field: str, db_search_path: tuple[str], engine: Engine + table_name: str, + id_field: str, + db_search_path: tuple[str], + engine: Engine ) -> Table: """ Reflect table using SQLAlchemy Automap. @@ -692,28 +722,35 @@ def get_table_model( :returns: SQLAlchemy model of the reflected table """ - LOGGER.debug("Reflecting table definition from database") + LOGGER.debug('Reflecting table definition from database') metadata = MetaData() # Look for table in the first schema in the search path schema = db_search_path[0] try: - LOGGER.debug(f"Looking for table {table_name} in schema {schema}") - metadata.reflect(bind=engine, schema=schema, only=[table_name], views=True) + LOGGER.debug(f'Looking for table {table_name} in schema {schema}') + metadata.reflect( + bind=engine, schema=schema, only=[table_name], views=True + ) except OperationalError: raise ProviderConnectionError( - f"Could not connect to {repr(engine.url)} (password hidden)." + f'Could not connect to {repr(engine.url)} (password hidden).' ) except InvalidRequestError: msg = ( f"Table '{table_name}' not found in schema '{schema}' " - f"on {repr(engine.url)}." + f'on {repr(engine.url)}.' ) LOGGER.error(msg) if len(db_search_path) > 1: # If the table is not found in the first schema, try the next one - return get_table_model(table_name, id_field, db_search_path[1:], engine) + return get_table_model( + table_name, + id_field, + db_search_path[1:], + engine + ) else: # If the table is not found in any schema, raise an error raise ProviderQueryError(msg) @@ -722,12 +759,12 @@ def get_table_model( # It is necessary to add the primary key constraint because SQLAlchemy # requires it to reflect the table, but a view in a PostgreSQL database # does not have a primary key defined. - sqlalchemy_table_def = metadata.tables[f"{schema}.{table_name}"] + sqlalchemy_table_def = metadata.tables[f'{schema}.{table_name}'] try: sqlalchemy_table_def.append_constraint(PrimaryKeyConstraint(id_field)) except (ConstraintColumnNotFoundError, KeyError): raise ProviderQueryError( - f"No such id_field column ({id_field}) on {schema}.{table_name}." + f'No such id_field column ({id_field}) on {schema}.{table_name}.' ) _Base = automap_base(metadata=metadata) @@ -744,10 +781,10 @@ def _name_for_scalar_relationship(base, local_cls, referred_cls, constraint): name = referred_cls.__name__.lower() local_table = local_cls.__table__ if name in local_table.columns: - newname = name + "_" + newname = name + '_' LOGGER.debug( - f"Already detected column name {name!r} in table " - f"{local_table!r}. Using {newname!r} for relationship name." + f'Already detected column name {name!r} in table ' + f'{local_table!r}. Using {newname!r} for relationship name.' ) return newname return name @@ -757,7 +794,6 @@ class PostgreSQLProvider(GenericSQLProvider): """ A provider for querying a PostgreSQL database """ - default_port = 5432 def __init__(self, provider_def: dict): @@ -771,8 +807,11 @@ def __init__(self, provider_def: dict): :returns: pygeoapi.provider.sql.PostgreSQLProvider """ - driver_name = "postgresql+psycopg2" - extra_conn_args = {"client_encoding": "utf8", "application_name": "pygeoapi"} + driver_name = 'postgresql+psycopg2' + extra_conn_args = { + 'client_encoding': 'utf8', + 'application_name': 'pygeoapi' + } super().__init__(provider_def, driver_name, extra_conn_args) def _get_bbox_filter(self, bbox: list[float]): @@ -795,7 +834,6 @@ class MySQLProvider(GenericSQLProvider): """ A provider for a MySQL database """ - default_port = 3306 def __init__(self, provider_def: dict): @@ -809,8 +847,10 @@ def __init__(self, provider_def: dict): :returns: pygeoapi.provider.sql.MySQLProvider """ - driver_name = "mysql+pymysql" - extra_conn_args = {"charset": "utf8mb4"} + driver_name = 'mysql+pymysql' + extra_conn_args = { + 'charset': 'utf8mb4' + } super().__init__(provider_def, driver_name, extra_conn_args) def _get_bbox_filter(self, bbox: list[float]): @@ -826,10 +866,12 @@ def _get_bbox_filter(self, bbox: list[float]): # Create WKT POLYGON from bbox: (minx, miny, maxx, maxy) minx, miny, maxx, maxy = bbox - polygon_wkt = f"POLYGON(({minx} {miny}, {maxx} {miny}, {maxx} {maxy}, {minx} {maxy}, {minx} {miny}))" # noqa + polygon_wkt = f'POLYGON(({minx} {miny}, {maxx} {miny}, {maxx} {maxy}, {minx} {maxy}, {minx} {miny}))' # noqa geom_column = getattr(self.table_model, self.geom) # Use MySQL MBRContains for index-accelerated bounding box checks - bbox_filter = func.MBRContains(func.ST_GeomFromText(polygon_wkt), geom_column) + bbox_filter = func.MBRContains( + func.ST_GeomFromText(polygon_wkt), geom_column + ) return bbox_filter def get(self, identifier, crs_transform_spec=None, **kwargs): @@ -842,7 +884,7 @@ def get(self, identifier, crs_transform_spec=None, **kwargs): :returns: GeoJSON FeatureCollection """ - LOGGER.debug(f"Get item by ID: {identifier}") + LOGGER.debug(f'Get item by ID: {identifier}') # Execute query within self-closing database Session context with Session(self._engine) as session: @@ -856,14 +898,14 @@ def get(self, identifier, crs_transform_spec=None, **kwargs): assert str(feature_id) == identifier except (AssertionError, SQLAlchemyError) as e: LOGGER.debug(e, exc_info=True) - msg = f"No such item: {self.id_field}={identifier}." + msg = f'No such item: {self.id_field}={identifier}.' raise ProviderItemNotFoundError(msg) crs_transform_out = get_transform_from_spec(crs_transform_spec) feature = self._sqlalchemy_to_feature(item, crs_transform_out) # Drop non-defined properties if self.properties: - props = feature["properties"] + props = feature['properties'] dropping_keys = deepcopy(props).keys() for item in dropping_keys: if item not in self.properties: @@ -883,15 +925,15 @@ def get(self, identifier, crs_transform_spec=None, **kwargs): .filter(id_field > feature_id) .first() ) - feature["prev"] = ( + feature['prev'] = ( getattr(prev_item, self.id_field) if prev_item is not None else feature_id ) - feature["next"] = ( + feature['next'] = ( getattr(next_item, self.id_field) if next_item is not None else feature_id ) - return feature + return feature \ No newline at end of file diff --git a/tests/provider/test_sql_pool_options.py b/tests/provider/test_sql_pool_options.py index 06bac8846..70f2817c6 100644 --- a/tests/provider/test_sql_pool_options.py +++ b/tests/provider/test_sql_pool_options.py @@ -9,29 +9,21 @@ from pygeoapi.provider import sql -@mock.patch.object(sql, "create_engine") +@mock.patch.object(sql, 'create_engine') def test_get_engine_separates_pool_options_from_connect_args(mock_create): sql.get_engine.cache_clear() sql.get_engine( - "postgresql+psycopg2", - "h", - 5432, - "d", - "u", - "p", - None, - pool_size=2, - pool_recycle=300, - connect_timeout=10, + 'postgresql+psycopg2', 'h', 5432, 'd', 'u', 'p', None, + pool_size=2, pool_recycle=300, connect_timeout=10, ) _, kwargs = mock_create.call_args # pool keys are applied to the engine (QueuePool), with overrides # honoured and unset pool keys falling back to the documented defaults - assert kwargs["pool_size"] == 2 - assert kwargs["pool_recycle"] == 300 - assert kwargs["max_overflow"] == 10 - assert kwargs["pool_timeout"] == 30 - assert kwargs["pool_pre_ping"] is True + assert kwargs['pool_size'] == 2 + assert kwargs['pool_recycle'] == 300 + assert kwargs['max_overflow'] == 10 + assert kwargs['pool_timeout'] == 30 + assert kwargs['pool_pre_ping'] is True # genuine DBAPI args are forwarded via connect_args; pool keys are not - assert kwargs["connect_args"] == {"connect_timeout": 10} + assert kwargs['connect_args'] == {'connect_timeout': 10} \ No newline at end of file From 58c0d96f4e7bb24623faafa150d3be0129cb37de Mon Sep 17 00:00:00 2001 From: KoalaGeo Date: Mon, 8 Jun 2026 23:04:28 +0100 Subject: [PATCH 09/11] Ensure files end with a newline --- pygeoapi/process/manager/postgresql.py | 2 +- pygeoapi/provider/sql.py | 3 ++- tests/provider/test_sql_pool_options.py | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pygeoapi/process/manager/postgresql.py b/pygeoapi/process/manager/postgresql.py index c71ec69e2..7a2adc559 100644 --- a/pygeoapi/process/manager/postgresql.py +++ b/pygeoapi/process/manager/postgresql.py @@ -339,4 +339,4 @@ def get_table_model( Base.metadata.create_all(engine, tables=[Jobs], checkfirst=True) - return Jobs \ No newline at end of file + return Jobs diff --git a/pygeoapi/provider/sql.py b/pygeoapi/provider/sql.py index b5b773ffb..85361aaaa 100644 --- a/pygeoapi/provider/sql.py +++ b/pygeoapi/provider/sql.py @@ -936,4 +936,5 @@ def get(self, identifier, crs_transform_spec=None, **kwargs): else feature_id ) - return feature \ No newline at end of file + return feature + \ No newline at end of file diff --git a/tests/provider/test_sql_pool_options.py b/tests/provider/test_sql_pool_options.py index 70f2817c6..6c208b905 100644 --- a/tests/provider/test_sql_pool_options.py +++ b/tests/provider/test_sql_pool_options.py @@ -26,4 +26,5 @@ def test_get_engine_separates_pool_options_from_connect_args(mock_create): assert kwargs['pool_timeout'] == 30 assert kwargs['pool_pre_ping'] is True # genuine DBAPI args are forwarded via connect_args; pool keys are not - assert kwargs['connect_args'] == {'connect_timeout': 10} \ No newline at end of file + assert kwargs['connect_args'] == {'connect_timeout': 10} + \ No newline at end of file From 60f3c4aae47e477f53d7e0b9ecd8be0b4d36f967 Mon Sep 17 00:00:00 2001 From: KoalaGeo Date: Mon, 8 Jun 2026 23:06:47 +0100 Subject: [PATCH 10/11] Normalize file endings --- pygeoapi/provider/sql.py | 1 - tests/provider/test_sql_pool_options.py | 1 - 2 files changed, 2 deletions(-) diff --git a/pygeoapi/provider/sql.py b/pygeoapi/provider/sql.py index 85361aaaa..121f48ac2 100644 --- a/pygeoapi/provider/sql.py +++ b/pygeoapi/provider/sql.py @@ -937,4 +937,3 @@ def get(self, identifier, crs_transform_spec=None, **kwargs): ) return feature - \ No newline at end of file diff --git a/tests/provider/test_sql_pool_options.py b/tests/provider/test_sql_pool_options.py index 6c208b905..327f8bebd 100644 --- a/tests/provider/test_sql_pool_options.py +++ b/tests/provider/test_sql_pool_options.py @@ -27,4 +27,3 @@ def test_get_engine_separates_pool_options_from_connect_args(mock_create): assert kwargs['pool_pre_ping'] is True # genuine DBAPI args are forwarded via connect_args; pool keys are not assert kwargs['connect_args'] == {'connect_timeout': 10} - \ No newline at end of file From 0b0f68406944e786c1356e090bf797afbf2d481d Mon Sep 17 00:00:00 2001 From: KoalaGeo Date: Tue, 9 Jun 2026 09:58:31 +0100 Subject: [PATCH 11/11] Add copyright and license to SQL pooling test --- tests/provider/test_sql_pool_options.py | 27 +++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/provider/test_sql_pool_options.py b/tests/provider/test_sql_pool_options.py index 327f8bebd..3c3419c2a 100644 --- a/tests/provider/test_sql_pool_options.py +++ b/tests/provider/test_sql_pool_options.py @@ -1,4 +1,31 @@ # ================================================================= +# +# Authors: Edward Lewis +# +# Copyright (c) 2026 Edward Lewis +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= # Test that get_engine() separates SQLAlchemy connection-pool tuning # options from DBAPI connect_args. This is the contract introduced by # the configurable-pool change; it needs no live database.