/*
 * struct suspension
 *
 *	Set of rendezvous events on which a thread may wait for any one
 *	of.
 */
struct suspension {
	kmutex_t		s_lock;
	kcondvar_t		s_cv;
	SIMPLEQ_HEAD(, event)	s_events;
	struct event		*s_event;
	int			s_flags;
};

/*
 * struct event
 *
 *	State for a rendezvous event that a thread may be preparing to
 *	wait for or waiting for.
 *
 *	An event is in one of the following states:
 *
 *		FREE
 *		INITIALIZED
 *		SUBSCRIBED
 *		CANCELLING
 *		CANCELLED
 *		NOTIFIED
 *
 *	An event consumer calls event_init, under an event source's
 *	locks, to transition from FREE to INITIALIZED.  Then suspend(S)
 *	calls each event's eo_subscribe routine, in event_init order, to
 *	transition each event from INITIALIZED to SUBSCRIBED, possibly
 *	stopping early if an event was notified before all events
 *	transitioned to SUBSCRIBED.
 *
 *	An event producer eventually calls event_notify on an event.
 *
 *	- If the event was SUBSCRIBED, it transitions to NOTIFIED, and
 *	  event_notify returns true if it was the _first_ event in its
 *	  corresponding suspension S to be notified, or false otherwise.
 *
 *	- If the event was CANCELLING (or already CANCELLED), then
 *	  event_notify has no effect and returns false: some other
 *	  event in S was notified first.
 *
 *	Next, the event consumer takes each SUBSCRIBED event,
 *	transitions it to CANCELLING, calls eo_cancel, and transitions
 *	it to CANCELLED.  Events that were already notified are not
 *	cancelled.  eo_cancel is responsible for ensuring that the
 *	event cannot subsequently be notified after it returns.
 *
 *	Finally, suspend(S) returns the first event in S that was
 *	notified, and the caller of suspend(S) calls event_done for
 *	every event it initialized.
 *
 *	The CML API promises that when suspend(S) returns, for every
 *	event E in S, either an event producer has invoked
 *	event_notify(E) or an event consumer has invoked eo_cancel(E),
 *	but not both.
 *
 *	Only one event in a suspension can ever be selected to wake the
 *	suspension.  event_notify returns true if it is that event, or
 *	false otherwise.
 *
 *	Locking rules:
 *
 *		I	static after initialization
 *		S	modified only by suspending thread
 *		L	SUBSCRIBED->{CANCELLING,NOTIFIED} only under
 *			suspension lock; all other transitions only
 *			done by suspending thread
 */
struct event {
	struct suspension	*e_suspension;	/* I */
	struct event_ops	*e_ops;		/* I */
	SIMPLEQ_ENTRY(event)	e_entry;	/* S */
	enum event_state {
		EVENT_FREE,
		EVENT_INITIALIZED,
		EVENT_SUBSCRIBED,
		EVENT_CANCELLING,
		EVENT_CANCELLED,
		EVENT_NOTIFIED,
	}			e_state;	/* L */
};

static inline void
transition(struct event *E, enum event_state old __diagused,
    enum event_state new)
{

	KASSERT(E->e_state == old);
	E->e_state = new;
}

struct event_ops {
	void	(*eo_subscribe)(struct event *);
	void	(*eo_cancel)(struct event *);
};
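/*
 * Hypothetical example event source (illustration only, not part of
 * the pool/vmem/uvm examples below): a timeout that notifies its
 * event after a fixed number of ticks, sketched with callout(9) to
 * make the eo_subscribe/eo_cancel contract above concrete.  The
 * timeout_event names are made up.  The key point: eo_cancel must
 * guarantee the event can no longer be notified once it returns,
 * which callout_halt provides.
 */
struct timeout_event {
	struct event	te_event;
	callout_t	te_callout;	/* callout_init/callout_setfunc'd by caller */
	int		te_ticks;
};

static void
timeout_event_fire(void *cookie)
{
	struct timeout_event *te = cookie;

	/* Producer side: notify; ignore whether we won the rendezvous.  */
	(void)event_notify(&te->te_event);
}

static void
timeout_event_subscribe(struct event *E)
{
	struct timeout_event *te = container_of(E, struct timeout_event,
	    te_event);

	/* Arm the timeout; it may fire at any time after this.  */
	callout_schedule(&te->te_callout, te->te_ticks);
}

static void
timeout_event_cancel(struct event *E)
{
	struct timeout_event *te = container_of(E, struct timeout_event,
	    te_event);

	/*
	 * Stop the callout and wait for any concurrently running
	 * timeout_event_fire to complete, so that the event cannot be
	 * notified after we return.
	 */
	(void)callout_halt(&te->te_callout, NULL);
}

static struct event_ops timeout_event_ops = {
	.eo_subscribe	= timeout_event_subscribe,
	.eo_cancel	= timeout_event_cancel,
};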
/*
 * suspension_init(S, wmesg, flags)
 *
 *	Prepare for the current thread to wait for any of a collection
 *	of events.  Caller must have exclusive access to S.
 */
void
suspension_init(struct suspension *S, const char *wmesg, int flags)
{

	mutex_init(&S->s_lock, MUTEX_DEFAULT, IPL_SCHED); /* XXX IPL_SCHED? */
	cv_init(&S->s_cv, wmesg);
	SIMPLEQ_INIT(&S->s_events);
	S->s_event = NULL;
	S->s_flags = flags;
}

/*
 * suspension_destroy(S)
 *
 *	Done with a suspension.
 */
void
suspension_destroy(struct suspension *S)
{

	cv_destroy(&S->s_cv);
	mutex_destroy(&S->s_lock);
}

/*
 * suspend(S)
 *
 *	Wait for any of the events to which S has subscribed.  Return
 *	the one that notified us.
 */
struct event *
suspend(struct suspension *S)
{
	struct event *E_subscribe, *E_cancel, *E;

	KASSERT(S->s_event == NULL);

	/* Subscribe to all the events.  */
	SIMPLEQ_FOREACH(E_subscribe, &S->s_events, e_entry) {
		/* Subscribe: INITIALIZED -> SUBSCRIBED; call eo_subscribe.  */
		transition(E_subscribe, EVENT_INITIALIZED, EVENT_SUBSCRIBED);
		(*E_subscribe->e_ops->eo_subscribe)(E_subscribe);

		/*
		 * Check whether we've finished already.  This may
		 * happen synchronously because we dropped one of the
		 * event source's locks before subscribing, or it may
		 * happen asynchronously if another thread has already
		 * noticed an event subscription.
		 */
		if ((E = S->s_event) != NULL) {
			membar_datadep_consumer();
			goto out;
		}
	}

	/* Wait until an event has occurred.  */
	mutex_enter(&S->s_lock);
	while ((E = S->s_event) == NULL)
		cv_wait(&S->s_cv, &S->s_lock);
	mutex_exit(&S->s_lock);

out:	/* Cancel all the other ones we had subscribed.  */
	SIMPLEQ_FOREACH(E_cancel, &S->s_events, e_entry) {
		/*
		 * If it's the winner, no need to cancel: it won by
		 * event_notify in the first place.
		 */
		if (E_cancel == E) {
			KASSERT(E_cancel->e_state == EVENT_NOTIFIED);
			goto next;
		}

		/*
		 * Otherwise, under the lock, find out whether we need
		 * to cancel it.
		 */
		mutex_enter(&S->s_lock);
		if (E_cancel->e_state == EVENT_NOTIFIED) {
			/* event_notify beat us to it -- do nothing.  */
			mutex_exit(&S->s_lock);
			goto next;
		}

		/*
		 * We beat event_notify to it -- mark it cancelling in
		 * preparation to cancel it.
		 */
		transition(E_cancel, EVENT_SUBSCRIBED, EVENT_CANCELLING);
		mutex_exit(&S->s_lock);

		/* Cancel it outside the lock.  */
		(*E_cancel->e_ops->eo_cancel)(E_cancel);

		/*
		 * We now have exclusive access to E_cancel, so we can
		 * change its state without the lock.
		 */
		transition(E_cancel, EVENT_CANCELLING, EVENT_CANCELLED);

next:		/* If it's the last one we subscribed, we're done.  */
		if (E_cancel == E_subscribe)
			break;
	}

	/* Return the winning event.  */
	return E;
}

/*
 * event_init(E, O, S)
 *
 *	Arrange that event_notify(E) will wake suspend(S) after E has
 *	been subscribed by O->eo_subscribe.  If another event
 *	notification woke suspend(S), this event subscription will be
 *	cancelled by O->eo_cancel.  Caller must have exclusive access
 *	to S.
 */
void
event_init(struct event *E, struct event_ops *O, struct suspension *S)
{

	E->e_suspension = S;
	E->e_ops = O;
	SIMPLEQ_INSERT_TAIL(&S->s_events, E, e_entry);
	E->e_state = EVENT_INITIALIZED;
}

/*
 * event_done(E, S)
 *
 *	Done with the event E with suspension S.
 */
void
event_done(struct event *E, struct suspension *S)
{

	KASSERT(E->e_suspension == S);
	KASSERT(E->e_state == EVENT_CANCELLED ||
	    E->e_state == EVENT_NOTIFIED);
	E->e_state = EVENT_FREE;
}

/*
 * event_notify(E)
 *
 *	If E was initialized with event_init(E, O, S) and subscribed
 *	with O->eo_subscribe, cause any pending call to suspend(S) to
 *	wake.
 *
 *	Return true if this was the first event_notify for S, false if
 *	not.  Thus the notifier can learn whether its event was the one
 *	consumed by S or not, e.g. if it is passing a scarce resource
 *	to S.
 */
bool
event_notify(struct event *E)
{
	struct suspension *S = E->e_suspension;
	bool consumed = false;

	mutex_enter(&S->s_lock);

	/*
	 * Check whether it's being, or has been, cancelled.  If so,
	 * some other event must have won.  We have nothing left to do
	 * here.
	 */
	if (E->e_state == EVENT_CANCELLING ||
	    E->e_state == EVENT_CANCELLED) {
		KASSERT(S->s_event != NULL);
		goto out;
	}

	/* Transition from SUBSCRIBED to NOTIFIED.  */
	KASSERT(E->e_state == EVENT_SUBSCRIBED);
	E->e_state = EVENT_NOTIFIED;

	/* If someone else already won, nothing left to do here.  */
	if (S->s_event != NULL)
		goto out;

	/* Otherwise, inform waiter and the caller that we are the winner.  */
	S->s_event = E;
	consumed = true;
	cv_signal(&S->s_cv);

out:	mutex_exit(&S->s_lock);
	return consumed;
}
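/*
 * Hypothetical consumer (illustration only): wait for the earlier of
 * two timeouts, using the example timeout event source above, and
 * learn which one fired.  This shows the init/suspend/done/destroy
 * sequence that the state-machine comment describes; the function
 * name and wmesg are made up.
 */
static int
wait_for_earlier_timeout(int ticks_a, int ticks_b)
{
	struct suspension suspension;
	struct timeout_event a, b;
	struct event *winner;
	int which;

	suspension_init(&suspension, "tmowait", 0);

	callout_init(&a.te_callout, 0);
	callout_setfunc(&a.te_callout, timeout_event_fire, &a);
	a.te_ticks = ticks_a;
	event_init(&a.te_event, &timeout_event_ops, &suspension);

	callout_init(&b.te_callout, 0);
	callout_setfunc(&b.te_callout, timeout_event_fire, &b);
	b.te_ticks = ticks_b;
	event_init(&b.te_event, &timeout_event_ops, &suspension);

	/* Sleep until one of the two timeouts notifies us.  */
	winner = suspend(&suspension);
	which = (winner == &a.te_event) ? 0 : 1;

	/* Per the contract, call event_done for every event we initialized.  */
	event_done(&a.te_event, &suspension);
	event_done(&b.te_event, &suspension);
	callout_destroy(&a.te_callout);
	callout_destroy(&b.te_callout);
	suspension_destroy(&suspension);

	return which;
}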
/* pool example: if pool_get is waiting, pool_put hands off to it */

struct pool {
	...
	/* XXX Priority queue, ordered by thread priority?  */
	TAILQ_HEAD(, pool_event)	pr_waiters;
	...
};

struct pool_event {
	struct event		pe_event;
	TAILQ_ENTRY(pool_event)	pe_entry;
	struct pool		*pe_pool;
	void			*pe_obj;
};

void *
pool_get(struct pool *pp, int flags)
{
	struct suspension suspension, *S;
	void *obj = NULL;

	if (ISSET(flags, PR_WAITOK)) {
		S = &suspension;
		suspension_init(S, pp->pr_wchan, 0);
	} else {
		S = NULL;
	}

	mutex_enter(&pp->pr_lock);
startover:
	...
	if ((ph = pp->pr_curpage) == NULL) {
		struct pool_event pe;
		int error;

		if (ISSET(flags, PR_WAITOK))
			pool_event_init(pp, &pe, S);
		error = pool_grow(pp, flags, S);
		if (ISSET(flags, PR_WAITOK)) {
			if ((obj = pool_event_done(pp, &pe, S)) != NULL) {
				/*
				 * XXX Should we shrink the pool if we
				 * had grown it in this case?
				 */
				goto out;
			}
		}
		if (error) {
			if (pp->pr_curpage != NULL)
				goto startover;
			pp->pr_nfail++;
			goto out;
		}
	}
	...
out:	mutex_exit(&pp->pr_lock);
	if (ISSET(flags, PR_WAITOK))
		suspension_destroy(S);
	return obj;
}

void
pool_put(struct pool *pp, void *obj)
{
	struct pool_event *pe;
	struct pool_pagelist pq;

	mutex_enter(&pp->pr_lock);

	/*
	 * If there's an active pool_get waiting for an object, try to
	 * hand this object to it.
	 */
	while ((pe = TAILQ_FIRST(&pp->pr_waiters)) != NULL) {
		TAILQ_REMOVE(&pp->pr_waiters, pe, pe_entry);
		if (pool_event_notify(pe, obj))
			goto out;
	}

	/* Otherwise, put it back into the pool.  */
	LIST_INIT(&pq);
	pool_do_put(pp, obj, &pq);

out:	mutex_exit(&pp->pr_lock);
	if (pe == NULL)
		pr_pagelist_free(pp, &pq);
}

static void
pool_event_init(struct pool *pp, struct pool_event *pe, struct suspension *S)
{
	static const struct pool_event zero_pe;

	KASSERT(mutex_owned(&pp->pr_lock));

	*pe = zero_pe;
	pe->pe_pool = pp;
	event_init(&pe->pe_event, &pool_event_ops, S);
}

static void
pool_event_subscribe(struct event *E)
{
	struct pool_event *pe = container_of(E, struct pool_event, pe_event);
	struct pool *pp = pe->pe_pool;
	void *obj;

	mutex_enter(&pp->pr_lock);

	/*
	 * Check again whether there are any objects available.  Don't
	 * ask the backing allocator -- the caller will also be
	 * subscribing to backing allocation events.
	 */
	if (pool has objects) {
		obj = the next object;
		if (pool_event_notify(pe, obj)) {
			dequeue the next object;
			goto out;
		}
	}

	/* None found.  Record this event subscription.  */
	TAILQ_INSERT_TAIL(&pp->pr_waiters, pe, pe_entry);

out:	mutex_exit(&pp->pr_lock);
}

static void
pool_event_cancel(struct event *E)
{
	struct pool_event *pe = container_of(E, struct pool_event, pe_event);
	struct pool *pp = pe->pe_pool;

	mutex_enter(&pp->pr_lock);
	KASSERT(pe->pe_obj == NULL);
	TAILQ_REMOVE(&pp->pr_waiters, pe, pe_entry);	/* remove from active waiters */
	mutex_exit(&pp->pr_lock);
}

static void *
pool_event_done(struct pool *pp, struct pool_event *pe, struct suspension *S)
{
	void *obj;

	KASSERT(mutex_owned(&pp->pr_lock));

	obj = pe->pe_obj;
	pe->pe_obj = NULL;
	event_done(&pe->pe_event, S);

	return obj;
}

static bool
pool_event_notify(struct pool_event *pe, void *obj)
{
	struct pool *pp __diagused = pe->pe_pool;

	KASSERT(mutex_owned(&pp->pr_lock));

	/*
	 * If the thread waiting for this event has already been
	 * notified of another event and is no longer interested in
	 * recycling this object, do nothing and report that we did
	 * not rendezvous with this event.
	 */
	if (!event_notify(&pe->pe_event))
		return false;

	/*
	 * Otherwise, pass this object along to the thread that was
	 * waiting for the event of a free object, and report that we
	 * did rendezvous with this event.
	 */
	pe->pe_obj = obj;
	return true;
}
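/*
 * The sketch references pool_event_ops above but never defines it;
 * presumably it just wires up the handlers, something like:
 */
static struct event_ops pool_event_ops = {
	.eo_subscribe	= pool_event_subscribe,
	.eo_cancel	= pool_event_cancel,
};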
/* vmem example: if vmem_alloc must sleep, let others wake caller too */

struct vmem {
	...
	LIST_HEAD(, vmem_event)	vm_waiters;
	...
};

struct vmem_event {
	struct event		vme_event;
	LIST_ENTRY(vmem_event)	vme_entry;
	struct vmem		*vme_vm;
	uint64_t		vme_gen;
	bool			vme_triggered;
};

int
vmem_xalloc(struct vmem *vm, ..., struct suspension *S)
{

	KASSERT(ISSET(flags, VM_SLEEP) == (S != NULL));
	...
	if (ISSET(flags, VM_SLEEP)) {
		struct vmem_event vme;

		vmem_event_init(vm, &vme, S);
		VMEM_UNLOCK(vm);
		suspend(S);
		VMEM_LOCK(vm);

		/*
		 * If we were woken by a change to the vmem, retry the
		 * vmem allocation.
		 */
		if (vmem_event_done(vm, &vme, S))
			goto retry;

		/*
		 * Otherwise, something else higher up in the stack
		 * happened (first).  Tell the caller ENOMEM here so
		 * they know to use another event instead.
		 */
	}
fail:
	...
	return ENOMEM;
}

void
vmem_xfree(struct vmem *vm, ...)
{

	...
	/*
	 * Notify every thread interested in vm.  We don't care whether
	 * we rendezvoused with each event; we're not handing anything
	 * off to each thread, just waking each thread to retry.
	 */
	while ((vme = LIST_FIRST(&vm->vm_waiters)) != NULL) {
		LIST_REMOVE(vme, vme_entry);
		(void)vmem_event_notify(vm, vme);
	}
	...
}

static void
vmem_event_init(struct vmem *vm, struct vmem_event *vme, struct suspension *S)
{
	static const struct vmem_event zero_vme;

	VMEM_ASSERT_LOCKED(vm);

	*vme = zero_vme;
	vme->vme_vm = vm;
	vme->vme_gen = vm->vm_gen;
	vme->vme_triggered = false;
	event_init(&vme->vme_event, &vmem_event_ops, S);
}

static void
vmem_event_subscribe(struct event *E)
{
	struct vmem_event *vme = container_of(E, struct vmem_event, vme_event);
	struct vmem *vm = vme->vme_vm;

	VMEM_LOCK(vm);

	if (vm->vm_gen != vme->vme_gen) {
		/*
		 * If anything changed, wake.  Caller must reexamine
		 * the address space afresh for what it seeks.
		 */
		if (vmem_event_notify(vm, vme))
			goto out;
	}

	/* Nothing changed yet.  Record this event subscription.  */
	LIST_INSERT_HEAD(&vm->vm_waiters, vme, vme_entry);

out:	VMEM_UNLOCK(vm);
}

static void
vmem_event_cancel(struct event *E)
{
	struct vmem_event *vme = container_of(E, struct vmem_event, vme_event);
	struct vmem *vm = vme->vme_vm;

	VMEM_LOCK(vm);
	KASSERT(!vme->vme_triggered);
	LIST_REMOVE(vme, vme_entry);
	VMEM_UNLOCK(vm);
}

static bool
vmem_event_done(struct vmem *vm, struct vmem_event *vme, struct suspension *S)
{

	VMEM_ASSERT_LOCKED(vm);

	event_done(&vme->vme_event, S);
	return vme->vme_triggered;
}

static bool
vmem_event_notify(struct vmem *vm, struct vmem_event *vme)
{

	VMEM_ASSERT_LOCKED(vm);
	KASSERT(vm->vm_gen != vme->vme_gen);

	vme->vme_triggered = event_notify(&vme->vme_event);
	return vme->vme_triggered;
}
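/*
 * As with the pool example, vmem_event_ops is referenced above but
 * not defined in the sketch; presumably:
 */
static struct event_ops vmem_event_ops = {
	.eo_subscribe	= vmem_event_subscribe,
	.eo_cancel	= vmem_event_cancel,
};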
/* uvm example: wait until enough pages are free */

struct uvm_free_event {
	struct event		ufe_event;
	struct prioq_entry	ufe_entry;
	unsigned long		ufe_npages;
};

struct prioq uvm_free_waiters;

void
uvm_free_event_init(unsigned long npages, struct uvm_free_event *ufe,
    struct suspension *S)
{
	static const struct uvm_free_event zero_ufe;

	KASSERT(mutex_owned(&uvm_fpageqlock));

	*ufe = zero_ufe;
	ufe->ufe_npages = npages;
	event_init(&ufe->ufe_event, &uvm_free_event_ops, S);
}

static void
uvm_free_event_subscribe(struct event *E)
{
	struct uvm_free_event *ufe = container_of(E, struct uvm_free_event,
	    ufe_event);

	mutex_enter(&uvm_fpageqlock);
	if (uvm_enough_free()) {
		if (event_notify(E))
			goto out;
	}
	prioq_insert(&uvm_free_waiters, &ufe->ufe_entry);
out:	mutex_exit(&uvm_fpageqlock);
}

static void
uvm_free_event_cancel(struct event *E)
{
	struct uvm_free_event *ufe = container_of(E, struct uvm_free_event,
	    ufe_event);

	mutex_enter(&uvm_fpageqlock);
	prioq_delete(&uvm_free_waiters, &ufe->ufe_entry);
	mutex_exit(&uvm_fpageqlock);
}

static void
uvm_free_event_done(struct uvm_free_event *ufe, struct suspension *S)
{

	event_done(&ufe->ufe_event, S);
}

void
uvm_wait(unsigned long npages, struct suspension *S)
{
	struct uvm_free_event ufe;

	mutex_spin_enter(&uvm_fpageqlock);
	uvm_free_event_init(npages, &ufe, S);
	mutex_spin_exit(&uvm_fpageqlock);

	(void)suspend(S);
	uvm_free_event_done(&ufe, S);
}
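/*
 * Again, uvm_free_event_ops is referenced above but not defined in the
 * sketch; presumably:
 */
static struct event_ops uvm_free_event_ops = {
	.eo_subscribe	= uvm_free_event_subscribe,
	.eo_cancel	= uvm_free_event_cancel,
};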