Skip to content
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ Versioning <https://semver.org/spec/v2.0.0.html>`_.
Unreleased_
-----------

Added:

- `Caput with callback <../../pull/98>`_

Fixed:

- `Passing a custom asyncio event loop into the AsyncioDispatcher causes methods to never run <../../pull/96>`_
Expand Down
28 changes: 28 additions & 0 deletions docs/reference/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,23 @@ Test Facilities`_ documentation for more details of each function.
which don't change its value will be discarded. In particular this means
that such updates don't call `validate` or `on_update`.

.. _blocking:

`blocking`
~~~~~~~~~~

Only available on OUT records. When set to `True` the record will set the
``PACT`` field when processing is ongoing. This means that ``caput`` and
similar tools can correctly wait for processing to complete.

This flag defaults to `False`, to retain compatibility with previous
versions.

.. seealso::
`SetBlocking` for configuring a global default blocking value



For all of these functions any EPICS database field can be assigned a value by
passing it as a keyword argument for the corresponding field name (in upper
case) or by assigning to the corresponding field of the returned record object.
Expand Down Expand Up @@ -358,6 +375,17 @@ record creation function.
prevent the accidential creation of records with the currently set device
name.

.. function:: SetBlocking(blocking)

This can be used to globally set the default `blocking` flag, which will
apply to all records created after this point. This allows blocking to be
easily set/unset when creating groups of records.

This does not change the blocking value for any already created records.

.. seealso::
`blocking` for description of the flag


The following helper functions are useful when constructing links between
records.
Expand Down
14 changes: 10 additions & 4 deletions softioc/asyncio_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,18 @@ def aioJoin(worker=worker, loop=self.loop):
else:
self.loop = loop

def __call__(self, func, *args):
def __call__(self, func, completion, func_args=(), completion_args=()):
async def async_wrapper():
try:
ret = func(*args)
if inspect.isawaitable(ret):
await ret
if inspect.iscoroutinefunction(func):
await func(*func_args)
else:
ret = func(*func_args)
# Handle the case of a synchronous function that returns a
# coroutine, like the lambda for on_update_name does
if inspect.isawaitable(ret):
await ret
Comment thread
AlexanderWells-diamond marked this conversation as resolved.
Outdated
completion(*completion_args)
except Exception:
logging.exception("Exception when awaiting callback")
asyncio.run_coroutine_threadsafe(async_wrapper(), self.loop)
6 changes: 5 additions & 1 deletion softioc/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
LoadDbdFile(os.path.join(os.path.dirname(__file__), 'device.dbd'))

from . import device, pythonSoftIoc # noqa
# Re-export this so users only have to import the builder
from .device import SetBlocking # noqa

PythonDevice = pythonSoftIoc.PythonDevice()

Expand Down Expand Up @@ -301,5 +303,7 @@ def UnsetDevice():
'Action',
# Other builder support functions
'LoadDatabase',
'SetDeviceName', 'UnsetDevice'
'SetDeviceName', 'UnsetDevice',
# Device support functions
'SetBlocking'
]
17 changes: 17 additions & 0 deletions softioc/cothread_dispatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

class CothreadDispatcher:
def __init__(self):
"""A dispatcher for `cothread` based IOCs, suitable to be passed to
`softioc.iocInit`. """
# Import here to ensure we don't instantiate any of cothread's global
# state unless we have to
import cothread
# Create our own cothread callback queue so that our callbacks
# processing doesn't interfere with other callback processing.
self.__dispatcher = cothread.cothread._Callback()

def __call__(self, func, completion, func_args=(), completion_args=()):
def wrapper():
func(*func_args)
completion(*completion_args)
self.__dispatcher(wrapper)
43 changes: 41 additions & 2 deletions softioc/device.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,36 @@
from inspect import iscoroutinefunction
import os
import time
import ctypes
from ctypes import *
import numpy
from threading import Event

from . import alarm
from . import fields
from .imports import dbLoadDatabase, recGblResetAlarms, db_put_field
from .imports import (
create_callback_capsule,
dbLoadDatabase,
signal_processing_complete,
recGblResetAlarms,
db_put_field,
)
from .device_core import DeviceSupportCore, RecordLookup


# This is set from softioc.iocInit
# dispatcher(func, *args) will queue a callback to happen
dispatcher = None

# Global blocking flag, used to mark asynchronous (False) or synchronous (True)
# processing modes for Out records.
# Default False to maintain behaviour from previous versions.
blocking = False

def SetBlocking(val):
global blocking
blocking = val
Comment thread
AlexanderWells-diamond marked this conversation as resolved.
Outdated


# EPICS processing return codes
EPICS_OK = 0
Expand Down Expand Up @@ -142,6 +159,11 @@ def __init__(self, name, **kargs):
else:
self._value = None

self._blocking = kargs.pop('blocking', blocking)
if self._blocking:
self._callback = create_callback_capsule()


self.__super.__init__(name, **kargs)

def init_record(self, record):
Expand All @@ -162,9 +184,19 @@ def init_record(self, record):
recGblResetAlarms(record)
return self._epics_rc_

def __completion(self, record):
'''Signals that all on_update processing is finished'''
if self._blocking:
signal_processing_complete(record, self._callback)
pass
Comment thread
AlexanderWells-diamond marked this conversation as resolved.
Outdated

def _process(self, record):
'''Processing suitable for output records. Performs immediate value
validation and asynchronous update notification.'''

if record.PACT:
return EPICS_OK

value = self._read_value(record)
if not self.__always_update and \
self._compare_values(value, self._value):
Expand All @@ -183,7 +215,14 @@ def _process(self, record):
self._value = value
record.UDF = 0
if self.__on_update and self.__enable_write:
dispatcher(self.__on_update, python_value)
record.PACT = self._blocking
dispatcher(
self.__on_update,
self.__completion,
func_args=(python_value,),
completion_args=(record,)
)
Comment thread
AlexanderWells-diamond marked this conversation as resolved.
Outdated

return EPICS_OK


Expand Down
48 changes: 47 additions & 1 deletion softioc/extension.c
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@

/* Provide EPICS functions in Python format */
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <string.h>

#define db_accessHFORdb_accessC // Needed to get correct DBF_ values
#include <dbAccess.h>
#include <dbFldTypes.h>
#include <callback.h>
#include <dbStaticLib.h>
#include <asTrapWrite.h>
#include <epicsVersion.h>
#include <dbChannel.h>
#include <asTrapWrite.h>
#include <asDbLib.h>


/* Reference stealing version of PyDict_SetItemString */
static void set_dict_item_steal(
PyObject *dict, const char *name, PyObject *py_value)
Expand Down Expand Up @@ -209,6 +211,46 @@ static PyObject *install_pv_logging(PyObject *self, PyObject *args)
Py_RETURN_NONE;
}

#define CAPSULE_NAME "ProcessDeviceSupportOut.callback"

static void capsule_destructor(PyObject *obj)
{
free(PyCapsule_GetPointer(obj, CAPSULE_NAME));
}


static PyObject *create_callback_capsule(PyObject *self, PyObject *args)
{
void *callback = malloc(sizeof(CALLBACK));
return PyCapsule_New(callback, CAPSULE_NAME, &capsule_destructor);
}

static PyObject *signal_processing_complete(PyObject *self, PyObject *args)
{
int priority;
dbCommon *record;
PyObject *callback_capsule;

if (!PyArg_ParseTuple(args, "inO", &priority, &record, &callback_capsule))
{
return NULL;
}

if (!PyCapsule_IsValid(callback_capsule, CAPSULE_NAME))
{
return PyErr_Format(
PyExc_TypeError,
"Given object was not a capsule with name \"%s\"",
CAPSULE_NAME);
}

CALLBACK *callback = PyCapsule_GetPointer(callback_capsule, CAPSULE_NAME);

callbackRequestProcessCallback(callback, priority, record);

Py_RETURN_NONE;
}

static struct PyMethodDef softioc_methods[] = {
{"get_DBF_values", get_DBF_values, METH_VARARGS,
"Get a map of DBF names to values"},
Expand All @@ -218,6 +260,10 @@ static struct PyMethodDef softioc_methods[] = {
"Put a database field to a value"},
{"install_pv_logging", install_pv_logging, METH_VARARGS,
"Install caput logging to stdout"},
{"signal_processing_complete", signal_processing_complete, METH_VARARGS,
"Inform EPICS that asynchronous record processing has completed"},
{"create_callback_capsule", create_callback_capsule, METH_VARARGS,
"Create a CALLBACK structure inside a PyCapsule"},
{NULL, NULL, 0, NULL} /* Sentinel */
};

Expand Down
13 changes: 13 additions & 0 deletions softioc/imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ def install_pv_logging(acf_file):
'''Install pv logging'''
_extension.install_pv_logging(acf_file)

def create_callback_capsule():
return _extension.create_callback_capsule()

def signal_processing_complete(record, callback):
'''Signal that asynchronous record processing has completed'''
_extension.signal_processing_complete(
record.PRIO,
record.record.value,
callback
)
Comment thread
AlexanderWells-diamond marked this conversation as resolved.
Outdated

def expect_success(status, function, args):
assert status == 0, 'Expected success'

Expand Down Expand Up @@ -94,6 +105,8 @@ def from_param(cls, value):

__all__ = [
'get_field_offsets',
'create_callback_capsule',
'signal_processing_complete',
'registryDeviceSupportAdd',
'IOSCANPVT', 'scanIoRequest', 'scanIoInit',
'dbLoadDatabase',
Expand Down
2 changes: 1 addition & 1 deletion softioc/pythonSoftIoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(self, builder, device, name, **fields):
# have to maintain this separately from the corresponding device list.
DeviceKeywords = [
'on_update', 'on_update_name', 'validate', 'always_update',
'initial_value', '_wf_nelm', '_wf_dtype']
'initial_value', '_wf_nelm', '_wf_dtype', 'blocking']
device_kargs = {}
for keyword in DeviceKeywords:
if keyword in fields:
Expand Down
6 changes: 2 additions & 4 deletions softioc/softioc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from epicsdbbuilder.recordset import recordset

from . import imports, device
from . import cothread_dispatcher

__all__ = ['dbLoadDatabase', 'iocInit', 'interactive_ioc']

Expand All @@ -31,10 +32,7 @@ def iocInit(dispatcher=None):
'''
if dispatcher is None:
# Fallback to cothread
import cothread
# Create our own cothread callback queue so that our callbacks
# processing doesn't interfere with other callback processing.
dispatcher = cothread.cothread._Callback()
dispatcher = cothread_dispatcher.CothreadDispatcher()
# Set the dispatcher for record processing callbacks
device.dispatcher = dispatcher
imports.iocInit()
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def enable_code_coverage():
def select_and_recv(conn, expected_char = None):
"""Wait for the given Connection to have data to receive, and return it.
If a character is provided check its correct before returning it."""
# Must use cothread's select if cothread is prsent, otherwise we'd block
# Must use cothread's select if cothread is present, otherwise we'd block
# processing on all cothread processing. But we don't want to use it
# unless we have to, as importing cothread can cause issues with forking.
if "cothread" in sys.modules:
Expand Down
11 changes: 11 additions & 0 deletions tests/sim_asyncio_ioc.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ async def callback(value):
# Create a record to set the alarm
t_ao = builder.aOut('ALARM', on_update=callback)

async def on_update_name_callback(value, name):
print(name, "value", value)

builder.longOut(
"NAME-CALLBACK",
initial_value = 3,
always_update=True,
on_update_name=on_update_name_callback,
blocking=True
)

# Run the IOC
builder.LoadDatabase()
softioc.iocInit(asyncio_dispatcher.AsyncioDispatcher())
Expand Down
3 changes: 3 additions & 0 deletions tests/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ async def test_asyncio_ioc(asyncio_ioc):

await caput(pre + ":ALARM", 3, wait=True)

await caput(pre + ":NAME-CALLBACK", 12, wait=True)

# Confirm the ALARM callback has completed
select_and_recv(conn, "C") # "Complete"

Expand Down Expand Up @@ -68,6 +70,7 @@ async def test_asyncio_ioc(asyncio_ioc):
assert "%s:ALARM.VAL 0 -> 3" % pre in out
assert 'on_update %s:AO : 3.0' % pre in out
assert 'async update 3.0 (23.45)' in out
assert "%s:NAME-CALLBACK value 12" % pre in out
assert 'Starting iocInit' in err
assert 'iocRun: All initialization complete' in err
except Exception:
Expand Down
Loading