Skip to content

Commit 43c5568

Browse files
committed
refactor semlock_acquire and semlock_release to make them atomic
1 parent acf8b1d commit 43c5568

2 files changed

Lines changed: 160 additions & 72 deletions

File tree

Modules/_multiprocessing/semaphore.c

Lines changed: 156 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -376,78 +376,102 @@ shm_semlock_counters = {
376376
.handle_glock = (SEM_HANDLE)0,
377377
.header = (HeaderObject *)NULL,
378378
.counters = (CounterObject *)NULL,
379+
.uuid = 0L
379380
};
380381

381382
/* ---------------------------------*/
382383
/* unlink sem and shared mem from multiprocessing.resource_tracker
383384
module */
384385

385386
static int
386-
_register_resource_tracker_sem(void)
387+
_resource_tracker_getfd(void)
387388
{
388-
PyObject *f_register = PyImport_ImportModuleAttrString("multiprocessing.resource_tracker",\
389-
"register");
390-
if (f_register == NULL) {
391-
PyErr_Clear();
392-
puts("no import");
393-
return -1;
394-
}
395-
PyObject *psem_name = PyUnicode_FromString(GLOCK_NAME);
396-
if (psem_name == NULL) {
397-
PyErr_Clear();
398-
Py_DECREF(f_register);
399-
puts("no sem name");
400-
return -1;
401-
}
402-
PyObject *ptype_sem = PyUnicode_FromString("semaphore");
403-
if (psem_name == NULL) {
404-
Py_DECREF(f_register);
405-
Py_DECREF(psem_name);
389+
const char *operation = "getfd";
390+
PyObject *f_operation = PyImport_ImportModuleAttrString("multiprocessing.resource_tracker",
391+
operation);
392+
if (f_operation == NULL) {
406393
PyErr_Clear();
407-
puts("no type name");
394+
printf("no call to %s\n", operation);
408395
return -1;
409396
}
410-
411-
PyObject *ret = PyObject_CallFunctionObjArgs(f_register, psem_name, ptype_sem, NULL);
412-
Py_DECREF(psem_name);
413-
Py_DECREF(ptype_sem);
414-
Py_DECREF(f_register);
397+
PyObject *ret = PyObject_CallNoArgs(f_operation);
398+
Py_DECREF(f_operation);
415399
if (ret == NULL) {
416400
PyErr_Clear();
417-
puts("no call to register");
401+
printf("no call to %s()\n", operation);
418402
return -1;
419403
}
420404
Py_DECREF(ret);
421-
DEBUG_PID_FUNC("register SEMAPHORE", 0, 0, "O");
405+
DEBUG_PID_FUNC(operation, 0, 0, "");
422406
return 0;
423407
}
424408

409+
/*
410+
static long
411+
_get_GC(void)
412+
{
413+
PyObject *module = PyImport_ImportModule("gc");
414+
if (module == NULL) {
415+
return -1;
416+
}
417+
Py_DECREF(module);
418+
DEBUG_PID_FUNC("import gc", 0, 0, "");
419+
return (long)module;
420+
}
421+
*/
422+
425423
static int
426-
_register_resource_tracker_mem(void)
424+
_resource_tracker(const char *operation, const char *name, const char *rtype)
427425
{
428-
PyObject *f_register = PyImport_ImportModuleAttrString("multiprocessing.resource_tracker",
429-
"register");
430-
if (f_register == NULL) {
426+
PyObject *f_operation = PyImport_ImportModuleAttrString("multiprocessing.resource_tracker",
427+
operation);
428+
if (f_operation == NULL) {
429+
PyErr_Clear();
430+
printf("no call to %s\n", operation);
431+
return -1;
432+
}
433+
PyObject *p_name = PyUnicode_FromString(name);
434+
if (p_name == NULL) {
431435
PyErr_Clear();
432-
puts("no call to register");
436+
Py_DECREF(f_operation);
437+
puts("no name");
433438
return -1;
434439
}
435-
PyObject *pmem_name = PyUnicode_FromString(SHAREDMEM_NAME);
436-
PyObject *ptype_mem = PyUnicode_FromString("shared_memory");
437-
PyObject *ret = PyObject_CallFunctionObjArgs(f_register, pmem_name, ptype_mem, NULL);
438-
Py_DECREF(pmem_name);
439-
Py_DECREF(ptype_mem);
440-
Py_DECREF(f_register);
440+
PyObject *p_type = PyUnicode_FromString(rtype);
441+
if (p_type == NULL) {
442+
Py_DECREF(f_operation);
443+
Py_DECREF(p_name);
444+
PyErr_Clear();
445+
puts("no rtype");
446+
return -1;
447+
}
448+
449+
PyObject *ret = PyObject_CallFunctionObjArgs(f_operation, p_name, p_type, NULL);
450+
Py_DECREF(p_name);
451+
Py_DECREF(p_type);
452+
Py_DECREF(f_operation);
441453
if (ret == NULL) {
442454
PyErr_Clear();
443-
puts("no call to register");
455+
printf("no call to %s(%s, %s)\n", operation, name, rtype);
444456
return -1;
445457
}
446458
Py_DECREF(ret);
447-
DEBUG_PID_FUNC("register MEMORY", 0, 0, "Okay");
459+
DEBUG_PID_FUNC(operation, 0, 0, name);
448460
return 0;
449461
}
450462

463+
static int
464+
_register_resource_tracker_sem(void)
465+
{
466+
return _resource_tracker("register", GLOCK_NAME, "semaphore") ;
467+
}
468+
469+
static int
470+
_register_resource_tracker_mem(void)
471+
{
472+
return _resource_tracker("register", SHAREDMEM_NAME, "shared_memory") ;
473+
}
474+
451475
/*-- */
452476

453477
/*
@@ -635,7 +659,6 @@ connect_glock_and_lock(const char *sem_name)
635659
return 0;
636660
-- */
637661

638-
639662
SEM_HANDLE sem = SEM_FAILED;
640663
errno = 0;
641664
    // create and lock the semaphore (via initial value set to 0)
@@ -685,6 +708,21 @@ _posixshmem_shm_open(char *name, int flags, int mode)
685708
return fd;
686709
}
687710

711+
712+
static char *
713+
_build_glock_name(char *buf, int size, const char *name)
714+
{
715+
// get uuid if not set.
716+
if (shm_semlock_counters.uuid == 0L) {
717+
shm_semlock_counters.uuid = 100;
718+
}
719+
720+
    // make name with glock sem name or shm name concatenated with addr of GC
721+
// PyOS_snprintf(buf, size, "%s_%012ld", name, shm_semlock_counters.uuid);
722+
PyOS_snprintf(buf, size, "%s", name);
723+
return buf;
724+
}
725+
688726
static int
689727
create_shm_semlock_counters(const char *from_sem_name)
690728
{
@@ -695,20 +733,26 @@ create_shm_semlock_counters(const char *from_sem_name)
695733
char *datas = NULL;
696734
HeaderObject *header = NULL;
697735
long size_shm = ALIGN_SHM_PAGE(CALC_SIZE_SHM);
736+
char glock_name[SIZE_GLOCK_NAME+16] ;
737+
738+
// Build glock name
739+
_build_glock_name(glock_name, SIZE_GLOCK_NAME, shm_semlock_counters.name_glock);
698740

699741
    // Link to semaphore and lock immediately.
700-
if (connect_glock_and_lock(shm_semlock_counters.name_glock) < 0) {
742+
if (connect_glock_and_lock(glock_name) < 0) {
701743
DEBUG_PID_FUNC(shm_semlock_counters.name_shm, shm, 0, "connect or create GLOCK Failed");
702744
return -1;
703745
};
704746

705747
// We are alone to create/connect to the shared memory.
706748
if (shm_semlock_counters.handle_shm == (MEMORY_HANDLE)0) {
707-
shm = _posixshmem_shm_open(shm_semlock_counters.name_shm, oflag, mode);
749+
    // Link to semaphore and lock immediately.
750+
_build_glock_name(glock_name, SIZE_GLOCK_NAME, shm_semlock_counters.name_shm);
751+
shm = _posixshmem_shm_open(glock_name, oflag, mode);
708752
res = 0;
709753
if (shm == -1) {
710754
oflag |= O_CREAT;
711-
shm = _posixshmem_shm_open(shm_semlock_counters.name_shm, oflag, mode);
755+
shm = _posixshmem_shm_open(glock_name, oflag, mode);
712756
if (shm > 0) {
713757
DEBUG_PID_FUNC(shm_semlock_counters.name_shm, shm, 0, "***create shm***");
714758
// Set size.
@@ -834,15 +878,17 @@ _connect_shm_semlock_counters(const char *from_sem_name)
834878
int res = -1;
835879
char *datas = NULL;
836880
long size_shm = CALC_SIZE_SHM;
881+
char glock_name[SIZE_GLOCK_NAME] ;
837882

838883
    // Link to semaphore and lock immediately.
839-
if (_connect_glock_and_lock(shm_semlock_counters.name_glock) < 0) {
884+
_build_glock_name(glock_name, SIZE_GLOCK_NAME, shm_semlock_counters.name_glock);
885+
if (_connect_glock_and_lock(glock_name) < 0) {
840886
DEBUG_PID_FUNC("", 0, 0, "error connect glock and lock");
841887
return -1;
842888
};
843-
// The global lock is now acquired,
844-
// connect to the shared memory.
845-
shm = _posixshmem_shm_open(shm_semlock_counters.name_shm, oflag, S_IRUSR | S_IWUSR);
889+
// The global lock is now acquired, connect to the shared memory.
890+
_build_glock_name(glock_name, SIZE_GLOCK_NAME, shm_semlock_counters.name_shm);
891+
shm = _posixshmem_shm_open(glock_name, oflag, S_IRUSR | S_IWUSR);
846892
if (shm == -1) {
847893
shm_semlock_counters.handle_shm = (MEMORY_HANDLE)0;
848894
return -1;
@@ -1030,13 +1076,46 @@ _multiprocessing_SemLock_acquire_impl(SemLockObject *self, int blocking,
10301076
deadline.tv_nsec %= 1000000000;
10311077
}
10321078

1079+
#ifdef HAVE_BROKEN_SEM_GETVALUE
1080+
/* For counting semaphores: acquire the counter mutex before sem_trywait so
1081+
that the trywait and the internal_value decrement are atomic with respect
1082+
to release_impl. We must NOT hold this mutex while blocking on the
1083+
semaphore (see blocking path below), because release_impl holds it while
1084+
calling sem_post — keeping it here would deadlock. */
1085+
if (ISSEMAPHORE(self) && !ACQUIRE_COUNTER_MUTEX(self->handle_mutex)) {
1086+
return NULL;
1087+
}
1088+
#endif
1089+
10331090
/* Check whether we can acquire without releasing the GIL and blocking */
10341091
do {
10351092
res = sem_trywait(self->handle);
10361093
err = errno;
10371094
} while (res < 0 && errno == EINTR && !PyErr_CheckSignals());
1038-
errno = err;
1095+
//errno = err;
10391096

1097+
#ifdef HAVE_BROKEN_SEM_GETVALUE
1098+
if (ISSEMAPHORE(self)) {
1099+
if (res >= 0) {
1100+
/* Atomically decrement internal_value while the mutex is held */
1101+
--self->counter->internal_value;
1102+
if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) {
1103+
return NULL;
1104+
}
1105+
// acquire done
1106+
++self->count;
1107+
self->last_tid = PyThread_get_thread_ident();
1108+
Py_RETURN_TRUE;
1109+
} else {
1110+
/* sem_trywait failed: release the mutex before potentially blocking */
1111+
if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) {
1112+
return NULL;
1113+
}
1114+
}
1115+
}
1116+
#endif
1117+
1118+
errno = err;
10401119
if (res < 0 && errno == EAGAIN && blocking) {
10411120
/* Couldn't acquire immediately, need to block */
10421121
do {
@@ -1052,6 +1131,27 @@ _multiprocessing_SemLock_acquire_impl(SemLockObject *self, int blocking,
10521131
if (res == MP_EXCEPTION_HAS_BEEN_SET)
10531132
break;
10541133
} while (res < 0 && errno == EINTR && !PyErr_CheckSignals());
1134+
1135+
#ifdef HAVE_BROKEN_SEM_GETVALUE
1136+
/* After the blocking wait: acquire the mutex and decrement
1137+
internal_value. A brief non-atomic window exists between
1138+
sem_(timed)wait returning and acquiring the mutex here. This is
1139+
unavoidable: release_impl holds the mutex across sem_post,
1140+
so we cannot hold it during sem_(timed)wait without deadlocking. */
1141+
if (ISSEMAPHORE(self) && res >= 0) {
1142+
if (ACQUIRE_COUNTER_MUTEX(self->handle_mutex)) {
1143+
--self->counter->internal_value;
1144+
if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) {
1145+
return NULL;
1146+
}
1147+
++self->count;
1148+
self->last_tid = PyThread_get_thread_ident();
1149+
Py_RETURN_TRUE;
1150+
} else {
1151+
return NULL;
1152+
}
1153+
}
1154+
#endif
10551155
}
10561156
if (res < 0) {
10571157
errno = err;
@@ -1063,25 +1163,13 @@ _multiprocessing_SemLock_acquire_impl(SemLockObject *self, int blocking,
10631163
}
10641164
return PyErr_SetFromErrno(PyExc_OSError);
10651165
}
1066-
#ifdef HAVE_BROKEN_SEM_GETVALUE
1067-
if (ISSEMAPHORE(self)) {
1068-
// error is set in ACQUIRE/RELEASE_* macros.
1069-
if (ACQUIRE_COUNTER_MUTEX(self->handle_mutex)) {
1070-
--self->counter->internal_value;
1071-
if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) {
1072-
return NULL;
1073-
}
1074-
} else {
1075-
return NULL;
1076-
}
1077-
}
1078-
#endif
10791166

10801167
++self->count;
10811168
self->last_tid = PyThread_get_thread_ident();
10821169
Py_RETURN_TRUE;
10831170
}
10841171

1172+
10851173
/*[clinic input]
10861174
@critical_section
10871175
_multiprocessing.SemLock.release
@@ -1132,16 +1220,17 @@ _multiprocessing_SemLock_release_impl(SemLockObject *self)
11321220
if (ISSEMAPHORE(self)) {
11331221
if (ACQUIRE_COUNTER_MUTEX(self->handle_mutex)) {
11341222
sval = self->counter->internal_value;
1135-
if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) {
1136-
return NULL;
1137-
}
1223+
//if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) {
1224+
// return NULL;
1225+
//}
11381226
} else {
11391227
return NULL;
11401228
}
11411229

11421230
if (sval >= self->maxvalue) {
11431231
PyErr_SetString(PyExc_ValueError, "semaphore or lock "
11441232
"released too many times");
1233+
RELEASE_COUNTER_MUTEX(self->handle_mutex);
11451234
return NULL;
11461235
}
11471236
}
@@ -1165,13 +1254,8 @@ _multiprocessing_SemLock_release_impl(SemLockObject *self)
11651254

11661255
#ifdef HAVE_BROKEN_SEM_GETVALUE
11671256
if (ISSEMAPHORE(self)) {
1168-
// error is set in ACQUIRE/RELEASE_* macros.
1169-
if (ACQUIRE_COUNTER_MUTEX(self->handle_mutex)) {
1170-
++self->counter->internal_value;
1171-
if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) {
1172-
return NULL;
1173-
}
1174-
} else {
1257+
++self->counter->internal_value;
1258+
if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) {
11751259
return NULL;
11761260
}
11771261
}

Modules/_multiprocessing/semaphore_macosx.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ typedef struct {
2828

2929
#define SIZE_SEM_NAME 16
3030
#define SIZE_MUTEX_NAME (SIZE_SEM_NAME<<1)
31+
#define SIZE_GLOCK_NAME SIZE_MUTEX_NAME
3132

3233
typedef struct {
3334
char sem_name[SIZE_SEM_NAME]; // Name of semaphore.
@@ -55,6 +56,9 @@ struct _CountersWorkaround{
5556
/*-- Pointers to shared memory --*/
5657
HeaderObject *header; // Pointer to header (shared memory).
5758
CounterObject*counters; // Pointer to the first item of fixed array (shared memory).
59+
60+
/* ---uuid link to main process- */
61+
long uuid;
5862
};
5963

6064
#define ISSEMAPHORE(o) ((o)->maxvalue > 1 && (o)->kind == SEMAPHORE)

0 commit comments

Comments
 (0)