Skip to content

Commit 9edce20

Browse files
authored
Use declarative SQLalchemy Job Table (#2341)
* Use declarative SQLalchemy Table Use declarative SQLAlchemy table and create if not present in the schema * Cache get_table_model
1 parent bc5e58e commit 9edce20

3 files changed

Lines changed: 87 additions & 119 deletions

File tree

.github/workflows/main.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,6 @@ jobs:
139139
gunzip < tests/data/hotosm_bdi_waterways.sql.gz | psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test
140140
psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test -f tests/data/dummy_data.sql
141141
psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test -f tests/data/dummy_types_data.sql
142-
psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test -f tests/data/postgres_manager_full_structure.backup.sql
143142
mysql -h 127.0.0.1 -P 3306 -u root -p'mysql' test_geo_app < tests/data/mysql_data.sql
144143
docker ps
145144
- name: run API tests ⚙️

pygeoapi/process/manager/postgresql.py

Lines changed: 87 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -29,37 +29,35 @@
2929
#
3030
# =================================================================
3131

32-
# Requires postgresql database structure.
33-
# Create the database:
34-
# e.g.
35-
# CREATE DATABASE test
36-
# WITH TEMPLATE = template0
37-
# ENCODING = 'UTF8'
38-
# LOCALE = 'en_US.UTF-8';
39-
# ALTER DATABASE test OWNER TO postgres;
40-
#
41-
# Import dump:
42-
# psql -U postgres -h 127.0.0.1 -p 5432 test <
43-
# tests/data/postgres_manager_full_structure.backup.sql
44-
32+
import functools
4533
import json
4634
import logging
4735
from pathlib import Path
4836
from typing import Any, Tuple
4937

50-
from sqlalchemy import insert, update, delete
51-
from sqlalchemy.orm import Session
38+
from sqlalchemy import (
39+
Column,
40+
DateTime,
41+
delete,
42+
insert,
43+
Integer,
44+
LargeBinary,
45+
String,
46+
Table,
47+
text,
48+
update
49+
)
50+
from sqlalchemy.engine import Engine
51+
from sqlalchemy.orm import declarative_base, Session
5252

53+
from pygeoapi.formats import F_JSON, F_JSONLD, FORMAT_TYPES
5354
from pygeoapi.process.base import (
5455
JobNotFoundError,
5556
JobResultNotFoundError,
5657
ProcessorGenericError
5758
)
58-
from pygeoapi.formats import FORMAT_TYPES, F_JSON, F_JSONLD
5959
from pygeoapi.process.manager.base import BaseManager
60-
from pygeoapi.provider.sql import (
61-
get_engine, get_table_model, store_db_parameters
62-
)
60+
from pygeoapi.provider.sql import get_engine, store_db_parameters
6361
from pygeoapi.util import JobStatus
6462

6563

@@ -70,6 +68,7 @@ class PostgreSQLManager(BaseManager):
7068
"""PostgreSQL Manager"""
7169

7270
default_port = 5432
71+
_store_db_parameters = store_db_parameters
7372

7473
def __init__(self, manager_def: dict):
7574
"""
@@ -87,7 +86,7 @@ def __init__(self, manager_def: dict):
8786
self.connection = manager_def['connection']
8887

8988
options = manager_def.get('options', {})
90-
store_db_parameters(self, manager_def['connection'], options)
89+
self._store_db_parameters(manager_def['connection'], options)
9190
self._engine = get_engine(
9291
'postgresql+psycopg2',
9392
self.db_host,
@@ -98,22 +97,23 @@ def __init__(self, manager_def: dict):
9897
self.db_conn,
9998
**self.db_options
10099
)
100+
self.table_output = self.output_dir is None
101101

102+
self.table_model = get_table_model(
103+
self.db_search_path, self._engine, self.table_output
104+
)
105+
self.c = self.table_model.c
102106
try:
103107
LOGGER.debug('Getting table model')
104-
self.table_model = get_table_model(
105-
'jobs',
106-
self.id_field,
107-
self.db_search_path,
108-
self._engine
109-
)
108+
110109
except Exception as err:
111110
msg = 'Table model fetch failed'
112111
LOGGER.error(f'{msg}: {err}')
113112
raise ProcessorGenericError(msg)
114113

115-
def get_jobs(self, status: JobStatus = None, limit=None, offset=None
116-
) -> dict:
114+
def get_jobs(
115+
self, status: JobStatus = None, limit=None, offset=None
116+
) -> dict:
117117
"""
118118
Get jobs
119119
@@ -129,15 +129,12 @@ def get_jobs(self, status: JobStatus = None, limit=None, offset=None
129129
LOGGER.debug('Querying for jobs')
130130
with Session(self._engine) as session:
131131
results = session.query(self.table_model)
132+
132133
if status is not None:
133-
column = getattr(self.table_model, 'status')
134-
results = results.filter(column == status.value)
134+
results = results.filter(self.c.status == status.value)
135135

136-
jobs = [r.__dict__ for r in results.all()]
137-
return {
138-
'jobs': jobs,
139-
'numberMatched': len(jobs)
140-
}
136+
jobs = [r._asdict() for r in results.all()]
137+
return {'jobs': jobs, 'numberMatched': len(jobs)}
141138

142139
def add_job(self, job_metadata: dict) -> str:
143140
"""
@@ -151,8 +148,9 @@ def add_job(self, job_metadata: dict) -> str:
151148
LOGGER.debug('Adding job')
152149
with Session(self._engine) as session:
153150
try:
154-
session.execute(insert(self.table_model)
155-
.values(**job_metadata))
151+
session.execute(
152+
insert(self.table_model).values(**job_metadata)
153+
)
156154
session.commit()
157155
except Exception as err:
158156
session.rollback()
@@ -177,10 +175,9 @@ def update_job(self, job_id: str, update_dict: dict) -> bool:
177175
LOGGER.debug('Updating job')
178176
with Session(self._engine) as session:
179177
try:
180-
column = getattr(self.table_model, self.id_field)
181178
stmt = (
182179
update(self.table_model)
183-
.where(column == job_id)
180+
.where(self.c.identifier == job_id)
184181
.values(**update_dict)
185182
)
186183
result = session.execute(stmt)
@@ -207,14 +204,14 @@ def get_job(self, job_id: str) -> dict:
207204

208205
LOGGER.debug('Querying for job')
209206
with Session(self._engine) as session:
210-
results = session.query(self.table_model)
211-
column = getattr(self.table_model, self.id_field)
212-
results = session.query(self.table_model).filter(column == job_id)
207+
results = session.query(self.table_model).filter(
208+
self.c.identifier == job_id
209+
)
213210

214211
first = results.first()
215212

216213
if first is not None:
217-
return first.__dict__
214+
return first._asdict()
218215
else:
219216
raise JobNotFoundError()
220217

@@ -238,10 +235,8 @@ def delete_job(self, job_id: str) -> bool:
238235
LOGGER.debug('Deleting job')
239236
with Session(self._engine) as session:
240237
try:
241-
column = getattr(self.table_model, self.id_field)
242-
stmt = (
243-
delete(self.table_model)
244-
.where(column == job_id)
238+
stmt = delete(self.table_model).where(
239+
self.c.identifier == job_id
245240
)
246241
result = session.execute(stmt)
247242
session.commit()
@@ -288,8 +283,11 @@ def get_job_result(self, job_id: str) -> Tuple[str, Any]:
288283
else:
289284
try:
290285
location = Path(location)
291-
if mimetype in (None, FORMAT_TYPES[F_JSON],
292-
FORMAT_TYPES[F_JSONLD]):
286+
if mimetype in (
287+
None,
288+
FORMAT_TYPES[F_JSON],
289+
FORMAT_TYPES[F_JSONLD]
290+
):
293291
with location.open('r', encoding='utf-8') as fh:
294292
result = json.load(fh)
295293
else:
@@ -302,3 +300,43 @@ def get_job_result(self, job_id: str) -> Tuple[str, Any]:
302300

303301
def __repr__(self):
304302
return f'<PostgreSQLManager> {self.name}'
303+
304+
305+
@functools.cache
306+
def get_table_model(
307+
db_search_path: tuple[str], engine: Engine, table_output: bool
308+
) -> Any:
309+
"""Define SQLAlchemy job model"""
310+
311+
Base = declarative_base()
312+
schema = db_search_path[0]
313+
314+
Jobs = Table(
315+
'jobs',
316+
Base.metadata,
317+
Column('identifier', String, primary_key=True, nullable=False),
318+
Column(
319+
'type',
320+
String,
321+
nullable=False,
322+
server_default=text("'process'::character varying")
323+
),
324+
Column('process_id', String, nullable=False),
325+
Column('created', DateTime),
326+
Column('started', DateTime),
327+
Column('finished', DateTime),
328+
Column('updated', DateTime),
329+
Column('status', String, nullable=False),
330+
Column('location', String),
331+
Column('mimetype', String),
332+
Column('message', String),
333+
Column('progress', Integer, nullable=False),
334+
schema=schema
335+
)
336+
337+
if table_output:
338+
Jobs.append_column(Column('output', LargeBinary))
339+
340+
Base.metadata.create_all(engine, tables=[Jobs], checkfirst=True)
341+
342+
return Jobs

tests/data/postgres_manager_full_structure.backup.sql

Lines changed: 0 additions & 69 deletions
This file was deleted.

0 commit comments

Comments
 (0)