aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Alfred Perlstein <alfred@FreeBSD.org> 2008-10-25 01:46:29 +0000
committer Alfred Perlstein <alfred@FreeBSD.org> 2008-10-25 01:46:29 +0000
commit b1144a09de5ef6b809711c7b25f7db1b45a9084d (patch)
tree e81a2e5e7b0cb49adc72d1477f67dbd8d310022d
parent b604a3d273f4d2ad13d0e3864d9d3d8b72a10769 (diff)
download src-b1144a09de5ef6b809711c7b25f7db1b45a9084d.tar.gz
src-b1144a09de5ef6b809711c7b25f7db1b45a9084d.zip
Merge r184172 (pthread condvar race fix) into 6.4-release.
Reviewed by: re, davidxu
Approved by: re
Notes
Notes: svn path=/releng/6.4/; revision=184239
-rw-r--r-- lib/libthr/thread/thr_cond.c 71
-rw-r--r-- lib/libthr/thread/thr_private.h 2
-rw-r--r-- sys/kern/kern_umtx.c 149
3 files changed, 71 insertions, 151 deletions
diff --git a/lib/libthr/thread/thr_cond.c b/lib/libthr/thread/thr_cond.c
index cd50c4db0a65..73ec0be21a29 100644
--- a/lib/libthr/thread/thr_cond.c
+++ b/lib/libthr/thread/thr_cond.c
@@ -71,7 +71,7 @@ cond_init(pthread_cond_t *cond, const pthread_condattr_t *cond_attr)
_thr_umtx_init(&pcond->c_lock);
pcond->c_seqno = 0;
pcond->c_waiters = 0;
- pcond->c_wakeups = 0;
+ pcond->c_broadcast = 0;
if (cond_attr == NULL || *cond_attr == NULL) {
pcond->c_pshared = 0;
pcond->c_clockid = CLOCK_REALTIME;
@@ -122,7 +122,7 @@ _pthread_cond_destroy(pthread_cond_t *cond)
else {
/* Lock the condition variable structure: */
THR_LOCK_ACQUIRE(curthread, &(*cond)->c_lock);
- if ((*cond)->c_waiters + (*cond)->c_wakeups != 0) {
+ if ((*cond)->c_waiters != 0) {
THR_LOCK_RELEASE(curthread, &(*cond)->c_lock);
return (EBUSY);
}
@@ -166,14 +166,13 @@ cond_cancel_handler(void *arg)
cv = *(cci->cond);
THR_LOCK_ACQUIRE(curthread, &cv->c_lock);
- if (cv->c_seqno != cci->seqno && cv->c_wakeups != 0) {
- if (cv->c_waiters > 0) {
- cv->c_seqno++;
- _thr_umtx_wake(&cv->c_seqno, 1);
- } else
- cv->c_wakeups--;
- } else {
- cv->c_waiters--;
+ if (--cv->c_waiters == 0)
+ cv->c_broadcast = 0;
+ if (cv->c_seqno != cci->seqno) {
+ _thr_umtx_wake(&cv->c_seqno, 1);
+ /* cv->c_seqno++; XXX why was this here? */
+ _thr_umtx_wake(&cv->c_seqno, 1);
+
}
THR_LOCK_RELEASE(curthread, &cv->c_lock);
@@ -191,6 +190,7 @@ cond_wait_common(pthread_cond_t *cond, pthread_mutex_t *mutex,
long seq, oldseq;
int oldcancel;
int ret = 0;
+ int loops = -1;
/*
* If the condition variable is statically initialized,
@@ -202,18 +202,24 @@ cond_wait_common(pthread_cond_t *cond, pthread_mutex_t *mutex,
cv = *cond;
THR_LOCK_ACQUIRE(curthread, &cv->c_lock);
+ oldseq = cv->c_seqno;
ret = _mutex_cv_unlock(mutex);
if (ret) {
THR_LOCK_RELEASE(curthread, &cv->c_lock);
return (ret);
}
- oldseq = seq = cv->c_seqno;
+ seq = cv->c_seqno;
cci.mutex = mutex;
cci.cond = cond;
cci.seqno = oldseq;
cv->c_waiters++;
- do {
+ /*
+ * loop if we have never been told to wake up
+ * or we lost a race.
+ */
+ while (seq == oldseq /* || cv->c_wakeups == 0*/) {
+ loops++;
THR_LOCK_RELEASE(curthread, &cv->c_lock);
if (abstime != NULL) {
@@ -232,24 +238,23 @@ cond_wait_common(pthread_cond_t *cond, pthread_mutex_t *mutex,
} else {
ret = _thr_umtx_wait(&cv->c_seqno, seq, tsp);
}
+ /*
+ * If we get back EINTR we want to loop as condvars
+ * do NOT return EINTR, they just restart.
+ */
THR_LOCK_ACQUIRE(curthread, &cv->c_lock);
seq = cv->c_seqno;
if (abstime != NULL && ret == ETIMEDOUT)
break;
- /*
- * loop if we have never been told to wake up
- * or we lost a race.
- */
- } while (seq == oldseq || cv->c_wakeups == 0);
-
- if (seq != oldseq && cv->c_wakeups != 0) {
- cv->c_wakeups--;
- ret = 0;
- } else {
- cv->c_waiters--;
}
+
+ if (--cv->c_waiters == 0)
+ cv->c_broadcast = 0;
+ if (seq != oldseq)
+ ret = 0;
+
THR_LOCK_RELEASE(curthread, &cv->c_lock);
_mutex_cv_lock(mutex);
return (ret);
@@ -298,7 +303,7 @@ cond_signal_common(pthread_cond_t *cond, int broadcast)
{
struct pthread *curthread = _get_curthread();
pthread_cond_t cv;
- int ret = 0, oldwaiters;
+ int ret = 0;
/*
* If the condition variable is statically initialized, perform dynamic
@@ -311,19 +316,15 @@ cond_signal_common(pthread_cond_t *cond, int broadcast)
cv = *cond;
/* Lock the condition variable structure. */
THR_LOCK_ACQUIRE(curthread, &cv->c_lock);
+ cv->c_seqno++;
+ if (cv->c_broadcast == 0)
+ cv->c_broadcast = broadcast;
+
if (cv->c_waiters) {
- if (!broadcast) {
- cv->c_wakeups++;
- cv->c_waiters--;
- cv->c_seqno++;
+ if (cv->c_broadcast)
+ _thr_umtx_wake(&cv->c_seqno, INT_MAX);
+ else
_thr_umtx_wake(&cv->c_seqno, 1);
- } else {
- oldwaiters = cv->c_waiters;
- cv->c_wakeups += cv->c_waiters;
- cv->c_waiters = 0;
- cv->c_seqno++;
- _thr_umtx_wake(&cv->c_seqno, oldwaiters);
- }
}
THR_LOCK_RELEASE(curthread, &cv->c_lock);
return (ret);
diff --git a/lib/libthr/thread/thr_private.h b/lib/libthr/thread/thr_private.h
index 63b023fd8d43..610ac299e4c6 100644
--- a/lib/libthr/thread/thr_private.h
+++ b/lib/libthr/thread/thr_private.h
@@ -166,7 +166,7 @@ struct pthread_cond {
volatile umtx_t c_lock;
volatile umtx_t c_seqno;
volatile int c_waiters;
- volatile int c_wakeups;
+ volatile int c_broadcast;
int c_pshared;
int c_clockid;
};
diff --git a/sys/kern/kern_umtx.c b/sys/kern/kern_umtx.c
index ebb589c62857..bc1bd950b7c5 100644
--- a/sys/kern/kern_umtx.c
+++ b/sys/kern/kern_umtx.c
@@ -36,6 +36,7 @@ __FBSDID("$FreeBSD$");
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
+#include <sys/sysctl.h>
#include <sys/sysent.h>
#include <sys/systm.h>
#include <sys/sysproto.h>
@@ -81,6 +82,8 @@ struct umtx_key {
struct umtx_q {
LIST_ENTRY(umtx_q) uq_next; /* Linked list for the hash. */
struct umtx_key uq_key; /* Umtx key. */
+ int uq_flags;
+#define UQF_UMTXQ 0x0001
struct thread *uq_thread; /* The thread waits on. */
LIST_ENTRY(umtx_q) uq_rqnext; /* Linked list for requeuing. */
vm_offset_t uq_addr; /* Umtx's virtual address. */
@@ -229,9 +232,7 @@ umtxq_insert(struct umtx_q *uq)
mtx_assert(umtxq_mtx(chain), MA_OWNED);
head = &umtxq_chains[chain].uc_queue;
LIST_INSERT_HEAD(head, uq, uq_next);
- mtx_lock_spin(&sched_lock);
- uq->uq_thread->td_flags |= TDF_UMTXQ;
- mtx_unlock_spin(&sched_lock);
+ uq->uq_flags |= UQF_UMTXQ;
}
/*
@@ -241,12 +242,10 @@ static inline void
umtxq_remove(struct umtx_q *uq)
{
mtx_assert(umtxq_mtx(umtxq_hash(&uq->uq_key)), MA_OWNED);
- if (uq->uq_thread->td_flags & TDF_UMTXQ) {
+ if (uq->uq_flags & UQF_UMTXQ) {
LIST_REMOVE(uq, uq_next);
- /* turning off TDF_UMTXQ should be the last thing. */
- mtx_lock_spin(&sched_lock);
- uq->uq_thread->td_flags &= ~TDF_UMTXQ;
- mtx_unlock_spin(&sched_lock);
+ /* turning off UQF_UMTXQ should be the last thing. */
+ uq->uq_flags &= ~UQF_UMTXQ;
}
}
@@ -308,7 +307,7 @@ umtxq_sleep(struct thread *td, struct umtx_key *key, int priority,
static int
umtx_key_get(struct thread *td, void *umtx, struct umtx_key *key)
{
-#if defined(UMTX_DYNAMIC_SHARED) || defined(UMTX_STATIC_SHARED)
+#if defined(UMTX_STATIC_SHARED)
vm_map_t map;
vm_map_entry_t entry;
vm_pindex_t pindex;
@@ -321,20 +320,7 @@ umtx_key_get(struct thread *td, void *umtx, struct umtx_key *key)
&wired) != KERN_SUCCESS) {
return EFAULT;
}
-#endif
-#if defined(UMTX_DYNAMIC_SHARED)
- key->type = UMTX_SHARED;
- key->info.shared.offset = entry->offset + entry->start -
- (vm_offset_t)umtx;
- /*
- * Add object reference, if we don't do this, a buggy application
- * deallocates the object, the object will be reused by other
- * applications, then unlock will wake wrong thread.
- */
- vm_object_reference(key->info.shared.object);
- vm_map_lookup_done(map, entry);
-#elif defined(UMTX_STATIC_SHARED)
if (VM_INHERIT_SHARE == entry->inheritance) {
key->type = UMTX_SHARED;
key->info.shared.offset = entry->offset + entry->start -
@@ -380,74 +366,6 @@ umtxq_queue_me(struct thread *td, void *umtx, struct umtx_q *uq)
return (0);
}
-#if defined(UMTX_DYNAMIC_SHARED)
-static void
-fork_handler(void *arg, struct proc *p1, struct proc *p2, int flags)
-{
- vm_map_t map;
- vm_map_entry_t entry;
- vm_object_t object;
- vm_pindex_t pindex;
- vm_prot_t prot;
- boolean_t wired;
- struct umtx_key key;
- LIST_HEAD(, umtx_q) workq;
- struct umtx_q *uq;
- struct thread *td;
- int onq;
-
- LIST_INIT(&workq);
-
- /* Collect threads waiting on umtxq */
- PROC_LOCK(p1);
- FOREACH_THREAD_IN_PROC(p1, td) {
- if (td->td_flags & TDF_UMTXQ) {
- uq = td->td_umtxq;
- if (uq)
- LIST_INSERT_HEAD(&workq, uq, uq_rqnext);
- }
- }
- PROC_UNLOCK(p1);
-
- LIST_FOREACH(uq, &workq, uq_rqnext) {
- map = &p1->p_vmspace->vm_map;
- if (vm_map_lookup(&map, uq->uq_addr, VM_PROT_WRITE,
- &entry, &object, &pindex, &prot, &wired) != KERN_SUCCESS) {
- continue;
- }
- key.type = UMTX_SHARED;
- key.info.shared.object = object;
- key.info.shared.offset = entry->offset + entry->start -
- uq->uq_addr;
- if (umtx_key_match(&key, &uq->uq_key)) {
- vm_map_lookup_done(map, entry);
- continue;
- }
-
- umtxq_lock(&uq->uq_key);
- umtxq_busy(&uq->uq_key);
- if (uq->uq_thread->td_flags & TDF_UMTXQ) {
- umtxq_remove(uq);
- onq = 1;
- } else
- onq = 0;
- umtxq_unbusy(&uq->uq_key);
- umtxq_unlock(&uq->uq_key);
- if (onq) {
- vm_object_deallocate(uq->uq_key.info.shared.object);
- uq->uq_key = key;
- umtxq_lock(&uq->uq_key);
- umtxq_busy(&uq->uq_key);
- umtxq_insert(uq);
- umtxq_unbusy(&uq->uq_key);
- umtxq_unlock(&uq->uq_key);
- vm_object_reference(uq->uq_key.info.shared.object);
- }
- vm_map_lookup_done(map, entry);
- }
-}
-#endif
-
static int
_do_lock(struct thread *td, struct umtx *umtx, long id, int timo)
{
@@ -526,7 +444,7 @@ _do_lock(struct thread *td, struct umtx *umtx, long id, int timo)
* unlocking the umtx.
*/
umtxq_lock(&uq->uq_key);
- if (old == owner && (td->td_flags & TDF_UMTXQ)) {
+ if (old == owner && (uq->uq_flags & UQF_UMTXQ)) {
error = umtxq_sleep(td, &uq->uq_key, PCATCH,
"umtx", timo);
}
@@ -705,7 +623,7 @@ _do_lock32(struct thread *td, uint32_t *m, uint32_t id, int timo)
* unlocking the umtx.
*/
umtxq_lock(&uq->uq_key);
- if (old == owner && (td->td_flags & TDF_UMTXQ)) {
+ if (old == owner && (uq->uq_flags & UQF_UMTXQ)) {
error = umtxq_sleep(td, &uq->uq_key, PCATCH,
"umtx", timo);
}
@@ -825,35 +743,22 @@ do_wait(struct thread *td, struct umtx *umtx, long id, struct timespec *timeout,
tmp = fuword(&umtx->u_owner);
else
tmp = fuword32(&umtx->u_owner);
+ umtxq_lock(&uq->uq_key);
if (tmp != id) {
- umtxq_lock(&uq->uq_key);
umtxq_remove(uq);
- umtxq_unlock(&uq->uq_key);
} else if (timeout == NULL) {
- umtxq_lock(&uq->uq_key);
- if (td->td_flags & TDF_UMTXQ)
+ if (uq->uq_flags & UQF_UMTXQ)
error = umtxq_sleep(td, &uq->uq_key,
PCATCH, "ucond", 0);
- if (!(td->td_flags & TDF_UMTXQ))
- error = 0;
- else
- umtxq_remove(uq);
- umtxq_unlock(&uq->uq_key);
} else {
getnanouptime(&ts);
timespecadd(&ts, timeout);
TIMESPEC_TO_TIMEVAL(&tv, timeout);
for (;;) {
- umtxq_lock(&uq->uq_key);
- if (td->td_flags & TDF_UMTXQ) {
+ if (uq->uq_flags & UQF_UMTXQ) {
error = umtxq_sleep(td, &uq->uq_key, PCATCH,
- "ucond", tvtohz(&tv));
- }
- if (!(td->td_flags & TDF_UMTXQ)) {
- umtxq_unlock(&uq->uq_key);
- goto out;
+ "ucondt", tvtohz(&tv));
}
- umtxq_unlock(&uq->uq_key);
if (error != ETIMEDOUT)
break;
getnanouptime(&ts2);
@@ -865,14 +770,28 @@ do_wait(struct thread *td, struct umtx *umtx, long id, struct timespec *timeout,
timespecsub(&ts3, &ts2);
TIMESPEC_TO_TIMEVAL(&tv, &ts3);
}
- umtxq_lock(&uq->uq_key);
- umtxq_remove(uq);
- umtxq_unlock(&uq->uq_key);
}
-out:
+ if (error != 0) {
+ if ((uq->uq_flags & UQF_UMTXQ) == 0) {
+ /*
+ * If we concurrently got do_cv_signal()d
+ * and we got an error or UNIX signals or a timeout,
+ * then, perform another umtxq_signal to avoid
+ * consuming the wakeup. This may cause a spurious
+ * wakeup for another thread which was just queued,
+ * but SUSv3 explicitly allows spurious wakeups to
+ * occur, and indeed a kernel based implementation
+ * can not avoid it.
+ */
+ if (!umtxq_signal(&uq->uq_key, 1))
+ error = 0;
+ }
+ if (error == ERESTART)
+ error = EINTR;
+ }
+ umtxq_remove(uq);
+ umtxq_unlock(&uq->uq_key);
umtx_key_release(&uq->uq_key);
- if (error == ERESTART)
- error = EINTR;
return (error);
}