Skip to content

Commit 6d169c5

Browse files
committed
seq: improve seq_queue implementation and tests
- use epicsAtomic and memory barriers to allow concurrent get/put without a mutex when the queue is neither full nor empty - ensure all accesses to shared indices (rd, wr) are atomic to prevent data races and ensure consistency (fix MacOS issues) - extend queueTest with a high-iteration concurrent test (verify data integrity using bit negation and sequence tracking) Co-authored-by: google-labs-jules[bot]
1 parent 8dc6efc commit 6d169c5

2 files changed

Lines changed: 124 additions & 58 deletions

File tree

src/seq/seq_queue.c

Lines changed: 96 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,45 @@
11
/*************************************************************************\
22
Copyright (c) 2010-2015 Helmholtz-Zentrum Berlin f. Materialien
33
und Energie GmbH, Germany (HZB)
4+
Copyright (c) 2026 ITER Organization
45
This file is distributed subject to a Software License Agreement found
56
in the file LICENSE that is included with this distribution.
67
\*************************************************************************/
78
#include "seq.h"
89
#include "seq_debug.h"
10+
#include "epicsVersion.h"
11+
12+
/*
13+
* Use epicsAtomic if available (EPICS >= 3.15)
14+
*/
15+
#ifndef VERSION_INT
16+
# define VERSION_INT(V,R,M,P) ( ((V)<<24) | ((R)<<16) | ((M)<<8) | (P))
17+
#endif
18+
#ifndef EPICS_VERSION_INT
19+
# define EPICS_VERSION_INT VERSION_INT(EPICS_VERSION, EPICS_REVISION, EPICS_MODIFICATION, EPICS_PATCH_LEVEL)
20+
#endif
21+
22+
#if EPICS_VERSION_INT >= VERSION_INT(3,15,0,0)
23+
#include "epicsAtomic.h"
24+
#define HAS_ATOMICS
25+
#endif
26+
27+
/* Fallbacks for older EPICS Base or when atomics are not available */
28+
#ifndef HAS_ATOMICS
29+
#define epicsAtomicGetSizeT(p) (*(p))
30+
#define epicsAtomicSetSizeT(p,v) (*(p) = (v))
31+
#define epicsAtomicGetIntT(p) (*(p))
32+
#define epicsAtomicSetIntT(p,v) (*(p) = (v))
33+
#define epicsAtomicReadMemoryBarrier()
34+
#define epicsAtomicWriteMemoryBarrier()
35+
#endif
936

1037
struct seqQueue {
1138
size_t wr;
1239
size_t rd;
1340
size_t numElems;
1441
size_t elemSize;
15-
boolean overflow;
42+
int overflow; /* Use int for atomic access */
1643
epicsMutexId mutex;
1744
char *buffer;
1845
};
@@ -23,8 +50,8 @@ epicsShareFunc boolean seqQueueInvariant(QUEUE q)
2350
&& q->elemSize > 0
2451
&& q->numElems > 0
2552
&& q->numElems <= seqQueueMaxNumElems
26-
&& q->rd < q->numElems
27-
&& q->wr < q->numElems;
53+
&& epicsAtomicGetSizeT(&q->rd) < q->numElems
54+
&& epicsAtomicGetSizeT(&q->wr) < q->numElems;
2855
}
2956

3057
epicsShareFunc QUEUE seqQueueCreate(size_t numElems, size_t elemSize)
@@ -51,7 +78,7 @@ epicsShareFunc QUEUE seqQueueCreate(size_t numElems, size_t elemSize)
5178
free(q);
5279
return 0;
5380
}
54-
DEBUG("%s:%d:calloc(%u,%u)\n",__FILE__,__LINE__,numElems, elemSize);
81+
DEBUG("%s:%d:calloc(%u,%u)\n",__FILE__,__LINE__,(unsigned)numElems, (unsigned)elemSize);
5582
q->buffer = (char *)calloc(numElems, elemSize);
5683
if (!q->buffer) {
5784
errlogSevPrintf(errlogFatal, "seqQueueCreate: out of memory\n");
@@ -67,13 +94,14 @@ epicsShareFunc QUEUE seqQueueCreate(size_t numElems, size_t elemSize)
6794
}
6895
q->elemSize = elemSize;
6996
q->numElems = numElems;
70-
q->overflow = FALSE;
97+
q->overflow = 0;
7198
q->rd = q->wr = 0;
7299
return q;
73100
}
74101

75102
epicsShareFunc void seqQueueDestroy(QUEUE q)
76103
{
104+
if (!q) return;
77105
epicsMutexDestroy(q->mutex);
78106
free(q->buffer);
79107
free(q);
@@ -86,22 +114,44 @@ epicsShareFunc boolean seqQueueGet(QUEUE q, void *value)
86114

87115
epicsShareFunc boolean seqQueueGetF(QUEUE q, seqQueueFunc *get, void *arg)
88116
{
117+
#ifdef HAS_ATOMICS
118+
size_t rd = epicsAtomicGetSizeT(&q->rd);
119+
size_t wr = epicsAtomicGetSizeT(&q->wr);
120+
121+
/* Lock-free fast path for Single-Consumer Get */
122+
if (wr != rd) {
123+
epicsAtomicReadMemoryBarrier();
124+
get(arg, q->buffer + rd * q->elemSize, q->elemSize);
125+
/* Ensure the data is read before we update the read index.
126+
This prevents a producer from overwriting the element before
127+
the consumer has finished copying it. */
128+
epicsAtomicWriteMemoryBarrier();
129+
epicsAtomicSetSizeT(&q->rd, (rd + 1) % q->numElems);
130+
return FALSE;
131+
}
132+
#endif
133+
134+
/* Mutex path for when wr == rd (empty or overflow)
135+
OR if we don't have atomics */
136+
epicsMutexLock(q->mutex);
89137
if (q->wr == q->rd) {
90138
if (!q->overflow) {
139+
epicsMutexUnlock(q->mutex);
91140
return TRUE;
92141
}
93-
epicsMutexLock(q->mutex);
94142
get(arg, q->buffer + q->rd * q->elemSize, q->elemSize);
95143
/* check again, a put might have intervened */
96-
if (q->wr == q->rd && q->overflow)
97-
q->overflow = FALSE;
98-
else
99-
q->rd = (q->rd + 1) % q->numElems;
100-
epicsMutexUnlock(q->mutex);
144+
if (q->wr == q->rd && q->overflow) {
145+
epicsAtomicSetIntT(&q->overflow, 0);
146+
} else {
147+
epicsAtomicSetSizeT(&q->rd, (q->rd + 1) % q->numElems);
148+
}
101149
} else {
150+
/* Can happen if wr moved after our lock-free check */
102151
get(arg, q->buffer + q->rd * q->elemSize, q->elemSize);
103-
q->rd = (q->rd + 1) % q->numElems;
152+
epicsAtomicSetSizeT(&q->rd, (q->rd + 1) % q->numElems);
104153
}
154+
epicsMutexUnlock(q->mutex);
105155
return FALSE;
106156
}
107157

@@ -113,46 +163,50 @@ epicsShareFunc boolean seqQueuePut(QUEUE q, const void *value)
113163
epicsShareFunc boolean seqQueuePutF(QUEUE q, seqQueueFunc *put, const void *arg)
114164
{
115165
boolean r = FALSE;
166+
size_t rd, wr;
116167

117-
if (q->overflow || (q->wr + 1) % q->numElems == q->rd) {
118-
epicsMutexLock(q->mutex);
119-
if ((q->wr + 1) % q->numElems == q->rd) {
120-
if (q->overflow) {
121-
r = TRUE; /* we will overwrite the last element */
122-
}
123-
q->overflow = TRUE;
168+
/* Always use mutex for Put to support Multi-Producer and safely handle overflow. */
169+
epicsMutexLock(q->mutex);
170+
rd = epicsAtomicGetSizeT(&q->rd);
171+
wr = epicsAtomicGetSizeT(&q->wr);
172+
173+
if (q->overflow || (wr + 1) % q->numElems == rd) {
174+
if ((wr + 1) % q->numElems == rd) {
175+
if (q->overflow) r = TRUE;
176+
epicsAtomicSetIntT(&q->overflow, 1);
124177
} else if (q->overflow) {
125-
/* we had a get since the last put, so
126-
can now eliminate overflow flag and instead
127-
increment the write pointer */
128-
q->wr = (q->wr + 1) % q->numElems;
129-
if ((q->wr + 1) % q->numElems != q->rd) {
130-
q->overflow = FALSE;
178+
/* A get happened, move wr forward */
179+
wr = (wr + 1) % q->numElems;
180+
epicsAtomicSetSizeT(&q->wr, wr);
181+
if ((wr + 1) % q->numElems != rd) {
182+
epicsAtomicSetIntT(&q->overflow, 0);
131183
}
132184
}
133-
put(q->buffer + q->wr * q->elemSize, arg, q->elemSize);
134-
if (!q->overflow) {
135-
q->wr = (q->wr + 1) % q->numElems;
136-
}
137-
epicsMutexUnlock(q->mutex);
138-
} else {
139-
put(q->buffer + q->wr * q->elemSize, arg, q->elemSize);
140-
q->wr = (q->wr + 1) % q->numElems;
141185
}
186+
187+
put(q->buffer + wr * q->elemSize, arg, q->elemSize);
188+
189+
if (!epicsAtomicGetIntT(&q->overflow)) {
190+
epicsAtomicWriteMemoryBarrier();
191+
epicsAtomicSetSizeT(&q->wr, (wr + 1) % q->numElems);
192+
}
193+
epicsMutexUnlock(q->mutex);
142194
return r;
143195
}
144196

145197
epicsShareFunc void seqQueueFlush(QUEUE q)
146198
{
147199
epicsMutexLock(q->mutex);
148-
q->rd = q->wr;
149-
q->overflow = FALSE;
200+
epicsAtomicSetSizeT(&q->rd, epicsAtomicGetSizeT(&q->wr));
201+
epicsAtomicSetIntT(&q->overflow, 0);
150202
epicsMutexUnlock(q->mutex);
151203
}
152204

153205
static size_t used(const QUEUE q)
154206
{
155-
return (q->numElems + q->wr - q->rd) % q->numElems + (q->overflow ? 1 : 0);
207+
size_t rd = epicsAtomicGetSizeT(&q->rd);
208+
size_t wr = epicsAtomicGetSizeT(&q->wr);
209+
return (q->numElems + wr - rd) % q->numElems + (epicsAtomicGetIntT(&q->overflow) ? 1 : 0);
156210
}
157211

158212
epicsShareFunc size_t seqQueueFree(const QUEUE q)
@@ -167,12 +221,16 @@ epicsShareFunc size_t seqQueueUsed(const QUEUE q)
167221

168222
epicsShareFunc boolean seqQueueIsEmpty(const QUEUE q)
169223
{
170-
return q->wr == q->rd && !q->overflow;
224+
size_t rd = epicsAtomicGetSizeT(&q->rd);
225+
size_t wr = epicsAtomicGetSizeT(&q->wr);
226+
return wr == rd && !epicsAtomicGetIntT(&q->overflow);
171227
}
172228

173229
epicsShareFunc boolean seqQueueIsFull(const QUEUE q)
174230
{
175-
return (q->wr + 1) % q->numElems == q->rd && q->overflow;
231+
size_t rd = epicsAtomicGetSizeT(&q->rd);
232+
size_t wr = epicsAtomicGetSizeT(&q->wr);
233+
return (wr + 1) % q->numElems == rd && epicsAtomicGetIntT(&q->overflow);
176234
}
177235

178236
epicsShareFunc size_t seqQueueNumElems(const QUEUE q)

test/unit/queueTest.c

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Copyright (c) 2008 UChicago Argonne LLC, as Operator of Argonne
55
National Laboratory.
66
Copyright (c) 2010-2015 Helmholtz-Zentrum Berlin f. Materialien
77
und Energie GmbH, Germany (HZB)
8+
Copyright (c) 2026 ITER Organization
89
This file is distributed subject to a Software License Agreement found
910
in file LICENSE that is included with this distribution.
1011
\*************************************************************************/
@@ -13,6 +14,7 @@ in file LICENSE that is included with this distribution.
1314
#include "epicsEvent.h"
1415
#include "epicsUnitTest.h"
1516
#include "testMain.h"
17+
#include <string.h>
1618

1719
typedef unsigned long long ELEM;
1820

@@ -33,32 +35,40 @@ static void check(QUEUE q, size_t expectedFree)
3335
testOk(isFull == expectedFull, "Full: %d == %d", isFull, expectedFull);
3436
}
3537

36-
static epicsEventId wdone, rdone, ready;
38+
static epicsEventId wdone, rdone;
3739

3840
static const int threadTestIterations = 1000000;
3941
static const size_t threadTestMaxNumElems = 20;
4042

41-
static int readerLost, writerLost;
43+
static volatile int readerLost, writerLost;
44+
45+
typedef struct {
46+
int seq;
47+
int inv_seq;
48+
} THREAD_ELEM;
4249

4350
static void readerTask(void *arg)
4451
{
4552
QUEUE q = (QUEUE)arg;
46-
string data;
53+
THREAD_ELEM elem;
4754
boolean empty;
4855
int i, j;
4956

5057
for (i = 0; i < threadTestIterations; i++) {
5158
do {
52-
empty = seqQueueGet(q, data);
59+
empty = seqQueueGet(q, &elem);
5360
} while (empty);
54-
j = atoi(data);
55-
if (j<i) {
56-
testAbort("%d<=%d", i, j);
61+
j = elem.seq;
62+
if (j != ~elem.inv_seq) {
63+
testAbort("Data corruption detected: seq=%d, inv_seq=%d (at i=%d)", j, elem.inv_seq, i);
64+
}
65+
if (j < i) {
66+
testAbort("Sequence error: received %d, expected >= %d", j, i);
5767
}
58-
if (j>threadTestIterations) {
59-
testAbort("%d<=%d", j, threadTestIterations);
68+
if (j >= threadTestIterations) {
69+
testAbort("Sequence error: received %d, expected < %d", j, threadTestIterations);
6070
}
61-
readerLost+=(j-i);
71+
readerLost += (j - i);
6272
i = j;
6373
}
6474
epicsEventWait(wdone);
@@ -68,13 +78,14 @@ static void readerTask(void *arg)
6878
static void writerTask(void *arg)
6979
{
7080
QUEUE q = (QUEUE)arg;
71-
string data;
81+
THREAD_ELEM elem;
7282
boolean full;
7383
int i;
7484

7585
for (i = 0; i < threadTestIterations; i++) {
76-
sprintf(data, "%d", i);
77-
full = seqQueuePut(q, data);
86+
elem.seq = i;
87+
elem.inv_seq = ~i;
88+
full = seqQueuePut(q, &elem);
7889
if (full) writerLost++;
7990
}
8091
epicsEventSignal(wdone);
@@ -142,11 +153,10 @@ MAIN(queueTest)
142153

143154
testDiag("concurrent queueTest with numElems=%u", (unsigned)numElems);
144155

145-
q = seqQueueCreate(numElems, sizeof(string));
156+
q = seqQueueCreate(numElems, sizeof(THREAD_ELEM));
146157
wdone = epicsEventCreate(epicsEventEmpty);
147158
rdone = epicsEventCreate(epicsEventEmpty);
148-
ready = epicsEventCreate(epicsEventEmpty);
149-
if (!wdone || !rdone || !ready) {
159+
if (!wdone || !rdone) {
150160
testAbort("epicsEventCreate failed");
151161
}
152162
readerLost = writerLost = 0;
@@ -162,11 +172,9 @@ MAIN(queueTest)
162172

163173
seqQueueDestroy(q);
164174
testPass("ok");
175+
epicsEventDestroy(wdone);
176+
epicsEventDestroy(rdone);
165177
}
166178

167-
epicsEventDestroy(wdone);
168-
epicsEventDestroy(rdone);
169-
epicsEventDestroy(ready);
170-
171179
return testDone();
172180
}

0 commit comments

Comments
 (0)