Skip to content

Commit 85ca819

Browse files
authored
Merge pull request #659 from gschwind/new-handle-process
Better handle of crashed processes
2 parents 5f84f5b + 57b6301 commit 85ca819

11 files changed

Lines changed: 169 additions & 71 deletions

File tree

docs/api.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ Request and response objects
9595
A MultiDict object containing input values sent by the client.
9696

9797

98-
.. autoclass:: pywps.response.WPSResponse
98+
.. autoclass:: pywps.response.basic.WPSResponse
9999
:members:
100100

101101
.. attribute:: status

docs/process.rst

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ The instance of a *Process* needs following attributes to be configured:
5353
:outputs:
5454
list of process outputs
5555
:handler:
56-
method which recieves :class:`pywps.app.WPSRequest` and :class:`pywps.response.WPSResponse` as inputs.
56+
method which recieves :class:`pywps.app.WPSRequest` and :class:`pywps.response.basic.WPSResponse` as inputs.
5757

5858
Example vector buffer process
5959
=============================
@@ -118,12 +118,12 @@ Next we create a new list variables for inputs and outputs.
118118

119119
Next we define the *handler* method. In it, *geospatial analysis
120120
may happen*. The method gets a :class:`pywps.app.WPSRequest` and a
121-
:class:`pywps.response.WPSResponse` object as parameters. In our case, we
121+
:class:`pywps.response.basic.WPSResponse` object as parameters. In our case, we
122122
calculate the buffer around each vector feature using
123123
`GDAL/OGR library <https://gdal.org>`_. We will not got much into the details,
124124
what you should note is how to get input data from the
125125
:class:`pywps.app.WPSRequest` object and how to set data as outputs in the
126-
:class:`pywps.response.WPSResponse` object.
126+
:class:`pywps.response.basic.WPSResponse` object.
127127

128128
.. literalinclude:: demobuffer.py
129129
:language: python

pywps/app/Process.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,11 @@ def _execute_process(self, async_, wps_request, wps_response):
194194

195195
running, stored = dblog.get_process_counts()
196196

197+
if maxparallel != -1 and running >= maxparallel:
198+
# Try to check for crashed process
199+
dblog.cleanup_crashed_process()
200+
running, stored = dblog.get_process_counts()
201+
197202
# async
198203
if async_:
199204

@@ -238,7 +243,9 @@ def _run_async(self, wps_request, wps_response):
238243
# This function may not raise exception and must return a valid wps_response
239244
# Failure must be reported as wps_response.status = WPS_STATUS.FAILED
240245
def _run_process(self, wps_request, wps_response):
241-
LOGGER.debug("Started processing request: {}".format(self.uuid))
246+
LOGGER.debug("Started processing request: {} with pid: {}".format(self.uuid, os.getpid()))
247+
# Update the actual pid of current process to check if failed latter
248+
dblog.update_pid(self.uuid, os.getpid())
242249
try:
243250
self._set_grass(wps_request)
244251
# if required set HOME to the current working directory.

pywps/dblog.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
"""
99

1010
import logging
11+
import sys
12+
1113
from pywps import configuration
1214
from pywps.exceptions import NoApplicableCode
1315
import sqlite3
@@ -23,6 +25,8 @@
2325
from sqlalchemy.orm import sessionmaker
2426
from sqlalchemy.pool import NullPool, StaticPool
2527

28+
from pywps.response.status import WPS_STATUS
29+
2630
LOGGER = logging.getLogger('PYWPS')
2731
_SESSION_MAKER = None
2832

@@ -128,6 +132,54 @@ def store_status(uuid, wps_status, message=None, status_percentage=None):
128132
session.close()
129133

130134

135+
def update_pid(uuid, pid):
136+
"""Update actual pid for the uuid processing
137+
"""
138+
session = get_session()
139+
140+
requests = session.query(ProcessInstance).filter_by(uuid=str(uuid))
141+
if requests.count():
142+
request = requests.one()
143+
request.pid = pid
144+
session.commit()
145+
session.close()
146+
147+
148+
def cleanup_crashed_process():
149+
# TODO: implement other platform
150+
if sys.platform != "linux":
151+
return
152+
153+
session = get_session()
154+
stored_query = session.query(RequestInstance.uuid)
155+
running_cur = (
156+
session.query(ProcessInstance)
157+
.filter(ProcessInstance.status.in_([WPS_STATUS.STARTED, WPS_STATUS.PAUSED]))
158+
.filter(~ProcessInstance.uuid.in_(stored_query))
159+
)
160+
161+
failed = []
162+
running = [(p.uuid, p.pid) for p in running_cur]
163+
for uuid, pid in running:
164+
# No process with this pid, the process has crashed
165+
if not os.path.exists(os.path.join("/proc", str(pid))):
166+
failed.append(uuid)
167+
continue
168+
169+
# If we can't read the environ, that mean the process belong another user
170+
# which mean that this is not our process, thus our process has crashed
171+
# this not work because root is the user for the apache
172+
# if not os.access(os.path.join("/proc", str(pid), "environ"), os.R_OK):
173+
# failed.append(uuid)
174+
# continue
175+
pass
176+
177+
for uuid in failed:
178+
store_status(uuid, WPS_STATUS.FAILED, "Process crashed", 100)
179+
180+
session.close()
181+
182+
131183
def _get_identifier(request):
132184
"""Get operation identifier
133185
"""

pywps/processing/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
##################################################################
55

66
import pywps.configuration as config
7-
from pywps.processing.basic import MultiProcessing
7+
from pywps.processing.basic import MultiProcessing, DetachProcessing
88
from pywps.processing.scheduler import Scheduler
99
# api only
1010
from pywps.processing.basic import Processing # noqa: F401
@@ -14,6 +14,7 @@
1414
LOGGER = logging.getLogger("PYWPS")
1515

1616
MULTIPROCESSING = 'multiprocessing'
17+
DETACHPROCESSING = 'detachprocessing'
1718
SCHEDULER = 'scheduler'
1819
DEFAULT = MULTIPROCESSING
1920

@@ -29,6 +30,9 @@ def Process(process, wps_request, wps_response):
2930
LOGGER.info("Processing mode: {}".format(mode))
3031
if mode == SCHEDULER:
3132
process = Scheduler(process, wps_request, wps_response)
33+
elif mode == DETACHPROCESSING:
34+
process = DetachProcessing(process, wps_request, wps_response)
3235
else:
3336
process = MultiProcessing(process, wps_request, wps_response)
37+
3438
return process

pywps/processing/basic.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Copyright 2018 Open Source Geospatial Foundation and others #
33
# licensed under MIT, Please consult LICENSE.txt for details #
44
##################################################################
5+
import os
56

67
from pywps.processing.job import Job
78

@@ -34,3 +35,32 @@ def start(self):
3435
args=(self.job.wps_request, self.job.wps_response)
3536
)
3637
process.start()
38+
39+
40+
class DetachProcessing(Processing):
41+
"""
42+
:class:`DetachProcessing` run job as detached process. The process will be run as child of pid 1
43+
"""
44+
45+
def start(self):
46+
pid = os.fork()
47+
if pid != 0:
48+
# Wait that the children get detached.
49+
os.waitpid(pid, 0)
50+
return
51+
52+
# Detach ourself.
53+
54+
# Ensure that we are the session leader to avoid to be zombified.
55+
os.setsid()
56+
if os.fork():
57+
# Stop running now
58+
os._exit(0)
59+
60+
# We are the detached child, run the actual process
61+
try:
62+
getattr(self.job.process, self.job.method)(self.job.wps_request, self.job.wps_response)
63+
except Exception:
64+
pass
65+
# Ensure to stop ourself here what ever append.
66+
os._exit(0)

pywps/response/__init__.py

Lines changed: 2 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,8 @@
1-
from abc import abstractmethod
2-
from typing import TYPE_CHECKING
3-
if TYPE_CHECKING:
4-
from pywps import WPSRequest
51

6-
from pywps.dblog import store_status
7-
from pywps.response.status import WPS_STATUS
8-
from pywps.translations import get_translation
9-
from jinja2 import Environment, PackageLoader
102
import os
113

4+
from jinja2 import Environment
5+
126

137
class RelEnvironment(Environment):
148
"""Override join_path() to enable relative template paths."""
@@ -28,57 +22,3 @@ def get_response(operation):
2822
return DescribeResponse
2923
elif operation == "execute":
3024
return ExecuteResponse
31-
32-
33-
class WPSResponse(object):
34-
35-
def __init__(self, wps_request: 'WPSRequest', uuid=None, version="1.0.0"):
36-
37-
self.wps_request = wps_request
38-
self.uuid = uuid
39-
self.message = ''
40-
self.status = WPS_STATUS.ACCEPTED
41-
self.status_percentage = 0
42-
self.doc = None
43-
self.content_type = None
44-
self.version = version
45-
self.template_env = RelEnvironment(
46-
loader=PackageLoader('pywps', 'templates'),
47-
trim_blocks=True, lstrip_blocks=True,
48-
autoescape=True,
49-
)
50-
self.template_env.globals.update(get_translation=get_translation)
51-
52-
def _update_status(self, status, message, status_percentage):
53-
"""
54-
Update status report of currently running process instance
55-
56-
:param str message: Message you need to share with the client
57-
:param int status_percentage: Percent done (number betwen <0-100>)
58-
:param pywps.response.status.WPS_STATUS status: process status - user should usually
59-
ommit this parameter
60-
"""
61-
self.message = message
62-
self.status = status
63-
self.status_percentage = status_percentage
64-
store_status(self.uuid, self.status, self.message, self.status_percentage)
65-
66-
@abstractmethod
67-
def _construct_doc(self):
68-
...
69-
70-
def get_response_doc(self):
71-
try:
72-
self.doc, self.content_type = self._construct_doc()
73-
except Exception as e:
74-
if hasattr(e, "description"):
75-
msg = e.description
76-
else:
77-
msg = e
78-
self._update_status(WPS_STATUS.FAILED, msg, 100)
79-
raise e
80-
81-
else:
82-
self._update_status(WPS_STATUS.SUCCEEDED, "Response generated", 100)
83-
84-
return self.doc, self.content_type

pywps/response/basic.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
from abc import abstractmethod
2+
from typing import TYPE_CHECKING
3+
if TYPE_CHECKING:
4+
from pywps import WPSRequest
5+
6+
from pywps.dblog import store_status
7+
from . import RelEnvironment
8+
from .status import WPS_STATUS
9+
from pywps.translations import get_translation
10+
from jinja2 import Environment, PackageLoader
11+
import os
12+
13+
14+
class WPSResponse(object):
15+
16+
def __init__(self, wps_request: 'WPSRequest', uuid=None, version="1.0.0"):
17+
18+
self.wps_request = wps_request
19+
self.uuid = uuid
20+
self.message = ''
21+
self.status = WPS_STATUS.ACCEPTED
22+
self.status_percentage = 0
23+
self.doc = None
24+
self.content_type = None
25+
self.version = version
26+
self.template_env = RelEnvironment(
27+
loader=PackageLoader('pywps', 'templates'),
28+
trim_blocks=True, lstrip_blocks=True,
29+
autoescape=True,
30+
)
31+
self.template_env.globals.update(get_translation=get_translation)
32+
33+
def _update_status(self, status, message, status_percentage):
34+
"""
35+
Update status report of currently running process instance
36+
37+
:param str message: Message you need to share with the client
38+
:param int status_percentage: Percent done (number betwen <0-100>)
39+
:param pywps.response.status.WPS_STATUS status: process status - user should usually
40+
ommit this parameter
41+
"""
42+
self.message = message
43+
self.status = status
44+
self.status_percentage = status_percentage
45+
store_status(self.uuid, self.status, self.message, self.status_percentage)
46+
47+
@abstractmethod
48+
def _construct_doc(self):
49+
...
50+
51+
def get_response_doc(self):
52+
try:
53+
self.doc, self.content_type = self._construct_doc()
54+
except Exception as e:
55+
if hasattr(e, "description"):
56+
msg = e.description
57+
else:
58+
msg = e
59+
self._update_status(WPS_STATUS.FAILED, msg, 100)
60+
raise e
61+
62+
else:
63+
self._update_status(WPS_STATUS.SUCCEEDED, "Response generated", 100)
64+
65+
return self.doc, self.content_type

pywps/response/capabilities.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from werkzeug.wrappers import Request
44
import pywps.configuration as config
55
from pywps.app.basic import make_response, get_response_type, get_json_indent
6-
from pywps.response import WPSResponse
6+
from .basic import WPSResponse
77
from pywps import __version__
88
from pywps.exceptions import NoApplicableCode
99
import os

pywps/response/describe.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from pywps.exceptions import NoApplicableCode
77
from pywps.exceptions import MissingParameterValue
88
from pywps.exceptions import InvalidParameterValue
9-
from pywps.response import WPSResponse
9+
from .basic import WPSResponse
1010
from pywps import __version__
1111
import os
1212

0 commit comments

Comments
 (0)