Skip to content

Commit ffce5fd

Browse files
[Backport 5.0.x] [Fixes #14041] Allow upsert / replace for GPKG (#14073)
1 parent 080ccc0 commit ffce5fd

5 files changed

Lines changed: 83 additions & 98 deletions

File tree

geonode/upload/handlers/common/tests_vector.py

Lines changed: 0 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -519,34 +519,6 @@ def test_upsert_validation_should_fail(self):
519519
"The Dynamic model generation must be enabled to perform the upsert IMPORTER_ENABLE_DYN_MODELS=True",
520520
)
521521

522-
@override_settings(IMPORTER_ENABLE_DYN_MODELS=True)
523-
@patch("geonode.upload.handlers.common.vector.ModelSchema")
524-
@patch("geonode.upload.handlers.common.vector.BaseVectorFileHandler.extract_upsert_key")
525-
def test_upsert_data_should_fail_if_upsertkey_is_not_provided(self, upsert_function, schema):
526-
"""
527-
The test should fail since the upsert key provided is empty/Null and
528-
was not possible to extract the key from the DB schema
529-
"""
530-
schema.return_value = MagicMock()
531-
data = create_single_dataset("example_upsert_dataset")
532-
exec_id = orchestrator.create_execution_request(
533-
user=self.user,
534-
func_name="funct1",
535-
step="step",
536-
input_params={"files": self.valid_files, "skip_existing_layer": True, "resource_pk": data.pk},
537-
)
538-
539-
upsert_function.return_value = None
540-
handler = ShapeFileHandler()
541-
with self.assertRaises(Exception) as exept:
542-
handler.upsert_data(["files"], exec_id)
543-
544-
self.assertIsNotNone(exept)
545-
self.assertEqual(
546-
str(exept.exception),
547-
"Was not possible to find the upsert key, upsert is aborted",
548-
)
549-
550522
def test_get_error_file_csv_headers(self):
551523
handler = BaseVectorFileHandler()
552524
mock_validator = MagicMock()
@@ -622,29 +594,6 @@ def test_upsert_data_without_dynamic_model_schema(self):
622594
"This dataset does't support updates. Please upload the dataset again to have the upsert operations enabled",
623595
)
624596

625-
def test_upsert_data_raise_error_if_upsert_key_is_not_defined(self):
626-
"""
627-
Should raise error if the dynamic model schema is not present
628-
"""
629-
data = create_single_dataset("example_upsert_dataset")
630-
exec_id = orchestrator.create_execution_request(
631-
user=self.user,
632-
func_name="funct1",
633-
step="step",
634-
input_params={
635-
"files": self.original,
636-
"skip_existing_layer": True,
637-
"resource_pk": data.pk,
638-
"upsert_key": None,
639-
},
640-
)
641-
ModelSchema.objects.create(name="example_upsert_dataset", db_name="datastore", managed=True)
642-
643-
with self.assertRaises(UpsertException) as exp:
644-
self.handler.upsert_data(self.original, exec_id)
645-
646-
self.assertEqual(str(exp.exception), "Was not possible to find the upsert key, upsert is aborted")
647-
648597
def test_validate_single_feature_raise_error(self):
649598
"""
650599
Should raise error if the dynamic model schema is not present

geonode/upload/handlers/common/vector.py

Lines changed: 38 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ def import_resource(self, files: dict, execution_id: str, **kwargs) -> str:
455455
data inside the geonode_data database
456456
"""
457457
all_layers = self.get_ogr2ogr_driver().Open(files.get("base_file"))
458-
layers = self._select_valid_layers(all_layers)
458+
layers = self._select_valid_layers(all_layers, execution_id=execution_id)
459459
# for the moment we skip the dyanamic model creation
460460
layer_count = len(layers)
461461
logger.info(f"Total number of layers available: {layer_count}")
@@ -556,7 +556,7 @@ def import_resource(self, files: dict, execution_id: str, **kwargs) -> str:
556556
raise e
557557
return layer_names, alternates, execution_id
558558

559-
def _select_valid_layers(self, all_layers):
559+
def _select_valid_layers(self, all_layers, **kwargs):
560560
layers = []
561561
for layer in all_layers:
562562
try:
@@ -738,7 +738,17 @@ def _define_dynamic_layer_schema(self, layer, **kwargs):
738738
"authority": self.identify_authority(layer),
739739
}
740740
]
741-
741+
# if the layer comes from a DB, the fid column is not included in the schema, but we need to add it as primary key for the dynamic model
742+
fid_in_schema = any(x["name"] == DEFAULT_PK_COLUMN_NAME for x in layer_schema)
743+
if not fid_in_schema and layer.GetFIDColumn():
744+
layer_schema += [
745+
{
746+
"name": layer.GetFIDColumn(),
747+
"class_name": "django.db.models.BigAutoField",
748+
"null": False,
749+
"primary_key": True,
750+
}
751+
]
742752
return layer_schema
743753

744754
def promote_to_multi(self, geometry_name):
@@ -1129,7 +1139,7 @@ def __get_new_and_original_schema(self, files, execution_id):
11291139

11301140
# use ogr2ogr to read the uploaded files for the upsert
11311141
all_layers = self.get_ogr2ogr_driver().Open(files.get("base_file"))
1132-
layers = self._select_valid_layers(all_layers)
1142+
layers = self._select_valid_layers(all_layers, execution_id=execution_id)
11331143
if not layers:
11341144
raise UpsertException("No valid layers found in the provided file for upsert.")
11351145

@@ -1213,20 +1223,20 @@ def upsert_data(self, files, execution_id, **kwargs):
12131223
# get the rows that match the upsert key
12141224
OriginalResource = model.as_model()
12151225

1216-
# retrieve the upsert key.
1217-
upsert_key = self.extract_upsert_key(exec_obj, dynamic_model_instance=model)
1218-
if not upsert_key:
1219-
# if for any reason the key is not present, better to raise an error
1220-
raise UpsertException("Was not possible to find the upsert key, upsert is aborted")
12211226
# use ogr2ogr to read the uploaded files values for the upsert
12221227
all_layers = self.get_ogr2ogr_driver().Open(files.get("base_file"))
12231228
valid_create = 0
12241229
valid_update = 0
1225-
layers = self._select_valid_layers(all_layers)
1230+
layers = self._select_valid_layers(all_layers, execution_id=execution_id)
12261231
if not layers:
12271232
raise UpsertException("No valid layers were found in the file provided")
12281233
# we can upsert just 1 layer at time
12291234

1235+
upsert_key = self.extract_upsert_key(layers[0])
1236+
if not upsert_key:
1237+
# if for any reason the key is not present, better to raise an error
1238+
raise UpsertException("Was not possible to find the upsert key, upsert is aborted")
1239+
12301240
self._validate_single_feature(exec_obj, OriginalResource, upsert_key, layers, iter(layers[0]))
12311241

12321242
valid_create, valid_update = self._commit_upsert(model, OriginalResource, upsert_key, iter(layers[0]))
@@ -1359,9 +1369,16 @@ def _validate_feature(self, data_chunk, model_instance, upsert_key, errors):
13591369
def _save_feature(self, data_chunk, model_obj, model_instance, upsert_key, valid_update, valid_create):
13601370
# getting all the upsert_key value from the data chunk
13611371
# retrieving the data from the DB
1362-
value_in_db = model_instance.objects.filter(
1363-
**{f"{upsert_key}__in": (getattr(feature, upsert_key) for feature in data_chunk)}
1364-
).in_bulk(field_name=upsert_key)
1372+
use_get_fid = False # flag to understand if we need to use the GetFID as upsert key, this is needed for DB drivers with FID columns that hide the FID field from the schema
1373+
filters = []
1374+
for feature in data_chunk:
1375+
# DB drivers with FID columns hide the FID field from the schema, so we need to check if the FID is present and use it as upsert key if the upsert key is the default one
1376+
if not getattr(feature, upsert_key, None) and feature.GetFID() != ogr.NullFID:
1377+
filters.append(feature.GetFID())
1378+
use_get_fid = True
1379+
else:
1380+
filters.append(getattr(feature, upsert_key))
1381+
value_in_db = model_instance.objects.filter(**{f"{upsert_key}__in": filters}).in_bulk(field_name=upsert_key)
13651382
# looping over the chunk data
13661383
to_process = []
13671384
feature_to_save = []
@@ -1384,6 +1401,8 @@ def _save_feature(self, data_chunk, model_obj, model_instance, upsert_key, valid
13841401
feature_as_dict.update(
13851402
{self.default_geometry_column_name: f"SRID={code};{self.promote_geom_to_multi(geom).ExportToWkt()}"}
13861403
)
1404+
if use_get_fid:
1405+
feature_as_dict[upsert_key] = feature.GetFID()
13871406
to_process.append(feature_as_dict)
13881407

13891408
for feature_as_dict in to_process:
@@ -1419,18 +1438,13 @@ def validate_feature(self, feature):
14191438
feature["error"] = " | ".join(errors)
14201439
return feature, False
14211440

1422-
def extract_upsert_key(self, exec_obj, dynamic_model_instance):
1423-
# first we check if the upsert key is passed by the call
1424-
key = exec_obj.input_params.get("upsert_key", DEFAULT_PK_COLUMN_NAME)
1425-
if not key:
1426-
# if the upsert key is not passed, we use the primary key as upsert key
1427-
# the primary key is defined in the Fields of the dynamic model
1428-
# dynamic models raise error if we filter the json with ORM
1429-
key = [x.name for x in dynamic_model_instance.fields.all() if x.kwargs.get("primary_key")]
1430-
if key:
1431-
return key[0]
1441+
def extract_upsert_key(self, layer):
14321442

1433-
return key
1443+
fid_in_schema = any(x.name == DEFAULT_PK_COLUMN_NAME for x in layer.schema)
1444+
if not fid_in_schema and layer.GetFIDColumn():
1445+
return layer.GetFIDColumn()
1446+
1447+
return DEFAULT_PK_COLUMN_NAME
14341448

14351449
def refresh_geonode_resource(self, execution_id, asset=None, dataset=None, create_asset=True, **kwargs):
14361450
# getting execution_id information
@@ -1485,7 +1499,6 @@ def fixup_dynamic_model_fields(self, _exec, files, **kwargs):
14851499
self.__fixup_primary_key(dataset)
14861500

14871501

1488-
14891502
@importer_app.task(
14901503
base=ErrorBaseTaskClass,
14911504
name="geonode.upload.import_next_step",

geonode/upload/handlers/gpkg/handler.py

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
#########################################################################
1919
import logging
2020

21-
from geonode.resource.enumerator import ExecutionRequestAction as exa
2221
from geonode.upload.api.exceptions import UploadParallelismLimitException
2322
from geonode.upload.utils import UploadLimitValidator
2423
from geopackage_validator.validate import validate
@@ -27,6 +26,7 @@
2726

2827
from geonode.upload.handlers.common.vector import BaseVectorFileHandler
2928
from geonode.upload.utils import ImporterRequestAction as ira
29+
from geonode.upload.orchestrator import orchestrator
3030

3131
logger = logging.getLogger("importer")
3232

@@ -37,26 +37,6 @@ class GPKGFileHandler(BaseVectorFileHandler):
3737
It must provide the task_lists required to comple the upload
3838
"""
3939

40-
TASKS = {
41-
exa.UPLOAD.value: (
42-
"start_import",
43-
"geonode.upload.import_resource",
44-
"geonode.upload.publish_resource",
45-
"geonode.upload.create_geonode_resource",
46-
),
47-
exa.COPY.value: (
48-
"start_copy",
49-
"geonode.upload.copy_dynamic_model",
50-
"geonode.upload.copy_geonode_data_table",
51-
"geonode.upload.publish_resource",
52-
"geonode.upload.copy_geonode_resource",
53-
),
54-
ira.ROLLBACK.value: (
55-
"start_rollback",
56-
"geonode.upload.rollback",
57-
),
58-
}
59-
6040
@property
6141
def supported_file_extension_config(self):
6242
return {
@@ -161,3 +141,12 @@ def handle_xml_file(self, saved_dataset, _exec):
161141
Not implemented for GPKG, skipping
162142
"""
163143
pass
144+
145+
def _select_valid_layers(self, all_layers, **kwargs):
146+
layers = super()._select_valid_layers(all_layers=all_layers)
147+
if execution_id := kwargs.get("execution_id", None):
148+
exec_obj = orchestrator.get_execution_object(execution_id)
149+
if exec_obj.action in (ira.REPLACE.value, ira.UPSERT.value):
150+
if len(layers) > 1:
151+
raise InvalidGeopackageException("For Upsert and Replace, only one layer is allowed in the GPKG")
152+
return layers

geonode/upload/handlers/gpkg/tests.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
#
1818
#########################################################################
1919
import shutil
20-
20+
import os
2121
from django.test import TestCase, override_settings
2222
from django.contrib.auth import get_user_model
2323
from geonode.upload.handlers.gpkg.handler import GPKGFileHandler
@@ -41,6 +41,7 @@ def setUpClass(cls):
4141
cls.handler = GPKGFileHandler()
4242
cls.valid_gpkg = f"{project_dir}/tests/fixture/valid.gpkg"
4343
cls.invalid_gpkg = f"{project_dir}/tests/fixture/invalid.gpkg"
44+
cls.multiple_layer = f"{project_dir}/tests/fixture/multiple_layer.gpkg"
4445
cls.user, _ = get_user_model().objects.get_or_create(username="admin")
4546
cls.invalid_files = {"base_file": cls.invalid_gpkg}
4647
cls.valid_files = {"base_file": cls.valid_gpkg, "action": "upload"}
@@ -164,3 +165,36 @@ def test_single_message_error_handler(self):
164165
# Assert the error was recorded
165166
errors = exec_request_obj.output_params.get("errors", [])
166167
assert any("exception raised" in str(e) for e in errors)
168+
169+
def test_select_valid_layers(self):
170+
"""
171+
The function should return only the datasets with a geometry
172+
The other one are discarded
173+
"""
174+
# Copy the file to the temporary folder
175+
shutil.copy(self.multiple_layer, "/tmp")
176+
177+
exec_id = orchestrator.create_execution_request(
178+
user=get_user_model().objects.first(),
179+
func_name="funct1",
180+
step="step",
181+
action="replace",
182+
input_params={
183+
"files": {"base_file": "/tmp/multiple_layer.gpkg"},
184+
"skip_existing_layer": True,
185+
"store_spatial_file": False,
186+
"handler_module_path": str(self.handler),
187+
},
188+
)
189+
190+
all_layers = GPKGFileHandler().get_ogr2ogr_driver().Open("/tmp/multiple_layer.gpkg")
191+
192+
with self.assertRaises(Exception) as exp:
193+
GPKGFileHandler()._select_valid_layers(all_layers, execution_id=str(exec_id))
194+
195+
self.assertIn(
196+
"For Upsert and Replace, only one layer is allowed in the GPKG",
197+
exp.exception.args[0],
198+
)
199+
200+
os.remove("/tmp/multiple_layer.gpkg")
116 KB
Binary file not shown.

0 commit comments

Comments
 (0)