diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 125aec3c..fa54fa57 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -10,6 +10,10 @@ Versioning `_. Unreleased_ ----------- +Added: + +- `Caput with callback <../../pull/98>`_ + Fixed: - `Passing a custom asyncio event loop into the AsyncioDispatcher causes methods to never run <../../pull/96>`_ diff --git a/docs/reference/api.rst b/docs/reference/api.rst index 9b311240..2967c21d 100644 --- a/docs/reference/api.rst +++ b/docs/reference/api.rst @@ -228,6 +228,23 @@ Test Facilities`_ documentation for more details of each function. which don't change its value will be discarded. In particular this means that such updates don't call `validate` or `on_update`. + .. _blocking: + + `blocking` + ~~~~~~~~~~ + + Only available on OUT records. When set to `True` the record will set the + ``PACT`` field when processing is ongoing. This means that ``caput`` and + similar tools can correctly wait for processing to complete. + + This flag defaults to `False`, to retain compatibility with previous + versions. + + .. seealso:: + `SetBlocking` for configuring a global default blocking value + + + For all of these functions any EPICS database field can be assigned a value by passing it as a keyword argument for the corresponding field name (in upper case) or by assigning to the corresponding field of the returned record object. @@ -358,6 +375,23 @@ record creation function. prevent the accidential creation of records with the currently set device name. +.. function:: SetBlocking(blocking) + + This can be used to globally set the default of the `blocking` flag, which + will apply to all records created after this point. This allows blocking to be + easily set/unset when creating groups of records. + + Returns the previous value of the `blocking` flag, which enables code like this:: + + old_blocking = SetBlocking(new_blocking) + create_records() + SetBlocking(old_blocking) + + This does not change the blocking value for any already created records. + + .. seealso:: + `blocking` for description of the flag + The following helper functions are useful when constructing links between records. diff --git a/softioc/asyncio_dispatcher.py b/softioc/asyncio_dispatcher.py index 7eae5f8b..e6faa344 100644 --- a/softioc/asyncio_dispatcher.py +++ b/softioc/asyncio_dispatcher.py @@ -32,12 +32,19 @@ def aioJoin(worker=worker, loop=self.loop): else: self.loop = loop - def __call__(self, func, *args): + def __call__( + self, + func, + func_args=(), + completion = None, + completion_args=()): async def async_wrapper(): try: - ret = func(*args) + ret = func(*func_args) if inspect.isawaitable(ret): await ret + if completion: + completion(*completion_args) except Exception: logging.exception("Exception when awaiting callback") asyncio.run_coroutine_threadsafe(async_wrapper(), self.loop) diff --git a/softioc/builder.py b/softioc/builder.py index 591141c5..497e8b5e 100644 --- a/softioc/builder.py +++ b/softioc/builder.py @@ -8,6 +8,8 @@ LoadDbdFile(os.path.join(os.path.dirname(__file__), 'device.dbd')) from . import device, pythonSoftIoc # noqa +# Re-export this so users only have to import the builder +from .device import SetBlocking # noqa PythonDevice = pythonSoftIoc.PythonDevice() @@ -301,5 +303,7 @@ def UnsetDevice(): 'Action', # Other builder support functions 'LoadDatabase', - 'SetDeviceName', 'UnsetDevice' + 'SetDeviceName', 'UnsetDevice', + # Device support functions + 'SetBlocking' ] diff --git a/softioc/cothread_dispatcher.py b/softioc/cothread_dispatcher.py new file mode 100644 index 00000000..647b7537 --- /dev/null +++ b/softioc/cothread_dispatcher.py @@ -0,0 +1,23 @@ + +class CothreadDispatcher: + def __init__(self): + """A dispatcher for `cothread` based IOCs, suitable to be passed to + `softioc.iocInit`. """ + # Import here to ensure we don't instantiate any of cothread's global + # state unless we have to + import cothread + # Create our own cothread callback queue so that our callbacks + # processing doesn't interfere with other callback processing. + self.__dispatcher = cothread.cothread._Callback() + + def __call__( + self, + func, + func_args=(), + completion = None, + completion_args=()): + def wrapper(): + func(*func_args) + if completion: + completion(*completion_args) + self.__dispatcher(wrapper) diff --git a/softioc/device.py b/softioc/device.py index ad14a7ae..0b5dad81 100644 --- a/softioc/device.py +++ b/softioc/device.py @@ -6,14 +6,31 @@ from . import alarm from . import fields -from .imports import dbLoadDatabase, recGblResetAlarms, db_put_field +from .imports import ( + create_callback_capsule, + dbLoadDatabase, + signal_processing_complete, + recGblResetAlarms, + db_put_field, +) from .device_core import DeviceSupportCore, RecordLookup # This is set from softioc.iocInit -# dispatcher(func, *args) will queue a callback to happen dispatcher = None +# Global blocking flag, used to mark asynchronous (False) or synchronous (True) +# processing modes for Out records. +# Default False to maintain behaviour from previous versions. +blocking = False + +# Set the current global blocking flag, and return the previous value. +def SetBlocking(new_val): + global blocking + old_val = blocking + blocking = new_val + return old_val + # EPICS processing return codes EPICS_OK = 0 @@ -142,6 +159,10 @@ def __init__(self, name, **kargs): else: self._value = None + self._blocking = kargs.pop('blocking', blocking) + if self._blocking: + self._callback = create_callback_capsule() + self.__super.__init__(name, **kargs) def init_record(self, record): @@ -162,9 +183,18 @@ def init_record(self, record): recGblResetAlarms(record) return self._epics_rc_ + def __completion(self, record): + '''Signals that all on_update processing is finished''' + if self._blocking: + signal_processing_complete(record, self._callback) + def _process(self, record): '''Processing suitable for output records. Performs immediate value validation and asynchronous update notification.''' + + if record.PACT: + return EPICS_OK + value = self._read_value(record) if not self.__always_update and \ self._compare_values(value, self._value): @@ -183,7 +213,13 @@ def _process(self, record): self._value = value record.UDF = 0 if self.__on_update and self.__enable_write: - dispatcher(self.__on_update, python_value) + record.PACT = self._blocking + dispatcher( + self.__on_update, + func_args=(python_value,), + completion = self.__completion, + completion_args=(record,)) + return EPICS_OK diff --git a/softioc/extension.c b/softioc/extension.c index 3ab7b2c4..b95b26d4 100644 --- a/softioc/extension.c +++ b/softioc/extension.c @@ -1,4 +1,4 @@ - +/* Provide EPICS functions in Python format */ #define PY_SSIZE_T_CLEAN #include #include @@ -6,6 +6,7 @@ #define db_accessHFORdb_accessC // Needed to get correct DBF_ values #include #include +#include #include #include #include @@ -13,6 +14,7 @@ #include #include + /* Reference stealing version of PyDict_SetItemString */ static void set_dict_item_steal( PyObject *dict, const char *name, PyObject *py_value) @@ -209,6 +211,46 @@ static PyObject *install_pv_logging(PyObject *self, PyObject *args) Py_RETURN_NONE; } +#define CAPSULE_NAME "ProcessDeviceSupportOut.callback" + +static void capsule_destructor(PyObject *obj) +{ + free(PyCapsule_GetPointer(obj, CAPSULE_NAME)); +} + + +static PyObject *create_callback_capsule(PyObject *self, PyObject *args) +{ + void *callback = malloc(sizeof(CALLBACK)); + return PyCapsule_New(callback, CAPSULE_NAME, &capsule_destructor); +} + +static PyObject *signal_processing_complete(PyObject *self, PyObject *args) +{ + int priority; + dbCommon *record; + PyObject *callback_capsule; + + if (!PyArg_ParseTuple(args, "inO", &priority, &record, &callback_capsule)) + { + return NULL; + } + + if (!PyCapsule_IsValid(callback_capsule, CAPSULE_NAME)) + { + return PyErr_Format( + PyExc_TypeError, + "Given object was not a capsule with name \"%s\"", + CAPSULE_NAME); + } + + CALLBACK *callback = PyCapsule_GetPointer(callback_capsule, CAPSULE_NAME); + + callbackRequestProcessCallback(callback, priority, record); + + Py_RETURN_NONE; +} + static struct PyMethodDef softioc_methods[] = { {"get_DBF_values", get_DBF_values, METH_VARARGS, "Get a map of DBF names to values"}, @@ -218,6 +260,10 @@ static struct PyMethodDef softioc_methods[] = { "Put a database field to a value"}, {"install_pv_logging", install_pv_logging, METH_VARARGS, "Install caput logging to stdout"}, + {"signal_processing_complete", signal_processing_complete, METH_VARARGS, + "Inform EPICS that asynchronous record processing has completed"}, + {"create_callback_capsule", create_callback_capsule, METH_VARARGS, + "Create a CALLBACK structure inside a PyCapsule"}, {NULL, NULL, 0, NULL} /* Sentinel */ }; diff --git a/softioc/imports.py b/softioc/imports.py index 699f2302..7208ebae 100644 --- a/softioc/imports.py +++ b/softioc/imports.py @@ -27,6 +27,16 @@ def install_pv_logging(acf_file): '''Install pv logging''' _extension.install_pv_logging(acf_file) +def create_callback_capsule(): + return _extension.create_callback_capsule() + +def signal_processing_complete(record, callback): + '''Signal that asynchronous record processing has completed''' + _extension.signal_processing_complete( + record.PRIO, + record.record.value, + callback) + def expect_success(status, function, args): assert status == 0, 'Expected success' @@ -94,6 +104,8 @@ def from_param(cls, value): __all__ = [ 'get_field_offsets', + 'create_callback_capsule', + 'signal_processing_complete', 'registryDeviceSupportAdd', 'IOSCANPVT', 'scanIoRequest', 'scanIoInit', 'dbLoadDatabase', diff --git a/softioc/pythonSoftIoc.py b/softioc/pythonSoftIoc.py index 8d79627f..47b9d00e 100644 --- a/softioc/pythonSoftIoc.py +++ b/softioc/pythonSoftIoc.py @@ -24,7 +24,7 @@ def __init__(self, builder, device, name, **fields): # have to maintain this separately from the corresponding device list. DeviceKeywords = [ 'on_update', 'on_update_name', 'validate', 'always_update', - 'initial_value', '_wf_nelm', '_wf_dtype'] + 'initial_value', '_wf_nelm', '_wf_dtype', 'blocking'] device_kargs = {} for keyword in DeviceKeywords: if keyword in fields: diff --git a/softioc/softioc.py b/softioc/softioc.py index 07449525..24d291a1 100644 --- a/softioc/softioc.py +++ b/softioc/softioc.py @@ -7,6 +7,7 @@ from epicsdbbuilder.recordset import recordset from . import imports, device +from . import cothread_dispatcher __all__ = ['dbLoadDatabase', 'iocInit', 'interactive_ioc'] @@ -31,10 +32,7 @@ def iocInit(dispatcher=None): ''' if dispatcher is None: # Fallback to cothread - import cothread - # Create our own cothread callback queue so that our callbacks - # processing doesn't interfere with other callback processing. - dispatcher = cothread.cothread._Callback() + dispatcher = cothread_dispatcher.CothreadDispatcher() # Set the dispatcher for record processing callbacks device.dispatcher = dispatcher imports.iocInit() diff --git a/tests/conftest.py b/tests/conftest.py index bcb473b7..b7be1f4d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -117,7 +117,7 @@ def enable_code_coverage(): def select_and_recv(conn, expected_char = None): """Wait for the given Connection to have data to receive, and return it. If a character is provided check its correct before returning it.""" - # Must use cothread's select if cothread is prsent, otherwise we'd block + # Must use cothread's select if cothread is present, otherwise we'd block # processing on all cothread processing. But we don't want to use it # unless we have to, as importing cothread can cause issues with forking. if "cothread" in sys.modules: diff --git a/tests/sim_asyncio_ioc.py b/tests/sim_asyncio_ioc.py index bc54a3fc..4ad8f05f 100644 --- a/tests/sim_asyncio_ioc.py +++ b/tests/sim_asyncio_ioc.py @@ -36,6 +36,17 @@ async def callback(value): # Create a record to set the alarm t_ao = builder.aOut('ALARM', on_update=callback) + async def on_update_name_callback(value, name): + print(name, "value", value) + + builder.longOut( + "NAME-CALLBACK", + initial_value = 3, + always_update=True, + on_update_name=on_update_name_callback, + blocking=True + ) + # Run the IOC builder.LoadDatabase() softioc.iocInit(asyncio_dispatcher.AsyncioDispatcher()) diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py index df756672..7a794b28 100644 --- a/tests/test_asyncio.py +++ b/tests/test_asyncio.py @@ -39,6 +39,8 @@ async def test_asyncio_ioc(asyncio_ioc): await caput(pre + ":ALARM", 3, wait=True) + await caput(pre + ":NAME-CALLBACK", 12, wait=True) + # Confirm the ALARM callback has completed select_and_recv(conn, "C") # "Complete" @@ -68,6 +70,7 @@ async def test_asyncio_ioc(asyncio_ioc): assert "%s:ALARM.VAL 0 -> 3" % pre in out assert 'on_update %s:AO : 3.0' % pre in out assert 'async update 3.0 (23.45)' in out + assert "%s:NAME-CALLBACK value 12" % pre in out assert 'Starting iocInit' in err assert 'iocRun: All initialization complete' in err except Exception: diff --git a/tests/test_records.py b/tests/test_records.py index ebb54da7..fb1c9394 100644 --- a/tests/test_records.py +++ b/tests/test_records.py @@ -1,9 +1,11 @@ +import asyncio import multiprocessing import numpy import os import pytest from conftest import ( + aioca_cleanup, log, create_random_prefix, requires_cothread, @@ -14,6 +16,7 @@ ) from softioc import asyncio_dispatcher, builder, softioc +from softioc.device import SetBlocking # Test file for miscellaneous tests related to records @@ -179,11 +182,10 @@ def test_pini_always_on(): mbbi = builder.mbbIn("BBB", initial_value=5) assert mbbi.PINI.Value() == "YES" - - def validate_fixture_names(params): """Provide nice names for the out_records fixture in TestValidate class""" return params[0].__name__ + class TestValidate: """Tests related to the validate callback""" @@ -500,3 +502,230 @@ def test_on_update_true_false(self, out_records): """Test that on_update works correctly for all out records when always_update is True and the put'ed value is always different""" self.on_update_runner(out_records, True, False) + + + +class TestBlocking: + """Tests related to the Blocking functionality""" + + def check_record_blocking_attributes(self, record): + """Helper function to assert expected attributes exist for a blocking + record""" + assert record._blocking is True + assert record._callback != 0 + + def test_blocking_creates_attributes(self): + """Test that setting the blocking flag on record creation creates the + expected attributes""" + ao1 = builder.aOut("OUTREC1", blocking=True) + self.check_record_blocking_attributes(ao1) + + ao2 = builder.aOut("OUTREC2", blocking=False) + assert ao2._blocking is False + + def test_blocking_global_flag_creates_attributes(self): + """Test that the global blocking flag creates the expected attributes""" + SetBlocking(True) + bo1 = builder.boolOut("OUTREC1") + self.check_record_blocking_attributes(bo1) + + SetBlocking(False) + bo2 = builder.boolOut("OUTREC2") + assert bo2._blocking is False + + bo3 = builder.boolOut("OUTREC3", blocking=True) + self.check_record_blocking_attributes(bo3) + + def test_blocking_returns_old_state(self): + """Test that SetBlocking returns the previously set value""" + old_val = SetBlocking(True) + assert old_val is False # Default is False + + old_val = SetBlocking(False) + assert old_val is True + + # Test it correctly maintains state when passed the current value + old_val = SetBlocking(False) + assert old_val is False + old_val = SetBlocking(False) + assert old_val is False + + def blocking_test_func(self, device_name, conn): + + builder.SetDeviceName(device_name) + + count_rec = builder.longIn("BLOCKING-COUNTER", initial_value=0) + + async def blocking_update_func(new_val): + """A function that will block for some time""" + log("CHILD: blocking_update_func starting") + await asyncio.sleep(0.5) + log("CHILD: Finished sleep!") + completed_count = count_rec.get() + 1 + count_rec.set(completed_count) + log( + "CHILD: blocking_update_func finished, completed ", + completed_count + ) + + builder.longOut( + "BLOCKING-REC", + on_update=blocking_update_func, + always_update=True, + blocking=True + ) + + + dispatcher = asyncio_dispatcher.AsyncioDispatcher() + builder.LoadDatabase() + softioc.iocInit(dispatcher) + + conn.send("R") # "Ready" + + log("CHILD: Sent R over Connection to Parent") + + # Keep process alive while main thread runs CAGET + if conn.poll(TIMEOUT): + val = conn.recv() + assert val == "D", "Did not receive expected Done character" + + log("CHILD: Received exit command, child exiting") + + @requires_cothread + def test_blocking_single_thread_multiple_calls(self): + """Test that a blocking record correctly causes multiple caputs from + a single thread to wait for the expected time""" + parent_conn, child_conn = multiprocessing.Pipe() + + device_name = create_random_prefix() + + process = multiprocessing.Process( + target=self.blocking_test_func, + args=(device_name, child_conn), + ) + + process.start() + + log("PARENT: Child started, waiting for R command") + + from cothread.catools import caget, caput, _channel_cache + + try: + # Wait for message that IOC has started + select_and_recv(parent_conn, "R") + + log("PARENT: received R command") + + # Suppress potential spurious warnings + _channel_cache.purge() + + # Track number of puts sent + count = 1 + MAX_COUNT = 4 + + log("PARENT: begining While loop") + + while count <= MAX_COUNT: + put_ret = caput( + device_name + ":BLOCKING-REC", + 5, # Arbitrary value + wait=True, + timeout=TIMEOUT + ) + assert put_ret.ok, f"caput did not succeed: {put_ret.errorcode}" + + log(f"PARENT: completed caput with count {count}") + + count += 1 + + log("PARENT: Getting value from counter") + + ret_val = caget( + device_name + ":BLOCKING-COUNTER", + timeout=TIMEOUT, + ) + assert ret_val.ok, \ + f"caget did not succeed: {ret_val.errorcode}, {ret_val}" + + log(f"PARENT: Received val from COUNTER: {ret_val}") + + assert ret_val == MAX_COUNT + + finally: + # Suppress potential spurious warnings + _channel_cache.purge() + + log("PARENT: Sending Done command to child") + parent_conn.send("D") # "Done" + process.join(timeout=TIMEOUT) + log(f"PARENT: Join completed with exitcode {process.exitcode}") + if process.exitcode is None: + pytest.fail("Process did not terminate") + + @pytest.mark.asyncio + async def test_blocking_multiple_threads(self): + """Test that a blocking record correctly causes caputs from multiple + threads to wait for the expected time""" + parent_conn, child_conn = multiprocessing.Pipe() + + device_name = create_random_prefix() + + process = multiprocessing.Process( + target=self.blocking_test_func, + args=(device_name, child_conn), + ) + + process.start() + + log("PARENT: Child started, waiting for R command") + + from aioca import caget, caput + + try: + # Wait for message that IOC has started + select_and_recv(parent_conn, "R") + + log("PARENT: received R command") + + MAX_COUNT = 4 + + async def query_record(index): + log("SPAWNED: beginning blocking caput ", index) + await caput( + device_name + ":BLOCKING-REC", + 5, # Arbitrary value + wait=True, + timeout=TIMEOUT + ) + log("SPAWNED: caput complete ", index) + + queries = [query_record(i) for i in range(MAX_COUNT)] * MAX_COUNT + + log("PARENT: Gathering list of queries") + + await asyncio.gather(*queries) + + log("PARENT: Getting value from counter") + + ret_val = await caget( + device_name + ":BLOCKING-COUNTER", + timeout=TIMEOUT, + ) + assert ret_val.ok, \ + f"caget did not succeed: {ret_val.errorcode}, {ret_val}" + + log(f"PARENT: Received val from COUNTER: {ret_val}") + + assert ret_val == MAX_COUNT + + finally: + # Clear the cache before stopping the IOC stops + # "channel disconnected" error messages + aioca_cleanup() + + log("PARENT: Sending Done command to child") + parent_conn.send("D") # "Done" + process.join(timeout=TIMEOUT) + log(f"PARENT: Join completed with exitcode {process.exitcode}") + if process.exitcode is None: + pytest.fail("Process did not terminate")