Skip to content

Commit 5ab9935

Browse files
committed
nextmsg: optionally reuse given carray objects
1 parent 3c8714f commit 5ab9935

13 files changed

Lines changed: 500 additions & 57 deletions

File tree

.github/workflows/test.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ jobs:
2424
- name: setup
2525
run: |
2626
luarocks install .github/workflows/lua-llthreads2-0.1.6-1.rockspec
27+
luarocks --server=https://luarocks.org/dev install carray
2728
luarocks make rockspecs/mtmsg-scm-0.rockspec
2829
2930
- name: test
@@ -43,6 +44,7 @@ jobs:
4344
lua test10.lua
4445
lua test11.lua
4546
lua test12.lua
47+
lua test13.lua
4648
cd ../examples
4749
lua example01.lua
4850
lua example02.lua

README.md

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ assert(lst:nextmsg(0) == nil)
359359
Possible errors: *mtmsg.error.object_closed*
360360

361361

362-
* **`buffer:nextmsg([timeout])`**
362+
* **`buffer:nextmsg([timeout][, carray]*)`**
363363

364364
Returns all the arguments that were given as one message by invoking the method
365365
*buffer:addmsg()* or *buffer:setmsg()*. The returned message is removed
@@ -368,10 +368,15 @@ assert(lst:nextmsg(0) == nil)
368368
* *timeout* optional float, maximal time in seconds for waiting for the next
369369
message. The method returns without result if timeout is reached.
370370

371-
If no timeout is given and *buffer:isnonblock() == false* then this methods waits
372-
without timeout limit until a next message becomes available.
371+
* *carray* optional one or more [carray] objects. These objects are reused
372+
if the message contains an array of the corresponding data type
373+
of the given *carray*. Unused *carray* objects are reset to length 0
374+
after this call.
375+
376+
If timeout is not given or *nil* and *buffer:isnonblock() == false* then this
377+
methods waits without timeout limit forever until a next message becomes available.
373378

374-
If no timeout is given and *buffer:isnonblock() == true* then this method
379+
If timeout is not given or *nil* and *buffer:isnonblock() == true* then this method
375380
returns immediately without result if no next message is available or if the
376381
buffer is concurrently accessed from another thread.
377382

@@ -492,7 +497,7 @@ assert(lst:nextmsg(0) == nil)
492497
Possible errors: *mtmsg.error.operation_aborted*
493498

494499

495-
* **`listener:nextmsg([timeout])`**
500+
* **`listener:nextmsg([timeout][, carray]*)`**
496501

497502
Returns all the arguments that were given as one message by invoking the method
498503
*buffer:addmsg()* or *buffer:setmsg()* to one of the buffers that are
@@ -502,10 +507,15 @@ assert(lst:nextmsg(0) == nil)
502507
* *timeout* optional float, maximal time in seconds for waiting for the next
503508
message. The method returns without result if timeout is reached.
504509

505-
If no timeout is given and *listener:isnonblock() == false* then this methods waits
506-
without timeout limit until a next message becomes available.
510+
* *carray* optional one or more [carray] objects. These objects are reused
511+
if the message contains an array of the corresponding data type
512+
of the given *carray*. Unused *carray* objects are reset to length 0
513+
after this call.
514+
515+
If timeout is not given or *nil* and *listener:isnonblock() == false* then this
516+
methods waits without timeout limit forever until a next message becomes available.
507517

508-
If no timeout is given and *listener:isnonblock() == true* then this methods
518+
If timeout is not given or *nil* and *listener:isnonblock() == true* then this methods
509519
returns immediately without result if no next message is available or if the
510520
listener is concurrently accessed from another thread.
511521

@@ -653,15 +663,20 @@ is equivalent to
653663
local a, b, c = buffer:nextmsg()
654664
```
655665

656-
* **`reader:next([count])`**
666+
* **`reader:next([count][, carray]*)`**
657667

658-
Returns next message elements from the reader. Returns nothing no more message elements
668+
Returns next message elements from the reader. Returns nothing if no more message elements
659669
are available. The returned elements are removed from the reader's internal list of message
660670
elements.
661671

662672
* *count* - optional integer, the maximaum number of message elements to be returned
663673
(defaults to 1).
664674

675+
* *carray* optional one or more [carray] objects. These objects are reused
676+
if the message contains an array of the corresponding data type
677+
of the given *carray*. Unused *carray* objects are reset to length 0
678+
after this call.
679+
665680
* **`reader:nextmsg(buffer|listener, [timeout])`**
666681

667682
Gets the next message of a buffer or listener into the reader in one atomic step. The

appveyor.yml

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
version: 0.3.2.{build}
22

3-
os:
4-
- Windows Server 2012 R2
3+
image: Windows Server 2012 R2
54

65
shallow_clone: true
76

@@ -12,6 +11,7 @@ environment:
1211
- LUA: "lua 5.1"
1312
- LUA: "lua 5.2"
1413
- LUA: "lua 5.3"
14+
- LUA: "lua 5.4"
1515

1616
platform:
1717
- x64
@@ -24,7 +24,13 @@ cache:
2424

2525
install:
2626
- echo "Setup..."
27-
- set PATH=C:\Python27\Scripts;%LR_EXTERNAL%;%PATH%
27+
- set PATH=C:\Python38\;C:\Python38\Scripts;%LR_EXTERNAL%;%PATH%
28+
- where py
29+
- py --version
30+
- where python
31+
- python --version
32+
- where pip
33+
- pip --version
2834
- if /I "%platform%"=="x86" set HR_TARGET=vs_32
2935
- if /I "%platform%"=="x64" set HR_TARGET=vs_64
3036
- if /I "%platform%"=="mingw" set HR_TARGET=mingw
@@ -34,10 +40,9 @@ install:
3440
mkdir "%LR_EXTERNAL%\lib" &&
3541
mkdir "%LR_EXTERNAL%\include"
3642
)
37-
- if not exist c:\hererocks (
38-
pip install hererocks &&
39-
hererocks c:\hererocks --%LUA% --target %HR_TARGET% -rlatest
40-
)
43+
- python -m pip install --upgrade pip
44+
- pip install git+https://github.com/luarocks/hererocks@0.25.0
45+
- hererocks c:\hererocks --%LUA% --target %HR_TARGET% -rlatest
4146
- call c:\hererocks\bin\activate
4247

4348
before_build:
@@ -46,6 +51,7 @@ before_build:
4651
build_script:
4752
- echo "Making..."
4853
- luarocks install lua-llthreads2
54+
- luarocks --server=https://luarocks.org/dev install carray
4955
- luarocks make rockspecs/mtmsg-scm-0.rockspec
5056

5157
before_test:
@@ -74,6 +80,7 @@ test_script:
7480
- lua test10.lua
7581
- lua test11.lua
7682
- lua test12.lua
83+
- lua test13.lua
7784

7885
after_test:
7986
# deploy

src/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ LNX_COPTS :=
1111
WIN_COPTS := -I/mingw64/include/lua5.1
1212
MAC_COPTS := -I/usr/local/opt/lua/include/lua5.3
1313

14-
LNX_LOPTS := -lpthread
14+
LNX_LOPTS := -g -lpthread
1515
WIN_LOPTS := -lkernel32
1616
MAC_LOPTS := -lpthread
1717

src/buffer.c

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
#define RECEIVER_CAPI_IMPLEMENT_SET_CAPI 1
55
#define SENDER_CAPI_IMPLEMENT_SET_CAPI 1
66

7+
#define CARRAY_CAPI_IMPLEMENT_GET_CAPI 1
8+
79
#include "buffer.h"
810
#include "listener.h"
911
#include "serialize.h"
@@ -12,6 +14,7 @@
1214
#include "receiver_capi_impl.h"
1315
#include "sender_capi_impl.h"
1416
#include "notify_capi_impl.h"
17+
#include "carray_capi.h"
1518

1619
const char* const MTMSG_BUFFER_CLASS_NAME = "mtmsg.buffer";
1720

@@ -793,16 +796,41 @@ int mtmsg_buffer_next_msg(lua_State* L, BufferUserData* udata,
793796
MsgBuffer* b, bool nonblock, int arg, double timeoutSeconds , MemBuffer* resultBuffer, size_t* argsSize,
794797
sender_error_handler sender_eh, void* sender_ehdata)
795798
{
796-
lua_Number endTime = 0; /* 0 = no timeout */
797-
799+
lua_Number endTime = -1; /* -1 = no timeout, wait forever */
800+
int argTop = 0;
798801
if (L) {
799-
if (arg && !lua_isnoneornil(L, arg)) {
800-
lua_Number waitSeconds = luaL_checknumber(L, arg);
801-
endTime = mtmsg_current_time_seconds() + waitSeconds;
802+
argTop = lua_gettop(L);
803+
if (arg && arg <= argTop) {
804+
int t = lua_type(L, arg);
805+
if (t == LUA_TNIL || t == LUA_TNUMBER) {
806+
if (t == LUA_TNUMBER) {
807+
lua_Number waitSeconds = lua_tonumber(L, arg);
808+
if (waitSeconds < 0) waitSeconds = 0;
809+
endTime = mtmsg_current_time_seconds() + waitSeconds;
810+
if (waitSeconds == 0) {
811+
nonblock = true;
812+
}
813+
}
814+
arg += 1;
815+
} else {
816+
int reason = 0;
817+
if (t != LUA_TUSERDATA || !carray_get_capi(L, arg, &reason)) {
818+
if (reason == 1) {
819+
return luaL_argerror(L, arg, "incompatible carray capi version number");
820+
} else if (!resultBuffer) {
821+
return luaL_argerror(L, arg, "timeout seconds or carray expected");
822+
} else {
823+
return luaL_argerror(L, arg, "timeout seconds expected");
824+
}
825+
}
826+
}
802827
}
803828
} else {
804-
if (timeoutSeconds >= 0) {
829+
if (timeoutSeconds >= 0) { /* timeoutSeconds < 0 -> no timeout, wait forever */
805830
endTime = mtmsg_current_time_seconds() + timeoutSeconds;
831+
if (timeoutSeconds == 0) {
832+
nonblock = true;
833+
}
806834
}
807835
}
808836

@@ -848,13 +876,21 @@ int mtmsg_buffer_next_msg(lua_State* L, BufferUserData* udata,
848876
par.inMaxArgCount = -1;
849877
par.parsedLength = 0;
850878
par.parsedArgCount = 0;
851-
par.carrayCapi = udata->carrayCapi;
879+
par.carrayCapi = udata->carrayCapi;
880+
par.errorArg = 0;
852881
lua_pushcfunction(L, mtmsg_serialize_get_msg_args);
882+
lua_insert(L, arg);
853883
lua_pushlightuserdata(L, &par);
854-
int rc = lua_pcall(L, 1, LUA_MULTRET, 0);
884+
lua_insert(L, arg + 1);
885+
int nargs = argTop - arg + 1;
886+
int rc = lua_pcall(L, nargs + 1, LUA_MULTRET, 0);
855887
if (rc != LUA_OK) {
856888
async_mutex_unlock(b->sharedMutex);
857-
return lua_error(L);
889+
if (par.errorArg) {
890+
return luaL_argerror(L, arg + par.errorArg - 2, lua_tostring(L, -1));
891+
} else {
892+
return lua_error(L);
893+
}
858894
}
859895
rslt = par.parsedArgCount;
860896
udata->carrayCapi = par.carrayCapi;
@@ -912,7 +948,7 @@ int mtmsg_buffer_next_msg(lua_State* L, BufferUserData* udata,
912948
}
913949
return rslt;
914950
} else {
915-
if (endTime > 0) {
951+
if (endTime >= 0) {
916952
lua_Number now = mtmsg_current_time_seconds();
917953
if (now < endTime) {
918954
async_mutex_wait_millis(b->sharedMutex, (int)((endTime - now) * 1000 + 0.5));

src/carray_capi.h

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
#endif
1212

1313
#define CARRAY_CAPI_ID_STRING "_capi_carray"
14-
#define CARRAY_CAPI_VERSION_MAJOR -1
14+
#define CARRAY_CAPI_VERSION_MAJOR -2
1515
#define CARRAY_CAPI_VERSION_MINOR 0
16-
#define CARRAY_CAPI_VERSION_PATCH 1
16+
#define CARRAY_CAPI_VERSION_PATCH 0
1717

1818
typedef struct carray_capi carray_capi;
1919
typedef struct carray_info carray_info;
@@ -69,6 +69,7 @@ struct carray_info
6969
carray_attr attr;
7070
size_t elementSize;
7171
size_t elementCount;
72+
size_t elementCapacity;
7273
};
7374

7475
/**
@@ -186,6 +187,20 @@ struct carray_capi
186187
* otherwise behaviour may be undefined.
187188
*/
188189
const void* (*getReadableElementPtr)(const carray* a, size_t offset, size_t count);
190+
191+
/**
192+
* Resizes the array.
193+
*
194+
* newElementCount - new number of elements. If this is larger than the current
195+
* element count, the new elements are uninitialized.
196+
*
197+
* shrinkCapacity - flag, if true the capacity is set to the new size.
198+
*
199+
* Returns pointer to the first element in the array. The caller may read
200+
* or write newElementCount bytes at this pointer.
201+
* Returns NULL on failure or if newElementCount == 0.
202+
*/
203+
void* (*resizeCarray)(carray* a, size_t newElementCount, int shrinkCapacity);
189204
};
190205

191206
#if CARRAY_CAPI_IMPLEMENT_SET_CAPI

src/listener.c

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
#define CARRAY_CAPI_IMPLEMENT_GET_CAPI 1
2+
13
#include "listener.h"
24
#include "buffer.h"
35
#include "serialize.h"
46
#include "main.h"
57
#include "error.h"
8+
#include "carray_capi.h"
69

710
const char* const MTMSG_LISTENER_CLASS_NAME = "mtmsg.listener";
811

@@ -407,11 +410,35 @@ int mtmsg_listener_next_msg(lua_State* L, ListenerUserData* udata,
407410
MsgListener* listener, bool nonblock, int arg,
408411
MemBuffer* resultBuffer, size_t* argsSize)
409412
{
410-
lua_Number endTime = 0; /* 0 = no timeout */
413+
lua_Number endTime = -1; /* -1 = no timeout, wait forever */
414+
415+
int argTop = lua_gettop(L);
411416

412-
if (!lua_isnoneornil(L, arg)) {
413-
lua_Number waitSeconds = luaL_checknumber(L, arg);
414-
endTime = mtmsg_current_time_seconds() + waitSeconds;
417+
if (arg && arg <= argTop) {
418+
int t = lua_type(L, arg);
419+
if (t == LUA_TNIL || t == LUA_TNUMBER) {
420+
if (t == LUA_TNUMBER) {
421+
lua_Number waitSeconds = lua_tonumber(L, arg);
422+
if (waitSeconds < 0) waitSeconds = 0;
423+
endTime = mtmsg_current_time_seconds() + waitSeconds;
424+
if (waitSeconds == 0) {
425+
nonblock = true;
426+
}
427+
}
428+
arg += 1;
429+
}
430+
else {
431+
int reason = 0;
432+
if (t != LUA_TUSERDATA || !carray_get_capi(L, arg, &reason)) {
433+
if (reason == 1) {
434+
return luaL_argerror(L, arg, "incompatible carray capi version number");
435+
} else if (!resultBuffer) {
436+
return luaL_argerror(L, arg, "timeout seconds or carray expected");
437+
} else {
438+
return luaL_argerror(L, arg, "timeout seconds expected");
439+
}
440+
}
441+
}
415442
}
416443

417444
if (nonblock) {
@@ -457,12 +484,20 @@ int mtmsg_listener_next_msg(lua_State* L, ListenerUserData* udata,
457484
par.parsedLength = 0;
458485
par.parsedArgCount = 0;
459486
par.carrayCapi = udata->carrayCapi;
487+
par.errorArg = 0;
460488
lua_pushcfunction(L, mtmsg_serialize_get_msg_args);
489+
lua_insert(L, arg);
461490
lua_pushlightuserdata(L, &par);
462-
int rc = lua_pcall(L, 1, LUA_MULTRET, 0);
491+
lua_insert(L, arg + 1);
492+
int nargs = argTop - arg + 1;
493+
int rc = lua_pcall(L, nargs + 1, LUA_MULTRET, 0);
463494
if (rc != LUA_OK) {
464495
async_mutex_unlock(&listener->listenerMutex);
465-
return lua_error(L);
496+
if (par.errorArg) {
497+
return luaL_argerror(L, par.errorArg, lua_tostring(L, -1));
498+
} else {
499+
return lua_error(L);
500+
}
466501
}
467502
rslt = par.parsedArgCount;
468503
udata->carrayCapi = par.carrayCapi;
@@ -525,7 +560,7 @@ int mtmsg_listener_next_msg(lua_State* L, ListenerUserData* udata,
525560
}
526561
}
527562
}
528-
if (endTime > 0) {
563+
if (endTime >= 0) {
529564
lua_Number now = mtmsg_current_time_seconds();
530565
if (now < endTime) {
531566
async_mutex_wait_millis(&listener->listenerMutex, (int)((endTime - now) * 1000 + 0.5));

0 commit comments

Comments
 (0)