From d9ca6e4133ad7ec69e940bf218bdba17827cfbca Mon Sep 17 00:00:00 2001 From: AlexWells Date: Thu, 23 Jun 2022 09:56:20 +0100 Subject: [PATCH 1/9] Add blocking flag and skeleton callback processing --- softioc/builder.py | 6 +++++- softioc/device.py | 20 +++++++++++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/softioc/builder.py b/softioc/builder.py index 591141c5..2f64e75b 100644 --- a/softioc/builder.py +++ b/softioc/builder.py @@ -8,6 +8,8 @@ LoadDbdFile(os.path.join(os.path.dirname(__file__), 'device.dbd')) from . import device, pythonSoftIoc # noqa +# Re-export this so users only have to import the builder +from .device import set_blocking # noqa PythonDevice = pythonSoftIoc.PythonDevice() @@ -301,5 +303,7 @@ def UnsetDevice(): 'Action', # Other builder support functions 'LoadDatabase', - 'SetDeviceName', 'UnsetDevice' + 'SetDeviceName', 'UnsetDevice', + # Device support functions + 'set_blocking' ] diff --git a/softioc/device.py b/softioc/device.py index ad14a7ae..748c35bf 100644 --- a/softioc/device.py +++ b/softioc/device.py @@ -14,6 +14,15 @@ # dispatcher(func, *args) will queue a callback to happen dispatcher = None +# Global blocking flag, used to mark asynchronous (False) or synchronous (True) +# processing modes for Out records. +# Default False to maintain behaviour from previous versions. +blocking = False + +def set_blocking(val): + global blocking + blocking = val + # EPICS processing return codes EPICS_OK = 0 @@ -142,6 +151,8 @@ def __init__(self, name, **kargs): else: self._value = None + self._blocking = kargs.pop('blocking', blocking) + self.__super.__init__(name, **kargs) def init_record(self, record): @@ -162,6 +173,13 @@ def init_record(self, record): recGblResetAlarms(record) return self._epics_rc_ + def __completion(self): + pass + + def __wrap_completion(self, value): + self.__on_update(value) + self.__completion() + def _process(self, record): '''Processing suitable for output records. Performs immediate value validation and asynchronous update notification.''' @@ -183,7 +201,7 @@ def _process(self, record): self._value = value record.UDF = 0 if self.__on_update and self.__enable_write: - dispatcher(self.__on_update, python_value) + dispatcher(self.__wrap_completion, python_value) return EPICS_OK From 0a494215c611149ffc5ce3d0a71754ae0e9cfc3d Mon Sep 17 00:00:00 2001 From: AlexWells Date: Tue, 26 Jul 2022 10:46:16 +0100 Subject: [PATCH 2/9] Implement asynchronous record processing Includes a "blocking" flag on record creation. This change allows you to use caput in asynchronous mode, where it will wait for record processing to complete. --- softioc/device.py | 32 +++++++++++++++++++++++++++-- softioc/extension.c | 44 +++++++++++++++++++++++++++++++++++++++- softioc/imports.py | 13 ++++++++++++ softioc/pythonSoftIoc.py | 2 +- tests/conftest.py | 2 +- 5 files changed, 88 insertions(+), 5 deletions(-) diff --git a/softioc/device.py b/softioc/device.py index 748c35bf..1cb5ca46 100644 --- a/softioc/device.py +++ b/softioc/device.py @@ -3,10 +3,17 @@ import ctypes from ctypes import * import numpy +from threading import Event from . import alarm from . import fields -from .imports import dbLoadDatabase, recGblResetAlarms, db_put_field +from .imports import ( + create_callback_capsule, + dbLoadDatabase, + signal_processing_complete, + recGblResetAlarms, + db_put_field, +) from .device_core import DeviceSupportCore, RecordLookup @@ -19,6 +26,7 @@ # Default False to maintain behaviour from previous versions. blocking = False +# TODO: Docs and Tests for the Blocking feature def set_blocking(val): global blocking blocking = val @@ -152,6 +160,10 @@ def __init__(self, name, **kargs): self._value = None self._blocking = kargs.pop('blocking', blocking) + if self._blocking: + self._completion_event = Event() + self._callback = create_callback_capsule() + self.__super.__init__(name, **kargs) @@ -174,15 +186,26 @@ def init_record(self, record): return self._epics_rc_ def __completion(self): + if self._blocking: + self._completion_event.set() pass + def __wait_for_completion(self, record): + self._completion_event.wait() + signal_processing_complete(record, self._callback) + self._completion_event.clear() + def __wrap_completion(self, value): - self.__on_update(value) + dispatcher(self.__on_update, value) self.__completion() def _process(self, record): '''Processing suitable for output records. Performs immediate value validation and asynchronous update notification.''' + + if record.PACT: + return EPICS_OK + value = self._read_value(record) if not self.__always_update and \ self._compare_values(value, self._value): @@ -201,7 +224,12 @@ def _process(self, record): self._value = value record.UDF = 0 if self.__on_update and self.__enable_write: + record.PACT = self._blocking dispatcher(self.__wrap_completion, python_value) + + if self._blocking: + # Create a process to wait for on_update to finish + dispatcher(self.__wait_for_completion, record) return EPICS_OK diff --git a/softioc/extension.c b/softioc/extension.c index 3ab7b2c4..eeb3bdd4 100644 --- a/softioc/extension.c +++ b/softioc/extension.c @@ -1,4 +1,4 @@ - +/* Provide EPICS functions in Python format */ #define PY_SSIZE_T_CLEAN #include #include @@ -6,6 +6,7 @@ #define db_accessHFORdb_accessC // Needed to get correct DBF_ values #include #include +#include #include #include #include @@ -13,6 +14,7 @@ #include #include + /* Reference stealing version of PyDict_SetItemString */ static void set_dict_item_steal( PyObject *dict, const char *name, PyObject *py_value) @@ -209,6 +211,42 @@ static PyObject *install_pv_logging(PyObject *self, PyObject *args) Py_RETURN_NONE; } +#define CAPSULE_NAME "ProcessDeviceSupportOut.callback" + +static void capsule_destructor(PyObject *obj) +{ + void *callback = PyCapsule_GetPointer(obj, CAPSULE_NAME); + free(callback); +} + + +static PyObject *create_callback_capsule(PyObject *self, PyObject *args) +{ + void *callback = malloc(sizeof(CALLBACK)); + + printf("Created CALLBACK struct %p\n", callback); + + return PyCapsule_New(callback, CAPSULE_NAME, &capsule_destructor); +} + +static PyObject *signal_processing_complete(PyObject *self, PyObject *args) +{ + int priority; + dbCommon *record; + PyObject *callback_capsule; + + if (!PyArg_ParseTuple(args, "inO", &priority, &record, &callback_capsule)) + { + return NULL; + } + + CALLBACK *callback = PyCapsule_GetPointer(callback_capsule, CAPSULE_NAME); + + callbackRequestProcessCallback(callback, priority, record); + + Py_RETURN_NONE; +} + static struct PyMethodDef softioc_methods[] = { {"get_DBF_values", get_DBF_values, METH_VARARGS, "Get a map of DBF names to values"}, @@ -218,6 +256,10 @@ static struct PyMethodDef softioc_methods[] = { "Put a database field to a value"}, {"install_pv_logging", install_pv_logging, METH_VARARGS, "Install caput logging to stdout"}, + {"signal_processing_complete", signal_processing_complete, METH_VARARGS, + "Inform EPICS that asynchronous record processing has completed"}, + {"create_callback_capsule", create_callback_capsule, METH_VARARGS, + "Create a CALLBACK structure inside a PyCapsule"}, {NULL, NULL, 0, NULL} /* Sentinel */ }; diff --git a/softioc/imports.py b/softioc/imports.py index 699f2302..5dd14a61 100644 --- a/softioc/imports.py +++ b/softioc/imports.py @@ -27,6 +27,17 @@ def install_pv_logging(acf_file): '''Install pv logging''' _extension.install_pv_logging(acf_file) +def create_callback_capsule(): + return _extension.create_callback_capsule() + +def signal_processing_complete(record, callback): + '''Signal that asynchronous record processing has completed''' + _extension.signal_processing_complete( + record.PRIO, + record.record.value, + callback + ) + def expect_success(status, function, args): assert status == 0, 'Expected success' @@ -94,6 +105,8 @@ def from_param(cls, value): __all__ = [ 'get_field_offsets', + 'create_callback_capsule', + 'signal_processing_complete', 'registryDeviceSupportAdd', 'IOSCANPVT', 'scanIoRequest', 'scanIoInit', 'dbLoadDatabase', diff --git a/softioc/pythonSoftIoc.py b/softioc/pythonSoftIoc.py index 8d79627f..47b9d00e 100644 --- a/softioc/pythonSoftIoc.py +++ b/softioc/pythonSoftIoc.py @@ -24,7 +24,7 @@ def __init__(self, builder, device, name, **fields): # have to maintain this separately from the corresponding device list. DeviceKeywords = [ 'on_update', 'on_update_name', 'validate', 'always_update', - 'initial_value', '_wf_nelm', '_wf_dtype'] + 'initial_value', '_wf_nelm', '_wf_dtype', 'blocking'] device_kargs = {} for keyword in DeviceKeywords: if keyword in fields: diff --git a/tests/conftest.py b/tests/conftest.py index bcb473b7..b7be1f4d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -117,7 +117,7 @@ def enable_code_coverage(): def select_and_recv(conn, expected_char = None): """Wait for the given Connection to have data to receive, and return it. If a character is provided check its correct before returning it.""" - # Must use cothread's select if cothread is prsent, otherwise we'd block + # Must use cothread's select if cothread is present, otherwise we'd block # processing on all cothread processing. But we don't want to use it # unless we have to, as importing cothread can cause issues with forking. if "cothread" in sys.modules: From ae2b6e4fb48aeb9f7e37a7f0c21aef337373a184 Mon Sep 17 00:00:00 2001 From: AlexWells Date: Wed, 27 Jul 2022 09:14:46 +0100 Subject: [PATCH 3/9] Avoid multiple dispatcher() calls for non-async This implementation still leaks abstractions - the device now needs to know about the difference between cothread and async calls. Perhaps another round of refactoring is required... --- softioc/device.py | 27 +++++++++++++-------------- softioc/extension.c | 14 +++++++++----- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/softioc/device.py b/softioc/device.py index 1cb5ca46..e0cdadd5 100644 --- a/softioc/device.py +++ b/softioc/device.py @@ -1,3 +1,4 @@ +from inspect import isawaitable import os import time import ctypes @@ -161,7 +162,6 @@ def __init__(self, name, **kargs): self._blocking = kargs.pop('blocking', blocking) if self._blocking: - self._completion_event = Event() self._callback = create_callback_capsule() @@ -185,19 +185,21 @@ def init_record(self, record): recGblResetAlarms(record) return self._epics_rc_ - def __completion(self): + def __completion(self, record): if self._blocking: - self._completion_event.set() + signal_processing_complete(record, self._callback) pass - def __wait_for_completion(self, record): - self._completion_event.wait() - signal_processing_complete(record, self._callback) - self._completion_event.clear() + def __wrap_completion(self, value, record): + update = self.__on_update(value) + if isawaitable(update): + dispatcher(self._complete_update, update, record) + else: + self.__completion(record) - def __wrap_completion(self, value): - dispatcher(self.__on_update, value) - self.__completion() + async def _complete_update(self, future, record): + await future + self.__completion(record) def _process(self, record): '''Processing suitable for output records. Performs immediate value @@ -225,11 +227,8 @@ def _process(self, record): record.UDF = 0 if self.__on_update and self.__enable_write: record.PACT = self._blocking - dispatcher(self.__wrap_completion, python_value) + dispatcher(self.__wrap_completion, python_value, record) - if self._blocking: - # Create a process to wait for on_update to finish - dispatcher(self.__wait_for_completion, record) return EPICS_OK diff --git a/softioc/extension.c b/softioc/extension.c index eeb3bdd4..b95b26d4 100644 --- a/softioc/extension.c +++ b/softioc/extension.c @@ -215,17 +215,13 @@ static PyObject *install_pv_logging(PyObject *self, PyObject *args) static void capsule_destructor(PyObject *obj) { - void *callback = PyCapsule_GetPointer(obj, CAPSULE_NAME); - free(callback); + free(PyCapsule_GetPointer(obj, CAPSULE_NAME)); } static PyObject *create_callback_capsule(PyObject *self, PyObject *args) { void *callback = malloc(sizeof(CALLBACK)); - - printf("Created CALLBACK struct %p\n", callback); - return PyCapsule_New(callback, CAPSULE_NAME, &capsule_destructor); } @@ -240,6 +236,14 @@ static PyObject *signal_processing_complete(PyObject *self, PyObject *args) return NULL; } + if (!PyCapsule_IsValid(callback_capsule, CAPSULE_NAME)) + { + return PyErr_Format( + PyExc_TypeError, + "Given object was not a capsule with name \"%s\"", + CAPSULE_NAME); + } + CALLBACK *callback = PyCapsule_GetPointer(callback_capsule, CAPSULE_NAME); callbackRequestProcessCallback(callback, priority, record); From 799f4c04380feb74a38a0806dac20f4c1351992c Mon Sep 17 00:00:00 2001 From: AlexWells Date: Wed, 27 Jul 2022 09:15:18 +0100 Subject: [PATCH 4/9] Add tests for blocking argument & global setting --- tests/test_records.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/test_records.py b/tests/test_records.py index ebb54da7..55e0ecc5 100644 --- a/tests/test_records.py +++ b/tests/test_records.py @@ -14,6 +14,7 @@ ) from softioc import asyncio_dispatcher, builder, softioc +from softioc.device import set_blocking # Test file for miscellaneous tests related to records @@ -180,6 +181,35 @@ def test_pini_always_on(): assert mbbi.PINI.Value() == "YES" +def check_record_blocking_attributes(record): + """Helper function to assert expected attributes exist for a blocking + record""" + assert record._blocking is True + assert record._callback != 0 + +def test_blocking_creates_attributes(): + """Test that setting the blocking flag on record creation creates the + expected attributes""" + ao1 = builder.aOut("OUTREC1", blocking=True) + check_record_blocking_attributes(ao1) + + ao2 = builder.aOut("OUTREC2", blocking=False) + assert ao2._blocking is False + +def test_blocking_global_flag_creates_attributes(): + """Test that the global blocking flag creates the expected attributes""" + set_blocking(True) + bo1 = builder.boolOut("OUTREC1") + + check_record_blocking_attributes(bo1) + + set_blocking(False) + bo2 = builder.boolOut("OUTREC2") + assert bo2._blocking is False + + bo3 = builder.boolOut("OUTREC3", blocking=True) + check_record_blocking_attributes(bo3) + def validate_fixture_names(params): """Provide nice names for the out_records fixture in TestValidate class""" From bb322f0a29462114653a95f9435c59c9924992d1 Mon Sep 17 00:00:00 2001 From: AlexWells Date: Thu, 28 Jul 2022 10:08:22 +0100 Subject: [PATCH 5/9] Refactor the check for async on_update calls Also add single and multi threaded tests for blocking records. --- softioc/device.py | 28 +++-- tests/test_records.py | 245 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 234 insertions(+), 39 deletions(-) diff --git a/softioc/device.py b/softioc/device.py index e0cdadd5..04f6047f 100644 --- a/softioc/device.py +++ b/softioc/device.py @@ -1,4 +1,4 @@ -from inspect import isawaitable +from inspect import iscoroutinefunction import os import time import ctypes @@ -186,18 +186,16 @@ def init_record(self, record): return self._epics_rc_ def __completion(self, record): + '''Signals that all on_update processing is finished''' if self._blocking: signal_processing_complete(record, self._callback) pass def __wrap_completion(self, value, record): - update = self.__on_update(value) - if isawaitable(update): - dispatcher(self._complete_update, update, record) - else: - self.__completion(record) + self.__on_update(value) + self.__completion(record) - async def _complete_update(self, future, record): + async def __async_wrap_completion(self, future, record): await future self.__completion(record) @@ -227,7 +225,21 @@ def _process(self, record): record.UDF = 0 if self.__on_update and self.__enable_write: record.PACT = self._blocking - dispatcher(self.__wrap_completion, python_value, record) + + if iscoroutinefunction(self.__on_update): + # This is an unfortunate, but unavoidable, leak of + # implementation detail that really should be kept within + # the dispatcher, but cannot be. This is due to asyncio not + # allowing its event loop to be nested, thus either + # requiring an additional call to the dispatcher once you + # acquire the Future from the coroutine, or doing this. + dispatcher( + self.__async_wrap_completion, + self.__on_update(python_value), + record + ) + else: + dispatcher(self.__wrap_completion, python_value, record) return EPICS_OK diff --git a/tests/test_records.py b/tests/test_records.py index 55e0ecc5..aa84d2ba 100644 --- a/tests/test_records.py +++ b/tests/test_records.py @@ -1,9 +1,11 @@ +import asyncio import multiprocessing import numpy import os import pytest from conftest import ( + aioca_cleanup, log, create_random_prefix, requires_cothread, @@ -180,37 +182,6 @@ def test_pini_always_on(): mbbi = builder.mbbIn("BBB", initial_value=5) assert mbbi.PINI.Value() == "YES" - -def check_record_blocking_attributes(record): - """Helper function to assert expected attributes exist for a blocking - record""" - assert record._blocking is True - assert record._callback != 0 - -def test_blocking_creates_attributes(): - """Test that setting the blocking flag on record creation creates the - expected attributes""" - ao1 = builder.aOut("OUTREC1", blocking=True) - check_record_blocking_attributes(ao1) - - ao2 = builder.aOut("OUTREC2", blocking=False) - assert ao2._blocking is False - -def test_blocking_global_flag_creates_attributes(): - """Test that the global blocking flag creates the expected attributes""" - set_blocking(True) - bo1 = builder.boolOut("OUTREC1") - - check_record_blocking_attributes(bo1) - - set_blocking(False) - bo2 = builder.boolOut("OUTREC2") - assert bo2._blocking is False - - bo3 = builder.boolOut("OUTREC3", blocking=True) - check_record_blocking_attributes(bo3) - - def validate_fixture_names(params): """Provide nice names for the out_records fixture in TestValidate class""" return params[0].__name__ @@ -530,3 +501,215 @@ def test_on_update_true_false(self, out_records): """Test that on_update works correctly for all out records when always_update is True and the put'ed value is always different""" self.on_update_runner(out_records, True, False) + + + +class TestBlocking: + """Tests related to the Blocking functionality""" + + def check_record_blocking_attributes(self, record): + """Helper function to assert expected attributes exist for a blocking + record""" + assert record._blocking is True + assert record._callback != 0 + + def test_blocking_creates_attributes(self): + """Test that setting the blocking flag on record creation creates the + expected attributes""" + ao1 = builder.aOut("OUTREC1", blocking=True) + self.check_record_blocking_attributes(ao1) + + ao2 = builder.aOut("OUTREC2", blocking=False) + assert ao2._blocking is False + + def test_blocking_global_flag_creates_attributes(self): + """Test that the global blocking flag creates the expected attributes""" + set_blocking(True) + bo1 = builder.boolOut("OUTREC1") + self.check_record_blocking_attributes(bo1) + + set_blocking(False) + bo2 = builder.boolOut("OUTREC2") + assert bo2._blocking is False + + bo3 = builder.boolOut("OUTREC3", blocking=True) + self.check_record_blocking_attributes(bo3) + + def blocking_test_func(self, device_name, conn): + + builder.SetDeviceName(device_name) + + count_rec = builder.longIn("BLOCKING-COUNTER", initial_value=0) + + async def blocking_update_func(new_val): + """A function that will block for some time""" + log("CHILD: blocking_update_func starting") + await asyncio.sleep(0.5) + log("CHILD: Finished sleep!") + completed_count = count_rec.get() + 1 + count_rec.set(completed_count) + log( + "CHILD: blocking_update_func finished, completed ", + completed_count + ) + + builder.longOut( + "BLOCKING-REC", + on_update=blocking_update_func, + always_update=True, + blocking=True + ) + + + dispatcher = asyncio_dispatcher.AsyncioDispatcher() + builder.LoadDatabase() + softioc.iocInit(dispatcher) + + conn.send("R") # "Ready" + + log("CHILD: Sent R over Connection to Parent") + + # Keep process alive while main thread runs CAGET + if conn.poll(TIMEOUT): + val = conn.recv() + assert val == "D", "Did not receive expected Done character" + + log("CHILD: Received exit command, child exiting") + + def test_blocking_single_thread_multiple_calls(self): + """Test that a blocking record correctly causes multiple caputs from + a single thread to wait for the expected time""" + parent_conn, child_conn = multiprocessing.Pipe() + + device_name = create_random_prefix() + + process = multiprocessing.Process( + target=self.blocking_test_func, + args=(device_name, child_conn), + ) + + process.start() + + log("PARENT: Child started, waiting for R command") + + from cothread.catools import caget, caput, _channel_cache + + try: + # Wait for message that IOC has started + select_and_recv(parent_conn, "R") + + log("PARENT: received R command") + + # Suppress potential spurious warnings + _channel_cache.purge() + + # Track number of puts sent + count = 1 + MAX_COUNT = 4 + + log("PARENT: begining While loop") + + while count <= MAX_COUNT: + put_ret = caput( + device_name + ":BLOCKING-REC", + 5, # Arbitrary value + wait=True, + timeout=TIMEOUT + ) + assert put_ret.ok, f"caput did not succeed: {put_ret.errorcode}" + + log(f"PARENT: completed caput with count {count}") + + count += 1 + + log("PARENT: Getting value from counter") + + ret_val = caget( + device_name + ":BLOCKING-COUNTER", + timeout=TIMEOUT, + ) + assert ret_val.ok, \ + f"caget did not succeed: {ret_val.errorcode}, {ret_val}" + + log(f"PARENT: Received val from COUNTER: {ret_val}") + + assert ret_val == MAX_COUNT + + finally: + # Suppress potential spurious warnings + _channel_cache.purge() + + log("PARENT: Sending Done command to child") + parent_conn.send("D") # "Done" + process.join(timeout=TIMEOUT) + log(f"PARENT: Join completed with exitcode {process.exitcode}") + if process.exitcode is None: + pytest.fail("Process did not terminate") + + @pytest.mark.asyncio + async def test_blocking_multiple_threads(self): + """Test that a blocking record correctly causes caputs from multiple + threads to wait for the expected time""" + parent_conn, child_conn = multiprocessing.Pipe() + + device_name = create_random_prefix() + + process = multiprocessing.Process( + target=self.blocking_test_func, + args=(device_name, child_conn), + ) + + process.start() + + log("PARENT: Child started, waiting for R command") + + from aioca import caget, caput + + try: + # Wait for message that IOC has started + select_and_recv(parent_conn, "R") + + log("PARENT: received R command") + + MAX_COUNT = 4 + + async def query_record(index): + log("SPAWNED: beginning blocking caput ", index) + await caput( + device_name + ":BLOCKING-REC", + 5, # Arbitrary value + wait=True, + timeout=TIMEOUT + ) + log("SPAWNED: caput complete ", index) + + queries = [query_record(i) for i in range(MAX_COUNT)] * MAX_COUNT + + log("PARENT: Gathering list of queries") + + await asyncio.gather(*queries) + + log("PARENT: Getting value from counter") + + ret_val = await caget( + device_name + ":BLOCKING-COUNTER", + timeout=TIMEOUT, + ) + assert ret_val.ok, \ + f"caget did not succeed: {ret_val.errorcode}, {ret_val}" + + log(f"PARENT: Received val from COUNTER: {ret_val}") + + assert ret_val == MAX_COUNT + + finally: + # Clear the cache before stopping the IOC stops + # "channel disconnected" error messages + aioca_cleanup() + + log("PARENT: Sending Done command to child") + parent_conn.send("D") # "Done" + process.join(timeout=TIMEOUT) + log(f"PARENT: Join completed with exitcode {process.exitcode}") + if process.exitcode is None: + pytest.fail("Process did not terminate") From 0591f5171629932793c9a15885b7762d4f5db69b Mon Sep 17 00:00:00 2001 From: AlexWells Date: Thu, 28 Jul 2022 11:24:07 +0100 Subject: [PATCH 6/9] Add documentation of blocking and SetBlocking Renamed the function to conform to naming convention. Added CHANGELOG entry. --- CHANGELOG.rst | 4 ++++ docs/reference/api.rst | 28 ++++++++++++++++++++++++++++ softioc/builder.py | 4 ++-- softioc/device.py | 3 +-- tests/test_records.py | 8 +++++--- 5 files changed, 40 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 125aec3c..fa54fa57 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -10,6 +10,10 @@ Versioning `_. Unreleased_ ----------- +Added: + +- `Caput with callback <../../pull/98>`_ + Fixed: - `Passing a custom asyncio event loop into the AsyncioDispatcher causes methods to never run <../../pull/96>`_ diff --git a/docs/reference/api.rst b/docs/reference/api.rst index 9b311240..210fc425 100644 --- a/docs/reference/api.rst +++ b/docs/reference/api.rst @@ -228,6 +228,23 @@ Test Facilities`_ documentation for more details of each function. which don't change its value will be discarded. In particular this means that such updates don't call `validate` or `on_update`. + .. _blocking: + + `blocking` + ~~~~~~~~~~ + + Only available on OUT records. When set to `True` the record will set the + ``PACT`` field when processing is ongoing. This means that ``caput`` and + similar tools can correctly wait for processing to complete. + + This flag defaults to `False`, to retain compatibility with previous + versions. + + .. seealso:: + `SetBlocking` for configuring a global default blocking value + + + For all of these functions any EPICS database field can be assigned a value by passing it as a keyword argument for the corresponding field name (in upper case) or by assigning to the corresponding field of the returned record object. @@ -358,6 +375,17 @@ record creation function. prevent the accidential creation of records with the currently set device name. +.. function:: SetBlocking(blocking) + + This can be used to globally set the default `blocking` flag, which will + apply to all records created after this point. This allows blocking to be + easily set/unset when creating groups of records. + + This does not change the blocking value for any already created records. + + .. seealso:: + `blocking` for description of the flag + The following helper functions are useful when constructing links between records. diff --git a/softioc/builder.py b/softioc/builder.py index 2f64e75b..497e8b5e 100644 --- a/softioc/builder.py +++ b/softioc/builder.py @@ -9,7 +9,7 @@ from . import device, pythonSoftIoc # noqa # Re-export this so users only have to import the builder -from .device import set_blocking # noqa +from .device import SetBlocking # noqa PythonDevice = pythonSoftIoc.PythonDevice() @@ -305,5 +305,5 @@ def UnsetDevice(): 'LoadDatabase', 'SetDeviceName', 'UnsetDevice', # Device support functions - 'set_blocking' + 'SetBlocking' ] diff --git a/softioc/device.py b/softioc/device.py index 04f6047f..3c446922 100644 --- a/softioc/device.py +++ b/softioc/device.py @@ -27,8 +27,7 @@ # Default False to maintain behaviour from previous versions. blocking = False -# TODO: Docs and Tests for the Blocking feature -def set_blocking(val): +def SetBlocking(val): global blocking blocking = val diff --git a/tests/test_records.py b/tests/test_records.py index aa84d2ba..c54363a0 100644 --- a/tests/test_records.py +++ b/tests/test_records.py @@ -16,7 +16,7 @@ ) from softioc import asyncio_dispatcher, builder, softioc -from softioc.device import set_blocking +from softioc.device import SetBlocking # Test file for miscellaneous tests related to records @@ -185,6 +185,7 @@ def test_pini_always_on(): def validate_fixture_names(params): """Provide nice names for the out_records fixture in TestValidate class""" return params[0].__name__ + class TestValidate: """Tests related to the validate callback""" @@ -524,11 +525,11 @@ def test_blocking_creates_attributes(self): def test_blocking_global_flag_creates_attributes(self): """Test that the global blocking flag creates the expected attributes""" - set_blocking(True) + SetBlocking(True) bo1 = builder.boolOut("OUTREC1") self.check_record_blocking_attributes(bo1) - set_blocking(False) + SetBlocking(False) bo2 = builder.boolOut("OUTREC2") assert bo2._blocking is False @@ -576,6 +577,7 @@ async def blocking_update_func(new_val): log("CHILD: Received exit command, child exiting") + @requires_cothread def test_blocking_single_thread_multiple_calls(self): """Test that a blocking record correctly causes multiple caputs from a single thread to wait for the expected time""" From 5791af4330d9b4133f02880e63d0d475d4bbb63b Mon Sep 17 00:00:00 2001 From: AlexWells Date: Thu, 28 Jul 2022 14:18:45 +0100 Subject: [PATCH 7/9] Fix bug with async on_update_name callbacks This leaks even more async code into the device, but I can't see a way around it without having to require the dispatcher be provided before creating records... --- softioc/device.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/softioc/device.py b/softioc/device.py index 3c446922..51e271a4 100644 --- a/softioc/device.py +++ b/softioc/device.py @@ -150,6 +150,19 @@ def __init__(self, name, **kargs): else: self.__on_update = None + # We cannot simply use `iscoroutinefunction` at the point of calling + # on_update as the lambda used for on_update_name is NOT a coroutine. + # There's no way to examine the lambda to see if it'd return a + # coroutine without calling it, so we must keep track of it ourselves. + # This is an unfortunate, unavoidable, leak of implementation detail + # that really should be contained in the dispatcher. + self.__on_update_is_coroutine = False + if ( + iscoroutinefunction(on_update) + or iscoroutinefunction(on_update_name) + ): + self.__on_update_is_coroutine = True + self.__validate = kargs.pop('validate', None) self.__always_update = kargs.pop('always_update', False) self.__enable_write = True @@ -225,7 +238,7 @@ def _process(self, record): if self.__on_update and self.__enable_write: record.PACT = self._blocking - if iscoroutinefunction(self.__on_update): + if self.__on_update_is_coroutine: # This is an unfortunate, but unavoidable, leak of # implementation detail that really should be kept within # the dispatcher, but cannot be. This is due to asyncio not From fdeca46ad92ca20f0031735ad1bba8abeb58f877 Mon Sep 17 00:00:00 2001 From: AlexWells Date: Thu, 28 Jul 2022 16:44:13 +0100 Subject: [PATCH 8/9] Refactor dispatchers to remove implementation leak By creating/modifying the dispatchers we can easily handle the __completion being called after the __on_update. Without doing this there seem to be unavoidable implementation leaks, where the device.py file would have to care about the difference between cothread and asyncio. --- softioc/asyncio_dispatcher.py | 14 ++++++++---- softioc/cothread_dispatcher.py | 17 ++++++++++++++ softioc/device.py | 42 +++++----------------------------- softioc/softioc.py | 6 ++--- tests/sim_asyncio_ioc.py | 11 +++++++++ tests/test_asyncio.py | 3 +++ 6 files changed, 49 insertions(+), 44 deletions(-) create mode 100644 softioc/cothread_dispatcher.py diff --git a/softioc/asyncio_dispatcher.py b/softioc/asyncio_dispatcher.py index 7eae5f8b..a80c64d3 100644 --- a/softioc/asyncio_dispatcher.py +++ b/softioc/asyncio_dispatcher.py @@ -32,12 +32,18 @@ def aioJoin(worker=worker, loop=self.loop): else: self.loop = loop - def __call__(self, func, *args): + def __call__(self, func, completion, func_args=(), completion_args=()): async def async_wrapper(): try: - ret = func(*args) - if inspect.isawaitable(ret): - await ret + if inspect.iscoroutinefunction(func): + await func(*func_args) + else: + ret = func(*func_args) + # Handle the case of a synchronous function that returns a + # coroutine, like the lambda for on_update_name does + if inspect.isawaitable(ret): + await ret + completion(*completion_args) except Exception: logging.exception("Exception when awaiting callback") asyncio.run_coroutine_threadsafe(async_wrapper(), self.loop) diff --git a/softioc/cothread_dispatcher.py b/softioc/cothread_dispatcher.py new file mode 100644 index 00000000..f9f2d265 --- /dev/null +++ b/softioc/cothread_dispatcher.py @@ -0,0 +1,17 @@ + +class CothreadDispatcher: + def __init__(self): + """A dispatcher for `cothread` based IOCs, suitable to be passed to + `softioc.iocInit`. """ + # Import here to ensure we don't instantiate any of cothread's global + # state unless we have to + import cothread + # Create our own cothread callback queue so that our callbacks + # processing doesn't interfere with other callback processing. + self.__dispatcher = cothread.cothread._Callback() + + def __call__(self, func, completion, func_args=(), completion_args=()): + def wrapper(): + func(*func_args) + completion(*completion_args) + self.__dispatcher(wrapper) diff --git a/softioc/device.py b/softioc/device.py index 51e271a4..dba8db12 100644 --- a/softioc/device.py +++ b/softioc/device.py @@ -150,19 +150,6 @@ def __init__(self, name, **kargs): else: self.__on_update = None - # We cannot simply use `iscoroutinefunction` at the point of calling - # on_update as the lambda used for on_update_name is NOT a coroutine. - # There's no way to examine the lambda to see if it'd return a - # coroutine without calling it, so we must keep track of it ourselves. - # This is an unfortunate, unavoidable, leak of implementation detail - # that really should be contained in the dispatcher. - self.__on_update_is_coroutine = False - if ( - iscoroutinefunction(on_update) - or iscoroutinefunction(on_update_name) - ): - self.__on_update_is_coroutine = True - self.__validate = kargs.pop('validate', None) self.__always_update = kargs.pop('always_update', False) self.__enable_write = True @@ -203,14 +190,6 @@ def __completion(self, record): signal_processing_complete(record, self._callback) pass - def __wrap_completion(self, value, record): - self.__on_update(value) - self.__completion(record) - - async def __async_wrap_completion(self, future, record): - await future - self.__completion(record) - def _process(self, record): '''Processing suitable for output records. Performs immediate value validation and asynchronous update notification.''' @@ -237,21 +216,12 @@ def _process(self, record): record.UDF = 0 if self.__on_update and self.__enable_write: record.PACT = self._blocking - - if self.__on_update_is_coroutine: - # This is an unfortunate, but unavoidable, leak of - # implementation detail that really should be kept within - # the dispatcher, but cannot be. This is due to asyncio not - # allowing its event loop to be nested, thus either - # requiring an additional call to the dispatcher once you - # acquire the Future from the coroutine, or doing this. - dispatcher( - self.__async_wrap_completion, - self.__on_update(python_value), - record - ) - else: - dispatcher(self.__wrap_completion, python_value, record) + dispatcher( + self.__on_update, + self.__completion, + func_args=(python_value,), + completion_args=(record,) + ) return EPICS_OK diff --git a/softioc/softioc.py b/softioc/softioc.py index 07449525..24d291a1 100644 --- a/softioc/softioc.py +++ b/softioc/softioc.py @@ -7,6 +7,7 @@ from epicsdbbuilder.recordset import recordset from . import imports, device +from . import cothread_dispatcher __all__ = ['dbLoadDatabase', 'iocInit', 'interactive_ioc'] @@ -31,10 +32,7 @@ def iocInit(dispatcher=None): ''' if dispatcher is None: # Fallback to cothread - import cothread - # Create our own cothread callback queue so that our callbacks - # processing doesn't interfere with other callback processing. - dispatcher = cothread.cothread._Callback() + dispatcher = cothread_dispatcher.CothreadDispatcher() # Set the dispatcher for record processing callbacks device.dispatcher = dispatcher imports.iocInit() diff --git a/tests/sim_asyncio_ioc.py b/tests/sim_asyncio_ioc.py index bc54a3fc..4ad8f05f 100644 --- a/tests/sim_asyncio_ioc.py +++ b/tests/sim_asyncio_ioc.py @@ -36,6 +36,17 @@ async def callback(value): # Create a record to set the alarm t_ao = builder.aOut('ALARM', on_update=callback) + async def on_update_name_callback(value, name): + print(name, "value", value) + + builder.longOut( + "NAME-CALLBACK", + initial_value = 3, + always_update=True, + on_update_name=on_update_name_callback, + blocking=True + ) + # Run the IOC builder.LoadDatabase() softioc.iocInit(asyncio_dispatcher.AsyncioDispatcher()) diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py index df756672..7a794b28 100644 --- a/tests/test_asyncio.py +++ b/tests/test_asyncio.py @@ -39,6 +39,8 @@ async def test_asyncio_ioc(asyncio_ioc): await caput(pre + ":ALARM", 3, wait=True) + await caput(pre + ":NAME-CALLBACK", 12, wait=True) + # Confirm the ALARM callback has completed select_and_recv(conn, "C") # "Complete" @@ -68,6 +70,7 @@ async def test_asyncio_ioc(asyncio_ioc): assert "%s:ALARM.VAL 0 -> 3" % pre in out assert 'on_update %s:AO : 3.0' % pre in out assert 'async update 3.0 (23.45)' in out + assert "%s:NAME-CALLBACK value 12" % pre in out assert 'Starting iocInit' in err assert 'iocRun: All initialization complete' in err except Exception: From 877a89c1ee258ea0c28a16d384dea9993147a064 Mon Sep 17 00:00:00 2001 From: AlexWells Date: Fri, 29 Jul 2022 10:38:45 +0100 Subject: [PATCH 9/9] Code review changes from Michael. Includes: - SetBlocking returns the old blocking state value. Added test. - Fix some style issues - Make completion function on the dispatcher optional, to maintain some compatibility with the previous API --- docs/reference/api.rst | 10 ++++++++-- softioc/asyncio_dispatcher.py | 21 +++++++++++---------- softioc/cothread_dispatcher.py | 10 ++++++++-- softioc/device.py | 17 +++++++---------- softioc/imports.py | 3 +-- tests/test_records.py | 14 ++++++++++++++ 6 files changed, 49 insertions(+), 26 deletions(-) diff --git a/docs/reference/api.rst b/docs/reference/api.rst index 210fc425..2967c21d 100644 --- a/docs/reference/api.rst +++ b/docs/reference/api.rst @@ -377,10 +377,16 @@ record creation function. .. function:: SetBlocking(blocking) - This can be used to globally set the default `blocking` flag, which will - apply to all records created after this point. This allows blocking to be + This can be used to globally set the default of the `blocking` flag, which + will apply to all records created after this point. This allows blocking to be easily set/unset when creating groups of records. + Returns the previous value of the `blocking` flag, which enables code like this:: + + old_blocking = SetBlocking(new_blocking) + create_records() + SetBlocking(old_blocking) + This does not change the blocking value for any already created records. .. seealso:: diff --git a/softioc/asyncio_dispatcher.py b/softioc/asyncio_dispatcher.py index a80c64d3..e6faa344 100644 --- a/softioc/asyncio_dispatcher.py +++ b/softioc/asyncio_dispatcher.py @@ -32,18 +32,19 @@ def aioJoin(worker=worker, loop=self.loop): else: self.loop = loop - def __call__(self, func, completion, func_args=(), completion_args=()): + def __call__( + self, + func, + func_args=(), + completion = None, + completion_args=()): async def async_wrapper(): try: - if inspect.iscoroutinefunction(func): - await func(*func_args) - else: - ret = func(*func_args) - # Handle the case of a synchronous function that returns a - # coroutine, like the lambda for on_update_name does - if inspect.isawaitable(ret): - await ret - completion(*completion_args) + ret = func(*func_args) + if inspect.isawaitable(ret): + await ret + if completion: + completion(*completion_args) except Exception: logging.exception("Exception when awaiting callback") asyncio.run_coroutine_threadsafe(async_wrapper(), self.loop) diff --git a/softioc/cothread_dispatcher.py b/softioc/cothread_dispatcher.py index f9f2d265..647b7537 100644 --- a/softioc/cothread_dispatcher.py +++ b/softioc/cothread_dispatcher.py @@ -10,8 +10,14 @@ def __init__(self): # processing doesn't interfere with other callback processing. self.__dispatcher = cothread.cothread._Callback() - def __call__(self, func, completion, func_args=(), completion_args=()): + def __call__( + self, + func, + func_args=(), + completion = None, + completion_args=()): def wrapper(): func(*func_args) - completion(*completion_args) + if completion: + completion(*completion_args) self.__dispatcher(wrapper) diff --git a/softioc/device.py b/softioc/device.py index dba8db12..0b5dad81 100644 --- a/softioc/device.py +++ b/softioc/device.py @@ -1,10 +1,8 @@ -from inspect import iscoroutinefunction import os import time import ctypes from ctypes import * import numpy -from threading import Event from . import alarm from . import fields @@ -19,7 +17,6 @@ # This is set from softioc.iocInit -# dispatcher(func, *args) will queue a callback to happen dispatcher = None # Global blocking flag, used to mark asynchronous (False) or synchronous (True) @@ -27,9 +24,12 @@ # Default False to maintain behaviour from previous versions. blocking = False -def SetBlocking(val): +# Set the current global blocking flag, and return the previous value. +def SetBlocking(new_val): global blocking - blocking = val + old_val = blocking + blocking = new_val + return old_val # EPICS processing return codes @@ -163,7 +163,6 @@ def __init__(self, name, **kargs): if self._blocking: self._callback = create_callback_capsule() - self.__super.__init__(name, **kargs) def init_record(self, record): @@ -188,7 +187,6 @@ def __completion(self, record): '''Signals that all on_update processing is finished''' if self._blocking: signal_processing_complete(record, self._callback) - pass def _process(self, record): '''Processing suitable for output records. Performs immediate value @@ -218,10 +216,9 @@ def _process(self, record): record.PACT = self._blocking dispatcher( self.__on_update, - self.__completion, func_args=(python_value,), - completion_args=(record,) - ) + completion = self.__completion, + completion_args=(record,)) return EPICS_OK diff --git a/softioc/imports.py b/softioc/imports.py index 5dd14a61..7208ebae 100644 --- a/softioc/imports.py +++ b/softioc/imports.py @@ -35,8 +35,7 @@ def signal_processing_complete(record, callback): _extension.signal_processing_complete( record.PRIO, record.record.value, - callback - ) + callback) def expect_success(status, function, args): assert status == 0, 'Expected success' diff --git a/tests/test_records.py b/tests/test_records.py index c54363a0..fb1c9394 100644 --- a/tests/test_records.py +++ b/tests/test_records.py @@ -536,6 +536,20 @@ def test_blocking_global_flag_creates_attributes(self): bo3 = builder.boolOut("OUTREC3", blocking=True) self.check_record_blocking_attributes(bo3) + def test_blocking_returns_old_state(self): + """Test that SetBlocking returns the previously set value""" + old_val = SetBlocking(True) + assert old_val is False # Default is False + + old_val = SetBlocking(False) + assert old_val is True + + # Test it correctly maintains state when passed the current value + old_val = SetBlocking(False) + assert old_val is False + old_val = SetBlocking(False) + assert old_val is False + def blocking_test_func(self, device_name, conn): builder.SetDeviceName(device_name)